aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/rcom.c
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2011-10-20 14:26:28 -0400
committerDavid Teigland <teigland@redhat.com>2012-01-04 09:55:57 -0500
commit757a42719635495779462514458bbfbf12a37dac (patch)
treebe6ea155fc54c397cb57eb193b81824b37989e56 /fs/dlm/rcom.c
parentf95a34c66554235b70a681fcd9feebc195f7ec0e (diff)
dlm: add node slots and generation
Slot numbers are assigned to nodes when they join the lockspace. The slot number chosen is the minimum unused value starting at 1. Once a node is assigned a slot, that slot number will not change while the node remains a lockspace member. If the node leaves and rejoins it can be assigned a new slot number. A new generation number is also added to a lockspace. It is set and incremented during each recovery along with the slot collection/assignment. The slot numbers will be passed to gfs2 which will use them as journal id's. Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm/rcom.c')
-rw-r--r--fs/dlm/rcom.c99
1 files changed, 82 insertions, 17 deletions
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index f10a50f24e8f..ac5c616c9696 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,6 +23,7 @@
23#include "memory.h" 23#include "memory.h"
24#include "lock.h" 24#include "lock.h"
25#include "util.h" 25#include "util.h"
26#include "member.h"
26 27
27 28
28static int rcom_response(struct dlm_ls *ls) 29static int rcom_response(struct dlm_ls *ls)
@@ -72,20 +73,30 @@ static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
72 dlm_lowcomms_commit_buffer(mh); 73 dlm_lowcomms_commit_buffer(mh);
73} 74}
74 75
76static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
77 uint32_t flags)
78{
79 rs->rs_flags = cpu_to_le32(flags);
80}
81
75/* When replying to a status request, a node also sends back its 82/* When replying to a status request, a node also sends back its
76 configuration values. The requesting node then checks that the remote 83 configuration values. The requesting node then checks that the remote
77 node is configured the same way as itself. */ 84 node is configured the same way as itself. */
78 85
79static void make_config(struct dlm_ls *ls, struct rcom_config *rf) 86static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
87 uint32_t num_slots)
80{ 88{
81 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen); 89 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
82 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags); 90 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
91
92 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
93 rf->rf_num_slots = cpu_to_le16(num_slots);
94 rf->rf_generation = cpu_to_le32(ls->ls_generation);
83} 95}
84 96
85static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 97static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
86{ 98{
87 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf; 99 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
88 size_t conf_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
89 100
90 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) { 101 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
91 log_error(ls, "version mismatch: %x nodeid %d: %x", 102 log_error(ls, "version mismatch: %x nodeid %d: %x",
@@ -94,12 +105,6 @@ static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
94 return -EPROTO; 105 return -EPROTO;
95 } 106 }
96 107
97 if (rc->rc_header.h_length < conf_size) {
98 log_error(ls, "config too short: %d nodeid %d",
99 rc->rc_header.h_length, nodeid);
100 return -EPROTO;
101 }
102
103 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen || 108 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
104 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) { 109 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
105 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x", 110 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@@ -127,7 +132,18 @@ static void disallow_sync_reply(struct dlm_ls *ls)
127 spin_unlock(&ls->ls_rcom_spin); 132 spin_unlock(&ls->ls_rcom_spin);
128} 133}
129 134
130int dlm_rcom_status(struct dlm_ls *ls, int nodeid) 135/*
136 * low nodeid gathers one slot value at a time from each node.
137 * it sets need_slots=0, and saves rf_our_slot returned from each
138 * rcom_config.
139 *
140 * other nodes gather all slot values at once from the low nodeid.
141 * they set need_slots=1, and ignore the rf_our_slot returned from each
142 * rcom_config. they use the rf_num_slots returned from the low
143 * node's rcom_config.
144 */
145
146int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
131{ 147{
132 struct dlm_rcom *rc; 148 struct dlm_rcom *rc;
133 struct dlm_mhandle *mh; 149 struct dlm_mhandle *mh;
@@ -141,10 +157,13 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
141 goto out; 157 goto out;
142 } 158 }
143 159
144 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh); 160 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
161 sizeof(struct rcom_status), &rc, &mh);
145 if (error) 162 if (error)
146 goto out; 163 goto out;
147 164
165 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
166
148 allow_sync_reply(ls, &rc->rc_id); 167 allow_sync_reply(ls, &rc->rc_id);
149 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size); 168 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
150 169
@@ -161,8 +180,11 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
161 /* we pretend the remote lockspace exists with 0 status */ 180 /* we pretend the remote lockspace exists with 0 status */
162 log_debug(ls, "remote node %d not ready", nodeid); 181 log_debug(ls, "remote node %d not ready", nodeid);
163 rc->rc_result = 0; 182 rc->rc_result = 0;
164 } else 183 error = 0;
165 error = check_config(ls, rc, nodeid); 184 } else {
185 error = check_rcom_config(ls, rc, nodeid);
186 }
187
166 /* the caller looks at rc_result for the remote recovery status */ 188 /* the caller looks at rc_result for the remote recovery status */
167 out: 189 out:
168 return error; 190 return error;
@@ -172,17 +194,60 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
172{ 194{
173 struct dlm_rcom *rc; 195 struct dlm_rcom *rc;
174 struct dlm_mhandle *mh; 196 struct dlm_mhandle *mh;
175 int error, nodeid = rc_in->rc_header.h_nodeid; 197 struct rcom_status *rs;
198 uint32_t status;
199 int nodeid = rc_in->rc_header.h_nodeid;
200 int len = sizeof(struct rcom_config);
201 int num_slots = 0;
202 int error;
203
204 if (!dlm_slots_version(&rc_in->rc_header)) {
205 status = dlm_recover_status(ls);
206 goto do_create;
207 }
208
209 rs = (struct rcom_status *)rc_in->rc_buf;
176 210
211 if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) {
212 status = dlm_recover_status(ls);
213 goto do_create;
214 }
215
216 spin_lock(&ls->ls_recover_lock);
217 status = ls->ls_recover_status;
218 num_slots = ls->ls_num_slots;
219 spin_unlock(&ls->ls_recover_lock);
220 len += num_slots * sizeof(struct rcom_slot);
221
222 do_create:
177 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY, 223 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
178 sizeof(struct rcom_config), &rc, &mh); 224 len, &rc, &mh);
179 if (error) 225 if (error)
180 return; 226 return;
227
181 rc->rc_id = rc_in->rc_id; 228 rc->rc_id = rc_in->rc_id;
182 rc->rc_seq_reply = rc_in->rc_seq; 229 rc->rc_seq_reply = rc_in->rc_seq;
183 rc->rc_result = dlm_recover_status(ls); 230 rc->rc_result = status;
184 make_config(ls, (struct rcom_config *) rc->rc_buf); 231
232 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
233
234 if (!num_slots)
235 goto do_send;
236
237 spin_lock(&ls->ls_recover_lock);
238 if (ls->ls_num_slots != num_slots) {
239 spin_unlock(&ls->ls_recover_lock);
240 log_debug(ls, "receive_rcom_status num_slots %d to %d",
241 num_slots, ls->ls_num_slots);
242 rc->rc_result = 0;
243 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
244 goto do_send;
245 }
246
247 dlm_slots_copy_out(ls, rc);
248 spin_unlock(&ls->ls_recover_lock);
185 249
250 do_send:
186 send_rcom(ls, mh, rc); 251 send_rcom(ls, mh, rc);
187} 252}
188 253