aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/dlm_internal.h6
-rw-r--r--fs/dlm/rcom.c61
-rw-r--r--fs/dlm/util.c4
3 files changed, 46 insertions, 25 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 1ee8195e6fc0..7185a132a8b5 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -309,8 +309,8 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag)
309 309
310/* dlm_header is first element of all structs sent between nodes */ 310/* dlm_header is first element of all structs sent between nodes */
311 311
312#define DLM_HEADER_MAJOR 0x00020000 312#define DLM_HEADER_MAJOR 0x00030000
313#define DLM_HEADER_MINOR 0x00000001 313#define DLM_HEADER_MINOR 0x00000000
314 314
315#define DLM_MSG 1 315#define DLM_MSG 1
316#define DLM_RCOM 2 316#define DLM_RCOM 2
@@ -386,6 +386,8 @@ struct dlm_rcom {
386 uint32_t rc_type; /* DLM_RCOM_ */ 386 uint32_t rc_type; /* DLM_RCOM_ */
387 int rc_result; /* multi-purpose */ 387 int rc_result; /* multi-purpose */
388 uint64_t rc_id; /* match reply with request */ 388 uint64_t rc_id; /* match reply with request */
389 uint64_t rc_seq; /* sender's ls_recover_seq */
390 uint64_t rc_seq_reply; /* remote ls_recover_seq */
389 char rc_buf[0]; 391 char rc_buf[0];
390}; 392};
391 393
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 4cc31be9cd9d..521ad9bb47b7 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -56,6 +56,10 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
56 56
57 rc->rc_type = type; 57 rc->rc_type = type;
58 58
59 spin_lock(&ls->ls_recover_lock);
60 rc->rc_seq = ls->ls_recover_seq;
61 spin_unlock(&ls->ls_recover_lock);
62
59 *mh_ret = mh; 63 *mh_ret = mh;
60 *rc_ret = rc; 64 *rc_ret = rc;
61 return 0; 65 return 0;
@@ -159,6 +163,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
159 if (error) 163 if (error)
160 return; 164 return;
161 rc->rc_id = rc_in->rc_id; 165 rc->rc_id = rc_in->rc_id;
166 rc->rc_seq_reply = rc_in->rc_seq;
162 rc->rc_result = dlm_recover_status(ls); 167 rc->rc_result = dlm_recover_status(ls);
163 make_config(ls, (struct rcom_config *) rc->rc_buf); 168 make_config(ls, (struct rcom_config *) rc->rc_buf);
164 169
@@ -224,21 +229,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
224{ 229{
225 struct dlm_rcom *rc; 230 struct dlm_rcom *rc;
226 struct dlm_mhandle *mh; 231 struct dlm_mhandle *mh;
227 int error, inlen, outlen; 232 int error, inlen, outlen, nodeid;
228 int nodeid = rc_in->rc_header.h_nodeid;
229 uint32_t status = dlm_recover_status(ls);
230
231 /*
232 * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
233 * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
234 * It could only happen in rare cases where we get a late NAMES
235 * message from a previous instance of recovery.
236 */
237
238 if (!(status & DLM_RS_NODES)) {
239 log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid);
240 return;
241 }
242 233
243 nodeid = rc_in->rc_header.h_nodeid; 234 nodeid = rc_in->rc_header.h_nodeid;
244 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); 235 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
@@ -248,6 +239,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
248 if (error) 239 if (error)
249 return; 240 return;
250 rc->rc_id = rc_in->rc_id; 241 rc->rc_id = rc_in->rc_id;
242 rc->rc_seq_reply = rc_in->rc_seq;
251 243
252 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, 244 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
253 nodeid); 245 nodeid);
@@ -294,6 +286,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
294 ret_nodeid = error; 286 ret_nodeid = error;
295 rc->rc_result = ret_nodeid; 287 rc->rc_result = ret_nodeid;
296 rc->rc_id = rc_in->rc_id; 288 rc->rc_id = rc_in->rc_id;
289 rc->rc_seq_reply = rc_in->rc_seq;
297 290
298 send_rcom(ls, mh, rc); 291 send_rcom(ls, mh, rc);
299} 292}
@@ -375,20 +368,13 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
375 368
376 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); 369 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
377 rc->rc_id = rc_in->rc_id; 370 rc->rc_id = rc_in->rc_id;
371 rc->rc_seq_reply = rc_in->rc_seq;
378 372
379 send_rcom(ls, mh, rc); 373 send_rcom(ls, mh, rc);
380} 374}
381 375
382static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 376static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
383{ 377{
384 uint32_t status = dlm_recover_status(ls);
385
386 if (!(status & DLM_RS_DIR)) {
387 log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u",
388 rc_in->rc_header.h_nodeid);
389 return;
390 }
391
392 dlm_recover_process_copy(ls, rc_in); 378 dlm_recover_process_copy(ls, rc_in);
393} 379}
394 380
@@ -415,6 +401,7 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
415 401
416 rc->rc_type = DLM_RCOM_STATUS_REPLY; 402 rc->rc_type = DLM_RCOM_STATUS_REPLY;
417 rc->rc_id = rc_in->rc_id; 403 rc->rc_id = rc_in->rc_id;
404 rc->rc_seq_reply = rc_in->rc_seq;
418 rc->rc_result = -ESRCH; 405 rc->rc_result = -ESRCH;
419 406
420 rf = (struct rcom_config *) rc->rc_buf; 407 rf = (struct rcom_config *) rc->rc_buf;
@@ -426,6 +413,31 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
426 return 0; 413 return 0;
427} 414}
428 415
416static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
417{
418 uint64_t seq;
419 int rv = 0;
420
421 switch (rc->rc_type) {
422 case DLM_RCOM_STATUS_REPLY:
423 case DLM_RCOM_NAMES_REPLY:
424 case DLM_RCOM_LOOKUP_REPLY:
425 case DLM_RCOM_LOCK_REPLY:
426 spin_lock(&ls->ls_recover_lock);
427 seq = ls->ls_recover_seq;
428 spin_unlock(&ls->ls_recover_lock);
429 if (rc->rc_seq_reply != seq) {
430 log_error(ls, "ignoring old reply %x from %d "
431 "seq_reply %llx expect %llx",
432 rc->rc_type, rc->rc_header.h_nodeid,
433 (unsigned long long)rc->rc_seq_reply,
434 (unsigned long long)seq);
435 rv = 1;
436 }
437 }
438 return rv;
439}
440
429/* Called by dlm_recvd; corresponds to dlm_receive_message() but special 441/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
430 recovery-only comms are sent through here. */ 442 recovery-only comms are sent through here. */
431 443
@@ -454,6 +466,9 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
454 goto out; 466 goto out;
455 } 467 }
456 468
469 if (is_old_reply(ls, rc))
470 goto out;
471
457 if (nodeid != rc->rc_header.h_nodeid) { 472 if (nodeid != rc->rc_header.h_nodeid) {
458 log_error(ls, "bad rcom nodeid %d from %d", 473 log_error(ls, "bad rcom nodeid %d from %d",
459 rc->rc_header.h_nodeid, nodeid); 474 rc->rc_header.h_nodeid, nodeid);
diff --git a/fs/dlm/util.c b/fs/dlm/util.c
index 767197db9944..963889cf6740 100644
--- a/fs/dlm/util.c
+++ b/fs/dlm/util.c
@@ -134,6 +134,8 @@ void dlm_rcom_out(struct dlm_rcom *rc)
134 rc->rc_type = cpu_to_le32(rc->rc_type); 134 rc->rc_type = cpu_to_le32(rc->rc_type);
135 rc->rc_result = cpu_to_le32(rc->rc_result); 135 rc->rc_result = cpu_to_le32(rc->rc_result);
136 rc->rc_id = cpu_to_le64(rc->rc_id); 136 rc->rc_id = cpu_to_le64(rc->rc_id);
137 rc->rc_seq = cpu_to_le64(rc->rc_seq);
138 rc->rc_seq_reply = cpu_to_le64(rc->rc_seq_reply);
137 139
138 if (type == DLM_RCOM_LOCK) 140 if (type == DLM_RCOM_LOCK)
139 rcom_lock_out((struct rcom_lock *) rc->rc_buf); 141 rcom_lock_out((struct rcom_lock *) rc->rc_buf);
@@ -151,6 +153,8 @@ void dlm_rcom_in(struct dlm_rcom *rc)
151 rc->rc_type = le32_to_cpu(rc->rc_type); 153 rc->rc_type = le32_to_cpu(rc->rc_type);
152 rc->rc_result = le32_to_cpu(rc->rc_result); 154 rc->rc_result = le32_to_cpu(rc->rc_result);
153 rc->rc_id = le64_to_cpu(rc->rc_id); 155 rc->rc_id = le64_to_cpu(rc->rc_id);
156 rc->rc_seq = le64_to_cpu(rc->rc_seq);
157 rc->rc_seq_reply = le64_to_cpu(rc->rc_seq_reply);
154 158
155 if (rc->rc_type == DLM_RCOM_LOCK) 159 if (rc->rc_type == DLM_RCOM_LOCK)
156 rcom_lock_in((struct rcom_lock *) rc->rc_buf); 160 rcom_lock_in((struct rcom_lock *) rc->rc_buf);