diff options
author | Oleg Nesterov <oleg@tv-sign.ru> | 2007-05-09 05:33:51 -0400 |
---|---|---|
committer | Linus Torvalds <torvalds@woody.linux-foundation.org> | 2007-05-09 15:30:50 -0400 |
commit | fc2e4d70410546307344821eed6fd23803a45286 (patch) | |
tree | b8253b7e42245c9b21b959a77c8070ddcbc9359e | |
parent | e18f3ffb9c3ddfc1b4ad8f38f5f2acae8c16f0c9 (diff) |
reimplement flush_workqueue()
Remove ->remove_sequence, ->insert_sequence, and ->work_done from struct
cpu_workqueue_struct. To implement flush_workqueue() we can queue a
barrier work on each CPU and wait for its completition.
The barrier is queued under workqueue_mutex to ensure that per cpu
wq->cpu_wq is alive, we drop this mutex before going to sleep. If CPU goes
down while we are waiting for completition, take_over_work() will move the
barrier on another CPU, and the handler will wake up us eventually.
Signed-off-by: Oleg Nesterov <oleg@tv-sign.ru>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
-rw-r--r-- | kernel/workqueue.c | 70 |
1 files changed, 31 insertions, 39 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c index 1ea4bcb86974..b7bb37ab03bc 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c | |||
@@ -36,23 +36,13 @@ | |||
36 | /* | 36 | /* |
37 | * The per-CPU workqueue (if single thread, we always use the first | 37 | * The per-CPU workqueue (if single thread, we always use the first |
38 | * possible cpu). | 38 | * possible cpu). |
39 | * | ||
40 | * The sequence counters are for flush_scheduled_work(). It wants to wait | ||
41 | * until all currently-scheduled works are completed, but it doesn't | ||
42 | * want to be livelocked by new, incoming ones. So it waits until | ||
43 | * remove_sequence is >= the insert_sequence which pertained when | ||
44 | * flush_scheduled_work() was called. | ||
45 | */ | 39 | */ |
46 | struct cpu_workqueue_struct { | 40 | struct cpu_workqueue_struct { |
47 | 41 | ||
48 | spinlock_t lock; | 42 | spinlock_t lock; |
49 | 43 | ||
50 | long remove_sequence; /* Least-recently added (next to run) */ | ||
51 | long insert_sequence; /* Next to add */ | ||
52 | |||
53 | struct list_head worklist; | 44 | struct list_head worklist; |
54 | wait_queue_head_t more_work; | 45 | wait_queue_head_t more_work; |
55 | wait_queue_head_t work_done; | ||
56 | 46 | ||
57 | struct workqueue_struct *wq; | 47 | struct workqueue_struct *wq; |
58 | struct task_struct *thread; | 48 | struct task_struct *thread; |
@@ -138,8 +128,6 @@ static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work | |||
138 | f(work); | 128 | f(work); |
139 | 129 | ||
140 | spin_lock_irqsave(&cwq->lock, flags); | 130 | spin_lock_irqsave(&cwq->lock, flags); |
141 | cwq->remove_sequence++; | ||
142 | wake_up(&cwq->work_done); | ||
143 | ret = 1; | 131 | ret = 1; |
144 | } | 132 | } |
145 | spin_unlock_irqrestore(&cwq->lock, flags); | 133 | spin_unlock_irqrestore(&cwq->lock, flags); |
@@ -187,7 +175,6 @@ static void __queue_work(struct cpu_workqueue_struct *cwq, | |||
187 | spin_lock_irqsave(&cwq->lock, flags); | 175 | spin_lock_irqsave(&cwq->lock, flags); |
188 | set_wq_data(work, cwq); | 176 | set_wq_data(work, cwq); |
189 | list_add_tail(&work->entry, &cwq->worklist); | 177 | list_add_tail(&work->entry, &cwq->worklist); |
190 | cwq->insert_sequence++; | ||
191 | wake_up(&cwq->more_work); | 178 | wake_up(&cwq->more_work); |
192 | spin_unlock_irqrestore(&cwq->lock, flags); | 179 | spin_unlock_irqrestore(&cwq->lock, flags); |
193 | } | 180 | } |
@@ -338,8 +325,6 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq) | |||
338 | } | 325 | } |
339 | 326 | ||
340 | spin_lock_irqsave(&cwq->lock, flags); | 327 | spin_lock_irqsave(&cwq->lock, flags); |
341 | cwq->remove_sequence++; | ||
342 | wake_up(&cwq->work_done); | ||
343 | } | 328 | } |
344 | cwq->run_depth--; | 329 | cwq->run_depth--; |
345 | spin_unlock_irqrestore(&cwq->lock, flags); | 330 | spin_unlock_irqrestore(&cwq->lock, flags); |
@@ -394,6 +379,25 @@ static int worker_thread(void *__cwq) | |||
394 | return 0; | 379 | return 0; |
395 | } | 380 | } |
396 | 381 | ||
382 | struct wq_barrier { | ||
383 | struct work_struct work; | ||
384 | struct completion done; | ||
385 | }; | ||
386 | |||
387 | static void wq_barrier_func(struct work_struct *work) | ||
388 | { | ||
389 | struct wq_barrier *barr = container_of(work, struct wq_barrier, work); | ||
390 | complete(&barr->done); | ||
391 | } | ||
392 | |||
393 | static inline void init_wq_barrier(struct wq_barrier *barr) | ||
394 | { | ||
395 | INIT_WORK(&barr->work, wq_barrier_func); | ||
396 | __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work)); | ||
397 | |||
398 | init_completion(&barr->done); | ||
399 | } | ||
400 | |||
397 | static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) | 401 | static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) |
398 | { | 402 | { |
399 | if (cwq->thread == current) { | 403 | if (cwq->thread == current) { |
@@ -401,23 +405,18 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) | |||
401 | * Probably keventd trying to flush its own queue. So simply run | 405 | * Probably keventd trying to flush its own queue. So simply run |
402 | * it by hand rather than deadlocking. | 406 | * it by hand rather than deadlocking. |
403 | */ | 407 | */ |
408 | mutex_unlock(&workqueue_mutex); | ||
404 | run_workqueue(cwq); | 409 | run_workqueue(cwq); |
410 | mutex_lock(&workqueue_mutex); | ||
405 | } else { | 411 | } else { |
406 | DEFINE_WAIT(wait); | 412 | struct wq_barrier barr; |
407 | long sequence_needed; | ||
408 | 413 | ||
409 | spin_lock_irq(&cwq->lock); | 414 | init_wq_barrier(&barr); |
410 | sequence_needed = cwq->insert_sequence; | 415 | __queue_work(cwq, &barr.work); |
411 | 416 | ||
412 | while (sequence_needed - cwq->remove_sequence > 0) { | 417 | mutex_unlock(&workqueue_mutex); |
413 | prepare_to_wait(&cwq->work_done, &wait, | 418 | wait_for_completion(&barr.done); |
414 | TASK_UNINTERRUPTIBLE); | 419 | mutex_lock(&workqueue_mutex); |
415 | spin_unlock_irq(&cwq->lock); | ||
416 | schedule(); | ||
417 | spin_lock_irq(&cwq->lock); | ||
418 | } | ||
419 | finish_wait(&cwq->work_done, &wait); | ||
420 | spin_unlock_irq(&cwq->lock); | ||
421 | } | 420 | } |
422 | } | 421 | } |
423 | 422 | ||
@@ -428,29 +427,25 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) | |||
428 | * Forces execution of the workqueue and blocks until its completion. | 427 | * Forces execution of the workqueue and blocks until its completion. |
429 | * This is typically used in driver shutdown handlers. | 428 | * This is typically used in driver shutdown handlers. |
430 | * | 429 | * |
431 | * This function will sample each workqueue's current insert_sequence number and | 430 | * We sleep until all works which were queued on entry have been handled, |
432 | * will sleep until the head sequence is greater than or equal to that. This | 431 | * but we are not livelocked by new incoming ones. |
433 | * means that we sleep until all works which were queued on entry have been | ||
434 | * handled, but we are not livelocked by new incoming ones. | ||
435 | * | 432 | * |
436 | * This function used to run the workqueues itself. Now we just wait for the | 433 | * This function used to run the workqueues itself. Now we just wait for the |
437 | * helper threads to do it. | 434 | * helper threads to do it. |
438 | */ | 435 | */ |
439 | void fastcall flush_workqueue(struct workqueue_struct *wq) | 436 | void fastcall flush_workqueue(struct workqueue_struct *wq) |
440 | { | 437 | { |
441 | might_sleep(); | 438 | mutex_lock(&workqueue_mutex); |
442 | |||
443 | if (is_single_threaded(wq)) { | 439 | if (is_single_threaded(wq)) { |
444 | /* Always use first cpu's area. */ | 440 | /* Always use first cpu's area. */ |
445 | flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); | 441 | flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); |
446 | } else { | 442 | } else { |
447 | int cpu; | 443 | int cpu; |
448 | 444 | ||
449 | mutex_lock(&workqueue_mutex); | ||
450 | for_each_online_cpu(cpu) | 445 | for_each_online_cpu(cpu) |
451 | flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); | 446 | flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); |
452 | mutex_unlock(&workqueue_mutex); | ||
453 | } | 447 | } |
448 | mutex_unlock(&workqueue_mutex); | ||
454 | } | 449 | } |
455 | EXPORT_SYMBOL_GPL(flush_workqueue); | 450 | EXPORT_SYMBOL_GPL(flush_workqueue); |
456 | 451 | ||
@@ -463,12 +458,9 @@ static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, | |||
463 | spin_lock_init(&cwq->lock); | 458 | spin_lock_init(&cwq->lock); |
464 | cwq->wq = wq; | 459 | cwq->wq = wq; |
465 | cwq->thread = NULL; | 460 | cwq->thread = NULL; |
466 | cwq->insert_sequence = 0; | ||
467 | cwq->remove_sequence = 0; | ||
468 | cwq->freezeable = freezeable; | 461 | cwq->freezeable = freezeable; |
469 | INIT_LIST_HEAD(&cwq->worklist); | 462 | INIT_LIST_HEAD(&cwq->worklist); |
470 | init_waitqueue_head(&cwq->more_work); | 463 | init_waitqueue_head(&cwq->more_work); |
471 | init_waitqueue_head(&cwq->work_done); | ||
472 | 464 | ||
473 | if (is_single_threaded(wq)) | 465 | if (is_single_threaded(wq)) |
474 | p = kthread_create(worker_thread, cwq, "%s", wq->name); | 466 | p = kthread_create(worker_thread, cwq, "%s", wq->name); |