aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/config.c130
-rw-r--r--fs/dlm/config.h17
-rw-r--r--fs/dlm/debug_fs.c28
-rw-r--r--fs/dlm/dir.c1
-rw-r--r--fs/dlm/dlm_internal.h60
-rw-r--r--fs/dlm/lock.c87
-rw-r--r--fs/dlm/lockspace.c71
-rw-r--r--fs/dlm/member.c486
-rw-r--r--fs/dlm/member.h10
-rw-r--r--fs/dlm/rcom.c99
-rw-r--r--fs/dlm/rcom.h2
-rw-r--r--fs/dlm/recover.c87
-rw-r--r--fs/dlm/recoverd.c53
-rw-r--r--fs/dlm/user.c5
-rw-r--r--fs/gfs2/lock_dlm.c4
-rw-r--r--fs/ocfs2/stack_user.c4
-rw-r--r--include/linux/dlm.h71
17 files changed, 943 insertions, 272 deletions
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 6cf72fcc0d0c..e7e327d43fa5 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -17,6 +17,7 @@
17#include <linux/slab.h> 17#include <linux/slab.h>
18#include <linux/in.h> 18#include <linux/in.h>
19#include <linux/in6.h> 19#include <linux/in6.h>
20#include <linux/dlmconstants.h>
20#include <net/ipv6.h> 21#include <net/ipv6.h>
21#include <net/sock.h> 22#include <net/sock.h>
22 23
@@ -36,6 +37,7 @@
36static struct config_group *space_list; 37static struct config_group *space_list;
37static struct config_group *comm_list; 38static struct config_group *comm_list;
38static struct dlm_comm *local_comm; 39static struct dlm_comm *local_comm;
40static uint32_t dlm_comm_count;
39 41
40struct dlm_clusters; 42struct dlm_clusters;
41struct dlm_cluster; 43struct dlm_cluster;
@@ -103,6 +105,8 @@ struct dlm_cluster {
103 unsigned int cl_timewarn_cs; 105 unsigned int cl_timewarn_cs;
104 unsigned int cl_waitwarn_us; 106 unsigned int cl_waitwarn_us;
105 unsigned int cl_new_rsb_count; 107 unsigned int cl_new_rsb_count;
108 unsigned int cl_recover_callbacks;
109 char cl_cluster_name[DLM_LOCKSPACE_LEN];
106}; 110};
107 111
108enum { 112enum {
@@ -118,6 +122,8 @@ enum {
118 CLUSTER_ATTR_TIMEWARN_CS, 122 CLUSTER_ATTR_TIMEWARN_CS,
119 CLUSTER_ATTR_WAITWARN_US, 123 CLUSTER_ATTR_WAITWARN_US,
120 CLUSTER_ATTR_NEW_RSB_COUNT, 124 CLUSTER_ATTR_NEW_RSB_COUNT,
125 CLUSTER_ATTR_RECOVER_CALLBACKS,
126 CLUSTER_ATTR_CLUSTER_NAME,
121}; 127};
122 128
123struct cluster_attribute { 129struct cluster_attribute {
@@ -126,6 +132,27 @@ struct cluster_attribute {
126 ssize_t (*store)(struct dlm_cluster *, const char *, size_t); 132 ssize_t (*store)(struct dlm_cluster *, const char *, size_t);
127}; 133};
128 134
135static ssize_t cluster_cluster_name_read(struct dlm_cluster *cl, char *buf)
136{
137 return sprintf(buf, "%s\n", cl->cl_cluster_name);
138}
139
140static ssize_t cluster_cluster_name_write(struct dlm_cluster *cl,
141 const char *buf, size_t len)
142{
143 strncpy(dlm_config.ci_cluster_name, buf, DLM_LOCKSPACE_LEN);
144 strncpy(cl->cl_cluster_name, buf, DLM_LOCKSPACE_LEN);
145 return len;
146}
147
148static struct cluster_attribute cluster_attr_cluster_name = {
149 .attr = { .ca_owner = THIS_MODULE,
150 .ca_name = "cluster_name",
151 .ca_mode = S_IRUGO | S_IWUSR },
152 .show = cluster_cluster_name_read,
153 .store = cluster_cluster_name_write,
154};
155
129static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, 156static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
130 int *info_field, int check_zero, 157 int *info_field, int check_zero,
131 const char *buf, size_t len) 158 const char *buf, size_t len)
@@ -171,6 +198,7 @@ CLUSTER_ATTR(protocol, 0);
171CLUSTER_ATTR(timewarn_cs, 1); 198CLUSTER_ATTR(timewarn_cs, 1);
172CLUSTER_ATTR(waitwarn_us, 0); 199CLUSTER_ATTR(waitwarn_us, 0);
173CLUSTER_ATTR(new_rsb_count, 0); 200CLUSTER_ATTR(new_rsb_count, 0);
201CLUSTER_ATTR(recover_callbacks, 0);
174 202
175static struct configfs_attribute *cluster_attrs[] = { 203static struct configfs_attribute *cluster_attrs[] = {
176 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, 204 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
@@ -185,6 +213,8 @@ static struct configfs_attribute *cluster_attrs[] = {
185 [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr, 213 [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
186 [CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr, 214 [CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
187 [CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr, 215 [CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr,
216 [CLUSTER_ATTR_RECOVER_CALLBACKS] = &cluster_attr_recover_callbacks.attr,
217 [CLUSTER_ATTR_CLUSTER_NAME] = &cluster_attr_cluster_name.attr,
188 NULL, 218 NULL,
189}; 219};
190 220
@@ -293,6 +323,7 @@ struct dlm_comms {
293 323
294struct dlm_comm { 324struct dlm_comm {
295 struct config_item item; 325 struct config_item item;
326 int seq;
296 int nodeid; 327 int nodeid;
297 int local; 328 int local;
298 int addr_count; 329 int addr_count;
@@ -309,6 +340,7 @@ struct dlm_node {
309 int nodeid; 340 int nodeid;
310 int weight; 341 int weight;
311 int new; 342 int new;
343 int comm_seq; /* copy of cm->seq when nd->nodeid is set */
312}; 344};
313 345
314static struct configfs_group_operations clusters_ops = { 346static struct configfs_group_operations clusters_ops = {
@@ -455,6 +487,9 @@ static struct config_group *make_cluster(struct config_group *g,
455 cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs; 487 cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
456 cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us; 488 cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;
457 cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count; 489 cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count;
490 cl->cl_recover_callbacks = dlm_config.ci_recover_callbacks;
491 memcpy(cl->cl_cluster_name, dlm_config.ci_cluster_name,
492 DLM_LOCKSPACE_LEN);
458 493
459 space_list = &sps->ss_group; 494 space_list = &sps->ss_group;
460 comm_list = &cms->cs_group; 495 comm_list = &cms->cs_group;
@@ -558,6 +593,11 @@ static struct config_item *make_comm(struct config_group *g, const char *name)
558 return ERR_PTR(-ENOMEM); 593 return ERR_PTR(-ENOMEM);
559 594
560 config_item_init_type_name(&cm->item, name, &comm_type); 595 config_item_init_type_name(&cm->item, name, &comm_type);
596
597 cm->seq = dlm_comm_count++;
598 if (!cm->seq)
599 cm->seq = dlm_comm_count++;
600
561 cm->nodeid = -1; 601 cm->nodeid = -1;
562 cm->local = 0; 602 cm->local = 0;
563 cm->addr_count = 0; 603 cm->addr_count = 0;
@@ -801,7 +841,10 @@ static ssize_t node_nodeid_read(struct dlm_node *nd, char *buf)
801static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf, 841static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf,
802 size_t len) 842 size_t len)
803{ 843{
844 uint32_t seq = 0;
804 nd->nodeid = simple_strtol(buf, NULL, 0); 845 nd->nodeid = simple_strtol(buf, NULL, 0);
846 dlm_comm_seq(nd->nodeid, &seq);
847 nd->comm_seq = seq;
805 return len; 848 return len;
806} 849}
807 850
@@ -908,13 +951,13 @@ static void put_comm(struct dlm_comm *cm)
908} 951}
909 952
910/* caller must free mem */ 953/* caller must free mem */
911int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out, 954int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
912 int **new_out, int *new_count_out) 955 int *count_out)
913{ 956{
914 struct dlm_space *sp; 957 struct dlm_space *sp;
915 struct dlm_node *nd; 958 struct dlm_node *nd;
916 int i = 0, rv = 0, ids_count = 0, new_count = 0; 959 struct dlm_config_node *nodes, *node;
917 int *ids, *new; 960 int rv, count;
918 961
919 sp = get_space(lsname); 962 sp = get_space(lsname);
920 if (!sp) 963 if (!sp)
@@ -927,73 +970,42 @@ int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out,
927 goto out; 970 goto out;
928 } 971 }
929 972
930 ids_count = sp->members_count; 973 count = sp->members_count;
931 974
932 ids = kcalloc(ids_count, sizeof(int), GFP_NOFS); 975 nodes = kcalloc(count, sizeof(struct dlm_config_node), GFP_NOFS);
933 if (!ids) { 976 if (!nodes) {
934 rv = -ENOMEM; 977 rv = -ENOMEM;
935 goto out; 978 goto out;
936 } 979 }
937 980
981 node = nodes;
938 list_for_each_entry(nd, &sp->members, list) { 982 list_for_each_entry(nd, &sp->members, list) {
939 ids[i++] = nd->nodeid; 983 node->nodeid = nd->nodeid;
940 if (nd->new) 984 node->weight = nd->weight;
941 new_count++; 985 node->new = nd->new;
942 } 986 node->comm_seq = nd->comm_seq;
943 987 node++;
944 if (ids_count != i)
945 printk(KERN_ERR "dlm: bad nodeid count %d %d\n", ids_count, i);
946
947 if (!new_count)
948 goto out_ids;
949 988
950 new = kcalloc(new_count, sizeof(int), GFP_NOFS); 989 nd->new = 0;
951 if (!new) {
952 kfree(ids);
953 rv = -ENOMEM;
954 goto out;
955 } 990 }
956 991
957 i = 0; 992 *count_out = count;
958 list_for_each_entry(nd, &sp->members, list) { 993 *nodes_out = nodes;
959 if (nd->new) { 994 rv = 0;
960 new[i++] = nd->nodeid;
961 nd->new = 0;
962 }
963 }
964 *new_count_out = new_count;
965 *new_out = new;
966
967 out_ids:
968 *ids_count_out = ids_count;
969 *ids_out = ids;
970 out: 995 out:
971 mutex_unlock(&sp->members_lock); 996 mutex_unlock(&sp->members_lock);
972 put_space(sp); 997 put_space(sp);
973 return rv; 998 return rv;
974} 999}
975 1000
976int dlm_node_weight(char *lsname, int nodeid) 1001int dlm_comm_seq(int nodeid, uint32_t *seq)
977{ 1002{
978 struct dlm_space *sp; 1003 struct dlm_comm *cm = get_comm(nodeid, NULL);
979 struct dlm_node *nd; 1004 if (!cm)
980 int w = -EEXIST; 1005 return -EEXIST;
981 1006 *seq = cm->seq;
982 sp = get_space(lsname); 1007 put_comm(cm);
983 if (!sp) 1008 return 0;
984 goto out;
985
986 mutex_lock(&sp->members_lock);
987 list_for_each_entry(nd, &sp->members, list) {
988 if (nd->nodeid != nodeid)
989 continue;
990 w = nd->weight;
991 break;
992 }
993 mutex_unlock(&sp->members_lock);
994 put_space(sp);
995 out:
996 return w;
997} 1009}
998 1010
999int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr) 1011int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr)
@@ -1047,6 +1059,8 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
1047#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */ 1059#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
1048#define DEFAULT_WAITWARN_US 0 1060#define DEFAULT_WAITWARN_US 0
1049#define DEFAULT_NEW_RSB_COUNT 128 1061#define DEFAULT_NEW_RSB_COUNT 128
1062#define DEFAULT_RECOVER_CALLBACKS 0
1063#define DEFAULT_CLUSTER_NAME ""
1050 1064
1051struct dlm_config_info dlm_config = { 1065struct dlm_config_info dlm_config = {
1052 .ci_tcp_port = DEFAULT_TCP_PORT, 1066 .ci_tcp_port = DEFAULT_TCP_PORT,
@@ -1060,6 +1074,8 @@ struct dlm_config_info dlm_config = {
1060 .ci_protocol = DEFAULT_PROTOCOL, 1074 .ci_protocol = DEFAULT_PROTOCOL,
1061 .ci_timewarn_cs = DEFAULT_TIMEWARN_CS, 1075 .ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
1062 .ci_waitwarn_us = DEFAULT_WAITWARN_US, 1076 .ci_waitwarn_us = DEFAULT_WAITWARN_US,
1063 .ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT 1077 .ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT,
1078 .ci_recover_callbacks = DEFAULT_RECOVER_CALLBACKS,
1079 .ci_cluster_name = DEFAULT_CLUSTER_NAME
1064}; 1080};
1065 1081
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 3099d0dd26c0..9f5e3663bb0c 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -14,6 +14,13 @@
14#ifndef __CONFIG_DOT_H__ 14#ifndef __CONFIG_DOT_H__
15#define __CONFIG_DOT_H__ 15#define __CONFIG_DOT_H__
16 16
17struct dlm_config_node {
18 int nodeid;
19 int weight;
20 int new;
21 uint32_t comm_seq;
22};
23
17#define DLM_MAX_ADDR_COUNT 3 24#define DLM_MAX_ADDR_COUNT 3
18 25
19struct dlm_config_info { 26struct dlm_config_info {
@@ -29,15 +36,17 @@ struct dlm_config_info {
29 int ci_timewarn_cs; 36 int ci_timewarn_cs;
30 int ci_waitwarn_us; 37 int ci_waitwarn_us;
31 int ci_new_rsb_count; 38 int ci_new_rsb_count;
39 int ci_recover_callbacks;
40 char ci_cluster_name[DLM_LOCKSPACE_LEN];
32}; 41};
33 42
34extern struct dlm_config_info dlm_config; 43extern struct dlm_config_info dlm_config;
35 44
36int dlm_config_init(void); 45int dlm_config_init(void);
37void dlm_config_exit(void); 46void dlm_config_exit(void);
38int dlm_node_weight(char *lsname, int nodeid); 47int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
39int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out, 48 int *count_out);
40 int **new_out, int *new_count_out); 49int dlm_comm_seq(int nodeid, uint32_t *seq);
41int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr); 50int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr);
42int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid); 51int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid);
43int dlm_our_nodeid(void); 52int dlm_our_nodeid(void);
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 59779237e2b4..3dca2b39e83f 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -393,6 +393,7 @@ static const struct seq_operations format3_seq_ops;
393 393
394static void *table_seq_start(struct seq_file *seq, loff_t *pos) 394static void *table_seq_start(struct seq_file *seq, loff_t *pos)
395{ 395{
396 struct rb_node *node;
396 struct dlm_ls *ls = seq->private; 397 struct dlm_ls *ls = seq->private;
397 struct rsbtbl_iter *ri; 398 struct rsbtbl_iter *ri;
398 struct dlm_rsb *r; 399 struct dlm_rsb *r;
@@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
418 ri->format = 3; 419 ri->format = 3;
419 420
420 spin_lock(&ls->ls_rsbtbl[bucket].lock); 421 spin_lock(&ls->ls_rsbtbl[bucket].lock);
421 if (!list_empty(&ls->ls_rsbtbl[bucket].list)) { 422 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
422 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, 423 for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
423 res_hashchain) { 424 node = rb_next(node)) {
425 r = rb_entry(node, struct dlm_rsb, res_hashnode);
424 if (!entry--) { 426 if (!entry--) {
425 dlm_hold_rsb(r); 427 dlm_hold_rsb(r);
426 ri->rsb = r; 428 ri->rsb = r;
@@ -449,9 +451,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
449 } 451 }
450 452
451 spin_lock(&ls->ls_rsbtbl[bucket].lock); 453 spin_lock(&ls->ls_rsbtbl[bucket].lock);
452 if (!list_empty(&ls->ls_rsbtbl[bucket].list)) { 454 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
453 r = list_first_entry(&ls->ls_rsbtbl[bucket].list, 455 node = rb_first(&ls->ls_rsbtbl[bucket].keep);
454 struct dlm_rsb, res_hashchain); 456 r = rb_entry(node, struct dlm_rsb, res_hashnode);
455 dlm_hold_rsb(r); 457 dlm_hold_rsb(r);
456 ri->rsb = r; 458 ri->rsb = r;
457 ri->bucket = bucket; 459 ri->bucket = bucket;
@@ -467,7 +469,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
467{ 469{
468 struct dlm_ls *ls = seq->private; 470 struct dlm_ls *ls = seq->private;
469 struct rsbtbl_iter *ri = iter_ptr; 471 struct rsbtbl_iter *ri = iter_ptr;
470 struct list_head *next; 472 struct rb_node *next;
471 struct dlm_rsb *r, *rp; 473 struct dlm_rsb *r, *rp;
472 loff_t n = *pos; 474 loff_t n = *pos;
473 unsigned bucket; 475 unsigned bucket;
@@ -480,10 +482,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
480 482
481 spin_lock(&ls->ls_rsbtbl[bucket].lock); 483 spin_lock(&ls->ls_rsbtbl[bucket].lock);
482 rp = ri->rsb; 484 rp = ri->rsb;
483 next = rp->res_hashchain.next; 485 next = rb_next(&rp->res_hashnode);
484 486
485 if (next != &ls->ls_rsbtbl[bucket].list) { 487 if (next) {
486 r = list_entry(next, struct dlm_rsb, res_hashchain); 488 r = rb_entry(next, struct dlm_rsb, res_hashnode);
487 dlm_hold_rsb(r); 489 dlm_hold_rsb(r);
488 ri->rsb = r; 490 ri->rsb = r;
489 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 491 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -511,9 +513,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
511 } 513 }
512 514
513 spin_lock(&ls->ls_rsbtbl[bucket].lock); 515 spin_lock(&ls->ls_rsbtbl[bucket].lock);
514 if (!list_empty(&ls->ls_rsbtbl[bucket].list)) { 516 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
515 r = list_first_entry(&ls->ls_rsbtbl[bucket].list, 517 next = rb_first(&ls->ls_rsbtbl[bucket].keep);
516 struct dlm_rsb, res_hashchain); 518 r = rb_entry(next, struct dlm_rsb, res_hashnode);
517 dlm_hold_rsb(r); 519 dlm_hold_rsb(r);
518 ri->rsb = r; 520 ri->rsb = r;
519 ri->bucket = bucket; 521 ri->bucket = bucket;
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 7b84c1dbc82e..83641574b016 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -290,7 +290,6 @@ int dlm_recover_directory(struct dlm_ls *ls)
290 290
291 out_status: 291 out_status:
292 error = 0; 292 error = 0;
293 dlm_set_recover_status(ls, DLM_RS_DIR);
294 log_debug(ls, "dlm_recover_directory %d entries", count); 293 log_debug(ls, "dlm_recover_directory %d entries", count);
295 out_free: 294 out_free:
296 kfree(last_name); 295 kfree(last_name);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index fe2860c02449..3a564d197e99 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2010 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -103,8 +103,8 @@ struct dlm_dirtable {
103}; 103};
104 104
105struct dlm_rsbtable { 105struct dlm_rsbtable {
106 struct list_head list; 106 struct rb_root keep;
107 struct list_head toss; 107 struct rb_root toss;
108 spinlock_t lock; 108 spinlock_t lock;
109}; 109};
110 110
@@ -117,6 +117,10 @@ struct dlm_member {
117 struct list_head list; 117 struct list_head list;
118 int nodeid; 118 int nodeid;
119 int weight; 119 int weight;
120 int slot;
121 int slot_prev;
122 int comm_seq;
123 uint32_t generation;
120}; 124};
121 125
122/* 126/*
@@ -125,10 +129,8 @@ struct dlm_member {
125 129
126struct dlm_recover { 130struct dlm_recover {
127 struct list_head list; 131 struct list_head list;
128 int *nodeids; /* nodeids of all members */ 132 struct dlm_config_node *nodes;
129 int node_count; 133 int nodes_count;
130 int *new; /* nodeids of new members */
131 int new_count;
132 uint64_t seq; 134 uint64_t seq;
133}; 135};
134 136
@@ -285,7 +287,10 @@ struct dlm_rsb {
285 unsigned long res_toss_time; 287 unsigned long res_toss_time;
286 uint32_t res_first_lkid; 288 uint32_t res_first_lkid;
287 struct list_head res_lookup; /* lkbs waiting on first */ 289 struct list_head res_lookup; /* lkbs waiting on first */
288 struct list_head res_hashchain; /* rsbtbl */ 290 union {
291 struct list_head res_hashchain;
292 struct rb_node res_hashnode; /* rsbtbl */
293 };
289 struct list_head res_grantqueue; 294 struct list_head res_grantqueue;
290 struct list_head res_convertqueue; 295 struct list_head res_convertqueue;
291 struct list_head res_waitqueue; 296 struct list_head res_waitqueue;
@@ -334,7 +339,9 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag)
334/* dlm_header is first element of all structs sent between nodes */ 339/* dlm_header is first element of all structs sent between nodes */
335 340
336#define DLM_HEADER_MAJOR 0x00030000 341#define DLM_HEADER_MAJOR 0x00030000
337#define DLM_HEADER_MINOR 0x00000000 342#define DLM_HEADER_MINOR 0x00000001
343
344#define DLM_HEADER_SLOTS 0x00000001
338 345
339#define DLM_MSG 1 346#define DLM_MSG 1
340#define DLM_RCOM 2 347#define DLM_RCOM 2
@@ -422,10 +429,34 @@ union dlm_packet {
422 struct dlm_rcom rcom; 429 struct dlm_rcom rcom;
423}; 430};
424 431
432#define DLM_RSF_NEED_SLOTS 0x00000001
433
434/* RCOM_STATUS data */
435struct rcom_status {
436 __le32 rs_flags;
437 __le32 rs_unused1;
438 __le64 rs_unused2;
439};
440
441/* RCOM_STATUS_REPLY data */
425struct rcom_config { 442struct rcom_config {
426 __le32 rf_lvblen; 443 __le32 rf_lvblen;
427 __le32 rf_lsflags; 444 __le32 rf_lsflags;
428 __le64 rf_unused; 445
446 /* DLM_HEADER_SLOTS adds: */
447 __le32 rf_flags;
448 __le16 rf_our_slot;
449 __le16 rf_num_slots;
450 __le32 rf_generation;
451 __le32 rf_unused1;
452 __le64 rf_unused2;
453};
454
455struct rcom_slot {
456 __le32 ro_nodeid;
457 __le16 ro_slot;
458 __le16 ro_unused1;
459 __le64 ro_unused2;
429}; 460};
430 461
431struct rcom_lock { 462struct rcom_lock {
@@ -452,6 +483,7 @@ struct dlm_ls {
452 struct list_head ls_list; /* list of lockspaces */ 483 struct list_head ls_list; /* list of lockspaces */
453 dlm_lockspace_t *ls_local_handle; 484 dlm_lockspace_t *ls_local_handle;
454 uint32_t ls_global_id; /* global unique lockspace ID */ 485 uint32_t ls_global_id; /* global unique lockspace ID */
486 uint32_t ls_generation;
455 uint32_t ls_exflags; 487 uint32_t ls_exflags;
456 int ls_lvblen; 488 int ls_lvblen;
457 int ls_count; /* refcount of processes in 489 int ls_count; /* refcount of processes in
@@ -490,6 +522,11 @@ struct dlm_ls {
490 int ls_total_weight; 522 int ls_total_weight;
491 int *ls_node_array; 523 int *ls_node_array;
492 524
525 int ls_slot;
526 int ls_num_slots;
527 int ls_slots_size;
528 struct dlm_slot *ls_slots;
529
493 struct dlm_rsb ls_stub_rsb; /* for returning errors */ 530 struct dlm_rsb ls_stub_rsb; /* for returning errors */
494 struct dlm_lkb ls_stub_lkb; /* for returning errors */ 531 struct dlm_lkb ls_stub_lkb; /* for returning errors */
495 struct dlm_message ls_stub_ms; /* for faking a reply */ 532 struct dlm_message ls_stub_ms; /* for faking a reply */
@@ -537,6 +574,9 @@ struct dlm_ls {
537 struct list_head ls_root_list; /* root resources */ 574 struct list_head ls_root_list; /* root resources */
538 struct rw_semaphore ls_root_sem; /* protect root_list */ 575 struct rw_semaphore ls_root_sem; /* protect root_list */
539 576
577 const struct dlm_lockspace_ops *ls_ops;
578 void *ls_ops_arg;
579
540 int ls_namelen; 580 int ls_namelen;
541 char ls_name[1]; 581 char ls_name[1];
542}; 582};
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 83b5e32514e1..d47183043c59 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -56,6 +56,7 @@
56 L: receive_xxxx_reply() <- R: send_xxxx_reply() 56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/ 57*/
58#include <linux/types.h> 58#include <linux/types.h>
59#include <linux/rbtree.h>
59#include <linux/slab.h> 60#include <linux/slab.h>
60#include "dlm_internal.h" 61#include "dlm_internal.h"
61#include <linux/dlm_device.h> 62#include <linux/dlm_device.h>
@@ -380,6 +381,8 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
380 381
381 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); 382 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
382 list_del(&r->res_hashchain); 383 list_del(&r->res_hashchain);
384 /* Convert the empty list_head to a NULL rb_node for tree usage: */
385 memset(&r->res_hashnode, 0, sizeof(struct rb_node));
383 ls->ls_new_rsb_count--; 386 ls->ls_new_rsb_count--;
384 spin_unlock(&ls->ls_new_rsb_spin); 387 spin_unlock(&ls->ls_new_rsb_spin);
385 388
@@ -388,7 +391,6 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
388 memcpy(r->res_name, name, len); 391 memcpy(r->res_name, name, len);
389 mutex_init(&r->res_mutex); 392 mutex_init(&r->res_mutex);
390 393
391 INIT_LIST_HEAD(&r->res_hashchain);
392 INIT_LIST_HEAD(&r->res_lookup); 394 INIT_LIST_HEAD(&r->res_lookup);
393 INIT_LIST_HEAD(&r->res_grantqueue); 395 INIT_LIST_HEAD(&r->res_grantqueue);
394 INIT_LIST_HEAD(&r->res_convertqueue); 396 INIT_LIST_HEAD(&r->res_convertqueue);
@@ -400,14 +402,31 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
400 return 0; 402 return 0;
401} 403}
402 404
403static int search_rsb_list(struct list_head *head, char *name, int len, 405static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
406{
407 char maxname[DLM_RESNAME_MAXLEN];
408
409 memset(maxname, 0, DLM_RESNAME_MAXLEN);
410 memcpy(maxname, name, nlen);
411 return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
412}
413
414static int search_rsb_tree(struct rb_root *tree, char *name, int len,
404 unsigned int flags, struct dlm_rsb **r_ret) 415 unsigned int flags, struct dlm_rsb **r_ret)
405{ 416{
417 struct rb_node *node = tree->rb_node;
406 struct dlm_rsb *r; 418 struct dlm_rsb *r;
407 int error = 0; 419 int error = 0;
408 420 int rc;
409 list_for_each_entry(r, head, res_hashchain) { 421
410 if (len == r->res_length && !memcmp(name, r->res_name, len)) 422 while (node) {
423 r = rb_entry(node, struct dlm_rsb, res_hashnode);
424 rc = rsb_cmp(r, name, len);
425 if (rc < 0)
426 node = node->rb_left;
427 else if (rc > 0)
428 node = node->rb_right;
429 else
411 goto found; 430 goto found;
412 } 431 }
413 *r_ret = NULL; 432 *r_ret = NULL;
@@ -420,22 +439,54 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
420 return error; 439 return error;
421} 440}
422 441
442static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
443{
444 struct rb_node **newn = &tree->rb_node;
445 struct rb_node *parent = NULL;
446 int rc;
447
448 while (*newn) {
449 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
450 res_hashnode);
451
452 parent = *newn;
453 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
454 if (rc < 0)
455 newn = &parent->rb_left;
456 else if (rc > 0)
457 newn = &parent->rb_right;
458 else {
459 log_print("rsb_insert match");
460 dlm_dump_rsb(rsb);
461 dlm_dump_rsb(cur);
462 return -EEXIST;
463 }
464 }
465
466 rb_link_node(&rsb->res_hashnode, parent, newn);
467 rb_insert_color(&rsb->res_hashnode, tree);
468 return 0;
469}
470
423static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b, 471static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
424 unsigned int flags, struct dlm_rsb **r_ret) 472 unsigned int flags, struct dlm_rsb **r_ret)
425{ 473{
426 struct dlm_rsb *r; 474 struct dlm_rsb *r;
427 int error; 475 int error;
428 476
429 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r); 477 error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
430 if (!error) { 478 if (!error) {
431 kref_get(&r->res_ref); 479 kref_get(&r->res_ref);
432 goto out; 480 goto out;
433 } 481 }
434 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); 482 error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
435 if (error) 483 if (error)
436 goto out; 484 goto out;
437 485
438 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list); 486 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
487 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
488 if (error)
489 return error;
439 490
440 if (dlm_no_directory(ls)) 491 if (dlm_no_directory(ls))
441 goto out; 492 goto out;
@@ -527,8 +578,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
527 nodeid = 0; 578 nodeid = 0;
528 r->res_nodeid = nodeid; 579 r->res_nodeid = nodeid;
529 } 580 }
530 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list); 581 error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
531 error = 0;
532 out_unlock: 582 out_unlock:
533 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 583 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
534 out: 584 out:
@@ -556,7 +606,8 @@ static void toss_rsb(struct kref *kref)
556 606
557 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); 607 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
558 kref_init(&r->res_ref); 608 kref_init(&r->res_ref);
559 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss); 609 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
610 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
560 r->res_toss_time = jiffies; 611 r->res_toss_time = jiffies;
561 if (r->res_lvbptr) { 612 if (r->res_lvbptr) {
562 dlm_free_lvb(r->res_lvbptr); 613 dlm_free_lvb(r->res_lvbptr);
@@ -1082,19 +1133,19 @@ static void dir_remove(struct dlm_rsb *r)
1082 r->res_name, r->res_length); 1133 r->res_name, r->res_length);
1083} 1134}
1084 1135
1085/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is 1136/* FIXME: make this more efficient */
1086 found since they are in order of newest to oldest? */
1087 1137
1088static int shrink_bucket(struct dlm_ls *ls, int b) 1138static int shrink_bucket(struct dlm_ls *ls, int b)
1089{ 1139{
1140 struct rb_node *n;
1090 struct dlm_rsb *r; 1141 struct dlm_rsb *r;
1091 int count = 0, found; 1142 int count = 0, found;
1092 1143
1093 for (;;) { 1144 for (;;) {
1094 found = 0; 1145 found = 0;
1095 spin_lock(&ls->ls_rsbtbl[b].lock); 1146 spin_lock(&ls->ls_rsbtbl[b].lock);
1096 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss, 1147 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
1097 res_hashchain) { 1148 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1098 if (!time_after_eq(jiffies, r->res_toss_time + 1149 if (!time_after_eq(jiffies, r->res_toss_time +
1099 dlm_config.ci_toss_secs * HZ)) 1150 dlm_config.ci_toss_secs * HZ))
1100 continue; 1151 continue;
@@ -1108,7 +1159,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
1108 } 1159 }
1109 1160
1110 if (kref_put(&r->res_ref, kill_rsb)) { 1161 if (kref_put(&r->res_ref, kill_rsb)) {
1111 list_del(&r->res_hashchain); 1162 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1112 spin_unlock(&ls->ls_rsbtbl[b].lock); 1163 spin_unlock(&ls->ls_rsbtbl[b].lock);
1113 1164
1114 if (is_master(r)) 1165 if (is_master(r))
@@ -4441,10 +4492,12 @@ int dlm_purge_locks(struct dlm_ls *ls)
4441 4492
4442static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) 4493static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4443{ 4494{
4495 struct rb_node *n;
4444 struct dlm_rsb *r, *r_ret = NULL; 4496 struct dlm_rsb *r, *r_ret = NULL;
4445 4497
4446 spin_lock(&ls->ls_rsbtbl[bucket].lock); 4498 spin_lock(&ls->ls_rsbtbl[bucket].lock);
4447 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) { 4499 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4500 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4448 if (!rsb_flag(r, RSB_LOCKS_PURGED)) 4501 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4449 continue; 4502 continue;
4450 hold_rsb(r); 4503 hold_rsb(r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1d8f1af144b..a1ea25face82 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -386,12 +386,15 @@ static void threads_stop(void)
386 dlm_lowcomms_stop(); 386 dlm_lowcomms_stop();
387} 387}
388 388
389static int new_lockspace(const char *name, int namelen, void **lockspace, 389static int new_lockspace(const char *name, const char *cluster,
390 uint32_t flags, int lvblen) 390 uint32_t flags, int lvblen,
391 const struct dlm_lockspace_ops *ops, void *ops_arg,
392 int *ops_result, dlm_lockspace_t **lockspace)
391{ 393{
392 struct dlm_ls *ls; 394 struct dlm_ls *ls;
393 int i, size, error; 395 int i, size, error;
394 int do_unreg = 0; 396 int do_unreg = 0;
397 int namelen = strlen(name);
395 398
396 if (namelen > DLM_LOCKSPACE_LEN) 399 if (namelen > DLM_LOCKSPACE_LEN)
397 return -EINVAL; 400 return -EINVAL;
@@ -403,8 +406,24 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
403 return -EINVAL; 406 return -EINVAL;
404 407
405 if (!dlm_user_daemon_available()) { 408 if (!dlm_user_daemon_available()) {
406 module_put(THIS_MODULE); 409 log_print("dlm user daemon not available");
407 return -EUNATCH; 410 error = -EUNATCH;
411 goto out;
412 }
413
414 if (ops && ops_result) {
415 if (!dlm_config.ci_recover_callbacks)
416 *ops_result = -EOPNOTSUPP;
417 else
418 *ops_result = 0;
419 }
420
421 if (dlm_config.ci_recover_callbacks && cluster &&
422 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
423 log_print("dlm cluster name %s mismatch %s",
424 dlm_config.ci_cluster_name, cluster);
425 error = -EBADR;
426 goto out;
408 } 427 }
409 428
410 error = 0; 429 error = 0;
@@ -442,6 +461,11 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
442 ls->ls_flags = 0; 461 ls->ls_flags = 0;
443 ls->ls_scan_time = jiffies; 462 ls->ls_scan_time = jiffies;
444 463
464 if (ops && dlm_config.ci_recover_callbacks) {
465 ls->ls_ops = ops;
466 ls->ls_ops_arg = ops_arg;
467 }
468
445 if (flags & DLM_LSFL_TIMEWARN) 469 if (flags & DLM_LSFL_TIMEWARN)
446 set_bit(LSFL_TIMEWARN, &ls->ls_flags); 470 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
447 471
@@ -457,8 +481,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
457 if (!ls->ls_rsbtbl) 481 if (!ls->ls_rsbtbl)
458 goto out_lsfree; 482 goto out_lsfree;
459 for (i = 0; i < size; i++) { 483 for (i = 0; i < size; i++) {
460 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list); 484 ls->ls_rsbtbl[i].keep.rb_node = NULL;
461 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss); 485 ls->ls_rsbtbl[i].toss.rb_node = NULL;
462 spin_lock_init(&ls->ls_rsbtbl[i].lock); 486 spin_lock_init(&ls->ls_rsbtbl[i].lock);
463 } 487 }
464 488
@@ -525,6 +549,11 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
525 if (!ls->ls_recover_buf) 549 if (!ls->ls_recover_buf)
526 goto out_dirfree; 550 goto out_dirfree;
527 551
552 ls->ls_slot = 0;
553 ls->ls_num_slots = 0;
554 ls->ls_slots_size = 0;
555 ls->ls_slots = NULL;
556
528 INIT_LIST_HEAD(&ls->ls_recover_list); 557 INIT_LIST_HEAD(&ls->ls_recover_list);
529 spin_lock_init(&ls->ls_recover_list_lock); 558 spin_lock_init(&ls->ls_recover_list_lock);
530 ls->ls_recover_list_count = 0; 559 ls->ls_recover_list_count = 0;
@@ -614,8 +643,10 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
614 return error; 643 return error;
615} 644}
616 645
617int dlm_new_lockspace(const char *name, int namelen, void **lockspace, 646int dlm_new_lockspace(const char *name, const char *cluster,
618 uint32_t flags, int lvblen) 647 uint32_t flags, int lvblen,
648 const struct dlm_lockspace_ops *ops, void *ops_arg,
649 int *ops_result, dlm_lockspace_t **lockspace)
619{ 650{
620 int error = 0; 651 int error = 0;
621 652
@@ -625,7 +656,8 @@ int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
625 if (error) 656 if (error)
626 goto out; 657 goto out;
627 658
628 error = new_lockspace(name, namelen, lockspace, flags, lvblen); 659 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
660 ops_result, lockspace);
629 if (!error) 661 if (!error)
630 ls_count++; 662 ls_count++;
631 if (error > 0) 663 if (error > 0)
@@ -685,7 +717,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
685static int release_lockspace(struct dlm_ls *ls, int force) 717static int release_lockspace(struct dlm_ls *ls, int force)
686{ 718{
687 struct dlm_rsb *rsb; 719 struct dlm_rsb *rsb;
688 struct list_head *head; 720 struct rb_node *n;
689 int i, busy, rv; 721 int i, busy, rv;
690 722
691 busy = lockspace_busy(ls, force); 723 busy = lockspace_busy(ls, force);
@@ -746,20 +778,15 @@ static int release_lockspace(struct dlm_ls *ls, int force)
746 */ 778 */
747 779
748 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 780 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
749 head = &ls->ls_rsbtbl[i].list; 781 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
750 while (!list_empty(head)) { 782 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
751 rsb = list_entry(head->next, struct dlm_rsb, 783 rb_erase(n, &ls->ls_rsbtbl[i].keep);
752 res_hashchain);
753
754 list_del(&rsb->res_hashchain);
755 dlm_free_rsb(rsb); 784 dlm_free_rsb(rsb);
756 } 785 }
757 786
758 head = &ls->ls_rsbtbl[i].toss; 787 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
759 while (!list_empty(head)) { 788 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
760 rsb = list_entry(head->next, struct dlm_rsb, 789 rb_erase(n, &ls->ls_rsbtbl[i].toss);
761 res_hashchain);
762 list_del(&rsb->res_hashchain);
763 dlm_free_rsb(rsb); 790 dlm_free_rsb(rsb);
764 } 791 }
765 } 792 }
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index b12532e553f8..862640a36d5c 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -1,7 +1,7 @@
1/****************************************************************************** 1/******************************************************************************
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) 2005-2009 Red Hat, Inc. All rights reserved. 4** Copyright (C) 2005-2011 Red Hat, Inc. All rights reserved.
5** 5**
6** This copyrighted material is made available to anyone wishing to use, 6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions 7** modify, copy, or redistribute it subject to the terms and conditions
@@ -19,6 +19,280 @@
19#include "config.h" 19#include "config.h"
20#include "lowcomms.h" 20#include "lowcomms.h"
21 21
22int dlm_slots_version(struct dlm_header *h)
23{
24 if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
25 return 0;
26 return 1;
27}
28
29void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
30 struct dlm_member *memb)
31{
32 struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
33
34 if (!dlm_slots_version(&rc->rc_header))
35 return;
36
37 memb->slot = le16_to_cpu(rf->rf_our_slot);
38 memb->generation = le32_to_cpu(rf->rf_generation);
39}
40
41void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
42{
43 struct dlm_slot *slot;
44 struct rcom_slot *ro;
45 int i;
46
47 ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
48
49 /* ls_slots array is sparse, but not rcom_slots */
50
51 for (i = 0; i < ls->ls_slots_size; i++) {
52 slot = &ls->ls_slots[i];
53 if (!slot->nodeid)
54 continue;
55 ro->ro_nodeid = cpu_to_le32(slot->nodeid);
56 ro->ro_slot = cpu_to_le16(slot->slot);
57 ro++;
58 }
59}
60
61#define SLOT_DEBUG_LINE 128
62
63static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
64 struct rcom_slot *ro0, struct dlm_slot *array,
65 int array_size)
66{
67 char line[SLOT_DEBUG_LINE];
68 int len = SLOT_DEBUG_LINE - 1;
69 int pos = 0;
70 int ret, i;
71
72 if (!dlm_config.ci_log_debug)
73 return;
74
75 memset(line, 0, sizeof(line));
76
77 if (array) {
78 for (i = 0; i < array_size; i++) {
79 if (!array[i].nodeid)
80 continue;
81
82 ret = snprintf(line + pos, len - pos, " %d:%d",
83 array[i].slot, array[i].nodeid);
84 if (ret >= len - pos)
85 break;
86 pos += ret;
87 }
88 } else if (ro0) {
89 for (i = 0; i < num_slots; i++) {
90 ret = snprintf(line + pos, len - pos, " %d:%d",
91 ro0[i].ro_slot, ro0[i].ro_nodeid);
92 if (ret >= len - pos)
93 break;
94 pos += ret;
95 }
96 }
97
98 log_debug(ls, "generation %u slots %d%s", gen, num_slots, line);
99}
100
101int dlm_slots_copy_in(struct dlm_ls *ls)
102{
103 struct dlm_member *memb;
104 struct dlm_rcom *rc = ls->ls_recover_buf;
105 struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
106 struct rcom_slot *ro0, *ro;
107 int our_nodeid = dlm_our_nodeid();
108 int i, num_slots;
109 uint32_t gen;
110
111 if (!dlm_slots_version(&rc->rc_header))
112 return -1;
113
114 gen = le32_to_cpu(rf->rf_generation);
115 if (gen <= ls->ls_generation) {
116 log_error(ls, "dlm_slots_copy_in gen %u old %u",
117 gen, ls->ls_generation);
118 }
119 ls->ls_generation = gen;
120
121 num_slots = le16_to_cpu(rf->rf_num_slots);
122 if (!num_slots)
123 return -1;
124
125 ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
126
127 for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
128 ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
129 ro->ro_slot = le16_to_cpu(ro->ro_slot);
130 }
131
132 log_debug_slots(ls, gen, num_slots, ro0, NULL, 0);
133
134 list_for_each_entry(memb, &ls->ls_nodes, list) {
135 for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
136 if (ro->ro_nodeid != memb->nodeid)
137 continue;
138 memb->slot = ro->ro_slot;
139 memb->slot_prev = memb->slot;
140 break;
141 }
142
143 if (memb->nodeid == our_nodeid) {
144 if (ls->ls_slot && ls->ls_slot != memb->slot) {
145 log_error(ls, "dlm_slots_copy_in our slot "
146 "changed %d %d", ls->ls_slot,
147 memb->slot);
148 return -1;
149 }
150
151 if (!ls->ls_slot)
152 ls->ls_slot = memb->slot;
153 }
154
155 if (!memb->slot) {
156 log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
157 memb->nodeid);
158 return -1;
159 }
160 }
161
162 return 0;
163}
164
165/* for any nodes that do not support slots, we will not have set memb->slot
166 in wait_status_all(), so memb->slot will remain -1, and we will not
167 assign slots or set ls_num_slots here */
168
169int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
170 struct dlm_slot **slots_out, uint32_t *gen_out)
171{
172 struct dlm_member *memb;
173 struct dlm_slot *array;
174 int our_nodeid = dlm_our_nodeid();
175 int array_size, max_slots, i;
176 int need = 0;
177 int max = 0;
178 int num = 0;
179 uint32_t gen = 0;
180
181 /* our own memb struct will have slot -1 gen 0 */
182
183 list_for_each_entry(memb, &ls->ls_nodes, list) {
184 if (memb->nodeid == our_nodeid) {
185 memb->slot = ls->ls_slot;
186 memb->generation = ls->ls_generation;
187 break;
188 }
189 }
190
191 list_for_each_entry(memb, &ls->ls_nodes, list) {
192 if (memb->generation > gen)
193 gen = memb->generation;
194
195 /* node doesn't support slots */
196
197 if (memb->slot == -1)
198 return -1;
199
200 /* node needs a slot assigned */
201
202 if (!memb->slot)
203 need++;
204
205 /* node has a slot assigned */
206
207 num++;
208
209 if (!max || max < memb->slot)
210 max = memb->slot;
211
212 /* sanity check, once slot is assigned it shouldn't change */
213
214 if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
215 log_error(ls, "nodeid %d slot changed %d %d",
216 memb->nodeid, memb->slot_prev, memb->slot);
217 return -1;
218 }
219 memb->slot_prev = memb->slot;
220 }
221
222 array_size = max + need;
223
224 array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
225 if (!array)
226 return -ENOMEM;
227
228 num = 0;
229
230 /* fill in slots (offsets) that are used */
231
232 list_for_each_entry(memb, &ls->ls_nodes, list) {
233 if (!memb->slot)
234 continue;
235
236 if (memb->slot > array_size) {
237 log_error(ls, "invalid slot number %d", memb->slot);
238 kfree(array);
239 return -1;
240 }
241
242 array[memb->slot - 1].nodeid = memb->nodeid;
243 array[memb->slot - 1].slot = memb->slot;
244 num++;
245 }
246
247 /* assign new slots from unused offsets */
248
249 list_for_each_entry(memb, &ls->ls_nodes, list) {
250 if (memb->slot)
251 continue;
252
253 for (i = 0; i < array_size; i++) {
254 if (array[i].nodeid)
255 continue;
256
257 memb->slot = i + 1;
258 memb->slot_prev = memb->slot;
259 array[i].nodeid = memb->nodeid;
260 array[i].slot = memb->slot;
261 num++;
262
263 if (!ls->ls_slot && memb->nodeid == our_nodeid)
264 ls->ls_slot = memb->slot;
265 break;
266 }
267
268 if (!memb->slot) {
269 log_error(ls, "no free slot found");
270 kfree(array);
271 return -1;
272 }
273 }
274
275 gen++;
276
277 log_debug_slots(ls, gen, num, NULL, array, array_size);
278
279 max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
280 sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
281
282 if (num > max_slots) {
283 log_error(ls, "num_slots %d exceeds max_slots %d",
284 num, max_slots);
285 kfree(array);
286 return -1;
287 }
288
289 *gen_out = gen;
290 *slots_out = array;
291 *slots_size = array_size;
292 *num_slots = num;
293 return 0;
294}
295
22static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 296static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
23{ 297{
24 struct dlm_member *memb = NULL; 298 struct dlm_member *memb = NULL;
@@ -43,59 +317,51 @@ static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
43 } 317 }
44} 318}
45 319
46static int dlm_add_member(struct dlm_ls *ls, int nodeid) 320static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
47{ 321{
48 struct dlm_member *memb; 322 struct dlm_member *memb;
49 int w, error; 323 int error;
50 324
51 memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS); 325 memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
52 if (!memb) 326 if (!memb)
53 return -ENOMEM; 327 return -ENOMEM;
54 328
55 w = dlm_node_weight(ls->ls_name, nodeid); 329 error = dlm_lowcomms_connect_node(node->nodeid);
56 if (w < 0) {
57 kfree(memb);
58 return w;
59 }
60
61 error = dlm_lowcomms_connect_node(nodeid);
62 if (error < 0) { 330 if (error < 0) {
63 kfree(memb); 331 kfree(memb);
64 return error; 332 return error;
65 } 333 }
66 334
67 memb->nodeid = nodeid; 335 memb->nodeid = node->nodeid;
68 memb->weight = w; 336 memb->weight = node->weight;
337 memb->comm_seq = node->comm_seq;
69 add_ordered_member(ls, memb); 338 add_ordered_member(ls, memb);
70 ls->ls_num_nodes++; 339 ls->ls_num_nodes++;
71 return 0; 340 return 0;
72} 341}
73 342
74static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb) 343static struct dlm_member *find_memb(struct list_head *head, int nodeid)
75{
76 list_move(&memb->list, &ls->ls_nodes_gone);
77 ls->ls_num_nodes--;
78}
79
80int dlm_is_member(struct dlm_ls *ls, int nodeid)
81{ 344{
82 struct dlm_member *memb; 345 struct dlm_member *memb;
83 346
84 list_for_each_entry(memb, &ls->ls_nodes, list) { 347 list_for_each_entry(memb, head, list) {
85 if (memb->nodeid == nodeid) 348 if (memb->nodeid == nodeid)
86 return 1; 349 return memb;
87 } 350 }
351 return NULL;
352}
353
354int dlm_is_member(struct dlm_ls *ls, int nodeid)
355{
356 if (find_memb(&ls->ls_nodes, nodeid))
357 return 1;
88 return 0; 358 return 0;
89} 359}
90 360
91int dlm_is_removed(struct dlm_ls *ls, int nodeid) 361int dlm_is_removed(struct dlm_ls *ls, int nodeid)
92{ 362{
93 struct dlm_member *memb; 363 if (find_memb(&ls->ls_nodes_gone, nodeid))
94 364 return 1;
95 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
96 if (memb->nodeid == nodeid)
97 return 1;
98 }
99 return 0; 365 return 0;
100} 366}
101 367
@@ -176,7 +442,7 @@ static int ping_members(struct dlm_ls *ls)
176 error = dlm_recovery_stopped(ls); 442 error = dlm_recovery_stopped(ls);
177 if (error) 443 if (error)
178 break; 444 break;
179 error = dlm_rcom_status(ls, memb->nodeid); 445 error = dlm_rcom_status(ls, memb->nodeid, 0);
180 if (error) 446 if (error)
181 break; 447 break;
182 } 448 }
@@ -186,10 +452,88 @@ static int ping_members(struct dlm_ls *ls)
186 return error; 452 return error;
187} 453}
188 454
455static void dlm_lsop_recover_prep(struct dlm_ls *ls)
456{
457 if (!ls->ls_ops || !ls->ls_ops->recover_prep)
458 return;
459 ls->ls_ops->recover_prep(ls->ls_ops_arg);
460}
461
462static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
463{
464 struct dlm_slot slot;
465 uint32_t seq;
466 int error;
467
468 if (!ls->ls_ops || !ls->ls_ops->recover_slot)
469 return;
470
471 /* if there is no comms connection with this node
472 or the present comms connection is newer
473 than the one when this member was added, then
474 we consider the node to have failed (versus
475 being removed due to dlm_release_lockspace) */
476
477 error = dlm_comm_seq(memb->nodeid, &seq);
478
479 if (!error && seq == memb->comm_seq)
480 return;
481
482 slot.nodeid = memb->nodeid;
483 slot.slot = memb->slot;
484
485 ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
486}
487
488void dlm_lsop_recover_done(struct dlm_ls *ls)
489{
490 struct dlm_member *memb;
491 struct dlm_slot *slots;
492 int i, num;
493
494 if (!ls->ls_ops || !ls->ls_ops->recover_done)
495 return;
496
497 num = ls->ls_num_nodes;
498
499 slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL);
500 if (!slots)
501 return;
502
503 i = 0;
504 list_for_each_entry(memb, &ls->ls_nodes, list) {
505 if (i == num) {
506 log_error(ls, "dlm_lsop_recover_done bad num %d", num);
507 goto out;
508 }
509 slots[i].nodeid = memb->nodeid;
510 slots[i].slot = memb->slot;
511 i++;
512 }
513
514 ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
515 ls->ls_slot, ls->ls_generation);
516 out:
517 kfree(slots);
518}
519
520static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
521 int nodeid)
522{
523 int i;
524
525 for (i = 0; i < rv->nodes_count; i++) {
526 if (rv->nodes[i].nodeid == nodeid)
527 return &rv->nodes[i];
528 }
529 return NULL;
530}
531
189int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) 532int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
190{ 533{
191 struct dlm_member *memb, *safe; 534 struct dlm_member *memb, *safe;
192 int i, error, found, pos = 0, neg = 0, low = -1; 535 struct dlm_config_node *node;
536 int i, error, neg = 0, low = -1;
193 537
194 /* previously removed members that we've not finished removing need to 538 /* previously removed members that we've not finished removing need to
195 count as a negative change so the "neg" recovery steps will happen */ 539 count as a negative change so the "neg" recovery steps will happen */
@@ -202,46 +546,32 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
202 /* move departed members from ls_nodes to ls_nodes_gone */ 546 /* move departed members from ls_nodes to ls_nodes_gone */
203 547
204 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { 548 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
205 found = 0; 549 node = find_config_node(rv, memb->nodeid);
206 for (i = 0; i < rv->node_count; i++) { 550 if (node && !node->new)
207 if (memb->nodeid == rv->nodeids[i]) { 551 continue;
208 found = 1;
209 break;
210 }
211 }
212 552
213 if (!found) { 553 if (!node) {
214 neg++;
215 dlm_remove_member(ls, memb);
216 log_debug(ls, "remove member %d", memb->nodeid); 554 log_debug(ls, "remove member %d", memb->nodeid);
555 } else {
556 /* removed and re-added */
557 log_debug(ls, "remove member %d comm_seq %u %u",
558 memb->nodeid, memb->comm_seq, node->comm_seq);
217 } 559 }
218 }
219
220 /* Add an entry to ls_nodes_gone for members that were removed and
221 then added again, so that previous state for these nodes will be
222 cleared during recovery. */
223
224 for (i = 0; i < rv->new_count; i++) {
225 if (!dlm_is_member(ls, rv->new[i]))
226 continue;
227 log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
228 560
229 memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
230 if (!memb)
231 return -ENOMEM;
232 memb->nodeid = rv->new[i];
233 list_add_tail(&memb->list, &ls->ls_nodes_gone);
234 neg++; 561 neg++;
562 list_move(&memb->list, &ls->ls_nodes_gone);
563 ls->ls_num_nodes--;
564 dlm_lsop_recover_slot(ls, memb);
235 } 565 }
236 566
237 /* add new members to ls_nodes */ 567 /* add new members to ls_nodes */
238 568
239 for (i = 0; i < rv->node_count; i++) { 569 for (i = 0; i < rv->nodes_count; i++) {
240 if (dlm_is_member(ls, rv->nodeids[i])) 570 node = &rv->nodes[i];
571 if (dlm_is_member(ls, node->nodeid))
241 continue; 572 continue;
242 dlm_add_member(ls, rv->nodeids[i]); 573 dlm_add_member(ls, node);
243 pos++; 574 log_debug(ls, "add member %d", node->nodeid);
244 log_debug(ls, "add member %d", rv->nodeids[i]);
245 } 575 }
246 576
247 list_for_each_entry(memb, &ls->ls_nodes, list) { 577 list_for_each_entry(memb, &ls->ls_nodes, list) {
@@ -251,7 +581,6 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
251 ls->ls_low_nodeid = low; 581 ls->ls_low_nodeid = low;
252 582
253 make_member_array(ls); 583 make_member_array(ls);
254 dlm_set_recover_status(ls, DLM_RS_NODES);
255 *neg_out = neg; 584 *neg_out = neg;
256 585
257 error = ping_members(ls); 586 error = ping_members(ls);
@@ -261,12 +590,8 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
261 ls->ls_members_result = error; 590 ls->ls_members_result = error;
262 complete(&ls->ls_members_done); 591 complete(&ls->ls_members_done);
263 } 592 }
264 if (error)
265 goto out;
266 593
267 error = dlm_recover_members_wait(ls); 594 log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
268 out:
269 log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
270 return error; 595 return error;
271} 596}
272 597
@@ -327,26 +652,35 @@ int dlm_ls_stop(struct dlm_ls *ls)
327 */ 652 */
328 653
329 dlm_recoverd_suspend(ls); 654 dlm_recoverd_suspend(ls);
655
656 spin_lock(&ls->ls_recover_lock);
657 kfree(ls->ls_slots);
658 ls->ls_slots = NULL;
659 ls->ls_num_slots = 0;
660 ls->ls_slots_size = 0;
330 ls->ls_recover_status = 0; 661 ls->ls_recover_status = 0;
662 spin_unlock(&ls->ls_recover_lock);
663
331 dlm_recoverd_resume(ls); 664 dlm_recoverd_resume(ls);
332 665
333 if (!ls->ls_recover_begin) 666 if (!ls->ls_recover_begin)
334 ls->ls_recover_begin = jiffies; 667 ls->ls_recover_begin = jiffies;
668
669 dlm_lsop_recover_prep(ls);
335 return 0; 670 return 0;
336} 671}
337 672
338int dlm_ls_start(struct dlm_ls *ls) 673int dlm_ls_start(struct dlm_ls *ls)
339{ 674{
340 struct dlm_recover *rv = NULL, *rv_old; 675 struct dlm_recover *rv = NULL, *rv_old;
341 int *ids = NULL, *new = NULL; 676 struct dlm_config_node *nodes;
342 int error, ids_count = 0, new_count = 0; 677 int error, count;
343 678
344 rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS); 679 rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
345 if (!rv) 680 if (!rv)
346 return -ENOMEM; 681 return -ENOMEM;
347 682
348 error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count, 683 error = dlm_config_nodes(ls->ls_name, &nodes, &count);
349 &new, &new_count);
350 if (error < 0) 684 if (error < 0)
351 goto fail; 685 goto fail;
352 686
@@ -361,10 +695,8 @@ int dlm_ls_start(struct dlm_ls *ls)
361 goto fail; 695 goto fail;
362 } 696 }
363 697
364 rv->nodeids = ids; 698 rv->nodes = nodes;
365 rv->node_count = ids_count; 699 rv->nodes_count = count;
366 rv->new = new;
367 rv->new_count = new_count;
368 rv->seq = ++ls->ls_recover_seq; 700 rv->seq = ++ls->ls_recover_seq;
369 rv_old = ls->ls_recover_args; 701 rv_old = ls->ls_recover_args;
370 ls->ls_recover_args = rv; 702 ls->ls_recover_args = rv;
@@ -372,9 +704,8 @@ int dlm_ls_start(struct dlm_ls *ls)
372 704
373 if (rv_old) { 705 if (rv_old) {
374 log_error(ls, "unused recovery %llx %d", 706 log_error(ls, "unused recovery %llx %d",
375 (unsigned long long)rv_old->seq, rv_old->node_count); 707 (unsigned long long)rv_old->seq, rv_old->nodes_count);
376 kfree(rv_old->nodeids); 708 kfree(rv_old->nodes);
377 kfree(rv_old->new);
378 kfree(rv_old); 709 kfree(rv_old);
379 } 710 }
380 711
@@ -383,8 +714,7 @@ int dlm_ls_start(struct dlm_ls *ls)
383 714
384 fail: 715 fail:
385 kfree(rv); 716 kfree(rv);
386 kfree(ids); 717 kfree(nodes);
387 kfree(new);
388 return error; 718 return error;
389} 719}
390 720
diff --git a/fs/dlm/member.h b/fs/dlm/member.h
index 7a26fca1e0b5..3deb70661c69 100644
--- a/fs/dlm/member.h
+++ b/fs/dlm/member.h
@@ -1,7 +1,7 @@
1/****************************************************************************** 1/******************************************************************************
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. 4** Copyright (C) 2005-2011 Red Hat, Inc. All rights reserved.
5** 5**
6** This copyrighted material is made available to anyone wishing to use, 6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions 7** modify, copy, or redistribute it subject to the terms and conditions
@@ -20,6 +20,14 @@ void dlm_clear_members_gone(struct dlm_ls *ls);
20int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out); 20int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
21int dlm_is_removed(struct dlm_ls *ls, int nodeid); 21int dlm_is_removed(struct dlm_ls *ls, int nodeid);
22int dlm_is_member(struct dlm_ls *ls, int nodeid); 22int dlm_is_member(struct dlm_ls *ls, int nodeid);
23int dlm_slots_version(struct dlm_header *h);
24void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
25 struct dlm_member *memb);
26void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc);
27int dlm_slots_copy_in(struct dlm_ls *ls);
28int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
29 struct dlm_slot **slots_out, uint32_t *gen_out);
30void dlm_lsop_recover_done(struct dlm_ls *ls);
23 31
24#endif /* __MEMBER_DOT_H__ */ 32#endif /* __MEMBER_DOT_H__ */
25 33
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index f10a50f24e8f..ac5c616c9696 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,6 +23,7 @@
23#include "memory.h" 23#include "memory.h"
24#include "lock.h" 24#include "lock.h"
25#include "util.h" 25#include "util.h"
26#include "member.h"
26 27
27 28
28static int rcom_response(struct dlm_ls *ls) 29static int rcom_response(struct dlm_ls *ls)
@@ -72,20 +73,30 @@ static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
72 dlm_lowcomms_commit_buffer(mh); 73 dlm_lowcomms_commit_buffer(mh);
73} 74}
74 75
76static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
77 uint32_t flags)
78{
79 rs->rs_flags = cpu_to_le32(flags);
80}
81
75/* When replying to a status request, a node also sends back its 82/* When replying to a status request, a node also sends back its
76 configuration values. The requesting node then checks that the remote 83 configuration values. The requesting node then checks that the remote
77 node is configured the same way as itself. */ 84 node is configured the same way as itself. */
78 85
79static void make_config(struct dlm_ls *ls, struct rcom_config *rf) 86static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
87 uint32_t num_slots)
80{ 88{
81 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen); 89 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
82 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags); 90 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
91
92 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
93 rf->rf_num_slots = cpu_to_le16(num_slots);
94 rf->rf_generation = cpu_to_le32(ls->ls_generation);
83} 95}
84 96
85static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 97static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
86{ 98{
87 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf; 99 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
88 size_t conf_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
89 100
90 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) { 101 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
91 log_error(ls, "version mismatch: %x nodeid %d: %x", 102 log_error(ls, "version mismatch: %x nodeid %d: %x",
@@ -94,12 +105,6 @@ static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
94 return -EPROTO; 105 return -EPROTO;
95 } 106 }
96 107
97 if (rc->rc_header.h_length < conf_size) {
98 log_error(ls, "config too short: %d nodeid %d",
99 rc->rc_header.h_length, nodeid);
100 return -EPROTO;
101 }
102
103 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen || 108 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
104 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) { 109 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
105 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x", 110 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@@ -127,7 +132,18 @@ static void disallow_sync_reply(struct dlm_ls *ls)
127 spin_unlock(&ls->ls_rcom_spin); 132 spin_unlock(&ls->ls_rcom_spin);
128} 133}
129 134
130int dlm_rcom_status(struct dlm_ls *ls, int nodeid) 135/*
136 * low nodeid gathers one slot value at a time from each node.
137 * it sets need_slots=0, and saves rf_our_slot returned from each
138 * rcom_config.
139 *
140 * other nodes gather all slot values at once from the low nodeid.
141 * they set need_slots=1, and ignore the rf_our_slot returned from each
142 * rcom_config. they use the rf_num_slots returned from the low
143 * node's rcom_config.
144 */
145
146int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
131{ 147{
132 struct dlm_rcom *rc; 148 struct dlm_rcom *rc;
133 struct dlm_mhandle *mh; 149 struct dlm_mhandle *mh;
@@ -141,10 +157,13 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
141 goto out; 157 goto out;
142 } 158 }
143 159
144 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh); 160 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
161 sizeof(struct rcom_status), &rc, &mh);
145 if (error) 162 if (error)
146 goto out; 163 goto out;
147 164
165 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
166
148 allow_sync_reply(ls, &rc->rc_id); 167 allow_sync_reply(ls, &rc->rc_id);
149 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size); 168 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
150 169
@@ -161,8 +180,11 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
161 /* we pretend the remote lockspace exists with 0 status */ 180 /* we pretend the remote lockspace exists with 0 status */
162 log_debug(ls, "remote node %d not ready", nodeid); 181 log_debug(ls, "remote node %d not ready", nodeid);
163 rc->rc_result = 0; 182 rc->rc_result = 0;
164 } else 183 error = 0;
165 error = check_config(ls, rc, nodeid); 184 } else {
185 error = check_rcom_config(ls, rc, nodeid);
186 }
187
166 /* the caller looks at rc_result for the remote recovery status */ 188 /* the caller looks at rc_result for the remote recovery status */
167 out: 189 out:
168 return error; 190 return error;
@@ -172,17 +194,60 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
172{ 194{
173 struct dlm_rcom *rc; 195 struct dlm_rcom *rc;
174 struct dlm_mhandle *mh; 196 struct dlm_mhandle *mh;
175 int error, nodeid = rc_in->rc_header.h_nodeid; 197 struct rcom_status *rs;
198 uint32_t status;
199 int nodeid = rc_in->rc_header.h_nodeid;
200 int len = sizeof(struct rcom_config);
201 int num_slots = 0;
202 int error;
203
204 if (!dlm_slots_version(&rc_in->rc_header)) {
205 status = dlm_recover_status(ls);
206 goto do_create;
207 }
208
209 rs = (struct rcom_status *)rc_in->rc_buf;
176 210
211 if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) {
212 status = dlm_recover_status(ls);
213 goto do_create;
214 }
215
216 spin_lock(&ls->ls_recover_lock);
217 status = ls->ls_recover_status;
218 num_slots = ls->ls_num_slots;
219 spin_unlock(&ls->ls_recover_lock);
220 len += num_slots * sizeof(struct rcom_slot);
221
222 do_create:
177 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY, 223 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
178 sizeof(struct rcom_config), &rc, &mh); 224 len, &rc, &mh);
179 if (error) 225 if (error)
180 return; 226 return;
227
181 rc->rc_id = rc_in->rc_id; 228 rc->rc_id = rc_in->rc_id;
182 rc->rc_seq_reply = rc_in->rc_seq; 229 rc->rc_seq_reply = rc_in->rc_seq;
183 rc->rc_result = dlm_recover_status(ls); 230 rc->rc_result = status;
184 make_config(ls, (struct rcom_config *) rc->rc_buf); 231
232 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
233
234 if (!num_slots)
235 goto do_send;
236
237 spin_lock(&ls->ls_recover_lock);
238 if (ls->ls_num_slots != num_slots) {
239 spin_unlock(&ls->ls_recover_lock);
240 log_debug(ls, "receive_rcom_status num_slots %d to %d",
241 num_slots, ls->ls_num_slots);
242 rc->rc_result = 0;
243 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
244 goto do_send;
245 }
246
247 dlm_slots_copy_out(ls, rc);
248 spin_unlock(&ls->ls_recover_lock);
185 249
250 do_send:
186 send_rcom(ls, mh, rc); 251 send_rcom(ls, mh, rc);
187} 252}
188 253
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index b09abd29ba38..206723ab744d 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -14,7 +14,7 @@
14#ifndef __RCOM_DOT_H__ 14#ifndef __RCOM_DOT_H__
15#define __RCOM_DOT_H__ 15#define __RCOM_DOT_H__
16 16
17int dlm_rcom_status(struct dlm_ls *ls, int nodeid); 17int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); 18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); 19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
20int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 20int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 14638235f7b2..34d5adf1fce7 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -85,14 +85,20 @@ uint32_t dlm_recover_status(struct dlm_ls *ls)
85 return status; 85 return status;
86} 86}
87 87
88static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
89{
90 ls->ls_recover_status |= status;
91}
92
88void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status) 93void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
89{ 94{
90 spin_lock(&ls->ls_recover_lock); 95 spin_lock(&ls->ls_recover_lock);
91 ls->ls_recover_status |= status; 96 _set_recover_status(ls, status);
92 spin_unlock(&ls->ls_recover_lock); 97 spin_unlock(&ls->ls_recover_lock);
93} 98}
94 99
95static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status) 100static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
101 int save_slots)
96{ 102{
97 struct dlm_rcom *rc = ls->ls_recover_buf; 103 struct dlm_rcom *rc = ls->ls_recover_buf;
98 struct dlm_member *memb; 104 struct dlm_member *memb;
@@ -106,10 +112,13 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
106 goto out; 112 goto out;
107 } 113 }
108 114
109 error = dlm_rcom_status(ls, memb->nodeid); 115 error = dlm_rcom_status(ls, memb->nodeid, 0);
110 if (error) 116 if (error)
111 goto out; 117 goto out;
112 118
119 if (save_slots)
120 dlm_slot_save(ls, rc, memb);
121
113 if (rc->rc_result & wait_status) 122 if (rc->rc_result & wait_status)
114 break; 123 break;
115 if (delay < 1000) 124 if (delay < 1000)
@@ -121,7 +130,8 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
121 return error; 130 return error;
122} 131}
123 132
124static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status) 133static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
134 uint32_t status_flags)
125{ 135{
126 struct dlm_rcom *rc = ls->ls_recover_buf; 136 struct dlm_rcom *rc = ls->ls_recover_buf;
127 int error = 0, delay = 0, nodeid = ls->ls_low_nodeid; 137 int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@@ -132,7 +142,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
132 goto out; 142 goto out;
133 } 143 }
134 144
135 error = dlm_rcom_status(ls, nodeid); 145 error = dlm_rcom_status(ls, nodeid, status_flags);
136 if (error) 146 if (error)
137 break; 147 break;
138 148
@@ -152,18 +162,56 @@ static int wait_status(struct dlm_ls *ls, uint32_t status)
152 int error; 162 int error;
153 163
154 if (ls->ls_low_nodeid == dlm_our_nodeid()) { 164 if (ls->ls_low_nodeid == dlm_our_nodeid()) {
155 error = wait_status_all(ls, status); 165 error = wait_status_all(ls, status, 0);
156 if (!error) 166 if (!error)
157 dlm_set_recover_status(ls, status_all); 167 dlm_set_recover_status(ls, status_all);
158 } else 168 } else
159 error = wait_status_low(ls, status_all); 169 error = wait_status_low(ls, status_all, 0);
160 170
161 return error; 171 return error;
162} 172}
163 173
164int dlm_recover_members_wait(struct dlm_ls *ls) 174int dlm_recover_members_wait(struct dlm_ls *ls)
165{ 175{
166 return wait_status(ls, DLM_RS_NODES); 176 struct dlm_member *memb;
177 struct dlm_slot *slots;
178 int num_slots, slots_size;
179 int error, rv;
180 uint32_t gen;
181
182 list_for_each_entry(memb, &ls->ls_nodes, list) {
183 memb->slot = -1;
184 memb->generation = 0;
185 }
186
187 if (ls->ls_low_nodeid == dlm_our_nodeid()) {
188 error = wait_status_all(ls, DLM_RS_NODES, 1);
189 if (error)
190 goto out;
191
192 /* slots array is sparse, slots_size may be > num_slots */
193
194 rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
195 if (!rv) {
196 spin_lock(&ls->ls_recover_lock);
197 _set_recover_status(ls, DLM_RS_NODES_ALL);
198 ls->ls_num_slots = num_slots;
199 ls->ls_slots_size = slots_size;
200 ls->ls_slots = slots;
201 ls->ls_generation = gen;
202 spin_unlock(&ls->ls_recover_lock);
203 } else {
204 dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
205 }
206 } else {
207 error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
208 if (error)
209 goto out;
210
211 dlm_slots_copy_in(ls);
212 }
213 out:
214 return error;
167} 215}
168 216
169int dlm_recover_directory_wait(struct dlm_ls *ls) 217int dlm_recover_directory_wait(struct dlm_ls *ls)
@@ -542,8 +590,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
542 out: 590 out:
543 if (error) 591 if (error)
544 recover_list_clear(ls); 592 recover_list_clear(ls);
545 else
546 dlm_set_recover_status(ls, DLM_RS_LOCKS);
547 return error; 593 return error;
548} 594}
549 595
@@ -715,6 +761,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
715 761
716int dlm_create_root_list(struct dlm_ls *ls) 762int dlm_create_root_list(struct dlm_ls *ls)
717{ 763{
764 struct rb_node *n;
718 struct dlm_rsb *r; 765 struct dlm_rsb *r;
719 int i, error = 0; 766 int i, error = 0;
720 767
@@ -727,7 +774,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
727 774
728 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 775 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
729 spin_lock(&ls->ls_rsbtbl[i].lock); 776 spin_lock(&ls->ls_rsbtbl[i].lock);
730 list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) { 777 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
778 r = rb_entry(n, struct dlm_rsb, res_hashnode);
731 list_add(&r->res_root_list, &ls->ls_root_list); 779 list_add(&r->res_root_list, &ls->ls_root_list);
732 dlm_hold_rsb(r); 780 dlm_hold_rsb(r);
733 } 781 }
@@ -741,7 +789,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
741 continue; 789 continue;
742 } 790 }
743 791
744 list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) { 792 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
793 r = rb_entry(n, struct dlm_rsb, res_hashnode);
745 list_add(&r->res_root_list, &ls->ls_root_list); 794 list_add(&r->res_root_list, &ls->ls_root_list);
746 dlm_hold_rsb(r); 795 dlm_hold_rsb(r);
747 } 796 }
@@ -771,16 +820,18 @@ void dlm_release_root_list(struct dlm_ls *ls)
771 820
772void dlm_clear_toss_list(struct dlm_ls *ls) 821void dlm_clear_toss_list(struct dlm_ls *ls)
773{ 822{
774 struct dlm_rsb *r, *safe; 823 struct rb_node *n, *next;
824 struct dlm_rsb *rsb;
775 int i; 825 int i;
776 826
777 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 827 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
778 spin_lock(&ls->ls_rsbtbl[i].lock); 828 spin_lock(&ls->ls_rsbtbl[i].lock);
779 list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss, 829 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
780 res_hashchain) { 830 next = rb_next(n);;
781 if (dlm_no_directory(ls) || !is_master(r)) { 831 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
782 list_del(&r->res_hashchain); 832 if (dlm_no_directory(ls) || !is_master(rsb)) {
783 dlm_free_rsb(r); 833 rb_erase(n, &ls->ls_rsbtbl[i].toss);
834 dlm_free_rsb(rsb);
784 } 835 }
785 } 836 }
786 spin_unlock(&ls->ls_rsbtbl[i].lock); 837 spin_unlock(&ls->ls_rsbtbl[i].lock);
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 774da3cf92c6..3780caf7ae0c 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -54,7 +54,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
54 unsigned long start; 54 unsigned long start;
55 int error, neg = 0; 55 int error, neg = 0;
56 56
57 log_debug(ls, "recover %llx", (unsigned long long)rv->seq); 57 log_debug(ls, "dlm_recover %llx", (unsigned long long)rv->seq);
58 58
59 mutex_lock(&ls->ls_recoverd_active); 59 mutex_lock(&ls->ls_recoverd_active);
60 60
@@ -76,14 +76,22 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
76 76
77 /* 77 /*
78 * Add or remove nodes from the lockspace's ls_nodes list. 78 * Add or remove nodes from the lockspace's ls_nodes list.
79 * Also waits for all nodes to complete dlm_recover_members.
80 */ 79 */
81 80
82 error = dlm_recover_members(ls, rv, &neg); 81 error = dlm_recover_members(ls, rv, &neg);
83 if (error) { 82 if (error) {
84 log_debug(ls, "recover_members failed %d", error); 83 log_debug(ls, "dlm_recover_members error %d", error);
85 goto fail; 84 goto fail;
86 } 85 }
86
87 dlm_set_recover_status(ls, DLM_RS_NODES);
88
89 error = dlm_recover_members_wait(ls);
90 if (error) {
91 log_debug(ls, "dlm_recover_members_wait error %d", error);
92 goto fail;
93 }
94
87 start = jiffies; 95 start = jiffies;
88 96
89 /* 97 /*
@@ -93,17 +101,15 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
93 101
94 error = dlm_recover_directory(ls); 102 error = dlm_recover_directory(ls);
95 if (error) { 103 if (error) {
96 log_debug(ls, "recover_directory failed %d", error); 104 log_debug(ls, "dlm_recover_directory error %d", error);
97 goto fail; 105 goto fail;
98 } 106 }
99 107
100 /* 108 dlm_set_recover_status(ls, DLM_RS_DIR);
101 * Wait for all nodes to complete directory rebuild.
102 */
103 109
104 error = dlm_recover_directory_wait(ls); 110 error = dlm_recover_directory_wait(ls);
105 if (error) { 111 if (error) {
106 log_debug(ls, "recover_directory_wait failed %d", error); 112 log_debug(ls, "dlm_recover_directory_wait error %d", error);
107 goto fail; 113 goto fail;
108 } 114 }
109 115
@@ -133,7 +139,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
133 139
134 error = dlm_recover_masters(ls); 140 error = dlm_recover_masters(ls);
135 if (error) { 141 if (error) {
136 log_debug(ls, "recover_masters failed %d", error); 142 log_debug(ls, "dlm_recover_masters error %d", error);
137 goto fail; 143 goto fail;
138 } 144 }
139 145
@@ -143,13 +149,15 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
143 149
144 error = dlm_recover_locks(ls); 150 error = dlm_recover_locks(ls);
145 if (error) { 151 if (error) {
146 log_debug(ls, "recover_locks failed %d", error); 152 log_debug(ls, "dlm_recover_locks error %d", error);
147 goto fail; 153 goto fail;
148 } 154 }
149 155
156 dlm_set_recover_status(ls, DLM_RS_LOCKS);
157
150 error = dlm_recover_locks_wait(ls); 158 error = dlm_recover_locks_wait(ls);
151 if (error) { 159 if (error) {
152 log_debug(ls, "recover_locks_wait failed %d", error); 160 log_debug(ls, "dlm_recover_locks_wait error %d", error);
153 goto fail; 161 goto fail;
154 } 162 }
155 163
@@ -170,7 +178,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
170 178
171 error = dlm_recover_locks_wait(ls); 179 error = dlm_recover_locks_wait(ls);
172 if (error) { 180 if (error) {
173 log_debug(ls, "recover_locks_wait failed %d", error); 181 log_debug(ls, "dlm_recover_locks_wait error %d", error);
174 goto fail; 182 goto fail;
175 } 183 }
176 } 184 }
@@ -186,9 +194,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
186 dlm_purge_requestqueue(ls); 194 dlm_purge_requestqueue(ls);
187 195
188 dlm_set_recover_status(ls, DLM_RS_DONE); 196 dlm_set_recover_status(ls, DLM_RS_DONE);
197
189 error = dlm_recover_done_wait(ls); 198 error = dlm_recover_done_wait(ls);
190 if (error) { 199 if (error) {
191 log_debug(ls, "recover_done_wait failed %d", error); 200 log_debug(ls, "dlm_recover_done_wait error %d", error);
192 goto fail; 201 goto fail;
193 } 202 }
194 203
@@ -200,34 +209,35 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
200 209
201 error = enable_locking(ls, rv->seq); 210 error = enable_locking(ls, rv->seq);
202 if (error) { 211 if (error) {
203 log_debug(ls, "enable_locking failed %d", error); 212 log_debug(ls, "enable_locking error %d", error);
204 goto fail; 213 goto fail;
205 } 214 }
206 215
207 error = dlm_process_requestqueue(ls); 216 error = dlm_process_requestqueue(ls);
208 if (error) { 217 if (error) {
209 log_debug(ls, "process_requestqueue failed %d", error); 218 log_debug(ls, "dlm_process_requestqueue error %d", error);
210 goto fail; 219 goto fail;
211 } 220 }
212 221
213 error = dlm_recover_waiters_post(ls); 222 error = dlm_recover_waiters_post(ls);
214 if (error) { 223 if (error) {
215 log_debug(ls, "recover_waiters_post failed %d", error); 224 log_debug(ls, "dlm_recover_waiters_post error %d", error);
216 goto fail; 225 goto fail;
217 } 226 }
218 227
219 dlm_grant_after_purge(ls); 228 dlm_grant_after_purge(ls);
220 229
221 log_debug(ls, "recover %llx done: %u ms", 230 log_debug(ls, "dlm_recover %llx generation %u done: %u ms",
222 (unsigned long long)rv->seq, 231 (unsigned long long)rv->seq, ls->ls_generation,
223 jiffies_to_msecs(jiffies - start)); 232 jiffies_to_msecs(jiffies - start));
224 mutex_unlock(&ls->ls_recoverd_active); 233 mutex_unlock(&ls->ls_recoverd_active);
225 234
235 dlm_lsop_recover_done(ls);
226 return 0; 236 return 0;
227 237
228 fail: 238 fail:
229 dlm_release_root_list(ls); 239 dlm_release_root_list(ls);
230 log_debug(ls, "recover %llx error %d", 240 log_debug(ls, "dlm_recover %llx error %d",
231 (unsigned long long)rv->seq, error); 241 (unsigned long long)rv->seq, error);
232 mutex_unlock(&ls->ls_recoverd_active); 242 mutex_unlock(&ls->ls_recoverd_active);
233 return error; 243 return error;
@@ -250,8 +260,7 @@ static void do_ls_recovery(struct dlm_ls *ls)
250 260
251 if (rv) { 261 if (rv) {
252 ls_recover(ls, rv); 262 ls_recover(ls, rv);
253 kfree(rv->nodeids); 263 kfree(rv->nodes);
254 kfree(rv->new);
255 kfree(rv); 264 kfree(rv);
256 } 265 }
257} 266}
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index d8ea60756403..eb4ed9ba3098 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -392,8 +392,9 @@ static int device_create_lockspace(struct dlm_lspace_params *params)
392 if (!capable(CAP_SYS_ADMIN)) 392 if (!capable(CAP_SYS_ADMIN))
393 return -EPERM; 393 return -EPERM;
394 394
395 error = dlm_new_lockspace(params->name, strlen(params->name), 395 error = dlm_new_lockspace(params->name, NULL, params->flags,
396 &lockspace, params->flags, DLM_USER_LVB_LEN); 396 DLM_USER_LVB_LEN, NULL, NULL, NULL,
397 &lockspace);
397 if (error) 398 if (error)
398 return error; 399 return error;
399 400
diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c
index 98c80d8c2a62..ce85b62bc0a2 100644
--- a/fs/gfs2/lock_dlm.c
+++ b/fs/gfs2/lock_dlm.c
@@ -195,10 +195,10 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *fsname)
195 return -EINVAL; 195 return -EINVAL;
196 } 196 }
197 197
198 error = dlm_new_lockspace(fsname, strlen(fsname), &ls->ls_dlm, 198 error = dlm_new_lockspace(fsname, NULL,
199 DLM_LSFL_FS | DLM_LSFL_NEWEXCL | 199 DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
200 (ls->ls_nodir ? DLM_LSFL_NODIR : 0), 200 (ls->ls_nodir ? DLM_LSFL_NODIR : 0),
201 GDLM_LVB_SIZE); 201 GDLM_LVB_SIZE, NULL, NULL, NULL, &ls->ls_dlm);
202 if (error) 202 if (error)
203 printk(KERN_ERR "dlm_new_lockspace error %d", error); 203 printk(KERN_ERR "dlm_new_lockspace error %d", error);
204 204
diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index a5ebe421195f..286edf1e231f 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -827,8 +827,8 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
827 goto out; 827 goto out;
828 } 828 }
829 829
830 rc = dlm_new_lockspace(conn->cc_name, strlen(conn->cc_name), 830 rc = dlm_new_lockspace(conn->cc_name, NULL, DLM_LSFL_FS, DLM_LVB_LEN,
831 &fsdlm, DLM_LSFL_FS, DLM_LVB_LEN); 831 NULL, NULL, NULL, &fsdlm);
832 if (rc) { 832 if (rc) {
833 ocfs2_live_connection_drop(control); 833 ocfs2_live_connection_drop(control);
834 goto out; 834 goto out;
diff --git a/include/linux/dlm.h b/include/linux/dlm.h
index d4e02f5353a0..6c7f6e9546c7 100644
--- a/include/linux/dlm.h
+++ b/include/linux/dlm.h
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -74,15 +74,76 @@ struct dlm_lksb {
74 74
75#ifdef __KERNEL__ 75#ifdef __KERNEL__
76 76
77struct dlm_slot {
78 int nodeid; /* 1 to MAX_INT */
79 int slot; /* 1 to MAX_INT */
80};
81
82/*
83 * recover_prep: called before the dlm begins lock recovery.
84 * Notifies lockspace user that locks from failed members will be granted.
85 * recover_slot: called after recover_prep and before recover_done.
86 * Identifies a failed lockspace member.
87 * recover_done: called after the dlm completes lock recovery.
88 * Identifies lockspace members and lockspace generation number.
89 */
90
91struct dlm_lockspace_ops {
92 void (*recover_prep) (void *ops_arg);
93 void (*recover_slot) (void *ops_arg, struct dlm_slot *slot);
94 void (*recover_done) (void *ops_arg, struct dlm_slot *slots,
95 int num_slots, int our_slot, uint32_t generation);
96};
97
77/* 98/*
78 * dlm_new_lockspace 99 * dlm_new_lockspace
79 * 100 *
80 * Starts a lockspace with the given name. If the named lockspace exists in 101 * Create/join a lockspace.
81 * the cluster, the calling node joins it. 102 *
103 * name: lockspace name, null terminated, up to DLM_LOCKSPACE_LEN (not
104 * including terminating null).
105 *
106 * cluster: cluster name, null terminated, up to DLM_LOCKSPACE_LEN (not
107 * including terminating null). Optional. When cluster is null, it
108 * is not used. When set, dlm_new_lockspace() returns -EBADR if cluster
109 * is not equal to the dlm cluster name.
110 *
111 * flags:
112 * DLM_LSFL_NODIR
113 * The dlm should not use a resource directory, but statically assign
114 * resource mastery to nodes based on the name hash that is otherwise
115 * used to select the directory node. Must be the same on all nodes.
116 * DLM_LSFL_TIMEWARN
117 * The dlm should emit netlink messages if locks have been waiting
118 * for a configurable amount of time. (Unused.)
119 * DLM_LSFL_FS
120 * The lockspace user is in the kernel (i.e. filesystem). Enables
121 * direct bast/cast callbacks.
122 * DLM_LSFL_NEWEXCL
123 * dlm_new_lockspace() should return -EEXIST if the lockspace exists.
124 *
125 * lvblen: length of lvb in bytes. Must be multiple of 8.
126 * dlm_new_lockspace() returns an error if this does not match
127 * what other nodes are using.
128 *
129 * ops: callbacks that indicate lockspace recovery points so the
130 * caller can coordinate its recovery and know lockspace members.
131 * This is only used by the initial dlm_new_lockspace() call.
132 * Optional.
133 *
134 * ops_arg: arg for ops callbacks.
135 *
136 * ops_result: tells caller if the ops callbacks (if provided) will
137 * be used or not. 0: will be used, -EXXX will not be used.
138 * -EOPNOTSUPP: the dlm does not have recovery_callbacks enabled.
139 *
140 * lockspace: handle for dlm functions
82 */ 141 */
83 142
84int dlm_new_lockspace(const char *name, int namelen, 143int dlm_new_lockspace(const char *name, const char *cluster,
85 dlm_lockspace_t **lockspace, uint32_t flags, int lvblen); 144 uint32_t flags, int lvblen,
145 const struct dlm_lockspace_ops *ops, void *ops_arg,
146 int *ops_result, dlm_lockspace_t **lockspace);
86 147
87/* 148/*
88 * dlm_release_lockspace 149 * dlm_release_lockspace