aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2011-04-05 14:16:24 -0400
committerDavid Teigland <teigland@redhat.com>2011-07-15 13:30:43 -0400
commit23e8e1aaacb10d9f05e44a93e10ea4ee5b3838a5 (patch)
tree7c94bc4eeb9dfd85a26869003c56dc45fc6fd697 /fs
parent883ba74f43092823d0ed4c6b21f0171e9b334607 (diff)
dlm: use workqueue for callbacks
Instead of creating our own kthread (dlm_astd) to deliver callbacks for all lockspaces, use a per-lockspace workqueue to deliver the callbacks. This eliminates complications and slowdowns from many lockspaces sharing the same thread. Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/dlm/ast.c262
-rw-r--r--fs/dlm/ast.h15
-rw-r--r--fs/dlm/dlm_internal.h9
-rw-r--r--fs/dlm/lock.c24
-rw-r--r--fs/dlm/lockspace.c43
-rw-r--r--fs/dlm/recoverd.c12
-rw-r--r--fs/dlm/user.c12
7 files changed, 172 insertions, 205 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index abc49f292454..4f29add0e7d1 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -14,17 +14,9 @@
14#include "dlm_internal.h" 14#include "dlm_internal.h"
15#include "lock.h" 15#include "lock.h"
16#include "user.h" 16#include "user.h"
17#include "ast.h"
18
19#define WAKE_ASTS 0
20
21static uint64_t ast_seq_count;
22static struct list_head ast_queue;
23static spinlock_t ast_queue_lock;
24static struct task_struct * astd_task;
25static unsigned long astd_wakeflags;
26static struct mutex astd_running;
27 17
18static uint64_t dlm_cb_seq;
19static spinlock_t dlm_cb_seq_spin;
28 20
29static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb) 21static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
30{ 22{
@@ -57,21 +49,13 @@ static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
57 } 49 }
58} 50}
59 51
60void dlm_del_ast(struct dlm_lkb *lkb)
61{
62 spin_lock(&ast_queue_lock);
63 if (!list_empty(&lkb->lkb_astqueue))
64 list_del_init(&lkb->lkb_astqueue);
65 spin_unlock(&ast_queue_lock);
66}
67
68int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, 52int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
69 int status, uint32_t sbflags, uint64_t seq) 53 int status, uint32_t sbflags, uint64_t seq)
70{ 54{
71 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 55 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
72 uint64_t prev_seq; 56 uint64_t prev_seq;
73 int prev_mode; 57 int prev_mode;
74 int i; 58 int i, rv;
75 59
76 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 60 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
77 if (lkb->lkb_callbacks[i].seq) 61 if (lkb->lkb_callbacks[i].seq)
@@ -100,7 +84,8 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
100 mode, 84 mode,
101 (unsigned long long)prev_seq, 85 (unsigned long long)prev_seq,
102 prev_mode); 86 prev_mode);
103 return 0; 87 rv = 0;
88 goto out;
104 } 89 }
105 } 90 }
106 91
@@ -109,6 +94,7 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
109 lkb->lkb_callbacks[i].mode = mode; 94 lkb->lkb_callbacks[i].mode = mode;
110 lkb->lkb_callbacks[i].sb_status = status; 95 lkb->lkb_callbacks[i].sb_status = status;
111 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF); 96 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
97 rv = 0;
112 break; 98 break;
113 } 99 }
114 100
@@ -117,21 +103,24 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
117 lkb->lkb_id, (unsigned long long)seq, 103 lkb->lkb_id, (unsigned long long)seq,
118 flags, mode, status, sbflags); 104 flags, mode, status, sbflags);
119 dlm_dump_lkb_callbacks(lkb); 105 dlm_dump_lkb_callbacks(lkb);
120 return -1; 106 rv = -1;
107 goto out;
121 } 108 }
122 109 out:
123 return 0; 110 return rv;
124} 111}
125 112
126int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, 113int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
127 struct dlm_callback *cb, int *resid) 114 struct dlm_callback *cb, int *resid)
128{ 115{
129 int i; 116 int i, rv;
130 117
131 *resid = 0; 118 *resid = 0;
132 119
133 if (!lkb->lkb_callbacks[0].seq) 120 if (!lkb->lkb_callbacks[0].seq) {
134 return -ENOENT; 121 rv = -ENOENT;
122 goto out;
123 }
135 124
136 /* oldest undelivered cb is callbacks[0] */ 125 /* oldest undelivered cb is callbacks[0] */
137 126
@@ -163,7 +152,8 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
163 cb->mode, 152 cb->mode,
164 (unsigned long long)lkb->lkb_last_cast.seq, 153 (unsigned long long)lkb->lkb_last_cast.seq,
165 lkb->lkb_last_cast.mode); 154 lkb->lkb_last_cast.mode);
166 return 0; 155 rv = 0;
156 goto out;
167 } 157 }
168 } 158 }
169 159
@@ -176,171 +166,147 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
176 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback)); 166 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
177 lkb->lkb_last_bast_time = ktime_get(); 167 lkb->lkb_last_bast_time = ktime_get();
178 } 168 }
179 169 rv = 0;
180 return 0; 170 out:
171 return rv;
181} 172}
182 173
183void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, 174void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
184 uint32_t sbflags) 175 uint32_t sbflags)
185{ 176{
186 uint64_t seq; 177 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
178 uint64_t new_seq, prev_seq;
187 int rv; 179 int rv;
188 180
189 spin_lock(&ast_queue_lock); 181 spin_lock(&dlm_cb_seq_spin);
190 182 new_seq = ++dlm_cb_seq;
191 seq = ++ast_seq_count; 183 spin_unlock(&dlm_cb_seq_spin);
192 184
193 if (lkb->lkb_flags & DLM_IFL_USER) { 185 if (lkb->lkb_flags & DLM_IFL_USER) {
194 spin_unlock(&ast_queue_lock); 186 dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
195 dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq);
196 return; 187 return;
197 } 188 }
198 189
199 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq); 190 mutex_lock(&lkb->lkb_cb_mutex);
200 if (rv < 0) { 191 prev_seq = lkb->lkb_callbacks[0].seq;
201 spin_unlock(&ast_queue_lock);
202 return;
203 }
204 192
205 if (list_empty(&lkb->lkb_astqueue)) { 193 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
194 if (rv < 0)
195 goto out;
196
197 if (!prev_seq) {
206 kref_get(&lkb->lkb_ref); 198 kref_get(&lkb->lkb_ref);
207 list_add_tail(&lkb->lkb_astqueue, &ast_queue);
208 }
209 spin_unlock(&ast_queue_lock);
210 199
211 set_bit(WAKE_ASTS, &astd_wakeflags); 200 if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
212 wake_up_process(astd_task); 201 mutex_lock(&ls->ls_cb_mutex);
202 list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
203 mutex_unlock(&ls->ls_cb_mutex);
204 } else {
205 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
206 }
207 }
208 out:
209 mutex_unlock(&lkb->lkb_cb_mutex);
213} 210}
214 211
215static void process_asts(void) 212void dlm_callback_work(struct work_struct *work)
216{ 213{
217 struct dlm_ls *ls = NULL; 214 struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
218 struct dlm_rsb *r = NULL; 215 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
219 struct dlm_lkb *lkb;
220 void (*castfn) (void *astparam); 216 void (*castfn) (void *astparam);
221 void (*bastfn) (void *astparam, int mode); 217 void (*bastfn) (void *astparam, int mode);
222 struct dlm_callback callbacks[DLM_CALLBACKS_SIZE]; 218 struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
223 int i, rv, resid; 219 int i, rv, resid;
224 220
225repeat: 221 memset(&callbacks, 0, sizeof(callbacks));
226 spin_lock(&ast_queue_lock);
227 list_for_each_entry(lkb, &ast_queue, lkb_astqueue) {
228 r = lkb->lkb_resource;
229 ls = r->res_ls;
230 222
231 if (dlm_locking_stopped(ls)) 223 mutex_lock(&lkb->lkb_cb_mutex);
232 continue; 224 if (!lkb->lkb_callbacks[0].seq) {
233 225 /* no callback work exists, shouldn't happen */
234 /* we remove from astqueue list and remove everything in 226 log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
235 lkb_callbacks before releasing the spinlock so empty 227 dlm_print_lkb(lkb);
236 lkb_astqueue is always consistent with empty lkb_callbacks */ 228 dlm_dump_lkb_callbacks(lkb);
237 229 }
238 list_del_init(&lkb->lkb_astqueue);
239
240 castfn = lkb->lkb_astfn;
241 bastfn = lkb->lkb_bastfn;
242 230
243 memset(&callbacks, 0, sizeof(callbacks)); 231 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
232 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
233 if (rv < 0)
234 break;
235 }
244 236
245 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 237 if (resid) {
246 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid); 238 /* cbs remain, loop should have removed all, shouldn't happen */
247 if (rv < 0) 239 log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
248 break; 240 resid);
249 } 241 dlm_print_lkb(lkb);
250 spin_unlock(&ast_queue_lock); 242 dlm_dump_lkb_callbacks(lkb);
243 }
244 mutex_unlock(&lkb->lkb_cb_mutex);
251 245
252 if (resid) { 246 castfn = lkb->lkb_astfn;
253 /* shouldn't happen, for loop should have removed all */ 247 bastfn = lkb->lkb_bastfn;
254 log_error(ls, "callback resid %d lkb %x",
255 resid, lkb->lkb_id);
256 }
257 248
258 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 249 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
259 if (!callbacks[i].seq) 250 if (!callbacks[i].seq)
260 break; 251 break;
261 if (callbacks[i].flags & DLM_CB_SKIP) { 252 if (callbacks[i].flags & DLM_CB_SKIP) {
262 continue; 253 continue;
263 } else if (callbacks[i].flags & DLM_CB_BAST) { 254 } else if (callbacks[i].flags & DLM_CB_BAST) {
264 bastfn(lkb->lkb_astparam, callbacks[i].mode); 255 bastfn(lkb->lkb_astparam, callbacks[i].mode);
265 } else if (callbacks[i].flags & DLM_CB_CAST) { 256 } else if (callbacks[i].flags & DLM_CB_CAST) {
266 lkb->lkb_lksb->sb_status = callbacks[i].sb_status; 257 lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
267 lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags; 258 lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
268 castfn(lkb->lkb_astparam); 259 castfn(lkb->lkb_astparam);
269 }
270 } 260 }
271
272 /* removes ref for ast_queue, may cause lkb to be freed */
273 dlm_put_lkb(lkb);
274
275 cond_resched();
276 goto repeat;
277 } 261 }
278 spin_unlock(&ast_queue_lock);
279}
280
281static inline int no_asts(void)
282{
283 int ret;
284 262
285 spin_lock(&ast_queue_lock); 263 /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
286 ret = list_empty(&ast_queue); 264 dlm_put_lkb(lkb);
287 spin_unlock(&ast_queue_lock);
288 return ret;
289} 265}
290 266
291static int dlm_astd(void *data) 267int dlm_callback_start(struct dlm_ls *ls)
292{ 268{
293 while (!kthread_should_stop()) { 269 ls->ls_callback_wq = alloc_workqueue("dlm_callback",
294 set_current_state(TASK_INTERRUPTIBLE); 270 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
295 if (!test_bit(WAKE_ASTS, &astd_wakeflags)) 271 if (!ls->ls_callback_wq) {
296 schedule(); 272 log_print("can't start dlm_callback workqueue");
297 set_current_state(TASK_RUNNING); 273 return -ENOMEM;
298
299 mutex_lock(&astd_running);
300 if (test_and_clear_bit(WAKE_ASTS, &astd_wakeflags))
301 process_asts();
302 mutex_unlock(&astd_running);
303 } 274 }
304 return 0; 275 return 0;
305} 276}
306 277
307void dlm_astd_wake(void) 278void dlm_callback_stop(struct dlm_ls *ls)
308{ 279{
309 if (!no_asts()) { 280 if (ls->ls_callback_wq)
310 set_bit(WAKE_ASTS, &astd_wakeflags); 281 destroy_workqueue(ls->ls_callback_wq);
311 wake_up_process(astd_task);
312 }
313} 282}
314 283
315int dlm_astd_start(void) 284void dlm_callback_suspend(struct dlm_ls *ls)
316{ 285{
317 struct task_struct *p; 286 set_bit(LSFL_CB_DELAY, &ls->ls_flags);
318 int error = 0;
319
320 INIT_LIST_HEAD(&ast_queue);
321 spin_lock_init(&ast_queue_lock);
322 mutex_init(&astd_running);
323
324 p = kthread_run(dlm_astd, NULL, "dlm_astd");
325 if (IS_ERR(p))
326 error = PTR_ERR(p);
327 else
328 astd_task = p;
329 return error;
330}
331 287
332void dlm_astd_stop(void) 288 if (ls->ls_callback_wq)
333{ 289 flush_workqueue(ls->ls_callback_wq);
334 kthread_stop(astd_task);
335} 290}
336 291
337void dlm_astd_suspend(void) 292void dlm_callback_resume(struct dlm_ls *ls)
338{ 293{
339 mutex_lock(&astd_running); 294 struct dlm_lkb *lkb, *safe;
340} 295 int count = 0;
341 296
342void dlm_astd_resume(void) 297 clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
343{ 298
344 mutex_unlock(&astd_running); 299 if (!ls->ls_callback_wq)
300 return;
301
302 mutex_lock(&ls->ls_cb_mutex);
303 list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
304 list_del_init(&lkb->lkb_cb_list);
305 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
306 count++;
307 }
308 mutex_unlock(&ls->ls_cb_mutex);
309
310 log_debug(ls, "dlm_callback_resume %d", count);
345} 311}
346 312
diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h
index 8aa89c9b5611..757b551c6820 100644
--- a/fs/dlm/ast.h
+++ b/fs/dlm/ast.h
@@ -18,14 +18,15 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
18 int status, uint32_t sbflags, uint64_t seq); 18 int status, uint32_t sbflags, uint64_t seq);
19int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, 19int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
20 struct dlm_callback *cb, int *resid); 20 struct dlm_callback *cb, int *resid);
21void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, 21void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
22 uint32_t sbflags); 22 uint32_t sbflags);
23 23
24void dlm_astd_wake(void); 24void dlm_callback_work(struct work_struct *work);
25int dlm_astd_start(void); 25int dlm_callback_start(struct dlm_ls *ls);
26void dlm_astd_stop(void); 26void dlm_callback_stop(struct dlm_ls *ls);
27void dlm_astd_suspend(void); 27void dlm_callback_suspend(struct dlm_ls *ls);
28void dlm_astd_resume(void); 28void dlm_callback_resume(struct dlm_ls *ls);
29 29
30#endif 30#endif
31 31
32
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 6614f335e25d..fe2860c02449 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -246,13 +246,15 @@ struct dlm_lkb {
246 struct list_head lkb_statequeue; /* rsb g/c/w list */ 246 struct list_head lkb_statequeue; /* rsb g/c/w list */
247 struct list_head lkb_rsb_lookup; /* waiting for rsb lookup */ 247 struct list_head lkb_rsb_lookup; /* waiting for rsb lookup */
248 struct list_head lkb_wait_reply; /* waiting for remote reply */ 248 struct list_head lkb_wait_reply; /* waiting for remote reply */
249 struct list_head lkb_astqueue; /* need ast to be sent */
250 struct list_head lkb_ownqueue; /* list of locks for a process */ 249 struct list_head lkb_ownqueue; /* list of locks for a process */
251 struct list_head lkb_time_list; 250 struct list_head lkb_time_list;
252 ktime_t lkb_timestamp; 251 ktime_t lkb_timestamp;
253 ktime_t lkb_wait_time; 252 ktime_t lkb_wait_time;
254 unsigned long lkb_timeout_cs; 253 unsigned long lkb_timeout_cs;
255 254
255 struct mutex lkb_cb_mutex;
256 struct work_struct lkb_cb_work;
257 struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */
256 struct dlm_callback lkb_callbacks[DLM_CALLBACKS_SIZE]; 258 struct dlm_callback lkb_callbacks[DLM_CALLBACKS_SIZE];
257 struct dlm_callback lkb_last_cast; 259 struct dlm_callback lkb_last_cast;
258 struct dlm_callback lkb_last_bast; 260 struct dlm_callback lkb_last_bast;
@@ -504,8 +506,12 @@ struct dlm_ls {
504 506
505 struct miscdevice ls_device; 507 struct miscdevice ls_device;
506 508
509 struct workqueue_struct *ls_callback_wq;
510
507 /* recovery related */ 511 /* recovery related */
508 512
513 struct mutex ls_cb_mutex;
514 struct list_head ls_cb_delay; /* save for queue_work later */
509 struct timer_list ls_timer; 515 struct timer_list ls_timer;
510 struct task_struct *ls_recoverd_task; 516 struct task_struct *ls_recoverd_task;
511 struct mutex ls_recoverd_active; 517 struct mutex ls_recoverd_active;
@@ -542,6 +548,7 @@ struct dlm_ls {
542#define LSFL_RCOM_WAIT 4 548#define LSFL_RCOM_WAIT 4
543#define LSFL_UEVENT_WAIT 5 549#define LSFL_UEVENT_WAIT 5
544#define LSFL_TIMEWARN 6 550#define LSFL_TIMEWARN 6
551#define LSFL_CB_DELAY 7
545 552
546/* much of this is just saving user space pointers associated with the 553/* much of this is just saving user space pointers associated with the
547 lock that we pass back to the user lib with an ast */ 554 lock that we pass back to the user lib with an ast */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 6772e5c5bcd6..83b5e32514e1 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -305,7 +305,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
305 rv = -EDEADLK; 305 rv = -EDEADLK;
306 } 306 }
307 307
308 dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); 308 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309} 309}
310 310
311static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) 311static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -319,7 +319,7 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
319 if (is_master_copy(lkb)) { 319 if (is_master_copy(lkb)) {
320 send_bast(r, lkb, rqmode); 320 send_bast(r, lkb, rqmode);
321 } else { 321 } else {
322 dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0); 322 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
323 } 323 }
324} 324}
325 325
@@ -638,7 +638,9 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
638 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 638 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
639 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 639 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
640 INIT_LIST_HEAD(&lkb->lkb_time_list); 640 INIT_LIST_HEAD(&lkb->lkb_time_list);
641 INIT_LIST_HEAD(&lkb->lkb_astqueue); 641 INIT_LIST_HEAD(&lkb->lkb_cb_list);
642 mutex_init(&lkb->lkb_cb_mutex);
643 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
642 644
643 retry: 645 retry:
644 rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS); 646 rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
@@ -4010,8 +4012,6 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
4010 default: 4012 default:
4011 log_error(ls, "unknown message type %d", ms->m_type); 4013 log_error(ls, "unknown message type %d", ms->m_type);
4012 } 4014 }
4013
4014 dlm_astd_wake();
4015} 4015}
4016 4016
4017/* If the lockspace is in recovery mode (locking stopped), then normal 4017/* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4826,7 +4826,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4826 goto out_put; 4826 goto out_put;
4827 4827
4828 spin_lock(&ua->proc->locks_spin); 4828 spin_lock(&ua->proc->locks_spin);
4829 /* dlm_user_add_ast() may have already taken lkb off the proc list */ 4829 /* dlm_user_add_cb() may have already taken lkb off the proc list */
4830 if (!list_empty(&lkb->lkb_ownqueue)) 4830 if (!list_empty(&lkb->lkb_ownqueue))
4831 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); 4831 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4832 spin_unlock(&ua->proc->locks_spin); 4832 spin_unlock(&ua->proc->locks_spin);
@@ -4963,7 +4963,7 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4963 4963
4964/* We have to release clear_proc_locks mutex before calling unlock_proc_lock() 4964/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4965 (which does lock_rsb) due to deadlock with receiving a message that does 4965 (which does lock_rsb) due to deadlock with receiving a message that does
4966 lock_rsb followed by dlm_user_add_ast() */ 4966 lock_rsb followed by dlm_user_add_cb() */
4967 4967
4968static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, 4968static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4969 struct dlm_user_proc *proc) 4969 struct dlm_user_proc *proc)
@@ -4986,7 +4986,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4986 return lkb; 4986 return lkb;
4987} 4987}
4988 4988
4989/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which 4989/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
4990 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, 4990 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4991 which we clear here. */ 4991 which we clear here. */
4992 4992
@@ -5028,10 +5028,10 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5028 dlm_put_lkb(lkb); 5028 dlm_put_lkb(lkb);
5029 } 5029 }
5030 5030
5031 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { 5031 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5032 memset(&lkb->lkb_callbacks, 0, 5032 memset(&lkb->lkb_callbacks, 0,
5033 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); 5033 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5034 list_del_init(&lkb->lkb_astqueue); 5034 list_del_init(&lkb->lkb_cb_list);
5035 dlm_put_lkb(lkb); 5035 dlm_put_lkb(lkb);
5036 } 5036 }
5037 5037
@@ -5070,10 +5070,10 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5070 spin_unlock(&proc->locks_spin); 5070 spin_unlock(&proc->locks_spin);
5071 5071
5072 spin_lock(&proc->asts_spin); 5072 spin_lock(&proc->asts_spin);
5073 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { 5073 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5074 memset(&lkb->lkb_callbacks, 0, 5074 memset(&lkb->lkb_callbacks, 0,
5075 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); 5075 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5076 list_del_init(&lkb->lkb_astqueue); 5076 list_del_init(&lkb->lkb_cb_list);
5077 dlm_put_lkb(lkb); 5077 dlm_put_lkb(lkb);
5078 } 5078 }
5079 spin_unlock(&proc->asts_spin); 5079 spin_unlock(&proc->asts_spin);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 98a97762c893..a1d8f1af144b 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -15,7 +15,6 @@
15#include "lockspace.h" 15#include "lockspace.h"
16#include "member.h" 16#include "member.h"
17#include "recoverd.h" 17#include "recoverd.h"
18#include "ast.h"
19#include "dir.h" 18#include "dir.h"
20#include "lowcomms.h" 19#include "lowcomms.h"
21#include "config.h" 20#include "config.h"
@@ -24,6 +23,7 @@
24#include "recover.h" 23#include "recover.h"
25#include "requestqueue.h" 24#include "requestqueue.h"
26#include "user.h" 25#include "user.h"
26#include "ast.h"
27 27
28static int ls_count; 28static int ls_count;
29static struct mutex ls_lock; 29static struct mutex ls_lock;
@@ -359,17 +359,10 @@ static int threads_start(void)
359{ 359{
360 int error; 360 int error;
361 361
362 /* Thread which process lock requests for all lockspace's */
363 error = dlm_astd_start();
364 if (error) {
365 log_print("cannot start dlm_astd thread %d", error);
366 goto fail;
367 }
368
369 error = dlm_scand_start(); 362 error = dlm_scand_start();
370 if (error) { 363 if (error) {
371 log_print("cannot start dlm_scand thread %d", error); 364 log_print("cannot start dlm_scand thread %d", error);
372 goto astd_fail; 365 goto fail;
373 } 366 }
374 367
375 /* Thread for sending/receiving messages for all lockspace's */ 368 /* Thread for sending/receiving messages for all lockspace's */
@@ -383,8 +376,6 @@ static int threads_start(void)
383 376
384 scand_fail: 377 scand_fail:
385 dlm_scand_stop(); 378 dlm_scand_stop();
386 astd_fail:
387 dlm_astd_stop();
388 fail: 379 fail:
389 return error; 380 return error;
390} 381}
@@ -393,7 +384,6 @@ static void threads_stop(void)
393{ 384{
394 dlm_scand_stop(); 385 dlm_scand_stop();
395 dlm_lowcomms_stop(); 386 dlm_lowcomms_stop();
396 dlm_astd_stop();
397} 387}
398 388
399static int new_lockspace(const char *name, int namelen, void **lockspace, 389static int new_lockspace(const char *name, int namelen, void **lockspace,
@@ -514,6 +504,9 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
514 init_completion(&ls->ls_members_done); 504 init_completion(&ls->ls_members_done);
515 ls->ls_members_result = -1; 505 ls->ls_members_result = -1;
516 506
507 mutex_init(&ls->ls_cb_mutex);
508 INIT_LIST_HEAD(&ls->ls_cb_delay);
509
517 ls->ls_recoverd_task = NULL; 510 ls->ls_recoverd_task = NULL;
518 mutex_init(&ls->ls_recoverd_active); 511 mutex_init(&ls->ls_recoverd_active);
519 spin_lock_init(&ls->ls_recover_lock); 512 spin_lock_init(&ls->ls_recover_lock);
@@ -547,18 +540,26 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
547 list_add(&ls->ls_list, &lslist); 540 list_add(&ls->ls_list, &lslist);
548 spin_unlock(&lslist_lock); 541 spin_unlock(&lslist_lock);
549 542
543 if (flags & DLM_LSFL_FS) {
544 error = dlm_callback_start(ls);
545 if (error) {
546 log_error(ls, "can't start dlm_callback %d", error);
547 goto out_delist;
548 }
549 }
550
550 /* needs to find ls in lslist */ 551 /* needs to find ls in lslist */
551 error = dlm_recoverd_start(ls); 552 error = dlm_recoverd_start(ls);
552 if (error) { 553 if (error) {
553 log_error(ls, "can't start dlm_recoverd %d", error); 554 log_error(ls, "can't start dlm_recoverd %d", error);
554 goto out_delist; 555 goto out_callback;
555 } 556 }
556 557
557 ls->ls_kobj.kset = dlm_kset; 558 ls->ls_kobj.kset = dlm_kset;
558 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 559 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
559 "%s", ls->ls_name); 560 "%s", ls->ls_name);
560 if (error) 561 if (error)
561 goto out_stop; 562 goto out_recoverd;
562 kobject_uevent(&ls->ls_kobj, KOBJ_ADD); 563 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
563 564
564 /* let kobject handle freeing of ls if there's an error */ 565 /* let kobject handle freeing of ls if there's an error */
@@ -572,7 +573,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
572 573
573 error = do_uevent(ls, 1); 574 error = do_uevent(ls, 1);
574 if (error) 575 if (error)
575 goto out_stop; 576 goto out_recoverd;
576 577
577 wait_for_completion(&ls->ls_members_done); 578 wait_for_completion(&ls->ls_members_done);
578 error = ls->ls_members_result; 579 error = ls->ls_members_result;
@@ -589,8 +590,10 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
589 do_uevent(ls, 0); 590 do_uevent(ls, 0);
590 dlm_clear_members(ls); 591 dlm_clear_members(ls);
591 kfree(ls->ls_node_array); 592 kfree(ls->ls_node_array);
592 out_stop: 593 out_recoverd:
593 dlm_recoverd_stop(ls); 594 dlm_recoverd_stop(ls);
595 out_callback:
596 dlm_callback_stop(ls);
594 out_delist: 597 out_delist:
595 spin_lock(&lslist_lock); 598 spin_lock(&lslist_lock);
596 list_del(&ls->ls_list); 599 list_del(&ls->ls_list);
@@ -652,8 +655,6 @@ static int lkb_idr_free(int id, void *p, void *data)
652{ 655{
653 struct dlm_lkb *lkb = p; 656 struct dlm_lkb *lkb = p;
654 657
655 dlm_del_ast(lkb);
656
657 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY) 658 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
658 dlm_free_lvb(lkb->lkb_lvbptr); 659 dlm_free_lvb(lkb->lkb_lvbptr);
659 660
@@ -717,12 +718,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
717 718
718 dlm_recoverd_stop(ls); 719 dlm_recoverd_stop(ls);
719 720
721 dlm_callback_stop(ls);
722
720 remove_lockspace(ls); 723 remove_lockspace(ls);
721 724
722 dlm_delete_debug_file(ls); 725 dlm_delete_debug_file(ls);
723 726
724 dlm_astd_suspend();
725
726 kfree(ls->ls_recover_buf); 727 kfree(ls->ls_recover_buf);
727 728
728 /* 729 /*
@@ -740,8 +741,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
740 idr_remove_all(&ls->ls_lkbidr); 741 idr_remove_all(&ls->ls_lkbidr);
741 idr_destroy(&ls->ls_lkbidr); 742 idr_destroy(&ls->ls_lkbidr);
742 743
743 dlm_astd_resume();
744
745 /* 744 /*
746 * Free all rsb's on rsbtbl[] lists 745 * Free all rsb's on rsbtbl[] lists
747 */ 746 */
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index fd677c8c3d3b..774da3cf92c6 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -58,13 +58,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
58 58
59 mutex_lock(&ls->ls_recoverd_active); 59 mutex_lock(&ls->ls_recoverd_active);
60 60
61 /* 61 dlm_callback_suspend(ls);
62 * Suspending and resuming dlm_astd ensures that no lkb's from this ls
63 * will be processed by dlm_astd during recovery.
64 */
65
66 dlm_astd_suspend();
67 dlm_astd_resume();
68 62
69 /* 63 /*
70 * Free non-master tossed rsb's. Master rsb's are kept on toss 64 * Free non-master tossed rsb's. Master rsb's are kept on toss
@@ -202,6 +196,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
202 196
203 dlm_adjust_timeouts(ls); 197 dlm_adjust_timeouts(ls);
204 198
199 dlm_callback_resume(ls);
200
205 error = enable_locking(ls, rv->seq); 201 error = enable_locking(ls, rv->seq);
206 if (error) { 202 if (error) {
207 log_debug(ls, "enable_locking failed %d", error); 203 log_debug(ls, "enable_locking failed %d", error);
@@ -222,8 +218,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
222 218
223 dlm_grant_after_purge(ls); 219 dlm_grant_after_purge(ls);
224 220
225 dlm_astd_wake();
226
227 log_debug(ls, "recover %llx done: %u ms", 221 log_debug(ls, "recover %llx done: %u ms",
228 (unsigned long long)rv->seq, 222 (unsigned long long)rv->seq,
229 jiffies_to_msecs(jiffies - start)); 223 jiffies_to_msecs(jiffies - start));
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index e96bf3e9be88..d8ea60756403 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -213,9 +213,9 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
213 goto out; 213 goto out;
214 } 214 }
215 215
216 if (list_empty(&lkb->lkb_astqueue)) { 216 if (list_empty(&lkb->lkb_cb_list)) {
217 kref_get(&lkb->lkb_ref); 217 kref_get(&lkb->lkb_ref);
218 list_add_tail(&lkb->lkb_astqueue, &proc->asts); 218 list_add_tail(&lkb->lkb_cb_list, &proc->asts);
219 wake_up_interruptible(&proc->wait); 219 wake_up_interruptible(&proc->wait);
220 } 220 }
221 spin_unlock(&proc->asts_spin); 221 spin_unlock(&proc->asts_spin);
@@ -832,24 +832,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
832 } 832 }
833 833
834 /* if we empty lkb_callbacks, we don't want to unlock the spinlock 834 /* if we empty lkb_callbacks, we don't want to unlock the spinlock
835 without removing lkb_astqueue; so empty lkb_astqueue is always 835 without removing lkb_cb_list; so empty lkb_cb_list is always
836 consistent with empty lkb_callbacks */ 836 consistent with empty lkb_callbacks */
837 837
838 lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_astqueue); 838 lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_cb_list);
839 839
840 rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid); 840 rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid);
841 if (rv < 0) { 841 if (rv < 0) {
842 /* this shouldn't happen; lkb should have been removed from 842 /* this shouldn't happen; lkb should have been removed from
843 list when resid was zero */ 843 list when resid was zero */
844 log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id); 844 log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
845 list_del_init(&lkb->lkb_astqueue); 845 list_del_init(&lkb->lkb_cb_list);
846 spin_unlock(&proc->asts_spin); 846 spin_unlock(&proc->asts_spin);
847 /* removes ref for proc->asts, may cause lkb to be freed */ 847 /* removes ref for proc->asts, may cause lkb to be freed */
848 dlm_put_lkb(lkb); 848 dlm_put_lkb(lkb);
849 goto try_another; 849 goto try_another;
850 } 850 }
851 if (!resid) 851 if (!resid)
852 list_del_init(&lkb->lkb_astqueue); 852 list_del_init(&lkb->lkb_cb_list);
853 spin_unlock(&proc->asts_spin); 853 spin_unlock(&proc->asts_spin);
854 854
855 if (cb.flags & DLM_CB_SKIP) { 855 if (cb.flags & DLM_CB_SKIP) {