aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmthread.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmthread.c')
-rw-r--r--fs/ocfs2/dlm/dlmthread.c695
1 files changed, 695 insertions, 0 deletions
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
new file mode 100644
index 000000000000..92cd5cd66db8
--- /dev/null
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -0,0 +1,695 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmthread.c
5 *
6 * standalone DLM module
7 *
8 * Copyright (C) 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 *
25 */
26
27
28#include <linux/module.h>
29#include <linux/fs.h>
30#include <linux/types.h>
31#include <linux/slab.h>
32#include <linux/highmem.h>
33#include <linux/utsname.h>
34#include <linux/init.h>
35#include <linux/sysctl.h>
36#include <linux/random.h>
37#include <linux/blkdev.h>
38#include <linux/socket.h>
39#include <linux/inet.h>
40#include <linux/timer.h>
41#include <linux/kthread.h>
42
43
44#include "cluster/heartbeat.h"
45#include "cluster/nodemanager.h"
46#include "cluster/tcp.h"
47
48#include "dlmapi.h"
49#include "dlmcommon.h"
50#include "dlmdomain.h"
51
52#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
53#include "cluster/masklog.h"
54
55extern spinlock_t dlm_domain_lock;
56extern struct list_head dlm_domains;
57
58static int dlm_thread(void *data);
59
60static void dlm_flush_asts(struct dlm_ctxt *dlm);
61
62#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
63
64/* will exit holding res->spinlock, but may drop in function */
65/* waits until flags are cleared on res->state */
66void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
67{
68 DECLARE_WAITQUEUE(wait, current);
69
70 assert_spin_locked(&res->spinlock);
71
72 add_wait_queue(&res->wq, &wait);
73repeat:
74 set_current_state(TASK_UNINTERRUPTIBLE);
75 if (res->state & flags) {
76 spin_unlock(&res->spinlock);
77 schedule();
78 spin_lock(&res->spinlock);
79 goto repeat;
80 }
81 remove_wait_queue(&res->wq, &wait);
82 current->state = TASK_RUNNING;
83}
84
85
86static int __dlm_lockres_unused(struct dlm_lock_resource *res)
87{
88 if (list_empty(&res->granted) &&
89 list_empty(&res->converting) &&
90 list_empty(&res->blocked) &&
91 list_empty(&res->dirty))
92 return 1;
93 return 0;
94}
95
96
97/* Call whenever you may have added or deleted something from one of
98 * the lockres queue's. This will figure out whether it belongs on the
99 * unused list or not and does the appropriate thing. */
100void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
101 struct dlm_lock_resource *res)
102{
103 mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
104
105 assert_spin_locked(&dlm->spinlock);
106 assert_spin_locked(&res->spinlock);
107
108 if (__dlm_lockres_unused(res)){
109 if (list_empty(&res->purge)) {
110 mlog(0, "putting lockres %.*s from purge list\n",
111 res->lockname.len, res->lockname.name);
112
113 res->last_used = jiffies;
114 list_add_tail(&res->purge, &dlm->purge_list);
115 dlm->purge_count++;
116 }
117 } else if (!list_empty(&res->purge)) {
118 mlog(0, "removing lockres %.*s from purge list\n",
119 res->lockname.len, res->lockname.name);
120
121 list_del_init(&res->purge);
122 dlm->purge_count--;
123 }
124}
125
126void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
127 struct dlm_lock_resource *res)
128{
129 mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
130 spin_lock(&dlm->spinlock);
131 spin_lock(&res->spinlock);
132
133 __dlm_lockres_calc_usage(dlm, res);
134
135 spin_unlock(&res->spinlock);
136 spin_unlock(&dlm->spinlock);
137}
138
139/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
140 * to do migration, but will re-acquire before exit. */
141void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
142{
143 int master;
144 int ret;
145
146 spin_lock(&lockres->spinlock);
147 master = lockres->owner == dlm->node_num;
148 spin_unlock(&lockres->spinlock);
149
150 mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
151 lockres->lockname.name, master);
152
153 /* Non master is the easy case -- no migration required, just
154 * quit. */
155 if (!master)
156 goto finish;
157
158 /* Wheee! Migrate lockres here! */
159 spin_unlock(&dlm->spinlock);
160again:
161
162 ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
163 if (ret == -ENOTEMPTY) {
164 mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
165 lockres->lockname.len, lockres->lockname.name);
166
167 BUG();
168 } else if (ret < 0) {
169 mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
170 lockres->lockname.len, lockres->lockname.name);
171 goto again;
172 }
173
174 spin_lock(&dlm->spinlock);
175
176finish:
177 if (!list_empty(&lockres->purge)) {
178 list_del_init(&lockres->purge);
179 dlm->purge_count--;
180 }
181 __dlm_unhash_lockres(lockres);
182}
183
184static void dlm_run_purge_list(struct dlm_ctxt *dlm,
185 int purge_now)
186{
187 unsigned int run_max, unused;
188 unsigned long purge_jiffies;
189 struct dlm_lock_resource *lockres;
190
191 spin_lock(&dlm->spinlock);
192 run_max = dlm->purge_count;
193
194 while(run_max && !list_empty(&dlm->purge_list)) {
195 run_max--;
196
197 lockres = list_entry(dlm->purge_list.next,
198 struct dlm_lock_resource, purge);
199
200 /* Status of the lockres *might* change so double
201 * check. If the lockres is unused, holding the dlm
202 * spinlock will prevent people from getting and more
203 * refs on it -- there's no need to keep the lockres
204 * spinlock. */
205 spin_lock(&lockres->spinlock);
206 unused = __dlm_lockres_unused(lockres);
207 spin_unlock(&lockres->spinlock);
208
209 if (!unused)
210 continue;
211
212 purge_jiffies = lockres->last_used +
213 msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
214
215 /* Make sure that we want to be processing this guy at
216 * this time. */
217 if (!purge_now && time_after(purge_jiffies, jiffies)) {
218 /* Since resources are added to the purge list
219 * in tail order, we can stop at the first
220 * unpurgable resource -- anyone added after
221 * him will have a greater last_used value */
222 break;
223 }
224
225 list_del_init(&lockres->purge);
226 dlm->purge_count--;
227
228 /* This may drop and reacquire the dlm spinlock if it
229 * has to do migration. */
230 mlog(0, "calling dlm_purge_lockres!\n");
231 dlm_purge_lockres(dlm, lockres);
232 mlog(0, "DONE calling dlm_purge_lockres!\n");
233
234 /* Avoid adding any scheduling latencies */
235 cond_resched_lock(&dlm->spinlock);
236 }
237
238 spin_unlock(&dlm->spinlock);
239}
240
241static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
242 struct dlm_lock_resource *res)
243{
244 struct dlm_lock *lock, *target;
245 struct list_head *iter;
246 struct list_head *head;
247 int can_grant = 1;
248
249 //mlog(0, "res->lockname.len=%d\n", res->lockname.len);
250 //mlog(0, "res->lockname.name=%p\n", res->lockname.name);
251 //mlog(0, "shuffle res %.*s\n", res->lockname.len,
252 // res->lockname.name);
253
254 /* because this function is called with the lockres
255 * spinlock, and because we know that it is not migrating/
256 * recovering/in-progress, it is fine to reserve asts and
257 * basts right before queueing them all throughout */
258 assert_spin_locked(&res->spinlock);
259 BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
260 DLM_LOCK_RES_RECOVERING|
261 DLM_LOCK_RES_IN_PROGRESS)));
262
263converting:
264 if (list_empty(&res->converting))
265 goto blocked;
266 mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len,
267 res->lockname.name);
268
269 target = list_entry(res->converting.next, struct dlm_lock, list);
270 if (target->ml.convert_type == LKM_IVMODE) {
271 mlog(ML_ERROR, "%.*s: converting a lock with no "
272 "convert_type!\n", res->lockname.len, res->lockname.name);
273 BUG();
274 }
275 head = &res->granted;
276 list_for_each(iter, head) {
277 lock = list_entry(iter, struct dlm_lock, list);
278 if (lock==target)
279 continue;
280 if (!dlm_lock_compatible(lock->ml.type,
281 target->ml.convert_type)) {
282 can_grant = 0;
283 /* queue the BAST if not already */
284 if (lock->ml.highest_blocked == LKM_IVMODE) {
285 __dlm_lockres_reserve_ast(res);
286 dlm_queue_bast(dlm, lock);
287 }
288 /* update the highest_blocked if needed */
289 if (lock->ml.highest_blocked < target->ml.convert_type)
290 lock->ml.highest_blocked =
291 target->ml.convert_type;
292 }
293 }
294 head = &res->converting;
295 list_for_each(iter, head) {
296 lock = list_entry(iter, struct dlm_lock, list);
297 if (lock==target)
298 continue;
299 if (!dlm_lock_compatible(lock->ml.type,
300 target->ml.convert_type)) {
301 can_grant = 0;
302 if (lock->ml.highest_blocked == LKM_IVMODE) {
303 __dlm_lockres_reserve_ast(res);
304 dlm_queue_bast(dlm, lock);
305 }
306 if (lock->ml.highest_blocked < target->ml.convert_type)
307 lock->ml.highest_blocked =
308 target->ml.convert_type;
309 }
310 }
311
312 /* we can convert the lock */
313 if (can_grant) {
314 spin_lock(&target->spinlock);
315 BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
316
317 mlog(0, "calling ast for converting lock: %.*s, have: %d, "
318 "granting: %d, node: %u\n", res->lockname.len,
319 res->lockname.name, target->ml.type,
320 target->ml.convert_type, target->ml.node);
321
322 target->ml.type = target->ml.convert_type;
323 target->ml.convert_type = LKM_IVMODE;
324 list_del_init(&target->list);
325 list_add_tail(&target->list, &res->granted);
326
327 BUG_ON(!target->lksb);
328 target->lksb->status = DLM_NORMAL;
329
330 spin_unlock(&target->spinlock);
331
332 __dlm_lockres_reserve_ast(res);
333 dlm_queue_ast(dlm, target);
334 /* go back and check for more */
335 goto converting;
336 }
337
338blocked:
339 if (list_empty(&res->blocked))
340 goto leave;
341 target = list_entry(res->blocked.next, struct dlm_lock, list);
342
343 head = &res->granted;
344 list_for_each(iter, head) {
345 lock = list_entry(iter, struct dlm_lock, list);
346 if (lock==target)
347 continue;
348 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
349 can_grant = 0;
350 if (lock->ml.highest_blocked == LKM_IVMODE) {
351 __dlm_lockres_reserve_ast(res);
352 dlm_queue_bast(dlm, lock);
353 }
354 if (lock->ml.highest_blocked < target->ml.type)
355 lock->ml.highest_blocked = target->ml.type;
356 }
357 }
358
359 head = &res->converting;
360 list_for_each(iter, head) {
361 lock = list_entry(iter, struct dlm_lock, list);
362 if (lock==target)
363 continue;
364 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
365 can_grant = 0;
366 if (lock->ml.highest_blocked == LKM_IVMODE) {
367 __dlm_lockres_reserve_ast(res);
368 dlm_queue_bast(dlm, lock);
369 }
370 if (lock->ml.highest_blocked < target->ml.type)
371 lock->ml.highest_blocked = target->ml.type;
372 }
373 }
374
375 /* we can grant the blocked lock (only
376 * possible if converting list empty) */
377 if (can_grant) {
378 spin_lock(&target->spinlock);
379 BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
380
381 mlog(0, "calling ast for blocked lock: %.*s, granting: %d, "
382 "node: %u\n", res->lockname.len, res->lockname.name,
383 target->ml.type, target->ml.node);
384
385 // target->ml.type is already correct
386 list_del_init(&target->list);
387 list_add_tail(&target->list, &res->granted);
388
389 BUG_ON(!target->lksb);
390 target->lksb->status = DLM_NORMAL;
391
392 spin_unlock(&target->spinlock);
393
394 __dlm_lockres_reserve_ast(res);
395 dlm_queue_ast(dlm, target);
396 /* go back and check for more */
397 goto converting;
398 }
399
400leave:
401 return;
402}
403
404/* must have NO locks when calling this with res !=NULL * */
405void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
406{
407 mlog_entry("dlm=%p, res=%p\n", dlm, res);
408 if (res) {
409 spin_lock(&dlm->spinlock);
410 spin_lock(&res->spinlock);
411 __dlm_dirty_lockres(dlm, res);
412 spin_unlock(&res->spinlock);
413 spin_unlock(&dlm->spinlock);
414 }
415 wake_up(&dlm->dlm_thread_wq);
416}
417
418void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
419{
420 mlog_entry("dlm=%p, res=%p\n", dlm, res);
421
422 assert_spin_locked(&dlm->spinlock);
423 assert_spin_locked(&res->spinlock);
424
425 /* don't shuffle secondary queues */
426 if ((res->owner == dlm->node_num) &&
427 !(res->state & DLM_LOCK_RES_DIRTY)) {
428 list_add_tail(&res->dirty, &dlm->dirty_list);
429 res->state |= DLM_LOCK_RES_DIRTY;
430 }
431}
432
433
434/* Launch the NM thread for the mounted volume */
435int dlm_launch_thread(struct dlm_ctxt *dlm)
436{
437 mlog(0, "starting dlm thread...\n");
438
439 dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
440 if (IS_ERR(dlm->dlm_thread_task)) {
441 mlog_errno(PTR_ERR(dlm->dlm_thread_task));
442 dlm->dlm_thread_task = NULL;
443 return -EINVAL;
444 }
445
446 return 0;
447}
448
449void dlm_complete_thread(struct dlm_ctxt *dlm)
450{
451 if (dlm->dlm_thread_task) {
452 mlog(ML_KTHREAD, "waiting for dlm thread to exit\n");
453 kthread_stop(dlm->dlm_thread_task);
454 dlm->dlm_thread_task = NULL;
455 }
456}
457
458static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
459{
460 int empty;
461
462 spin_lock(&dlm->spinlock);
463 empty = list_empty(&dlm->dirty_list);
464 spin_unlock(&dlm->spinlock);
465
466 return empty;
467}
468
469static void dlm_flush_asts(struct dlm_ctxt *dlm)
470{
471 int ret;
472 struct dlm_lock *lock;
473 struct dlm_lock_resource *res;
474 u8 hi;
475
476 spin_lock(&dlm->ast_lock);
477 while (!list_empty(&dlm->pending_asts)) {
478 lock = list_entry(dlm->pending_asts.next,
479 struct dlm_lock, ast_list);
480 /* get an extra ref on lock */
481 dlm_lock_get(lock);
482 res = lock->lockres;
483 mlog(0, "delivering an ast for this lockres\n");
484
485 BUG_ON(!lock->ast_pending);
486
487 /* remove from list (including ref) */
488 list_del_init(&lock->ast_list);
489 dlm_lock_put(lock);
490 spin_unlock(&dlm->ast_lock);
491
492 if (lock->ml.node != dlm->node_num) {
493 ret = dlm_do_remote_ast(dlm, res, lock);
494 if (ret < 0)
495 mlog_errno(ret);
496 } else
497 dlm_do_local_ast(dlm, res, lock);
498
499 spin_lock(&dlm->ast_lock);
500
501 /* possible that another ast was queued while
502 * we were delivering the last one */
503 if (!list_empty(&lock->ast_list)) {
504 mlog(0, "aha another ast got queued while "
505 "we were finishing the last one. will "
506 "keep the ast_pending flag set.\n");
507 } else
508 lock->ast_pending = 0;
509
510 /* drop the extra ref.
511 * this may drop it completely. */
512 dlm_lock_put(lock);
513 dlm_lockres_release_ast(dlm, res);
514 }
515
516 while (!list_empty(&dlm->pending_basts)) {
517 lock = list_entry(dlm->pending_basts.next,
518 struct dlm_lock, bast_list);
519 /* get an extra ref on lock */
520 dlm_lock_get(lock);
521 res = lock->lockres;
522
523 BUG_ON(!lock->bast_pending);
524
525 /* get the highest blocked lock, and reset */
526 spin_lock(&lock->spinlock);
527 BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
528 hi = lock->ml.highest_blocked;
529 lock->ml.highest_blocked = LKM_IVMODE;
530 spin_unlock(&lock->spinlock);
531
532 /* remove from list (including ref) */
533 list_del_init(&lock->bast_list);
534 dlm_lock_put(lock);
535 spin_unlock(&dlm->ast_lock);
536
537 mlog(0, "delivering a bast for this lockres "
538 "(blocked = %d\n", hi);
539
540 if (lock->ml.node != dlm->node_num) {
541 ret = dlm_send_proxy_bast(dlm, res, lock, hi);
542 if (ret < 0)
543 mlog_errno(ret);
544 } else
545 dlm_do_local_bast(dlm, res, lock, hi);
546
547 spin_lock(&dlm->ast_lock);
548
549 /* possible that another bast was queued while
550 * we were delivering the last one */
551 if (!list_empty(&lock->bast_list)) {
552 mlog(0, "aha another bast got queued while "
553 "we were finishing the last one. will "
554 "keep the bast_pending flag set.\n");
555 } else
556 lock->bast_pending = 0;
557
558 /* drop the extra ref.
559 * this may drop it completely. */
560 dlm_lock_put(lock);
561 dlm_lockres_release_ast(dlm, res);
562 }
563 wake_up(&dlm->ast_wq);
564 spin_unlock(&dlm->ast_lock);
565}
566
567
568#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
569#define DLM_THREAD_MAX_DIRTY 100
570#define DLM_THREAD_MAX_ASTS 10
571
572static int dlm_thread(void *data)
573{
574 struct dlm_lock_resource *res;
575 struct dlm_ctxt *dlm = data;
576 unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
577
578 mlog(0, "dlm thread running for %s...\n", dlm->name);
579
580 while (!kthread_should_stop()) {
581 int n = DLM_THREAD_MAX_DIRTY;
582
583 /* dlm_shutting_down is very point-in-time, but that
584 * doesn't matter as we'll just loop back around if we
585 * get false on the leading edge of a state
586 * transition. */
587 dlm_run_purge_list(dlm, dlm_shutting_down(dlm));
588
589 /* We really don't want to hold dlm->spinlock while
590 * calling dlm_shuffle_lists on each lockres that
591 * needs to have its queues adjusted and AST/BASTs
592 * run. So let's pull each entry off the dirty_list
593 * and drop dlm->spinlock ASAP. Once off the list,
594 * res->spinlock needs to be taken again to protect
595 * the queues while calling dlm_shuffle_lists. */
596 spin_lock(&dlm->spinlock);
597 while (!list_empty(&dlm->dirty_list)) {
598 int delay = 0;
599 res = list_entry(dlm->dirty_list.next,
600 struct dlm_lock_resource, dirty);
601
602 /* peel a lockres off, remove it from the list,
603 * unset the dirty flag and drop the dlm lock */
604 BUG_ON(!res);
605 dlm_lockres_get(res);
606
607 spin_lock(&res->spinlock);
608 res->state &= ~DLM_LOCK_RES_DIRTY;
609 list_del_init(&res->dirty);
610 spin_unlock(&res->spinlock);
611 spin_unlock(&dlm->spinlock);
612
613 /* lockres can be re-dirtied/re-added to the
614 * dirty_list in this gap, but that is ok */
615
616 spin_lock(&res->spinlock);
617 if (res->owner != dlm->node_num) {
618 __dlm_print_one_lock_resource(res);
619 mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n",
620 res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no",
621 res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
622 res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no",
623 res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
624 }
625 BUG_ON(res->owner != dlm->node_num);
626
627 /* it is now ok to move lockreses in these states
628 * to the dirty list, assuming that they will only be
629 * dirty for a short while. */
630 if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
631 DLM_LOCK_RES_MIGRATING |
632 DLM_LOCK_RES_RECOVERING)) {
633 /* move it to the tail and keep going */
634 spin_unlock(&res->spinlock);
635 mlog(0, "delaying list shuffling for in-"
636 "progress lockres %.*s, state=%d\n",
637 res->lockname.len, res->lockname.name,
638 res->state);
639 delay = 1;
640 goto in_progress;
641 }
642
643 /* at this point the lockres is not migrating/
644 * recovering/in-progress. we have the lockres
645 * spinlock and do NOT have the dlm lock.
646 * safe to reserve/queue asts and run the lists. */
647
648 mlog(0, "calling dlm_shuffle_lists with dlm=%p, "
649 "res=%p\n", dlm, res);
650
651 /* called while holding lockres lock */
652 dlm_shuffle_lists(dlm, res);
653 spin_unlock(&res->spinlock);
654
655 dlm_lockres_calc_usage(dlm, res);
656
657in_progress:
658
659 spin_lock(&dlm->spinlock);
660 /* if the lock was in-progress, stick
661 * it on the back of the list */
662 if (delay) {
663 spin_lock(&res->spinlock);
664 list_add_tail(&res->dirty, &dlm->dirty_list);
665 res->state |= DLM_LOCK_RES_DIRTY;
666 spin_unlock(&res->spinlock);
667 }
668 dlm_lockres_put(res);
669
670 /* unlikely, but we may need to give time to
671 * other tasks */
672 if (!--n) {
673 mlog(0, "throttling dlm_thread\n");
674 break;
675 }
676 }
677
678 spin_unlock(&dlm->spinlock);
679 dlm_flush_asts(dlm);
680
681 /* yield and continue right away if there is more work to do */
682 if (!n) {
683 yield();
684 continue;
685 }
686
687 wait_event_interruptible_timeout(dlm->dlm_thread_wq,
688 !dlm_dirty_list_empty(dlm) ||
689 kthread_should_stop(),
690 timeout);
691 }
692
693 mlog(0, "quitting DLM thread\n");
694 return 0;
695}