aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOleg Nesterov <oleg@tv-sign.ru>2007-05-09 05:33:51 -0400
committerLinus Torvalds <torvalds@woody.linux-foundation.org>2007-05-09 15:30:50 -0400
commitfc2e4d70410546307344821eed6fd23803a45286 (patch)
treeb8253b7e42245c9b21b959a77c8070ddcbc9359e
parente18f3ffb9c3ddfc1b4ad8f38f5f2acae8c16f0c9 (diff)
reimplement flush_workqueue()
Remove ->remove_sequence, ->insert_sequence, and ->work_done from struct cpu_workqueue_struct. To implement flush_workqueue() we can queue a barrier work on each CPU and wait for its completition. The barrier is queued under workqueue_mutex to ensure that per cpu wq->cpu_wq is alive, we drop this mutex before going to sleep. If CPU goes down while we are waiting for completition, take_over_work() will move the barrier on another CPU, and the handler will wake up us eventually. Signed-off-by: Oleg Nesterov <oleg@tv-sign.ru> Signed-off-by: Andrew Morton <akpm@linux-foundation.org> Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
-rw-r--r--kernel/workqueue.c70
1 files changed, 31 insertions, 39 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 1ea4bcb86974..b7bb37ab03bc 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -36,23 +36,13 @@
36/* 36/*
37 * The per-CPU workqueue (if single thread, we always use the first 37 * The per-CPU workqueue (if single thread, we always use the first
38 * possible cpu). 38 * possible cpu).
39 *
40 * The sequence counters are for flush_scheduled_work(). It wants to wait
41 * until all currently-scheduled works are completed, but it doesn't
42 * want to be livelocked by new, incoming ones. So it waits until
43 * remove_sequence is >= the insert_sequence which pertained when
44 * flush_scheduled_work() was called.
45 */ 39 */
46struct cpu_workqueue_struct { 40struct cpu_workqueue_struct {
47 41
48 spinlock_t lock; 42 spinlock_t lock;
49 43
50 long remove_sequence; /* Least-recently added (next to run) */
51 long insert_sequence; /* Next to add */
52
53 struct list_head worklist; 44 struct list_head worklist;
54 wait_queue_head_t more_work; 45 wait_queue_head_t more_work;
55 wait_queue_head_t work_done;
56 46
57 struct workqueue_struct *wq; 47 struct workqueue_struct *wq;
58 struct task_struct *thread; 48 struct task_struct *thread;
@@ -138,8 +128,6 @@ static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work
138 f(work); 128 f(work);
139 129
140 spin_lock_irqsave(&cwq->lock, flags); 130 spin_lock_irqsave(&cwq->lock, flags);
141 cwq->remove_sequence++;
142 wake_up(&cwq->work_done);
143 ret = 1; 131 ret = 1;
144 } 132 }
145 spin_unlock_irqrestore(&cwq->lock, flags); 133 spin_unlock_irqrestore(&cwq->lock, flags);
@@ -187,7 +175,6 @@ static void __queue_work(struct cpu_workqueue_struct *cwq,
187 spin_lock_irqsave(&cwq->lock, flags); 175 spin_lock_irqsave(&cwq->lock, flags);
188 set_wq_data(work, cwq); 176 set_wq_data(work, cwq);
189 list_add_tail(&work->entry, &cwq->worklist); 177 list_add_tail(&work->entry, &cwq->worklist);
190 cwq->insert_sequence++;
191 wake_up(&cwq->more_work); 178 wake_up(&cwq->more_work);
192 spin_unlock_irqrestore(&cwq->lock, flags); 179 spin_unlock_irqrestore(&cwq->lock, flags);
193} 180}
@@ -338,8 +325,6 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
338 } 325 }
339 326
340 spin_lock_irqsave(&cwq->lock, flags); 327 spin_lock_irqsave(&cwq->lock, flags);
341 cwq->remove_sequence++;
342 wake_up(&cwq->work_done);
343 } 328 }
344 cwq->run_depth--; 329 cwq->run_depth--;
345 spin_unlock_irqrestore(&cwq->lock, flags); 330 spin_unlock_irqrestore(&cwq->lock, flags);
@@ -394,6 +379,25 @@ static int worker_thread(void *__cwq)
394 return 0; 379 return 0;
395} 380}
396 381
382struct wq_barrier {
383 struct work_struct work;
384 struct completion done;
385};
386
387static void wq_barrier_func(struct work_struct *work)
388{
389 struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
390 complete(&barr->done);
391}
392
393static inline void init_wq_barrier(struct wq_barrier *barr)
394{
395 INIT_WORK(&barr->work, wq_barrier_func);
396 __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
397
398 init_completion(&barr->done);
399}
400
397static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 401static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
398{ 402{
399 if (cwq->thread == current) { 403 if (cwq->thread == current) {
@@ -401,23 +405,18 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
401 * Probably keventd trying to flush its own queue. So simply run 405 * Probably keventd trying to flush its own queue. So simply run
402 * it by hand rather than deadlocking. 406 * it by hand rather than deadlocking.
403 */ 407 */
408 mutex_unlock(&workqueue_mutex);
404 run_workqueue(cwq); 409 run_workqueue(cwq);
410 mutex_lock(&workqueue_mutex);
405 } else { 411 } else {
406 DEFINE_WAIT(wait); 412 struct wq_barrier barr;
407 long sequence_needed;
408 413
409 spin_lock_irq(&cwq->lock); 414 init_wq_barrier(&barr);
410 sequence_needed = cwq->insert_sequence; 415 __queue_work(cwq, &barr.work);
411 416
412 while (sequence_needed - cwq->remove_sequence > 0) { 417 mutex_unlock(&workqueue_mutex);
413 prepare_to_wait(&cwq->work_done, &wait, 418 wait_for_completion(&barr.done);
414 TASK_UNINTERRUPTIBLE); 419 mutex_lock(&workqueue_mutex);
415 spin_unlock_irq(&cwq->lock);
416 schedule();
417 spin_lock_irq(&cwq->lock);
418 }
419 finish_wait(&cwq->work_done, &wait);
420 spin_unlock_irq(&cwq->lock);
421 } 420 }
422} 421}
423 422
@@ -428,29 +427,25 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
428 * Forces execution of the workqueue and blocks until its completion. 427 * Forces execution of the workqueue and blocks until its completion.
429 * This is typically used in driver shutdown handlers. 428 * This is typically used in driver shutdown handlers.
430 * 429 *
431 * This function will sample each workqueue's current insert_sequence number and 430 * We sleep until all works which were queued on entry have been handled,
432 * will sleep until the head sequence is greater than or equal to that. This 431 * but we are not livelocked by new incoming ones.
433 * means that we sleep until all works which were queued on entry have been
434 * handled, but we are not livelocked by new incoming ones.
435 * 432 *
436 * This function used to run the workqueues itself. Now we just wait for the 433 * This function used to run the workqueues itself. Now we just wait for the
437 * helper threads to do it. 434 * helper threads to do it.
438 */ 435 */
439void fastcall flush_workqueue(struct workqueue_struct *wq) 436void fastcall flush_workqueue(struct workqueue_struct *wq)
440{ 437{
441 might_sleep(); 438 mutex_lock(&workqueue_mutex);
442
443 if (is_single_threaded(wq)) { 439 if (is_single_threaded(wq)) {
444 /* Always use first cpu's area. */ 440 /* Always use first cpu's area. */
445 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); 441 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
446 } else { 442 } else {
447 int cpu; 443 int cpu;
448 444
449 mutex_lock(&workqueue_mutex);
450 for_each_online_cpu(cpu) 445 for_each_online_cpu(cpu)
451 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 446 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
452 mutex_unlock(&workqueue_mutex);
453 } 447 }
448 mutex_unlock(&workqueue_mutex);
454} 449}
455EXPORT_SYMBOL_GPL(flush_workqueue); 450EXPORT_SYMBOL_GPL(flush_workqueue);
456 451
@@ -463,12 +458,9 @@ static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
463 spin_lock_init(&cwq->lock); 458 spin_lock_init(&cwq->lock);
464 cwq->wq = wq; 459 cwq->wq = wq;
465 cwq->thread = NULL; 460 cwq->thread = NULL;
466 cwq->insert_sequence = 0;
467 cwq->remove_sequence = 0;
468 cwq->freezeable = freezeable; 461 cwq->freezeable = freezeable;
469 INIT_LIST_HEAD(&cwq->worklist); 462 INIT_LIST_HEAD(&cwq->worklist);
470 init_waitqueue_head(&cwq->more_work); 463 init_waitqueue_head(&cwq->more_work);
471 init_waitqueue_head(&cwq->work_done);
472 464
473 if (is_single_threaded(wq)) 465 if (is_single_threaded(wq))
474 p = kthread_create(worker_thread, cwq, "%s", wq->name); 466 p = kthread_create(worker_thread, cwq, "%s", wq->name);