aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/dir.c
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2012-05-10 11:18:07 -0400
committerDavid Teigland <teigland@redhat.com>2012-07-16 15:16:19 -0400
commitc04fecb4d9f7753e0cbff7edd03ec68f8721cdce (patch)
treeecd82017d49c7bb03b96a8ad1eb4e9a5bb84409a /fs/dlm/dir.c
parentecc728467fb0c3e350b57fc66ed7585c15be50f5 (diff)
dlm: use rsbtbl as resource directory
Remove the dir hash table (dirtbl), and use the rsb hash table (rsbtbl) as the resource directory. It has always been an unnecessary duplication of information. This improves efficiency by using a single rsbtbl lookup in many cases where both rsbtbl and dirtbl lookups were needed previously. This eliminates the need to handle cases of rsbtbl and dirtbl being out of sync. In many cases there will be memory savings because the dir hash table no longer exists. Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm/dir.c')
-rw-r--r--fs/dlm/dir.c287
1 files changed, 69 insertions, 218 deletions
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index dc5eb598b81f..278a75cda446 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -23,50 +23,6 @@
23#include "lock.h" 23#include "lock.h"
24#include "dir.h" 24#include "dir.h"
25 25
26
27static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28{
29 spin_lock(&ls->ls_recover_list_lock);
30 list_add(&de->list, &ls->ls_recover_list);
31 spin_unlock(&ls->ls_recover_list_lock);
32}
33
34static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35{
36 int found = 0;
37 struct dlm_direntry *de;
38
39 spin_lock(&ls->ls_recover_list_lock);
40 list_for_each_entry(de, &ls->ls_recover_list, list) {
41 if (de->length == len) {
42 list_del(&de->list);
43 de->master_nodeid = 0;
44 memset(de->name, 0, len);
45 found = 1;
46 break;
47 }
48 }
49 spin_unlock(&ls->ls_recover_list_lock);
50
51 if (!found)
52 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
53 return de;
54}
55
56void dlm_clear_free_entries(struct dlm_ls *ls)
57{
58 struct dlm_direntry *de;
59
60 spin_lock(&ls->ls_recover_list_lock);
61 while (!list_empty(&ls->ls_recover_list)) {
62 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 list);
64 list_del(&de->list);
65 kfree(de);
66 }
67 spin_unlock(&ls->ls_recover_list_lock);
68}
69
70/* 26/*
71 * We use the upper 16 bits of the hash value to select the directory node. 27 * We use the upper 16 bits of the hash value to select the directory node.
72 * Low bits are used for distribution of rsb's among hash buckets on each node. 28 * Low bits are used for distribution of rsb's among hash buckets on each node.
@@ -78,144 +34,53 @@ void dlm_clear_free_entries(struct dlm_ls *ls)
78 34
79int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) 35int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80{ 36{
81 struct list_head *tmp; 37 uint32_t node;
82 struct dlm_member *memb = NULL;
83 uint32_t node, n = 0;
84 int nodeid;
85
86 if (ls->ls_num_nodes == 1) {
87 nodeid = dlm_our_nodeid();
88 goto out;
89 }
90 38
91 if (ls->ls_node_array) { 39 if (ls->ls_num_nodes == 1)
40 return dlm_our_nodeid();
41 else {
92 node = (hash >> 16) % ls->ls_total_weight; 42 node = (hash >> 16) % ls->ls_total_weight;
93 nodeid = ls->ls_node_array[node]; 43 return ls->ls_node_array[node];
94 goto out;
95 }
96
97 /* make_member_array() failed to kmalloc ls_node_array... */
98
99 node = (hash >> 16) % ls->ls_num_nodes;
100
101 list_for_each(tmp, &ls->ls_nodes) {
102 if (n++ != node)
103 continue;
104 memb = list_entry(tmp, struct dlm_member, list);
105 break;
106 } 44 }
107
108 DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 ls->ls_num_nodes, n, node););
110 nodeid = memb->nodeid;
111 out:
112 return nodeid;
113} 45}
114 46
115int dlm_dir_nodeid(struct dlm_rsb *r) 47int dlm_dir_nodeid(struct dlm_rsb *r)
116{ 48{
117 return dlm_hash2nodeid(r->res_ls, r->res_hash); 49 return r->res_dir_nodeid;
118}
119
120static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121{
122 uint32_t val;
123
124 val = jhash(name, len, 0);
125 val &= (ls->ls_dirtbl_size - 1);
126
127 return val;
128}
129
130static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131{
132 uint32_t bucket;
133
134 bucket = dir_hash(ls, de->name, de->length);
135 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136} 50}
137 51
138static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, 52void dlm_recover_dir_nodeid(struct dlm_ls *ls)
139 int namelen, uint32_t bucket)
140{ 53{
141 struct dlm_direntry *de; 54 struct dlm_rsb *r;
142
143 list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 if (de->length == namelen && !memcmp(name, de->name, namelen))
145 goto out;
146 }
147 de = NULL;
148 out:
149 return de;
150}
151
152void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153{
154 struct dlm_direntry *de;
155 uint32_t bucket;
156
157 bucket = dir_hash(ls, name, namelen);
158
159 spin_lock(&ls->ls_dirtbl[bucket].lock);
160
161 de = search_bucket(ls, name, namelen, bucket);
162
163 if (!de) {
164 log_error(ls, "remove fr %u none", nodeid);
165 goto out;
166 }
167
168 if (de->master_nodeid != nodeid) {
169 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 goto out;
171 }
172
173 list_del(&de->list);
174 kfree(de);
175 out:
176 spin_unlock(&ls->ls_dirtbl[bucket].lock);
177}
178 55
179void dlm_dir_clear(struct dlm_ls *ls) 56 down_read(&ls->ls_root_sem);
180{ 57 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
181 struct list_head *head; 58 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
182 struct dlm_direntry *de;
183 int i;
184
185 DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187 for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 spin_lock(&ls->ls_dirtbl[i].lock);
189 head = &ls->ls_dirtbl[i].list;
190 while (!list_empty(head)) {
191 de = list_entry(head->next, struct dlm_direntry, list);
192 list_del(&de->list);
193 put_free_de(ls, de);
194 }
195 spin_unlock(&ls->ls_dirtbl[i].lock);
196 } 59 }
60 up_read(&ls->ls_root_sem);
197} 61}
198 62
199int dlm_recover_directory(struct dlm_ls *ls) 63int dlm_recover_directory(struct dlm_ls *ls)
200{ 64{
201 struct dlm_member *memb; 65 struct dlm_member *memb;
202 struct dlm_direntry *de;
203 char *b, *last_name = NULL; 66 char *b, *last_name = NULL;
204 int error = -ENOMEM, last_len, count = 0; 67 int error = -ENOMEM, last_len, nodeid, result;
205 uint16_t namelen; 68 uint16_t namelen;
69 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
206 70
207 log_debug(ls, "dlm_recover_directory"); 71 log_debug(ls, "dlm_recover_directory");
208 72
209 if (dlm_no_directory(ls)) 73 if (dlm_no_directory(ls))
210 goto out_status; 74 goto out_status;
211 75
212 dlm_dir_clear(ls);
213
214 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); 76 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215 if (!last_name) 77 if (!last_name)
216 goto out; 78 goto out;
217 79
218 list_for_each_entry(memb, &ls->ls_nodes, list) { 80 list_for_each_entry(memb, &ls->ls_nodes, list) {
81 if (memb->nodeid == dlm_our_nodeid())
82 continue;
83
219 memset(last_name, 0, DLM_RESNAME_MAXLEN); 84 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 last_len = 0; 85 last_len = 0;
221 86
@@ -230,7 +95,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
230 if (error) 95 if (error)
231 goto out_free; 96 goto out_free;
232 97
233 schedule(); 98 cond_resched();
234 99
235 /* 100 /*
236 * pick namelen/name pairs out of received buffer 101 * pick namelen/name pairs out of received buffer
@@ -267,87 +132,71 @@ int dlm_recover_directory(struct dlm_ls *ls)
267 if (namelen > DLM_RESNAME_MAXLEN) 132 if (namelen > DLM_RESNAME_MAXLEN)
268 goto out_free; 133 goto out_free;
269 134
270 error = -ENOMEM; 135 error = dlm_master_lookup(ls, memb->nodeid,
271 de = get_free_de(ls, namelen); 136 b, namelen,
272 if (!de) 137 DLM_LU_RECOVER_DIR,
138 &nodeid, &result);
139 if (error) {
140 log_error(ls, "recover_dir lookup %d",
141 error);
273 goto out_free; 142 goto out_free;
143 }
144
145 /* The name was found in rsbtbl, but the
146 * master nodeid is different from
147 * memb->nodeid which says it is the master.
148 * This should not happen. */
149
150 if (result == DLM_LU_MATCH &&
151 nodeid != memb->nodeid) {
152 count_bad++;
153 log_error(ls, "recover_dir lookup %d "
154 "nodeid %d memb %d bad %u",
155 result, nodeid, memb->nodeid,
156 count_bad);
157 print_hex_dump_bytes("dlm_recover_dir ",
158 DUMP_PREFIX_NONE,
159 b, namelen);
160 }
161
162 /* The name was found in rsbtbl, and the
163 * master nodeid matches memb->nodeid. */
164
165 if (result == DLM_LU_MATCH &&
166 nodeid == memb->nodeid) {
167 count_match++;
168 }
169
170 /* The name was not found in rsbtbl and was
171 * added with memb->nodeid as the master. */
172
173 if (result == DLM_LU_ADD) {
174 count_add++;
175 }
274 176
275 de->master_nodeid = memb->nodeid;
276 de->length = namelen;
277 last_len = namelen; 177 last_len = namelen;
278 memcpy(de->name, b, namelen);
279 memcpy(last_name, b, namelen); 178 memcpy(last_name, b, namelen);
280 b += namelen; 179 b += namelen;
281 left -= namelen; 180 left -= namelen;
282
283 add_entry_to_hash(ls, de);
284 count++; 181 count++;
285 } 182 }
286 } 183 }
287 done: 184 done:
288 ; 185 ;
289 } 186 }
290 187
291 out_status: 188 out_status:
292 error = 0; 189 error = 0;
293 log_debug(ls, "dlm_recover_directory %d entries", count); 190 dlm_set_recover_status(ls, DLM_RS_DIR);
191
192 log_debug(ls, "dlm_recover_directory %u in %u new",
193 count, count_add);
294 out_free: 194 out_free:
295 kfree(last_name); 195 kfree(last_name);
296 out: 196 out:
297 dlm_clear_free_entries(ls);
298 return error; 197 return error;
299} 198}
300 199
301static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
302 int namelen, int *r_nodeid)
303{
304 struct dlm_direntry *de, *tmp;
305 uint32_t bucket;
306
307 bucket = dir_hash(ls, name, namelen);
308
309 spin_lock(&ls->ls_dirtbl[bucket].lock);
310 de = search_bucket(ls, name, namelen, bucket);
311 if (de) {
312 *r_nodeid = de->master_nodeid;
313 spin_unlock(&ls->ls_dirtbl[bucket].lock);
314 if (*r_nodeid == nodeid)
315 return -EEXIST;
316 return 0;
317 }
318
319 spin_unlock(&ls->ls_dirtbl[bucket].lock);
320
321 if (namelen > DLM_RESNAME_MAXLEN)
322 return -EINVAL;
323
324 de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
325 if (!de)
326 return -ENOMEM;
327
328 de->master_nodeid = nodeid;
329 de->length = namelen;
330 memcpy(de->name, name, namelen);
331
332 spin_lock(&ls->ls_dirtbl[bucket].lock);
333 tmp = search_bucket(ls, name, namelen, bucket);
334 if (tmp) {
335 kfree(de);
336 de = tmp;
337 } else {
338 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
339 }
340 *r_nodeid = de->master_nodeid;
341 spin_unlock(&ls->ls_dirtbl[bucket].lock);
342 return 0;
343}
344
345int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
346 int *r_nodeid)
347{
348 return get_entry(ls, nodeid, name, namelen, r_nodeid);
349}
350
351static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) 200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
352{ 201{
353 struct dlm_rsb *r; 202 struct dlm_rsb *r;
@@ -358,10 +207,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
358 bucket = hash & (ls->ls_rsbtbl_size - 1); 207 bucket = hash & (ls->ls_rsbtbl_size - 1);
359 208
360 spin_lock(&ls->ls_rsbtbl[bucket].lock); 209 spin_lock(&ls->ls_rsbtbl[bucket].lock);
361 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r); 210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
362 if (rv) 211 if (rv)
363 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, 212 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
364 name, len, 0, &r); 213 name, len, &r);
365 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 214 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
366 215
367 if (!rv) 216 if (!rv)
@@ -371,7 +220,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
371 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 220 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
372 if (len == r->res_length && !memcmp(name, r->res_name, len)) { 221 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
373 up_read(&ls->ls_root_sem); 222 up_read(&ls->ls_root_sem);
374 log_error(ls, "find_rsb_root revert to root_list %s", 223 log_debug(ls, "find_rsb_root revert to root_list %s",
375 r->res_name); 224 r->res_name);
376 return r; 225 return r;
377 } 226 }
@@ -429,6 +278,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
429 be_namelen = cpu_to_be16(0); 278 be_namelen = cpu_to_be16(0);
430 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 279 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
431 offset += sizeof(__be16); 280 offset += sizeof(__be16);
281 ls->ls_recover_dir_sent_msg++;
432 goto out; 282 goto out;
433 } 283 }
434 284
@@ -437,6 +287,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
437 offset += sizeof(__be16); 287 offset += sizeof(__be16);
438 memcpy(outbuf + offset, r->res_name, r->res_length); 288 memcpy(outbuf + offset, r->res_name, r->res_length);
439 offset += r->res_length; 289 offset += r->res_length;
290 ls->ls_recover_dir_sent_res++;
440 } 291 }
441 292
442 /* 293 /*
@@ -449,8 +300,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
449 be_namelen = cpu_to_be16(0xFFFF); 300 be_namelen = cpu_to_be16(0xFFFF);
450 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 301 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
451 offset += sizeof(__be16); 302 offset += sizeof(__be16);
303 ls->ls_recover_dir_sent_msg++;
452 } 304 }
453
454 out: 305 out:
455 up_read(&ls->ls_root_sem); 306 up_read(&ls->ls_root_sem);
456} 307}