aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2006-11-27 14:19:28 -0500
committerSteven Whitehouse <swhiteho@redhat.com>2006-11-30 10:37:14 -0500
commit98f176fb32f33795b6d0f83856008b932123ab38 (patch)
tree0565bd70a23546469a985b93c34509f7938fbd5b
parent1babdb453138f17b8ed3d1d5711089c4e2fa5ace (diff)
[DLM] don't accept replies to old recovery messages
We often abort a recovery after sending a status request to a remote node. We want to ignore any potential status reply we get from the remote node. If we get one of these unwanted replies, we've often moved on to the next recovery message and incremented the message sequence counter, so the reply will be ignored due to the seq number. In some cases, we've not moved on to the next message so the seq number of the reply we want to ignore is still correct, causing the reply to be accepted. The next recovery message will then mistake this old reply as a new one. To fix this, we add the flag RCOM_WAIT to indicate when we can accept a new reply. We clear this flag if we abort recovery while waiting for a reply. Before the flag is set again (to allow new replies) we know that any old replies will be rejected due to their sequence number. We also initialize the recovery-message sequence number to a random value when a lockspace is first created. This makes it clear when messages are being rejected from an old instance of a lockspace that has since been recreated. Signed-off-by: David Teigland <teigland@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
-rw-r--r--fs/dlm/dlm_internal.h4
-rw-r--r--fs/dlm/lockspace.c2
-rw-r--r--fs/dlm/rcom.c44
3 files changed, 39 insertions, 11 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 1e5cd67e1b7a..1ee8195e6fc0 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -471,6 +471,7 @@ struct dlm_ls {
471 char *ls_recover_buf; 471 char *ls_recover_buf;
472 int ls_recover_nodeid; /* for debugging */ 472 int ls_recover_nodeid; /* for debugging */
473 uint64_t ls_rcom_seq; 473 uint64_t ls_rcom_seq;
474 spinlock_t ls_rcom_spin;
474 struct list_head ls_recover_list; 475 struct list_head ls_recover_list;
475 spinlock_t ls_recover_list_lock; 476 spinlock_t ls_recover_list_lock;
476 int ls_recover_list_count; 477 int ls_recover_list_count;
@@ -488,7 +489,8 @@ struct dlm_ls {
488#define LSFL_RUNNING 1 489#define LSFL_RUNNING 1
489#define LSFL_RECOVERY_STOP 2 490#define LSFL_RECOVERY_STOP 2
490#define LSFL_RCOM_READY 3 491#define LSFL_RCOM_READY 3
491#define LSFL_UEVENT_WAIT 4 492#define LSFL_RCOM_WAIT 4
493#define LSFL_UEVENT_WAIT 5
492 494
493/* much of this is just saving user space pointers associated with the 495/* much of this is just saving user space pointers associated with the
494 lock that we pass back to the user lib with an ast */ 496 lock that we pass back to the user lib with an ast */
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 791388b25c35..59012b089e8d 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -479,6 +479,8 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
479 ls->ls_recoverd_task = NULL; 479 ls->ls_recoverd_task = NULL;
480 mutex_init(&ls->ls_recoverd_active); 480 mutex_init(&ls->ls_recoverd_active);
481 spin_lock_init(&ls->ls_recover_lock); 481 spin_lock_init(&ls->ls_recover_lock);
482 spin_lock_init(&ls->ls_rcom_spin);
483 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
482 ls->ls_recover_status = 0; 484 ls->ls_recover_status = 0;
483 ls->ls_recover_seq = 0; 485 ls->ls_recover_seq = 0;
484 ls->ls_recover_args = NULL; 486 ls->ls_recover_args = NULL;
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 6ac195cec027..c42f2db6c4b4 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -90,13 +90,28 @@ static int check_config(struct dlm_ls *ls, struct rcom_config *rf, int nodeid)
90 return 0; 90 return 0;
91} 91}
92 92
93static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
94{
95 spin_lock(&ls->ls_rcom_spin);
96 *new_seq = ++ls->ls_rcom_seq;
97 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
98 spin_unlock(&ls->ls_rcom_spin);
99}
100
101static void disallow_sync_reply(struct dlm_ls *ls)
102{
103 spin_lock(&ls->ls_rcom_spin);
104 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
105 clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
106 spin_unlock(&ls->ls_rcom_spin);
107}
108
93int dlm_rcom_status(struct dlm_ls *ls, int nodeid) 109int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
94{ 110{
95 struct dlm_rcom *rc; 111 struct dlm_rcom *rc;
96 struct dlm_mhandle *mh; 112 struct dlm_mhandle *mh;
97 int error = 0; 113 int error = 0;
98 114
99 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
100 ls->ls_recover_nodeid = nodeid; 115 ls->ls_recover_nodeid = nodeid;
101 116
102 if (nodeid == dlm_our_nodeid()) { 117 if (nodeid == dlm_our_nodeid()) {
@@ -108,12 +123,14 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
108 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh); 123 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
109 if (error) 124 if (error)
110 goto out; 125 goto out;
111 rc->rc_id = ++ls->ls_rcom_seq; 126
127 allow_sync_reply(ls, &rc->rc_id);
128 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
112 129
113 send_rcom(ls, mh, rc); 130 send_rcom(ls, mh, rc);
114 131
115 error = dlm_wait_function(ls, &rcom_response); 132 error = dlm_wait_function(ls, &rcom_response);
116 clear_bit(LSFL_RCOM_READY, &ls->ls_flags); 133 disallow_sync_reply(ls);
117 if (error) 134 if (error)
118 goto out; 135 goto out;
119 136
@@ -150,14 +167,20 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
150 167
151static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 168static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
152{ 169{
153 if (rc_in->rc_id != ls->ls_rcom_seq) { 170 spin_lock(&ls->ls_rcom_spin);
154 log_debug(ls, "reject old reply %d got %llx wanted %llx", 171 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
155 rc_in->rc_type, rc_in->rc_id, ls->ls_rcom_seq); 172 rc_in->rc_id != ls->ls_rcom_seq) {
156 return; 173 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
174 rc_in->rc_type, rc_in->rc_header.h_nodeid,
175 rc_in->rc_id, ls->ls_rcom_seq);
176 goto out;
157 } 177 }
158 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length); 178 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
159 set_bit(LSFL_RCOM_READY, &ls->ls_flags); 179 set_bit(LSFL_RCOM_READY, &ls->ls_flags);
180 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
160 wake_up(&ls->ls_wait_general); 181 wake_up(&ls->ls_wait_general);
182 out:
183 spin_unlock(&ls->ls_rcom_spin);
161} 184}
162 185
163static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 186static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
@@ -171,7 +194,6 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
171 struct dlm_mhandle *mh; 194 struct dlm_mhandle *mh;
172 int error = 0, len = sizeof(struct dlm_rcom); 195 int error = 0, len = sizeof(struct dlm_rcom);
173 196
174 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
175 ls->ls_recover_nodeid = nodeid; 197 ls->ls_recover_nodeid = nodeid;
176 198
177 if (nodeid == dlm_our_nodeid()) { 199 if (nodeid == dlm_our_nodeid()) {
@@ -185,12 +207,14 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
185 if (error) 207 if (error)
186 goto out; 208 goto out;
187 memcpy(rc->rc_buf, last_name, last_len); 209 memcpy(rc->rc_buf, last_name, last_len);
188 rc->rc_id = ++ls->ls_rcom_seq; 210
211 allow_sync_reply(ls, &rc->rc_id);
212 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
189 213
190 send_rcom(ls, mh, rc); 214 send_rcom(ls, mh, rc);
191 215
192 error = dlm_wait_function(ls, &rcom_response); 216 error = dlm_wait_function(ls, &rcom_response);
193 clear_bit(LSFL_RCOM_READY, &ls->ls_flags); 217 disallow_sync_reply(ls);
194 out: 218 out:
195 return error; 219 return error;
196} 220}