aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/rcom.c
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2006-12-13 11:37:16 -0500
committerSteven Whitehouse <swhiteho@redhat.com>2007-02-05 13:35:50 -0500
commit38aa8b0c59c35d10d15ebf00ceee641f9ed7acba (patch)
tree17444ed0f0e195677a6faaac31ba296f37b5e148 /fs/dlm/rcom.c
parentdc200a8848cca8b0e99012996c66f4b379a390ed (diff)
[DLM] fix old rcom messages
A reply to a recovery message will often be received after the relevant recovery sequence has aborted and the next recovery sequence has begun. We need to ignore replies to these old messages from the previous recovery. There's already a way to do this for synchronous recovery requests using the rc_id number, but not for async. Each recovery sequence already has a locally unique sequence number associated with it. This patch adds a field to the rcom (recovery message) structure where this recovery sequence number can be placed, rc_seq. When a node sends a reply to a recovery request, it copies the rc_seq number it received into rc_seq_reply. When the first node receives the reply to its recovery message, it will check whether rc_seq_reply matches the current recovery sequence number, ls_recover_seq, and if not then it ignores the old reply. An old, inadequate approach to filtering out old replies (checking if the current stage of recovery has moved back to the start) has been removed from two spots. The protocol version number is changed to reflect the different rcom structures. Signed-off-by: David Teigland <teigland@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm/rcom.c')
-rw-r--r--fs/dlm/rcom.c61
1 files changed, 38 insertions, 23 deletions
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 4cc31be9cd9d..521ad9bb47b7 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -56,6 +56,10 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
56 56
57 rc->rc_type = type; 57 rc->rc_type = type;
58 58
59 spin_lock(&ls->ls_recover_lock);
60 rc->rc_seq = ls->ls_recover_seq;
61 spin_unlock(&ls->ls_recover_lock);
62
59 *mh_ret = mh; 63 *mh_ret = mh;
60 *rc_ret = rc; 64 *rc_ret = rc;
61 return 0; 65 return 0;
@@ -159,6 +163,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
159 if (error) 163 if (error)
160 return; 164 return;
161 rc->rc_id = rc_in->rc_id; 165 rc->rc_id = rc_in->rc_id;
166 rc->rc_seq_reply = rc_in->rc_seq;
162 rc->rc_result = dlm_recover_status(ls); 167 rc->rc_result = dlm_recover_status(ls);
163 make_config(ls, (struct rcom_config *) rc->rc_buf); 168 make_config(ls, (struct rcom_config *) rc->rc_buf);
164 169
@@ -224,21 +229,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
224{ 229{
225 struct dlm_rcom *rc; 230 struct dlm_rcom *rc;
226 struct dlm_mhandle *mh; 231 struct dlm_mhandle *mh;
227 int error, inlen, outlen; 232 int error, inlen, outlen, nodeid;
228 int nodeid = rc_in->rc_header.h_nodeid;
229 uint32_t status = dlm_recover_status(ls);
230
231 /*
232 * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
233 * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
234 * It could only happen in rare cases where we get a late NAMES
235 * message from a previous instance of recovery.
236 */
237
238 if (!(status & DLM_RS_NODES)) {
239 log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid);
240 return;
241 }
242 233
243 nodeid = rc_in->rc_header.h_nodeid; 234 nodeid = rc_in->rc_header.h_nodeid;
244 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); 235 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
@@ -248,6 +239,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
248 if (error) 239 if (error)
249 return; 240 return;
250 rc->rc_id = rc_in->rc_id; 241 rc->rc_id = rc_in->rc_id;
242 rc->rc_seq_reply = rc_in->rc_seq;
251 243
252 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, 244 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
253 nodeid); 245 nodeid);
@@ -294,6 +286,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
294 ret_nodeid = error; 286 ret_nodeid = error;
295 rc->rc_result = ret_nodeid; 287 rc->rc_result = ret_nodeid;
296 rc->rc_id = rc_in->rc_id; 288 rc->rc_id = rc_in->rc_id;
289 rc->rc_seq_reply = rc_in->rc_seq;
297 290
298 send_rcom(ls, mh, rc); 291 send_rcom(ls, mh, rc);
299} 292}
@@ -375,20 +368,13 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
375 368
376 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); 369 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
377 rc->rc_id = rc_in->rc_id; 370 rc->rc_id = rc_in->rc_id;
371 rc->rc_seq_reply = rc_in->rc_seq;
378 372
379 send_rcom(ls, mh, rc); 373 send_rcom(ls, mh, rc);
380} 374}
381 375
382static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 376static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
383{ 377{
384 uint32_t status = dlm_recover_status(ls);
385
386 if (!(status & DLM_RS_DIR)) {
387 log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u",
388 rc_in->rc_header.h_nodeid);
389 return;
390 }
391
392 dlm_recover_process_copy(ls, rc_in); 378 dlm_recover_process_copy(ls, rc_in);
393} 379}
394 380
@@ -415,6 +401,7 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
415 401
416 rc->rc_type = DLM_RCOM_STATUS_REPLY; 402 rc->rc_type = DLM_RCOM_STATUS_REPLY;
417 rc->rc_id = rc_in->rc_id; 403 rc->rc_id = rc_in->rc_id;
404 rc->rc_seq_reply = rc_in->rc_seq;
418 rc->rc_result = -ESRCH; 405 rc->rc_result = -ESRCH;
419 406
420 rf = (struct rcom_config *) rc->rc_buf; 407 rf = (struct rcom_config *) rc->rc_buf;
@@ -426,6 +413,31 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
426 return 0; 413 return 0;
427} 414}
428 415
416static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
417{
418 uint64_t seq;
419 int rv = 0;
420
421 switch (rc->rc_type) {
422 case DLM_RCOM_STATUS_REPLY:
423 case DLM_RCOM_NAMES_REPLY:
424 case DLM_RCOM_LOOKUP_REPLY:
425 case DLM_RCOM_LOCK_REPLY:
426 spin_lock(&ls->ls_recover_lock);
427 seq = ls->ls_recover_seq;
428 spin_unlock(&ls->ls_recover_lock);
429 if (rc->rc_seq_reply != seq) {
430 log_error(ls, "ignoring old reply %x from %d "
431 "seq_reply %llx expect %llx",
432 rc->rc_type, rc->rc_header.h_nodeid,
433 (unsigned long long)rc->rc_seq_reply,
434 (unsigned long long)seq);
435 rv = 1;
436 }
437 }
438 return rv;
439}
440
429/* Called by dlm_recvd; corresponds to dlm_receive_message() but special 441/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
430 recovery-only comms are sent through here. */ 442 recovery-only comms are sent through here. */
431 443
@@ -454,6 +466,9 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
454 goto out; 466 goto out;
455 } 467 }
456 468
469 if (is_old_reply(ls, rc))
470 goto out;
471
457 if (nodeid != rc->rc_header.h_nodeid) { 472 if (nodeid != rc->rc_header.h_nodeid) {
458 log_error(ls, "bad rcom nodeid %d from %d", 473 log_error(ls, "bad rcom nodeid %d from %d",
459 rc->rc_header.h_nodeid, nodeid); 474 rc->rc_header.h_nodeid, nodeid);