aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2011-10-20 14:26:28 -0400
committerDavid Teigland <teigland@redhat.com>2012-01-04 09:55:57 -0500
commit757a42719635495779462514458bbfbf12a37dac (patch)
treebe6ea155fc54c397cb57eb193b81824b37989e56
parentf95a34c66554235b70a681fcd9feebc195f7ec0e (diff)
dlm: add node slots and generation
Slot numbers are assigned to nodes when they join the lockspace. The slot number chosen is the minimum unused value starting at 1. Once a node is assigned a slot, that slot number will not change while the node remains a lockspace member. If the node leaves and rejoins it can be assigned a new slot number. A new generation number is also added to a lockspace. It is set and incremented during each recovery along with the slot collection/assignment. The slot numbers will be passed to gfs2 which will use them as journal id's. Signed-off-by: David Teigland <teigland@redhat.com>
-rw-r--r--fs/dlm/dlm_internal.h48
-rw-r--r--fs/dlm/lockspace.c5
-rw-r--r--fs/dlm/member.c284
-rw-r--r--fs/dlm/member.h7
-rw-r--r--fs/dlm/rcom.c99
-rw-r--r--fs/dlm/rcom.h2
-rw-r--r--fs/dlm/recover.c64
7 files changed, 480 insertions, 29 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 5685a9a5dba2..f4d132c76908 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -117,6 +117,18 @@ struct dlm_member {
117 struct list_head list; 117 struct list_head list;
118 int nodeid; 118 int nodeid;
119 int weight; 119 int weight;
120 int slot;
121 int slot_prev;
122 uint32_t generation;
123};
124
125/*
126 * low nodeid saves array of these in ls_slots
127 */
128
129struct dlm_slot {
130 int nodeid;
131 int slot;
120}; 132};
121 133
122/* 134/*
@@ -337,7 +349,9 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag)
337/* dlm_header is first element of all structs sent between nodes */ 349/* dlm_header is first element of all structs sent between nodes */
338 350
339#define DLM_HEADER_MAJOR 0x00030000 351#define DLM_HEADER_MAJOR 0x00030000
340#define DLM_HEADER_MINOR 0x00000000 352#define DLM_HEADER_MINOR 0x00000001
353
354#define DLM_HEADER_SLOTS 0x00000001
341 355
342#define DLM_MSG 1 356#define DLM_MSG 1
343#define DLM_RCOM 2 357#define DLM_RCOM 2
@@ -425,10 +439,34 @@ union dlm_packet {
425 struct dlm_rcom rcom; 439 struct dlm_rcom rcom;
426}; 440};
427 441
442#define DLM_RSF_NEED_SLOTS 0x00000001
443
444/* RCOM_STATUS data */
445struct rcom_status {
446 __le32 rs_flags;
447 __le32 rs_unused1;
448 __le64 rs_unused2;
449};
450
451/* RCOM_STATUS_REPLY data */
428struct rcom_config { 452struct rcom_config {
429 __le32 rf_lvblen; 453 __le32 rf_lvblen;
430 __le32 rf_lsflags; 454 __le32 rf_lsflags;
431 __le64 rf_unused; 455
456 /* DLM_HEADER_SLOTS adds: */
457 __le32 rf_flags;
458 __le16 rf_our_slot;
459 __le16 rf_num_slots;
460 __le32 rf_generation;
461 __le32 rf_unused1;
462 __le64 rf_unused2;
463};
464
465struct rcom_slot {
466 __le32 ro_nodeid;
467 __le16 ro_slot;
468 __le16 ro_unused1;
469 __le64 ro_unused2;
432}; 470};
433 471
434struct rcom_lock { 472struct rcom_lock {
@@ -455,6 +493,7 @@ struct dlm_ls {
455 struct list_head ls_list; /* list of lockspaces */ 493 struct list_head ls_list; /* list of lockspaces */
456 dlm_lockspace_t *ls_local_handle; 494 dlm_lockspace_t *ls_local_handle;
457 uint32_t ls_global_id; /* global unique lockspace ID */ 495 uint32_t ls_global_id; /* global unique lockspace ID */
496 uint32_t ls_generation;
458 uint32_t ls_exflags; 497 uint32_t ls_exflags;
459 int ls_lvblen; 498 int ls_lvblen;
460 int ls_count; /* refcount of processes in 499 int ls_count; /* refcount of processes in
@@ -493,6 +532,11 @@ struct dlm_ls {
493 int ls_total_weight; 532 int ls_total_weight;
494 int *ls_node_array; 533 int *ls_node_array;
495 534
535 int ls_slot;
536 int ls_num_slots;
537 int ls_slots_size;
538 struct dlm_slot *ls_slots;
539
496 struct dlm_rsb ls_stub_rsb; /* for returning errors */ 540 struct dlm_rsb ls_stub_rsb; /* for returning errors */
497 struct dlm_lkb ls_stub_lkb; /* for returning errors */ 541 struct dlm_lkb ls_stub_lkb; /* for returning errors */
498 struct dlm_message ls_stub_ms; /* for faking a reply */ 542 struct dlm_message ls_stub_ms; /* for faking a reply */
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 1d16a23b0a06..1441f04bfabe 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -525,6 +525,11 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
525 if (!ls->ls_recover_buf) 525 if (!ls->ls_recover_buf)
526 goto out_dirfree; 526 goto out_dirfree;
527 527
528 ls->ls_slot = 0;
529 ls->ls_num_slots = 0;
530 ls->ls_slots_size = 0;
531 ls->ls_slots = NULL;
532
528 INIT_LIST_HEAD(&ls->ls_recover_list); 533 INIT_LIST_HEAD(&ls->ls_recover_list);
529 spin_lock_init(&ls->ls_recover_list_lock); 534 spin_lock_init(&ls->ls_recover_list_lock);
530 ls->ls_recover_list_count = 0; 535 ls->ls_recover_list_count = 0;
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 5ebd1df69675..eebc52aae82e 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -19,6 +19,280 @@
19#include "config.h" 19#include "config.h"
20#include "lowcomms.h" 20#include "lowcomms.h"
21 21
22int dlm_slots_version(struct dlm_header *h)
23{
24 if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
25 return 0;
26 return 1;
27}
28
29void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
30 struct dlm_member *memb)
31{
32 struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
33
34 if (!dlm_slots_version(&rc->rc_header))
35 return;
36
37 memb->slot = le16_to_cpu(rf->rf_our_slot);
38 memb->generation = le32_to_cpu(rf->rf_generation);
39}
40
41void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
42{
43 struct dlm_slot *slot;
44 struct rcom_slot *ro;
45 int i;
46
47 ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
48
49 /* ls_slots array is sparse, but not rcom_slots */
50
51 for (i = 0; i < ls->ls_slots_size; i++) {
52 slot = &ls->ls_slots[i];
53 if (!slot->nodeid)
54 continue;
55 ro->ro_nodeid = cpu_to_le32(slot->nodeid);
56 ro->ro_slot = cpu_to_le16(slot->slot);
57 ro++;
58 }
59}
60
61#define SLOT_DEBUG_LINE 128
62
63static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
64 struct rcom_slot *ro0, struct dlm_slot *array,
65 int array_size)
66{
67 char line[SLOT_DEBUG_LINE];
68 int len = SLOT_DEBUG_LINE - 1;
69 int pos = 0;
70 int ret, i;
71
72 if (!dlm_config.ci_log_debug)
73 return;
74
75 memset(line, 0, sizeof(line));
76
77 if (array) {
78 for (i = 0; i < array_size; i++) {
79 if (!array[i].nodeid)
80 continue;
81
82 ret = snprintf(line + pos, len - pos, " %d:%d",
83 array[i].slot, array[i].nodeid);
84 if (ret >= len - pos)
85 break;
86 pos += ret;
87 }
88 } else if (ro0) {
89 for (i = 0; i < num_slots; i++) {
90 ret = snprintf(line + pos, len - pos, " %d:%d",
91 ro0[i].ro_slot, ro0[i].ro_nodeid);
92 if (ret >= len - pos)
93 break;
94 pos += ret;
95 }
96 }
97
98 log_debug(ls, "generation %u slots %d%s", gen, num_slots, line);
99}
100
101int dlm_slots_copy_in(struct dlm_ls *ls)
102{
103 struct dlm_member *memb;
104 struct dlm_rcom *rc = ls->ls_recover_buf;
105 struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
106 struct rcom_slot *ro0, *ro;
107 int our_nodeid = dlm_our_nodeid();
108 int i, num_slots;
109 uint32_t gen;
110
111 if (!dlm_slots_version(&rc->rc_header))
112 return -1;
113
114 gen = le32_to_cpu(rf->rf_generation);
115 if (gen <= ls->ls_generation) {
116 log_error(ls, "dlm_slots_copy_in gen %u old %u",
117 gen, ls->ls_generation);
118 }
119 ls->ls_generation = gen;
120
121 num_slots = le16_to_cpu(rf->rf_num_slots);
122 if (!num_slots)
123 return -1;
124
125 ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
126
127 for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
128 ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
129 ro->ro_slot = le16_to_cpu(ro->ro_slot);
130 }
131
132 log_debug_slots(ls, gen, num_slots, ro0, NULL, 0);
133
134 list_for_each_entry(memb, &ls->ls_nodes, list) {
135 for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
136 if (ro->ro_nodeid != memb->nodeid)
137 continue;
138 memb->slot = ro->ro_slot;
139 memb->slot_prev = memb->slot;
140 break;
141 }
142
143 if (memb->nodeid == our_nodeid) {
144 if (ls->ls_slot && ls->ls_slot != memb->slot) {
145 log_error(ls, "dlm_slots_copy_in our slot "
146 "changed %d %d", ls->ls_slot,
147 memb->slot);
148 return -1;
149 }
150
151 if (!ls->ls_slot)
152 ls->ls_slot = memb->slot;
153 }
154
155 if (!memb->slot) {
156 log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
157 memb->nodeid);
158 return -1;
159 }
160 }
161
162 return 0;
163}
164
165/* for any nodes that do not support slots, we will not have set memb->slot
166 in wait_status_all(), so memb->slot will remain -1, and we will not
167 assign slots or set ls_num_slots here */
168
169int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
170 struct dlm_slot **slots_out, uint32_t *gen_out)
171{
172 struct dlm_member *memb;
173 struct dlm_slot *array;
174 int our_nodeid = dlm_our_nodeid();
175 int array_size, max_slots, i;
176 int need = 0;
177 int max = 0;
178 int num = 0;
179 uint32_t gen = 0;
180
181 /* our own memb struct will have slot -1 gen 0 */
182
183 list_for_each_entry(memb, &ls->ls_nodes, list) {
184 if (memb->nodeid == our_nodeid) {
185 memb->slot = ls->ls_slot;
186 memb->generation = ls->ls_generation;
187 break;
188 }
189 }
190
191 list_for_each_entry(memb, &ls->ls_nodes, list) {
192 if (memb->generation > gen)
193 gen = memb->generation;
194
195 /* node doesn't support slots */
196
197 if (memb->slot == -1)
198 return -1;
199
200 /* node needs a slot assigned */
201
202 if (!memb->slot)
203 need++;
204
205 /* node has a slot assigned */
206
207 num++;
208
209 if (!max || max < memb->slot)
210 max = memb->slot;
211
212 /* sanity check, once slot is assigned it shouldn't change */
213
214 if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
215 log_error(ls, "nodeid %d slot changed %d %d",
216 memb->nodeid, memb->slot_prev, memb->slot);
217 return -1;
218 }
219 memb->slot_prev = memb->slot;
220 }
221
222 array_size = max + need;
223
224 array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
225 if (!array)
226 return -ENOMEM;
227
228 num = 0;
229
230 /* fill in slots (offsets) that are used */
231
232 list_for_each_entry(memb, &ls->ls_nodes, list) {
233 if (!memb->slot)
234 continue;
235
236 if (memb->slot > array_size) {
237 log_error(ls, "invalid slot number %d", memb->slot);
238 kfree(array);
239 return -1;
240 }
241
242 array[memb->slot - 1].nodeid = memb->nodeid;
243 array[memb->slot - 1].slot = memb->slot;
244 num++;
245 }
246
247 /* assign new slots from unused offsets */
248
249 list_for_each_entry(memb, &ls->ls_nodes, list) {
250 if (memb->slot)
251 continue;
252
253 for (i = 0; i < array_size; i++) {
254 if (array[i].nodeid)
255 continue;
256
257 memb->slot = i + 1;
258 memb->slot_prev = memb->slot;
259 array[i].nodeid = memb->nodeid;
260 array[i].slot = memb->slot;
261 num++;
262
263 if (!ls->ls_slot && memb->nodeid == our_nodeid)
264 ls->ls_slot = memb->slot;
265 break;
266 }
267
268 if (!memb->slot) {
269 log_error(ls, "no free slot found");
270 kfree(array);
271 return -1;
272 }
273 }
274
275 gen++;
276
277 log_debug_slots(ls, gen, num, NULL, array, array_size);
278
279 max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
280 sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
281
282 if (num > max_slots) {
283 log_error(ls, "num_slots %d exceeds max_slots %d",
284 num, max_slots);
285 kfree(array);
286 return -1;
287 }
288
289 *gen_out = gen;
290 *slots_out = array;
291 *slots_size = array_size;
292 *num_slots = num;
293 return 0;
294}
295
22static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 296static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
23{ 297{
24 struct dlm_member *memb = NULL; 298 struct dlm_member *memb = NULL;
@@ -176,7 +450,7 @@ static int ping_members(struct dlm_ls *ls)
176 error = dlm_recovery_stopped(ls); 450 error = dlm_recovery_stopped(ls);
177 if (error) 451 if (error)
178 break; 452 break;
179 error = dlm_rcom_status(ls, memb->nodeid); 453 error = dlm_rcom_status(ls, memb->nodeid, 0);
180 if (error) 454 if (error)
181 break; 455 break;
182 } 456 }
@@ -322,7 +596,15 @@ int dlm_ls_stop(struct dlm_ls *ls)
322 */ 596 */
323 597
324 dlm_recoverd_suspend(ls); 598 dlm_recoverd_suspend(ls);
599
600 spin_lock(&ls->ls_recover_lock);
601 kfree(ls->ls_slots);
602 ls->ls_slots = NULL;
603 ls->ls_num_slots = 0;
604 ls->ls_slots_size = 0;
325 ls->ls_recover_status = 0; 605 ls->ls_recover_status = 0;
606 spin_unlock(&ls->ls_recover_lock);
607
326 dlm_recoverd_resume(ls); 608 dlm_recoverd_resume(ls);
327 609
328 if (!ls->ls_recover_begin) 610 if (!ls->ls_recover_begin)
diff --git a/fs/dlm/member.h b/fs/dlm/member.h
index 7a26fca1e0b5..7e87e8a79dfd 100644
--- a/fs/dlm/member.h
+++ b/fs/dlm/member.h
@@ -20,6 +20,13 @@ void dlm_clear_members_gone(struct dlm_ls *ls);
20int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out); 20int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
21int dlm_is_removed(struct dlm_ls *ls, int nodeid); 21int dlm_is_removed(struct dlm_ls *ls, int nodeid);
22int dlm_is_member(struct dlm_ls *ls, int nodeid); 22int dlm_is_member(struct dlm_ls *ls, int nodeid);
23int dlm_slots_version(struct dlm_header *h);
24void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
25 struct dlm_member *memb);
26void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc);
27int dlm_slots_copy_in(struct dlm_ls *ls);
28int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
29 struct dlm_slot **slots_out, uint32_t *gen_out);
23 30
24#endif /* __MEMBER_DOT_H__ */ 31#endif /* __MEMBER_DOT_H__ */
25 32
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index f10a50f24e8f..ac5c616c9696 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,6 +23,7 @@
23#include "memory.h" 23#include "memory.h"
24#include "lock.h" 24#include "lock.h"
25#include "util.h" 25#include "util.h"
26#include "member.h"
26 27
27 28
28static int rcom_response(struct dlm_ls *ls) 29static int rcom_response(struct dlm_ls *ls)
@@ -72,20 +73,30 @@ static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
72 dlm_lowcomms_commit_buffer(mh); 73 dlm_lowcomms_commit_buffer(mh);
73} 74}
74 75
76static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
77 uint32_t flags)
78{
79 rs->rs_flags = cpu_to_le32(flags);
80}
81
75/* When replying to a status request, a node also sends back its 82/* When replying to a status request, a node also sends back its
76 configuration values. The requesting node then checks that the remote 83 configuration values. The requesting node then checks that the remote
77 node is configured the same way as itself. */ 84 node is configured the same way as itself. */
78 85
79static void make_config(struct dlm_ls *ls, struct rcom_config *rf) 86static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
87 uint32_t num_slots)
80{ 88{
81 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen); 89 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
82 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags); 90 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
91
92 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
93 rf->rf_num_slots = cpu_to_le16(num_slots);
94 rf->rf_generation = cpu_to_le32(ls->ls_generation);
83} 95}
84 96
85static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 97static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
86{ 98{
87 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf; 99 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
88 size_t conf_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
89 100
90 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) { 101 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
91 log_error(ls, "version mismatch: %x nodeid %d: %x", 102 log_error(ls, "version mismatch: %x nodeid %d: %x",
@@ -94,12 +105,6 @@ static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
94 return -EPROTO; 105 return -EPROTO;
95 } 106 }
96 107
97 if (rc->rc_header.h_length < conf_size) {
98 log_error(ls, "config too short: %d nodeid %d",
99 rc->rc_header.h_length, nodeid);
100 return -EPROTO;
101 }
102
103 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen || 108 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
104 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) { 109 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
105 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x", 110 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@@ -127,7 +132,18 @@ static void disallow_sync_reply(struct dlm_ls *ls)
127 spin_unlock(&ls->ls_rcom_spin); 132 spin_unlock(&ls->ls_rcom_spin);
128} 133}
129 134
130int dlm_rcom_status(struct dlm_ls *ls, int nodeid) 135/*
136 * low nodeid gathers one slot value at a time from each node.
137 * it sets need_slots=0, and saves rf_our_slot returned from each
138 * rcom_config.
139 *
140 * other nodes gather all slot values at once from the low nodeid.
141 * they set need_slots=1, and ignore the rf_our_slot returned from each
142 * rcom_config. they use the rf_num_slots returned from the low
143 * node's rcom_config.
144 */
145
146int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
131{ 147{
132 struct dlm_rcom *rc; 148 struct dlm_rcom *rc;
133 struct dlm_mhandle *mh; 149 struct dlm_mhandle *mh;
@@ -141,10 +157,13 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
141 goto out; 157 goto out;
142 } 158 }
143 159
144 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh); 160 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
161 sizeof(struct rcom_status), &rc, &mh);
145 if (error) 162 if (error)
146 goto out; 163 goto out;
147 164
165 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
166
148 allow_sync_reply(ls, &rc->rc_id); 167 allow_sync_reply(ls, &rc->rc_id);
149 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size); 168 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
150 169
@@ -161,8 +180,11 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
161 /* we pretend the remote lockspace exists with 0 status */ 180 /* we pretend the remote lockspace exists with 0 status */
162 log_debug(ls, "remote node %d not ready", nodeid); 181 log_debug(ls, "remote node %d not ready", nodeid);
163 rc->rc_result = 0; 182 rc->rc_result = 0;
164 } else 183 error = 0;
165 error = check_config(ls, rc, nodeid); 184 } else {
185 error = check_rcom_config(ls, rc, nodeid);
186 }
187
166 /* the caller looks at rc_result for the remote recovery status */ 188 /* the caller looks at rc_result for the remote recovery status */
167 out: 189 out:
168 return error; 190 return error;
@@ -172,17 +194,60 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
172{ 194{
173 struct dlm_rcom *rc; 195 struct dlm_rcom *rc;
174 struct dlm_mhandle *mh; 196 struct dlm_mhandle *mh;
175 int error, nodeid = rc_in->rc_header.h_nodeid; 197 struct rcom_status *rs;
198 uint32_t status;
199 int nodeid = rc_in->rc_header.h_nodeid;
200 int len = sizeof(struct rcom_config);
201 int num_slots = 0;
202 int error;
203
204 if (!dlm_slots_version(&rc_in->rc_header)) {
205 status = dlm_recover_status(ls);
206 goto do_create;
207 }
208
209 rs = (struct rcom_status *)rc_in->rc_buf;
176 210
211 if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) {
212 status = dlm_recover_status(ls);
213 goto do_create;
214 }
215
216 spin_lock(&ls->ls_recover_lock);
217 status = ls->ls_recover_status;
218 num_slots = ls->ls_num_slots;
219 spin_unlock(&ls->ls_recover_lock);
220 len += num_slots * sizeof(struct rcom_slot);
221
222 do_create:
177 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY, 223 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
178 sizeof(struct rcom_config), &rc, &mh); 224 len, &rc, &mh);
179 if (error) 225 if (error)
180 return; 226 return;
227
181 rc->rc_id = rc_in->rc_id; 228 rc->rc_id = rc_in->rc_id;
182 rc->rc_seq_reply = rc_in->rc_seq; 229 rc->rc_seq_reply = rc_in->rc_seq;
183 rc->rc_result = dlm_recover_status(ls); 230 rc->rc_result = status;
184 make_config(ls, (struct rcom_config *) rc->rc_buf); 231
232 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
233
234 if (!num_slots)
235 goto do_send;
236
237 spin_lock(&ls->ls_recover_lock);
238 if (ls->ls_num_slots != num_slots) {
239 spin_unlock(&ls->ls_recover_lock);
240 log_debug(ls, "receive_rcom_status num_slots %d to %d",
241 num_slots, ls->ls_num_slots);
242 rc->rc_result = 0;
243 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
244 goto do_send;
245 }
246
247 dlm_slots_copy_out(ls, rc);
248 spin_unlock(&ls->ls_recover_lock);
185 249
250 do_send:
186 send_rcom(ls, mh, rc); 251 send_rcom(ls, mh, rc);
187} 252}
188 253
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index b09abd29ba38..206723ab744d 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -14,7 +14,7 @@
14#ifndef __RCOM_DOT_H__ 14#ifndef __RCOM_DOT_H__
15#define __RCOM_DOT_H__ 15#define __RCOM_DOT_H__
16 16
17int dlm_rcom_status(struct dlm_ls *ls, int nodeid); 17int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); 18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); 19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
20int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 20int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 81b239304495..34d5adf1fce7 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -85,14 +85,20 @@ uint32_t dlm_recover_status(struct dlm_ls *ls)
85 return status; 85 return status;
86} 86}
87 87
88static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
89{
90 ls->ls_recover_status |= status;
91}
92
88void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status) 93void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
89{ 94{
90 spin_lock(&ls->ls_recover_lock); 95 spin_lock(&ls->ls_recover_lock);
91 ls->ls_recover_status |= status; 96 _set_recover_status(ls, status);
92 spin_unlock(&ls->ls_recover_lock); 97 spin_unlock(&ls->ls_recover_lock);
93} 98}
94 99
95static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status) 100static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
101 int save_slots)
96{ 102{
97 struct dlm_rcom *rc = ls->ls_recover_buf; 103 struct dlm_rcom *rc = ls->ls_recover_buf;
98 struct dlm_member *memb; 104 struct dlm_member *memb;
@@ -106,10 +112,13 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
106 goto out; 112 goto out;
107 } 113 }
108 114
109 error = dlm_rcom_status(ls, memb->nodeid); 115 error = dlm_rcom_status(ls, memb->nodeid, 0);
110 if (error) 116 if (error)
111 goto out; 117 goto out;
112 118
119 if (save_slots)
120 dlm_slot_save(ls, rc, memb);
121
113 if (rc->rc_result & wait_status) 122 if (rc->rc_result & wait_status)
114 break; 123 break;
115 if (delay < 1000) 124 if (delay < 1000)
@@ -121,7 +130,8 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
121 return error; 130 return error;
122} 131}
123 132
124static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status) 133static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
134 uint32_t status_flags)
125{ 135{
126 struct dlm_rcom *rc = ls->ls_recover_buf; 136 struct dlm_rcom *rc = ls->ls_recover_buf;
127 int error = 0, delay = 0, nodeid = ls->ls_low_nodeid; 137 int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@@ -132,7 +142,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
132 goto out; 142 goto out;
133 } 143 }
134 144
135 error = dlm_rcom_status(ls, nodeid); 145 error = dlm_rcom_status(ls, nodeid, status_flags);
136 if (error) 146 if (error)
137 break; 147 break;
138 148
@@ -152,18 +162,56 @@ static int wait_status(struct dlm_ls *ls, uint32_t status)
152 int error; 162 int error;
153 163
154 if (ls->ls_low_nodeid == dlm_our_nodeid()) { 164 if (ls->ls_low_nodeid == dlm_our_nodeid()) {
155 error = wait_status_all(ls, status); 165 error = wait_status_all(ls, status, 0);
156 if (!error) 166 if (!error)
157 dlm_set_recover_status(ls, status_all); 167 dlm_set_recover_status(ls, status_all);
158 } else 168 } else
159 error = wait_status_low(ls, status_all); 169 error = wait_status_low(ls, status_all, 0);
160 170
161 return error; 171 return error;
162} 172}
163 173
164int dlm_recover_members_wait(struct dlm_ls *ls) 174int dlm_recover_members_wait(struct dlm_ls *ls)
165{ 175{
166 return wait_status(ls, DLM_RS_NODES); 176 struct dlm_member *memb;
177 struct dlm_slot *slots;
178 int num_slots, slots_size;
179 int error, rv;
180 uint32_t gen;
181
182 list_for_each_entry(memb, &ls->ls_nodes, list) {
183 memb->slot = -1;
184 memb->generation = 0;
185 }
186
187 if (ls->ls_low_nodeid == dlm_our_nodeid()) {
188 error = wait_status_all(ls, DLM_RS_NODES, 1);
189 if (error)
190 goto out;
191
192 /* slots array is sparse, slots_size may be > num_slots */
193
194 rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
195 if (!rv) {
196 spin_lock(&ls->ls_recover_lock);
197 _set_recover_status(ls, DLM_RS_NODES_ALL);
198 ls->ls_num_slots = num_slots;
199 ls->ls_slots_size = slots_size;
200 ls->ls_slots = slots;
201 ls->ls_generation = gen;
202 spin_unlock(&ls->ls_recover_lock);
203 } else {
204 dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
205 }
206 } else {
207 error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
208 if (error)
209 goto out;
210
211 dlm_slots_copy_in(ls);
212 }
213 out:
214 return error;
167} 215}
168 216
169int dlm_recover_directory_wait(struct dlm_ls *ls) 217int dlm_recover_directory_wait(struct dlm_ls *ls)