aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/ast.c4
-rw-r--r--fs/dlm/config.c86
-rw-r--r--fs/dlm/config.h3
-rw-r--r--fs/dlm/debug_fs.c103
-rw-r--r--fs/dlm/dir.c287
-rw-r--r--fs/dlm/dir.h7
-rw-r--r--fs/dlm/dlm_internal.h108
-rw-r--r--fs/dlm/lock.c1292
-rw-r--r--fs/dlm/lock.h5
-rw-r--r--fs/dlm/lockspace.c60
-rw-r--r--fs/dlm/lowcomms.c215
-rw-r--r--fs/dlm/lowcomms.h2
-rw-r--r--fs/dlm/main.c2
-rw-r--r--fs/dlm/member.c17
-rw-r--r--fs/dlm/netlink.c8
-rw-r--r--fs/dlm/rcom.c149
-rw-r--r--fs/dlm/rcom.h1
-rw-r--r--fs/dlm/recover.c295
-rw-r--r--fs/dlm/recover.h2
-rw-r--r--fs/dlm/recoverd.c41
-rw-r--r--fs/dlm/recoverd.h1
-rw-r--r--fs/dlm/user.c7
22 files changed, 1893 insertions, 802 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 63dc19c54d5a..27a6ba9aaeec 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -15,8 +15,8 @@
15#include "lock.h" 15#include "lock.h"
16#include "user.h" 16#include "user.h"
17 17
18static uint64_t dlm_cb_seq; 18static uint64_t dlm_cb_seq;
19static spinlock_t dlm_cb_seq_spin; 19static DEFINE_SPINLOCK(dlm_cb_seq_spin);
20 20
21static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb) 21static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
22{ 22{
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index e7e327d43fa5..a0387dd8b1f0 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -96,7 +96,6 @@ struct dlm_cluster {
96 unsigned int cl_tcp_port; 96 unsigned int cl_tcp_port;
97 unsigned int cl_buffer_size; 97 unsigned int cl_buffer_size;
98 unsigned int cl_rsbtbl_size; 98 unsigned int cl_rsbtbl_size;
99 unsigned int cl_dirtbl_size;
100 unsigned int cl_recover_timer; 99 unsigned int cl_recover_timer;
101 unsigned int cl_toss_secs; 100 unsigned int cl_toss_secs;
102 unsigned int cl_scan_secs; 101 unsigned int cl_scan_secs;
@@ -113,7 +112,6 @@ enum {
113 CLUSTER_ATTR_TCP_PORT = 0, 112 CLUSTER_ATTR_TCP_PORT = 0,
114 CLUSTER_ATTR_BUFFER_SIZE, 113 CLUSTER_ATTR_BUFFER_SIZE,
115 CLUSTER_ATTR_RSBTBL_SIZE, 114 CLUSTER_ATTR_RSBTBL_SIZE,
116 CLUSTER_ATTR_DIRTBL_SIZE,
117 CLUSTER_ATTR_RECOVER_TIMER, 115 CLUSTER_ATTR_RECOVER_TIMER,
118 CLUSTER_ATTR_TOSS_SECS, 116 CLUSTER_ATTR_TOSS_SECS,
119 CLUSTER_ATTR_SCAN_SECS, 117 CLUSTER_ATTR_SCAN_SECS,
@@ -189,7 +187,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
189CLUSTER_ATTR(tcp_port, 1); 187CLUSTER_ATTR(tcp_port, 1);
190CLUSTER_ATTR(buffer_size, 1); 188CLUSTER_ATTR(buffer_size, 1);
191CLUSTER_ATTR(rsbtbl_size, 1); 189CLUSTER_ATTR(rsbtbl_size, 1);
192CLUSTER_ATTR(dirtbl_size, 1);
193CLUSTER_ATTR(recover_timer, 1); 190CLUSTER_ATTR(recover_timer, 1);
194CLUSTER_ATTR(toss_secs, 1); 191CLUSTER_ATTR(toss_secs, 1);
195CLUSTER_ATTR(scan_secs, 1); 192CLUSTER_ATTR(scan_secs, 1);
@@ -204,7 +201,6 @@ static struct configfs_attribute *cluster_attrs[] = {
204 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, 201 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
205 [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr, 202 [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
206 [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr, 203 [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
207 [CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
208 [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr, 204 [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
209 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, 205 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
210 [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, 206 [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
@@ -478,7 +474,6 @@ static struct config_group *make_cluster(struct config_group *g,
478 cl->cl_tcp_port = dlm_config.ci_tcp_port; 474 cl->cl_tcp_port = dlm_config.ci_tcp_port;
479 cl->cl_buffer_size = dlm_config.ci_buffer_size; 475 cl->cl_buffer_size = dlm_config.ci_buffer_size;
480 cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size; 476 cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
481 cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
482 cl->cl_recover_timer = dlm_config.ci_recover_timer; 477 cl->cl_recover_timer = dlm_config.ci_recover_timer;
483 cl->cl_toss_secs = dlm_config.ci_toss_secs; 478 cl->cl_toss_secs = dlm_config.ci_toss_secs;
484 cl->cl_scan_secs = dlm_config.ci_scan_secs; 479 cl->cl_scan_secs = dlm_config.ci_scan_secs;
@@ -755,6 +750,7 @@ static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf,
755static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len) 750static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len)
756{ 751{
757 struct sockaddr_storage *addr; 752 struct sockaddr_storage *addr;
753 int rv;
758 754
759 if (len != sizeof(struct sockaddr_storage)) 755 if (len != sizeof(struct sockaddr_storage))
760 return -EINVAL; 756 return -EINVAL;
@@ -767,6 +763,13 @@ static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len)
767 return -ENOMEM; 763 return -ENOMEM;
768 764
769 memcpy(addr, buf, len); 765 memcpy(addr, buf, len);
766
767 rv = dlm_lowcomms_addr(cm->nodeid, addr, len);
768 if (rv) {
769 kfree(addr);
770 return rv;
771 }
772
770 cm->addr[cm->addr_count++] = addr; 773 cm->addr[cm->addr_count++] = addr;
771 return len; 774 return len;
772} 775}
@@ -883,34 +886,7 @@ static void put_space(struct dlm_space *sp)
883 config_item_put(&sp->group.cg_item); 886 config_item_put(&sp->group.cg_item);
884} 887}
885 888
886static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) 889static struct dlm_comm *get_comm(int nodeid)
887{
888 switch (x->ss_family) {
889 case AF_INET: {
890 struct sockaddr_in *sinx = (struct sockaddr_in *)x;
891 struct sockaddr_in *siny = (struct sockaddr_in *)y;
892 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
893 return 0;
894 if (sinx->sin_port != siny->sin_port)
895 return 0;
896 break;
897 }
898 case AF_INET6: {
899 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
900 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
901 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
902 return 0;
903 if (sinx->sin6_port != siny->sin6_port)
904 return 0;
905 break;
906 }
907 default:
908 return 0;
909 }
910 return 1;
911}
912
913static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr)
914{ 890{
915 struct config_item *i; 891 struct config_item *i;
916 struct dlm_comm *cm = NULL; 892 struct dlm_comm *cm = NULL;
@@ -924,19 +900,11 @@ static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr)
924 list_for_each_entry(i, &comm_list->cg_children, ci_entry) { 900 list_for_each_entry(i, &comm_list->cg_children, ci_entry) {
925 cm = config_item_to_comm(i); 901 cm = config_item_to_comm(i);
926 902
927 if (nodeid) { 903 if (cm->nodeid != nodeid)
928 if (cm->nodeid != nodeid) 904 continue;
929 continue; 905 found = 1;
930 found = 1; 906 config_item_get(i);
931 config_item_get(i); 907 break;
932 break;
933 } else {
934 if (!cm->addr_count || !addr_compare(cm->addr[0], addr))
935 continue;
936 found = 1;
937 config_item_get(i);
938 break;
939 }
940 } 908 }
941 mutex_unlock(&clusters_root.subsys.su_mutex); 909 mutex_unlock(&clusters_root.subsys.su_mutex);
942 910
@@ -1000,7 +968,7 @@ int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
1000 968
1001int dlm_comm_seq(int nodeid, uint32_t *seq) 969int dlm_comm_seq(int nodeid, uint32_t *seq)
1002{ 970{
1003 struct dlm_comm *cm = get_comm(nodeid, NULL); 971 struct dlm_comm *cm = get_comm(nodeid);
1004 if (!cm) 972 if (!cm)
1005 return -EEXIST; 973 return -EEXIST;
1006 *seq = cm->seq; 974 *seq = cm->seq;
@@ -1008,28 +976,6 @@ int dlm_comm_seq(int nodeid, uint32_t *seq)
1008 return 0; 976 return 0;
1009} 977}
1010 978
1011int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr)
1012{
1013 struct dlm_comm *cm = get_comm(nodeid, NULL);
1014 if (!cm)
1015 return -EEXIST;
1016 if (!cm->addr_count)
1017 return -ENOENT;
1018 memcpy(addr, cm->addr[0], sizeof(*addr));
1019 put_comm(cm);
1020 return 0;
1021}
1022
1023int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
1024{
1025 struct dlm_comm *cm = get_comm(0, addr);
1026 if (!cm)
1027 return -EEXIST;
1028 *nodeid = cm->nodeid;
1029 put_comm(cm);
1030 return 0;
1031}
1032
1033int dlm_our_nodeid(void) 979int dlm_our_nodeid(void)
1034{ 980{
1035 return local_comm ? local_comm->nodeid : 0; 981 return local_comm ? local_comm->nodeid : 0;
@@ -1050,7 +996,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
1050#define DEFAULT_TCP_PORT 21064 996#define DEFAULT_TCP_PORT 21064
1051#define DEFAULT_BUFFER_SIZE 4096 997#define DEFAULT_BUFFER_SIZE 4096
1052#define DEFAULT_RSBTBL_SIZE 1024 998#define DEFAULT_RSBTBL_SIZE 1024
1053#define DEFAULT_DIRTBL_SIZE 1024
1054#define DEFAULT_RECOVER_TIMER 5 999#define DEFAULT_RECOVER_TIMER 5
1055#define DEFAULT_TOSS_SECS 10 1000#define DEFAULT_TOSS_SECS 10
1056#define DEFAULT_SCAN_SECS 5 1001#define DEFAULT_SCAN_SECS 5
@@ -1066,7 +1011,6 @@ struct dlm_config_info dlm_config = {
1066 .ci_tcp_port = DEFAULT_TCP_PORT, 1011 .ci_tcp_port = DEFAULT_TCP_PORT,
1067 .ci_buffer_size = DEFAULT_BUFFER_SIZE, 1012 .ci_buffer_size = DEFAULT_BUFFER_SIZE,
1068 .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE, 1013 .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
1069 .ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
1070 .ci_recover_timer = DEFAULT_RECOVER_TIMER, 1014 .ci_recover_timer = DEFAULT_RECOVER_TIMER,
1071 .ci_toss_secs = DEFAULT_TOSS_SECS, 1015 .ci_toss_secs = DEFAULT_TOSS_SECS,
1072 .ci_scan_secs = DEFAULT_SCAN_SECS, 1016 .ci_scan_secs = DEFAULT_SCAN_SECS,
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 9f5e3663bb0c..f30697bc2780 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -27,7 +27,6 @@ struct dlm_config_info {
27 int ci_tcp_port; 27 int ci_tcp_port;
28 int ci_buffer_size; 28 int ci_buffer_size;
29 int ci_rsbtbl_size; 29 int ci_rsbtbl_size;
30 int ci_dirtbl_size;
31 int ci_recover_timer; 30 int ci_recover_timer;
32 int ci_toss_secs; 31 int ci_toss_secs;
33 int ci_scan_secs; 32 int ci_scan_secs;
@@ -47,8 +46,6 @@ void dlm_config_exit(void);
47int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, 46int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
48 int *count_out); 47 int *count_out);
49int dlm_comm_seq(int nodeid, uint32_t *seq); 48int dlm_comm_seq(int nodeid, uint32_t *seq);
50int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr);
51int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid);
52int dlm_our_nodeid(void); 49int dlm_our_nodeid(void);
53int dlm_our_addr(struct sockaddr_storage *addr, int num); 50int dlm_our_addr(struct sockaddr_storage *addr, int num);
54 51
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 1c9b08095f98..b969deef9ebb 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -344,6 +344,45 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s)
344 return rv; 344 return rv;
345} 345}
346 346
347static int print_format4(struct dlm_rsb *r, struct seq_file *s)
348{
349 int our_nodeid = dlm_our_nodeid();
350 int print_name = 1;
351 int i, rv;
352
353 lock_rsb(r);
354
355 rv = seq_printf(s, "rsb %p %d %d %d %d %lu %lx %d ",
356 r,
357 r->res_nodeid,
358 r->res_master_nodeid,
359 r->res_dir_nodeid,
360 our_nodeid,
361 r->res_toss_time,
362 r->res_flags,
363 r->res_length);
364 if (rv)
365 goto out;
366
367 for (i = 0; i < r->res_length; i++) {
368 if (!isascii(r->res_name[i]) || !isprint(r->res_name[i]))
369 print_name = 0;
370 }
371
372 seq_printf(s, "%s", print_name ? "str " : "hex");
373
374 for (i = 0; i < r->res_length; i++) {
375 if (print_name)
376 seq_printf(s, "%c", r->res_name[i]);
377 else
378 seq_printf(s, " %02x", (unsigned char)r->res_name[i]);
379 }
380 rv = seq_printf(s, "\n");
381 out:
382 unlock_rsb(r);
383 return rv;
384}
385
347struct rsbtbl_iter { 386struct rsbtbl_iter {
348 struct dlm_rsb *rsb; 387 struct dlm_rsb *rsb;
349 unsigned bucket; 388 unsigned bucket;
@@ -382,6 +421,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
382 } 421 }
383 rv = print_format3(ri->rsb, seq); 422 rv = print_format3(ri->rsb, seq);
384 break; 423 break;
424 case 4:
425 if (ri->header) {
426 seq_printf(seq, "version 4 rsb 2\n");
427 ri->header = 0;
428 }
429 rv = print_format4(ri->rsb, seq);
430 break;
385 } 431 }
386 432
387 return rv; 433 return rv;
@@ -390,15 +436,18 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
390static const struct seq_operations format1_seq_ops; 436static const struct seq_operations format1_seq_ops;
391static const struct seq_operations format2_seq_ops; 437static const struct seq_operations format2_seq_ops;
392static const struct seq_operations format3_seq_ops; 438static const struct seq_operations format3_seq_ops;
439static const struct seq_operations format4_seq_ops;
393 440
394static void *table_seq_start(struct seq_file *seq, loff_t *pos) 441static void *table_seq_start(struct seq_file *seq, loff_t *pos)
395{ 442{
443 struct rb_root *tree;
396 struct rb_node *node; 444 struct rb_node *node;
397 struct dlm_ls *ls = seq->private; 445 struct dlm_ls *ls = seq->private;
398 struct rsbtbl_iter *ri; 446 struct rsbtbl_iter *ri;
399 struct dlm_rsb *r; 447 struct dlm_rsb *r;
400 loff_t n = *pos; 448 loff_t n = *pos;
401 unsigned bucket, entry; 449 unsigned bucket, entry;
450 int toss = (seq->op == &format4_seq_ops);
402 451
403 bucket = n >> 32; 452 bucket = n >> 32;
404 entry = n & ((1LL << 32) - 1); 453 entry = n & ((1LL << 32) - 1);
@@ -417,11 +466,14 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
417 ri->format = 2; 466 ri->format = 2;
418 if (seq->op == &format3_seq_ops) 467 if (seq->op == &format3_seq_ops)
419 ri->format = 3; 468 ri->format = 3;
469 if (seq->op == &format4_seq_ops)
470 ri->format = 4;
471
472 tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
420 473
421 spin_lock(&ls->ls_rsbtbl[bucket].lock); 474 spin_lock(&ls->ls_rsbtbl[bucket].lock);
422 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { 475 if (!RB_EMPTY_ROOT(tree)) {
423 for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node; 476 for (node = rb_first(tree); node; node = rb_next(node)) {
424 node = rb_next(node)) {
425 r = rb_entry(node, struct dlm_rsb, res_hashnode); 477 r = rb_entry(node, struct dlm_rsb, res_hashnode);
426 if (!entry--) { 478 if (!entry--) {
427 dlm_hold_rsb(r); 479 dlm_hold_rsb(r);
@@ -449,10 +501,11 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
449 kfree(ri); 501 kfree(ri);
450 return NULL; 502 return NULL;
451 } 503 }
504 tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
452 505
453 spin_lock(&ls->ls_rsbtbl[bucket].lock); 506 spin_lock(&ls->ls_rsbtbl[bucket].lock);
454 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { 507 if (!RB_EMPTY_ROOT(tree)) {
455 node = rb_first(&ls->ls_rsbtbl[bucket].keep); 508 node = rb_first(tree);
456 r = rb_entry(node, struct dlm_rsb, res_hashnode); 509 r = rb_entry(node, struct dlm_rsb, res_hashnode);
457 dlm_hold_rsb(r); 510 dlm_hold_rsb(r);
458 ri->rsb = r; 511 ri->rsb = r;
@@ -469,10 +522,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
469{ 522{
470 struct dlm_ls *ls = seq->private; 523 struct dlm_ls *ls = seq->private;
471 struct rsbtbl_iter *ri = iter_ptr; 524 struct rsbtbl_iter *ri = iter_ptr;
525 struct rb_root *tree;
472 struct rb_node *next; 526 struct rb_node *next;
473 struct dlm_rsb *r, *rp; 527 struct dlm_rsb *r, *rp;
474 loff_t n = *pos; 528 loff_t n = *pos;
475 unsigned bucket; 529 unsigned bucket;
530 int toss = (seq->op == &format4_seq_ops);
476 531
477 bucket = n >> 32; 532 bucket = n >> 32;
478 533
@@ -511,10 +566,11 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
511 kfree(ri); 566 kfree(ri);
512 return NULL; 567 return NULL;
513 } 568 }
569 tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
514 570
515 spin_lock(&ls->ls_rsbtbl[bucket].lock); 571 spin_lock(&ls->ls_rsbtbl[bucket].lock);
516 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { 572 if (!RB_EMPTY_ROOT(tree)) {
517 next = rb_first(&ls->ls_rsbtbl[bucket].keep); 573 next = rb_first(tree);
518 r = rb_entry(next, struct dlm_rsb, res_hashnode); 574 r = rb_entry(next, struct dlm_rsb, res_hashnode);
519 dlm_hold_rsb(r); 575 dlm_hold_rsb(r);
520 ri->rsb = r; 576 ri->rsb = r;
@@ -558,9 +614,17 @@ static const struct seq_operations format3_seq_ops = {
558 .show = table_seq_show, 614 .show = table_seq_show,
559}; 615};
560 616
617static const struct seq_operations format4_seq_ops = {
618 .start = table_seq_start,
619 .next = table_seq_next,
620 .stop = table_seq_stop,
621 .show = table_seq_show,
622};
623
561static const struct file_operations format1_fops; 624static const struct file_operations format1_fops;
562static const struct file_operations format2_fops; 625static const struct file_operations format2_fops;
563static const struct file_operations format3_fops; 626static const struct file_operations format3_fops;
627static const struct file_operations format4_fops;
564 628
565static int table_open(struct inode *inode, struct file *file) 629static int table_open(struct inode *inode, struct file *file)
566{ 630{
@@ -573,6 +637,8 @@ static int table_open(struct inode *inode, struct file *file)
573 ret = seq_open(file, &format2_seq_ops); 637 ret = seq_open(file, &format2_seq_ops);
574 else if (file->f_op == &format3_fops) 638 else if (file->f_op == &format3_fops)
575 ret = seq_open(file, &format3_seq_ops); 639 ret = seq_open(file, &format3_seq_ops);
640 else if (file->f_op == &format4_fops)
641 ret = seq_open(file, &format4_seq_ops);
576 642
577 if (ret) 643 if (ret)
578 return ret; 644 return ret;
@@ -606,6 +672,14 @@ static const struct file_operations format3_fops = {
606 .release = seq_release 672 .release = seq_release
607}; 673};
608 674
675static const struct file_operations format4_fops = {
676 .owner = THIS_MODULE,
677 .open = table_open,
678 .read = seq_read,
679 .llseek = seq_lseek,
680 .release = seq_release
681};
682
609/* 683/*
610 * dump lkb's on the ls_waiters list 684 * dump lkb's on the ls_waiters list
611 */ 685 */
@@ -652,6 +726,8 @@ void dlm_delete_debug_file(struct dlm_ls *ls)
652 debugfs_remove(ls->ls_debug_locks_dentry); 726 debugfs_remove(ls->ls_debug_locks_dentry);
653 if (ls->ls_debug_all_dentry) 727 if (ls->ls_debug_all_dentry)
654 debugfs_remove(ls->ls_debug_all_dentry); 728 debugfs_remove(ls->ls_debug_all_dentry);
729 if (ls->ls_debug_toss_dentry)
730 debugfs_remove(ls->ls_debug_toss_dentry);
655} 731}
656 732
657int dlm_create_debug_file(struct dlm_ls *ls) 733int dlm_create_debug_file(struct dlm_ls *ls)
@@ -694,6 +770,19 @@ int dlm_create_debug_file(struct dlm_ls *ls)
694 if (!ls->ls_debug_all_dentry) 770 if (!ls->ls_debug_all_dentry)
695 goto fail; 771 goto fail;
696 772
773 /* format 4 */
774
775 memset(name, 0, sizeof(name));
776 snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_toss", ls->ls_name);
777
778 ls->ls_debug_toss_dentry = debugfs_create_file(name,
779 S_IFREG | S_IRUGO,
780 dlm_root,
781 ls,
782 &format4_fops);
783 if (!ls->ls_debug_toss_dentry)
784 goto fail;
785
697 memset(name, 0, sizeof(name)); 786 memset(name, 0, sizeof(name));
698 snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_waiters", ls->ls_name); 787 snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_waiters", ls->ls_name);
699 788
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index dc5eb598b81f..278a75cda446 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -23,50 +23,6 @@
23#include "lock.h" 23#include "lock.h"
24#include "dir.h" 24#include "dir.h"
25 25
26
27static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28{
29 spin_lock(&ls->ls_recover_list_lock);
30 list_add(&de->list, &ls->ls_recover_list);
31 spin_unlock(&ls->ls_recover_list_lock);
32}
33
34static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35{
36 int found = 0;
37 struct dlm_direntry *de;
38
39 spin_lock(&ls->ls_recover_list_lock);
40 list_for_each_entry(de, &ls->ls_recover_list, list) {
41 if (de->length == len) {
42 list_del(&de->list);
43 de->master_nodeid = 0;
44 memset(de->name, 0, len);
45 found = 1;
46 break;
47 }
48 }
49 spin_unlock(&ls->ls_recover_list_lock);
50
51 if (!found)
52 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
53 return de;
54}
55
56void dlm_clear_free_entries(struct dlm_ls *ls)
57{
58 struct dlm_direntry *de;
59
60 spin_lock(&ls->ls_recover_list_lock);
61 while (!list_empty(&ls->ls_recover_list)) {
62 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 list);
64 list_del(&de->list);
65 kfree(de);
66 }
67 spin_unlock(&ls->ls_recover_list_lock);
68}
69
70/* 26/*
71 * We use the upper 16 bits of the hash value to select the directory node. 27 * We use the upper 16 bits of the hash value to select the directory node.
72 * Low bits are used for distribution of rsb's among hash buckets on each node. 28 * Low bits are used for distribution of rsb's among hash buckets on each node.
@@ -78,144 +34,53 @@ void dlm_clear_free_entries(struct dlm_ls *ls)
78 34
79int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) 35int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80{ 36{
81 struct list_head *tmp; 37 uint32_t node;
82 struct dlm_member *memb = NULL;
83 uint32_t node, n = 0;
84 int nodeid;
85
86 if (ls->ls_num_nodes == 1) {
87 nodeid = dlm_our_nodeid();
88 goto out;
89 }
90 38
91 if (ls->ls_node_array) { 39 if (ls->ls_num_nodes == 1)
40 return dlm_our_nodeid();
41 else {
92 node = (hash >> 16) % ls->ls_total_weight; 42 node = (hash >> 16) % ls->ls_total_weight;
93 nodeid = ls->ls_node_array[node]; 43 return ls->ls_node_array[node];
94 goto out;
95 }
96
97 /* make_member_array() failed to kmalloc ls_node_array... */
98
99 node = (hash >> 16) % ls->ls_num_nodes;
100
101 list_for_each(tmp, &ls->ls_nodes) {
102 if (n++ != node)
103 continue;
104 memb = list_entry(tmp, struct dlm_member, list);
105 break;
106 } 44 }
107
108 DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 ls->ls_num_nodes, n, node););
110 nodeid = memb->nodeid;
111 out:
112 return nodeid;
113} 45}
114 46
115int dlm_dir_nodeid(struct dlm_rsb *r) 47int dlm_dir_nodeid(struct dlm_rsb *r)
116{ 48{
117 return dlm_hash2nodeid(r->res_ls, r->res_hash); 49 return r->res_dir_nodeid;
118}
119
120static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121{
122 uint32_t val;
123
124 val = jhash(name, len, 0);
125 val &= (ls->ls_dirtbl_size - 1);
126
127 return val;
128}
129
130static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131{
132 uint32_t bucket;
133
134 bucket = dir_hash(ls, de->name, de->length);
135 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136} 50}
137 51
138static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, 52void dlm_recover_dir_nodeid(struct dlm_ls *ls)
139 int namelen, uint32_t bucket)
140{ 53{
141 struct dlm_direntry *de; 54 struct dlm_rsb *r;
142
143 list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 if (de->length == namelen && !memcmp(name, de->name, namelen))
145 goto out;
146 }
147 de = NULL;
148 out:
149 return de;
150}
151
152void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153{
154 struct dlm_direntry *de;
155 uint32_t bucket;
156
157 bucket = dir_hash(ls, name, namelen);
158
159 spin_lock(&ls->ls_dirtbl[bucket].lock);
160
161 de = search_bucket(ls, name, namelen, bucket);
162
163 if (!de) {
164 log_error(ls, "remove fr %u none", nodeid);
165 goto out;
166 }
167
168 if (de->master_nodeid != nodeid) {
169 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 goto out;
171 }
172
173 list_del(&de->list);
174 kfree(de);
175 out:
176 spin_unlock(&ls->ls_dirtbl[bucket].lock);
177}
178 55
179void dlm_dir_clear(struct dlm_ls *ls) 56 down_read(&ls->ls_root_sem);
180{ 57 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
181 struct list_head *head; 58 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
182 struct dlm_direntry *de;
183 int i;
184
185 DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187 for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 spin_lock(&ls->ls_dirtbl[i].lock);
189 head = &ls->ls_dirtbl[i].list;
190 while (!list_empty(head)) {
191 de = list_entry(head->next, struct dlm_direntry, list);
192 list_del(&de->list);
193 put_free_de(ls, de);
194 }
195 spin_unlock(&ls->ls_dirtbl[i].lock);
196 } 59 }
60 up_read(&ls->ls_root_sem);
197} 61}
198 62
199int dlm_recover_directory(struct dlm_ls *ls) 63int dlm_recover_directory(struct dlm_ls *ls)
200{ 64{
201 struct dlm_member *memb; 65 struct dlm_member *memb;
202 struct dlm_direntry *de;
203 char *b, *last_name = NULL; 66 char *b, *last_name = NULL;
204 int error = -ENOMEM, last_len, count = 0; 67 int error = -ENOMEM, last_len, nodeid, result;
205 uint16_t namelen; 68 uint16_t namelen;
69 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
206 70
207 log_debug(ls, "dlm_recover_directory"); 71 log_debug(ls, "dlm_recover_directory");
208 72
209 if (dlm_no_directory(ls)) 73 if (dlm_no_directory(ls))
210 goto out_status; 74 goto out_status;
211 75
212 dlm_dir_clear(ls);
213
214 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); 76 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215 if (!last_name) 77 if (!last_name)
216 goto out; 78 goto out;
217 79
218 list_for_each_entry(memb, &ls->ls_nodes, list) { 80 list_for_each_entry(memb, &ls->ls_nodes, list) {
81 if (memb->nodeid == dlm_our_nodeid())
82 continue;
83
219 memset(last_name, 0, DLM_RESNAME_MAXLEN); 84 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 last_len = 0; 85 last_len = 0;
221 86
@@ -230,7 +95,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
230 if (error) 95 if (error)
231 goto out_free; 96 goto out_free;
232 97
233 schedule(); 98 cond_resched();
234 99
235 /* 100 /*
236 * pick namelen/name pairs out of received buffer 101 * pick namelen/name pairs out of received buffer
@@ -267,87 +132,71 @@ int dlm_recover_directory(struct dlm_ls *ls)
267 if (namelen > DLM_RESNAME_MAXLEN) 132 if (namelen > DLM_RESNAME_MAXLEN)
268 goto out_free; 133 goto out_free;
269 134
270 error = -ENOMEM; 135 error = dlm_master_lookup(ls, memb->nodeid,
271 de = get_free_de(ls, namelen); 136 b, namelen,
272 if (!de) 137 DLM_LU_RECOVER_DIR,
138 &nodeid, &result);
139 if (error) {
140 log_error(ls, "recover_dir lookup %d",
141 error);
273 goto out_free; 142 goto out_free;
143 }
144
145 /* The name was found in rsbtbl, but the
146 * master nodeid is different from
147 * memb->nodeid which says it is the master.
148 * This should not happen. */
149
150 if (result == DLM_LU_MATCH &&
151 nodeid != memb->nodeid) {
152 count_bad++;
153 log_error(ls, "recover_dir lookup %d "
154 "nodeid %d memb %d bad %u",
155 result, nodeid, memb->nodeid,
156 count_bad);
157 print_hex_dump_bytes("dlm_recover_dir ",
158 DUMP_PREFIX_NONE,
159 b, namelen);
160 }
161
162 /* The name was found in rsbtbl, and the
163 * master nodeid matches memb->nodeid. */
164
165 if (result == DLM_LU_MATCH &&
166 nodeid == memb->nodeid) {
167 count_match++;
168 }
169
170 /* The name was not found in rsbtbl and was
171 * added with memb->nodeid as the master. */
172
173 if (result == DLM_LU_ADD) {
174 count_add++;
175 }
274 176
275 de->master_nodeid = memb->nodeid;
276 de->length = namelen;
277 last_len = namelen; 177 last_len = namelen;
278 memcpy(de->name, b, namelen);
279 memcpy(last_name, b, namelen); 178 memcpy(last_name, b, namelen);
280 b += namelen; 179 b += namelen;
281 left -= namelen; 180 left -= namelen;
282
283 add_entry_to_hash(ls, de);
284 count++; 181 count++;
285 } 182 }
286 } 183 }
287 done: 184 done:
288 ; 185 ;
289 } 186 }
290 187
291 out_status: 188 out_status:
292 error = 0; 189 error = 0;
293 log_debug(ls, "dlm_recover_directory %d entries", count); 190 dlm_set_recover_status(ls, DLM_RS_DIR);
191
192 log_debug(ls, "dlm_recover_directory %u in %u new",
193 count, count_add);
294 out_free: 194 out_free:
295 kfree(last_name); 195 kfree(last_name);
296 out: 196 out:
297 dlm_clear_free_entries(ls);
298 return error; 197 return error;
299} 198}
300 199
301static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
302 int namelen, int *r_nodeid)
303{
304 struct dlm_direntry *de, *tmp;
305 uint32_t bucket;
306
307 bucket = dir_hash(ls, name, namelen);
308
309 spin_lock(&ls->ls_dirtbl[bucket].lock);
310 de = search_bucket(ls, name, namelen, bucket);
311 if (de) {
312 *r_nodeid = de->master_nodeid;
313 spin_unlock(&ls->ls_dirtbl[bucket].lock);
314 if (*r_nodeid == nodeid)
315 return -EEXIST;
316 return 0;
317 }
318
319 spin_unlock(&ls->ls_dirtbl[bucket].lock);
320
321 if (namelen > DLM_RESNAME_MAXLEN)
322 return -EINVAL;
323
324 de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
325 if (!de)
326 return -ENOMEM;
327
328 de->master_nodeid = nodeid;
329 de->length = namelen;
330 memcpy(de->name, name, namelen);
331
332 spin_lock(&ls->ls_dirtbl[bucket].lock);
333 tmp = search_bucket(ls, name, namelen, bucket);
334 if (tmp) {
335 kfree(de);
336 de = tmp;
337 } else {
338 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
339 }
340 *r_nodeid = de->master_nodeid;
341 spin_unlock(&ls->ls_dirtbl[bucket].lock);
342 return 0;
343}
344
345int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
346 int *r_nodeid)
347{
348 return get_entry(ls, nodeid, name, namelen, r_nodeid);
349}
350
351static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) 200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
352{ 201{
353 struct dlm_rsb *r; 202 struct dlm_rsb *r;
@@ -358,10 +207,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
358 bucket = hash & (ls->ls_rsbtbl_size - 1); 207 bucket = hash & (ls->ls_rsbtbl_size - 1);
359 208
360 spin_lock(&ls->ls_rsbtbl[bucket].lock); 209 spin_lock(&ls->ls_rsbtbl[bucket].lock);
361 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r); 210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
362 if (rv) 211 if (rv)
363 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, 212 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
364 name, len, 0, &r); 213 name, len, &r);
365 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 214 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
366 215
367 if (!rv) 216 if (!rv)
@@ -371,7 +220,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
371 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 220 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
372 if (len == r->res_length && !memcmp(name, r->res_name, len)) { 221 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
373 up_read(&ls->ls_root_sem); 222 up_read(&ls->ls_root_sem);
374 log_error(ls, "find_rsb_root revert to root_list %s", 223 log_debug(ls, "find_rsb_root revert to root_list %s",
375 r->res_name); 224 r->res_name);
376 return r; 225 return r;
377 } 226 }
@@ -429,6 +278,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
429 be_namelen = cpu_to_be16(0); 278 be_namelen = cpu_to_be16(0);
430 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 279 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
431 offset += sizeof(__be16); 280 offset += sizeof(__be16);
281 ls->ls_recover_dir_sent_msg++;
432 goto out; 282 goto out;
433 } 283 }
434 284
@@ -437,6 +287,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
437 offset += sizeof(__be16); 287 offset += sizeof(__be16);
438 memcpy(outbuf + offset, r->res_name, r->res_length); 288 memcpy(outbuf + offset, r->res_name, r->res_length);
439 offset += r->res_length; 289 offset += r->res_length;
290 ls->ls_recover_dir_sent_res++;
440 } 291 }
441 292
442 /* 293 /*
@@ -449,8 +300,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
449 be_namelen = cpu_to_be16(0xFFFF); 300 be_namelen = cpu_to_be16(0xFFFF);
450 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 301 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
451 offset += sizeof(__be16); 302 offset += sizeof(__be16);
303 ls->ls_recover_dir_sent_msg++;
452 } 304 }
453
454 out: 305 out:
455 up_read(&ls->ls_root_sem); 306 up_read(&ls->ls_root_sem);
456} 307}
diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h
index 0b0eb1267b6e..417506344456 100644
--- a/fs/dlm/dir.h
+++ b/fs/dlm/dir.h
@@ -14,15 +14,10 @@
14#ifndef __DIR_DOT_H__ 14#ifndef __DIR_DOT_H__
15#define __DIR_DOT_H__ 15#define __DIR_DOT_H__
16 16
17
18int dlm_dir_nodeid(struct dlm_rsb *rsb); 17int dlm_dir_nodeid(struct dlm_rsb *rsb);
19int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash); 18int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
20void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int len); 19void dlm_recover_dir_nodeid(struct dlm_ls *ls);
21void dlm_dir_clear(struct dlm_ls *ls);
22void dlm_clear_free_entries(struct dlm_ls *ls);
23int dlm_recover_directory(struct dlm_ls *ls); 20int dlm_recover_directory(struct dlm_ls *ls);
24int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
25 int *r_nodeid);
26void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, 21void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
27 char *outbuf, int outlen, int nodeid); 22 char *outbuf, int outlen, int nodeid);
28 23
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index bc342f7ac3af..871c1abf6029 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -55,8 +55,6 @@ struct dlm_lkb;
55struct dlm_rsb; 55struct dlm_rsb;
56struct dlm_member; 56struct dlm_member;
57struct dlm_rsbtable; 57struct dlm_rsbtable;
58struct dlm_dirtable;
59struct dlm_direntry;
60struct dlm_recover; 58struct dlm_recover;
61struct dlm_header; 59struct dlm_header;
62struct dlm_message; 60struct dlm_message;
@@ -98,18 +96,6 @@ do { \
98} 96}
99 97
100 98
101struct dlm_direntry {
102 struct list_head list;
103 uint32_t master_nodeid;
104 uint16_t length;
105 char name[1];
106};
107
108struct dlm_dirtable {
109 struct list_head list;
110 spinlock_t lock;
111};
112
113struct dlm_rsbtable { 99struct dlm_rsbtable {
114 struct rb_root keep; 100 struct rb_root keep;
115 struct rb_root toss; 101 struct rb_root toss;
@@ -283,6 +269,15 @@ struct dlm_lkb {
283 }; 269 };
284}; 270};
285 271
272/*
273 * res_master_nodeid is "normal": 0 is unset/invalid, non-zero is the real
274 * nodeid, even when nodeid is our_nodeid.
275 *
276 * res_nodeid is "odd": -1 is unset/invalid, zero means our_nodeid,
277 * greater than zero when another nodeid.
278 *
279 * (TODO: remove res_nodeid and only use res_master_nodeid)
280 */
286 281
287struct dlm_rsb { 282struct dlm_rsb {
288 struct dlm_ls *res_ls; /* the lockspace */ 283 struct dlm_ls *res_ls; /* the lockspace */
@@ -291,6 +286,9 @@ struct dlm_rsb {
291 unsigned long res_flags; 286 unsigned long res_flags;
292 int res_length; /* length of rsb name */ 287 int res_length; /* length of rsb name */
293 int res_nodeid; 288 int res_nodeid;
289 int res_master_nodeid;
290 int res_dir_nodeid;
291 int res_id; /* for ls_recover_idr */
294 uint32_t res_lvbseq; 292 uint32_t res_lvbseq;
295 uint32_t res_hash; 293 uint32_t res_hash;
296 uint32_t res_bucket; /* rsbtbl */ 294 uint32_t res_bucket; /* rsbtbl */
@@ -313,10 +311,21 @@ struct dlm_rsb {
313 char res_name[DLM_RESNAME_MAXLEN+1]; 311 char res_name[DLM_RESNAME_MAXLEN+1];
314}; 312};
315 313
314/* dlm_master_lookup() flags */
315
316#define DLM_LU_RECOVER_DIR 1
317#define DLM_LU_RECOVER_MASTER 2
318
319/* dlm_master_lookup() results */
320
321#define DLM_LU_MATCH 1
322#define DLM_LU_ADD 2
323
316/* find_rsb() flags */ 324/* find_rsb() flags */
317 325
318#define R_MASTER 1 /* only return rsb if it's a master */ 326#define R_REQUEST 0x00000001
319#define R_CREATE 2 /* create/add rsb if not found */ 327#define R_RECEIVE_REQUEST 0x00000002
328#define R_RECEIVE_RECOVER 0x00000004
320 329
321/* rsb_flags */ 330/* rsb_flags */
322 331
@@ -489,6 +498,13 @@ struct rcom_lock {
489 char rl_lvb[0]; 498 char rl_lvb[0];
490}; 499};
491 500
501/*
502 * The max number of resources per rsbtbl bucket that shrink will attempt
503 * to remove in each iteration.
504 */
505
506#define DLM_REMOVE_NAMES_MAX 8
507
492struct dlm_ls { 508struct dlm_ls {
493 struct list_head ls_list; /* list of lockspaces */ 509 struct list_head ls_list; /* list of lockspaces */
494 dlm_lockspace_t *ls_local_handle; 510 dlm_lockspace_t *ls_local_handle;
@@ -509,9 +525,6 @@ struct dlm_ls {
509 struct dlm_rsbtable *ls_rsbtbl; 525 struct dlm_rsbtable *ls_rsbtbl;
510 uint32_t ls_rsbtbl_size; 526 uint32_t ls_rsbtbl_size;
511 527
512 struct dlm_dirtable *ls_dirtbl;
513 uint32_t ls_dirtbl_size;
514
515 struct mutex ls_waiters_mutex; 528 struct mutex ls_waiters_mutex;
516 struct list_head ls_waiters; /* lkbs needing a reply */ 529 struct list_head ls_waiters; /* lkbs needing a reply */
517 530
@@ -525,6 +538,12 @@ struct dlm_ls {
525 int ls_new_rsb_count; 538 int ls_new_rsb_count;
526 struct list_head ls_new_rsb; /* new rsb structs */ 539 struct list_head ls_new_rsb; /* new rsb structs */
527 540
541 spinlock_t ls_remove_spin;
542 char ls_remove_name[DLM_RESNAME_MAXLEN+1];
543 char *ls_remove_names[DLM_REMOVE_NAMES_MAX];
544 int ls_remove_len;
545 int ls_remove_lens[DLM_REMOVE_NAMES_MAX];
546
528 struct list_head ls_nodes; /* current nodes in ls */ 547 struct list_head ls_nodes; /* current nodes in ls */
529 struct list_head ls_nodes_gone; /* dead node list, recovery */ 548 struct list_head ls_nodes_gone; /* dead node list, recovery */
530 int ls_num_nodes; /* number of nodes in ls */ 549 int ls_num_nodes; /* number of nodes in ls */
@@ -545,6 +564,7 @@ struct dlm_ls {
545 struct dentry *ls_debug_waiters_dentry; /* debugfs */ 564 struct dentry *ls_debug_waiters_dentry; /* debugfs */
546 struct dentry *ls_debug_locks_dentry; /* debugfs */ 565 struct dentry *ls_debug_locks_dentry; /* debugfs */
547 struct dentry *ls_debug_all_dentry; /* debugfs */ 566 struct dentry *ls_debug_all_dentry; /* debugfs */
567 struct dentry *ls_debug_toss_dentry; /* debugfs */
548 568
549 wait_queue_head_t ls_uevent_wait; /* user part of join/leave */ 569 wait_queue_head_t ls_uevent_wait; /* user part of join/leave */
550 int ls_uevent_result; 570 int ls_uevent_result;
@@ -573,13 +593,18 @@ struct dlm_ls {
573 struct mutex ls_requestqueue_mutex; 593 struct mutex ls_requestqueue_mutex;
574 struct dlm_rcom *ls_recover_buf; 594 struct dlm_rcom *ls_recover_buf;
575 int ls_recover_nodeid; /* for debugging */ 595 int ls_recover_nodeid; /* for debugging */
596 unsigned int ls_recover_dir_sent_res; /* for log info */
597 unsigned int ls_recover_dir_sent_msg; /* for log info */
576 unsigned int ls_recover_locks_in; /* for log info */ 598 unsigned int ls_recover_locks_in; /* for log info */
577 uint64_t ls_rcom_seq; 599 uint64_t ls_rcom_seq;
578 spinlock_t ls_rcom_spin; 600 spinlock_t ls_rcom_spin;
579 struct list_head ls_recover_list; 601 struct list_head ls_recover_list;
580 spinlock_t ls_recover_list_lock; 602 spinlock_t ls_recover_list_lock;
581 int ls_recover_list_count; 603 int ls_recover_list_count;
604 struct idr ls_recover_idr;
605 spinlock_t ls_recover_idr_lock;
582 wait_queue_head_t ls_wait_general; 606 wait_queue_head_t ls_wait_general;
607 wait_queue_head_t ls_recover_lock_wait;
583 struct mutex ls_clear_proc_locks; 608 struct mutex ls_clear_proc_locks;
584 609
585 struct list_head ls_root_list; /* root resources */ 610 struct list_head ls_root_list; /* root resources */
@@ -592,15 +617,40 @@ struct dlm_ls {
592 char ls_name[1]; 617 char ls_name[1];
593}; 618};
594 619
595#define LSFL_WORK 0 620/*
596#define LSFL_RUNNING 1 621 * LSFL_RECOVER_STOP - dlm_ls_stop() sets this to tell dlm recovery routines
597#define LSFL_RECOVERY_STOP 2 622 * that they should abort what they're doing so new recovery can be started.
598#define LSFL_RCOM_READY 3 623 *
599#define LSFL_RCOM_WAIT 4 624 * LSFL_RECOVER_DOWN - dlm_ls_stop() sets this to tell dlm_recoverd that it
600#define LSFL_UEVENT_WAIT 5 625 * should do down_write() on the in_recovery rw_semaphore. (doing down_write
601#define LSFL_TIMEWARN 6 626 * within dlm_ls_stop causes complaints about the lock acquired/released
602#define LSFL_CB_DELAY 7 627 * in different contexts.)
603#define LSFL_NODIR 8 628 *
629 * LSFL_RECOVER_LOCK - dlm_recoverd holds the in_recovery rw_semaphore.
630 * It sets this after it is done with down_write() on the in_recovery
631 * rw_semaphore and clears it after it has released the rw_semaphore.
632 *
633 * LSFL_RECOVER_WORK - dlm_ls_start() sets this to tell dlm_recoverd that it
634 * should begin recovery of the lockspace.
635 *
636 * LSFL_RUNNING - set when normal locking activity is enabled.
637 * dlm_ls_stop() clears this to tell dlm locking routines that they should
638 * quit what they are doing so recovery can run. dlm_recoverd sets
639 * this after recovery is finished.
640 */
641
642#define LSFL_RECOVER_STOP 0
643#define LSFL_RECOVER_DOWN 1
644#define LSFL_RECOVER_LOCK 2
645#define LSFL_RECOVER_WORK 3
646#define LSFL_RUNNING 4
647
648#define LSFL_RCOM_READY 5
649#define LSFL_RCOM_WAIT 6
650#define LSFL_UEVENT_WAIT 7
651#define LSFL_TIMEWARN 8
652#define LSFL_CB_DELAY 9
653#define LSFL_NODIR 10
604 654
605/* much of this is just saving user space pointers associated with the 655/* much of this is just saving user space pointers associated with the
606 lock that we pass back to the user lib with an ast */ 656 lock that we pass back to the user lib with an ast */
@@ -643,7 +693,7 @@ static inline int dlm_locking_stopped(struct dlm_ls *ls)
643 693
644static inline int dlm_recovery_stopped(struct dlm_ls *ls) 694static inline int dlm_recovery_stopped(struct dlm_ls *ls)
645{ 695{
646 return test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 696 return test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
647} 697}
648 698
649static inline int dlm_no_directory(struct dlm_ls *ls) 699static inline int dlm_no_directory(struct dlm_ls *ls)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index bdafb65a5234..b56950758188 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -90,6 +90,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
90static int receive_extralen(struct dlm_message *ms); 90static int receive_extralen(struct dlm_message *ms);
91static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 91static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92static void del_timeout(struct dlm_lkb *lkb); 92static void del_timeout(struct dlm_lkb *lkb);
93static void toss_rsb(struct kref *kref);
93 94
94/* 95/*
95 * Lock compatibilty matrix - thanks Steve 96 * Lock compatibilty matrix - thanks Steve
@@ -170,9 +171,11 @@ void dlm_print_lkb(struct dlm_lkb *lkb)
170 171
171static void dlm_print_rsb(struct dlm_rsb *r) 172static void dlm_print_rsb(struct dlm_rsb *r)
172{ 173{
173 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", 174 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 r->res_nodeid, r->res_flags, r->res_first_lkid, 175 "rlc %d name %s\n",
175 r->res_recover_locks_count, r->res_name); 176 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177 r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178 r->res_name);
176} 179}
177 180
178void dlm_dump_rsb(struct dlm_rsb *r) 181void dlm_dump_rsb(struct dlm_rsb *r)
@@ -327,6 +330,37 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
327 * Basic operations on rsb's and lkb's 330 * Basic operations on rsb's and lkb's
328 */ 331 */
329 332
333/* This is only called to add a reference when the code already holds
334 a valid reference to the rsb, so there's no need for locking. */
335
336static inline void hold_rsb(struct dlm_rsb *r)
337{
338 kref_get(&r->res_ref);
339}
340
341void dlm_hold_rsb(struct dlm_rsb *r)
342{
343 hold_rsb(r);
344}
345
346/* When all references to the rsb are gone it's transferred to
347 the tossed list for later disposal. */
348
349static void put_rsb(struct dlm_rsb *r)
350{
351 struct dlm_ls *ls = r->res_ls;
352 uint32_t bucket = r->res_bucket;
353
354 spin_lock(&ls->ls_rsbtbl[bucket].lock);
355 kref_put(&r->res_ref, toss_rsb);
356 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357}
358
359void dlm_put_rsb(struct dlm_rsb *r)
360{
361 put_rsb(r);
362}
363
330static int pre_rsb_struct(struct dlm_ls *ls) 364static int pre_rsb_struct(struct dlm_ls *ls)
331{ 365{
332 struct dlm_rsb *r1, *r2; 366 struct dlm_rsb *r1, *r2;
@@ -411,11 +445,10 @@ static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
411} 445}
412 446
413int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, 447int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
414 unsigned int flags, struct dlm_rsb **r_ret) 448 struct dlm_rsb **r_ret)
415{ 449{
416 struct rb_node *node = tree->rb_node; 450 struct rb_node *node = tree->rb_node;
417 struct dlm_rsb *r; 451 struct dlm_rsb *r;
418 int error = 0;
419 int rc; 452 int rc;
420 453
421 while (node) { 454 while (node) {
@@ -432,10 +465,8 @@ int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
432 return -EBADR; 465 return -EBADR;
433 466
434 found: 467 found:
435 if (r->res_nodeid && (flags & R_MASTER))
436 error = -ENOTBLK;
437 *r_ret = r; 468 *r_ret = r;
438 return error; 469 return 0;
439} 470}
440 471
441static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) 472static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
@@ -467,124 +498,587 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
467 return 0; 498 return 0;
468} 499}
469 500
470static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b, 501/*
471 unsigned int flags, struct dlm_rsb **r_ret) 502 * Find rsb in rsbtbl and potentially create/add one
503 *
504 * Delaying the release of rsb's has a similar benefit to applications keeping
505 * NL locks on an rsb, but without the guarantee that the cached master value
506 * will still be valid when the rsb is reused. Apps aren't always smart enough
507 * to keep NL locks on an rsb that they may lock again shortly; this can lead
508 * to excessive master lookups and removals if we don't delay the release.
509 *
510 * Searching for an rsb means looking through both the normal list and toss
511 * list. When found on the toss list the rsb is moved to the normal list with
512 * ref count of 1; when found on normal list the ref count is incremented.
513 *
514 * rsb's on the keep list are being used locally and refcounted.
515 * rsb's on the toss list are not being used locally, and are not refcounted.
516 *
517 * The toss list rsb's were either
518 * - previously used locally but not any more (were on keep list, then
519 * moved to toss list when last refcount dropped)
520 * - created and put on toss list as a directory record for a lookup
521 * (we are the dir node for the res, but are not using the res right now,
522 * but some other node is)
523 *
524 * The purpose of find_rsb() is to return a refcounted rsb for local use.
525 * So, if the given rsb is on the toss list, it is moved to the keep list
526 * before being returned.
527 *
528 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529 * more refcounts exist, so the rsb is moved from the keep list to the
530 * toss list.
531 *
532 * rsb's on both keep and toss lists are used for doing a name to master
533 * lookups. rsb's that are in use locally (and being refcounted) are on
534 * the keep list, rsb's that are not in use locally (not refcounted) and
535 * only exist for name/master lookups are on the toss list.
536 *
537 * rsb's on the toss list who's dir_nodeid is not local can have stale
538 * name/master mappings. So, remote requests on such rsb's can potentially
539 * return with an error, which means the mapping is stale and needs to
540 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
541 * first_lkid is to keep only a single outstanding request on an rsb
542 * while that rsb has a potentially stale master.)
543 */
544
545static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546 uint32_t hash, uint32_t b,
547 int dir_nodeid, int from_nodeid,
548 unsigned int flags, struct dlm_rsb **r_ret)
472{ 549{
473 struct dlm_rsb *r; 550 struct dlm_rsb *r = NULL;
551 int our_nodeid = dlm_our_nodeid();
552 int from_local = 0;
553 int from_other = 0;
554 int from_dir = 0;
555 int create = 0;
474 int error; 556 int error;
475 557
476 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r); 558 if (flags & R_RECEIVE_REQUEST) {
477 if (!error) { 559 if (from_nodeid == dir_nodeid)
478 kref_get(&r->res_ref); 560 from_dir = 1;
479 goto out; 561 else
562 from_other = 1;
563 } else if (flags & R_REQUEST) {
564 from_local = 1;
480 } 565 }
481 if (error == -ENOTBLK)
482 goto out;
483 566
484 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); 567 /*
568 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569 * from_nodeid has sent us a lock in dlm_recover_locks, believing
570 * we're the new master. Our local recovery may not have set
571 * res_master_nodeid to our_nodeid yet, so allow either. Don't
572 * create the rsb; dlm_recover_process_copy() will handle EBADR
573 * by resending.
574 *
575 * If someone sends us a request, we are the dir node, and we do
576 * not find the rsb anywhere, then recreate it. This happens if
577 * someone sends us a request after we have removed/freed an rsb
578 * from our toss list. (They sent a request instead of lookup
579 * because they are using an rsb from their toss list.)
580 */
581
582 if (from_local || from_dir ||
583 (from_other && (dir_nodeid == our_nodeid))) {
584 create = 1;
585 }
586
587 retry:
588 if (create) {
589 error = pre_rsb_struct(ls);
590 if (error < 0)
591 goto out;
592 }
593
594 spin_lock(&ls->ls_rsbtbl[b].lock);
595
596 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
485 if (error) 597 if (error)
486 goto out; 598 goto do_toss;
599
600 /*
601 * rsb is active, so we can't check master_nodeid without lock_rsb.
602 */
487 603
488 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 604 kref_get(&r->res_ref);
489 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 605 error = 0;
606 goto out_unlock;
607
608
609 do_toss:
610 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
490 if (error) 611 if (error)
491 return error; 612 goto do_new;
492 613
493 if (dlm_no_directory(ls)) 614 /*
494 goto out; 615 * rsb found inactive (master_nodeid may be out of date unless
616 * we are the dir_nodeid or were the master) No other thread
617 * is using this rsb because it's on the toss list, so we can
618 * look at or update res_master_nodeid without lock_rsb.
619 */
620
621 if ((r->res_master_nodeid != our_nodeid) && from_other) {
622 /* our rsb was not master, and another node (not the dir node)
623 has sent us a request */
624 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625 from_nodeid, r->res_master_nodeid, dir_nodeid,
626 r->res_name);
627 error = -ENOTBLK;
628 goto out_unlock;
629 }
495 630
496 if (r->res_nodeid == -1) { 631 if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632 /* don't think this should ever happen */
633 log_error(ls, "find_rsb toss from_dir %d master %d",
634 from_nodeid, r->res_master_nodeid);
635 dlm_print_rsb(r);
636 /* fix it and go on */
637 r->res_master_nodeid = our_nodeid;
638 r->res_nodeid = 0;
497 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 639 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
498 r->res_first_lkid = 0; 640 r->res_first_lkid = 0;
499 } else if (r->res_nodeid > 0) { 641 }
642
643 if (from_local && (r->res_master_nodeid != our_nodeid)) {
644 /* Because we have held no locks on this rsb,
645 res_master_nodeid could have become stale. */
500 rsb_set_flag(r, RSB_MASTER_UNCERTAIN); 646 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
501 r->res_first_lkid = 0; 647 r->res_first_lkid = 0;
648 }
649
650 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652 goto out_unlock;
653
654
655 do_new:
656 /*
657 * rsb not found
658 */
659
660 if (error == -EBADR && !create)
661 goto out_unlock;
662
663 error = get_rsb_struct(ls, name, len, &r);
664 if (error == -EAGAIN) {
665 spin_unlock(&ls->ls_rsbtbl[b].lock);
666 goto retry;
667 }
668 if (error)
669 goto out_unlock;
670
671 r->res_hash = hash;
672 r->res_bucket = b;
673 r->res_dir_nodeid = dir_nodeid;
674 kref_init(&r->res_ref);
675
676 if (from_dir) {
677 /* want to see how often this happens */
678 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679 from_nodeid, r->res_name);
680 r->res_master_nodeid = our_nodeid;
681 r->res_nodeid = 0;
682 goto out_add;
683 }
684
685 if (from_other && (dir_nodeid != our_nodeid)) {
686 /* should never happen */
687 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688 from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689 dlm_free_rsb(r);
690 error = -ENOTBLK;
691 goto out_unlock;
692 }
693
694 if (from_other) {
695 log_debug(ls, "find_rsb new from_other %d dir %d %s",
696 from_nodeid, dir_nodeid, r->res_name);
697 }
698
699 if (dir_nodeid == our_nodeid) {
700 /* When we are the dir nodeid, we can set the master
701 node immediately */
702 r->res_master_nodeid = our_nodeid;
703 r->res_nodeid = 0;
502 } else { 704 } else {
503 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r);); 705 /* set_master will send_lookup to dir_nodeid */
504 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),); 706 r->res_master_nodeid = 0;
707 r->res_nodeid = -1;
708 }
709
710 out_add:
711 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
712 out_unlock:
713 spin_unlock(&ls->ls_rsbtbl[b].lock);
714 out:
715 *r_ret = r;
716 return error;
717}
718
719/* During recovery, other nodes can send us new MSTCPY locks (from
720 dlm_recover_locks) before we've made ourself master (in
721 dlm_recover_masters). */
722
723static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
724 uint32_t hash, uint32_t b,
725 int dir_nodeid, int from_nodeid,
726 unsigned int flags, struct dlm_rsb **r_ret)
727{
728 struct dlm_rsb *r = NULL;
729 int our_nodeid = dlm_our_nodeid();
730 int recover = (flags & R_RECEIVE_RECOVER);
731 int error;
732
733 retry:
734 error = pre_rsb_struct(ls);
735 if (error < 0)
736 goto out;
737
738 spin_lock(&ls->ls_rsbtbl[b].lock);
739
740 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
741 if (error)
742 goto do_toss;
743
744 /*
745 * rsb is active, so we can't check master_nodeid without lock_rsb.
746 */
747
748 kref_get(&r->res_ref);
749 goto out_unlock;
750
751
752 do_toss:
753 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
754 if (error)
755 goto do_new;
756
757 /*
758 * rsb found inactive. No other thread is using this rsb because
759 * it's on the toss list, so we can look at or update
760 * res_master_nodeid without lock_rsb.
761 */
762
763 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
764 /* our rsb is not master, and another node has sent us a
765 request; this should never happen */
766 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
767 from_nodeid, r->res_master_nodeid, dir_nodeid);
768 dlm_print_rsb(r);
769 error = -ENOTBLK;
770 goto out_unlock;
505 } 771 }
772
773 if (!recover && (r->res_master_nodeid != our_nodeid) &&
774 (dir_nodeid == our_nodeid)) {
775 /* our rsb is not master, and we are dir; may as well fix it;
776 this should never happen */
777 log_error(ls, "find_rsb toss our %d master %d dir %d",
778 our_nodeid, r->res_master_nodeid, dir_nodeid);
779 dlm_print_rsb(r);
780 r->res_master_nodeid = our_nodeid;
781 r->res_nodeid = 0;
782 }
783
784 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
785 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
786 goto out_unlock;
787
788
789 do_new:
790 /*
791 * rsb not found
792 */
793
794 error = get_rsb_struct(ls, name, len, &r);
795 if (error == -EAGAIN) {
796 spin_unlock(&ls->ls_rsbtbl[b].lock);
797 goto retry;
798 }
799 if (error)
800 goto out_unlock;
801
802 r->res_hash = hash;
803 r->res_bucket = b;
804 r->res_dir_nodeid = dir_nodeid;
805 r->res_master_nodeid = dir_nodeid;
806 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
807 kref_init(&r->res_ref);
808
809 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
810 out_unlock:
811 spin_unlock(&ls->ls_rsbtbl[b].lock);
506 out: 812 out:
507 *r_ret = r; 813 *r_ret = r;
508 return error; 814 return error;
509} 815}
510 816
817static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
818 unsigned int flags, struct dlm_rsb **r_ret)
819{
820 uint32_t hash, b;
821 int dir_nodeid;
822
823 if (len > DLM_RESNAME_MAXLEN)
824 return -EINVAL;
825
826 hash = jhash(name, len, 0);
827 b = hash & (ls->ls_rsbtbl_size - 1);
828
829 dir_nodeid = dlm_hash2nodeid(ls, hash);
830
831 if (dlm_no_directory(ls))
832 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833 from_nodeid, flags, r_ret);
834 else
835 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836 from_nodeid, flags, r_ret);
837}
838
839/* we have received a request and found that res_master_nodeid != our_nodeid,
840 so we need to return an error or make ourself the master */
841
842static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
843 int from_nodeid)
844{
845 if (dlm_no_directory(ls)) {
846 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
847 from_nodeid, r->res_master_nodeid,
848 r->res_dir_nodeid);
849 dlm_print_rsb(r);
850 return -ENOTBLK;
851 }
852
853 if (from_nodeid != r->res_dir_nodeid) {
854 /* our rsb is not master, and another node (not the dir node)
855 has sent us a request. this is much more common when our
856 master_nodeid is zero, so limit debug to non-zero. */
857
858 if (r->res_master_nodeid) {
859 log_debug(ls, "validate master from_other %d master %d "
860 "dir %d first %x %s", from_nodeid,
861 r->res_master_nodeid, r->res_dir_nodeid,
862 r->res_first_lkid, r->res_name);
863 }
864 return -ENOTBLK;
865 } else {
866 /* our rsb is not master, but the dir nodeid has sent us a
867 request; this could happen with master 0 / res_nodeid -1 */
868
869 if (r->res_master_nodeid) {
870 log_error(ls, "validate master from_dir %d master %d "
871 "first %x %s",
872 from_nodeid, r->res_master_nodeid,
873 r->res_first_lkid, r->res_name);
874 }
875
876 r->res_master_nodeid = dlm_our_nodeid();
877 r->res_nodeid = 0;
878 return 0;
879 }
880}
881
511/* 882/*
512 * Find rsb in rsbtbl and potentially create/add one 883 * We're the dir node for this res and another node wants to know the
884 * master nodeid. During normal operation (non recovery) this is only
885 * called from receive_lookup(); master lookups when the local node is
886 * the dir node are done by find_rsb().
513 * 887 *
514 * Delaying the release of rsb's has a similar benefit to applications keeping 888 * normal operation, we are the dir node for a resource
515 * NL locks on an rsb, but without the guarantee that the cached master value 889 * . _request_lock
516 * will still be valid when the rsb is reused. Apps aren't always smart enough 890 * . set_master
517 * to keep NL locks on an rsb that they may lock again shortly; this can lead 891 * . send_lookup
518 * to excessive master lookups and removals if we don't delay the release. 892 * . receive_lookup
893 * . dlm_master_lookup flags 0
519 * 894 *
520 * Searching for an rsb means looking through both the normal list and toss 895 * recover directory, we are rebuilding dir for all resources
521 * list. When found on the toss list the rsb is moved to the normal list with 896 * . dlm_recover_directory
522 * ref count of 1; when found on normal list the ref count is incremented. 897 * . dlm_rcom_names
898 * remote node sends back the rsb names it is master of and we are dir of
899 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
900 * we either create new rsb setting remote node as master, or find existing
901 * rsb and set master to be the remote node.
902 *
903 * recover masters, we are finding the new master for resources
904 * . dlm_recover_masters
905 * . recover_master
906 * . dlm_send_rcom_lookup
907 * . receive_rcom_lookup
908 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
523 */ 909 */
524 910
525static int find_rsb(struct dlm_ls *ls, char *name, int namelen, 911int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
526 unsigned int flags, struct dlm_rsb **r_ret) 912 unsigned int flags, int *r_nodeid, int *result)
527{ 913{
528 struct dlm_rsb *r = NULL; 914 struct dlm_rsb *r = NULL;
529 uint32_t hash, bucket; 915 uint32_t hash, b;
530 int error; 916 int from_master = (flags & DLM_LU_RECOVER_DIR);
917 int fix_master = (flags & DLM_LU_RECOVER_MASTER);
918 int our_nodeid = dlm_our_nodeid();
919 int dir_nodeid, error, toss_list = 0;
531 920
532 if (namelen > DLM_RESNAME_MAXLEN) { 921 if (len > DLM_RESNAME_MAXLEN)
533 error = -EINVAL; 922 return -EINVAL;
534 goto out; 923
924 if (from_nodeid == our_nodeid) {
925 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
926 our_nodeid, flags);
927 return -EINVAL;
535 } 928 }
536 929
537 if (dlm_no_directory(ls)) 930 hash = jhash(name, len, 0);
538 flags |= R_CREATE; 931 b = hash & (ls->ls_rsbtbl_size - 1);
539 932
540 hash = jhash(name, namelen, 0); 933 dir_nodeid = dlm_hash2nodeid(ls, hash);
541 bucket = hash & (ls->ls_rsbtbl_size - 1); 934 if (dir_nodeid != our_nodeid) {
935 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936 from_nodeid, dir_nodeid, our_nodeid, hash,
937 ls->ls_num_nodes);
938 *r_nodeid = -1;
939 return -EINVAL;
940 }
542 941
543 retry: 942 retry:
544 if (flags & R_CREATE) { 943 error = pre_rsb_struct(ls);
545 error = pre_rsb_struct(ls); 944 if (error < 0)
546 if (error < 0) 945 return error;
547 goto out; 946
947 spin_lock(&ls->ls_rsbtbl[b].lock);
948 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
949 if (!error) {
950 /* because the rsb is active, we need to lock_rsb before
951 checking/changing re_master_nodeid */
952
953 hold_rsb(r);
954 spin_unlock(&ls->ls_rsbtbl[b].lock);
955 lock_rsb(r);
956 goto found;
548 } 957 }
549 958
550 spin_lock(&ls->ls_rsbtbl[bucket].lock); 959 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
960 if (error)
961 goto not_found;
551 962
552 error = _search_rsb(ls, name, namelen, bucket, flags, &r); 963 /* because the rsb is inactive (on toss list), it's not refcounted
553 if (!error) 964 and lock_rsb is not used, but is protected by the rsbtbl lock */
554 goto out_unlock;
555 965
556 if (error == -EBADR && !(flags & R_CREATE)) 966 toss_list = 1;
557 goto out_unlock; 967 found:
968 if (r->res_dir_nodeid != our_nodeid) {
969 /* should not happen, but may as well fix it and carry on */
970 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
971 r->res_dir_nodeid, our_nodeid, r->res_name);
972 r->res_dir_nodeid = our_nodeid;
973 }
974
975 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
976 /* Recovery uses this function to set a new master when
977 the previous master failed. Setting NEW_MASTER will
978 force dlm_recover_masters to call recover_master on this
979 rsb even though the res_nodeid is no longer removed. */
980
981 r->res_master_nodeid = from_nodeid;
982 r->res_nodeid = from_nodeid;
983 rsb_set_flag(r, RSB_NEW_MASTER);
984
985 if (toss_list) {
986 /* I don't think we should ever find it on toss list. */
987 log_error(ls, "dlm_master_lookup fix_master on toss");
988 dlm_dump_rsb(r);
989 }
990 }
558 991
559 /* the rsb was found but wasn't a master copy */ 992 if (from_master && (r->res_master_nodeid != from_nodeid)) {
560 if (error == -ENOTBLK) 993 /* this will happen if from_nodeid became master during
561 goto out_unlock; 994 a previous recovery cycle, and we aborted the previous
995 cycle before recovering this master value */
996
997 log_limit(ls, "dlm_master_lookup from_master %d "
998 "master_nodeid %d res_nodeid %d first %x %s",
999 from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000 r->res_first_lkid, r->res_name);
1001
1002 if (r->res_master_nodeid == our_nodeid) {
1003 log_error(ls, "from_master %d our_master", from_nodeid);
1004 dlm_dump_rsb(r);
1005 dlm_send_rcom_lookup_dump(r, from_nodeid);
1006 goto out_found;
1007 }
1008
1009 r->res_master_nodeid = from_nodeid;
1010 r->res_nodeid = from_nodeid;
1011 rsb_set_flag(r, RSB_NEW_MASTER);
1012 }
1013
1014 if (!r->res_master_nodeid) {
1015 /* this will happen if recovery happens while we're looking
1016 up the master for this rsb */
1017
1018 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019 from_nodeid, r->res_first_lkid, r->res_name);
1020 r->res_master_nodeid = from_nodeid;
1021 r->res_nodeid = from_nodeid;
1022 }
562 1023
563 error = get_rsb_struct(ls, name, namelen, &r); 1024 if (!from_master && !fix_master &&
1025 (r->res_master_nodeid == from_nodeid)) {
1026 /* this can happen when the master sends remove, the dir node
1027 finds the rsb on the keep list and ignores the remove,
1028 and the former master sends a lookup */
1029
1030 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031 "first %x %s", from_nodeid, flags,
1032 r->res_first_lkid, r->res_name);
1033 }
1034
1035 out_found:
1036 *r_nodeid = r->res_master_nodeid;
1037 if (result)
1038 *result = DLM_LU_MATCH;
1039
1040 if (toss_list) {
1041 r->res_toss_time = jiffies;
1042 /* the rsb was inactive (on toss list) */
1043 spin_unlock(&ls->ls_rsbtbl[b].lock);
1044 } else {
1045 /* the rsb was active */
1046 unlock_rsb(r);
1047 put_rsb(r);
1048 }
1049 return 0;
1050
1051 not_found:
1052 error = get_rsb_struct(ls, name, len, &r);
564 if (error == -EAGAIN) { 1053 if (error == -EAGAIN) {
565 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 1054 spin_unlock(&ls->ls_rsbtbl[b].lock);
566 goto retry; 1055 goto retry;
567 } 1056 }
568 if (error) 1057 if (error)
569 goto out_unlock; 1058 goto out_unlock;
570 1059
571 r->res_hash = hash; 1060 r->res_hash = hash;
572 r->res_bucket = bucket; 1061 r->res_bucket = b;
573 r->res_nodeid = -1; 1062 r->res_dir_nodeid = our_nodeid;
1063 r->res_master_nodeid = from_nodeid;
1064 r->res_nodeid = from_nodeid;
574 kref_init(&r->res_ref); 1065 kref_init(&r->res_ref);
1066 r->res_toss_time = jiffies;
575 1067
576 /* With no directory, the master can be set immediately */ 1068 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
577 if (dlm_no_directory(ls)) { 1069 if (error) {
578 int nodeid = dlm_dir_nodeid(r); 1070 /* should never happen */
579 if (nodeid == dlm_our_nodeid()) 1071 dlm_free_rsb(r);
580 nodeid = 0; 1072 spin_unlock(&ls->ls_rsbtbl[b].lock);
581 r->res_nodeid = nodeid; 1073 goto retry;
582 } 1074 }
583 error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep); 1075
1076 if (result)
1077 *result = DLM_LU_ADD;
1078 *r_nodeid = from_nodeid;
1079 error = 0;
584 out_unlock: 1080 out_unlock:
585 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 1081 spin_unlock(&ls->ls_rsbtbl[b].lock);
586 out:
587 *r_ret = r;
588 return error; 1082 return error;
589} 1083}
590 1084
@@ -605,17 +1099,27 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
605 } 1099 }
606} 1100}
607 1101
608/* This is only called to add a reference when the code already holds 1102void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
609 a valid reference to the rsb, so there's no need for locking. */
610
611static inline void hold_rsb(struct dlm_rsb *r)
612{ 1103{
613 kref_get(&r->res_ref); 1104 struct dlm_rsb *r = NULL;
614} 1105 uint32_t hash, b;
1106 int error;
615 1107
616void dlm_hold_rsb(struct dlm_rsb *r) 1108 hash = jhash(name, len, 0);
617{ 1109 b = hash & (ls->ls_rsbtbl_size - 1);
618 hold_rsb(r); 1110
1111 spin_lock(&ls->ls_rsbtbl[b].lock);
1112 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113 if (!error)
1114 goto out_dump;
1115
1116 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117 if (error)
1118 goto out;
1119 out_dump:
1120 dlm_dump_rsb(r);
1121 out:
1122 spin_unlock(&ls->ls_rsbtbl[b].lock);
619} 1123}
620 1124
621static void toss_rsb(struct kref *kref) 1125static void toss_rsb(struct kref *kref)
@@ -634,24 +1138,6 @@ static void toss_rsb(struct kref *kref)
634 } 1138 }
635} 1139}
636 1140
637/* When all references to the rsb are gone it's transferred to
638 the tossed list for later disposal. */
639
640static void put_rsb(struct dlm_rsb *r)
641{
642 struct dlm_ls *ls = r->res_ls;
643 uint32_t bucket = r->res_bucket;
644
645 spin_lock(&ls->ls_rsbtbl[bucket].lock);
646 kref_put(&r->res_ref, toss_rsb);
647 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
648}
649
650void dlm_put_rsb(struct dlm_rsb *r)
651{
652 put_rsb(r);
653}
654
655/* See comment for unhold_lkb */ 1141/* See comment for unhold_lkb */
656 1142
657static void unhold_rsb(struct dlm_rsb *r) 1143static void unhold_rsb(struct dlm_rsb *r)
@@ -1138,61 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1138 return error; 1624 return error;
1139} 1625}
1140 1626
1141static void dir_remove(struct dlm_rsb *r) 1627/* If there's an rsb for the same resource being removed, ensure
1142{ 1628 that the remove message is sent before the new lookup message.
1143 int to_nodeid; 1629 It should be rare to need a delay here, but if not, then it may
1144 1630 be worthwhile to add a proper wait mechanism rather than a delay. */
1145 if (dlm_no_directory(r->res_ls))
1146 return;
1147 1631
1148 to_nodeid = dlm_dir_nodeid(r); 1632static void wait_pending_remove(struct dlm_rsb *r)
1149 if (to_nodeid != dlm_our_nodeid()) 1633{
1150 send_remove(r); 1634 struct dlm_ls *ls = r->res_ls;
1151 else 1635 restart:
1152 dlm_dir_remove_entry(r->res_ls, to_nodeid, 1636 spin_lock(&ls->ls_remove_spin);
1153 r->res_name, r->res_length); 1637 if (ls->ls_remove_len &&
1638 !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1639 log_debug(ls, "delay lookup for remove dir %d %s",
1640 r->res_dir_nodeid, r->res_name);
1641 spin_unlock(&ls->ls_remove_spin);
1642 msleep(1);
1643 goto restart;
1644 }
1645 spin_unlock(&ls->ls_remove_spin);
1154} 1646}
1155 1647
1156/* FIXME: make this more efficient */ 1648/*
1649 * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1650 * read by other threads in wait_pending_remove. ls_remove_names
1651 * and ls_remove_lens are only used by the scan thread, so they do
1652 * not need protection.
1653 */
1157 1654
1158static int shrink_bucket(struct dlm_ls *ls, int b) 1655static void shrink_bucket(struct dlm_ls *ls, int b)
1159{ 1656{
1160 struct rb_node *n; 1657 struct rb_node *n, *next;
1161 struct dlm_rsb *r; 1658 struct dlm_rsb *r;
1162 int count = 0, found; 1659 char *name;
1660 int our_nodeid = dlm_our_nodeid();
1661 int remote_count = 0;
1662 int i, len, rv;
1663
1664 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1665
1666 spin_lock(&ls->ls_rsbtbl[b].lock);
1667 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1668 next = rb_next(n);
1669 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1670
1671 /* If we're the directory record for this rsb, and
1672 we're not the master of it, then we need to wait
 1673 for the master node to send us a dir remove
1674 before removing the dir record. */
1675
1676 if (!dlm_no_directory(ls) &&
1677 (r->res_master_nodeid != our_nodeid) &&
1678 (dlm_dir_nodeid(r) == our_nodeid)) {
1679 continue;
1680 }
1681
1682 if (!time_after_eq(jiffies, r->res_toss_time +
1683 dlm_config.ci_toss_secs * HZ)) {
1684 continue;
1685 }
1686
1687 if (!dlm_no_directory(ls) &&
1688 (r->res_master_nodeid == our_nodeid) &&
1689 (dlm_dir_nodeid(r) != our_nodeid)) {
1690
1691 /* We're the master of this rsb but we're not
1692 the directory record, so we need to tell the
1693 dir node to remove the dir record. */
1694
1695 ls->ls_remove_lens[remote_count] = r->res_length;
1696 memcpy(ls->ls_remove_names[remote_count], r->res_name,
1697 DLM_RESNAME_MAXLEN);
1698 remote_count++;
1699
1700 if (remote_count >= DLM_REMOVE_NAMES_MAX)
1701 break;
1702 continue;
1703 }
1704
1705 if (!kref_put(&r->res_ref, kill_rsb)) {
1706 log_error(ls, "tossed rsb in use %s", r->res_name);
1707 continue;
1708 }
1709
1710 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1711 dlm_free_rsb(r);
1712 }
1713 spin_unlock(&ls->ls_rsbtbl[b].lock);
1714
1715 /*
1716 * While searching for rsb's to free, we found some that require
1717 * remote removal. We leave them in place and find them again here
1718 * so there is a very small gap between removing them from the toss
1719 * list and sending the removal. Keeping this gap small is
1720 * important to keep us (the master node) from being out of sync
1721 * with the remote dir node for very long.
1722 *
1723 * From the time the rsb is removed from toss until just after
1724 * send_remove, the rsb name is saved in ls_remove_name. A new
1725 * lookup checks this to ensure that a new lookup message for the
1726 * same resource name is not sent just before the remove message.
1727 */
1728
1729 for (i = 0; i < remote_count; i++) {
1730 name = ls->ls_remove_names[i];
1731 len = ls->ls_remove_lens[i];
1163 1732
1164 for (;;) {
1165 found = 0;
1166 spin_lock(&ls->ls_rsbtbl[b].lock); 1733 spin_lock(&ls->ls_rsbtbl[b].lock);
1167 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) { 1734 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1168 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1735 if (rv) {
1169 if (!time_after_eq(jiffies, r->res_toss_time + 1736 spin_unlock(&ls->ls_rsbtbl[b].lock);
1170 dlm_config.ci_toss_secs * HZ)) 1737 log_debug(ls, "remove_name not toss %s", name);
1171 continue; 1738 continue;
1172 found = 1;
1173 break;
1174 } 1739 }
1175 1740
1176 if (!found) { 1741 if (r->res_master_nodeid != our_nodeid) {
1177 spin_unlock(&ls->ls_rsbtbl[b].lock); 1742 spin_unlock(&ls->ls_rsbtbl[b].lock);
1178 break; 1743 log_debug(ls, "remove_name master %d dir %d our %d %s",
1744 r->res_master_nodeid, r->res_dir_nodeid,
1745 our_nodeid, name);
1746 continue;
1179 } 1747 }
1180 1748
1181 if (kref_put(&r->res_ref, kill_rsb)) { 1749 if (r->res_dir_nodeid == our_nodeid) {
1182 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1750 /* should never happen */
1183 spin_unlock(&ls->ls_rsbtbl[b].lock); 1751 spin_unlock(&ls->ls_rsbtbl[b].lock);
1752 log_error(ls, "remove_name dir %d master %d our %d %s",
1753 r->res_dir_nodeid, r->res_master_nodeid,
1754 our_nodeid, name);
1755 continue;
1756 }
1184 1757
1185 if (is_master(r)) 1758 if (!time_after_eq(jiffies, r->res_toss_time +
1186 dir_remove(r); 1759 dlm_config.ci_toss_secs * HZ)) {
1187 dlm_free_rsb(r);
1188 count++;
1189 } else {
1190 spin_unlock(&ls->ls_rsbtbl[b].lock); 1760 spin_unlock(&ls->ls_rsbtbl[b].lock);
1191 log_error(ls, "tossed rsb in use %s", r->res_name); 1761 log_debug(ls, "remove_name toss_time %lu now %lu %s",
1762 r->res_toss_time, jiffies, name);
1763 continue;
1192 } 1764 }
1193 }
1194 1765
1195 return count; 1766 if (!kref_put(&r->res_ref, kill_rsb)) {
1767 spin_unlock(&ls->ls_rsbtbl[b].lock);
1768 log_error(ls, "remove_name in use %s", name);
1769 continue;
1770 }
1771
1772 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1773
1774 /* block lookup of same name until we've sent remove */
1775 spin_lock(&ls->ls_remove_spin);
1776 ls->ls_remove_len = len;
1777 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1778 spin_unlock(&ls->ls_remove_spin);
1779 spin_unlock(&ls->ls_rsbtbl[b].lock);
1780
1781 send_remove(r);
1782
1783 /* allow lookup of name again */
1784 spin_lock(&ls->ls_remove_spin);
1785 ls->ls_remove_len = 0;
1786 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1787 spin_unlock(&ls->ls_remove_spin);
1788
1789 dlm_free_rsb(r);
1790 }
1196} 1791}
1197 1792
1198void dlm_scan_rsbs(struct dlm_ls *ls) 1793void dlm_scan_rsbs(struct dlm_ls *ls)
@@ -1684,10 +2279,14 @@ static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1684 * immediate request, it is 0 if called later, after the lock has been 2279 * immediate request, it is 0 if called later, after the lock has been
1685 * queued. 2280 * queued.
1686 * 2281 *
2282 * recover is 1 if dlm_recover_grant() is trying to grant conversions
2283 * after recovery.
2284 *
1687 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis 2285 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1688 */ 2286 */
1689 2287
1690static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) 2288static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2289 int recover)
1691{ 2290{
1692 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); 2291 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1693 2292
@@ -1719,7 +2318,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1719 */ 2318 */
1720 2319
1721 if (queue_conflict(&r->res_grantqueue, lkb)) 2320 if (queue_conflict(&r->res_grantqueue, lkb))
1722 goto out; 2321 return 0;
1723 2322
1724 /* 2323 /*
1725 * 6-3: By default, a conversion request is immediately granted if the 2324 * 6-3: By default, a conversion request is immediately granted if the
@@ -1728,7 +2327,24 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1728 */ 2327 */
1729 2328
1730 if (queue_conflict(&r->res_convertqueue, lkb)) 2329 if (queue_conflict(&r->res_convertqueue, lkb))
1731 goto out; 2330 return 0;
2331
2332 /*
2333 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2334 * locks for a recovered rsb, on which lkb's have been rebuilt.
2335 * The lkb's may have been rebuilt on the queues in a different
2336 * order than they were in on the previous master. So, granting
2337 * queued conversions in order after recovery doesn't make sense
2338 * since the order hasn't been preserved anyway. The new order
2339 * could also have created a new "in place" conversion deadlock.
2340 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2341 * After recovery, there would be no granted locks, and possibly
2342 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2343 * recovery, grant conversions without considering order.
2344 */
2345
2346 if (conv && recover)
2347 return 1;
1732 2348
1733 /* 2349 /*
1734 * 6-5: But the default algorithm for deciding whether to grant or 2350 * 6-5: But the default algorithm for deciding whether to grant or
@@ -1765,7 +2381,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1765 if (list_empty(&r->res_convertqueue)) 2381 if (list_empty(&r->res_convertqueue))
1766 return 1; 2382 return 1;
1767 else 2383 else
1768 goto out; 2384 return 0;
1769 } 2385 }
1770 2386
1771 /* 2387 /*
@@ -1811,12 +2427,12 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1811 if (!now && !conv && list_empty(&r->res_convertqueue) && 2427 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1812 first_in_list(lkb, &r->res_waitqueue)) 2428 first_in_list(lkb, &r->res_waitqueue))
1813 return 1; 2429 return 1;
1814 out: 2430
1815 return 0; 2431 return 0;
1816} 2432}
1817 2433
1818static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2434static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1819 int *err) 2435 int recover, int *err)
1820{ 2436{
1821 int rv; 2437 int rv;
1822 int8_t alt = 0, rqmode = lkb->lkb_rqmode; 2438 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
@@ -1825,7 +2441,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1825 if (err) 2441 if (err)
1826 *err = 0; 2442 *err = 0;
1827 2443
1828 rv = _can_be_granted(r, lkb, now); 2444 rv = _can_be_granted(r, lkb, now, recover);
1829 if (rv) 2445 if (rv)
1830 goto out; 2446 goto out;
1831 2447
@@ -1866,7 +2482,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1866 2482
1867 if (alt) { 2483 if (alt) {
1868 lkb->lkb_rqmode = alt; 2484 lkb->lkb_rqmode = alt;
1869 rv = _can_be_granted(r, lkb, now); 2485 rv = _can_be_granted(r, lkb, now, 0);
1870 if (rv) 2486 if (rv)
1871 lkb->lkb_sbflags |= DLM_SBF_ALTMODE; 2487 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1872 else 2488 else
@@ -1890,6 +2506,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
1890 unsigned int *count) 2506 unsigned int *count)
1891{ 2507{
1892 struct dlm_lkb *lkb, *s; 2508 struct dlm_lkb *lkb, *s;
2509 int recover = rsb_flag(r, RSB_RECOVER_GRANT);
1893 int hi, demoted, quit, grant_restart, demote_restart; 2510 int hi, demoted, quit, grant_restart, demote_restart;
1894 int deadlk; 2511 int deadlk;
1895 2512
@@ -1903,7 +2520,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
1903 demoted = is_demoted(lkb); 2520 demoted = is_demoted(lkb);
1904 deadlk = 0; 2521 deadlk = 0;
1905 2522
1906 if (can_be_granted(r, lkb, 0, &deadlk)) { 2523 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
1907 grant_lock_pending(r, lkb); 2524 grant_lock_pending(r, lkb);
1908 grant_restart = 1; 2525 grant_restart = 1;
1909 if (count) 2526 if (count)
@@ -1947,7 +2564,7 @@ static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
1947 struct dlm_lkb *lkb, *s; 2564 struct dlm_lkb *lkb, *s;
1948 2565
1949 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 2566 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1950 if (can_be_granted(r, lkb, 0, NULL)) { 2567 if (can_be_granted(r, lkb, 0, 0, NULL)) {
1951 grant_lock_pending(r, lkb); 2568 grant_lock_pending(r, lkb);
1952 if (count) 2569 if (count)
1953 (*count)++; 2570 (*count)++;
@@ -2078,8 +2695,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2078 2695
2079static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) 2696static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2080{ 2697{
2081 struct dlm_ls *ls = r->res_ls; 2698 int our_nodeid = dlm_our_nodeid();
2082 int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
2083 2699
2084 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { 2700 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2085 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 2701 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
@@ -2093,53 +2709,37 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2093 return 1; 2709 return 1;
2094 } 2710 }
2095 2711
2096 if (r->res_nodeid == 0) { 2712 if (r->res_master_nodeid == our_nodeid) {
2097 lkb->lkb_nodeid = 0; 2713 lkb->lkb_nodeid = 0;
2098 return 0; 2714 return 0;
2099 } 2715 }
2100 2716
2101 if (r->res_nodeid > 0) { 2717 if (r->res_master_nodeid) {
2102 lkb->lkb_nodeid = r->res_nodeid; 2718 lkb->lkb_nodeid = r->res_master_nodeid;
2103 return 0; 2719 return 0;
2104 } 2720 }
2105 2721
2106 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r);); 2722 if (dlm_dir_nodeid(r) == our_nodeid) {
2107 2723 /* This is a somewhat unusual case; find_rsb will usually
2108 dir_nodeid = dlm_dir_nodeid(r); 2724 have set res_master_nodeid when dir nodeid is local, but
2109 2725 there are cases where we become the dir node after we've
2110 if (dir_nodeid != our_nodeid) { 2726 past find_rsb and go through _request_lock again.
2111 r->res_first_lkid = lkb->lkb_id; 2727 confirm_master() or process_lookup_list() needs to be
2112 send_lookup(r, lkb); 2728 called after this. */
2113 return 1; 2729 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2114 } 2730 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2115 2731 r->res_name);
2116 for (i = 0; i < 2; i++) { 2732 r->res_master_nodeid = our_nodeid;
2117 /* It's possible for dlm_scand to remove an old rsb for
2118 this same resource from the toss list, us to create
2119 a new one, look up the master locally, and find it
2120 already exists just before dlm_scand does the
2121 dir_remove() on the previous rsb. */
2122
2123 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2124 r->res_length, &ret_nodeid);
2125 if (!error)
2126 break;
2127 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2128 schedule();
2129 }
2130 if (error && error != -EEXIST)
2131 return error;
2132
2133 if (ret_nodeid == our_nodeid) {
2134 r->res_first_lkid = 0;
2135 r->res_nodeid = 0; 2733 r->res_nodeid = 0;
2136 lkb->lkb_nodeid = 0; 2734 lkb->lkb_nodeid = 0;
2137 } else { 2735 return 0;
2138 r->res_first_lkid = lkb->lkb_id;
2139 r->res_nodeid = ret_nodeid;
2140 lkb->lkb_nodeid = ret_nodeid;
2141 } 2736 }
2142 return 0; 2737
2738 wait_pending_remove(r);
2739
2740 r->res_first_lkid = lkb->lkb_id;
2741 send_lookup(r, lkb);
2742 return 1;
2143} 2743}
2144 2744
2145static void process_lookup_list(struct dlm_rsb *r) 2745static void process_lookup_list(struct dlm_rsb *r)
@@ -2464,7 +3064,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2464{ 3064{
2465 int error = 0; 3065 int error = 0;
2466 3066
2467 if (can_be_granted(r, lkb, 1, NULL)) { 3067 if (can_be_granted(r, lkb, 1, 0, NULL)) {
2468 grant_lock(r, lkb); 3068 grant_lock(r, lkb);
2469 queue_cast(r, lkb, 0); 3069 queue_cast(r, lkb, 0);
2470 goto out; 3070 goto out;
@@ -2504,7 +3104,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2504 3104
2505 /* changing an existing lock may allow others to be granted */ 3105 /* changing an existing lock may allow others to be granted */
2506 3106
2507 if (can_be_granted(r, lkb, 1, &deadlk)) { 3107 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
2508 grant_lock(r, lkb); 3108 grant_lock(r, lkb);
2509 queue_cast(r, lkb, 0); 3109 queue_cast(r, lkb, 0);
2510 goto out; 3110 goto out;
@@ -2530,7 +3130,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2530 3130
2531 if (is_demoted(lkb)) { 3131 if (is_demoted(lkb)) {
2532 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); 3132 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2533 if (_can_be_granted(r, lkb, 1)) { 3133 if (_can_be_granted(r, lkb, 1, 0)) {
2534 grant_lock(r, lkb); 3134 grant_lock(r, lkb);
2535 queue_cast(r, lkb, 0); 3135 queue_cast(r, lkb, 0);
2536 goto out; 3136 goto out;
@@ -2584,7 +3184,7 @@ static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2584} 3184}
2585 3185
2586/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 3186/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2587 3187
2588static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3188static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2589{ 3189{
2590 int error; 3190 int error;
@@ -2708,11 +3308,11 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2708 3308
2709 error = validate_lock_args(ls, lkb, args); 3309 error = validate_lock_args(ls, lkb, args);
2710 if (error) 3310 if (error)
2711 goto out; 3311 return error;
2712 3312
2713 error = find_rsb(ls, name, len, R_CREATE, &r); 3313 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
2714 if (error) 3314 if (error)
2715 goto out; 3315 return error;
2716 3316
2717 lock_rsb(r); 3317 lock_rsb(r);
2718 3318
@@ -2723,8 +3323,6 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2723 3323
2724 unlock_rsb(r); 3324 unlock_rsb(r);
2725 put_rsb(r); 3325 put_rsb(r);
2726
2727 out:
2728 return error; 3326 return error;
2729} 3327}
2730 3328
@@ -3402,11 +4000,72 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3402 return error; 4000 return error;
3403} 4001}
3404 4002
4003static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4004{
4005 char name[DLM_RESNAME_MAXLEN + 1];
4006 struct dlm_message *ms;
4007 struct dlm_mhandle *mh;
4008 struct dlm_rsb *r;
4009 uint32_t hash, b;
4010 int rv, dir_nodeid;
4011
4012 memset(name, 0, sizeof(name));
4013 memcpy(name, ms_name, len);
4014
4015 hash = jhash(name, len, 0);
4016 b = hash & (ls->ls_rsbtbl_size - 1);
4017
4018 dir_nodeid = dlm_hash2nodeid(ls, hash);
4019
4020 log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4021
4022 spin_lock(&ls->ls_rsbtbl[b].lock);
4023 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4024 if (!rv) {
4025 spin_unlock(&ls->ls_rsbtbl[b].lock);
4026 log_error(ls, "repeat_remove on keep %s", name);
4027 return;
4028 }
4029
4030 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4031 if (!rv) {
4032 spin_unlock(&ls->ls_rsbtbl[b].lock);
4033 log_error(ls, "repeat_remove on toss %s", name);
4034 return;
4035 }
4036
4037 /* use ls->remove_name2 to avoid conflict with shrink? */
4038
4039 spin_lock(&ls->ls_remove_spin);
4040 ls->ls_remove_len = len;
4041 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4042 spin_unlock(&ls->ls_remove_spin);
4043 spin_unlock(&ls->ls_rsbtbl[b].lock);
4044
4045 rv = _create_message(ls, sizeof(struct dlm_message) + len,
4046 dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4047 if (rv)
4048 return;
4049
4050 memcpy(ms->m_extra, name, len);
4051 ms->m_hash = hash;
4052
4053 send_message(mh, ms);
4054
4055 spin_lock(&ls->ls_remove_spin);
4056 ls->ls_remove_len = 0;
4057 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4058 spin_unlock(&ls->ls_remove_spin);
4059}
4060
3405static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) 4061static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3406{ 4062{
3407 struct dlm_lkb *lkb; 4063 struct dlm_lkb *lkb;
3408 struct dlm_rsb *r; 4064 struct dlm_rsb *r;
3409 int error, namelen; 4065 int from_nodeid;
4066 int error, namelen = 0;
4067
4068 from_nodeid = ms->m_header.h_nodeid;
3410 4069
3411 error = create_lkb(ls, &lkb); 4070 error = create_lkb(ls, &lkb);
3412 if (error) 4071 if (error)
@@ -3420,9 +4079,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3420 goto fail; 4079 goto fail;
3421 } 4080 }
3422 4081
4082 /* The dir node is the authority on whether we are the master
4083 for this rsb or not, so if the master sends us a request, we should
4084 recreate the rsb if we've destroyed it. This race happens when we
4085 send a remove message to the dir node at the same time that the dir
4086 node sends us a request for the rsb. */
4087
3423 namelen = receive_extralen(ms); 4088 namelen = receive_extralen(ms);
3424 4089
3425 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r); 4090 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4091 R_RECEIVE_REQUEST, &r);
3426 if (error) { 4092 if (error) {
3427 __put_lkb(ls, lkb); 4093 __put_lkb(ls, lkb);
3428 goto fail; 4094 goto fail;
@@ -3430,6 +4096,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3430 4096
3431 lock_rsb(r); 4097 lock_rsb(r);
3432 4098
4099 if (r->res_master_nodeid != dlm_our_nodeid()) {
4100 error = validate_master_nodeid(ls, r, from_nodeid);
4101 if (error) {
4102 unlock_rsb(r);
4103 put_rsb(r);
4104 __put_lkb(ls, lkb);
4105 goto fail;
4106 }
4107 }
4108
3433 attach_lkb(r, lkb); 4109 attach_lkb(r, lkb);
3434 error = do_request(r, lkb); 4110 error = do_request(r, lkb);
3435 send_request_reply(r, lkb, error); 4111 send_request_reply(r, lkb, error);
@@ -3445,6 +4121,31 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3445 return 0; 4121 return 0;
3446 4122
3447 fail: 4123 fail:
4124 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4125 and do this receive_request again from process_lookup_list once
4126 we get the lookup reply. This would avoid a many repeated
4127 ENOTBLK request failures when the lookup reply designating us
4128 as master is delayed. */
4129
4130 /* We could repeatedly return -EBADR here if our send_remove() is
4131 delayed in being sent/arriving/being processed on the dir node.
 4132 Another node would repeatedly look up the master, and the dir
4133 node would continue returning our nodeid until our send_remove
4134 took effect.
4135
4136 We send another remove message in case our previous send_remove
4137 was lost/ignored/missed somehow. */
4138
4139 if (error != -ENOTBLK) {
4140 log_limit(ls, "receive_request %x from %d %d",
4141 ms->m_lkid, from_nodeid, error);
4142 }
4143
4144 if (namelen && error == -EBADR) {
4145 send_repeat_remove(ls, ms->m_extra, namelen);
4146 msleep(1000);
4147 }
4148
3448 setup_stub_lkb(ls, ms); 4149 setup_stub_lkb(ls, ms);
3449 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4150 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3450 return error; 4151 return error;
@@ -3651,49 +4352,110 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3651 4352
3652static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 4353static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3653{ 4354{
3654 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid; 4355 int len, error, ret_nodeid, from_nodeid, our_nodeid;
3655 4356
3656 from_nodeid = ms->m_header.h_nodeid; 4357 from_nodeid = ms->m_header.h_nodeid;
3657 our_nodeid = dlm_our_nodeid(); 4358 our_nodeid = dlm_our_nodeid();
3658 4359
3659 len = receive_extralen(ms); 4360 len = receive_extralen(ms);
3660 4361
3661 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 4362 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
3662 if (dir_nodeid != our_nodeid) { 4363 &ret_nodeid, NULL);
3663 log_error(ls, "lookup dir_nodeid %d from %d",
3664 dir_nodeid, from_nodeid);
3665 error = -EINVAL;
3666 ret_nodeid = -1;
3667 goto out;
3668 }
3669
3670 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3671 4364
3672 /* Optimization: we're master so treat lookup as a request */ 4365 /* Optimization: we're master so treat lookup as a request */
3673 if (!error && ret_nodeid == our_nodeid) { 4366 if (!error && ret_nodeid == our_nodeid) {
3674 receive_request(ls, ms); 4367 receive_request(ls, ms);
3675 return; 4368 return;
3676 } 4369 }
3677 out:
3678 send_lookup_reply(ls, ms, ret_nodeid, error); 4370 send_lookup_reply(ls, ms, ret_nodeid, error);
3679} 4371}
3680 4372
3681static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 4373static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3682{ 4374{
3683 int len, dir_nodeid, from_nodeid; 4375 char name[DLM_RESNAME_MAXLEN+1];
4376 struct dlm_rsb *r;
4377 uint32_t hash, b;
4378 int rv, len, dir_nodeid, from_nodeid;
3684 4379
3685 from_nodeid = ms->m_header.h_nodeid; 4380 from_nodeid = ms->m_header.h_nodeid;
3686 4381
3687 len = receive_extralen(ms); 4382 len = receive_extralen(ms);
3688 4383
4384 if (len > DLM_RESNAME_MAXLEN) {
4385 log_error(ls, "receive_remove from %d bad len %d",
4386 from_nodeid, len);
4387 return;
4388 }
4389
3689 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 4390 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3690 if (dir_nodeid != dlm_our_nodeid()) { 4391 if (dir_nodeid != dlm_our_nodeid()) {
3691 log_error(ls, "remove dir entry dir_nodeid %d from %d", 4392 log_error(ls, "receive_remove from %d bad nodeid %d",
3692 dir_nodeid, from_nodeid); 4393 from_nodeid, dir_nodeid);
4394 return;
4395 }
4396
4397 /* Look for name on rsbtbl.toss, if it's there, kill it.
4398 If it's on rsbtbl.keep, it's being used, and we should ignore this
4399 message. This is an expected race between the dir node sending a
4400 request to the master node at the same time as the master node sends
4401 a remove to the dir node. The resolution to that race is for the
4402 dir node to ignore the remove message, and the master node to
4403 recreate the master rsb when it gets a request from the dir node for
4404 an rsb it doesn't have. */
4405
4406 memset(name, 0, sizeof(name));
4407 memcpy(name, ms->m_extra, len);
4408
4409 hash = jhash(name, len, 0);
4410 b = hash & (ls->ls_rsbtbl_size - 1);
4411
4412 spin_lock(&ls->ls_rsbtbl[b].lock);
4413
4414 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4415 if (rv) {
4416 /* verify the rsb is on keep list per comment above */
4417 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4418 if (rv) {
4419 /* should not happen */
4420 log_error(ls, "receive_remove from %d not found %s",
4421 from_nodeid, name);
4422 spin_unlock(&ls->ls_rsbtbl[b].lock);
4423 return;
4424 }
4425 if (r->res_master_nodeid != from_nodeid) {
4426 /* should not happen */
4427 log_error(ls, "receive_remove keep from %d master %d",
4428 from_nodeid, r->res_master_nodeid);
4429 dlm_print_rsb(r);
4430 spin_unlock(&ls->ls_rsbtbl[b].lock);
4431 return;
4432 }
4433
4434 log_debug(ls, "receive_remove from %d master %d first %x %s",
4435 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4436 name);
4437 spin_unlock(&ls->ls_rsbtbl[b].lock);
4438 return;
4439 }
4440
4441 if (r->res_master_nodeid != from_nodeid) {
4442 log_error(ls, "receive_remove toss from %d master %d",
4443 from_nodeid, r->res_master_nodeid);
4444 dlm_print_rsb(r);
4445 spin_unlock(&ls->ls_rsbtbl[b].lock);
3693 return; 4446 return;
3694 } 4447 }
3695 4448
3696 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); 4449 if (kref_put(&r->res_ref, kill_rsb)) {
4450 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4451 spin_unlock(&ls->ls_rsbtbl[b].lock);
4452 dlm_free_rsb(r);
4453 } else {
4454 log_error(ls, "receive_remove from %d rsb ref error",
4455 from_nodeid);
4456 dlm_print_rsb(r);
4457 spin_unlock(&ls->ls_rsbtbl[b].lock);
4458 }
3697} 4459}
3698 4460
3699static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 4461static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3706,6 +4468,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3706 struct dlm_lkb *lkb; 4468 struct dlm_lkb *lkb;
3707 struct dlm_rsb *r; 4469 struct dlm_rsb *r;
3708 int error, mstype, result; 4470 int error, mstype, result;
4471 int from_nodeid = ms->m_header.h_nodeid;
3709 4472
3710 error = find_lkb(ls, ms->m_remid, &lkb); 4473 error = find_lkb(ls, ms->m_remid, &lkb);
3711 if (error) 4474 if (error)
@@ -3723,8 +4486,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3723 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 4486 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3724 if (error) { 4487 if (error) {
3725 log_error(ls, "receive_request_reply %x remote %d %x result %d", 4488 log_error(ls, "receive_request_reply %x remote %d %x result %d",
3726 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, 4489 lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
3727 ms->m_result);
3728 dlm_dump_rsb(r); 4490 dlm_dump_rsb(r);
3729 goto out; 4491 goto out;
3730 } 4492 }
@@ -3732,8 +4494,9 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3732 /* Optimization: the dir node was also the master, so it took our 4494 /* Optimization: the dir node was also the master, so it took our
3733 lookup as a request and sent request reply instead of lookup reply */ 4495 lookup as a request and sent request reply instead of lookup reply */
3734 if (mstype == DLM_MSG_LOOKUP) { 4496 if (mstype == DLM_MSG_LOOKUP) {
3735 r->res_nodeid = ms->m_header.h_nodeid; 4497 r->res_master_nodeid = from_nodeid;
3736 lkb->lkb_nodeid = r->res_nodeid; 4498 r->res_nodeid = from_nodeid;
4499 lkb->lkb_nodeid = from_nodeid;
3737 } 4500 }
3738 4501
3739 /* this is the value returned from do_request() on the master */ 4502 /* this is the value returned from do_request() on the master */
@@ -3767,18 +4530,30 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3767 case -EBADR: 4530 case -EBADR:
3768 case -ENOTBLK: 4531 case -ENOTBLK:
3769 /* find_rsb failed to find rsb or rsb wasn't master */ 4532 /* find_rsb failed to find rsb or rsb wasn't master */
3770 log_debug(ls, "receive_request_reply %x %x master diff %d %d", 4533 log_limit(ls, "receive_request_reply %x from %d %d "
3771 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result); 4534 "master %d dir %d first %x %s", lkb->lkb_id,
3772 r->res_nodeid = -1; 4535 from_nodeid, result, r->res_master_nodeid,
3773 lkb->lkb_nodeid = -1; 4536 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4537
4538 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4539 r->res_master_nodeid != dlm_our_nodeid()) {
4540 /* cause _request_lock->set_master->send_lookup */
4541 r->res_master_nodeid = 0;
4542 r->res_nodeid = -1;
4543 lkb->lkb_nodeid = -1;
4544 }
3774 4545
3775 if (is_overlap(lkb)) { 4546 if (is_overlap(lkb)) {
3776 /* we'll ignore error in cancel/unlock reply */ 4547 /* we'll ignore error in cancel/unlock reply */
3777 queue_cast_overlap(r, lkb); 4548 queue_cast_overlap(r, lkb);
3778 confirm_master(r, result); 4549 confirm_master(r, result);
3779 unhold_lkb(lkb); /* undoes create_lkb() */ 4550 unhold_lkb(lkb); /* undoes create_lkb() */
3780 } else 4551 } else {
3781 _request_lock(r, lkb); 4552 _request_lock(r, lkb);
4553
4554 if (r->res_master_nodeid == dlm_our_nodeid())
4555 confirm_master(r, 0);
4556 }
3782 break; 4557 break;
3783 4558
3784 default: 4559 default:
@@ -3994,6 +4769,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3994 struct dlm_lkb *lkb; 4769 struct dlm_lkb *lkb;
3995 struct dlm_rsb *r; 4770 struct dlm_rsb *r;
3996 int error, ret_nodeid; 4771 int error, ret_nodeid;
4772 int do_lookup_list = 0;
3997 4773
3998 error = find_lkb(ls, ms->m_lkid, &lkb); 4774 error = find_lkb(ls, ms->m_lkid, &lkb);
3999 if (error) { 4775 if (error) {
@@ -4001,7 +4777,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4001 return; 4777 return;
4002 } 4778 }
4003 4779
4004 /* ms->m_result is the value returned by dlm_dir_lookup on dir node 4780 /* ms->m_result is the value returned by dlm_master_lookup on dir node
4005 FIXME: will a non-zero error ever be returned? */ 4781 FIXME: will a non-zero error ever be returned? */
4006 4782
4007 r = lkb->lkb_resource; 4783 r = lkb->lkb_resource;
@@ -4013,12 +4789,37 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4013 goto out; 4789 goto out;
4014 4790
4015 ret_nodeid = ms->m_nodeid; 4791 ret_nodeid = ms->m_nodeid;
4792
4793 /* We sometimes receive a request from the dir node for this
4794 rsb before we've received the dir node's loookup_reply for it.
4795 The request from the dir node implies we're the master, so we set
4796 ourself as master in receive_request_reply, and verify here that
4797 we are indeed the master. */
4798
4799 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4800 /* This should never happen */
4801 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4802 "master %d dir %d our %d first %x %s",
4803 lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4804 r->res_master_nodeid, r->res_dir_nodeid,
4805 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4806 }
4807
4016 if (ret_nodeid == dlm_our_nodeid()) { 4808 if (ret_nodeid == dlm_our_nodeid()) {
4809 r->res_master_nodeid = ret_nodeid;
4017 r->res_nodeid = 0; 4810 r->res_nodeid = 0;
4018 ret_nodeid = 0; 4811 do_lookup_list = 1;
4019 r->res_first_lkid = 0; 4812 r->res_first_lkid = 0;
4813 } else if (ret_nodeid == -1) {
4814 /* the remote node doesn't believe it's the dir node */
4815 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4816 lkb->lkb_id, ms->m_header.h_nodeid);
4817 r->res_master_nodeid = 0;
4818 r->res_nodeid = -1;
4819 lkb->lkb_nodeid = -1;
4020 } else { 4820 } else {
4021 /* set_master() will copy res_nodeid to lkb_nodeid */ 4821 /* set_master() will set lkb_nodeid from r */
4822 r->res_master_nodeid = ret_nodeid;
4022 r->res_nodeid = ret_nodeid; 4823 r->res_nodeid = ret_nodeid;
4023 } 4824 }
4024 4825
@@ -4033,7 +4834,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4033 _request_lock(r, lkb); 4834 _request_lock(r, lkb);
4034 4835
4035 out_list: 4836 out_list:
4036 if (!ret_nodeid) 4837 if (do_lookup_list)
4037 process_lookup_list(r); 4838 process_lookup_list(r);
4038 out: 4839 out:
4039 unlock_rsb(r); 4840 unlock_rsb(r);
@@ -4047,7 +4848,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4047 int error = 0, noent = 0; 4848 int error = 0, noent = 0;
4048 4849
4049 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { 4850 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4050 log_debug(ls, "ignore non-member message %d from %d %x %x %d", 4851 log_limit(ls, "receive %d from non-member %d %x %x %d",
4051 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, 4852 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4052 ms->m_remid, ms->m_result); 4853 ms->m_remid, ms->m_result);
4053 return; 4854 return;
@@ -4174,6 +4975,15 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4174 int nodeid) 4975 int nodeid)
4175{ 4976{
4176 if (dlm_locking_stopped(ls)) { 4977 if (dlm_locking_stopped(ls)) {
4978 /* If we were a member of this lockspace, left, and rejoined,
4979 other nodes may still be sending us messages from the
4980 lockspace generation before we left. */
4981 if (!ls->ls_generation) {
4982 log_limit(ls, "receive %d from %d ignore old gen",
4983 ms->m_type, nodeid);
4984 return;
4985 }
4986
4177 dlm_add_requestqueue(ls, nodeid, ms); 4987 dlm_add_requestqueue(ls, nodeid, ms);
4178 } else { 4988 } else {
4179 dlm_wait_requestqueue(ls); 4989 dlm_wait_requestqueue(ls);
@@ -4651,9 +5461,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
4651 5461
4652 if (!rsb_flag(r, RSB_RECOVER_GRANT)) 5462 if (!rsb_flag(r, RSB_RECOVER_GRANT))
4653 continue; 5463 continue;
4654 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5464 if (!is_master(r)) {
4655 if (!is_master(r)) 5465 rsb_clear_flag(r, RSB_RECOVER_GRANT);
4656 continue; 5466 continue;
5467 }
4657 hold_rsb(r); 5468 hold_rsb(r);
4658 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5469 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4659 return r; 5470 return r;
@@ -4698,7 +5509,9 @@ void dlm_recover_grant(struct dlm_ls *ls)
4698 rsb_count++; 5509 rsb_count++;
4699 count = 0; 5510 count = 0;
4700 lock_rsb(r); 5511 lock_rsb(r);
5512 /* the RECOVER_GRANT flag is checked in the grant path */
4701 grant_pending_locks(r, &count); 5513 grant_pending_locks(r, &count);
5514 rsb_clear_flag(r, RSB_RECOVER_GRANT);
4702 lkb_count += count; 5515 lkb_count += count;
4703 confirm_master(r, 0); 5516 confirm_master(r, 0);
4704 unlock_rsb(r); 5517 unlock_rsb(r);
@@ -4798,6 +5611,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4798 struct dlm_rsb *r; 5611 struct dlm_rsb *r;
4799 struct dlm_lkb *lkb; 5612 struct dlm_lkb *lkb;
4800 uint32_t remid = 0; 5613 uint32_t remid = 0;
5614 int from_nodeid = rc->rc_header.h_nodeid;
4801 int error; 5615 int error;
4802 5616
4803 if (rl->rl_parent_lkid) { 5617 if (rl->rl_parent_lkid) {
@@ -4815,21 +5629,21 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4815 we make ourselves master, dlm_recover_masters() won't touch the 5629 we make ourselves master, dlm_recover_masters() won't touch the
4816 MSTCPY locks we've received early. */ 5630 MSTCPY locks we've received early. */
4817 5631
4818 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r); 5632 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5633 from_nodeid, R_RECEIVE_RECOVER, &r);
4819 if (error) 5634 if (error)
4820 goto out; 5635 goto out;
4821 5636
5637 lock_rsb(r);
5638
4822 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { 5639 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
4823 log_error(ls, "dlm_recover_master_copy remote %d %x not dir", 5640 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
4824 rc->rc_header.h_nodeid, remid); 5641 from_nodeid, remid);
4825 error = -EBADR; 5642 error = -EBADR;
4826 put_rsb(r); 5643 goto out_unlock;
4827 goto out;
4828 } 5644 }
4829 5645
4830 lock_rsb(r); 5646 lkb = search_remid(r, from_nodeid, remid);
4831
4832 lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
4833 if (lkb) { 5647 if (lkb) {
4834 error = -EEXIST; 5648 error = -EEXIST;
4835 goto out_remid; 5649 goto out_remid;
@@ -4866,7 +5680,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4866 out: 5680 out:
4867 if (error && error != -EEXIST) 5681 if (error && error != -EEXIST)
4868 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d", 5682 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
4869 rc->rc_header.h_nodeid, remid, error); 5683 from_nodeid, remid, error);
4870 rl->rl_result = cpu_to_le32(error); 5684 rl->rl_result = cpu_to_le32(error);
4871 return error; 5685 return error;
4872} 5686}
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index c8b226c62807..5e0c72e36a9b 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -14,6 +14,7 @@
14#define __LOCK_DOT_H__ 14#define __LOCK_DOT_H__
15 15
16void dlm_dump_rsb(struct dlm_rsb *r); 16void dlm_dump_rsb(struct dlm_rsb *r);
17void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len);
17void dlm_print_lkb(struct dlm_lkb *lkb); 18void dlm_print_lkb(struct dlm_lkb *lkb);
18void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, 19void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
19 uint32_t saved_seq); 20 uint32_t saved_seq);
@@ -28,9 +29,11 @@ void dlm_unlock_recovery(struct dlm_ls *ls);
28void dlm_scan_waiters(struct dlm_ls *ls); 29void dlm_scan_waiters(struct dlm_ls *ls);
29void dlm_scan_timeout(struct dlm_ls *ls); 30void dlm_scan_timeout(struct dlm_ls *ls);
30void dlm_adjust_timeouts(struct dlm_ls *ls); 31void dlm_adjust_timeouts(struct dlm_ls *ls);
32int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len,
33 unsigned int flags, int *r_nodeid, int *result);
31 34
32int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, 35int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
33 unsigned int flags, struct dlm_rsb **r_ret); 36 struct dlm_rsb **r_ret);
34 37
35void dlm_recover_purge(struct dlm_ls *ls); 38void dlm_recover_purge(struct dlm_ls *ls);
36void dlm_purge_mstcpy_locks(struct dlm_rsb *r); 39void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index ca506abbdd3b..2e99fb0c9737 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -506,20 +506,18 @@ static int new_lockspace(const char *name, const char *cluster,
506 spin_lock_init(&ls->ls_rsbtbl[i].lock); 506 spin_lock_init(&ls->ls_rsbtbl[i].lock);
507 } 507 }
508 508
509 idr_init(&ls->ls_lkbidr); 509 spin_lock_init(&ls->ls_remove_spin);
510 spin_lock_init(&ls->ls_lkbidr_spin);
511
512 size = dlm_config.ci_dirtbl_size;
513 ls->ls_dirtbl_size = size;
514 510
515 ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size); 511 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
516 if (!ls->ls_dirtbl) 512 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
517 goto out_lkbfree; 513 GFP_KERNEL);
518 for (i = 0; i < size; i++) { 514 if (!ls->ls_remove_names[i])
519 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list); 515 goto out_rsbtbl;
520 spin_lock_init(&ls->ls_dirtbl[i].lock);
521 } 516 }
522 517
518 idr_init(&ls->ls_lkbidr);
519 spin_lock_init(&ls->ls_lkbidr_spin);
520
523 INIT_LIST_HEAD(&ls->ls_waiters); 521 INIT_LIST_HEAD(&ls->ls_waiters);
524 mutex_init(&ls->ls_waiters_mutex); 522 mutex_init(&ls->ls_waiters_mutex);
525 INIT_LIST_HEAD(&ls->ls_orphans); 523 INIT_LIST_HEAD(&ls->ls_orphans);
@@ -567,7 +565,7 @@ static int new_lockspace(const char *name, const char *cluster,
567 565
568 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); 566 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
569 if (!ls->ls_recover_buf) 567 if (!ls->ls_recover_buf)
570 goto out_dirfree; 568 goto out_lkbidr;
571 569
572 ls->ls_slot = 0; 570 ls->ls_slot = 0;
573 ls->ls_num_slots = 0; 571 ls->ls_num_slots = 0;
@@ -576,14 +574,14 @@ static int new_lockspace(const char *name, const char *cluster,
576 574
577 INIT_LIST_HEAD(&ls->ls_recover_list); 575 INIT_LIST_HEAD(&ls->ls_recover_list);
578 spin_lock_init(&ls->ls_recover_list_lock); 576 spin_lock_init(&ls->ls_recover_list_lock);
577 idr_init(&ls->ls_recover_idr);
578 spin_lock_init(&ls->ls_recover_idr_lock);
579 ls->ls_recover_list_count = 0; 579 ls->ls_recover_list_count = 0;
580 ls->ls_local_handle = ls; 580 ls->ls_local_handle = ls;
581 init_waitqueue_head(&ls->ls_wait_general); 581 init_waitqueue_head(&ls->ls_wait_general);
582 INIT_LIST_HEAD(&ls->ls_root_list); 582 INIT_LIST_HEAD(&ls->ls_root_list);
583 init_rwsem(&ls->ls_root_sem); 583 init_rwsem(&ls->ls_root_sem);
584 584
585 down_write(&ls->ls_in_recovery);
586
587 spin_lock(&lslist_lock); 585 spin_lock(&lslist_lock);
588 ls->ls_create_count = 1; 586 ls->ls_create_count = 1;
589 list_add(&ls->ls_list, &lslist); 587 list_add(&ls->ls_list, &lslist);
@@ -597,13 +595,24 @@ static int new_lockspace(const char *name, const char *cluster,
597 } 595 }
598 } 596 }
599 597
600 /* needs to find ls in lslist */ 598 init_waitqueue_head(&ls->ls_recover_lock_wait);
599
600 /*
601 * Once started, dlm_recoverd first looks for ls in lslist, then
602 * initializes ls_in_recovery as locked in "down" mode. We need
603 * to wait for the wakeup from dlm_recoverd because in_recovery
604 * has to start out in down mode.
605 */
606
601 error = dlm_recoverd_start(ls); 607 error = dlm_recoverd_start(ls);
602 if (error) { 608 if (error) {
603 log_error(ls, "can't start dlm_recoverd %d", error); 609 log_error(ls, "can't start dlm_recoverd %d", error);
604 goto out_callback; 610 goto out_callback;
605 } 611 }
606 612
613 wait_event(ls->ls_recover_lock_wait,
614 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
615
607 ls->ls_kobj.kset = dlm_kset; 616 ls->ls_kobj.kset = dlm_kset;
608 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 617 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
609 "%s", ls->ls_name); 618 "%s", ls->ls_name);
@@ -647,11 +656,15 @@ static int new_lockspace(const char *name, const char *cluster,
647 spin_lock(&lslist_lock); 656 spin_lock(&lslist_lock);
648 list_del(&ls->ls_list); 657 list_del(&ls->ls_list);
649 spin_unlock(&lslist_lock); 658 spin_unlock(&lslist_lock);
659 idr_destroy(&ls->ls_recover_idr);
650 kfree(ls->ls_recover_buf); 660 kfree(ls->ls_recover_buf);
651 out_dirfree: 661 out_lkbidr:
652 vfree(ls->ls_dirtbl);
653 out_lkbfree:
654 idr_destroy(&ls->ls_lkbidr); 662 idr_destroy(&ls->ls_lkbidr);
663 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
664 if (ls->ls_remove_names[i])
665 kfree(ls->ls_remove_names[i]);
666 }
667 out_rsbtbl:
655 vfree(ls->ls_rsbtbl); 668 vfree(ls->ls_rsbtbl);
656 out_lsfree: 669 out_lsfree:
657 if (do_unreg) 670 if (do_unreg)
@@ -779,13 +792,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
779 kfree(ls->ls_recover_buf); 792 kfree(ls->ls_recover_buf);
780 793
781 /* 794 /*
782 * Free direntry structs.
783 */
784
785 dlm_dir_clear(ls);
786 vfree(ls->ls_dirtbl);
787
788 /*
789 * Free all lkb's in idr 795 * Free all lkb's in idr
790 */ 796 */
791 797
@@ -813,6 +819,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
813 819
814 vfree(ls->ls_rsbtbl); 820 vfree(ls->ls_rsbtbl);
815 821
822 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
823 kfree(ls->ls_remove_names[i]);
824
816 while (!list_empty(&ls->ls_new_rsb)) { 825 while (!list_empty(&ls->ls_new_rsb)) {
817 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, 826 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
818 res_hashchain); 827 res_hashchain);
@@ -826,7 +835,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
826 835
827 dlm_purge_requestqueue(ls); 836 dlm_purge_requestqueue(ls);
828 kfree(ls->ls_recover_args); 837 kfree(ls->ls_recover_args);
829 dlm_clear_free_entries(ls);
830 dlm_clear_members(ls); 838 dlm_clear_members(ls);
831 dlm_clear_members_gone(ls); 839 dlm_clear_members_gone(ls);
832 kfree(ls->ls_node_array); 840 kfree(ls->ls_node_array);
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 5c1b0e38c7a4..331ea4f94efd 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -140,6 +140,16 @@ struct writequeue_entry {
140 struct connection *con; 140 struct connection *con;
141}; 141};
142 142
143struct dlm_node_addr {
144 struct list_head list;
145 int nodeid;
146 int addr_count;
147 struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
148};
149
150static LIST_HEAD(dlm_node_addrs);
151static DEFINE_SPINLOCK(dlm_node_addrs_spin);
152
143static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 153static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
144static int dlm_local_count; 154static int dlm_local_count;
145static int dlm_allow_conn; 155static int dlm_allow_conn;
@@ -264,31 +274,146 @@ static struct connection *assoc2con(int assoc_id)
264 return NULL; 274 return NULL;
265} 275}
266 276
267static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) 277static struct dlm_node_addr *find_node_addr(int nodeid)
278{
279 struct dlm_node_addr *na;
280
281 list_for_each_entry(na, &dlm_node_addrs, list) {
282 if (na->nodeid == nodeid)
283 return na;
284 }
285 return NULL;
286}
287
288static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
289{
290 switch (x->ss_family) {
291 case AF_INET: {
292 struct sockaddr_in *sinx = (struct sockaddr_in *)x;
293 struct sockaddr_in *siny = (struct sockaddr_in *)y;
294 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
295 return 0;
296 if (sinx->sin_port != siny->sin_port)
297 return 0;
298 break;
299 }
300 case AF_INET6: {
301 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
302 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
303 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
304 return 0;
305 if (sinx->sin6_port != siny->sin6_port)
306 return 0;
307 break;
308 }
309 default:
310 return 0;
311 }
312 return 1;
313}
314
315static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
316 struct sockaddr *sa_out)
268{ 317{
269 struct sockaddr_storage addr; 318 struct sockaddr_storage sas;
270 int error; 319 struct dlm_node_addr *na;
271 320
272 if (!dlm_local_count) 321 if (!dlm_local_count)
273 return -1; 322 return -1;
274 323
275 error = dlm_nodeid_to_addr(nodeid, &addr); 324 spin_lock(&dlm_node_addrs_spin);
276 if (error) 325 na = find_node_addr(nodeid);
277 return error; 326 if (na && na->addr_count)
327 memcpy(&sas, na->addr[0], sizeof(struct sockaddr_storage));
328 spin_unlock(&dlm_node_addrs_spin);
329
330 if (!na)
331 return -EEXIST;
332
333 if (!na->addr_count)
334 return -ENOENT;
335
336 if (sas_out)
337 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
338
339 if (!sa_out)
340 return 0;
278 341
279 if (dlm_local_addr[0]->ss_family == AF_INET) { 342 if (dlm_local_addr[0]->ss_family == AF_INET) {
280 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; 343 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
281 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; 344 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
282 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 345 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
283 } else { 346 } else {
284 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; 347 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
285 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; 348 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
286 ret6->sin6_addr = in6->sin6_addr; 349 ret6->sin6_addr = in6->sin6_addr;
287 } 350 }
288 351
289 return 0; 352 return 0;
290} 353}
291 354
355static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
356{
357 struct dlm_node_addr *na;
358 int rv = -EEXIST;
359
360 spin_lock(&dlm_node_addrs_spin);
361 list_for_each_entry(na, &dlm_node_addrs, list) {
362 if (!na->addr_count)
363 continue;
364
365 if (!addr_compare(na->addr[0], addr))
366 continue;
367
368 *nodeid = na->nodeid;
369 rv = 0;
370 break;
371 }
372 spin_unlock(&dlm_node_addrs_spin);
373 return rv;
374}
375
376int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
377{
378 struct sockaddr_storage *new_addr;
379 struct dlm_node_addr *new_node, *na;
380
381 new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
382 if (!new_node)
383 return -ENOMEM;
384
385 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
386 if (!new_addr) {
387 kfree(new_node);
388 return -ENOMEM;
389 }
390
391 memcpy(new_addr, addr, len);
392
393 spin_lock(&dlm_node_addrs_spin);
394 na = find_node_addr(nodeid);
395 if (!na) {
396 new_node->nodeid = nodeid;
397 new_node->addr[0] = new_addr;
398 new_node->addr_count = 1;
399 list_add(&new_node->list, &dlm_node_addrs);
400 spin_unlock(&dlm_node_addrs_spin);
401 return 0;
402 }
403
404 if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
405 spin_unlock(&dlm_node_addrs_spin);
406 kfree(new_addr);
407 kfree(new_node);
408 return -ENOSPC;
409 }
410
411 na->addr[na->addr_count++] = new_addr;
412 spin_unlock(&dlm_node_addrs_spin);
413 kfree(new_node);
414 return 0;
415}
416
292/* Data available on socket or listen socket received a connect */ 417/* Data available on socket or listen socket received a connect */
293static void lowcomms_data_ready(struct sock *sk, int count_unused) 418static void lowcomms_data_ready(struct sock *sk, int count_unused)
294{ 419{
@@ -348,7 +473,7 @@ int dlm_lowcomms_connect_node(int nodeid)
348} 473}
349 474
350/* Make a socket active */ 475/* Make a socket active */
351static int add_sock(struct socket *sock, struct connection *con) 476static void add_sock(struct socket *sock, struct connection *con)
352{ 477{
353 con->sock = sock; 478 con->sock = sock;
354 479
@@ -358,7 +483,6 @@ static int add_sock(struct socket *sock, struct connection *con)
358 con->sock->sk->sk_state_change = lowcomms_state_change; 483 con->sock->sk->sk_state_change = lowcomms_state_change;
359 con->sock->sk->sk_user_data = con; 484 con->sock->sk->sk_user_data = con;
360 con->sock->sk->sk_allocation = GFP_NOFS; 485 con->sock->sk->sk_allocation = GFP_NOFS;
361 return 0;
362} 486}
363 487
364/* Add the port number to an IPv6 or 4 sockaddr and return the address 488/* Add the port number to an IPv6 or 4 sockaddr and return the address
@@ -510,7 +634,7 @@ static void process_sctp_notification(struct connection *con,
510 return; 634 return;
511 } 635 }
512 make_sockaddr(&prim.ssp_addr, 0, &addr_len); 636 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
513 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { 637 if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
514 unsigned char *b=(unsigned char *)&prim.ssp_addr; 638 unsigned char *b=(unsigned char *)&prim.ssp_addr;
515 log_print("reject connect from unknown addr"); 639 log_print("reject connect from unknown addr");
516 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 640 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
@@ -747,7 +871,7 @@ static int tcp_accept_from_sock(struct connection *con)
747 871
748 /* Get the new node's NODEID */ 872 /* Get the new node's NODEID */
749 make_sockaddr(&peeraddr, 0, &len); 873 make_sockaddr(&peeraddr, 0, &len);
750 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { 874 if (addr_to_nodeid(&peeraddr, &nodeid)) {
751 unsigned char *b=(unsigned char *)&peeraddr; 875 unsigned char *b=(unsigned char *)&peeraddr;
752 log_print("connect from non cluster node"); 876 log_print("connect from non cluster node");
753 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 877 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
@@ -862,7 +986,7 @@ static void sctp_init_assoc(struct connection *con)
862 if (con->retries++ > MAX_CONNECT_RETRIES) 986 if (con->retries++ > MAX_CONNECT_RETRIES)
863 return; 987 return;
864 988
865 if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) { 989 if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr)) {
866 log_print("no address for nodeid %d", con->nodeid); 990 log_print("no address for nodeid %d", con->nodeid);
867 return; 991 return;
868 } 992 }
@@ -928,11 +1052,11 @@ static void sctp_init_assoc(struct connection *con)
928/* Connect a new socket to its peer */ 1052/* Connect a new socket to its peer */
929static void tcp_connect_to_sock(struct connection *con) 1053static void tcp_connect_to_sock(struct connection *con)
930{ 1054{
931 int result = -EHOSTUNREACH;
932 struct sockaddr_storage saddr, src_addr; 1055 struct sockaddr_storage saddr, src_addr;
933 int addr_len; 1056 int addr_len;
934 struct socket *sock = NULL; 1057 struct socket *sock = NULL;
935 int one = 1; 1058 int one = 1;
1059 int result;
936 1060
937 if (con->nodeid == 0) { 1061 if (con->nodeid == 0) {
938 log_print("attempt to connect sock 0 foiled"); 1062 log_print("attempt to connect sock 0 foiled");
@@ -944,10 +1068,8 @@ static void tcp_connect_to_sock(struct connection *con)
944 goto out; 1068 goto out;
945 1069
946 /* Some odd races can cause double-connects, ignore them */ 1070 /* Some odd races can cause double-connects, ignore them */
947 if (con->sock) { 1071 if (con->sock)
948 result = 0;
949 goto out; 1072 goto out;
950 }
951 1073
952 /* Create a socket to communicate with */ 1074 /* Create a socket to communicate with */
953 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 1075 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
@@ -956,8 +1078,11 @@ static void tcp_connect_to_sock(struct connection *con)
956 goto out_err; 1078 goto out_err;
957 1079
958 memset(&saddr, 0, sizeof(saddr)); 1080 memset(&saddr, 0, sizeof(saddr));
959 if (dlm_nodeid_to_addr(con->nodeid, &saddr)) 1081 result = nodeid_to_addr(con->nodeid, &saddr, NULL);
1082 if (result < 0) {
1083 log_print("no address for nodeid %d", con->nodeid);
960 goto out_err; 1084 goto out_err;
1085 }
961 1086
962 sock->sk->sk_user_data = con; 1087 sock->sk->sk_user_data = con;
963 con->rx_action = receive_from_sock; 1088 con->rx_action = receive_from_sock;
@@ -983,8 +1108,7 @@ static void tcp_connect_to_sock(struct connection *con)
983 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one, 1108 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
984 sizeof(one)); 1109 sizeof(one));
985 1110
986 result = 1111 result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
987 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
988 O_NONBLOCK); 1112 O_NONBLOCK);
989 if (result == -EINPROGRESS) 1113 if (result == -EINPROGRESS)
990 result = 0; 1114 result = 0;
@@ -1002,11 +1126,17 @@ out_err:
1002 * Some errors are fatal and this list might need adjusting. For other 1126 * Some errors are fatal and this list might need adjusting. For other
1003 * errors we try again until the max number of retries is reached. 1127 * errors we try again until the max number of retries is reached.
1004 */ 1128 */
1005 if (result != -EHOSTUNREACH && result != -ENETUNREACH && 1129 if (result != -EHOSTUNREACH &&
1006 result != -ENETDOWN && result != -EINVAL 1130 result != -ENETUNREACH &&
1007 && result != -EPROTONOSUPPORT) { 1131 result != -ENETDOWN &&
1132 result != -EINVAL &&
1133 result != -EPROTONOSUPPORT) {
1134 log_print("connect %d try %d error %d", con->nodeid,
1135 con->retries, result);
1136 mutex_unlock(&con->sock_mutex);
1137 msleep(1000);
1008 lowcomms_connect_sock(con); 1138 lowcomms_connect_sock(con);
1009 result = 0; 1139 return;
1010 } 1140 }
1011out: 1141out:
1012 mutex_unlock(&con->sock_mutex); 1142 mutex_unlock(&con->sock_mutex);
@@ -1044,10 +1174,8 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
1044 if (result < 0) { 1174 if (result < 0) {
1045 log_print("Failed to set SO_REUSEADDR on socket: %d", result); 1175 log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1046 } 1176 }
1047 sock->sk->sk_user_data = con;
1048 con->rx_action = tcp_accept_from_sock; 1177 con->rx_action = tcp_accept_from_sock;
1049 con->connect_action = tcp_connect_to_sock; 1178 con->connect_action = tcp_connect_to_sock;
1050 con->sock = sock;
1051 1179
1052 /* Bind to our port */ 1180 /* Bind to our port */
1053 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 1181 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
@@ -1358,8 +1486,7 @@ static void send_to_sock(struct connection *con)
1358 } 1486 }
1359 cond_resched(); 1487 cond_resched();
1360 goto out; 1488 goto out;
1361 } 1489 } else if (ret < 0)
1362 if (ret <= 0)
1363 goto send_error; 1490 goto send_error;
1364 } 1491 }
1365 1492
@@ -1376,7 +1503,6 @@ static void send_to_sock(struct connection *con)
1376 if (e->len == 0 && e->users == 0) { 1503 if (e->len == 0 && e->users == 0) {
1377 list_del(&e->list); 1504 list_del(&e->list);
1378 free_entry(e); 1505 free_entry(e);
1379 continue;
1380 } 1506 }
1381 } 1507 }
1382 spin_unlock(&con->writequeue_lock); 1508 spin_unlock(&con->writequeue_lock);
@@ -1394,7 +1520,6 @@ out_connect:
1394 mutex_unlock(&con->sock_mutex); 1520 mutex_unlock(&con->sock_mutex);
1395 if (!test_bit(CF_INIT_PENDING, &con->flags)) 1521 if (!test_bit(CF_INIT_PENDING, &con->flags))
1396 lowcomms_connect_sock(con); 1522 lowcomms_connect_sock(con);
1397 return;
1398} 1523}
1399 1524
1400static void clean_one_writequeue(struct connection *con) 1525static void clean_one_writequeue(struct connection *con)
@@ -1414,6 +1539,7 @@ static void clean_one_writequeue(struct connection *con)
1414int dlm_lowcomms_close(int nodeid) 1539int dlm_lowcomms_close(int nodeid)
1415{ 1540{
1416 struct connection *con; 1541 struct connection *con;
1542 struct dlm_node_addr *na;
1417 1543
1418 log_print("closing connection to node %d", nodeid); 1544 log_print("closing connection to node %d", nodeid);
1419 con = nodeid2con(nodeid, 0); 1545 con = nodeid2con(nodeid, 0);
@@ -1428,6 +1554,17 @@ int dlm_lowcomms_close(int nodeid)
1428 clean_one_writequeue(con); 1554 clean_one_writequeue(con);
1429 close_connection(con, true); 1555 close_connection(con, true);
1430 } 1556 }
1557
1558 spin_lock(&dlm_node_addrs_spin);
1559 na = find_node_addr(nodeid);
1560 if (na) {
1561 list_del(&na->list);
1562 while (na->addr_count--)
1563 kfree(na->addr[na->addr_count]);
1564 kfree(na);
1565 }
1566 spin_unlock(&dlm_node_addrs_spin);
1567
1431 return 0; 1568 return 0;
1432} 1569}
1433 1570
@@ -1577,3 +1714,17 @@ fail_destroy:
1577fail: 1714fail:
1578 return error; 1715 return error;
1579} 1716}
1717
1718void dlm_lowcomms_exit(void)
1719{
1720 struct dlm_node_addr *na, *safe;
1721
1722 spin_lock(&dlm_node_addrs_spin);
1723 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1724 list_del(&na->list);
1725 while (na->addr_count--)
1726 kfree(na->addr[na->addr_count]);
1727 kfree(na);
1728 }
1729 spin_unlock(&dlm_node_addrs_spin);
1730}
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 1311e6426287..67462e54fc2f 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -16,10 +16,12 @@
16 16
17int dlm_lowcomms_start(void); 17int dlm_lowcomms_start(void);
18void dlm_lowcomms_stop(void); 18void dlm_lowcomms_stop(void);
19void dlm_lowcomms_exit(void);
19int dlm_lowcomms_close(int nodeid); 20int dlm_lowcomms_close(int nodeid);
20void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc); 21void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc);
21void dlm_lowcomms_commit_buffer(void *mh); 22void dlm_lowcomms_commit_buffer(void *mh);
22int dlm_lowcomms_connect_node(int nodeid); 23int dlm_lowcomms_connect_node(int nodeid);
24int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
23 25
24#endif /* __LOWCOMMS_DOT_H__ */ 26#endif /* __LOWCOMMS_DOT_H__ */
25 27
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index 5a59efa0bb46..079c0bd71ab7 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -17,6 +17,7 @@
17#include "user.h" 17#include "user.h"
18#include "memory.h" 18#include "memory.h"
19#include "config.h" 19#include "config.h"
20#include "lowcomms.h"
20 21
21static int __init init_dlm(void) 22static int __init init_dlm(void)
22{ 23{
@@ -78,6 +79,7 @@ static void __exit exit_dlm(void)
78 dlm_config_exit(); 79 dlm_config_exit();
79 dlm_memory_exit(); 80 dlm_memory_exit();
80 dlm_lockspace_exit(); 81 dlm_lockspace_exit();
82 dlm_lowcomms_exit();
81 dlm_unregister_debugfs(); 83 dlm_unregister_debugfs();
82} 84}
83 85
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 862640a36d5c..476557b54921 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -616,13 +616,13 @@ int dlm_ls_stop(struct dlm_ls *ls)
616 down_write(&ls->ls_recv_active); 616 down_write(&ls->ls_recv_active);
617 617
618 /* 618 /*
619 * Abort any recovery that's in progress (see RECOVERY_STOP, 619 * Abort any recovery that's in progress (see RECOVER_STOP,
620 * dlm_recovery_stopped()) and tell any other threads running in the 620 * dlm_recovery_stopped()) and tell any other threads running in the
621 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). 621 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
622 */ 622 */
623 623
624 spin_lock(&ls->ls_recover_lock); 624 spin_lock(&ls->ls_recover_lock);
625 set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 625 set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
626 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); 626 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
627 ls->ls_recover_seq++; 627 ls->ls_recover_seq++;
628 spin_unlock(&ls->ls_recover_lock); 628 spin_unlock(&ls->ls_recover_lock);
@@ -642,12 +642,16 @@ int dlm_ls_stop(struct dlm_ls *ls)
642 * when recovery is complete. 642 * when recovery is complete.
643 */ 643 */
644 644
645 if (new) 645 if (new) {
646 down_write(&ls->ls_in_recovery); 646 set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
647 wake_up_process(ls->ls_recoverd_task);
648 wait_event(ls->ls_recover_lock_wait,
649 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
650 }
647 651
648 /* 652 /*
649 * The recoverd suspend/resume makes sure that dlm_recoverd (if 653 * The recoverd suspend/resume makes sure that dlm_recoverd (if
650 * running) has noticed RECOVERY_STOP above and quit processing the 654 * running) has noticed RECOVER_STOP above and quit processing the
651 * previous recovery. 655 * previous recovery.
652 */ 656 */
653 657
@@ -709,7 +713,8 @@ int dlm_ls_start(struct dlm_ls *ls)
709 kfree(rv_old); 713 kfree(rv_old);
710 } 714 }
711 715
712 dlm_recoverd_kick(ls); 716 set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
717 wake_up_process(ls->ls_recoverd_task);
713 return 0; 718 return 0;
714 719
715 fail: 720 fail:
diff --git a/fs/dlm/netlink.c b/fs/dlm/netlink.c
index ef17e0169da1..60a327863b11 100644
--- a/fs/dlm/netlink.c
+++ b/fs/dlm/netlink.c
@@ -14,7 +14,7 @@
14#include "dlm_internal.h" 14#include "dlm_internal.h"
15 15
16static uint32_t dlm_nl_seqnum; 16static uint32_t dlm_nl_seqnum;
17static uint32_t listener_nlpid; 17static uint32_t listener_nlportid;
18 18
19static struct genl_family family = { 19static struct genl_family family = {
20 .id = GENL_ID_GENERATE, 20 .id = GENL_ID_GENERATE,
@@ -64,13 +64,13 @@ static int send_data(struct sk_buff *skb)
64 return rv; 64 return rv;
65 } 65 }
66 66
67 return genlmsg_unicast(&init_net, skb, listener_nlpid); 67 return genlmsg_unicast(&init_net, skb, listener_nlportid);
68} 68}
69 69
70static int user_cmd(struct sk_buff *skb, struct genl_info *info) 70static int user_cmd(struct sk_buff *skb, struct genl_info *info)
71{ 71{
72 listener_nlpid = info->snd_pid; 72 listener_nlportid = info->snd_portid;
73 printk("user_cmd nlpid %u\n", listener_nlpid); 73 printk("user_cmd nlpid %u\n", listener_nlportid);
74 return 0; 74 return 0;
75} 75}
76 76
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 64d3e2b958c7..9d61947d473a 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,8 +23,6 @@
23#include "memory.h" 23#include "memory.h"
24#include "lock.h" 24#include "lock.h"
25#include "util.h" 25#include "util.h"
26#include "member.h"
27
28 26
29static int rcom_response(struct dlm_ls *ls) 27static int rcom_response(struct dlm_ls *ls)
30{ 28{
@@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
275 struct dlm_rcom *rc; 273 struct dlm_rcom *rc;
276 struct dlm_mhandle *mh; 274 struct dlm_mhandle *mh;
277 int error = 0; 275 int error = 0;
278 int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
279 276
280 ls->ls_recover_nodeid = nodeid; 277 ls->ls_recover_nodeid = nodeid;
281 278
282 if (nodeid == dlm_our_nodeid()) {
283 ls->ls_recover_buf->rc_header.h_length =
284 dlm_config.ci_buffer_size;
285 dlm_copy_master_names(ls, last_name, last_len,
286 ls->ls_recover_buf->rc_buf,
287 max_size, nodeid);
288 goto out;
289 }
290
291 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh); 279 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
292 if (error) 280 if (error)
293 goto out; 281 goto out;
@@ -337,7 +325,26 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
337 if (error) 325 if (error)
338 goto out; 326 goto out;
339 memcpy(rc->rc_buf, r->res_name, r->res_length); 327 memcpy(rc->rc_buf, r->res_name, r->res_length);
340 rc->rc_id = (unsigned long) r; 328 rc->rc_id = (unsigned long) r->res_id;
329
330 send_rcom(ls, mh, rc);
331 out:
332 return error;
333}
334
335int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
336{
337 struct dlm_rcom *rc;
338 struct dlm_mhandle *mh;
339 struct dlm_ls *ls = r->res_ls;
340 int error;
341
342 error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
343 &rc, &mh);
344 if (error)
345 goto out;
346 memcpy(rc->rc_buf, r->res_name, r->res_length);
347 rc->rc_id = 0xFFFFFFFF;
341 348
342 send_rcom(ls, mh, rc); 349 send_rcom(ls, mh, rc);
343 out: 350 out:
@@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
355 if (error) 362 if (error)
356 return; 363 return;
357 364
358 error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid); 365 if (rc_in->rc_id == 0xFFFFFFFF) {
366 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
367 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
368 return;
369 }
370
371 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
372 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
359 if (error) 373 if (error)
360 ret_nodeid = error; 374 ret_nodeid = error;
361 rc->rc_result = ret_nodeid; 375 rc->rc_result = ret_nodeid;
@@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
486 return 0; 500 return 0;
487} 501}
488 502
503/*
504 * Ignore messages for stage Y before we set
505 * recover_status bit for stage X:
506 *
507 * recover_status = 0
508 *
509 * dlm_recover_members()
510 * - send nothing
511 * - recv nothing
512 * - ignore NAMES, NAMES_REPLY
513 * - ignore LOOKUP, LOOKUP_REPLY
514 * - ignore LOCK, LOCK_REPLY
515 *
516 * recover_status |= NODES
517 *
518 * dlm_recover_members_wait()
519 *
520 * dlm_recover_directory()
521 * - send NAMES
522 * - recv NAMES_REPLY
523 * - ignore LOOKUP, LOOKUP_REPLY
524 * - ignore LOCK, LOCK_REPLY
525 *
526 * recover_status |= DIR
527 *
528 * dlm_recover_directory_wait()
529 *
530 * dlm_recover_masters()
531 * - send LOOKUP
532 * - recv LOOKUP_REPLY
533 *
534 * dlm_recover_locks()
535 * - send LOCKS
536 * - recv LOCKS_REPLY
537 *
538 * recover_status |= LOCKS
539 *
540 * dlm_recover_locks_wait()
541 *
542 * recover_status |= DONE
543 */
544
489/* Called by dlm_recv; corresponds to dlm_receive_message() but special 545/* Called by dlm_recv; corresponds to dlm_receive_message() but special
490 recovery-only comms are sent through here. */ 546 recovery-only comms are sent through here. */
491 547
492void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 548void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
493{ 549{
494 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); 550 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
495 int stop, reply = 0, lock = 0; 551 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
496 uint32_t status; 552 uint32_t status;
497 uint64_t seq; 553 uint64_t seq;
498 554
499 switch (rc->rc_type) { 555 switch (rc->rc_type) {
556 case DLM_RCOM_STATUS_REPLY:
557 reply = 1;
558 break;
559 case DLM_RCOM_NAMES:
560 names = 1;
561 break;
562 case DLM_RCOM_NAMES_REPLY:
563 names = 1;
564 reply = 1;
565 break;
566 case DLM_RCOM_LOOKUP:
567 lookup = 1;
568 break;
569 case DLM_RCOM_LOOKUP_REPLY:
570 lookup = 1;
571 reply = 1;
572 break;
500 case DLM_RCOM_LOCK: 573 case DLM_RCOM_LOCK:
501 lock = 1; 574 lock = 1;
502 break; 575 break;
@@ -504,31 +577,25 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
504 lock = 1; 577 lock = 1;
505 reply = 1; 578 reply = 1;
506 break; 579 break;
507 case DLM_RCOM_STATUS_REPLY:
508 case DLM_RCOM_NAMES_REPLY:
509 case DLM_RCOM_LOOKUP_REPLY:
510 reply = 1;
511 }; 580 };
512 581
513 spin_lock(&ls->ls_recover_lock); 582 spin_lock(&ls->ls_recover_lock);
514 status = ls->ls_recover_status; 583 status = ls->ls_recover_status;
515 stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 584 stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
516 seq = ls->ls_recover_seq; 585 seq = ls->ls_recover_seq;
517 spin_unlock(&ls->ls_recover_lock); 586 spin_unlock(&ls->ls_recover_lock);
518 587
519 if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) || 588 if (stop && (rc->rc_type != DLM_RCOM_STATUS))
520 (reply && (rc->rc_seq_reply != seq)) || 589 goto ignore;
521 (lock && !(status & DLM_RS_DIR))) { 590
522 log_limit(ls, "dlm_receive_rcom ignore msg %d " 591 if (reply && (rc->rc_seq_reply != seq))
523 "from %d %llu %llu recover seq %llu sts %x gen %u", 592 goto ignore;
524 rc->rc_type, 593
525 nodeid, 594 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
526 (unsigned long long)rc->rc_seq, 595 goto ignore;
527 (unsigned long long)rc->rc_seq_reply, 596
528 (unsigned long long)seq, 597 if (!(status & DLM_RS_DIR) && (lookup || lock))
529 status, ls->ls_generation); 598 goto ignore;
530 goto out;
531 }
532 599
533 switch (rc->rc_type) { 600 switch (rc->rc_type) {
534 case DLM_RCOM_STATUS: 601 case DLM_RCOM_STATUS:
@@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
570 default: 637 default:
571 log_error(ls, "receive_rcom bad type %d", rc->rc_type); 638 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
572 } 639 }
573out: 640 return;
641
642ignore:
643 log_limit(ls, "dlm_receive_rcom ignore msg %d "
644 "from %d %llu %llu recover seq %llu sts %x gen %u",
645 rc->rc_type,
646 nodeid,
647 (unsigned long long)rc->rc_seq,
648 (unsigned long long)rc->rc_seq_reply,
649 (unsigned long long)seq,
650 status, ls->ls_generation);
574 return; 651 return;
575Eshort: 652Eshort:
576 log_error(ls, "recovery message %x from %d is too short", 653 log_error(ls, "recovery message %d from %d is too short",
577 rc->rc_type, nodeid); 654 rc->rc_type, nodeid);
578} 655}
579 656
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index 206723ab744d..f8e243463c15 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -17,6 +17,7 @@
17int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags); 17int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); 18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); 19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
20int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid);
20int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 21int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
21void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid); 22void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
22int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in); 23int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 7554e4dac6bb..4a7a76e42fc3 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -36,30 +36,23 @@
36 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes). When another 36 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes). When another
37 * function thinks it could have completed the waited-on task, they should wake 37 * function thinks it could have completed the waited-on task, they should wake
38 * up ls_wait_general to get an immediate response rather than waiting for the 38 * up ls_wait_general to get an immediate response rather than waiting for the
39 * timer to detect the result. A timer wakes us up periodically while waiting 39 * timeout. This uses a timeout so it can check periodically if the wait
40 * to see if we should abort due to a node failure. This should only be called 40 * should abort due to node failure (which doesn't cause a wake_up).
41 * by the dlm_recoverd thread. 41 * This should only be called by the dlm_recoverd thread.
42 */ 42 */
43 43
44static void dlm_wait_timer_fn(unsigned long data)
45{
46 struct dlm_ls *ls = (struct dlm_ls *) data;
47 mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ));
48 wake_up(&ls->ls_wait_general);
49}
50
51int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls)) 44int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
52{ 45{
53 int error = 0; 46 int error = 0;
47 int rv;
54 48
55 init_timer(&ls->ls_timer); 49 while (1) {
56 ls->ls_timer.function = dlm_wait_timer_fn; 50 rv = wait_event_timeout(ls->ls_wait_general,
57 ls->ls_timer.data = (long) ls; 51 testfn(ls) || dlm_recovery_stopped(ls),
58 ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ); 52 dlm_config.ci_recover_timer * HZ);
59 add_timer(&ls->ls_timer); 53 if (rv)
60 54 break;
61 wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls)); 55 }
62 del_timer_sync(&ls->ls_timer);
63 56
64 if (dlm_recovery_stopped(ls)) { 57 if (dlm_recovery_stopped(ls)) {
65 log_debug(ls, "dlm_wait_function aborted"); 58 log_debug(ls, "dlm_wait_function aborted");
@@ -277,22 +270,6 @@ static void recover_list_del(struct dlm_rsb *r)
277 dlm_put_rsb(r); 270 dlm_put_rsb(r);
278} 271}
279 272
280static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
281{
282 struct dlm_rsb *r = NULL;
283
284 spin_lock(&ls->ls_recover_list_lock);
285
286 list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
287 if (id == (unsigned long) r)
288 goto out;
289 }
290 r = NULL;
291 out:
292 spin_unlock(&ls->ls_recover_list_lock);
293 return r;
294}
295
296static void recover_list_clear(struct dlm_ls *ls) 273static void recover_list_clear(struct dlm_ls *ls)
297{ 274{
298 struct dlm_rsb *r, *s; 275 struct dlm_rsb *r, *s;
@@ -313,6 +290,94 @@ static void recover_list_clear(struct dlm_ls *ls)
313 spin_unlock(&ls->ls_recover_list_lock); 290 spin_unlock(&ls->ls_recover_list_lock);
314} 291}
315 292
293static int recover_idr_empty(struct dlm_ls *ls)
294{
295 int empty = 1;
296
297 spin_lock(&ls->ls_recover_idr_lock);
298 if (ls->ls_recover_list_count)
299 empty = 0;
300 spin_unlock(&ls->ls_recover_idr_lock);
301
302 return empty;
303}
304
305static int recover_idr_add(struct dlm_rsb *r)
306{
307 struct dlm_ls *ls = r->res_ls;
308 int rv, id;
309
310 rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS);
311 if (!rv)
312 return -ENOMEM;
313
314 spin_lock(&ls->ls_recover_idr_lock);
315 if (r->res_id) {
316 spin_unlock(&ls->ls_recover_idr_lock);
317 return -1;
318 }
319 rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id);
320 if (rv) {
321 spin_unlock(&ls->ls_recover_idr_lock);
322 return rv;
323 }
324 r->res_id = id;
325 ls->ls_recover_list_count++;
326 dlm_hold_rsb(r);
327 spin_unlock(&ls->ls_recover_idr_lock);
328 return 0;
329}
330
331static void recover_idr_del(struct dlm_rsb *r)
332{
333 struct dlm_ls *ls = r->res_ls;
334
335 spin_lock(&ls->ls_recover_idr_lock);
336 idr_remove(&ls->ls_recover_idr, r->res_id);
337 r->res_id = 0;
338 ls->ls_recover_list_count--;
339 spin_unlock(&ls->ls_recover_idr_lock);
340
341 dlm_put_rsb(r);
342}
343
344static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
345{
346 struct dlm_rsb *r;
347
348 spin_lock(&ls->ls_recover_idr_lock);
349 r = idr_find(&ls->ls_recover_idr, (int)id);
350 spin_unlock(&ls->ls_recover_idr_lock);
351 return r;
352}
353
354static int recover_idr_clear_rsb(int id, void *p, void *data)
355{
356 struct dlm_ls *ls = data;
357 struct dlm_rsb *r = p;
358
359 r->res_id = 0;
360 r->res_recover_locks_count = 0;
361 ls->ls_recover_list_count--;
362
363 dlm_put_rsb(r);
364 return 0;
365}
366
367static void recover_idr_clear(struct dlm_ls *ls)
368{
369 spin_lock(&ls->ls_recover_idr_lock);
370 idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls);
371 idr_remove_all(&ls->ls_recover_idr);
372
373 if (ls->ls_recover_list_count != 0) {
374 log_error(ls, "warning: recover_list_count %d",
375 ls->ls_recover_list_count);
376 ls->ls_recover_list_count = 0;
377 }
378 spin_unlock(&ls->ls_recover_idr_lock);
379}
380
316 381
317/* Master recovery: find new master node for rsb's that were 382/* Master recovery: find new master node for rsb's that were
318 mastered on nodes that have been removed. 383 mastered on nodes that have been removed.
@@ -361,9 +426,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
361 * rsb's to consider. 426 * rsb's to consider.
362 */ 427 */
363 428
364static void set_new_master(struct dlm_rsb *r, int nodeid) 429static void set_new_master(struct dlm_rsb *r)
365{ 430{
366 r->res_nodeid = nodeid;
367 set_master_lkbs(r); 431 set_master_lkbs(r);
368 rsb_set_flag(r, RSB_NEW_MASTER); 432 rsb_set_flag(r, RSB_NEW_MASTER);
369 rsb_set_flag(r, RSB_NEW_MASTER2); 433 rsb_set_flag(r, RSB_NEW_MASTER2);
@@ -372,31 +436,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
372/* 436/*
373 * We do async lookups on rsb's that need new masters. The rsb's 437 * We do async lookups on rsb's that need new masters. The rsb's
374 * waiting for a lookup reply are kept on the recover_list. 438 * waiting for a lookup reply are kept on the recover_list.
439 *
440 * Another node recovering the master may have sent us a rcom lookup,
441 * and our dlm_master_lookup() set it as the new master, along with
442 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
443 * equals our_nodeid below).
375 */ 444 */
376 445
377static int recover_master(struct dlm_rsb *r) 446static int recover_master(struct dlm_rsb *r, unsigned int *count)
378{ 447{
379 struct dlm_ls *ls = r->res_ls; 448 struct dlm_ls *ls = r->res_ls;
380 int error, ret_nodeid; 449 int our_nodeid, dir_nodeid;
381 int our_nodeid = dlm_our_nodeid(); 450 int is_removed = 0;
382 int dir_nodeid = dlm_dir_nodeid(r); 451 int error;
452
453 if (is_master(r))
454 return 0;
455
456 is_removed = dlm_is_removed(ls, r->res_nodeid);
457
458 if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
459 return 0;
460
461 our_nodeid = dlm_our_nodeid();
462 dir_nodeid = dlm_dir_nodeid(r);
383 463
384 if (dir_nodeid == our_nodeid) { 464 if (dir_nodeid == our_nodeid) {
385 error = dlm_dir_lookup(ls, our_nodeid, r->res_name, 465 if (is_removed) {
386 r->res_length, &ret_nodeid); 466 r->res_master_nodeid = our_nodeid;
387 if (error) 467 r->res_nodeid = 0;
388 log_error(ls, "recover dir lookup error %d", error); 468 }
389 469
390 if (ret_nodeid == our_nodeid) 470 /* set master of lkbs to ourself when is_removed, or to
391 ret_nodeid = 0; 471 another new master which we set along with NEW_MASTER
392 lock_rsb(r); 472 in dlm_master_lookup */
393 set_new_master(r, ret_nodeid); 473 set_new_master(r);
394 unlock_rsb(r); 474 error = 0;
395 } else { 475 } else {
396 recover_list_add(r); 476 recover_idr_add(r);
397 error = dlm_send_rcom_lookup(r, dir_nodeid); 477 error = dlm_send_rcom_lookup(r, dir_nodeid);
398 } 478 }
399 479
480 (*count)++;
400 return error; 481 return error;
401} 482}
402 483
@@ -415,7 +496,7 @@ static int recover_master(struct dlm_rsb *r)
415 * resent. 496 * resent.
416 */ 497 */
417 498
418static int recover_master_static(struct dlm_rsb *r) 499static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
419{ 500{
420 int dir_nodeid = dlm_dir_nodeid(r); 501 int dir_nodeid = dlm_dir_nodeid(r);
421 int new_master = dir_nodeid; 502 int new_master = dir_nodeid;
@@ -423,11 +504,12 @@ static int recover_master_static(struct dlm_rsb *r)
423 if (dir_nodeid == dlm_our_nodeid()) 504 if (dir_nodeid == dlm_our_nodeid())
424 new_master = 0; 505 new_master = 0;
425 506
426 lock_rsb(r);
427 dlm_purge_mstcpy_locks(r); 507 dlm_purge_mstcpy_locks(r);
428 set_new_master(r, new_master); 508 r->res_master_nodeid = dir_nodeid;
429 unlock_rsb(r); 509 r->res_nodeid = new_master;
430 return 1; 510 set_new_master(r);
511 (*count)++;
512 return 0;
431} 513}
432 514
433/* 515/*
@@ -443,7 +525,10 @@ static int recover_master_static(struct dlm_rsb *r)
443int dlm_recover_masters(struct dlm_ls *ls) 525int dlm_recover_masters(struct dlm_ls *ls)
444{ 526{
445 struct dlm_rsb *r; 527 struct dlm_rsb *r;
446 int error = 0, count = 0; 528 unsigned int total = 0;
529 unsigned int count = 0;
530 int nodir = dlm_no_directory(ls);
531 int error;
447 532
448 log_debug(ls, "dlm_recover_masters"); 533 log_debug(ls, "dlm_recover_masters");
449 534
@@ -455,50 +540,58 @@ int dlm_recover_masters(struct dlm_ls *ls)
455 goto out; 540 goto out;
456 } 541 }
457 542
458 if (dlm_no_directory(ls)) 543 lock_rsb(r);
459 count += recover_master_static(r); 544 if (nodir)
460 else if (!is_master(r) && 545 error = recover_master_static(r, &count);
461 (dlm_is_removed(ls, r->res_nodeid) || 546 else
462 rsb_flag(r, RSB_NEW_MASTER))) { 547 error = recover_master(r, &count);
463 recover_master(r); 548 unlock_rsb(r);
464 count++; 549 cond_resched();
465 } 550 total++;
466 551
467 schedule(); 552 if (error) {
553 up_read(&ls->ls_root_sem);
554 goto out;
555 }
468 } 556 }
469 up_read(&ls->ls_root_sem); 557 up_read(&ls->ls_root_sem);
470 558
471 log_debug(ls, "dlm_recover_masters %d resources", count); 559 log_debug(ls, "dlm_recover_masters %u of %u", count, total);
472 560
473 error = dlm_wait_function(ls, &recover_list_empty); 561 error = dlm_wait_function(ls, &recover_idr_empty);
474 out: 562 out:
475 if (error) 563 if (error)
476 recover_list_clear(ls); 564 recover_idr_clear(ls);
477 return error; 565 return error;
478} 566}
479 567
480int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) 568int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
481{ 569{
482 struct dlm_rsb *r; 570 struct dlm_rsb *r;
483 int nodeid; 571 int ret_nodeid, new_master;
484 572
485 r = recover_list_find(ls, rc->rc_id); 573 r = recover_idr_find(ls, rc->rc_id);
486 if (!r) { 574 if (!r) {
487 log_error(ls, "dlm_recover_master_reply no id %llx", 575 log_error(ls, "dlm_recover_master_reply no id %llx",
488 (unsigned long long)rc->rc_id); 576 (unsigned long long)rc->rc_id);
489 goto out; 577 goto out;
490 } 578 }
491 579
492 nodeid = rc->rc_result; 580 ret_nodeid = rc->rc_result;
493 if (nodeid == dlm_our_nodeid()) 581
494 nodeid = 0; 582 if (ret_nodeid == dlm_our_nodeid())
583 new_master = 0;
584 else
585 new_master = ret_nodeid;
495 586
496 lock_rsb(r); 587 lock_rsb(r);
497 set_new_master(r, nodeid); 588 r->res_master_nodeid = ret_nodeid;
589 r->res_nodeid = new_master;
590 set_new_master(r);
498 unlock_rsb(r); 591 unlock_rsb(r);
499 recover_list_del(r); 592 recover_idr_del(r);
500 593
501 if (recover_list_empty(ls)) 594 if (recover_idr_empty(ls))
502 wake_up(&ls->ls_wait_general); 595 wake_up(&ls->ls_wait_general);
503 out: 596 out:
504 return 0; 597 return 0;
@@ -711,6 +804,7 @@ static void recover_lvb(struct dlm_rsb *r)
711 804
712static void recover_conversion(struct dlm_rsb *r) 805static void recover_conversion(struct dlm_rsb *r)
713{ 806{
807 struct dlm_ls *ls = r->res_ls;
714 struct dlm_lkb *lkb; 808 struct dlm_lkb *lkb;
715 int grmode = -1; 809 int grmode = -1;
716 810
@@ -725,10 +819,15 @@ static void recover_conversion(struct dlm_rsb *r)
725 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { 819 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
726 if (lkb->lkb_grmode != DLM_LOCK_IV) 820 if (lkb->lkb_grmode != DLM_LOCK_IV)
727 continue; 821 continue;
728 if (grmode == -1) 822 if (grmode == -1) {
823 log_debug(ls, "recover_conversion %x set gr to rq %d",
824 lkb->lkb_id, lkb->lkb_rqmode);
729 lkb->lkb_grmode = lkb->lkb_rqmode; 825 lkb->lkb_grmode = lkb->lkb_rqmode;
730 else 826 } else {
827 log_debug(ls, "recover_conversion %x set gr %d",
828 lkb->lkb_id, grmode);
731 lkb->lkb_grmode = grmode; 829 lkb->lkb_grmode = grmode;
830 }
732 } 831 }
733} 832}
734 833
@@ -791,20 +890,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
791 dlm_hold_rsb(r); 890 dlm_hold_rsb(r);
792 } 891 }
793 892
794 /* If we're using a directory, add tossed rsbs to the root 893 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
795 list; they'll have entries created in the new directory, 894 log_error(ls, "dlm_create_root_list toss not empty");
796 but no other recovery steps should do anything with them. */
797
798 if (dlm_no_directory(ls)) {
799 spin_unlock(&ls->ls_rsbtbl[i].lock);
800 continue;
801 }
802
803 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
804 r = rb_entry(n, struct dlm_rsb, res_hashnode);
805 list_add(&r->res_root_list, &ls->ls_root_list);
806 dlm_hold_rsb(r);
807 }
808 spin_unlock(&ls->ls_rsbtbl[i].lock); 895 spin_unlock(&ls->ls_rsbtbl[i].lock);
809 } 896 }
810 out: 897 out:
@@ -824,28 +911,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
824 up_write(&ls->ls_root_sem); 911 up_write(&ls->ls_root_sem);
825} 912}
826 913
827/* If not using a directory, clear the entire toss list, there's no benefit to 914void dlm_clear_toss(struct dlm_ls *ls)
828 caching the master value since it's fixed. If we are using a dir, keep the
829 rsb's we're the master of. Recovery will add them to the root list and from
830 there they'll be entered in the rebuilt directory. */
831
832void dlm_clear_toss_list(struct dlm_ls *ls)
833{ 915{
834 struct rb_node *n, *next; 916 struct rb_node *n, *next;
835 struct dlm_rsb *rsb; 917 struct dlm_rsb *r;
918 unsigned int count = 0;
836 int i; 919 int i;
837 920
838 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 921 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
839 spin_lock(&ls->ls_rsbtbl[i].lock); 922 spin_lock(&ls->ls_rsbtbl[i].lock);
840 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { 923 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
841 next = rb_next(n);; 924 next = rb_next(n);
842 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 925 r = rb_entry(n, struct dlm_rsb, res_hashnode);
843 if (dlm_no_directory(ls) || !is_master(rsb)) { 926 rb_erase(n, &ls->ls_rsbtbl[i].toss);
844 rb_erase(n, &ls->ls_rsbtbl[i].toss); 927 dlm_free_rsb(r);
845 dlm_free_rsb(rsb); 928 count++;
846 }
847 } 929 }
848 spin_unlock(&ls->ls_rsbtbl[i].lock); 930 spin_unlock(&ls->ls_rsbtbl[i].lock);
849 } 931 }
932
933 if (count)
934 log_debug(ls, "dlm_clear_toss %u done", count);
850} 935}
851 936
diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h
index ebd0363f1e08..d8c8738c70eb 100644
--- a/fs/dlm/recover.h
+++ b/fs/dlm/recover.h
@@ -27,7 +27,7 @@ int dlm_recover_locks(struct dlm_ls *ls);
27void dlm_recovered_lock(struct dlm_rsb *r); 27void dlm_recovered_lock(struct dlm_rsb *r);
28int dlm_create_root_list(struct dlm_ls *ls); 28int dlm_create_root_list(struct dlm_ls *ls);
29void dlm_release_root_list(struct dlm_ls *ls); 29void dlm_release_root_list(struct dlm_ls *ls);
30void dlm_clear_toss_list(struct dlm_ls *ls); 30void dlm_clear_toss(struct dlm_ls *ls);
31void dlm_recover_rsbs(struct dlm_ls *ls); 31void dlm_recover_rsbs(struct dlm_ls *ls);
32 32
33#endif /* __RECOVER_DOT_H__ */ 33#endif /* __RECOVER_DOT_H__ */
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index f1a9073c0835..32f9f8926ec3 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -41,6 +41,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
41 set_bit(LSFL_RUNNING, &ls->ls_flags); 41 set_bit(LSFL_RUNNING, &ls->ls_flags);
42 /* unblocks processes waiting to enter the dlm */ 42 /* unblocks processes waiting to enter the dlm */
43 up_write(&ls->ls_in_recovery); 43 up_write(&ls->ls_in_recovery);
44 clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
44 error = 0; 45 error = 0;
45 } 46 }
46 spin_unlock(&ls->ls_recover_lock); 47 spin_unlock(&ls->ls_recover_lock);
@@ -60,12 +61,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
60 61
61 dlm_callback_suspend(ls); 62 dlm_callback_suspend(ls);
62 63
63 /* 64 dlm_clear_toss(ls);
64 * Free non-master tossed rsb's. Master rsb's are kept on toss
65 * list and put on root list to be included in resdir recovery.
66 */
67
68 dlm_clear_toss_list(ls);
69 65
70 /* 66 /*
71 * This list of root rsb's will be the basis of most of the recovery 67 * This list of root rsb's will be the basis of most of the recovery
@@ -84,6 +80,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
84 goto fail; 80 goto fail;
85 } 81 }
86 82
83 dlm_recover_dir_nodeid(ls);
84
85 ls->ls_recover_dir_sent_res = 0;
86 ls->ls_recover_dir_sent_msg = 0;
87 ls->ls_recover_locks_in = 0; 87 ls->ls_recover_locks_in = 0;
88 88
89 dlm_set_recover_status(ls, DLM_RS_NODES); 89 dlm_set_recover_status(ls, DLM_RS_NODES);
@@ -115,6 +115,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
115 goto fail; 115 goto fail;
116 } 116 }
117 117
118 log_debug(ls, "dlm_recover_directory %u out %u messages",
119 ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
120
118 /* 121 /*
119 * We may have outstanding operations that are waiting for a reply from 122 * We may have outstanding operations that are waiting for a reply from
120 * a failed node. Mark these to be resent after recovery. Unlock and 123 * a failed node. Mark these to be resent after recovery. Unlock and
@@ -260,7 +263,7 @@ static void do_ls_recovery(struct dlm_ls *ls)
260 rv = ls->ls_recover_args; 263 rv = ls->ls_recover_args;
261 ls->ls_recover_args = NULL; 264 ls->ls_recover_args = NULL;
262 if (rv && ls->ls_recover_seq == rv->seq) 265 if (rv && ls->ls_recover_seq == rv->seq)
263 clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 266 clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
264 spin_unlock(&ls->ls_recover_lock); 267 spin_unlock(&ls->ls_recover_lock);
265 268
266 if (rv) { 269 if (rv) {
@@ -280,26 +283,34 @@ static int dlm_recoverd(void *arg)
280 return -1; 283 return -1;
281 } 284 }
282 285
286 down_write(&ls->ls_in_recovery);
287 set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
288 wake_up(&ls->ls_recover_lock_wait);
289
283 while (!kthread_should_stop()) { 290 while (!kthread_should_stop()) {
284 set_current_state(TASK_INTERRUPTIBLE); 291 set_current_state(TASK_INTERRUPTIBLE);
285 if (!test_bit(LSFL_WORK, &ls->ls_flags)) 292 if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
293 !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags))
286 schedule(); 294 schedule();
287 set_current_state(TASK_RUNNING); 295 set_current_state(TASK_RUNNING);
288 296
289 if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags)) 297 if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
298 down_write(&ls->ls_in_recovery);
299 set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
300 wake_up(&ls->ls_recover_lock_wait);
301 }
302
303 if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
290 do_ls_recovery(ls); 304 do_ls_recovery(ls);
291 } 305 }
292 306
307 if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
308 up_write(&ls->ls_in_recovery);
309
293 dlm_put_lockspace(ls); 310 dlm_put_lockspace(ls);
294 return 0; 311 return 0;
295} 312}
296 313
297void dlm_recoverd_kick(struct dlm_ls *ls)
298{
299 set_bit(LSFL_WORK, &ls->ls_flags);
300 wake_up_process(ls->ls_recoverd_task);
301}
302
303int dlm_recoverd_start(struct dlm_ls *ls) 314int dlm_recoverd_start(struct dlm_ls *ls)
304{ 315{
305 struct task_struct *p; 316 struct task_struct *p;
diff --git a/fs/dlm/recoverd.h b/fs/dlm/recoverd.h
index 866657c5d69d..8856079733fa 100644
--- a/fs/dlm/recoverd.h
+++ b/fs/dlm/recoverd.h
@@ -14,7 +14,6 @@
14#ifndef __RECOVERD_DOT_H__ 14#ifndef __RECOVERD_DOT_H__
15#define __RECOVERD_DOT_H__ 15#define __RECOVERD_DOT_H__
16 16
17void dlm_recoverd_kick(struct dlm_ls *ls);
18void dlm_recoverd_stop(struct dlm_ls *ls); 17void dlm_recoverd_stop(struct dlm_ls *ls);
19int dlm_recoverd_start(struct dlm_ls *ls); 18int dlm_recoverd_start(struct dlm_ls *ls);
20void dlm_recoverd_suspend(struct dlm_ls *ls); 19void dlm_recoverd_suspend(struct dlm_ls *ls);
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index eb4ed9ba3098..7ff49852b0cb 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -503,6 +503,13 @@ static ssize_t device_write(struct file *file, const char __user *buf,
503#endif 503#endif
504 return -EINVAL; 504 return -EINVAL;
505 505
506#ifdef CONFIG_COMPAT
507 if (count > sizeof(struct dlm_write_request32) + DLM_RESNAME_MAXLEN)
508#else
509 if (count > sizeof(struct dlm_write_request) + DLM_RESNAME_MAXLEN)
510#endif
511 return -EINVAL;
512
506 kbuf = kzalloc(count + 1, GFP_NOFS); 513 kbuf = kzalloc(count + 1, GFP_NOFS);
507 if (!kbuf) 514 if (!kbuf)
508 return -ENOMEM; 515 return -ENOMEM;