aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2006-12-13 11:37:16 -0500
committerSteven Whitehouse <swhiteho@redhat.com>2007-02-05 13:35:50 -0500
commit38aa8b0c59c35d10d15ebf00ceee641f9ed7acba (patch)
tree17444ed0f0e195677a6faaac31ba296f37b5e148 /fs
parentdc200a8848cca8b0e99012996c66f4b379a390ed (diff)
[DLM] fix old rcom messages
A reply to a recovery message will often be received after the relevant recovery sequence has aborted and the next recovery sequence has begun. We need to ignore replies to these old messages from the previous recovery. There's already a way to do this for synchronous recovery requests using the rc_id number, but not for async. Each recovery sequence already has a locally unique sequence number associated with it. This patch adds a field to the rcom (recovery message) structure where this recovery sequence number can be placed, rc_seq. When a node sends a reply to a recovery request, it copies the rc_seq number it received into rc_seq_reply. When the first node receives the reply to its recovery message, it will check whether rc_seq_reply matches the current recovery sequence number, ls_recover_seq, and if not then it ignores the old reply. An old, inadequate approach to filtering out old replies (checking if the current stage of recovery has moved back to the start) has been removed from two spots. The protocol version number is changed to reflect the different rcom structures. Signed-off-by: David Teigland <teigland@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/dlm/dlm_internal.h6
-rw-r--r--fs/dlm/rcom.c61
-rw-r--r--fs/dlm/util.c4
3 files changed, 46 insertions, 25 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 1ee8195e6fc0..7185a132a8b5 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -309,8 +309,8 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag)
309 309
310/* dlm_header is first element of all structs sent between nodes */ 310/* dlm_header is first element of all structs sent between nodes */
311 311
312#define DLM_HEADER_MAJOR 0x00020000 312#define DLM_HEADER_MAJOR 0x00030000
313#define DLM_HEADER_MINOR 0x00000001 313#define DLM_HEADER_MINOR 0x00000000
314 314
315#define DLM_MSG 1 315#define DLM_MSG 1
316#define DLM_RCOM 2 316#define DLM_RCOM 2
@@ -386,6 +386,8 @@ struct dlm_rcom {
386 uint32_t rc_type; /* DLM_RCOM_ */ 386 uint32_t rc_type; /* DLM_RCOM_ */
387 int rc_result; /* multi-purpose */ 387 int rc_result; /* multi-purpose */
388 uint64_t rc_id; /* match reply with request */ 388 uint64_t rc_id; /* match reply with request */
389 uint64_t rc_seq; /* sender's ls_recover_seq */
390 uint64_t rc_seq_reply; /* remote ls_recover_seq */
389 char rc_buf[0]; 391 char rc_buf[0];
390}; 392};
391 393
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 4cc31be9cd9d..521ad9bb47b7 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -56,6 +56,10 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
56 56
57 rc->rc_type = type; 57 rc->rc_type = type;
58 58
59 spin_lock(&ls->ls_recover_lock);
60 rc->rc_seq = ls->ls_recover_seq;
61 spin_unlock(&ls->ls_recover_lock);
62
59 *mh_ret = mh; 63 *mh_ret = mh;
60 *rc_ret = rc; 64 *rc_ret = rc;
61 return 0; 65 return 0;
@@ -159,6 +163,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
159 if (error) 163 if (error)
160 return; 164 return;
161 rc->rc_id = rc_in->rc_id; 165 rc->rc_id = rc_in->rc_id;
166 rc->rc_seq_reply = rc_in->rc_seq;
162 rc->rc_result = dlm_recover_status(ls); 167 rc->rc_result = dlm_recover_status(ls);
163 make_config(ls, (struct rcom_config *) rc->rc_buf); 168 make_config(ls, (struct rcom_config *) rc->rc_buf);
164 169
@@ -224,21 +229,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
224{ 229{
225 struct dlm_rcom *rc; 230 struct dlm_rcom *rc;
226 struct dlm_mhandle *mh; 231 struct dlm_mhandle *mh;
227 int error, inlen, outlen; 232 int error, inlen, outlen, nodeid;
228 int nodeid = rc_in->rc_header.h_nodeid;
229 uint32_t status = dlm_recover_status(ls);
230
231 /*
232 * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
233 * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
234 * It could only happen in rare cases where we get a late NAMES
235 * message from a previous instance of recovery.
236 */
237
238 if (!(status & DLM_RS_NODES)) {
239 log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid);
240 return;
241 }
242 233
243 nodeid = rc_in->rc_header.h_nodeid; 234 nodeid = rc_in->rc_header.h_nodeid;
244 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); 235 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
@@ -248,6 +239,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
248 if (error) 239 if (error)
249 return; 240 return;
250 rc->rc_id = rc_in->rc_id; 241 rc->rc_id = rc_in->rc_id;
242 rc->rc_seq_reply = rc_in->rc_seq;
251 243
252 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, 244 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
253 nodeid); 245 nodeid);
@@ -294,6 +286,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
294 ret_nodeid = error; 286 ret_nodeid = error;
295 rc->rc_result = ret_nodeid; 287 rc->rc_result = ret_nodeid;
296 rc->rc_id = rc_in->rc_id; 288 rc->rc_id = rc_in->rc_id;
289 rc->rc_seq_reply = rc_in->rc_seq;
297 290
298 send_rcom(ls, mh, rc); 291 send_rcom(ls, mh, rc);
299} 292}
@@ -375,20 +368,13 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
375 368
376 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); 369 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
377 rc->rc_id = rc_in->rc_id; 370 rc->rc_id = rc_in->rc_id;
371 rc->rc_seq_reply = rc_in->rc_seq;
378 372
379 send_rcom(ls, mh, rc); 373 send_rcom(ls, mh, rc);
380} 374}
381 375
382static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 376static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
383{ 377{
384 uint32_t status = dlm_recover_status(ls);
385
386 if (!(status & DLM_RS_DIR)) {
387 log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u",
388 rc_in->rc_header.h_nodeid);
389 return;
390 }
391
392 dlm_recover_process_copy(ls, rc_in); 378 dlm_recover_process_copy(ls, rc_in);
393} 379}
394 380
@@ -415,6 +401,7 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
415 401
416 rc->rc_type = DLM_RCOM_STATUS_REPLY; 402 rc->rc_type = DLM_RCOM_STATUS_REPLY;
417 rc->rc_id = rc_in->rc_id; 403 rc->rc_id = rc_in->rc_id;
404 rc->rc_seq_reply = rc_in->rc_seq;
418 rc->rc_result = -ESRCH; 405 rc->rc_result = -ESRCH;
419 406
420 rf = (struct rcom_config *) rc->rc_buf; 407 rf = (struct rcom_config *) rc->rc_buf;
@@ -426,6 +413,31 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
426 return 0; 413 return 0;
427} 414}
428 415
416static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
417{
418 uint64_t seq;
419 int rv = 0;
420
421 switch (rc->rc_type) {
422 case DLM_RCOM_STATUS_REPLY:
423 case DLM_RCOM_NAMES_REPLY:
424 case DLM_RCOM_LOOKUP_REPLY:
425 case DLM_RCOM_LOCK_REPLY:
426 spin_lock(&ls->ls_recover_lock);
427 seq = ls->ls_recover_seq;
428 spin_unlock(&ls->ls_recover_lock);
429 if (rc->rc_seq_reply != seq) {
430 log_error(ls, "ignoring old reply %x from %d "
431 "seq_reply %llx expect %llx",
432 rc->rc_type, rc->rc_header.h_nodeid,
433 (unsigned long long)rc->rc_seq_reply,
434 (unsigned long long)seq);
435 rv = 1;
436 }
437 }
438 return rv;
439}
440
429/* Called by dlm_recvd; corresponds to dlm_receive_message() but special 441/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
430 recovery-only comms are sent through here. */ 442 recovery-only comms are sent through here. */
431 443
@@ -454,6 +466,9 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
454 goto out; 466 goto out;
455 } 467 }
456 468
469 if (is_old_reply(ls, rc))
470 goto out;
471
457 if (nodeid != rc->rc_header.h_nodeid) { 472 if (nodeid != rc->rc_header.h_nodeid) {
458 log_error(ls, "bad rcom nodeid %d from %d", 473 log_error(ls, "bad rcom nodeid %d from %d",
459 rc->rc_header.h_nodeid, nodeid); 474 rc->rc_header.h_nodeid, nodeid);
diff --git a/fs/dlm/util.c b/fs/dlm/util.c
index 767197db9944..963889cf6740 100644
--- a/fs/dlm/util.c
+++ b/fs/dlm/util.c
@@ -134,6 +134,8 @@ void dlm_rcom_out(struct dlm_rcom *rc)
134 rc->rc_type = cpu_to_le32(rc->rc_type); 134 rc->rc_type = cpu_to_le32(rc->rc_type);
135 rc->rc_result = cpu_to_le32(rc->rc_result); 135 rc->rc_result = cpu_to_le32(rc->rc_result);
136 rc->rc_id = cpu_to_le64(rc->rc_id); 136 rc->rc_id = cpu_to_le64(rc->rc_id);
137 rc->rc_seq = cpu_to_le64(rc->rc_seq);
138 rc->rc_seq_reply = cpu_to_le64(rc->rc_seq_reply);
137 139
138 if (type == DLM_RCOM_LOCK) 140 if (type == DLM_RCOM_LOCK)
139 rcom_lock_out((struct rcom_lock *) rc->rc_buf); 141 rcom_lock_out((struct rcom_lock *) rc->rc_buf);
@@ -151,6 +153,8 @@ void dlm_rcom_in(struct dlm_rcom *rc)
151 rc->rc_type = le32_to_cpu(rc->rc_type); 153 rc->rc_type = le32_to_cpu(rc->rc_type);
152 rc->rc_result = le32_to_cpu(rc->rc_result); 154 rc->rc_result = le32_to_cpu(rc->rc_result);
153 rc->rc_id = le64_to_cpu(rc->rc_id); 155 rc->rc_id = le64_to_cpu(rc->rc_id);
156 rc->rc_seq = le64_to_cpu(rc->rc_seq);
157 rc->rc_seq_reply = le64_to_cpu(rc->rc_seq_reply);
154 158
155 if (rc->rc_type == DLM_RCOM_LOCK) 159 if (rc->rc_type == DLM_RCOM_LOCK)
156 rcom_lock_in((struct rcom_lock *) rc->rc_buf); 160 rcom_lock_in((struct rcom_lock *) rc->rc_buf);