aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/rcom.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/rcom.c')
-rw-r--r--fs/dlm/rcom.c85
1 files changed, 54 insertions, 31 deletions
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 4cc31be9cd9d..6bfbd6153809 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -56,6 +56,10 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
56 56
57 rc->rc_type = type; 57 rc->rc_type = type;
58 58
59 spin_lock(&ls->ls_recover_lock);
60 rc->rc_seq = ls->ls_recover_seq;
61 spin_unlock(&ls->ls_recover_lock);
62
59 *mh_ret = mh; 63 *mh_ret = mh;
60 *rc_ret = rc; 64 *rc_ret = rc;
61 return 0; 65 return 0;
@@ -78,8 +82,17 @@ static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
78 rf->rf_lsflags = ls->ls_exflags; 82 rf->rf_lsflags = ls->ls_exflags;
79} 83}
80 84
81static int check_config(struct dlm_ls *ls, struct rcom_config *rf, int nodeid) 85static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
82{ 86{
87 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
88
89 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
90 log_error(ls, "version mismatch: %x nodeid %d: %x",
91 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
92 rc->rc_header.h_version);
93 return -EINVAL;
94 }
95
83 if (rf->rf_lvblen != ls->ls_lvblen || 96 if (rf->rf_lvblen != ls->ls_lvblen ||
84 rf->rf_lsflags != ls->ls_exflags) { 97 rf->rf_lsflags != ls->ls_exflags) {
85 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x", 98 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@@ -125,7 +138,7 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
125 goto out; 138 goto out;
126 139
127 allow_sync_reply(ls, &rc->rc_id); 140 allow_sync_reply(ls, &rc->rc_id);
128 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size); 141 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
129 142
130 send_rcom(ls, mh, rc); 143 send_rcom(ls, mh, rc);
131 144
@@ -141,8 +154,7 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
141 log_debug(ls, "remote node %d not ready", nodeid); 154 log_debug(ls, "remote node %d not ready", nodeid);
142 rc->rc_result = 0; 155 rc->rc_result = 0;
143 } else 156 } else
144 error = check_config(ls, (struct rcom_config *) rc->rc_buf, 157 error = check_config(ls, rc, nodeid);
145 nodeid);
146 /* the caller looks at rc_result for the remote recovery status */ 158 /* the caller looks at rc_result for the remote recovery status */
147 out: 159 out:
148 return error; 160 return error;
@@ -159,6 +171,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
159 if (error) 171 if (error)
160 return; 172 return;
161 rc->rc_id = rc_in->rc_id; 173 rc->rc_id = rc_in->rc_id;
174 rc->rc_seq_reply = rc_in->rc_seq;
162 rc->rc_result = dlm_recover_status(ls); 175 rc->rc_result = dlm_recover_status(ls);
163 make_config(ls, (struct rcom_config *) rc->rc_buf); 176 make_config(ls, (struct rcom_config *) rc->rc_buf);
164 177
@@ -200,7 +213,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
200 if (nodeid == dlm_our_nodeid()) { 213 if (nodeid == dlm_our_nodeid()) {
201 dlm_copy_master_names(ls, last_name, last_len, 214 dlm_copy_master_names(ls, last_name, last_len,
202 ls->ls_recover_buf + len, 215 ls->ls_recover_buf + len,
203 dlm_config.buffer_size - len, nodeid); 216 dlm_config.ci_buffer_size - len, nodeid);
204 goto out; 217 goto out;
205 } 218 }
206 219
@@ -210,7 +223,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
210 memcpy(rc->rc_buf, last_name, last_len); 223 memcpy(rc->rc_buf, last_name, last_len);
211 224
212 allow_sync_reply(ls, &rc->rc_id); 225 allow_sync_reply(ls, &rc->rc_id);
213 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size); 226 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
214 227
215 send_rcom(ls, mh, rc); 228 send_rcom(ls, mh, rc);
216 229
@@ -224,30 +237,17 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
224{ 237{
225 struct dlm_rcom *rc; 238 struct dlm_rcom *rc;
226 struct dlm_mhandle *mh; 239 struct dlm_mhandle *mh;
227 int error, inlen, outlen; 240 int error, inlen, outlen, nodeid;
228 int nodeid = rc_in->rc_header.h_nodeid;
229 uint32_t status = dlm_recover_status(ls);
230
231 /*
232 * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
233 * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
234 * It could only happen in rare cases where we get a late NAMES
235 * message from a previous instance of recovery.
236 */
237
238 if (!(status & DLM_RS_NODES)) {
239 log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid);
240 return;
241 }
242 241
243 nodeid = rc_in->rc_header.h_nodeid; 242 nodeid = rc_in->rc_header.h_nodeid;
244 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); 243 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
245 outlen = dlm_config.buffer_size - sizeof(struct dlm_rcom); 244 outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
246 245
247 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh); 246 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
248 if (error) 247 if (error)
249 return; 248 return;
250 rc->rc_id = rc_in->rc_id; 249 rc->rc_id = rc_in->rc_id;
250 rc->rc_seq_reply = rc_in->rc_seq;
251 251
252 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, 252 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
253 nodeid); 253 nodeid);
@@ -294,6 +294,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
294 ret_nodeid = error; 294 ret_nodeid = error;
295 rc->rc_result = ret_nodeid; 295 rc->rc_result = ret_nodeid;
296 rc->rc_id = rc_in->rc_id; 296 rc->rc_id = rc_in->rc_id;
297 rc->rc_seq_reply = rc_in->rc_seq;
297 298
298 send_rcom(ls, mh, rc); 299 send_rcom(ls, mh, rc);
299} 300}
@@ -375,20 +376,13 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
375 376
376 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); 377 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
377 rc->rc_id = rc_in->rc_id; 378 rc->rc_id = rc_in->rc_id;
379 rc->rc_seq_reply = rc_in->rc_seq;
378 380
379 send_rcom(ls, mh, rc); 381 send_rcom(ls, mh, rc);
380} 382}
381 383
382static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 384static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
383{ 385{
384 uint32_t status = dlm_recover_status(ls);
385
386 if (!(status & DLM_RS_DIR)) {
387 log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u",
388 rc_in->rc_header.h_nodeid);
389 return;
390 }
391
392 dlm_recover_process_copy(ls, rc_in); 386 dlm_recover_process_copy(ls, rc_in);
393} 387}
394 388
@@ -415,6 +409,7 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
415 409
416 rc->rc_type = DLM_RCOM_STATUS_REPLY; 410 rc->rc_type = DLM_RCOM_STATUS_REPLY;
417 rc->rc_id = rc_in->rc_id; 411 rc->rc_id = rc_in->rc_id;
412 rc->rc_seq_reply = rc_in->rc_seq;
418 rc->rc_result = -ESRCH; 413 rc->rc_result = -ESRCH;
419 414
420 rf = (struct rcom_config *) rc->rc_buf; 415 rf = (struct rcom_config *) rc->rc_buf;
@@ -426,6 +421,31 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
426 return 0; 421 return 0;
427} 422}
428 423
424static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
425{
426 uint64_t seq;
427 int rv = 0;
428
429 switch (rc->rc_type) {
430 case DLM_RCOM_STATUS_REPLY:
431 case DLM_RCOM_NAMES_REPLY:
432 case DLM_RCOM_LOOKUP_REPLY:
433 case DLM_RCOM_LOCK_REPLY:
434 spin_lock(&ls->ls_recover_lock);
435 seq = ls->ls_recover_seq;
436 spin_unlock(&ls->ls_recover_lock);
437 if (rc->rc_seq_reply != seq) {
438 log_debug(ls, "ignoring old reply %x from %d "
439 "seq_reply %llx expect %llx",
440 rc->rc_type, rc->rc_header.h_nodeid,
441 (unsigned long long)rc->rc_seq_reply,
442 (unsigned long long)seq);
443 rv = 1;
444 }
445 }
446 return rv;
447}
448
429/* Called by dlm_recvd; corresponds to dlm_receive_message() but special 449/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
430 recovery-only comms are sent through here. */ 450 recovery-only comms are sent through here. */
431 451
@@ -449,11 +469,14 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
449 } 469 }
450 470
451 if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) { 471 if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
452 log_error(ls, "ignoring recovery message %x from %d", 472 log_debug(ls, "ignoring recovery message %x from %d",
453 rc->rc_type, nodeid); 473 rc->rc_type, nodeid);
454 goto out; 474 goto out;
455 } 475 }
456 476
477 if (is_old_reply(ls, rc))
478 goto out;
479
457 if (nodeid != rc->rc_header.h_nodeid) { 480 if (nodeid != rc->rc_header.h_nodeid) {
458 log_error(ls, "bad rcom nodeid %d from %d", 481 log_error(ls, "bad rcom nodeid %d from %d",
459 rc->rc_header.h_nodeid, nodeid); 482 rc->rc_header.h_nodeid, nodeid);