author		Oleg Nesterov <oleg@tv-sign.ru>		2007-05-09 05:33:52 -0400
committer	Linus Torvalds <torvalds@woody.linux-foundation.org>	2007-05-09 15:30:50 -0400
commit		b89deed32ccc96098bd6bc953c64bba6b847774f (patch)
tree		7a5963bbc5203cfdb39bf2fb1204764df39c71db
parent		fc2e4d70410546307344821eed6fd23803a45286 (diff)
implement flush_work()
A basic problem with flush_scheduled_work() is that it blocks behind _all_ presently-queued works, rather than just the work which the caller wants to flush. If the caller holds some lock, and if one of the queued works happens to want that lock as well, then accidental deadlocks can occur. One example of this is the phy layer: it wants to flush work while holding rtnl_lock(). But if a linkwatch event happens to be queued, the phy code will deadlock because the linkwatch callback function takes rtnl_lock.

So we implement a new function which flushes a *single* work - just the one which the caller wants to free up. Thus we avoid the accidental deadlocks which can arise from unrelated subsystems' callbacks taking shared locks.

flush_work() non-blockingly dequeues the work_struct which we want to kill, then it waits for its handler to complete on all CPUs.

Add ->current_work to "struct cpu_workqueue_struct"; it points to the currently running "struct work_struct". When flush_work(work) detects ->current_work == work, it inserts a barrier at the _head_ of ->worklist (and thus right _after_ that work) and waits for completion. This means that the next work fired on that CPU will be this barrier, or another barrier queued by a concurrent flush_work(), so the caller of flush_work() will be woken before any "regular" work has a chance to run.

When wait_on_work() unlocks workqueue_mutex (or whatever we choose to protect against CPU hotplug), the CPU may go away. But in that case take_over_work() will move the barrier we queued to another CPU, it will be fired sometime, and wait_on_work() will be woken.

Actually, we are doing cleanup_workqueue_thread()->kthread_stop() before take_over_work(), so cwq->thread should complete its ->worklist (and thus the barrier), because currently we don't check kthread_should_stop() in run_workqueue(). But even if we did, everything should be ok.

[akpm@osdl.org: cleanup]
[akpm@osdl.org: add flush_work_keventd() wrapper]
Signed-off-by: Oleg Nesterov <oleg@tv-sign.ru>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
-rw-r--r--	include/linux/workqueue.h	4
-rw-r--r--	kernel/workqueue.c		95
2 files changed, 95 insertions, 4 deletions
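
As an illustration of the intended usage (a hedged sketch, not part of this patch; the mydev_* names and mydev_lock below are invented), a driver that previously had to call flush_scheduled_work() while holding a lock that unrelated work callbacks may also take can instead flush only its own work item with the new flush_work_keventd() wrapper:

#include <linux/workqueue.h>
#include <linux/mutex.h>

/* Hypothetical driver state; these names are made up for illustration. */
static DEFINE_MUTEX(mydev_lock);	/* also wanted by unrelated work callbacks */
static struct work_struct mydev_work;

static void mydev_work_fn(struct work_struct *work)
{
        /* Deliberately does not take mydev_lock, so it is safe to flush below. */
        /* ... process deferred events for this device ... */
}

static int mydev_init(void)
{
        INIT_WORK(&mydev_work, mydev_work_fn);
        return 0;
}

static void mydev_event(void)
{
        schedule_work(&mydev_work);	/* queued on keventd_wq */
}

static void mydev_teardown(void)
{
        mutex_lock(&mydev_lock);
        /*
         * flush_scheduled_work() here could deadlock: it blocks behind every
         * queued work, and some unrelated callback may want mydev_lock.
         * flush_work_keventd() waits only for mydev_work, assuming the caller
         * has already made sure it cannot be requeued.
         */
        flush_work_keventd(&mydev_work);
        mutex_unlock(&mydev_lock);
}

Note that mydev_work_fn() itself must not take mydev_lock, otherwise flushing it under that lock would still deadlock on our own callback; the patch only removes the dependency on unrelated works.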
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index f16ba1e0687d..26a70992dec8 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -178,6 +178,8 @@ extern int FASTCALL(queue_delayed_work(struct workqueue_struct *wq, struct delay
 extern int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 		struct delayed_work *work, unsigned long delay);
 extern void FASTCALL(flush_workqueue(struct workqueue_struct *wq));
+extern void flush_work(struct workqueue_struct *wq, struct work_struct *work);
+extern void flush_work_keventd(struct work_struct *work);
 
 extern int FASTCALL(schedule_work(struct work_struct *work));
 extern int FASTCALL(run_scheduled_work(struct work_struct *work));
@@ -199,7 +201,7 @@ int execute_in_process_context(work_func_t fn, struct execute_work *);
  * Kill off a pending schedule_delayed_work(). Note that the work callback
  * function may still be running on return from cancel_delayed_work(), unless
  * it returns 1 and the work doesn't re-arm itself. Run flush_workqueue() or
- * cancel_work_sync() to wait on it.
+ * flush_work() or cancel_work_sync() to wait on it.
  */
 static inline int cancel_delayed_work(struct delayed_work *work)
 {
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index b7bb37ab03bc..918d55267a12 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -46,6 +46,7 @@ struct cpu_workqueue_struct {
 
 	struct workqueue_struct *wq;
 	struct task_struct *thread;
+	struct work_struct *current_work;
 
 	int run_depth;		/* Detect run_workqueue() recursion depth */
 
@@ -120,6 +121,7 @@ static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work
 			&& work_pending(work)
 			&& !list_empty(&work->entry)) {
 		work_func_t f = work->func;
+		cwq->current_work = work;
 		list_del_init(&work->entry);
 		spin_unlock_irqrestore(&cwq->lock, flags);
 
@@ -128,6 +130,7 @@ static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work
 		f(work);
 
 		spin_lock_irqsave(&cwq->lock, flags);
+		cwq->current_work = NULL;
 		ret = 1;
 	}
 	spin_unlock_irqrestore(&cwq->lock, flags);
@@ -166,6 +169,17 @@ int fastcall run_scheduled_work(struct work_struct *work)
 }
 EXPORT_SYMBOL(run_scheduled_work);
 
+static void insert_work(struct cpu_workqueue_struct *cwq,
+				struct work_struct *work, int tail)
+{
+	set_wq_data(work, cwq);
+	if (tail)
+		list_add_tail(&work->entry, &cwq->worklist);
+	else
+		list_add(&work->entry, &cwq->worklist);
+	wake_up(&cwq->more_work);
+}
+
 /* Preempt must be disabled. */
 static void __queue_work(struct cpu_workqueue_struct *cwq,
 			 struct work_struct *work)
@@ -173,9 +187,7 @@ static void __queue_work(struct cpu_workqueue_struct *cwq,
 	unsigned long flags;
 
 	spin_lock_irqsave(&cwq->lock, flags);
-	set_wq_data(work, cwq);
-	list_add_tail(&work->entry, &cwq->worklist);
-	wake_up(&cwq->more_work);
+	insert_work(cwq, work, 1);
 	spin_unlock_irqrestore(&cwq->lock, flags);
 }
 
@@ -305,6 +317,7 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
 						struct work_struct, entry);
 		work_func_t f = work->func;
 
+		cwq->current_work = work;
 		list_del_init(cwq->worklist.next);
 		spin_unlock_irqrestore(&cwq->lock, flags);
 
@@ -325,6 +338,7 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
 		}
 
 		spin_lock_irqsave(&cwq->lock, flags);
+		cwq->current_work = NULL;
 	}
 	cwq->run_depth--;
 	spin_unlock_irqrestore(&cwq->lock, flags);
@@ -449,6 +463,75 @@ void fastcall flush_workqueue(struct workqueue_struct *wq)
 }
 EXPORT_SYMBOL_GPL(flush_workqueue);
 
+static void wait_on_work(struct cpu_workqueue_struct *cwq,
+				struct work_struct *work)
+{
+	struct wq_barrier barr;
+	int running = 0;
+
+	spin_lock_irq(&cwq->lock);
+	if (unlikely(cwq->current_work == work)) {
+		init_wq_barrier(&barr);
+		insert_work(cwq, &barr.work, 0);
+		running = 1;
+	}
+	spin_unlock_irq(&cwq->lock);
+
+	if (unlikely(running)) {
+		mutex_unlock(&workqueue_mutex);
+		wait_for_completion(&barr.done);
+		mutex_lock(&workqueue_mutex);
+	}
+}
+
+/**
+ * flush_work - block until a work_struct's callback has terminated
+ * @wq: the workqueue on which the work is queued
+ * @work: the work which is to be flushed
+ *
+ * flush_work() will attempt to cancel the work if it is queued. If the work's
+ * callback appears to be running, flush_work() will block until it has
+ * completed.
+ *
+ * flush_work() is designed to be used when the caller is tearing down data
+ * structures which the callback function operates upon. It is expected that,
+ * prior to calling flush_work(), the caller has arranged for the work to not
+ * be requeued.
+ */
+void flush_work(struct workqueue_struct *wq, struct work_struct *work)
+{
+	struct cpu_workqueue_struct *cwq;
+
+	mutex_lock(&workqueue_mutex);
+	cwq = get_wq_data(work);
+	/* Was it ever queued ? */
+	if (!cwq)
+		goto out;
+
+	/*
+	 * This work can't be re-queued, and the lock above protects us
+	 * from take_over_work(), no need to re-check that get_wq_data()
+	 * is still the same when we take cwq->lock.
+	 */
+	spin_lock_irq(&cwq->lock);
+	list_del_init(&work->entry);
+	work_release(work);
+	spin_unlock_irq(&cwq->lock);
+
+	if (is_single_threaded(wq)) {
+		/* Always use first cpu's area. */
+		wait_on_work(per_cpu_ptr(wq->cpu_wq, singlethread_cpu), work);
+	} else {
+		int cpu;
+
+		for_each_online_cpu(cpu)
+			wait_on_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
+	}
+out:
+	mutex_unlock(&workqueue_mutex);
+}
+EXPORT_SYMBOL_GPL(flush_work);
+
 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
 						   int cpu, int freezeable)
 {
@@ -650,6 +733,12 @@ void flush_scheduled_work(void)
 }
 EXPORT_SYMBOL(flush_scheduled_work);
 
+void flush_work_keventd(struct work_struct *work)
+{
+	flush_work(keventd_wq, work);
+}
+EXPORT_SYMBOL(flush_work_keventd);
+
 /**
  * cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
  * @wq: the controlling workqueue structure
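
For a driver with its own workqueue, the flush_work(wq, work) form added above is used in much the same way. Again a hedged sketch with invented mydrv_* names, not code from this patch:

#include <linux/workqueue.h>
#include <linux/errno.h>

static struct workqueue_struct *mydrv_wq;	/* hypothetical private workqueue */
static struct work_struct mydrv_work;

static void mydrv_work_fn(struct work_struct *work)
{
        /* ... deferred processing for this driver ... */
}

static int mydrv_init(void)
{
        mydrv_wq = create_workqueue("mydrv");
        if (!mydrv_wq)
                return -ENOMEM;
        INIT_WORK(&mydrv_work, mydrv_work_fn);
        return 0;
}

static void mydrv_exit(void)
{
        /*
         * The caller is expected to have made sure mydrv_work can no longer
         * be requeued; flush_work() then dequeues it if pending and waits
         * for a running callback to complete.
         */
        flush_work(mydrv_wq, &mydrv_work);
        destroy_workqueue(mydrv_wq);
}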