aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/ast.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/ast.c')
-rw-r--r--fs/dlm/ast.c265
1 files changed, 117 insertions, 148 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index abc49f292454..90e5997262ea 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -14,17 +14,9 @@
14#include "dlm_internal.h" 14#include "dlm_internal.h"
15#include "lock.h" 15#include "lock.h"
16#include "user.h" 16#include "user.h"
17#include "ast.h"
18
19#define WAKE_ASTS 0
20
21static uint64_t ast_seq_count;
22static struct list_head ast_queue;
23static spinlock_t ast_queue_lock;
24static struct task_struct * astd_task;
25static unsigned long astd_wakeflags;
26static struct mutex astd_running;
27 17
18static uint64_t dlm_cb_seq;
19static spinlock_t dlm_cb_seq_spin;
28 20
29static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb) 21static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
30{ 22{
@@ -57,21 +49,13 @@ static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
57 } 49 }
58} 50}
59 51
60void dlm_del_ast(struct dlm_lkb *lkb)
61{
62 spin_lock(&ast_queue_lock);
63 if (!list_empty(&lkb->lkb_astqueue))
64 list_del_init(&lkb->lkb_astqueue);
65 spin_unlock(&ast_queue_lock);
66}
67
68int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, 52int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
69 int status, uint32_t sbflags, uint64_t seq) 53 int status, uint32_t sbflags, uint64_t seq)
70{ 54{
71 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 55 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
72 uint64_t prev_seq; 56 uint64_t prev_seq;
73 int prev_mode; 57 int prev_mode;
74 int i; 58 int i, rv;
75 59
76 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 60 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
77 if (lkb->lkb_callbacks[i].seq) 61 if (lkb->lkb_callbacks[i].seq)
@@ -100,7 +84,8 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
100 mode, 84 mode,
101 (unsigned long long)prev_seq, 85 (unsigned long long)prev_seq,
102 prev_mode); 86 prev_mode);
103 return 0; 87 rv = 0;
88 goto out;
104 } 89 }
105 } 90 }
106 91
@@ -109,6 +94,7 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
109 lkb->lkb_callbacks[i].mode = mode; 94 lkb->lkb_callbacks[i].mode = mode;
110 lkb->lkb_callbacks[i].sb_status = status; 95 lkb->lkb_callbacks[i].sb_status = status;
111 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF); 96 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
97 rv = 0;
112 break; 98 break;
113 } 99 }
114 100
@@ -117,21 +103,24 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
117 lkb->lkb_id, (unsigned long long)seq, 103 lkb->lkb_id, (unsigned long long)seq,
118 flags, mode, status, sbflags); 104 flags, mode, status, sbflags);
119 dlm_dump_lkb_callbacks(lkb); 105 dlm_dump_lkb_callbacks(lkb);
120 return -1; 106 rv = -1;
107 goto out;
121 } 108 }
122 109 out:
123 return 0; 110 return rv;
124} 111}
125 112
126int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, 113int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
127 struct dlm_callback *cb, int *resid) 114 struct dlm_callback *cb, int *resid)
128{ 115{
129 int i; 116 int i, rv;
130 117
131 *resid = 0; 118 *resid = 0;
132 119
133 if (!lkb->lkb_callbacks[0].seq) 120 if (!lkb->lkb_callbacks[0].seq) {
134 return -ENOENT; 121 rv = -ENOENT;
122 goto out;
123 }
135 124
136 /* oldest undelivered cb is callbacks[0] */ 125 /* oldest undelivered cb is callbacks[0] */
137 126
@@ -163,7 +152,8 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
163 cb->mode, 152 cb->mode,
164 (unsigned long long)lkb->lkb_last_cast.seq, 153 (unsigned long long)lkb->lkb_last_cast.seq,
165 lkb->lkb_last_cast.mode); 154 lkb->lkb_last_cast.mode);
166 return 0; 155 rv = 0;
156 goto out;
167 } 157 }
168 } 158 }
169 159
@@ -176,171 +166,150 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
176 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback)); 166 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
177 lkb->lkb_last_bast_time = ktime_get(); 167 lkb->lkb_last_bast_time = ktime_get();
178 } 168 }
179 169 rv = 0;
180 return 0; 170 out:
171 return rv;
181} 172}
182 173
183void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, 174void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
184 uint32_t sbflags) 175 uint32_t sbflags)
185{ 176{
186 uint64_t seq; 177 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
178 uint64_t new_seq, prev_seq;
187 int rv; 179 int rv;
188 180
189 spin_lock(&ast_queue_lock); 181 spin_lock(&dlm_cb_seq_spin);
190 182 new_seq = ++dlm_cb_seq;
191 seq = ++ast_seq_count; 183 spin_unlock(&dlm_cb_seq_spin);
192 184
193 if (lkb->lkb_flags & DLM_IFL_USER) { 185 if (lkb->lkb_flags & DLM_IFL_USER) {
194 spin_unlock(&ast_queue_lock); 186 dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
195 dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq);
196 return; 187 return;
197 } 188 }
198 189
199 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq); 190 mutex_lock(&lkb->lkb_cb_mutex);
200 if (rv < 0) { 191 prev_seq = lkb->lkb_callbacks[0].seq;
201 spin_unlock(&ast_queue_lock);
202 return;
203 }
204 192
205 if (list_empty(&lkb->lkb_astqueue)) { 193 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
194 if (rv < 0)
195 goto out;
196
197 if (!prev_seq) {
206 kref_get(&lkb->lkb_ref); 198 kref_get(&lkb->lkb_ref);
207 list_add_tail(&lkb->lkb_astqueue, &ast_queue);
208 }
209 spin_unlock(&ast_queue_lock);
210 199
211 set_bit(WAKE_ASTS, &astd_wakeflags); 200 if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
212 wake_up_process(astd_task); 201 mutex_lock(&ls->ls_cb_mutex);
202 list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
203 mutex_unlock(&ls->ls_cb_mutex);
204 } else {
205 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
206 }
207 }
208 out:
209 mutex_unlock(&lkb->lkb_cb_mutex);
213} 210}
214 211
215static void process_asts(void) 212void dlm_callback_work(struct work_struct *work)
216{ 213{
217 struct dlm_ls *ls = NULL; 214 struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
218 struct dlm_rsb *r = NULL; 215 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
219 struct dlm_lkb *lkb;
220 void (*castfn) (void *astparam); 216 void (*castfn) (void *astparam);
221 void (*bastfn) (void *astparam, int mode); 217 void (*bastfn) (void *astparam, int mode);
222 struct dlm_callback callbacks[DLM_CALLBACKS_SIZE]; 218 struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
223 int i, rv, resid; 219 int i, rv, resid;
224 220
225repeat: 221 memset(&callbacks, 0, sizeof(callbacks));
226 spin_lock(&ast_queue_lock);
227 list_for_each_entry(lkb, &ast_queue, lkb_astqueue) {
228 r = lkb->lkb_resource;
229 ls = r->res_ls;
230 222
231 if (dlm_locking_stopped(ls)) 223 mutex_lock(&lkb->lkb_cb_mutex);
232 continue; 224 if (!lkb->lkb_callbacks[0].seq) {
233 225 /* no callback work exists, shouldn't happen */
234 /* we remove from astqueue list and remove everything in 226 log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
235 lkb_callbacks before releasing the spinlock so empty 227 dlm_print_lkb(lkb);
236 lkb_astqueue is always consistent with empty lkb_callbacks */ 228 dlm_dump_lkb_callbacks(lkb);
237 229 }
238 list_del_init(&lkb->lkb_astqueue);
239
240 castfn = lkb->lkb_astfn;
241 bastfn = lkb->lkb_bastfn;
242 230
243 memset(&callbacks, 0, sizeof(callbacks)); 231 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
232 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
233 if (rv < 0)
234 break;
235 }
244 236
245 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 237 if (resid) {
246 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid); 238 /* cbs remain, loop should have removed all, shouldn't happen */
247 if (rv < 0) 239 log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
248 break; 240 resid);
249 } 241 dlm_print_lkb(lkb);
250 spin_unlock(&ast_queue_lock); 242 dlm_dump_lkb_callbacks(lkb);
243 }
244 mutex_unlock(&lkb->lkb_cb_mutex);
251 245
252 if (resid) { 246 castfn = lkb->lkb_astfn;
253 /* shouldn't happen, for loop should have removed all */ 247 bastfn = lkb->lkb_bastfn;
254 log_error(ls, "callback resid %d lkb %x",
255 resid, lkb->lkb_id);
256 }
257 248
258 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 249 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
259 if (!callbacks[i].seq) 250 if (!callbacks[i].seq)
260 break; 251 break;
261 if (callbacks[i].flags & DLM_CB_SKIP) { 252 if (callbacks[i].flags & DLM_CB_SKIP) {
262 continue; 253 continue;
263 } else if (callbacks[i].flags & DLM_CB_BAST) { 254 } else if (callbacks[i].flags & DLM_CB_BAST) {
264 bastfn(lkb->lkb_astparam, callbacks[i].mode); 255 bastfn(lkb->lkb_astparam, callbacks[i].mode);
265 } else if (callbacks[i].flags & DLM_CB_CAST) { 256 } else if (callbacks[i].flags & DLM_CB_CAST) {
266 lkb->lkb_lksb->sb_status = callbacks[i].sb_status; 257 lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
267 lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags; 258 lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
268 castfn(lkb->lkb_astparam); 259 castfn(lkb->lkb_astparam);
269 }
270 } 260 }
271
272 /* removes ref for ast_queue, may cause lkb to be freed */
273 dlm_put_lkb(lkb);
274
275 cond_resched();
276 goto repeat;
277 } 261 }
278 spin_unlock(&ast_queue_lock);
279}
280
281static inline int no_asts(void)
282{
283 int ret;
284 262
285 spin_lock(&ast_queue_lock); 263 /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
286 ret = list_empty(&ast_queue); 264 dlm_put_lkb(lkb);
287 spin_unlock(&ast_queue_lock);
288 return ret;
289} 265}
290 266
291static int dlm_astd(void *data) 267int dlm_callback_start(struct dlm_ls *ls)
292{ 268{
293 while (!kthread_should_stop()) { 269 ls->ls_callback_wq = alloc_workqueue("dlm_callback",
294 set_current_state(TASK_INTERRUPTIBLE); 270 WQ_UNBOUND |
295 if (!test_bit(WAKE_ASTS, &astd_wakeflags)) 271 WQ_MEM_RECLAIM |
296 schedule(); 272 WQ_NON_REENTRANT,
297 set_current_state(TASK_RUNNING); 273 0);
298 274 if (!ls->ls_callback_wq) {
299 mutex_lock(&astd_running); 275 log_print("can't start dlm_callback workqueue");
300 if (test_and_clear_bit(WAKE_ASTS, &astd_wakeflags)) 276 return -ENOMEM;
301 process_asts();
302 mutex_unlock(&astd_running);
303 } 277 }
304 return 0; 278 return 0;
305} 279}
306 280
307void dlm_astd_wake(void) 281void dlm_callback_stop(struct dlm_ls *ls)
308{ 282{
309 if (!no_asts()) { 283 if (ls->ls_callback_wq)
310 set_bit(WAKE_ASTS, &astd_wakeflags); 284 destroy_workqueue(ls->ls_callback_wq);
311 wake_up_process(astd_task);
312 }
313} 285}
314 286
315int dlm_astd_start(void) 287void dlm_callback_suspend(struct dlm_ls *ls)
316{ 288{
317 struct task_struct *p; 289 set_bit(LSFL_CB_DELAY, &ls->ls_flags);
318 int error = 0;
319
320 INIT_LIST_HEAD(&ast_queue);
321 spin_lock_init(&ast_queue_lock);
322 mutex_init(&astd_running);
323
324 p = kthread_run(dlm_astd, NULL, "dlm_astd");
325 if (IS_ERR(p))
326 error = PTR_ERR(p);
327 else
328 astd_task = p;
329 return error;
330}
331 290
332void dlm_astd_stop(void) 291 if (ls->ls_callback_wq)
333{ 292 flush_workqueue(ls->ls_callback_wq);
334 kthread_stop(astd_task);
335} 293}
336 294
337void dlm_astd_suspend(void) 295void dlm_callback_resume(struct dlm_ls *ls)
338{ 296{
339 mutex_lock(&astd_running); 297 struct dlm_lkb *lkb, *safe;
340} 298 int count = 0;
341 299
342void dlm_astd_resume(void) 300 clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
343{ 301
344 mutex_unlock(&astd_running); 302 if (!ls->ls_callback_wq)
303 return;
304
305 mutex_lock(&ls->ls_cb_mutex);
306 list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
307 list_del_init(&lkb->lkb_cb_list);
308 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
309 count++;
310 }
311 mutex_unlock(&ls->ls_cb_mutex);
312
313 log_debug(ls, "dlm_callback_resume %d", count);
345} 314}
346 315