author	Linus Torvalds <torvalds@linux-foundation.org>	2010-08-07 15:42:58 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2010-08-07 15:42:58 -0400
commit	3b7433b8a8a83c87972065b1852b7dcae691e464 (patch)
tree	93fa2c003f8baef5ab0733b53bac77961ed5240c /kernel
parent	4a386c3e177ca2fbc70c9283d0b46537844763a0 (diff)
parent	6ee0578b4daaea01c96b172c6aacca43fd9807a6 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq: (55 commits)
  workqueue: mark init_workqueues() as early_initcall()
  workqueue: explain for_each_*cwq_cpu() iterators
  fscache: fix build on !CONFIG_SYSCTL
  slow-work: kill it
  gfs2: use workqueue instead of slow-work
  drm: use workqueue instead of slow-work
  cifs: use workqueue instead of slow-work
  fscache: drop references to slow-work
  fscache: convert operation to use workqueue instead of slow-work
  fscache: convert object to use workqueue instead of slow-work
  workqueue: fix how cpu number is stored in work->data
  workqueue: fix mayday_mask handling on UP
  workqueue: fix build problem on !CONFIG_SMP
  workqueue: fix locking in retry path of maybe_create_worker()
  async: use workqueue for worker pool
  workqueue: remove WQ_SINGLE_CPU and use WQ_UNBOUND instead
  workqueue: implement unbound workqueue
  workqueue: prepare for WQ_UNBOUND implementation
  libata: take advantage of cmwq and remove concurrency limitations
  workqueue: fix worker management invocation without pending works
  ...

Fixed up conflicts in fs/cifs/* as per Tejun. Other trivial conflicts in
include/linux/workqueue.h, kernel/trace/Kconfig and kernel/workqueue.c
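[Editor's note: the per-subsystem "use workqueue instead of slow-work" conversions listed above all follow the same pattern: replace a struct slow_work with a struct work_struct and enqueue it on a workqueue. A minimal sketch of that pattern, with hypothetical names (my_object, my_work_fn) rather than code from this merge:

#include <linux/workqueue.h>

struct my_object {
	struct work_struct work;	/* was: struct slow_work */
	/* ... object state ... */
};

static void my_work_fn(struct work_struct *work)
{
	struct my_object *obj = container_of(work, struct my_object, work);

	/* do the slow operation that ->execute() used to perform */
}

static void my_object_init(struct my_object *obj)
{
	INIT_WORK(&obj->work, my_work_fn);		/* was: slow_work_init() */
}

static void my_object_kick(struct my_object *obj)
{
	queue_work(system_unbound_wq, &obj->work);	/* was: slow_work_enqueue() */
}
]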
Diffstat (limited to 'kernel')
-rw-r--r--	kernel/Makefile	2
-rw-r--r--	kernel/async.c	141
-rw-r--r--	kernel/kthread.c	164
-rw-r--r--	kernel/power/process.c	21
-rw-r--r--	kernel/slow-work-debugfs.c	227
-rw-r--r--	kernel/slow-work.c	1068
-rw-r--r--	kernel/slow-work.h	72
-rw-r--r--	kernel/sysctl.c	8
-rw-r--r--	kernel/trace/Kconfig	11
-rw-r--r--	kernel/workqueue.c	3160
-rw-r--r--	kernel/workqueue_sched.h	13
11 files changed, 2963 insertions(+), 1924 deletions(-)
diff --git a/kernel/Makefile b/kernel/Makefile
index ce53fb2bd1d9..c53e491e25a8 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -99,8 +99,6 @@ obj-$(CONFIG_TRACING) += trace/
 obj-$(CONFIG_X86_DS) += trace/
 obj-$(CONFIG_RING_BUFFER) += trace/
 obj-$(CONFIG_SMP) += sched_cpupri.o
-obj-$(CONFIG_SLOW_WORK) += slow-work.o
-obj-$(CONFIG_SLOW_WORK_DEBUG) += slow-work-debugfs.o
 obj-$(CONFIG_PERF_EVENTS) += perf_event.o
 obj-$(CONFIG_HAVE_HW_BREAKPOINT) += hw_breakpoint.o
 obj-$(CONFIG_USER_RETURN_NOTIFIER) += user-return-notifier.o
diff --git a/kernel/async.c b/kernel/async.c
index 15319d6c18fe..cd9dbb913c77 100644
--- a/kernel/async.c
+++ b/kernel/async.c
@@ -49,40 +49,33 @@ asynchronous and synchronous parts of the kernel.
 */

 #include <linux/async.h>
-#include <linux/bug.h>
 #include <linux/module.h>
 #include <linux/wait.h>
 #include <linux/sched.h>
-#include <linux/init.h>
-#include <linux/kthread.h>
-#include <linux/delay.h>
 #include <linux/slab.h>
+#include <linux/workqueue.h>
 #include <asm/atomic.h>

 static async_cookie_t next_cookie = 1;

-#define MAX_THREADS	256
 #define MAX_WORK	32768

 static LIST_HEAD(async_pending);
 static LIST_HEAD(async_running);
 static DEFINE_SPINLOCK(async_lock);

-static int async_enabled = 0;
-
 struct async_entry {
-	struct list_head list;
-	async_cookie_t   cookie;
-	async_func_ptr	 *func;
-	void             *data;
-	struct list_head *running;
+	struct list_head	list;
+	struct work_struct	work;
+	async_cookie_t		cookie;
+	async_func_ptr		*func;
+	void			*data;
+	struct list_head	*running;
 };

 static DECLARE_WAIT_QUEUE_HEAD(async_done);
-static DECLARE_WAIT_QUEUE_HEAD(async_new);

 static atomic_t entry_count;
-static atomic_t thread_count;

 extern int initcall_debug;

@@ -117,27 +110,23 @@ static async_cookie_t lowest_in_progress(struct list_head *running)
 	spin_unlock_irqrestore(&async_lock, flags);
 	return ret;
 }
+
 /*
  * pick the first pending entry and run it
  */
-static void run_one_entry(void)
+static void async_run_entry_fn(struct work_struct *work)
 {
+	struct async_entry *entry =
+		container_of(work, struct async_entry, work);
 	unsigned long flags;
-	struct async_entry *entry;
 	ktime_t calltime, delta, rettime;

-	/* 1) pick one task from the pending queue */
-
+	/* 1) move self to the running queue */
 	spin_lock_irqsave(&async_lock, flags);
-	if (list_empty(&async_pending))
-		goto out;
-	entry = list_first_entry(&async_pending, struct async_entry, list);
-
-	/* 2) move it to the running queue */
 	list_move_tail(&entry->list, entry->running);
 	spin_unlock_irqrestore(&async_lock, flags);

-	/* 3) run it (and print duration)*/
+	/* 2) run (and print duration) */
 	if (initcall_debug && system_state == SYSTEM_BOOTING) {
 		printk("calling %lli_%pF @ %i\n", (long long)entry->cookie,
 			entry->func, task_pid_nr(current));
@@ -153,31 +142,25 @@ static void run_one_entry(void)
 			(long long)ktime_to_ns(delta) >> 10);
 	}

-	/* 4) remove it from the running queue */
+	/* 3) remove self from the running queue */
 	spin_lock_irqsave(&async_lock, flags);
 	list_del(&entry->list);

-	/* 5) free the entry */
+	/* 4) free the entry */
 	kfree(entry);
 	atomic_dec(&entry_count);

 	spin_unlock_irqrestore(&async_lock, flags);

-	/* 6) wake up any waiters. */
+	/* 5) wake up any waiters */
 	wake_up(&async_done);
-	return;
-
-out:
-	spin_unlock_irqrestore(&async_lock, flags);
 }

-
 static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct list_head *running)
 {
 	struct async_entry *entry;
 	unsigned long flags;
 	async_cookie_t newcookie;
-

 	/* allow irq-off callers */
 	entry = kzalloc(sizeof(struct async_entry), GFP_ATOMIC);
@@ -186,7 +169,7 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l
 	 * If we're out of memory or if there's too much work
 	 * pending already, we execute synchronously.
 	 */
-	if (!async_enabled || !entry || atomic_read(&entry_count) > MAX_WORK) {
+	if (!entry || atomic_read(&entry_count) > MAX_WORK) {
 		kfree(entry);
 		spin_lock_irqsave(&async_lock, flags);
 		newcookie = next_cookie++;
@@ -196,6 +179,7 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l
 		ptr(data, newcookie);
 		return newcookie;
 	}
+	INIT_WORK(&entry->work, async_run_entry_fn);
 	entry->func = ptr;
 	entry->data = data;
 	entry->running = running;
@@ -205,7 +189,10 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l
 	list_add_tail(&entry->list, &async_pending);
 	atomic_inc(&entry_count);
 	spin_unlock_irqrestore(&async_lock, flags);
-	wake_up(&async_new);
+
+	/* schedule for execution */
+	queue_work(system_unbound_wq, &entry->work);
+
 	return newcookie;
 }

@@ -312,87 +299,3 @@ void async_synchronize_cookie(async_cookie_t cookie)
 	async_synchronize_cookie_domain(cookie, &async_running);
 }
 EXPORT_SYMBOL_GPL(async_synchronize_cookie);
-
-
-static int async_thread(void *unused)
-{
-	DECLARE_WAITQUEUE(wq, current);
-	add_wait_queue(&async_new, &wq);
-
-	while (!kthread_should_stop()) {
-		int ret = HZ;
-		set_current_state(TASK_INTERRUPTIBLE);
-		/*
-		 * check the list head without lock.. false positives
-		 * are dealt with inside run_one_entry() while holding
-		 * the lock.
-		 */
-		rmb();
-		if (!list_empty(&async_pending))
-			run_one_entry();
-		else
-			ret = schedule_timeout(HZ);
-
-		if (ret == 0) {
-			/*
-			 * we timed out, this means we as thread are redundant.
-			 * we sign off and die, but we to avoid any races there
-			 * is a last-straw check to see if work snuck in.
-			 */
-			atomic_dec(&thread_count);
-			wmb(); /* manager must see our departure first */
-			if (list_empty(&async_pending))
-				break;
-			/*
-			 * woops work came in between us timing out and us
-			 * signing off; we need to stay alive and keep working.
-			 */
-			atomic_inc(&thread_count);
-		}
-	}
-	remove_wait_queue(&async_new, &wq);
-
-	return 0;
-}
-
-static int async_manager_thread(void *unused)
-{
-	DECLARE_WAITQUEUE(wq, current);
-	add_wait_queue(&async_new, &wq);
-
-	while (!kthread_should_stop()) {
-		int tc, ec;
-
-		set_current_state(TASK_INTERRUPTIBLE);
-
-		tc = atomic_read(&thread_count);
-		rmb();
-		ec = atomic_read(&entry_count);
-
-		while (tc < ec && tc < MAX_THREADS) {
-			if (IS_ERR(kthread_run(async_thread, NULL, "async/%i",
-					       tc))) {
-				msleep(100);
-				continue;
-			}
-			atomic_inc(&thread_count);
-			tc++;
-		}
-
-		schedule();
-	}
-	remove_wait_queue(&async_new, &wq);
-
-	return 0;
-}
-
-static int __init async_init(void)
-{
-	async_enabled =
-		!IS_ERR(kthread_run(async_manager_thread, NULL, "async/mgr"));
-
-	WARN_ON(!async_enabled);
-	return 0;
-}
-
-core_initcall(async_init);
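[Editor's note: the async core keeps its external interface; only the execution engine changes, with each entry now carrying a work_struct queued on system_unbound_wq as above. A caller-side usage sketch (my_dev and my_probe_fn are hypothetical; the API itself is unchanged by this merge):

#include <linux/async.h>

struct my_dev {
	int id;
};

static void my_probe_fn(void *data, async_cookie_t cookie)
{
	struct my_dev *dev = data;

	/* slow probing; now executed by an unbound workqueue worker */
}

static void my_init(struct my_dev *dev)
{
	async_schedule(my_probe_fn, dev);
	/* ... */
	async_synchronize_full();	/* wait for all outstanding async work */
}
]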
diff --git a/kernel/kthread.c b/kernel/kthread.c
index 83911c780175..2dc3786349d1 100644
--- a/kernel/kthread.c
+++ b/kernel/kthread.c
@@ -14,6 +14,8 @@
 #include <linux/file.h>
 #include <linux/module.h>
 #include <linux/mutex.h>
+#include <linux/slab.h>
+#include <linux/freezer.h>
 #include <trace/events/sched.h>

 static DEFINE_SPINLOCK(kthread_create_lock);
@@ -35,6 +37,7 @@ struct kthread_create_info

 struct kthread {
 	int should_stop;
+	void *data;
 	struct completion exited;
 };

@@ -54,6 +57,19 @@ int kthread_should_stop(void)
 }
 EXPORT_SYMBOL(kthread_should_stop);

+/**
+ * kthread_data - return data value specified on kthread creation
+ * @task: kthread task in question
+ *
+ * Return the data value specified when kthread @task was created.
+ * The caller is responsible for ensuring the validity of @task when
+ * calling this function.
+ */
+void *kthread_data(struct task_struct *task)
+{
+	return to_kthread(task)->data;
+}
+
 static int kthread(void *_create)
 {
 	/* Copy data: it's on kthread's stack */
@@ -64,6 +80,7 @@ static int kthread(void *_create)
 	int ret;

 	self.should_stop = 0;
+	self.data = data;
 	init_completion(&self.exited);
 	current->vfork_done = &self.exited;

@@ -247,3 +264,150 @@ int kthreadd(void *unused)

 	return 0;
 }
+
+/**
+ * kthread_worker_fn - kthread function to process kthread_worker
+ * @worker_ptr: pointer to initialized kthread_worker
+ *
+ * This function can be used as @threadfn to kthread_create() or
+ * kthread_run() with @worker_ptr argument pointing to an initialized
+ * kthread_worker. The started kthread will process work_list until
+ * the it is stopped with kthread_stop(). A kthread can also call
+ * this function directly after extra initialization.
+ *
+ * Different kthreads can be used for the same kthread_worker as long
+ * as there's only one kthread attached to it at any given time. A
+ * kthread_worker without an attached kthread simply collects queued
+ * kthread_works.
+ */
+int kthread_worker_fn(void *worker_ptr)
+{
+	struct kthread_worker *worker = worker_ptr;
+	struct kthread_work *work;
+
+	WARN_ON(worker->task);
+	worker->task = current;
+repeat:
+	set_current_state(TASK_INTERRUPTIBLE);	/* mb paired w/ kthread_stop */
+
+	if (kthread_should_stop()) {
+		__set_current_state(TASK_RUNNING);
+		spin_lock_irq(&worker->lock);
+		worker->task = NULL;
+		spin_unlock_irq(&worker->lock);
+		return 0;
+	}
+
+	work = NULL;
+	spin_lock_irq(&worker->lock);
+	if (!list_empty(&worker->work_list)) {
+		work = list_first_entry(&worker->work_list,
+					struct kthread_work, node);
+		list_del_init(&work->node);
+	}
+	spin_unlock_irq(&worker->lock);
+
+	if (work) {
+		__set_current_state(TASK_RUNNING);
+		work->func(work);
+		smp_wmb();	/* wmb worker-b0 paired with flush-b1 */
+		work->done_seq = work->queue_seq;
+		smp_mb();	/* mb worker-b1 paired with flush-b0 */
+		if (atomic_read(&work->flushing))
+			wake_up_all(&work->done);
+	} else if (!freezing(current))
+		schedule();
+
+	try_to_freeze();
+	goto repeat;
+}
+EXPORT_SYMBOL_GPL(kthread_worker_fn);
+
+/**
+ * queue_kthread_work - queue a kthread_work
+ * @worker: target kthread_worker
+ * @work: kthread_work to queue
+ *
+ * Queue @work to work processor @task for async execution. @task
+ * must have been created with kthread_worker_create(). Returns %true
+ * if @work was successfully queued, %false if it was already pending.
+ */
+bool queue_kthread_work(struct kthread_worker *worker,
+			struct kthread_work *work)
+{
+	bool ret = false;
+	unsigned long flags;
+
+	spin_lock_irqsave(&worker->lock, flags);
+	if (list_empty(&work->node)) {
+		list_add_tail(&work->node, &worker->work_list);
+		work->queue_seq++;
+		if (likely(worker->task))
+			wake_up_process(worker->task);
+		ret = true;
+	}
+	spin_unlock_irqrestore(&worker->lock, flags);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(queue_kthread_work);
+
+/**
+ * flush_kthread_work - flush a kthread_work
+ * @work: work to flush
+ *
+ * If @work is queued or executing, wait for it to finish execution.
+ */
+void flush_kthread_work(struct kthread_work *work)
+{
+	int seq = work->queue_seq;
+
+	atomic_inc(&work->flushing);
+
+	/*
+	 * mb flush-b0 paired with worker-b1, to make sure either
+	 * worker sees the above increment or we see done_seq update.
+	 */
+	smp_mb__after_atomic_inc();
+
+	/* A - B <= 0 tests whether B is in front of A regardless of overflow */
+	wait_event(work->done, seq - work->done_seq <= 0);
+	atomic_dec(&work->flushing);
+
+	/*
+	 * rmb flush-b1 paired with worker-b0, to make sure our caller
+	 * sees every change made by work->func().
+	 */
+	smp_mb__after_atomic_dec();
+}
+EXPORT_SYMBOL_GPL(flush_kthread_work);
+
+struct kthread_flush_work {
+	struct kthread_work	work;
+	struct completion	done;
+};
+
+static void kthread_flush_work_fn(struct kthread_work *work)
+{
+	struct kthread_flush_work *fwork =
+		container_of(work, struct kthread_flush_work, work);
+	complete(&fwork->done);
+}
+
+/**
+ * flush_kthread_worker - flush all current works on a kthread_worker
+ * @worker: worker to flush
+ *
+ * Wait until all currently executing or pending works on @worker are
+ * finished.
+ */
+void flush_kthread_worker(struct kthread_worker *worker)
+{
+	struct kthread_flush_work fwork = {
+		KTHREAD_WORK_INIT(fwork.work, kthread_flush_work_fn),
+		COMPLETION_INITIALIZER_ONSTACK(fwork.done),
+	};
+
+	queue_kthread_work(worker, &fwork.work);
+	wait_for_completion(&fwork.done);
+}
+EXPORT_SYMBOL_GPL(flush_kthread_worker);
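[Editor's note: kthread_worker gives code that needs a dedicated thread (rather than the shared workqueue pool) the familiar queue/flush model. A usage sketch, assuming the kthread.h helpers added alongside this code (DEFINE_KTHREAD_WORKER, init_kthread_work); my_work_fn and the thread name are illustrative:

#include <linux/kthread.h>
#include <linux/err.h>

static DEFINE_KTHREAD_WORKER(my_worker);
static struct kthread_work my_work;

static void my_work_fn(struct kthread_work *work)
{
	/* runs in the dedicated kthread, always the same task */
}

static int my_start(void)
{
	struct task_struct *task;

	init_kthread_work(&my_work, my_work_fn);
	task = kthread_run(kthread_worker_fn, &my_worker, "my_worker");
	if (IS_ERR(task))
		return PTR_ERR(task);

	queue_kthread_work(&my_worker, &my_work);
	flush_kthread_work(&my_work);		/* wait for this item */
	flush_kthread_worker(&my_worker);	/* or for everything queued */
	return kthread_stop(task);
}
]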
diff --git a/kernel/power/process.c b/kernel/power/process.c
index 71ae29052ab6..028a99598f49 100644
--- a/kernel/power/process.c
+++ b/kernel/power/process.c
@@ -15,6 +15,7 @@
 #include <linux/syscalls.h>
 #include <linux/freezer.h>
 #include <linux/delay.h>
+#include <linux/workqueue.h>

 /*
  * Timeout for stopping processes
@@ -35,6 +36,7 @@ static int try_to_freeze_tasks(bool sig_only)
 	struct task_struct *g, *p;
 	unsigned long end_time;
 	unsigned int todo;
+	bool wq_busy = false;
 	struct timeval start, end;
 	u64 elapsed_csecs64;
 	unsigned int elapsed_csecs;
@@ -42,6 +44,10 @@ static int try_to_freeze_tasks(bool sig_only)
 	do_gettimeofday(&start);

 	end_time = jiffies + TIMEOUT;
+
+	if (!sig_only)
+		freeze_workqueues_begin();
+
 	while (true) {
 		todo = 0;
 		read_lock(&tasklist_lock);
@@ -63,6 +69,12 @@ static int try_to_freeze_tasks(bool sig_only)
 				todo++;
 		} while_each_thread(g, p);
 		read_unlock(&tasklist_lock);
+
+		if (!sig_only) {
+			wq_busy = freeze_workqueues_busy();
+			todo += wq_busy;
+		}
+
 		if (!todo || time_after(jiffies, end_time))
 			break;

@@ -86,8 +98,12 @@ static int try_to_freeze_tasks(bool sig_only)
 		 */
 		printk("\n");
 		printk(KERN_ERR "Freezing of tasks failed after %d.%02d seconds "
-		       "(%d tasks refusing to freeze):\n",
-		       elapsed_csecs / 100, elapsed_csecs % 100, todo);
+		       "(%d tasks refusing to freeze, wq_busy=%d):\n",
+		       elapsed_csecs / 100, elapsed_csecs % 100,
+		       todo - wq_busy, wq_busy);
+
+		thaw_workqueues();
+
 		read_lock(&tasklist_lock);
 		do_each_thread(g, p) {
 			task_lock(p);
@@ -157,6 +173,7 @@ void thaw_processes(void)
 	oom_killer_enable();

 	printk("Restarting tasks ... ");
+	thaw_workqueues();
 	thaw_tasks(true);
 	thaw_tasks(false);
 	schedule();
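[Editor's note: the freezer calls above only affect workqueues created as freezable. A sketch of the producer side, assuming the alloc_workqueue() interface and the WQ_FREEZEABLE flag introduced elsewhere in this merge (my_wq and my_work_fn are illustrative):

#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;
static struct work_struct my_work;

static void my_work_fn(struct work_struct *work)
{
	/* not executed between freeze_workqueues_begin() and thaw_workqueues() */
}

static int __init my_setup(void)
{
	my_wq = alloc_workqueue("my_wq", WQ_FREEZEABLE, 0);
	if (!my_wq)
		return -ENOMEM;

	INIT_WORK(&my_work, my_work_fn);
	queue_work(my_wq, &my_work);
	return 0;
}
]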
diff --git a/kernel/slow-work-debugfs.c b/kernel/slow-work-debugfs.c
deleted file mode 100644
index e45c43645298..000000000000
--- a/kernel/slow-work-debugfs.c
+++ /dev/null
@@ -1,227 +0,0 @@
-/* Slow work debugging
- *
- * Copyright (C) 2009 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells (dhowells@redhat.com)
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- */
-
-#include <linux/module.h>
-#include <linux/slow-work.h>
-#include <linux/fs.h>
-#include <linux/time.h>
-#include <linux/seq_file.h>
-#include "slow-work.h"
-
-#define ITERATOR_SHIFT		(BITS_PER_LONG - 4)
-#define ITERATOR_SELECTOR	(0xfUL << ITERATOR_SHIFT)
-#define ITERATOR_COUNTER	(~ITERATOR_SELECTOR)
-
-void slow_work_new_thread_desc(struct slow_work *work, struct seq_file *m)
-{
-	seq_puts(m, "Slow-work: New thread");
-}
-
-/*
- * Render the time mark field on a work item into a 5-char time with units plus
- * a space
- */
-static void slow_work_print_mark(struct seq_file *m, struct slow_work *work)
-{
-	struct timespec now, diff;
-
-	now = CURRENT_TIME;
-	diff = timespec_sub(now, work->mark);
-
-	if (diff.tv_sec < 0)
-		seq_puts(m, "  -ve ");
-	else if (diff.tv_sec == 0 && diff.tv_nsec < 1000)
-		seq_printf(m, "%3luns ", diff.tv_nsec);
-	else if (diff.tv_sec == 0 && diff.tv_nsec < 1000000)
-		seq_printf(m, "%3luus ", diff.tv_nsec / 1000);
-	else if (diff.tv_sec == 0 && diff.tv_nsec < 1000000000)
-		seq_printf(m, "%3lums ", diff.tv_nsec / 1000000);
-	else if (diff.tv_sec <= 1)
-		seq_puts(m, "   1s ");
-	else if (diff.tv_sec < 60)
-		seq_printf(m, "%4lus ", diff.tv_sec);
-	else if (diff.tv_sec < 60 * 60)
-		seq_printf(m, "%4lum ", diff.tv_sec / 60);
-	else if (diff.tv_sec < 60 * 60 * 24)
-		seq_printf(m, "%4luh ", diff.tv_sec / 3600);
-	else
-		seq_puts(m, "exces ");
-}
-
-/*
- * Describe a slow work item for debugfs
- */
-static int slow_work_runqueue_show(struct seq_file *m, void *v)
-{
-	struct slow_work *work;
-	struct list_head *p = v;
-	unsigned long id;
-
-	switch ((unsigned long) v) {
-	case 1:
-		seq_puts(m, "THR PID   ITEM ADDR        FL MARK  DESC\n");
-		return 0;
-	case 2:
-		seq_puts(m, "=== ===== ================ == ===== ==========\n");
-		return 0;
-
-	case 3 ... 3 + SLOW_WORK_THREAD_LIMIT - 1:
-		id = (unsigned long) v - 3;
-
-		read_lock(&slow_work_execs_lock);
-		work = slow_work_execs[id];
-		if (work) {
-			smp_read_barrier_depends();
-
-			seq_printf(m, "%3lu %5d %16p %2lx ",
-				   id, slow_work_pids[id], work, work->flags);
-			slow_work_print_mark(m, work);
-
-			if (work->ops->desc)
-				work->ops->desc(work, m);
-			seq_putc(m, '\n');
-		}
-		read_unlock(&slow_work_execs_lock);
-		return 0;
-
-	default:
-		work = list_entry(p, struct slow_work, link);
-		seq_printf(m, "%3s - %16p %2lx ",
-			   work->flags & SLOW_WORK_VERY_SLOW ? "vsq" : "sq",
-			   work, work->flags);
-		slow_work_print_mark(m, work);
-
-		if (work->ops->desc)
-			work->ops->desc(work, m);
-		seq_putc(m, '\n');
-		return 0;
-	}
-}
-
-/*
- * map the iterator to a work item
- */
-static void *slow_work_runqueue_index(struct seq_file *m, loff_t *_pos)
-{
-	struct list_head *p;
-	unsigned long count, id;
-
-	switch (*_pos >> ITERATOR_SHIFT) {
-	case 0x0:
-		if (*_pos == 0)
-			*_pos = 1;
-		if (*_pos < 3)
-			return (void *)(unsigned long) *_pos;
-		if (*_pos < 3 + SLOW_WORK_THREAD_LIMIT)
-			for (id = *_pos - 3;
-			     id < SLOW_WORK_THREAD_LIMIT;
-			     id++, (*_pos)++)
-				if (slow_work_execs[id])
-					return (void *)(unsigned long) *_pos;
-		*_pos = 0x1UL << ITERATOR_SHIFT;
-
-	case 0x1:
-		count = *_pos & ITERATOR_COUNTER;
-		list_for_each(p, &slow_work_queue) {
-			if (count == 0)
-				return p;
-			count--;
-		}
-		*_pos = 0x2UL << ITERATOR_SHIFT;
-
-	case 0x2:
-		count = *_pos & ITERATOR_COUNTER;
-		list_for_each(p, &vslow_work_queue) {
-			if (count == 0)
-				return p;
-			count--;
-		}
-		*_pos = 0x3UL << ITERATOR_SHIFT;
-
-	default:
-		return NULL;
-	}
-}
-
-/*
- * set up the iterator to start reading from the first line
- */
-static void *slow_work_runqueue_start(struct seq_file *m, loff_t *_pos)
-{
-	spin_lock_irq(&slow_work_queue_lock);
-	return slow_work_runqueue_index(m, _pos);
-}
-
-/*
- * move to the next line
- */
-static void *slow_work_runqueue_next(struct seq_file *m, void *v, loff_t *_pos)
-{
-	struct list_head *p = v;
-	unsigned long selector = *_pos >> ITERATOR_SHIFT;
-
-	(*_pos)++;
-	switch (selector) {
-	case 0x0:
-		return slow_work_runqueue_index(m, _pos);
-
-	case 0x1:
-		if (*_pos >> ITERATOR_SHIFT == 0x1) {
-			p = p->next;
-			if (p != &slow_work_queue)
-				return p;
-		}
-		*_pos = 0x2UL << ITERATOR_SHIFT;
-		p = &vslow_work_queue;
-
-	case 0x2:
-		if (*_pos >> ITERATOR_SHIFT == 0x2) {
-			p = p->next;
-			if (p != &vslow_work_queue)
-				return p;
-		}
-		*_pos = 0x3UL << ITERATOR_SHIFT;
-
-	default:
-		return NULL;
-	}
-}
-
-/*
- * clean up after reading
- */
-static void slow_work_runqueue_stop(struct seq_file *m, void *v)
-{
-	spin_unlock_irq(&slow_work_queue_lock);
-}
-
-static const struct seq_operations slow_work_runqueue_ops = {
-	.start		= slow_work_runqueue_start,
-	.stop		= slow_work_runqueue_stop,
-	.next		= slow_work_runqueue_next,
-	.show		= slow_work_runqueue_show,
-};
-
-/*
- * open "/sys/kernel/debug/slow_work/runqueue" to list queue contents
- */
-static int slow_work_runqueue_open(struct inode *inode, struct file *file)
-{
-	return seq_open(file, &slow_work_runqueue_ops);
-}
-
-const struct file_operations slow_work_runqueue_fops = {
-	.owner		= THIS_MODULE,
-	.open		= slow_work_runqueue_open,
-	.read		= seq_read,
-	.llseek		= seq_lseek,
-	.release	= seq_release,
-};
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
deleted file mode 100644
index 7d3f4fa9ef4f..000000000000
--- a/kernel/slow-work.c
+++ /dev/null
@@ -1,1068 +0,0 @@
-/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
- *
- * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells (dhowells@redhat.com)
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- *
- * See Documentation/slow-work.txt
- */
-
-#include <linux/module.h>
-#include <linux/slow-work.h>
-#include <linux/kthread.h>
-#include <linux/freezer.h>
-#include <linux/wait.h>
-#include <linux/debugfs.h>
-#include "slow-work.h"
-
-static void slow_work_cull_timeout(unsigned long);
-static void slow_work_oom_timeout(unsigned long);
-
-#ifdef CONFIG_SYSCTL
-static int slow_work_min_threads_sysctl(struct ctl_table *, int,
-					void __user *, size_t *, loff_t *);
-
-static int slow_work_max_threads_sysctl(struct ctl_table *, int ,
-					void __user *, size_t *, loff_t *);
-#endif
-
-/*
- * The pool of threads has at least min threads in it as long as someone is
- * using the facility, and may have as many as max.
- *
- * A portion of the pool may be processing very slow operations.
- */
-static unsigned slow_work_min_threads = 2;
-static unsigned slow_work_max_threads = 4;
-static unsigned vslow_work_proportion = 50; /* % of threads that may process
-					     * very slow work */
-
-#ifdef CONFIG_SYSCTL
-static const int slow_work_min_min_threads = 2;
-static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
-static const int slow_work_min_vslow = 1;
-static const int slow_work_max_vslow = 99;
-
-ctl_table slow_work_sysctls[] = {
-	{
-		.procname	= "min-threads",
-		.data		= &slow_work_min_threads,
-		.maxlen		= sizeof(unsigned),
-		.mode		= 0644,
-		.proc_handler	= slow_work_min_threads_sysctl,
-		.extra1		= (void *) &slow_work_min_min_threads,
-		.extra2		= &slow_work_max_threads,
-	},
-	{
-		.procname	= "max-threads",
-		.data		= &slow_work_max_threads,
-		.maxlen		= sizeof(unsigned),
-		.mode		= 0644,
-		.proc_handler	= slow_work_max_threads_sysctl,
-		.extra1		= &slow_work_min_threads,
-		.extra2		= (void *) &slow_work_max_max_threads,
-	},
-	{
-		.procname	= "vslow-percentage",
-		.data		= &vslow_work_proportion,
-		.maxlen		= sizeof(unsigned),
-		.mode		= 0644,
-		.proc_handler	= proc_dointvec_minmax,
-		.extra1		= (void *) &slow_work_min_vslow,
-		.extra2		= (void *) &slow_work_max_vslow,
-	},
-	{}
-};
-#endif
-
-/*
- * The active state of the thread pool
- */
-static atomic_t slow_work_thread_count;
-static atomic_t vslow_work_executing_count;
-
-static bool slow_work_may_not_start_new_thread;
-static bool slow_work_cull; /* cull a thread due to lack of activity */
-static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
-static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
-static struct slow_work slow_work_new_thread; /* new thread starter */
-
-/*
- * slow work ID allocation (use slow_work_queue_lock)
- */
-static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
-
-/*
- * Unregistration tracking to prevent put_ref() from disappearing during module
- * unload
- */
-#ifdef CONFIG_MODULES
-static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
-static struct module *slow_work_unreg_module;
-static struct slow_work *slow_work_unreg_work_item;
-static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
-static DEFINE_MUTEX(slow_work_unreg_sync_lock);
-
-static void slow_work_set_thread_processing(int id, struct slow_work *work)
-{
-	if (work)
-		slow_work_thread_processing[id] = work->owner;
-}
-static void slow_work_done_thread_processing(int id, struct slow_work *work)
-{
-	struct module *module = slow_work_thread_processing[id];
-
-	slow_work_thread_processing[id] = NULL;
-	smp_mb();
-	if (slow_work_unreg_work_item == work ||
-	    slow_work_unreg_module == module)
-		wake_up_all(&slow_work_unreg_wq);
-}
-static void slow_work_clear_thread_processing(int id)
-{
-	slow_work_thread_processing[id] = NULL;
-}
-#else
-static void slow_work_set_thread_processing(int id, struct slow_work *work) {}
-static void slow_work_done_thread_processing(int id, struct slow_work *work) {}
-static void slow_work_clear_thread_processing(int id) {}
-#endif
-
-/*
- * Data for tracking currently executing items for indication through /proc
- */
-#ifdef CONFIG_SLOW_WORK_DEBUG
-struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
-pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
-DEFINE_RWLOCK(slow_work_execs_lock);
-#endif
-
-/*
- * The queues of work items and the lock governing access to them. These are
- * shared between all the CPUs. It doesn't make sense to have per-CPU queues
- * as the number of threads bears no relation to the number of CPUs.
- *
- * There are two queues of work items: one for slow work items, and one for
- * very slow work items.
- */
-LIST_HEAD(slow_work_queue);
-LIST_HEAD(vslow_work_queue);
-DEFINE_SPINLOCK(slow_work_queue_lock);
-
-/*
- * The following are two wait queues that get pinged when a work item is placed
- * on an empty queue. These allow work items that are hogging a thread by
- * sleeping in a way that could be deferred to yield their thread and enqueue
- * themselves.
- */
-static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
-static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);
-
-/*
- * The thread controls. A variable used to signal to the threads that they
- * should exit when the queue is empty, a waitqueue used by the threads to wait
- * for signals, and a completion set by the last thread to exit.
- */
-static bool slow_work_threads_should_exit;
-static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
-static DECLARE_COMPLETION(slow_work_last_thread_exited);
-
-/*
- * The number of users of the thread pool and its lock. Whilst this is zero we
- * have no threads hanging around, and when this reaches zero, we wait for all
- * active or queued work items to complete and kill all the threads we do have.
- */
-static int slow_work_user_count;
-static DEFINE_MUTEX(slow_work_user_lock);
-
-static inline int slow_work_get_ref(struct slow_work *work)
-{
-	if (work->ops->get_ref)
-		return work->ops->get_ref(work);
-
-	return 0;
-}
-
-static inline void slow_work_put_ref(struct slow_work *work)
-{
-	if (work->ops->put_ref)
-		work->ops->put_ref(work);
-}
-
-/*
- * Calculate the maximum number of active threads in the pool that are
- * permitted to process very slow work items.
- *
- * The answer is rounded up to at least 1, but may not equal or exceed the
- * maximum number of the threads in the pool. This means we always have at
- * least one thread that can process slow work items, and we always have at
- * least one thread that won't get tied up doing so.
- */
-static unsigned slow_work_calc_vsmax(void)
-{
-	unsigned vsmax;
-
-	vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
-	vsmax /= 100;
-	vsmax = max(vsmax, 1U);
-	return min(vsmax, slow_work_max_threads - 1);
-}
-
-/*
- * Attempt to execute stuff queued on a slow thread. Return true if we managed
- * it, false if there was nothing to do.
- */
-static noinline bool slow_work_execute(int id)
-{
-	struct slow_work *work = NULL;
-	unsigned vsmax;
-	bool very_slow;
-
-	vsmax = slow_work_calc_vsmax();
-
-	/* see if we can schedule a new thread to be started if we're not
-	 * keeping up with the work */
-	if (!waitqueue_active(&slow_work_thread_wq) &&
-	    (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
-	    atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
-	    !slow_work_may_not_start_new_thread)
-		slow_work_enqueue(&slow_work_new_thread);
-
-	/* find something to execute */
-	spin_lock_irq(&slow_work_queue_lock);
-	if (!list_empty(&vslow_work_queue) &&
-	    atomic_read(&vslow_work_executing_count) < vsmax) {
-		work = list_entry(vslow_work_queue.next,
-				  struct slow_work, link);
-		if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
-			BUG();
-		list_del_init(&work->link);
-		atomic_inc(&vslow_work_executing_count);
-		very_slow = true;
-	} else if (!list_empty(&slow_work_queue)) {
-		work = list_entry(slow_work_queue.next,
-				  struct slow_work, link);
-		if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
-			BUG();
-		list_del_init(&work->link);
-		very_slow = false;
-	} else {
-		very_slow = false; /* avoid the compiler warning */
-	}
-
-	slow_work_set_thread_processing(id, work);
-	if (work) {
-		slow_work_mark_time(work);
-		slow_work_begin_exec(id, work);
-	}
-
-	spin_unlock_irq(&slow_work_queue_lock);
-
-	if (!work)
-		return false;
-
-	if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
-		BUG();
-
-	/* don't execute if the work is in the process of being cancelled */
-	if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
-		work->ops->execute(work);
-
-	if (very_slow)
-		atomic_dec(&vslow_work_executing_count);
-	clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
-
-	/* wake up anyone waiting for this work to be complete */
-	wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
-
-	slow_work_end_exec(id, work);
-
-	/* if someone tried to enqueue the item whilst we were executing it,
-	 * then it'll be left unenqueued to avoid multiple threads trying to
-	 * execute it simultaneously
-	 *
-	 * there is, however, a race between us testing the pending flag and
-	 * getting the spinlock, and between the enqueuer setting the pending
-	 * flag and getting the spinlock, so we use a deferral bit to tell us
-	 * if the enqueuer got there first
-	 */
-	if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
-		spin_lock_irq(&slow_work_queue_lock);
-
-		if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
-		    test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
-			goto auto_requeue;
-
-		spin_unlock_irq(&slow_work_queue_lock);
-	}
-
-	/* sort out the race between module unloading and put_ref() */
-	slow_work_put_ref(work);
-	slow_work_done_thread_processing(id, work);
-
-	return true;
-
-auto_requeue:
-	/* we must complete the enqueue operation
-	 * - we transfer our ref on the item back to the appropriate queue
-	 * - don't wake another thread up as we're awake already
-	 */
-	slow_work_mark_time(work);
-	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
-		list_add_tail(&work->link, &vslow_work_queue);
-	else
-		list_add_tail(&work->link, &slow_work_queue);
-	spin_unlock_irq(&slow_work_queue_lock);
-	slow_work_clear_thread_processing(id);
-	return true;
-}
-
-/**
- * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
- * work: The work item under execution that wants to sleep
- * _timeout: Scheduler sleep timeout
- *
- * Allow a requeueable work item to sleep on a slow-work processor thread until
- * that thread is needed to do some other work or the sleep is interrupted by
- * some other event.
- *
- * The caller must set up a wake up event before calling this and must have set
- * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
- * condition before calling this function as no test is made here.
- *
- * False is returned if there is nothing on the queue; true is returned if the
- * work item should be requeued
- */
-bool slow_work_sleep_till_thread_needed(struct slow_work *work,
-					signed long *_timeout)
-{
-	wait_queue_head_t *wfo_wq;
-	struct list_head *queue;
-
-	DEFINE_WAIT(wait);
-
-	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
-		wfo_wq = &vslow_work_queue_waits_for_occupation;
-		queue = &vslow_work_queue;
-	} else {
-		wfo_wq = &slow_work_queue_waits_for_occupation;
-		queue = &slow_work_queue;
-	}
-
-	if (!list_empty(queue))
-		return true;
-
-	add_wait_queue_exclusive(wfo_wq, &wait);
-	if (list_empty(queue))
-		*_timeout = schedule_timeout(*_timeout);
-	finish_wait(wfo_wq, &wait);
-
-	return !list_empty(queue);
-}
-EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
-
-/**
- * slow_work_enqueue - Schedule a slow work item for processing
- * @work: The work item to queue
- *
- * Schedule a slow work item for processing. If the item is already undergoing
- * execution, this guarantees not to re-enter the execution routine until the
- * first execution finishes.
- *
- * The item is pinned by this function as it retains a reference to it, managed
- * through the item operations. The item is unpinned once it has been
- * executed.
- *
- * An item may hog the thread that is running it for a relatively large amount
- * of time, sufficient, for example, to perform several lookup, mkdir, create
- * and setxattr operations. It may sleep on I/O and may sleep to obtain locks.
- *
- * Conversely, if a number of items are awaiting processing, it may take some
- * time before any given item is given attention. The number of threads in the
- * pool may be increased to deal with demand, but only up to a limit.
- *
- * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
- * the very slow queue, from which only a portion of the threads will be
- * allowed to pick items to execute. This ensures that very slow items won't
- * overly block ones that are just ordinarily slow.
- *
- * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if cancelled work is
- * attempted queued)
- */
-int slow_work_enqueue(struct slow_work *work)
-{
-	wait_queue_head_t *wfo_wq;
-	struct list_head *queue;
-	unsigned long flags;
-	int ret;
-
-	if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
-		return -ECANCELED;
-
-	BUG_ON(slow_work_user_count <= 0);
-	BUG_ON(!work);
-	BUG_ON(!work->ops);
-
-	/* when honouring an enqueue request, we only promise that we will run
-	 * the work function in the future; we do not promise to run it once
-	 * per enqueue request
-	 *
-	 * we use the PENDING bit to merge together repeat requests without
-	 * having to disable IRQs and take the spinlock, whilst still
-	 * maintaining our promise
-	 */
-	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
-		if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
-			wfo_wq = &vslow_work_queue_waits_for_occupation;
-			queue = &vslow_work_queue;
-		} else {
-			wfo_wq = &slow_work_queue_waits_for_occupation;
-			queue = &slow_work_queue;
-		}
-
-		spin_lock_irqsave(&slow_work_queue_lock, flags);
-
-		if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
-			goto cancelled;
-
-		/* we promise that we will not attempt to execute the work
-		 * function in more than one thread simultaneously
-		 *
-		 * this, however, leaves us with a problem if we're asked to
-		 * enqueue the work whilst someone is executing the work
-		 * function as simply queueing the work immediately means that
-		 * another thread may try executing it whilst it is already
-		 * under execution
-		 *
-		 * to deal with this, we set the ENQ_DEFERRED bit instead of
-		 * enqueueing, and the thread currently executing the work
-		 * function will enqueue the work item when the work function
-		 * returns and it has cleared the EXECUTING bit
-		 */
-		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
-			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
-		} else {
-			ret = slow_work_get_ref(work);
-			if (ret < 0)
-				goto failed;
-			slow_work_mark_time(work);
-			list_add_tail(&work->link, queue);
-			wake_up(&slow_work_thread_wq);
-
-			/* if someone who could be requeued is sleeping on a
-			 * thread, then ask them to yield their thread */
-			if (work->link.prev == queue)
-				wake_up(wfo_wq);
-		}
-
-		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
-	}
-	return 0;
-
-cancelled:
-	ret = -ECANCELED;
-failed:
-	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
-	return ret;
-}
-EXPORT_SYMBOL(slow_work_enqueue);
-
-static int slow_work_wait(void *word)
-{
-	schedule();
-	return 0;
-}
-
-/**
- * slow_work_cancel - Cancel a slow work item
- * @work: The work item to cancel
- *
- * This function will cancel a previously enqueued work item. If we cannot
- * cancel the work item, it is guarenteed to have run when this function
- * returns.
- */
-void slow_work_cancel(struct slow_work *work)
-{
-	bool wait = true, put = false;
-
-	set_bit(SLOW_WORK_CANCELLING, &work->flags);
-	smp_mb();
-
-	/* if the work item is a delayed work item with an active timer, we
-	 * need to wait for the timer to finish _before_ getting the spinlock,
-	 * lest we deadlock against the timer routine
-	 *
-	 * the timer routine will leave DELAYED set if it notices the
-	 * CANCELLING flag in time
-	 */
-	if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
-		struct delayed_slow_work *dwork =
-			container_of(work, struct delayed_slow_work, work);
-		del_timer_sync(&dwork->timer);
-	}
-
-	spin_lock_irq(&slow_work_queue_lock);
-
-	if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
-		/* the timer routine aborted or never happened, so we are left
-		 * holding the timer's reference on the item and should just
-		 * drop the pending flag and wait for any ongoing execution to
-		 * finish */
-		struct delayed_slow_work *dwork =
-			container_of(work, struct delayed_slow_work, work);
-
-		BUG_ON(timer_pending(&dwork->timer));
-		BUG_ON(!list_empty(&work->link));
-
-		clear_bit(SLOW_WORK_DELAYED, &work->flags);
-		put = true;
-		clear_bit(SLOW_WORK_PENDING, &work->flags);
-
-	} else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
-		   !list_empty(&work->link)) {
-		/* the link in the pending queue holds a reference on the item
-		 * that we will need to release */
-		list_del_init(&work->link);
-		wait = false;
-		put = true;
-		clear_bit(SLOW_WORK_PENDING, &work->flags);
-
-	} else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
-		/* the executor is holding our only reference on the item, so
-		 * we merely need to wait for it to finish executing */
-		clear_bit(SLOW_WORK_PENDING, &work->flags);
-	}
-
-	spin_unlock_irq(&slow_work_queue_lock);
-
-	/* the EXECUTING flag is set by the executor whilst the spinlock is set
-	 * and before the item is dequeued - so assuming the above doesn't
-	 * actually dequeue it, simply waiting for the EXECUTING flag to be
-	 * released here should be sufficient */
-	if (wait)
-		wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
-			    TASK_UNINTERRUPTIBLE);
-
-	clear_bit(SLOW_WORK_CANCELLING, &work->flags);
-	if (put)
-		slow_work_put_ref(work);
-}
-EXPORT_SYMBOL(slow_work_cancel);
-
-/*
- * Handle expiry of the delay timer, indicating that a delayed slow work item
- * should now be queued if not cancelled
- */
-static void delayed_slow_work_timer(unsigned long data)
-{
-	wait_queue_head_t *wfo_wq;
-	struct list_head *queue;
-	struct slow_work *work = (struct slow_work *) data;
-	unsigned long flags;
-	bool queued = false, put = false, first = false;
-
-	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
-		wfo_wq = &vslow_work_queue_waits_for_occupation;
-		queue = &vslow_work_queue;
-	} else {
-		wfo_wq = &slow_work_queue_waits_for_occupation;
-		queue = &slow_work_queue;
-	}
-
-	spin_lock_irqsave(&slow_work_queue_lock, flags);
-	if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
-		clear_bit(SLOW_WORK_DELAYED, &work->flags);
-
-		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
-			/* we discard the reference the timer was holding in
-			 * favour of the one the executor holds */
-			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
-			put = true;
-		} else {
-			slow_work_mark_time(work);
-			list_add_tail(&work->link, queue);
-			queued = true;
-			if (work->link.prev == queue)
-				first = true;
-		}
-	}
-
-	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
-	if (put)
-		slow_work_put_ref(work);
-	if (first)
-		wake_up(wfo_wq);
-	if (queued)
-		wake_up(&slow_work_thread_wq);
-}
-
-/**
- * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
- * @dwork: The delayed work item to queue
- * @delay: When to start executing the work, in jiffies from now
- *
- * This is similar to slow_work_enqueue(), but it adds a delay before the work
- * is actually queued for processing.
- *
- * The item can have delayed processing requested on it whilst it is being
- * executed. The delay will begin immediately, and if it expires before the
- * item finishes executing, the item will be placed back on the queue when it
- * has done executing.
- */
-int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
-			      unsigned long delay)
-{
-	struct slow_work *work = &dwork->work;
-	unsigned long flags;
-	int ret;
-
-	if (delay == 0)
-		return slow_work_enqueue(&dwork->work);
-
-	BUG_ON(slow_work_user_count <= 0);
-	BUG_ON(!work);
-	BUG_ON(!work->ops);
-
-	if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
-		return -ECANCELED;
-
-	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
-		spin_lock_irqsave(&slow_work_queue_lock, flags);
-
-		if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
-			goto cancelled;
-
-		/* the timer holds a reference whilst it is pending */
-		ret = slow_work_get_ref(work);
-		if (ret < 0)
-			goto cant_get_ref;
-
-		if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
-			BUG();
-		dwork->timer.expires = jiffies + delay;
-		dwork->timer.data = (unsigned long) work;
-		dwork->timer.function = delayed_slow_work_timer;
-		add_timer(&dwork->timer);
-
-		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
-	}
-
-	return 0;
-
-cancelled:
-	ret = -ECANCELED;
-cant_get_ref:
-	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
-	return ret;
-}
-EXPORT_SYMBOL(delayed_slow_work_enqueue);
-
-/*
- * Schedule a cull of the thread pool at some time in the near future
- */
-static void slow_work_schedule_cull(void)
-{
-	mod_timer(&slow_work_cull_timer,
-		  round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT));
-}
-
-/*
- * Worker thread culling algorithm
- */
-static bool slow_work_cull_thread(void)
-{
-	unsigned long flags;
-	bool do_cull = false;
-
-	spin_lock_irqsave(&slow_work_queue_lock, flags);
-
-	if (slow_work_cull) {
-		slow_work_cull = false;
-
-		if (list_empty(&slow_work_queue) &&
-		    list_empty(&vslow_work_queue) &&
-		    atomic_read(&slow_work_thread_count) >
-		    slow_work_min_threads) {
-			slow_work_schedule_cull();
-			do_cull = true;
-		}
-	}
-
-	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
-	return do_cull;
-}
-
-/*
- * Determine if there is slow work available for dispatch
- */
-static inline bool slow_work_available(int vsmax)
-{
-	return !list_empty(&slow_work_queue) ||
-		(!list_empty(&vslow_work_queue) &&
-		 atomic_read(&vslow_work_executing_count) < vsmax);
-}
-
-/*
- * Worker thread dispatcher
- */
-static int slow_work_thread(void *_data)
-{
-	int vsmax, id;
-
-	DEFINE_WAIT(wait);
-
-	set_freezable();
-	set_user_nice(current, -5);
-
-	/* allocate ourselves an ID */
-	spin_lock_irq(&slow_work_queue_lock);
-	id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
-	BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
-	__set_bit(id, slow_work_ids);
-	slow_work_set_thread_pid(id, current->pid);
-	spin_unlock_irq(&slow_work_queue_lock);
-
-	sprintf(current->comm, "kslowd%03u", id);
-
-	for (;;) {
-		vsmax = vslow_work_proportion;
-		vsmax *= atomic_read(&slow_work_thread_count);
-		vsmax /= 100;
-
-		prepare_to_wait_exclusive(&slow_work_thread_wq, &wait,
-					  TASK_INTERRUPTIBLE);
-		if (!freezing(current) &&
-		    !slow_work_threads_should_exit &&
-		    !slow_work_available(vsmax) &&
-		    !slow_work_cull)
-			schedule();
-		finish_wait(&slow_work_thread_wq, &wait);
-
-		try_to_freeze();
-
-		vsmax = vslow_work_proportion;
-		vsmax *= atomic_read(&slow_work_thread_count);
-		vsmax /= 100;
-
-		if (slow_work_available(vsmax) && slow_work_execute(id)) {
-			cond_resched();
-			if (list_empty(&slow_work_queue) &&
-			    list_empty(&vslow_work_queue) &&
-			    atomic_read(&slow_work_thread_count) >
-			    slow_work_min_threads)
-				slow_work_schedule_cull();
-			continue;
-		}
-
-		if (slow_work_threads_should_exit)
-			break;
-
-		if (slow_work_cull && slow_work_cull_thread())
-			break;
-	}
-
-	spin_lock_irq(&slow_work_queue_lock);
-	slow_work_set_thread_pid(id, 0);
-	__clear_bit(id, slow_work_ids);
-	spin_unlock_irq(&slow_work_queue_lock);
-
-	if (atomic_dec_and_test(&slow_work_thread_count))
-		complete_and_exit(&slow_work_last_thread_exited, 0);
-	return 0;
-}
-
-/*
- * Handle thread cull timer expiration
- */
-static void slow_work_cull_timeout(unsigned long data)
-{
-	slow_work_cull = true;
-	wake_up(&slow_work_thread_wq);
-}
-
-/*
- * Start a new slow work thread
- */
-static void slow_work_new_thread_execute(struct slow_work *work)
-{
-	struct task_struct *p;
-
-	if (slow_work_threads_should_exit)
-		return;
-
-	if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
-		return;
-
-	if (!mutex_trylock(&slow_work_user_lock))
-		return;
-
-	slow_work_may_not_start_new_thread = true;
-	atomic_inc(&slow_work_thread_count);
-	p = kthread_run(slow_work_thread, NULL, "kslowd");
-	if (IS_ERR(p)) {
-		printk(KERN_DEBUG "Slow work thread pool: OOM\n");
-		if (atomic_dec_and_test(&slow_work_thread_count))
-			BUG(); /* we're running on a slow work thread... */
-		mod_timer(&slow_work_oom_timer,
-			  round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT));
-	} else {
-		/* ratelimit the starting of new threads */
-		mod_timer(&slow_work_oom_timer, jiffies + 1);
-	}
-
-	mutex_unlock(&slow_work_user_lock);
-}
-
-static const struct slow_work_ops slow_work_new_thread_ops = {
-	.owner		= THIS_MODULE,
-	.execute	= slow_work_new_thread_execute,
-#ifdef CONFIG_SLOW_WORK_DEBUG
-	.desc		= slow_work_new_thread_desc,
-#endif
-};
-
-/*
- * post-OOM new thread start suppression expiration
- */
-static void slow_work_oom_timeout(unsigned long data)
-{
-	slow_work_may_not_start_new_thread = false;
-}
-
-#ifdef CONFIG_SYSCTL
-/*
- * Handle adjustment of the minimum number of threads
- */
-static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
-					void __user *buffer,
-					size_t *lenp, loff_t *ppos)
-{
-	int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
-	int n;
-
-	if (ret == 0) {
-		mutex_lock(&slow_work_user_lock);
-		if (slow_work_user_count > 0) {
-			/* see if we need to start or stop threads */
-			n = atomic_read(&slow_work_thread_count) -
-				slow_work_min_threads;
-
-			if (n < 0 && !slow_work_may_not_start_new_thread)
-				slow_work_enqueue(&slow_work_new_thread);
-			else if (n > 0)
-				slow_work_schedule_cull();
-		}
-		mutex_unlock(&slow_work_user_lock);
-	}
-
-	return ret;
-}
-
-/*
- * Handle adjustment of the maximum number of threads
- */
-static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
-					void __user *buffer,
-					size_t *lenp, loff_t *ppos)
-{
-	int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
-	int n;
-
-	if (ret == 0) {
-		mutex_lock(&slow_work_user_lock);
-		if (slow_work_user_count > 0) {
-			/* see if we need to stop threads */
-			n = slow_work_max_threads -
-				atomic_read(&slow_work_thread_count);
-
-			if (n < 0)
-				slow_work_schedule_cull();
-		}
-		mutex_unlock(&slow_work_user_lock);
-	}
-
-	return ret;
-}
-#endif /* CONFIG_SYSCTL */
-
-/**
- * slow_work_register_user - Register a user of the facility
- * @module: The module about to make use of the facility
- *
- * Register a user of the facility, starting up the initial threads if there
- * aren't any other users at this point. This will return 0 if successful, or
- * an error if not.
- */
-int slow_work_register_user(struct module *module)
-{
-	struct task_struct *p;
-	int loop;
-
-	mutex_lock(&slow_work_user_lock);
-
-	if (slow_work_user_count == 0) {
-		printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
-		init_completion(&slow_work_last_thread_exited);
-
-		slow_work_threads_should_exit = false;
-		slow_work_init(&slow_work_new_thread,
-			       &slow_work_new_thread_ops);
-		slow_work_may_not_start_new_thread = false;
-		slow_work_cull = false;
-
-		/* start the minimum number of threads */
-		for (loop = 0; loop < slow_work_min_threads; loop++) {
-			atomic_inc(&slow_work_thread_count);
-			p = kthread_run(slow_work_thread, NULL, "kslowd");
-			if (IS_ERR(p))
-				goto error;
-		}
-		printk(KERN_NOTICE "Slow work thread pool: Ready\n");
-	}
-
-	slow_work_user_count++;
-	mutex_unlock(&slow_work_user_lock);
-	return 0;
-
-error:
-	if (atomic_dec_and_test(&slow_work_thread_count))
-		complete(&slow_work_last_thread_exited);
-	if (loop > 0) {
-		printk(KERN_ERR "Slow work thread pool:"
-		       " Aborting startup on ENOMEM\n");
-		slow_work_threads_should_exit = true;
-		wake_up_all(&slow_work_thread_wq);
-		wait_for_completion(&slow_work_last_thread_exited);
-		printk(KERN_ERR "Slow work thread pool: Aborted\n");
-	}
-	mutex_unlock(&slow_work_user_lock);
-	return PTR_ERR(p);
-}
-EXPORT_SYMBOL(slow_work_register_user);
-
-/*
- * wait for all outstanding items from the calling module to complete
- * - note that more items may be queued whilst we're waiting
- */
-static void slow_work_wait_for_items(struct module *module)
-{
-#ifdef CONFIG_MODULES
-	DECLARE_WAITQUEUE(myself, current);
-	struct slow_work *work;
-	int loop;
-
-	mutex_lock(&slow_work_unreg_sync_lock);
-	add_wait_queue(&slow_work_unreg_wq, &myself);
-
-	for (;;) {
-		spin_lock_irq(&slow_work_queue_lock);
-
-		/* first of all, we wait for the last queued item in each list
-		 * to be processed */
-		list_for_each_entry_reverse(work, &vslow_work_queue, link) {
-			if (work->owner == module) {
-				set_current_state(TASK_UNINTERRUPTIBLE);
-				slow_work_unreg_work_item = work;
-				goto do_wait;
-			}
-		}
-		list_for_each_entry_reverse(work, &slow_work_queue, link) {
-			if (work->owner == module) {
-				set_current_state(TASK_UNINTERRUPTIBLE);
-				slow_work_unreg_work_item = work;
-				goto do_wait;
-			}
-		}
-
-		/* then we wait for the items being processed to finish */
-		slow_work_unreg_module = module;
-		smp_mb();
-		for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
-			if (slow_work_thread_processing[loop] == module)
-				goto do_wait;
-		}
-		spin_unlock_irq(&slow_work_queue_lock);
-		break; /* okay, we're done */
-
-	do_wait:
-		spin_unlock_irq(&slow_work_queue_lock);
-		schedule();
-		slow_work_unreg_work_item = NULL;
-		slow_work_unreg_module = NULL;
-	}
-
-	remove_wait_queue(&slow_work_unreg_wq, &myself);
-	mutex_unlock(&slow_work_unreg_sync_lock);
-#endif /* CONFIG_MODULES */
-}
-
-/**
- * slow_work_unregister_user - Unregister a user of the facility
- * @module: The module whose items should be cleared
- *
- * Unregister a user of the facility, killing all the threads if this was the
- * last one.
- *
- * This waits for all the work items belonging to the nominated module to go
- * away before proceeding.
- */
-void slow_work_unregister_user(struct module *module)
-{
-	/* first of all, wait for all outstanding items from the calling module
-	 * to complete */
-	if (module)
-		slow_work_wait_for_items(module);
-
-	/* then we can actually go about shutting down the facility if need
-	 * be */
-	mutex_lock(&slow_work_user_lock);
-
-	BUG_ON(slow_work_user_count <= 0);
-
-	slow_work_user_count--;
-	if (slow_work_user_count == 0) {
-		printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
-		slow_work_threads_should_exit = true;
-		del_timer_sync(&slow_work_cull_timer);
-		del_timer_sync(&slow_work_oom_timer);
-		wake_up_all(&slow_work_thread_wq);
-		wait_for_completion(&slow_work_last_thread_exited);
-		printk(KERN_NOTICE "Slow work thread pool:"
-		       " Shut down complete\n");
-	}
-
-	mutex_unlock(&slow_work_user_lock);
-}
-EXPORT_SYMBOL(slow_work_unregister_user);
-
-/*
- * Initialise the slow work facility
- */
-static int __init init_slow_work(void)
-{
-	unsigned nr_cpus = num_possible_cpus();
-
-	if (slow_work_max_threads < nr_cpus)
-		slow_work_max_threads = nr_cpus;
-#ifdef CONFIG_SYSCTL
-	if (slow_work_max_max_threads < nr_cpus * 2)
-		slow_work_max_max_threads = nr_cpus * 2;
-#endif
-#ifdef CONFIG_SLOW_WORK_DEBUG
-	{
-		struct dentry *dbdir;
-
-		dbdir = debugfs_create_dir("slow_work", NULL);
-		if (dbdir && !IS_ERR(dbdir))
-			debugfs_create_file("runqueue", S_IFREG | 0400, dbdir,
-					    NULL, &slow_work_runqueue_fops);
-	}
-#endif
-	return 0;
-}
-
-subsys_initcall(init_slow_work);
diff --git a/kernel/slow-work.h b/kernel/slow-work.h
deleted file mode 100644
index a29ebd1ef41d..000000000000
--- a/kernel/slow-work.h
+++ /dev/null
@@ -1,72 +0,0 @@
1/* Slow work private definitions
2 *
3 * Copyright (C) 2009 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public Licence
8 * as published by the Free Software Foundation; either version
9 * 2 of the Licence, or (at your option) any later version.
10 */
11
12#define SLOW_WORK_CULL_TIMEOUT (5 * HZ) /* cull threads 5s after running out of
13 * things to do */
14#define SLOW_WORK_OOM_TIMEOUT (5 * HZ) /* can't start new threads for 5s after
15 * OOM */
16
17#define SLOW_WORK_THREAD_LIMIT 255 /* abs maximum number of slow-work threads */
18
19/*
20 * slow-work.c
21 */
22#ifdef CONFIG_SLOW_WORK_DEBUG
23extern struct slow_work *slow_work_execs[];
24extern pid_t slow_work_pids[];
25extern rwlock_t slow_work_execs_lock;
26#endif
27
28extern struct list_head slow_work_queue;
29extern struct list_head vslow_work_queue;
30extern spinlock_t slow_work_queue_lock;
31
32/*
33 * slow-work-debugfs.c
34 */
35#ifdef CONFIG_SLOW_WORK_DEBUG
36extern const struct file_operations slow_work_runqueue_fops;
37
38extern void slow_work_new_thread_desc(struct slow_work *, struct seq_file *);
39#endif
40
41/*
42 * Helper functions
43 */
44static inline void slow_work_set_thread_pid(int id, pid_t pid)
45{
46#ifdef CONFIG_SLOW_WORK_DEBUG
47 slow_work_pids[id] = pid;
48#endif
49}
50
51static inline void slow_work_mark_time(struct slow_work *work)
52{
53#ifdef CONFIG_SLOW_WORK_DEBUG
54 work->mark = CURRENT_TIME;
55#endif
56}
57
58static inline void slow_work_begin_exec(int id, struct slow_work *work)
59{
60#ifdef CONFIG_SLOW_WORK_DEBUG
61 slow_work_execs[id] = work;
62#endif
63}
64
65static inline void slow_work_end_exec(int id, struct slow_work *work)
66{
67#ifdef CONFIG_SLOW_WORK_DEBUG
68 write_lock(&slow_work_execs_lock);
69 slow_work_execs[id] = NULL;
70 write_unlock(&slow_work_execs_lock);
71#endif
72}
diff --git a/kernel/sysctl.c b/kernel/sysctl.c
index 9acfce0cdfdb..6d850bf0a517 100644
--- a/kernel/sysctl.c
+++ b/kernel/sysctl.c
@@ -50,7 +50,6 @@
50#include <linux/acpi.h> 50#include <linux/acpi.h>
51#include <linux/reboot.h> 51#include <linux/reboot.h>
52#include <linux/ftrace.h> 52#include <linux/ftrace.h>
53#include <linux/slow-work.h>
54#include <linux/perf_event.h> 53#include <linux/perf_event.h>
55#include <linux/kprobes.h> 54#include <linux/kprobes.h>
56#include <linux/pipe_fs_i.h> 55#include <linux/pipe_fs_i.h>
@@ -917,13 +916,6 @@ static struct ctl_table kern_table[] = {
917 .proc_handler = proc_dointvec, 916 .proc_handler = proc_dointvec,
918 }, 917 },
919#endif 918#endif
920#ifdef CONFIG_SLOW_WORK
921 {
922 .procname = "slow-work",
923 .mode = 0555,
924 .child = slow_work_sysctls,
925 },
926#endif
927#ifdef CONFIG_PERF_EVENTS 919#ifdef CONFIG_PERF_EVENTS
928 { 920 {
929 .procname = "perf_event_paranoid", 921 .procname = "perf_event_paranoid",
diff --git a/kernel/trace/Kconfig b/kernel/trace/Kconfig
index 6eb97bbdefb1..538501c6ea50 100644
--- a/kernel/trace/Kconfig
+++ b/kernel/trace/Kconfig
@@ -323,17 +323,6 @@ config STACK_TRACER
323 323
324 Say N if unsure. 324 Say N if unsure.
325 325
326config WORKQUEUE_TRACER
327 bool "Trace workqueues"
328 select GENERIC_TRACER
329 help
330 The workqueue tracer provides some statistical information
331 about each cpu workqueue thread such as the number of the
332 works inserted and executed since their creation. It can help
333 to evaluate the amount of work each of them has to perform.
334 For example it can help a developer to decide whether he should
335 choose a per-cpu workqueue instead of a singlethreaded one.
336
337config BLK_DEV_IO_TRACE 326config BLK_DEV_IO_TRACE
338 bool "Support for tracing block IO actions" 327 bool "Support for tracing block IO actions"
339 depends on SYSFS 328 depends on SYSFS
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 59fef1531dd2..9ca34cddaf6d 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -33,41 +33,272 @@
33#include <linux/kallsyms.h> 33#include <linux/kallsyms.h>
34#include <linux/debug_locks.h> 34#include <linux/debug_locks.h>
35#include <linux/lockdep.h> 35#include <linux/lockdep.h>
36#define CREATE_TRACE_POINTS 36#include <linux/idr.h>
37#include <trace/events/workqueue.h> 37
38#include "workqueue_sched.h"
39
40enum {
41 /* global_cwq flags */
42 GCWQ_MANAGE_WORKERS = 1 << 0, /* need to manage workers */
43 GCWQ_MANAGING_WORKERS = 1 << 1, /* managing workers */
44 GCWQ_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
45 GCWQ_FREEZING = 1 << 3, /* freeze in progress */
46 GCWQ_HIGHPRI_PENDING = 1 << 4, /* highpri works on queue */
47
48 /* worker flags */
49 WORKER_STARTED = 1 << 0, /* started */
50 WORKER_DIE = 1 << 1, /* die die die */
51 WORKER_IDLE = 1 << 2, /* is idle */
52 WORKER_PREP = 1 << 3, /* preparing to run works */
53 WORKER_ROGUE = 1 << 4, /* not bound to any cpu */
54 WORKER_REBIND = 1 << 5, /* mom is home, come back */
55 WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
56 WORKER_UNBOUND = 1 << 7, /* worker is unbound */
57
58 WORKER_NOT_RUNNING = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND |
59 WORKER_CPU_INTENSIVE | WORKER_UNBOUND,
60
61 /* gcwq->trustee_state */
62 TRUSTEE_START = 0, /* start */
63 TRUSTEE_IN_CHARGE = 1, /* trustee in charge of gcwq */
64 TRUSTEE_BUTCHER = 2, /* butcher workers */
65 TRUSTEE_RELEASE = 3, /* release workers */
66 TRUSTEE_DONE = 4, /* trustee is done */
67
68 BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */
69 BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER,
70 BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1,
71
72 MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
73 IDLE_WORKER_TIMEOUT = 300 * HZ, /* keep idle ones for 5 mins */
74
75 MAYDAY_INITIAL_TIMEOUT = HZ / 100, /* call for help after 10ms */
76 MAYDAY_INTERVAL = HZ / 10, /* and then every 100ms */
 77	CREATE_COOLDOWN		= HZ,		/* time to breathe after fail */
78 TRUSTEE_COOLDOWN = HZ / 10, /* for trustee draining */
79
80 /*
 81	 * Rescue workers are used only in emergencies and shared by
82 * all cpus. Give -20.
83 */
84 RESCUER_NICE_LEVEL = -20,
85};
38 86
39/* 87/*
40 * The per-CPU workqueue (if single thread, we always use the first 88 * Structure fields follow one of the following exclusion rules.
41 * possible cpu). 89 *
90 * I: Set during initialization and read-only afterwards.
91 *
92 * P: Preemption protected. Disabling preemption is enough and should
93 * only be modified and accessed from the local cpu.
94 *
95 * L: gcwq->lock protected. Access with gcwq->lock held.
96 *
97 * X: During normal operation, modification requires gcwq->lock and
98 * should be done only from local cpu. Either disabling preemption
99 * on local cpu or grabbing gcwq->lock is enough for read access.
100 * If GCWQ_DISASSOCIATED is set, it's identical to L.
101 *
102 * F: wq->flush_mutex protected.
103 *
104 * W: workqueue_lock protected.
42 */ 105 */
43struct cpu_workqueue_struct {
44 106
45 spinlock_t lock; 107struct global_cwq;
46 108
47 struct list_head worklist; 109/*
48 wait_queue_head_t more_work; 110 * The poor guys doing the actual heavy lifting. All on-duty workers
49 struct work_struct *current_work; 111 * are either serving the manager role, on idle list or on busy hash.
112 */
113struct worker {
114 /* on idle list while idle, on busy hash table while busy */
115 union {
116 struct list_head entry; /* L: while idle */
117 struct hlist_node hentry; /* L: while busy */
118 };
50 119
51 struct workqueue_struct *wq; 120 struct work_struct *current_work; /* L: work being processed */
52 struct task_struct *thread; 121 struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */
53} ____cacheline_aligned; 122 struct list_head scheduled; /* L: scheduled works */
123 struct task_struct *task; /* I: worker task */
124 struct global_cwq *gcwq; /* I: the associated gcwq */
125 /* 64 bytes boundary on 64bit, 32 on 32bit */
126 unsigned long last_active; /* L: last active timestamp */
127 unsigned int flags; /* X: flags */
128 int id; /* I: worker id */
129 struct work_struct rebind_work; /* L: rebind worker to cpu */
130};
131
132/*
133 * Global per-cpu workqueue. There's one and only one for each cpu
134 * and all works are queued and processed here regardless of their
135 * target workqueues.
136 */
137struct global_cwq {
138 spinlock_t lock; /* the gcwq lock */
139 struct list_head worklist; /* L: list of pending works */
140 unsigned int cpu; /* I: the associated cpu */
141 unsigned int flags; /* L: GCWQ_* flags */
142
143 int nr_workers; /* L: total number of workers */
144 int nr_idle; /* L: currently idle ones */
145
146 /* workers are chained either in the idle_list or busy_hash */
147 struct list_head idle_list; /* X: list of idle workers */
148 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
149 /* L: hash of busy workers */
150
151 struct timer_list idle_timer; /* L: worker idle timeout */
152	struct timer_list	mayday_timer;	/* L: SOS timer for workers */
153
154 struct ida worker_ida; /* L: for worker IDs */
155
156 struct task_struct *trustee; /* L: for gcwq shutdown */
157 unsigned int trustee_state; /* L: trustee state */
158 wait_queue_head_t trustee_wait; /* trustee wait */
159 struct worker *first_idle; /* L: first idle worker */
160} ____cacheline_aligned_in_smp;
161
162/*
163 * The per-CPU workqueue. The lower WORK_STRUCT_FLAG_BITS of
164 * work_struct->data are used for flags and thus cwqs need to be
165 * aligned at two's power of the number of flag bits.
166 */
167struct cpu_workqueue_struct {
168 struct global_cwq *gcwq; /* I: the associated gcwq */
169 struct workqueue_struct *wq; /* I: the owning workqueue */
170 int work_color; /* L: current color */
171 int flush_color; /* L: flushing color */
172 int nr_in_flight[WORK_NR_COLORS];
173 /* L: nr of in_flight works */
174 int nr_active; /* L: nr of active works */
175 int max_active; /* L: max active works */
176 struct list_head delayed_works; /* L: delayed works */
177};
178
179/*
180 * Structure used to wait for workqueue flush.
181 */
182struct wq_flusher {
183 struct list_head list; /* F: list of flushers */
184 int flush_color; /* F: flush color waiting for */
185 struct completion done; /* flush completion */
186};
187
188/*
189 * All cpumasks are assumed to be always set on UP and thus can't be
190 * used to determine whether there's something to be done.
191 */
192#ifdef CONFIG_SMP
193typedef cpumask_var_t mayday_mask_t;
194#define mayday_test_and_set_cpu(cpu, mask) \
195 cpumask_test_and_set_cpu((cpu), (mask))
196#define mayday_clear_cpu(cpu, mask) cpumask_clear_cpu((cpu), (mask))
197#define for_each_mayday_cpu(cpu, mask) for_each_cpu((cpu), (mask))
198#define alloc_mayday_mask(maskp, gfp) alloc_cpumask_var((maskp), (gfp))
199#define free_mayday_mask(mask) free_cpumask_var((mask))
200#else
201typedef unsigned long mayday_mask_t;
202#define mayday_test_and_set_cpu(cpu, mask) test_and_set_bit(0, &(mask))
203#define mayday_clear_cpu(cpu, mask) clear_bit(0, &(mask))
204#define for_each_mayday_cpu(cpu, mask) if ((cpu) = 0, (mask))
205#define alloc_mayday_mask(maskp, gfp) true
206#define free_mayday_mask(mask) do { } while (0)
207#endif
54 208
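On UP the mask degenerates to bit 0 of an unsigned long, so for_each_mayday_cpu() runs its body at most once; SMP uses a real cpumask. Callers stay identical either way, e.g. a rescuer-style drain loop (a sketch only, assuming the macros above):

	static void example_drain_maydays(struct workqueue_struct *wq)
	{
		unsigned int cpu;

		/* one iteration per distressed cpu on SMP, at most one on UP */
		for_each_mayday_cpu(cpu, wq->mayday_mask)
			mayday_clear_cpu(cpu, wq->mayday_mask);
	}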
55/* 209/*
56 * The externally visible workqueue abstraction is an array of 210 * The externally visible workqueue abstraction is an array of
57 * per-CPU workqueues: 211 * per-CPU workqueues:
58 */ 212 */
59struct workqueue_struct { 213struct workqueue_struct {
60 struct cpu_workqueue_struct *cpu_wq; 214 unsigned int flags; /* I: WQ_* flags */
61 struct list_head list; 215 union {
62 const char *name; 216 struct cpu_workqueue_struct __percpu *pcpu;
63 int singlethread; 217 struct cpu_workqueue_struct *single;
64 int freezeable; /* Freeze threads during suspend */ 218 unsigned long v;
65 int rt; 219 } cpu_wq; /* I: cwq's */
220 struct list_head list; /* W: list of all workqueues */
221
222 struct mutex flush_mutex; /* protects wq flushing */
223 int work_color; /* F: current work color */
224 int flush_color; /* F: current flush color */
225 atomic_t nr_cwqs_to_flush; /* flush in progress */
226 struct wq_flusher *first_flusher; /* F: first flusher */
227 struct list_head flusher_queue; /* F: flush waiters */
228 struct list_head flusher_overflow; /* F: flush overflow list */
229
230 mayday_mask_t mayday_mask; /* cpus requesting rescue */
231 struct worker *rescuer; /* I: rescue worker */
232
233 int saved_max_active; /* W: saved cwq max_active */
234 const char *name; /* I: workqueue name */
66#ifdef CONFIG_LOCKDEP 235#ifdef CONFIG_LOCKDEP
67 struct lockdep_map lockdep_map; 236 struct lockdep_map lockdep_map;
68#endif 237#endif
69}; 238};
70 239
240struct workqueue_struct *system_wq __read_mostly;
241struct workqueue_struct *system_long_wq __read_mostly;
242struct workqueue_struct *system_nrt_wq __read_mostly;
243struct workqueue_struct *system_unbound_wq __read_mostly;
244EXPORT_SYMBOL_GPL(system_wq);
245EXPORT_SYMBOL_GPL(system_long_wq);
246EXPORT_SYMBOL_GPL(system_nrt_wq);
247EXPORT_SYMBOL_GPL(system_unbound_wq);
248
249#define for_each_busy_worker(worker, i, pos, gcwq) \
250 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \
251 hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
252
253static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask,
254 unsigned int sw)
255{
256 if (cpu < nr_cpu_ids) {
257 if (sw & 1) {
258 cpu = cpumask_next(cpu, mask);
259 if (cpu < nr_cpu_ids)
260 return cpu;
261 }
262 if (sw & 2)
263 return WORK_CPU_UNBOUND;
264 }
265 return WORK_CPU_NONE;
266}
267
268static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
269 struct workqueue_struct *wq)
270{
271 return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
272}
273
274/*
275 * CPU iterators
276 *
277 * An extra gcwq is defined for an invalid cpu number
278 * (WORK_CPU_UNBOUND) to host workqueues which are not bound to any
279 * specific CPU. The following iterators are similar to
280 * for_each_*_cpu() iterators but also considers the unbound gcwq.
281 *
282 * for_each_gcwq_cpu() : possible CPUs + WORK_CPU_UNBOUND
283 * for_each_online_gcwq_cpu() : online CPUs + WORK_CPU_UNBOUND
284 * for_each_cwq_cpu() : possible CPUs for bound workqueues,
285 * WORK_CPU_UNBOUND for unbound workqueues
286 */
287#define for_each_gcwq_cpu(cpu) \
288 for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3); \
289 (cpu) < WORK_CPU_NONE; \
290 (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 3))
291
292#define for_each_online_gcwq_cpu(cpu) \
293 for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 3); \
294 (cpu) < WORK_CPU_NONE; \
295 (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 3))
296
297#define for_each_cwq_cpu(cpu, wq) \
298 for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, (wq)); \
299 (cpu) < WORK_CPU_NONE; \
300 (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))
301
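In practice these iterators mean a bound workqueue is visited once per possible CPU while an unbound one is visited exactly once, at WORK_CPU_UNBOUND. A hedged sketch of the idiom used throughout the patch (function name illustrative):

	static void example_for_each_cwq(struct workqueue_struct *wq)
	{
		unsigned int cpu;

		for_each_cwq_cpu(cpu, wq) {
			struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

			/* per-cpu cwq for bound wqs, the single cwq otherwise */
			BUG_ON(cwq->wq != wq);
		}
	}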
71#ifdef CONFIG_LOCKDEP 302#ifdef CONFIG_LOCKDEP
72/** 303/**
73 * in_workqueue_context() - in context of specified workqueue? 304 * in_workqueue_context() - in context of specified workqueue?
@@ -122,7 +353,7 @@ static int work_fixup_activate(void *addr, enum debug_obj_state state)
122 * statically initialized. We just make sure that it 353 * statically initialized. We just make sure that it
123 * is tracked in the object tracker. 354 * is tracked in the object tracker.
124 */ 355 */
125 if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) { 356 if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
126 debug_object_init(work, &work_debug_descr); 357 debug_object_init(work, &work_debug_descr);
127 debug_object_activate(work, &work_debug_descr); 358 debug_object_activate(work, &work_debug_descr);
128 return 0; 359 return 0;
@@ -196,94 +427,575 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
196/* Serializes the accesses to the list of workqueues. */ 427/* Serializes the accesses to the list of workqueues. */
197static DEFINE_SPINLOCK(workqueue_lock); 428static DEFINE_SPINLOCK(workqueue_lock);
198static LIST_HEAD(workqueues); 429static LIST_HEAD(workqueues);
430static bool workqueue_freezing; /* W: have wqs started freezing? */
431
432/*
433 * The almighty global cpu workqueues. nr_running is the only field
434 * which is expected to be used frequently by other cpus via
435 * try_to_wake_up(). Put it in a separate cacheline.
436 */
437static DEFINE_PER_CPU(struct global_cwq, global_cwq);
438static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
439
440/*
441 * Global cpu workqueue and nr_running counter for unbound gcwq. The
442 * gcwq is always online, has GCWQ_DISASSOCIATED set, and all its
443 * workers have WORKER_UNBOUND set.
444 */
445static struct global_cwq unbound_global_cwq;
446static atomic_t unbound_gcwq_nr_running = ATOMIC_INIT(0); /* always 0 */
447
448static int worker_thread(void *__worker);
449
450static struct global_cwq *get_gcwq(unsigned int cpu)
451{
452 if (cpu != WORK_CPU_UNBOUND)
453 return &per_cpu(global_cwq, cpu);
454 else
455 return &unbound_global_cwq;
456}
457
458static atomic_t *get_gcwq_nr_running(unsigned int cpu)
459{
460 if (cpu != WORK_CPU_UNBOUND)
461 return &per_cpu(gcwq_nr_running, cpu);
462 else
463 return &unbound_gcwq_nr_running;
464}
465
466static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
467 struct workqueue_struct *wq)
468{
469 if (!(wq->flags & WQ_UNBOUND)) {
470 if (likely(cpu < nr_cpu_ids)) {
471#ifdef CONFIG_SMP
472 return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
473#else
474 return wq->cpu_wq.single;
475#endif
476 }
477 } else if (likely(cpu == WORK_CPU_UNBOUND))
478 return wq->cpu_wq.single;
479 return NULL;
480}
481
482static unsigned int work_color_to_flags(int color)
483{
484 return color << WORK_STRUCT_COLOR_SHIFT;
485}
486
487static int get_work_color(struct work_struct *work)
488{
489 return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
490 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
491}
492
493static int work_next_color(int color)
494{
495 return (color + 1) % WORK_NR_COLORS;
496}
199 497
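The color helpers implement a simple modular counter: work_color_to_flags() parks a color in work->data's flag field, get_work_color() recovers it, and work_next_color() advances it modulo WORK_NR_COLORS. A quick sanity sketch of the wraparound:

	static void example_color_cycle(void)
	{
		int color = 0, i;

		/* WORK_NR_COLORS steps bring the flush color back to 0 */
		for (i = 0; i < WORK_NR_COLORS; i++)
			color = work_next_color(color);
		BUG_ON(color != 0);
	}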
200static int singlethread_cpu __read_mostly;
201static const struct cpumask *cpu_singlethread_map __read_mostly;
202/* 498/*
203 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD 499 * A work's data points to the cwq with WORK_STRUCT_CWQ set while the
204 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work 500 * work is on queue. Once execution starts, WORK_STRUCT_CWQ is
205 * which comes in between can't use for_each_online_cpu(). We could 501 * cleared and the work data contains the cpu number it was last on.
206 * use cpu_possible_map, the cpumask below is more a documentation 502 *
207 * than optimization. 503 * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
504 * cwq, cpu or clear work->data. These functions should only be
505 * called while the work is owned - ie. while the PENDING bit is set.
506 *
507 * get_work_[g]cwq() can be used to obtain the gcwq or cwq
508 * corresponding to a work. gcwq is available once the work has been
509 * queued anywhere after initialization. cwq is available only from
510 * queueing until execution starts.
208 */ 511 */
209static cpumask_var_t cpu_populated_map __read_mostly; 512static inline void set_work_data(struct work_struct *work, unsigned long data,
513 unsigned long flags)
514{
515 BUG_ON(!work_pending(work));
516 atomic_long_set(&work->data, data | flags | work_static(work));
517}
210 518
211/* If it's single threaded, it isn't in the list of workqueues. */ 519static void set_work_cwq(struct work_struct *work,
212static inline int is_wq_single_threaded(struct workqueue_struct *wq) 520 struct cpu_workqueue_struct *cwq,
521 unsigned long extra_flags)
213{ 522{
214 return wq->singlethread; 523 set_work_data(work, (unsigned long)cwq,
524 WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
215} 525}
216 526
217static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq) 527static void set_work_cpu(struct work_struct *work, unsigned int cpu)
218{ 528{
219 return is_wq_single_threaded(wq) 529 set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
220 ? cpu_singlethread_map : cpu_populated_map;
221} 530}
222 531
223static 532static void clear_work_data(struct work_struct *work)
224struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
225{ 533{
226 if (unlikely(is_wq_single_threaded(wq))) 534 set_work_data(work, WORK_STRUCT_NO_CPU, 0);
227 cpu = singlethread_cpu; 535}
228 return per_cpu_ptr(wq->cpu_wq, cpu); 536
537static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
538{
539 unsigned long data = atomic_long_read(&work->data);
540
541 if (data & WORK_STRUCT_CWQ)
542 return (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
543 else
544 return NULL;
545}
546
547static struct global_cwq *get_work_gcwq(struct work_struct *work)
548{
549 unsigned long data = atomic_long_read(&work->data);
550 unsigned int cpu;
551
552 if (data & WORK_STRUCT_CWQ)
553 return ((struct cpu_workqueue_struct *)
554 (data & WORK_STRUCT_WQ_DATA_MASK))->gcwq;
555
556 cpu = data >> WORK_STRUCT_FLAG_BITS;
557 if (cpu == WORK_CPU_NONE)
558 return NULL;
559
560 BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND);
561 return get_gcwq(cpu);
229} 562}
230 563
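The encoding is reversible in both states: with WORK_STRUCT_CWQ set the upper bits of work->data are a cwq pointer, otherwise they are the last CPU number shifted past the flag bits. A hedged round-trip check (valid only while PENDING is owned, as the comment above requires; function name illustrative):

	static void example_cpu_roundtrip(struct work_struct *work, unsigned int cpu)
	{
		/* caller must own WORK_STRUCT_PENDING for set_work_cpu() */
		set_work_cpu(work, cpu);
		BUG_ON(get_work_gcwq(work) != get_gcwq(cpu));
	}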
231/* 564/*
232 * Set the workqueue on which a work item is to be run 565 * Policy functions. These define the policies on how the global
233 * - Must *only* be called if the pending flag is set 566 * worker pool is managed. Unless noted otherwise, these functions
567 * assume that they're being called with gcwq->lock held.
234 */ 568 */
235static inline void set_wq_data(struct work_struct *work, 569
236 struct cpu_workqueue_struct *cwq) 570static bool __need_more_worker(struct global_cwq *gcwq)
237{ 571{
238 unsigned long new; 572 return !atomic_read(get_gcwq_nr_running(gcwq->cpu)) ||
573 gcwq->flags & GCWQ_HIGHPRI_PENDING;
574}
239 575
240 BUG_ON(!work_pending(work)); 576/*
577 * Need to wake up a worker? Called from anything but currently
578 * running workers.
579 */
580static bool need_more_worker(struct global_cwq *gcwq)
581{
582 return !list_empty(&gcwq->worklist) && __need_more_worker(gcwq);
583}
584
585/* Can I start working? Called from busy but !running workers. */
586static bool may_start_working(struct global_cwq *gcwq)
587{
588 return gcwq->nr_idle;
589}
590
591/* Do I need to keep working? Called from currently running workers. */
592static bool keep_working(struct global_cwq *gcwq)
593{
594 atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
595
596 return !list_empty(&gcwq->worklist) && atomic_read(nr_running) <= 1;
597}
598
599/* Do we need a new worker? Called from manager. */
600static bool need_to_create_worker(struct global_cwq *gcwq)
601{
602 return need_more_worker(gcwq) && !may_start_working(gcwq);
603}
241 604
242 new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING); 605/* Do I need to be the manager? */
243 new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work); 606static bool need_to_manage_workers(struct global_cwq *gcwq)
244 atomic_long_set(&work->data, new); 607{
608 return need_to_create_worker(gcwq) || gcwq->flags & GCWQ_MANAGE_WORKERS;
609}
610
611/* Do we have too many workers and should some go away? */
612static bool too_many_workers(struct global_cwq *gcwq)
613{
614 bool managing = gcwq->flags & GCWQ_MANAGING_WORKERS;
615 int nr_idle = gcwq->nr_idle + managing; /* manager is considered idle */
616 int nr_busy = gcwq->nr_workers - nr_idle;
617
618 return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
245} 619}
246 620
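Concretely, MAX_IDLE_WORKERS_RATIO == 4 means culling starts once idle workers exceed two plus a quarter of the busy count: with 16 busy workers a fifth idle worker is still tolerated, while the sixth trips too_many_workers(). A small sketch of that arithmetic:

	static void example_idle_ratio(void)
	{
		int nr_busy = 16;	/* illustrative */

		/* 5 idle: (5 - 2) * 4 = 12 < 16, not too many */
		BUG_ON((5 - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy);
		/* 6 idle: (6 - 2) * 4 = 16 >= 16, cull */
		BUG_ON(!((6 - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy));
	}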
247/* 621/*
248 * Clear WORK_STRUCT_PENDING and the workqueue on which it was queued. 622 * Wake up functions.
623 */
624
625/* Return the first worker. Safe with preemption disabled */
626static struct worker *first_worker(struct global_cwq *gcwq)
627{
628 if (unlikely(list_empty(&gcwq->idle_list)))
629 return NULL;
630
631 return list_first_entry(&gcwq->idle_list, struct worker, entry);
632}
633
634/**
635 * wake_up_worker - wake up an idle worker
636 * @gcwq: gcwq to wake worker for
637 *
638 * Wake up the first idle worker of @gcwq.
639 *
640 * CONTEXT:
641 * spin_lock_irq(gcwq->lock).
642 */
643static void wake_up_worker(struct global_cwq *gcwq)
644{
645 struct worker *worker = first_worker(gcwq);
646
647 if (likely(worker))
648 wake_up_process(worker->task);
649}
650
651/**
652 * wq_worker_waking_up - a worker is waking up
653 * @task: task waking up
654 * @cpu: CPU @task is waking up to
655 *
656 * This function is called during try_to_wake_up() when a worker is
657 * being awoken.
658 *
659 * CONTEXT:
660 * spin_lock_irq(rq->lock)
661 */
662void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
663{
664 struct worker *worker = kthread_data(task);
665
666 if (likely(!(worker->flags & WORKER_NOT_RUNNING)))
667 atomic_inc(get_gcwq_nr_running(cpu));
668}
669
670/**
671 * wq_worker_sleeping - a worker is going to sleep
672 * @task: task going to sleep
673 * @cpu: CPU in question, must be the current CPU number
674 *
675 * This function is called during schedule() when a busy worker is
676 * going to sleep. Worker on the same cpu can be woken up by
677 * returning pointer to its task.
678 *
679 * CONTEXT:
680 * spin_lock_irq(rq->lock)
681 *
682 * RETURNS:
683 * Worker task on @cpu to wake up, %NULL if none.
684 */
685struct task_struct *wq_worker_sleeping(struct task_struct *task,
686 unsigned int cpu)
687{
688 struct worker *worker = kthread_data(task), *to_wakeup = NULL;
689 struct global_cwq *gcwq = get_gcwq(cpu);
690 atomic_t *nr_running = get_gcwq_nr_running(cpu);
691
692 if (unlikely(worker->flags & WORKER_NOT_RUNNING))
693 return NULL;
694
695 /* this can only happen on the local cpu */
696 BUG_ON(cpu != raw_smp_processor_id());
697
698 /*
699 * The counterpart of the following dec_and_test, implied mb,
700 * worklist not empty test sequence is in insert_work().
701 * Please read comment there.
702 *
703 * NOT_RUNNING is clear. This means that trustee is not in
704 * charge and we're running on the local cpu w/ rq lock held
 705	 * and preemption disabled, which in turn means that no one else
706 * could be manipulating idle_list, so dereferencing idle_list
707 * without gcwq lock is safe.
708 */
709 if (atomic_dec_and_test(nr_running) && !list_empty(&gcwq->worklist))
710 to_wakeup = first_worker(gcwq);
711 return to_wakeup ? to_wakeup->task : NULL;
712}
713
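These two hooks make up the scheduler-facing interface this series adds in kernel/workqueue_sched.h; try_to_wake_up() and schedule() call them so nr_running counts only workers that are actually runnable. The declarations, matching the definitions above:

	/* kernel/workqueue_sched.h */
	void wq_worker_waking_up(struct task_struct *task, unsigned int cpu);
	struct task_struct *wq_worker_sleeping(struct task_struct *task,
					       unsigned int cpu);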
714/**
715 * worker_set_flags - set worker flags and adjust nr_running accordingly
716 * @worker: self
717 * @flags: flags to set
718 * @wakeup: wakeup an idle worker if necessary
719 *
720 * Set @flags in @worker->flags and adjust nr_running accordingly. If
721 * nr_running becomes zero and @wakeup is %true, an idle worker is
722 * woken up.
723 *
724 * CONTEXT:
725 * spin_lock_irq(gcwq->lock)
726 */
727static inline void worker_set_flags(struct worker *worker, unsigned int flags,
728 bool wakeup)
729{
730 struct global_cwq *gcwq = worker->gcwq;
731
732 WARN_ON_ONCE(worker->task != current);
733
734 /*
735 * If transitioning into NOT_RUNNING, adjust nr_running and
736 * wake up an idle worker as necessary if requested by
737 * @wakeup.
738 */
739 if ((flags & WORKER_NOT_RUNNING) &&
740 !(worker->flags & WORKER_NOT_RUNNING)) {
741 atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
742
743 if (wakeup) {
744 if (atomic_dec_and_test(nr_running) &&
745 !list_empty(&gcwq->worklist))
746 wake_up_worker(gcwq);
747 } else
748 atomic_dec(nr_running);
749 }
750
751 worker->flags |= flags;
752}
753
754/**
755 * worker_clr_flags - clear worker flags and adjust nr_running accordingly
756 * @worker: self
757 * @flags: flags to clear
758 *
759 * Clear @flags in @worker->flags and adjust nr_running accordingly.
760 *
761 * CONTEXT:
762 * spin_lock_irq(gcwq->lock)
763 */
764static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
765{
766 struct global_cwq *gcwq = worker->gcwq;
767 unsigned int oflags = worker->flags;
768
769 WARN_ON_ONCE(worker->task != current);
770
771 worker->flags &= ~flags;
772
773 /* if transitioning out of NOT_RUNNING, increment nr_running */
774 if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
775 if (!(worker->flags & WORKER_NOT_RUNNING))
776 atomic_inc(get_gcwq_nr_running(gcwq->cpu));
777}
778
779/**
780 * busy_worker_head - return the busy hash head for a work
781 * @gcwq: gcwq of interest
782 * @work: work to be hashed
783 *
784 * Return hash head of @gcwq for @work.
785 *
786 * CONTEXT:
787 * spin_lock_irq(gcwq->lock).
788 *
789 * RETURNS:
790 * Pointer to the hash head.
791 */
792static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
793 struct work_struct *work)
794{
795 const int base_shift = ilog2(sizeof(struct work_struct));
796 unsigned long v = (unsigned long)work;
797
798 /* simple shift and fold hash, do we need something better? */
799 v >>= base_shift;
800 v += v >> BUSY_WORKER_HASH_ORDER;
801 v &= BUSY_WORKER_HASH_MASK;
802
803 return &gcwq->busy_hash[v];
804}
805
806/**
807 * __find_worker_executing_work - find worker which is executing a work
808 * @gcwq: gcwq of interest
809 * @bwh: hash head as returned by busy_worker_head()
810 * @work: work to find worker for
811 *
812 * Find a worker which is executing @work on @gcwq. @bwh should be
813 * the hash head obtained by calling busy_worker_head() with the same
814 * work.
815 *
816 * CONTEXT:
817 * spin_lock_irq(gcwq->lock).
818 *
819 * RETURNS:
820 * Pointer to worker which is executing @work if found, NULL
821 * otherwise.
822 */
823static struct worker *__find_worker_executing_work(struct global_cwq *gcwq,
824 struct hlist_head *bwh,
825 struct work_struct *work)
826{
827 struct worker *worker;
828 struct hlist_node *tmp;
829
830 hlist_for_each_entry(worker, tmp, bwh, hentry)
831 if (worker->current_work == work)
832 return worker;
833 return NULL;
834}
835
836/**
837 * find_worker_executing_work - find worker which is executing a work
838 * @gcwq: gcwq of interest
839 * @work: work to find worker for
840 *
841 * Find a worker which is executing @work on @gcwq. This function is
842 * identical to __find_worker_executing_work() except that this
843 * function calculates @bwh itself.
844 *
845 * CONTEXT:
846 * spin_lock_irq(gcwq->lock).
847 *
848 * RETURNS:
849 * Pointer to worker which is executing @work if found, NULL
850 * otherwise.
249 */ 851 */
250static inline void clear_wq_data(struct work_struct *work) 852static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
853 struct work_struct *work)
251{ 854{
252 unsigned long flags = *work_data_bits(work) & 855 return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work),
253 (1UL << WORK_STRUCT_STATIC); 856 work);
254 atomic_long_set(&work->data, flags);
255} 857}
256 858
257static inline 859/**
258struct cpu_workqueue_struct *get_wq_data(struct work_struct *work) 860 * gcwq_determine_ins_pos - find insertion position
861 * @gcwq: gcwq of interest
862 * @cwq: cwq a work is being queued for
863 *
864 * A work for @cwq is about to be queued on @gcwq, determine insertion
865 * position for the work. If @cwq is for HIGHPRI wq, the work is
866 * queued at the head of the queue but in FIFO order with respect to
867 * other HIGHPRI works; otherwise, at the end of the queue. This
868 * function also sets GCWQ_HIGHPRI_PENDING flag to hint @gcwq that
869 * there are HIGHPRI works pending.
870 *
871 * CONTEXT:
872 * spin_lock_irq(gcwq->lock).
873 *
874 * RETURNS:
 875 * Pointer to insertion position.
876 */
877static inline struct list_head *gcwq_determine_ins_pos(struct global_cwq *gcwq,
878 struct cpu_workqueue_struct *cwq)
259{ 879{
260 return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK); 880 struct work_struct *twork;
881
882 if (likely(!(cwq->wq->flags & WQ_HIGHPRI)))
883 return &gcwq->worklist;
884
885 list_for_each_entry(twork, &gcwq->worklist, entry) {
886 struct cpu_workqueue_struct *tcwq = get_work_cwq(twork);
887
888 if (!(tcwq->wq->flags & WQ_HIGHPRI))
889 break;
890 }
891
892 gcwq->flags |= GCWQ_HIGHPRI_PENDING;
893 return &twork->entry;
261} 894}
262 895
896/**
897 * insert_work - insert a work into gcwq
898 * @cwq: cwq @work belongs to
899 * @work: work to insert
900 * @head: insertion point
901 * @extra_flags: extra WORK_STRUCT_* flags to set
902 *
903 * Insert @work which belongs to @cwq into @gcwq after @head.
904 * @extra_flags is or'd to work_struct flags.
905 *
906 * CONTEXT:
907 * spin_lock_irq(gcwq->lock).
908 */
263static void insert_work(struct cpu_workqueue_struct *cwq, 909static void insert_work(struct cpu_workqueue_struct *cwq,
264 struct work_struct *work, struct list_head *head) 910 struct work_struct *work, struct list_head *head,
911 unsigned int extra_flags)
265{ 912{
266 trace_workqueue_insertion(cwq->thread, work); 913 struct global_cwq *gcwq = cwq->gcwq;
914
915 /* we own @work, set data and link */
916 set_work_cwq(work, cwq, extra_flags);
267 917
268 set_wq_data(work, cwq);
269 /* 918 /*
270 * Ensure that we get the right work->data if we see the 919 * Ensure that we get the right work->data if we see the
271 * result of list_add() below, see try_to_grab_pending(). 920 * result of list_add() below, see try_to_grab_pending().
272 */ 921 */
273 smp_wmb(); 922 smp_wmb();
923
274 list_add_tail(&work->entry, head); 924 list_add_tail(&work->entry, head);
275 wake_up(&cwq->more_work); 925
926 /*
927 * Ensure either worker_sched_deactivated() sees the above
928 * list_add_tail() or we see zero nr_running to avoid workers
929 * lying around lazily while there are works to be processed.
930 */
931 smp_mb();
932
933 if (__need_more_worker(gcwq))
934 wake_up_worker(gcwq);
276} 935}
277 936
278static void __queue_work(struct cpu_workqueue_struct *cwq, 937static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
279 struct work_struct *work) 938 struct work_struct *work)
280{ 939{
940 struct global_cwq *gcwq;
941 struct cpu_workqueue_struct *cwq;
942 struct list_head *worklist;
281 unsigned long flags; 943 unsigned long flags;
282 944
283 debug_work_activate(work); 945 debug_work_activate(work);
284 spin_lock_irqsave(&cwq->lock, flags); 946
285 insert_work(cwq, work, &cwq->worklist); 947 /* determine gcwq to use */
286 spin_unlock_irqrestore(&cwq->lock, flags); 948 if (!(wq->flags & WQ_UNBOUND)) {
949 struct global_cwq *last_gcwq;
950
951 if (unlikely(cpu == WORK_CPU_UNBOUND))
952 cpu = raw_smp_processor_id();
953
954 /*
955 * It's multi cpu. If @wq is non-reentrant and @work
956 * was previously on a different cpu, it might still
957 * be running there, in which case the work needs to
958 * be queued on that cpu to guarantee non-reentrance.
959 */
960 gcwq = get_gcwq(cpu);
961 if (wq->flags & WQ_NON_REENTRANT &&
962 (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
963 struct worker *worker;
964
965 spin_lock_irqsave(&last_gcwq->lock, flags);
966
967 worker = find_worker_executing_work(last_gcwq, work);
968
969 if (worker && worker->current_cwq->wq == wq)
970 gcwq = last_gcwq;
971 else {
972 /* meh... not running there, queue here */
973 spin_unlock_irqrestore(&last_gcwq->lock, flags);
974 spin_lock_irqsave(&gcwq->lock, flags);
975 }
976 } else
977 spin_lock_irqsave(&gcwq->lock, flags);
978 } else {
979 gcwq = get_gcwq(WORK_CPU_UNBOUND);
980 spin_lock_irqsave(&gcwq->lock, flags);
981 }
982
983 /* gcwq determined, get cwq and queue */
984 cwq = get_cwq(gcwq->cpu, wq);
985
986 BUG_ON(!list_empty(&work->entry));
987
988 cwq->nr_in_flight[cwq->work_color]++;
989
990 if (likely(cwq->nr_active < cwq->max_active)) {
991 cwq->nr_active++;
992 worklist = gcwq_determine_ins_pos(gcwq, cwq);
993 } else
994 worklist = &cwq->delayed_works;
995
996 insert_work(cwq, work, worklist, work_color_to_flags(cwq->work_color));
997
998 spin_unlock_irqrestore(&gcwq->lock, flags);
287} 999}
288 1000
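The nr_active/max_active gate above is what gives each workqueue its concurrency cap: once max_active works are in flight on a cwq, further works park on delayed_works until a running one retires. Creating a capped queue with the series' allocator would look roughly like this (name and limit illustrative; alloc_workqueue() is introduced elsewhere in this series):

	static struct workqueue_struct *example_wq;

	static int __init example_setup(void)
	{
		/* at most 2 works in flight per cwq; extras wait on delayed_works */
		example_wq = alloc_workqueue("example", 0, 2);
		return example_wq ? 0 : -ENOMEM;
	}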
289/** 1001/**
@@ -323,9 +1035,8 @@ queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
323{ 1035{
324 int ret = 0; 1036 int ret = 0;
325 1037
326 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 1038 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
327 BUG_ON(!list_empty(&work->entry)); 1039 __queue_work(cpu, wq, work);
328 __queue_work(wq_per_cpu(wq, cpu), work);
329 ret = 1; 1040 ret = 1;
330 } 1041 }
331 return ret; 1042 return ret;
@@ -335,10 +1046,9 @@ EXPORT_SYMBOL_GPL(queue_work_on);
335static void delayed_work_timer_fn(unsigned long __data) 1046static void delayed_work_timer_fn(unsigned long __data)
336{ 1047{
337 struct delayed_work *dwork = (struct delayed_work *)__data; 1048 struct delayed_work *dwork = (struct delayed_work *)__data;
338 struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work); 1049 struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
339 struct workqueue_struct *wq = cwq->wq;
340 1050
341 __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work); 1051 __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
342} 1052}
343 1053
344/** 1054/**
@@ -375,14 +1085,31 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
375 struct timer_list *timer = &dwork->timer; 1085 struct timer_list *timer = &dwork->timer;
376 struct work_struct *work = &dwork->work; 1086 struct work_struct *work = &dwork->work;
377 1087
378 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 1088 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1089 unsigned int lcpu;
1090
379 BUG_ON(timer_pending(timer)); 1091 BUG_ON(timer_pending(timer));
380 BUG_ON(!list_empty(&work->entry)); 1092 BUG_ON(!list_empty(&work->entry));
381 1093
382 timer_stats_timer_set_start_info(&dwork->timer); 1094 timer_stats_timer_set_start_info(&dwork->timer);
383 1095
384 /* This stores cwq for the moment, for the timer_fn */ 1096 /*
385 set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id())); 1097 * This stores cwq for the moment, for the timer_fn.
1098 * Note that the work's gcwq is preserved to allow
1099 * reentrance detection for delayed works.
1100 */
1101 if (!(wq->flags & WQ_UNBOUND)) {
1102 struct global_cwq *gcwq = get_work_gcwq(work);
1103
1104 if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND)
1105 lcpu = gcwq->cpu;
1106 else
1107 lcpu = raw_smp_processor_id();
1108 } else
1109 lcpu = WORK_CPU_UNBOUND;
1110
1111 set_work_cwq(work, get_cwq(lcpu, wq), 0);
1112
386 timer->expires = jiffies + delay; 1113 timer->expires = jiffies + delay;
387 timer->data = (unsigned long)dwork; 1114 timer->data = (unsigned long)dwork;
388 timer->function = delayed_work_timer_fn; 1115 timer->function = delayed_work_timer_fn;
@@ -397,80 +1124,872 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
397} 1124}
398EXPORT_SYMBOL_GPL(queue_delayed_work_on); 1125EXPORT_SYMBOL_GPL(queue_delayed_work_on);
399 1126
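The externally visible API is unchanged by the cwq bookkeeping above; a caller still arms a delayed work the usual way (handler and delay illustrative):

	static void example_fn(struct work_struct *work)
	{
		/* runs in worker context, may sleep */
	}

	static DECLARE_DELAYED_WORK(example_dwork, example_fn);

	static void example_arm(void)
	{
		/* fires delayed_work_timer_fn() after ~1s, which queues the work */
		queue_delayed_work(system_wq, &example_dwork, HZ);
	}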
400static void run_workqueue(struct cpu_workqueue_struct *cwq) 1127/**
1128 * worker_enter_idle - enter idle state
1129 * @worker: worker which is entering idle state
1130 *
1131 * @worker is entering idle state. Update stats and idle timer if
1132 * necessary.
1133 *
1134 * LOCKING:
1135 * spin_lock_irq(gcwq->lock).
1136 */
1137static void worker_enter_idle(struct worker *worker)
401{ 1138{
402 spin_lock_irq(&cwq->lock); 1139 struct global_cwq *gcwq = worker->gcwq;
403 while (!list_empty(&cwq->worklist)) { 1140
404 struct work_struct *work = list_entry(cwq->worklist.next, 1141 BUG_ON(worker->flags & WORKER_IDLE);
405 struct work_struct, entry); 1142 BUG_ON(!list_empty(&worker->entry) &&
406 work_func_t f = work->func; 1143 (worker->hentry.next || worker->hentry.pprev));
407#ifdef CONFIG_LOCKDEP 1144
1145 /* can't use worker_set_flags(), also called from start_worker() */
1146 worker->flags |= WORKER_IDLE;
1147 gcwq->nr_idle++;
1148 worker->last_active = jiffies;
1149
1150 /* idle_list is LIFO */
1151 list_add(&worker->entry, &gcwq->idle_list);
1152
1153 if (likely(!(worker->flags & WORKER_ROGUE))) {
1154 if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
1155 mod_timer(&gcwq->idle_timer,
1156 jiffies + IDLE_WORKER_TIMEOUT);
1157 } else
1158 wake_up_all(&gcwq->trustee_wait);
1159
1160 /* sanity check nr_running */
1161 WARN_ON_ONCE(gcwq->nr_workers == gcwq->nr_idle &&
1162 atomic_read(get_gcwq_nr_running(gcwq->cpu)));
1163}
1164
1165/**
1166 * worker_leave_idle - leave idle state
1167 * @worker: worker which is leaving idle state
1168 *
1169 * @worker is leaving idle state. Update stats.
1170 *
1171 * LOCKING:
1172 * spin_lock_irq(gcwq->lock).
1173 */
1174static void worker_leave_idle(struct worker *worker)
1175{
1176 struct global_cwq *gcwq = worker->gcwq;
1177
1178 BUG_ON(!(worker->flags & WORKER_IDLE));
1179 worker_clr_flags(worker, WORKER_IDLE);
1180 gcwq->nr_idle--;
1181 list_del_init(&worker->entry);
1182}
1183
1184/**
1185 * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
1186 * @worker: self
1187 *
1188 * Works which are scheduled while the cpu is online must at least be
1189 * scheduled to a worker which is bound to the cpu so that if they are
1190 * flushed from cpu callbacks while cpu is going down, they are
1191 * guaranteed to execute on the cpu.
1192 *
1193 * This function is to be used by rogue workers and rescuers to bind
1194 * themselves to the target cpu and may race with cpu going down or
1195 * coming online. kthread_bind() can't be used because it may put the
1196 * worker on an already dead cpu and set_cpus_allowed_ptr() can't be used
1197 * verbatim as it's best effort and blocking and gcwq may be
1198 * [dis]associated in the meantime.
1199 *
1200 * This function tries set_cpus_allowed() and locks gcwq and verifies
1201 * the binding against GCWQ_DISASSOCIATED which is set during
1202 * CPU_DYING and cleared during CPU_ONLINE, so if the worker enters
1203 * idle state or fetches works without dropping lock, it can guarantee
1204 * the scheduling requirement described in the first paragraph.
1205 *
1206 * CONTEXT:
1207 * Might sleep. Called without any lock but returns with gcwq->lock
1208 * held.
1209 *
1210 * RETURNS:
1211 * %true if the associated gcwq is online (@worker is successfully
1212 * bound), %false if offline.
1213 */
1214static bool worker_maybe_bind_and_lock(struct worker *worker)
1215{
1216 struct global_cwq *gcwq = worker->gcwq;
1217 struct task_struct *task = worker->task;
1218
1219 while (true) {
408 /* 1220 /*
409 * It is permissible to free the struct work_struct 1221 * The following call may fail, succeed or succeed
410 * from inside the function that is called from it, 1222 * without actually migrating the task to the cpu if
411 * this we need to take into account for lockdep too. 1223 * it races with cpu hotunplug operation. Verify
412 * To avoid bogus "held lock freed" warnings as well 1224 * against GCWQ_DISASSOCIATED.
413 * as problems when looking into work->lockdep_map,
414 * make a copy and use that here.
415 */ 1225 */
416 struct lockdep_map lockdep_map = work->lockdep_map; 1226 if (!(gcwq->flags & GCWQ_DISASSOCIATED))
417#endif 1227 set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
418 trace_workqueue_execution(cwq->thread, work); 1228
419 debug_work_deactivate(work); 1229 spin_lock_irq(&gcwq->lock);
420 cwq->current_work = work; 1230 if (gcwq->flags & GCWQ_DISASSOCIATED)
421 list_del_init(cwq->worklist.next); 1231 return false;
422 spin_unlock_irq(&cwq->lock); 1232 if (task_cpu(task) == gcwq->cpu &&
423 1233 cpumask_equal(&current->cpus_allowed,
424 BUG_ON(get_wq_data(work) != cwq); 1234 get_cpu_mask(gcwq->cpu)))
425 work_clear_pending(work); 1235 return true;
426 lock_map_acquire(&cwq->wq->lockdep_map); 1236 spin_unlock_irq(&gcwq->lock);
427 lock_map_acquire(&lockdep_map); 1237
418 trace_workqueue_execution(cwq->thread, work); 1228 428 f(work); 1238 /* CPU has come up in between, retry migration */
429 lock_map_release(&lockdep_map); 1239 cpu_relax();
430 lock_map_release(&cwq->wq->lockdep_map); 1240 }
431 1241}
432 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 1242
433 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 1243/*
434 "%s/0x%08x/%d\n", 1244 * Function for worker->rebind_work used to rebind rogue busy workers
435 current->comm, preempt_count(), 1245 * to the associated cpu which is coming back online. This is
436 task_pid_nr(current)); 1246 * scheduled by cpu up but can race with other cpu hotplug operations
437 printk(KERN_ERR " last function: "); 1247 * and may be executed twice without intervening cpu down.
438 print_symbol("%s\n", (unsigned long)f); 1248 */
439 debug_show_held_locks(current); 1249static void worker_rebind_fn(struct work_struct *work)
440 dump_stack(); 1250{
1251 struct worker *worker = container_of(work, struct worker, rebind_work);
1252 struct global_cwq *gcwq = worker->gcwq;
1253
1254 if (worker_maybe_bind_and_lock(worker))
1255 worker_clr_flags(worker, WORKER_REBIND);
1256
1257 spin_unlock_irq(&gcwq->lock);
1258}
1259
1260static struct worker *alloc_worker(void)
1261{
1262 struct worker *worker;
1263
1264 worker = kzalloc(sizeof(*worker), GFP_KERNEL);
1265 if (worker) {
1266 INIT_LIST_HEAD(&worker->entry);
1267 INIT_LIST_HEAD(&worker->scheduled);
1268 INIT_WORK(&worker->rebind_work, worker_rebind_fn);
1269 /* on creation a worker is in !idle && prep state */
1270 worker->flags = WORKER_PREP;
1271 }
1272 return worker;
1273}
1274
1275/**
1276 * create_worker - create a new workqueue worker
1277 * @gcwq: gcwq the new worker will belong to
1278 * @bind: whether to set affinity to @cpu or not
1279 *
1280 * Create a new worker which is bound to @gcwq. The returned worker
1281 * can be started by calling start_worker() or destroyed using
1282 * destroy_worker().
1283 *
1284 * CONTEXT:
1285 * Might sleep. Does GFP_KERNEL allocations.
1286 *
1287 * RETURNS:
1288 * Pointer to the newly created worker.
1289 */
1290static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
1291{
1292 bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND;
1293 struct worker *worker = NULL;
1294 int id = -1;
1295
1296 spin_lock_irq(&gcwq->lock);
1297 while (ida_get_new(&gcwq->worker_ida, &id)) {
1298 spin_unlock_irq(&gcwq->lock);
1299 if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))
1300 goto fail;
1301 spin_lock_irq(&gcwq->lock);
1302 }
1303 spin_unlock_irq(&gcwq->lock);
1304
1305 worker = alloc_worker();
1306 if (!worker)
1307 goto fail;
1308
1309 worker->gcwq = gcwq;
1310 worker->id = id;
1311
1312 if (!on_unbound_cpu)
1313 worker->task = kthread_create(worker_thread, worker,
1314 "kworker/%u:%d", gcwq->cpu, id);
1315 else
1316 worker->task = kthread_create(worker_thread, worker,
1317 "kworker/u:%d", id);
1318 if (IS_ERR(worker->task))
1319 goto fail;
1320
1321 /*
1322 * A rogue worker will become a regular one if CPU comes
1323 * online later on. Make sure every worker has
1324 * PF_THREAD_BOUND set.
1325 */
1326 if (bind && !on_unbound_cpu)
1327 kthread_bind(worker->task, gcwq->cpu);
1328 else {
1329 worker->task->flags |= PF_THREAD_BOUND;
1330 if (on_unbound_cpu)
1331 worker->flags |= WORKER_UNBOUND;
1332 }
1333
1334 return worker;
1335fail:
1336 if (id >= 0) {
1337 spin_lock_irq(&gcwq->lock);
1338 ida_remove(&gcwq->worker_ida, id);
1339 spin_unlock_irq(&gcwq->lock);
1340 }
1341 kfree(worker);
1342 return NULL;
1343}
1344
1345/**
1346 * start_worker - start a newly created worker
1347 * @worker: worker to start
1348 *
1349 * Make the gcwq aware of @worker and start it.
1350 *
1351 * CONTEXT:
1352 * spin_lock_irq(gcwq->lock).
1353 */
1354static void start_worker(struct worker *worker)
1355{
1356 worker->flags |= WORKER_STARTED;
1357 worker->gcwq->nr_workers++;
1358 worker_enter_idle(worker);
1359 wake_up_process(worker->task);
1360}
1361
1362/**
1363 * destroy_worker - destroy a workqueue worker
1364 * @worker: worker to be destroyed
1365 *
1366 * Destroy @worker and adjust @gcwq stats accordingly.
1367 *
1368 * CONTEXT:
1369 * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1370 */
1371static void destroy_worker(struct worker *worker)
1372{
1373 struct global_cwq *gcwq = worker->gcwq;
1374 int id = worker->id;
1375
1376 /* sanity check frenzy */
1377 BUG_ON(worker->current_work);
1378 BUG_ON(!list_empty(&worker->scheduled));
1379
1380 if (worker->flags & WORKER_STARTED)
1381 gcwq->nr_workers--;
1382 if (worker->flags & WORKER_IDLE)
1383 gcwq->nr_idle--;
1384
1385 list_del_init(&worker->entry);
1386 worker->flags |= WORKER_DIE;
1387
1388 spin_unlock_irq(&gcwq->lock);
1389
1390 kthread_stop(worker->task);
1391 kfree(worker);
1392
1393 spin_lock_irq(&gcwq->lock);
1394 ida_remove(&gcwq->worker_ida, id);
1395}
1396
1397static void idle_worker_timeout(unsigned long __gcwq)
1398{
1399 struct global_cwq *gcwq = (void *)__gcwq;
1400
1401 spin_lock_irq(&gcwq->lock);
1402
1403 if (too_many_workers(gcwq)) {
1404 struct worker *worker;
1405 unsigned long expires;
1406
1407 /* idle_list is kept in LIFO order, check the last one */
1408 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1409 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1410
1411 if (time_before(jiffies, expires))
1412 mod_timer(&gcwq->idle_timer, expires);
1413 else {
1414 /* it's been idle for too long, wake up manager */
1415 gcwq->flags |= GCWQ_MANAGE_WORKERS;
1416 wake_up_worker(gcwq);
1417 }
1418 }
1419
1420 spin_unlock_irq(&gcwq->lock);
1421}
1422
1423static bool send_mayday(struct work_struct *work)
1424{
1425 struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1426 struct workqueue_struct *wq = cwq->wq;
1427 unsigned int cpu;
1428
1429 if (!(wq->flags & WQ_RESCUER))
1430 return false;
1431
1432 /* mayday mayday mayday */
1433 cpu = cwq->gcwq->cpu;
1434 /* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
1435 if (cpu == WORK_CPU_UNBOUND)
1436 cpu = 0;
1437 if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask))
1438 wake_up_process(wq->rescuer->task);
1439 return true;
1440}
1441
1442static void gcwq_mayday_timeout(unsigned long __gcwq)
1443{
1444 struct global_cwq *gcwq = (void *)__gcwq;
1445 struct work_struct *work;
1446
1447 spin_lock_irq(&gcwq->lock);
1448
1449 if (need_to_create_worker(gcwq)) {
1450 /*
1451 * We've been trying to create a new worker but
1452 * haven't been successful. We might be hitting an
1453 * allocation deadlock. Send distress signals to
1454 * rescuers.
1455 */
1456 list_for_each_entry(work, &gcwq->worklist, entry)
1457 send_mayday(work);
1458 }
1459
1460 spin_unlock_irq(&gcwq->lock);
1461
1462 mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL);
1463}
1464
1465/**
1466 * maybe_create_worker - create a new worker if necessary
1467 * @gcwq: gcwq to create a new worker for
1468 *
1469 * Create a new worker for @gcwq if necessary. @gcwq is guaranteed to
1470 * have at least one idle worker on return from this function. If
1471 * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
1472 * sent to all rescuers with works scheduled on @gcwq to resolve
1473 * possible allocation deadlock.
1474 *
1475 * On return, need_to_create_worker() is guaranteed to be false and
1476 * may_start_working() true.
1477 *
1478 * LOCKING:
1479 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1480 * multiple times. Does GFP_KERNEL allocations. Called only from
1481 * manager.
1482 *
1483 * RETURNS:
1484 * false if no action was taken and gcwq->lock stayed locked, true
1485 * otherwise.
1486 */
1487static bool maybe_create_worker(struct global_cwq *gcwq)
1488{
1489 if (!need_to_create_worker(gcwq))
1490 return false;
1491restart:
1492 spin_unlock_irq(&gcwq->lock);
1493
1494 /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
1495 mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
1496
1497 while (true) {
1498 struct worker *worker;
1499
1500 worker = create_worker(gcwq, true);
1501 if (worker) {
1502 del_timer_sync(&gcwq->mayday_timer);
1503 spin_lock_irq(&gcwq->lock);
1504 start_worker(worker);
1505 BUG_ON(need_to_create_worker(gcwq));
1506 return true;
1507 }
1508
1509 if (!need_to_create_worker(gcwq))
1510 break;
1511
1512 __set_current_state(TASK_INTERRUPTIBLE);
1513 schedule_timeout(CREATE_COOLDOWN);
1514
1515 if (!need_to_create_worker(gcwq))
1516 break;
1517 }
1518
1519 del_timer_sync(&gcwq->mayday_timer);
1520 spin_lock_irq(&gcwq->lock);
1521 if (need_to_create_worker(gcwq))
1522 goto restart;
1523 return true;
1524}
1525
1526/**
1527 * maybe_destroy_worker - destroy workers which have been idle for a while
1528 * @gcwq: gcwq to destroy workers for
1529 *
1530 * Destroy @gcwq workers which have been idle for longer than
1531 * IDLE_WORKER_TIMEOUT.
1532 *
1533 * LOCKING:
1534 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1535 * multiple times. Called only from manager.
1536 *
1537 * RETURNS:
1538 * false if no action was taken and gcwq->lock stayed locked, true
1539 * otherwise.
1540 */
1541static bool maybe_destroy_workers(struct global_cwq *gcwq)
1542{
1543 bool ret = false;
1544
1545 while (too_many_workers(gcwq)) {
1546 struct worker *worker;
1547 unsigned long expires;
1548
1549 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1550 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1551
1552 if (time_before(jiffies, expires)) {
1553 mod_timer(&gcwq->idle_timer, expires);
1554 break;
441 } 1555 }
442 1556
443 spin_lock_irq(&cwq->lock); 1557 destroy_worker(worker);
444 cwq->current_work = NULL; 1558 ret = true;
445 } 1559 }
446 spin_unlock_irq(&cwq->lock); 1560
1561 return ret;
447} 1562}
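
The timing logic above is a lazy reaper: the idle list is ordered by last_active, so only the coldest worker's expiry matters, and the timer is re-armed to exactly that expiry instead of polling. A sketch with plain integers standing in for jiffies (the values are invented for illustration):

	#include <stdio.h>

	#define IDLE_TIMEOUT 300	/* in fake jiffies */

	int main(void)
	{
		/* last_active of idle workers, coldest (oldest) last */
		long last_active[] = { 900, 750, 500, 100 };
		int n = 4;
		long now = 600;

		while (n > 0) {
			long expires = last_active[n - 1] + IDLE_TIMEOUT;

			if (now < expires) {
				printf("re-arm idle timer for t=%ld\n", expires);
				break;
			}
			printf("destroy worker idle since t=%ld\n", last_active[n - 1]);
			n--;
		}
		return 0;
	}
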
448 1563
449static int worker_thread(void *__cwq) 1564/**
1565 * manage_workers - manage worker pool
1566 * @worker: self
1567 *
1568 * Assume the manager role and manage gcwq worker pool @worker belongs
1569 * to. At any given time, there can be only zero or one manager per
1570 * gcwq. The exclusion is handled automatically by this function.
1571 *
1572 * The caller can safely start processing works on false return. On
1573 * true return, it's guaranteed that need_to_create_worker() is false
1574 * and may_start_working() is true.
1575 *
1576 * CONTEXT:
1577 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1578 * multiple times. Does GFP_KERNEL allocations.
1579 *
1580 * RETURNS:
1581 * false if no action was taken and gcwq->lock stayed locked, true if
1582 * some action was taken.
1583 */
1584static bool manage_workers(struct worker *worker)
450{ 1585{
451 struct cpu_workqueue_struct *cwq = __cwq; 1586 struct global_cwq *gcwq = worker->gcwq;
452 DEFINE_WAIT(wait); 1587 bool ret = false;
453 1588
454 if (cwq->wq->freezeable) 1589 if (gcwq->flags & GCWQ_MANAGING_WORKERS)
455 set_freezable(); 1590 return ret;
456 1591
457 for (;;) { 1592 gcwq->flags &= ~GCWQ_MANAGE_WORKERS;
458 prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); 1593 gcwq->flags |= GCWQ_MANAGING_WORKERS;
459 if (!freezing(current) &&
460 !kthread_should_stop() &&
461 list_empty(&cwq->worklist))
462 schedule();
463 finish_wait(&cwq->more_work, &wait);
464 1594
465 try_to_freeze(); 1595 /*
1596 * Destroy and then create so that may_start_working() is true
1597 * on return.
1598 */
1599 ret |= maybe_destroy_workers(gcwq);
1600 ret |= maybe_create_worker(gcwq);
1601
1602 gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
1603
1604 /*
1605 * The trustee might be waiting to take over the manager
1606 * position, tell it we're done.
1607 */
1608 if (unlikely(gcwq->trustee))
1609 wake_up_all(&gcwq->trustee_wait);
1610
1611 return ret;
1612}
1613
1614/**
1615 * move_linked_works - move linked works to a list
1616 * @work: start of series of works to be scheduled
1617 * @head: target list to append @work to
1618 * @nextp: out parameter for nested worklist walking
1619 *
1620 * Schedule linked works starting from @work to @head. Work series to
1621 * be scheduled starts at @work and includes any consecutive work with
1622 * WORK_STRUCT_LINKED set in its predecessor.
1623 *
1624 * If @nextp is not NULL, it's updated to point to the next work of
1625 * the last scheduled work. This allows move_linked_works() to be
1626 * nested inside outer list_for_each_entry_safe().
1627 *
1628 * CONTEXT:
1629 * spin_lock_irq(gcwq->lock).
1630 */
1631static void move_linked_works(struct work_struct *work, struct list_head *head,
1632 struct work_struct **nextp)
1633{
1634 struct work_struct *n;
466 1635
467 if (kthread_should_stop()) 1636 /*
1637 * A linked worklist always ends before the end of the list;
1638 * use NULL as the list head.
1639 */
1640 list_for_each_entry_safe_from(work, n, NULL, entry) {
1641 list_move_tail(&work->entry, head);
1642 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
468 break; 1643 break;
1644 }
1645
1646 /*
1647 * If we're already inside safe list traversal and have moved
1648 * multiple works to the scheduled queue, the next position
1649 * needs to be updated.
1650 */
1651 if (nextp)
1652 *nextp = n;
1653}
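
The "series" traversal above, where a work with the LINKED flag set means the next work belongs to the same series, can be shown on a plain singly-linked list. This is only an analogue; the kernel version works on intrusive list_heads and WORK_STRUCT_LINKED flag bits:

	#include <stdbool.h>
	#include <stdio.h>

	struct work {
		int id;
		bool linked;		/* "the next work belongs to my series" */
		struct work *next;
	};

	static void move_linked(struct work **src, struct work ***dst_tail)
	{
		while (*src) {
			struct work *w = *src;

			*src = w->next;		/* detach from the source list */
			w->next = NULL;
			**dst_tail = w;		/* append at the destination tail */
			*dst_tail = &w->next;
			if (!w->linked)		/* end of the linked series */
				break;
		}
	}

	int main(void)
	{
		struct work c = { 3, false, NULL };
		struct work b = { 2, true,  &c };	/* b: c is part of my series */
		struct work a = { 1, true,  &b };	/* a: b is part of my series */
		struct work *src = &a, *dst = NULL, **tail = &dst;

		move_linked(&src, &tail);		/* moves a, b and c */
		for (; dst; dst = dst->next)
			printf("moved work %d\n", dst->id);
		return 0;
	}
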
469 1654
470 run_workqueue(cwq); 1655static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
1656{
1657 struct work_struct *work = list_first_entry(&cwq->delayed_works,
1658 struct work_struct, entry);
1659 struct list_head *pos = gcwq_determine_ins_pos(cwq->gcwq, cwq);
1660
1661 move_linked_works(work, pos, NULL);
1662 cwq->nr_active++;
1663}
1664
1665/**
1666 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
1667 * @cwq: cwq of interest
1668 * @color: color of work which left the queue
1669 *
1670 * A work either has completed or is removed from pending queue,
1671 * decrement nr_in_flight of its cwq and handle workqueue flushing.
1672 *
1673 * CONTEXT:
1674 * spin_lock_irq(gcwq->lock).
1675 */
1676static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
1677{
1678 /* ignore uncolored works */
1679 if (color == WORK_NO_COLOR)
1680 return;
1681
1682 cwq->nr_in_flight[color]--;
1683 cwq->nr_active--;
1684
1685 if (!list_empty(&cwq->delayed_works)) {
1686 /* one down, submit a delayed one */
1687 if (cwq->nr_active < cwq->max_active)
1688 cwq_activate_first_delayed(cwq);
471 } 1689 }
472 1690
473 return 0; 1691 /* is flush in progress and are we at the flushing tip? */
1692 if (likely(cwq->flush_color != color))
1693 return;
1694
1695 /* are there still in-flight works? */
1696 if (cwq->nr_in_flight[color])
1697 return;
1698
1699 /* this cwq is done, clear flush_color */
1700 cwq->flush_color = -1;
1701
1702 /*
1703 * If this was the last cwq, wake up the first flusher. It
1704 * will handle the rest.
1705 */
1706 if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
1707 complete(&cwq->wq->first_flusher->done);
1708}
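
Together, cwq_activate_first_delayed() and cwq_dec_nr_in_flight() implement a max_active throttle: at most max_active works of a cwq run at once, the rest park on the delayed list and are promoted one-for-one as running works retire. A single-threaded counter sketch of that handoff (illustrative only, no real queueing):

	#include <stdio.h>

	int main(void)
	{
		int max_active = 2, nr_active = 0, nr_delayed = 0, i;

		for (i = 0; i < 5; i++) {		/* queue five works */
			if (nr_active < max_active)
				nr_active++;		/* runs immediately */
			else
				nr_delayed++;		/* parked on the delayed list */
		}
		printf("active=%d delayed=%d\n", nr_active, nr_delayed);

		while (nr_active) {			/* works retire one by one */
			nr_active--;			/* cwq_dec_nr_in_flight() */
			if (nr_delayed && nr_active < max_active) {
				nr_delayed--;		/* activate_first_delayed() */
				nr_active++;
			}
		}
		printf("all drained, delayed=%d\n", nr_delayed);
		return 0;
	}
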
1709
1710/**
1711 * process_one_work - process single work
1712 * @worker: self
1713 * @work: work to process
1714 *
1715 * Process @work. This function contains all the logic necessary to
1716 * process a single work, including synchronization against and
1717 * interaction with other workers on the same cpu, queueing and
1718 * flushing. As long as the context requirement is met, any worker
1719 * can call this function to process a work.
1720 *
1721 * CONTEXT:
1722 * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1723 */
1724static void process_one_work(struct worker *worker, struct work_struct *work)
1725{
1726 struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1727 struct global_cwq *gcwq = cwq->gcwq;
1728 struct hlist_head *bwh = busy_worker_head(gcwq, work);
1729 bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE;
1730 work_func_t f = work->func;
1731 int work_color;
1732 struct worker *collision;
1733#ifdef CONFIG_LOCKDEP
1734 /*
1735 * It is permissible to free the struct work_struct from
1736 * inside the function that is called from it, this we need to
1737 * take into account for lockdep too. To avoid bogus "held
1738 * lock freed" warnings as well as problems when looking into
1739 * work->lockdep_map, make a copy and use that here.
1740 */
1741 struct lockdep_map lockdep_map = work->lockdep_map;
1742#endif
1743 /*
1744 * A single work shouldn't be executed concurrently by
1745 * multiple workers on a single cpu. Check whether anyone is
1746 * already processing the work. If so, defer the work to the
1747 * currently executing one.
1748 */
1749 collision = __find_worker_executing_work(gcwq, bwh, work);
1750 if (unlikely(collision)) {
1751 move_linked_works(work, &collision->scheduled, NULL);
1752 return;
1753 }
1754
1755 /* claim and process */
1756 debug_work_deactivate(work);
1757 hlist_add_head(&worker->hentry, bwh);
1758 worker->current_work = work;
1759 worker->current_cwq = cwq;
1760 work_color = get_work_color(work);
1761
1762 /* record the current cpu number in the work data and dequeue */
1763 set_work_cpu(work, gcwq->cpu);
1764 list_del_init(&work->entry);
1765
1766 /*
1767 * If HIGHPRI_PENDING, check the next work, and, if HIGHPRI,
1768 * wake up another worker; otherwise, clear HIGHPRI_PENDING.
1769 */
1770 if (unlikely(gcwq->flags & GCWQ_HIGHPRI_PENDING)) {
1771 struct work_struct *nwork = list_first_entry(&gcwq->worklist,
1772 struct work_struct, entry);
1773
1774 if (!list_empty(&gcwq->worklist) &&
1775 get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI)
1776 wake_up_worker(gcwq);
1777 else
1778 gcwq->flags &= ~GCWQ_HIGHPRI_PENDING;
1779 }
1780
1781 /*
1782 * CPU intensive works don't participate in concurrency
1783 * management. They're the scheduler's responsibility.
1784 */
1785 if (unlikely(cpu_intensive))
1786 worker_set_flags(worker, WORKER_CPU_INTENSIVE, true);
1787
1788 spin_unlock_irq(&gcwq->lock);
1789
1790 work_clear_pending(work);
1791 lock_map_acquire(&cwq->wq->lockdep_map);
1792 lock_map_acquire(&lockdep_map);
1793 f(work);
1794 lock_map_release(&lockdep_map);
1795 lock_map_release(&cwq->wq->lockdep_map);
1796
1797 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
1798 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
1799 "%s/0x%08x/%d\n",
1800 current->comm, preempt_count(), task_pid_nr(current));
1801 printk(KERN_ERR " last function: ");
1802 print_symbol("%s\n", (unsigned long)f);
1803 debug_show_held_locks(current);
1804 dump_stack();
1805 }
1806
1807 spin_lock_irq(&gcwq->lock);
1808
1809 /* clear cpu intensive status */
1810 if (unlikely(cpu_intensive))
1811 worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
1812
1813 /* we're done with it, release */
1814 hlist_del_init(&worker->hentry);
1815 worker->current_work = NULL;
1816 worker->current_cwq = NULL;
1817 cwq_dec_nr_in_flight(cwq, work_color);
1818}
1819
1820/**
1821 * process_scheduled_works - process scheduled works
1822 * @worker: self
1823 *
1824 * Process all scheduled works. Please note that the scheduled list
1825 * may change while processing a work, so this function repeatedly
1826 * fetches a work from the top and executes it.
1827 *
1828 * CONTEXT:
1829 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1830 * multiple times.
1831 */
1832static void process_scheduled_works(struct worker *worker)
1833{
1834 while (!list_empty(&worker->scheduled)) {
1835 struct work_struct *work = list_first_entry(&worker->scheduled,
1836 struct work_struct, entry);
1837 process_one_work(worker, work);
1838 }
1839}
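
The drain-from-the-top shape matters because handling one item may splice new items onto the list (as process_one_work() does when it defers collided work), so the head must be re-fetched every pass instead of iterating with a cached cursor. A toy sketch with an invented integer queue:

	#include <stdio.h>

	static int queue[8] = { 1, 2, 0 };	/* 0 terminates the queue */
	static int head;

	static void process(int item)
	{
		printf("processing %d\n", item);
		if (item == 1)
			queue[2] = 3;	/* handling item 1 queues a new item 3 */
	}

	int main(void)
	{
		while (queue[head])	/* re-check the current head every pass */
			process(queue[head++]);
		return 0;
	}
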
1840
1841/**
1842 * worker_thread - the worker thread function
1843 * @__worker: self
1844 *
1845 * The gcwq worker thread function. There's a single dynamic pool of
1846 * these per cpu. These workers process all works regardless of
1847 * their specific target workqueue. The only exception is works which
1848 * belong to workqueues with a rescuer, which will be explained in
1849 * rescuer_thread().
1850 */
1851static int worker_thread(void *__worker)
1852{
1853 struct worker *worker = __worker;
1854 struct global_cwq *gcwq = worker->gcwq;
1855
1856 /* tell the scheduler that this is a workqueue worker */
1857 worker->task->flags |= PF_WQ_WORKER;
1858woke_up:
1859 spin_lock_irq(&gcwq->lock);
1860
1861 /* DIE can be set only while we're idle, checking here is enough */
1862 if (worker->flags & WORKER_DIE) {
1863 spin_unlock_irq(&gcwq->lock);
1864 worker->task->flags &= ~PF_WQ_WORKER;
1865 return 0;
1866 }
1867
1868 worker_leave_idle(worker);
1869recheck:
1870 /* no more worker necessary? */
1871 if (!need_more_worker(gcwq))
1872 goto sleep;
1873
1874 /* do we need to manage? */
1875 if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
1876 goto recheck;
1877
1878 /*
1879 * ->scheduled list can only be filled while a worker is
1880 * preparing to process a work or actually processing it.
1881 * Make sure nobody diddled with it while I was sleeping.
1882 */
1883 BUG_ON(!list_empty(&worker->scheduled));
1884
1885 /*
1886 * When control reaches this point, we're guaranteed to have
1887 * at least one idle worker or that someone else has already
1888 * assumed the manager role.
1889 */
1890 worker_clr_flags(worker, WORKER_PREP);
1891
1892 do {
1893 struct work_struct *work =
1894 list_first_entry(&gcwq->worklist,
1895 struct work_struct, entry);
1896
1897 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
1898 /* optimization path, not strictly necessary */
1899 process_one_work(worker, work);
1900 if (unlikely(!list_empty(&worker->scheduled)))
1901 process_scheduled_works(worker);
1902 } else {
1903 move_linked_works(work, &worker->scheduled, NULL);
1904 process_scheduled_works(worker);
1905 }
1906 } while (keep_working(gcwq));
1907
1908 worker_set_flags(worker, WORKER_PREP, false);
1909sleep:
1910 if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))
1911 goto recheck;
1912
1913 /*
1914 * gcwq->lock is held and there's no work to process and no
1915 * need to manage, sleep. Workers are woken up only while
1916 * holding gcwq->lock or from local cpu, so setting the
1917 * current state before releasing gcwq->lock is enough to
1918 * prevent losing any event.
1919 */
1920 worker_enter_idle(worker);
1921 __set_current_state(TASK_INTERRUPTIBLE);
1922 spin_unlock_irq(&gcwq->lock);
1923 schedule();
1924 goto woke_up;
1925}
1926
1927/**
1928 * rescuer_thread - the rescuer thread function
1929 * @__wq: the associated workqueue
1930 *
1931 * Workqueue rescuer thread function. There's one rescuer for each
1932 * workqueue which has WQ_RESCUER set.
1933 *
1934 * Regular work processing on a gcwq may block trying to create a new
1935 * worker, which uses a GFP_KERNEL allocation. That has a slight
1936 * chance of developing into a deadlock if some works currently on the
1937 * same queue need to be processed to satisfy the GFP_KERNEL
1938 * allocation. This is the problem the rescuer solves.
1939 *
1940 * When such a condition is possible, the gcwq summons the rescuers of
1941 * all workqueues which have works queued on the gcwq and lets them
1942 * process those works so that forward progress can be guaranteed.
1943 *
1944 * This should happen rarely.
1945 */
1946static int rescuer_thread(void *__wq)
1947{
1948 struct workqueue_struct *wq = __wq;
1949 struct worker *rescuer = wq->rescuer;
1950 struct list_head *scheduled = &rescuer->scheduled;
1951 bool is_unbound = wq->flags & WQ_UNBOUND;
1952 unsigned int cpu;
1953
1954 set_user_nice(current, RESCUER_NICE_LEVEL);
1955repeat:
1956 set_current_state(TASK_INTERRUPTIBLE);
1957
1958 if (kthread_should_stop())
1959 return 0;
1960
1961 /*
1962 * See whether any cpu is asking for help. Unbound
1963 * workqueues use cpu 0 in mayday_mask for WORK_CPU_UNBOUND.
1964 */
1965 for_each_mayday_cpu(cpu, wq->mayday_mask) {
1966 unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu;
1967 struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq);
1968 struct global_cwq *gcwq = cwq->gcwq;
1969 struct work_struct *work, *n;
1970
1971 __set_current_state(TASK_RUNNING);
1972 mayday_clear_cpu(cpu, wq->mayday_mask);
1973
1974 /* migrate to the target cpu if possible */
1975 rescuer->gcwq = gcwq;
1976 worker_maybe_bind_and_lock(rescuer);
1977
1978 /*
1979 * Slurp in all works issued via this workqueue and
1980 * process'em.
1981 */
1982 BUG_ON(!list_empty(&rescuer->scheduled));
1983 list_for_each_entry_safe(work, n, &gcwq->worklist, entry)
1984 if (get_work_cwq(work) == cwq)
1985 move_linked_works(work, scheduled, &n);
1986
1987 process_scheduled_works(rescuer);
1988 spin_unlock_irq(&gcwq->lock);
1989 }
1990
1991 schedule();
1992 goto repeat;
474} 1993}
475 1994
476struct wq_barrier { 1995struct wq_barrier {
@@ -484,44 +2003,137 @@ static void wq_barrier_func(struct work_struct *work)
484 complete(&barr->done); 2003 complete(&barr->done);
485} 2004}
486 2005
2006/**
2007 * insert_wq_barrier - insert a barrier work
2008 * @cwq: cwq to insert barrier into
2009 * @barr: wq_barrier to insert
2010 * @target: target work to attach @barr to
2011 * @worker: worker currently executing @target, NULL if @target is not executing
2012 *
2013 * @barr is linked to @target such that @barr is completed only after
2014 * @target finishes execution. Please note that the ordering
2015 * guarantee is observed only with respect to @target and on the local
2016 * cpu.
2017 *
2018 * Currently, a queued barrier can't be canceled. This is because
2019 * try_to_grab_pending() can't determine whether the work to be
2020 * grabbed is at the head of the queue and thus can't clear the
2021 * LINKED flag of the previous work, while there must be a valid next
2022 * work after a work with the LINKED flag set.
2023 *
2024 * Note that when @worker is non-NULL, @target may be modified
2025 * underneath us, so we can't reliably determine cwq from @target.
2026 *
2027 * CONTEXT:
2028 * spin_lock_irq(gcwq->lock).
2029 */
487static void insert_wq_barrier(struct cpu_workqueue_struct *cwq, 2030static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
488 struct wq_barrier *barr, struct list_head *head) 2031 struct wq_barrier *barr,
2032 struct work_struct *target, struct worker *worker)
489{ 2033{
2034 struct list_head *head;
2035 unsigned int linked = 0;
2036
490 /* 2037 /*
491 * debugobject calls are safe here even with cwq->lock locked 2038 * debugobject calls are safe here even with gcwq->lock locked
492 * as we know for sure that this will not trigger any of the 2039 * as we know for sure that this will not trigger any of the
493 * checks and call back into the fixup functions where we 2040 * checks and call back into the fixup functions where we
494 * might deadlock. 2041 * might deadlock.
495 */ 2042 */
496 INIT_WORK_ON_STACK(&barr->work, wq_barrier_func); 2043 INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
497 __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work)); 2044 __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
498
499 init_completion(&barr->done); 2045 init_completion(&barr->done);
500 2046
2047 /*
2048 * If @target is currently being executed, schedule the
2049 * barrier to the worker; otherwise, put it after @target.
2050 */
2051 if (worker)
2052 head = worker->scheduled.next;
2053 else {
2054 unsigned long *bits = work_data_bits(target);
2055
2056 head = target->entry.next;
2057 /* there can already be other linked works, inherit and set */
2058 linked = *bits & WORK_STRUCT_LINKED;
2059 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
2060 }
2061
501 debug_work_activate(&barr->work); 2062 debug_work_activate(&barr->work);
502 insert_work(cwq, &barr->work, head); 2063 insert_work(cwq, &barr->work, head,
2064 work_color_to_flags(WORK_NO_COLOR) | linked);
503} 2065}
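
The barrier mechanism is easiest to see stripped of locking: flushing inserts a sentinel job behind the target, and once the sentinel runs, everything queued before it must have executed. A single-threaded sketch (a real flusher blocks on a completion rather than polling a flag, and all names here are invented):

	#include <stdbool.h>
	#include <stdio.h>

	typedef void (*work_fn)(void *);
	struct job { work_fn fn; void *arg; };

	static struct job queue[16];
	static int q_head, q_tail;

	static void enqueue(work_fn fn, void *arg)
	{
		queue[q_tail].fn = fn;
		queue[q_tail].arg = arg;
		q_tail++;
	}

	static void barrier_fn(void *arg)	{ *(bool *)arg = true; }
	static void say(void *arg)		{ puts(arg); }

	int main(void)
	{
		bool flushed = false;

		enqueue(say, "work 1");
		enqueue(say, "work 2");
		enqueue(barrier_fn, &flushed);	/* the flush barrier */
		enqueue(say, "work 3");		/* queued after the flush point */

		while (!flushed) {		/* "wait_for_completion()" */
			struct job j = queue[q_head++];
			j.fn(j.arg);
		}
		printf("flush point reached; works 1 and 2 are done\n");
		return 0;
	}
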
504 2066
505static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 2067/**
2068 * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
2069 * @wq: workqueue being flushed
2070 * @flush_color: new flush color, < 0 for no-op
2071 * @work_color: new work color, < 0 for no-op
2072 *
2073 * Prepare cwqs for workqueue flushing.
2074 *
2075 * If @flush_color is non-negative, flush_color on all cwqs should be
2076 * -1. If no cwq has in-flight works at the specified color, all
2077 * cwq->flush_color's stay at -1 and %false is returned. If any cwq
2078 * has in-flight works, its cwq->flush_color is set to
2079 * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
2080 * wakeup logic is armed and %true is returned.
2081 *
2082 * The caller should have initialized @wq->first_flusher prior to
2083 * calling this function with non-negative @flush_color. If
2084 * @flush_color is negative, no flush color update is done and %false
2085 * is returned.
2086 *
2087 * If @work_color is non-negative, all cwqs should have the same
2088 * work_color, which is the one previous to @work_color, and all will
2089 * be advanced to @work_color.
2090 *
2091 * CONTEXT:
2092 * mutex_lock(wq->flush_mutex).
2093 *
2094 * RETURNS:
2095 * %true if @flush_color >= 0 and there's something to flush. %false
2096 * otherwise.
2097 */
2098static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
2099 int flush_color, int work_color)
506{ 2100{
507 int active = 0; 2101 bool wait = false;
508 struct wq_barrier barr; 2102 unsigned int cpu;
509
510 WARN_ON(cwq->thread == current);
511 2103
512 spin_lock_irq(&cwq->lock); 2104 if (flush_color >= 0) {
513 if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) { 2105 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
514 insert_wq_barrier(cwq, &barr, &cwq->worklist); 2106 atomic_set(&wq->nr_cwqs_to_flush, 1);
515 active = 1;
516 } 2107 }
517 spin_unlock_irq(&cwq->lock);
518 2108
519 if (active) { 2109 for_each_cwq_cpu(cpu, wq) {
520 wait_for_completion(&barr.done); 2110 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
521 destroy_work_on_stack(&barr.work); 2111 struct global_cwq *gcwq = cwq->gcwq;
2112
2113 spin_lock_irq(&gcwq->lock);
2114
2115 if (flush_color >= 0) {
2116 BUG_ON(cwq->flush_color != -1);
2117
2118 if (cwq->nr_in_flight[flush_color]) {
2119 cwq->flush_color = flush_color;
2120 atomic_inc(&wq->nr_cwqs_to_flush);
2121 wait = true;
2122 }
2123 }
2124
2125 if (work_color >= 0) {
2126 BUG_ON(work_color != work_next_color(cwq->work_color));
2127 cwq->work_color = work_color;
2128 }
2129
2130 spin_unlock_irq(&gcwq->lock);
522 } 2131 }
523 2132
524 return active; 2133 if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
2134 complete(&wq->first_flusher->done);
2135
2136 return wait;
525} 2137}
526 2138
527/** 2139/**
@@ -533,20 +2145,150 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
533 * 2145 *
534 * We sleep until all works which were queued on entry have been handled, 2146 * We sleep until all works which were queued on entry have been handled,
535 * but we are not livelocked by new incoming ones. 2147 * but we are not livelocked by new incoming ones.
536 *
537 * This function used to run the workqueues itself. Now we just wait for the
538 * helper threads to do it.
539 */ 2148 */
540void flush_workqueue(struct workqueue_struct *wq) 2149void flush_workqueue(struct workqueue_struct *wq)
541{ 2150{
542 const struct cpumask *cpu_map = wq_cpu_map(wq); 2151 struct wq_flusher this_flusher = {
543 int cpu; 2152 .list = LIST_HEAD_INIT(this_flusher.list),
2153 .flush_color = -1,
2154 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
2155 };
2156 int next_color;
544 2157
545 might_sleep();
546 lock_map_acquire(&wq->lockdep_map); 2158 lock_map_acquire(&wq->lockdep_map);
547 lock_map_release(&wq->lockdep_map); 2159 lock_map_release(&wq->lockdep_map);
548 for_each_cpu(cpu, cpu_map) 2160
549 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 2161 mutex_lock(&wq->flush_mutex);
2162
2163 /*
2164 * Start-to-wait phase
2165 */
2166 next_color = work_next_color(wq->work_color);
2167
2168 if (next_color != wq->flush_color) {
2169 /*
2170 * Color space is not full. The current work_color
2171 * becomes our flush_color and work_color is advanced
2172 * by one.
2173 */
2174 BUG_ON(!list_empty(&wq->flusher_overflow));
2175 this_flusher.flush_color = wq->work_color;
2176 wq->work_color = next_color;
2177
2178 if (!wq->first_flusher) {
2179 /* no flush in progress, become the first flusher */
2180 BUG_ON(wq->flush_color != this_flusher.flush_color);
2181
2182 wq->first_flusher = &this_flusher;
2183
2184 if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
2185 wq->work_color)) {
2186 /* nothing to flush, done */
2187 wq->flush_color = next_color;
2188 wq->first_flusher = NULL;
2189 goto out_unlock;
2190 }
2191 } else {
2192 /* wait in queue */
2193 BUG_ON(wq->flush_color == this_flusher.flush_color);
2194 list_add_tail(&this_flusher.list, &wq->flusher_queue);
2195 flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2196 }
2197 } else {
2198 /*
2199 * Oops, color space is full, wait on overflow queue.
2200 * The next flush completion will assign us
2201 * flush_color and transfer to flusher_queue.
2202 */
2203 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
2204 }
2205
2206 mutex_unlock(&wq->flush_mutex);
2207
2208 wait_for_completion(&this_flusher.done);
2209
2210 /*
2211 * Wake-up-and-cascade phase
2212 *
2213 * First flushers are responsible for cascading flushes and
2214 * handling overflow. Non-first flushers can simply return.
2215 */
2216 if (wq->first_flusher != &this_flusher)
2217 return;
2218
2219 mutex_lock(&wq->flush_mutex);
2220
2221 /* we might have raced, check again with mutex held */
2222 if (wq->first_flusher != &this_flusher)
2223 goto out_unlock;
2224
2225 wq->first_flusher = NULL;
2226
2227 BUG_ON(!list_empty(&this_flusher.list));
2228 BUG_ON(wq->flush_color != this_flusher.flush_color);
2229
2230 while (true) {
2231 struct wq_flusher *next, *tmp;
2232
2233 /* complete all the flushers sharing the current flush color */
2234 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
2235 if (next->flush_color != wq->flush_color)
2236 break;
2237 list_del_init(&next->list);
2238 complete(&next->done);
2239 }
2240
2241 BUG_ON(!list_empty(&wq->flusher_overflow) &&
2242 wq->flush_color != work_next_color(wq->work_color));
2243
2244 /* this flush_color is finished, advance by one */
2245 wq->flush_color = work_next_color(wq->flush_color);
2246
2247 /* one color has been freed, handle overflow queue */
2248 if (!list_empty(&wq->flusher_overflow)) {
2249 /*
2250 * Assign the same color to all overflowed
2251 * flushers, advance work_color and append to
2252 * flusher_queue. This is the start-to-wait
2253 * phase for these overflowed flushers.
2254 */
2255 list_for_each_entry(tmp, &wq->flusher_overflow, list)
2256 tmp->flush_color = wq->work_color;
2257
2258 wq->work_color = work_next_color(wq->work_color);
2259
2260 list_splice_tail_init(&wq->flusher_overflow,
2261 &wq->flusher_queue);
2262 flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2263 }
2264
2265 if (list_empty(&wq->flusher_queue)) {
2266 BUG_ON(wq->flush_color != wq->work_color);
2267 break;
2268 }
2269
2270 /*
2271 * Need to flush more colors. Make the next flusher
2272 * the new first flusher and arm cwqs.
2273 */
2274 BUG_ON(wq->flush_color == wq->work_color);
2275 BUG_ON(wq->flush_color != next->flush_color);
2276
2277 list_del_init(&next->list);
2278 wq->first_flusher = next;
2279
2280 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
2281 break;
2282
2283 /*
2284 * Meh... this color is already done, clear first
2285 * flusher and repeat cascading.
2286 */
2287 wq->first_flusher = NULL;
2288 }
2289
2290out_unlock:
2291 mutex_unlock(&wq->flush_mutex);
550} 2292}
551EXPORT_SYMBOL_GPL(flush_workqueue); 2293EXPORT_SYMBOL_GPL(flush_workqueue);
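
The color arithmetic that drives the start-to-wait phase is just a modular counter: each flusher grabs the current work_color and advances it, and when advancing would wrap onto the color still being flushed, the color space is full and the flusher overflows. A sketch with NR_COLORS of 4, assumed purely for illustration (the kernel derives the real count from WORK_STRUCT_COLOR_BITS):

	#include <stdio.h>

	#define NR_COLORS 4

	static int next_color(int color)
	{
		return (color + 1) % NR_COLORS;
	}

	int main(void)
	{
		int flush_color = 0;	/* oldest color still being flushed */
		int work_color = 0;	/* color given to newly queued works */
		int i;

		for (i = 1; i <= NR_COLORS; i++) {
			if (next_color(work_color) == flush_color) {
				printf("flusher %d: color space full, overflow\n", i);
				break;
			}
			printf("flusher %d: flushes color %d, work_color -> %d\n",
			       i, work_color, next_color(work_color));
			work_color = next_color(work_color);
		}
		return 0;
	}
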
552 2294
@@ -562,43 +2304,46 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
562 */ 2304 */
563int flush_work(struct work_struct *work) 2305int flush_work(struct work_struct *work)
564{ 2306{
2307 struct worker *worker = NULL;
2308 struct global_cwq *gcwq;
565 struct cpu_workqueue_struct *cwq; 2309 struct cpu_workqueue_struct *cwq;
566 struct list_head *prev;
567 struct wq_barrier barr; 2310 struct wq_barrier barr;
568 2311
569 might_sleep(); 2312 might_sleep();
570 cwq = get_wq_data(work); 2313 gcwq = get_work_gcwq(work);
571 if (!cwq) 2314 if (!gcwq)
572 return 0; 2315 return 0;
573 2316
574 lock_map_acquire(&cwq->wq->lockdep_map); 2317 spin_lock_irq(&gcwq->lock);
575 lock_map_release(&cwq->wq->lockdep_map);
576
577 prev = NULL;
578 spin_lock_irq(&cwq->lock);
579 if (!list_empty(&work->entry)) { 2318 if (!list_empty(&work->entry)) {
580 /* 2319 /*
581 * See the comment near try_to_grab_pending()->smp_rmb(). 2320 * See the comment near try_to_grab_pending()->smp_rmb().
582 * If it was re-queued under us we are not going to wait. 2321 * If it was re-queued to a different gcwq under us, we
2322 * are not going to wait.
583 */ 2323 */
584 smp_rmb(); 2324 smp_rmb();
585 if (unlikely(cwq != get_wq_data(work))) 2325 cwq = get_work_cwq(work);
586 goto out; 2326 if (unlikely(!cwq || gcwq != cwq->gcwq))
587 prev = &work->entry; 2327 goto already_gone;
588 } else { 2328 } else {
589 if (cwq->current_work != work) 2329 worker = find_worker_executing_work(gcwq, work);
590 goto out; 2330 if (!worker)
591 prev = &cwq->worklist; 2331 goto already_gone;
2332 cwq = worker->current_cwq;
592 } 2333 }
593 insert_wq_barrier(cwq, &barr, prev->next); 2334
594out: 2335 insert_wq_barrier(cwq, &barr, work, worker);
595 spin_unlock_irq(&cwq->lock); 2336 spin_unlock_irq(&gcwq->lock);
596 if (!prev) 2337
597 return 0; 2338 lock_map_acquire(&cwq->wq->lockdep_map);
2339 lock_map_release(&cwq->wq->lockdep_map);
598 2340
599 wait_for_completion(&barr.done); 2341 wait_for_completion(&barr.done);
600 destroy_work_on_stack(&barr.work); 2342 destroy_work_on_stack(&barr.work);
601 return 1; 2343 return 1;
2344already_gone:
2345 spin_unlock_irq(&gcwq->lock);
2346 return 0;
602} 2347}
603EXPORT_SYMBOL_GPL(flush_work); 2348EXPORT_SYMBOL_GPL(flush_work);
604 2349
@@ -608,54 +2353,55 @@ EXPORT_SYMBOL_GPL(flush_work);
608 */ 2353 */
609static int try_to_grab_pending(struct work_struct *work) 2354static int try_to_grab_pending(struct work_struct *work)
610{ 2355{
611 struct cpu_workqueue_struct *cwq; 2356 struct global_cwq *gcwq;
612 int ret = -1; 2357 int ret = -1;
613 2358
614 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) 2359 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
615 return 0; 2360 return 0;
616 2361
617 /* 2362 /*
618 * The queueing is in progress, or it is already queued. Try to 2363 * The queueing is in progress, or it is already queued. Try to
619 * steal it from ->worklist without clearing WORK_STRUCT_PENDING. 2364 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
620 */ 2365 */
621 2366 gcwq = get_work_gcwq(work);
622 cwq = get_wq_data(work); 2367 if (!gcwq)
623 if (!cwq)
624 return ret; 2368 return ret;
625 2369
626 spin_lock_irq(&cwq->lock); 2370 spin_lock_irq(&gcwq->lock);
627 if (!list_empty(&work->entry)) { 2371 if (!list_empty(&work->entry)) {
628 /* 2372 /*
629 * This work is queued, but perhaps we locked the wrong cwq. 2373 * This work is queued, but perhaps we locked the wrong gcwq.
630 * In that case we must see the new value after rmb(), see 2374 * In that case we must see the new value after rmb(), see
631 * insert_work()->wmb(). 2375 * insert_work()->wmb().
632 */ 2376 */
633 smp_rmb(); 2377 smp_rmb();
634 if (cwq == get_wq_data(work)) { 2378 if (gcwq == get_work_gcwq(work)) {
635 debug_work_deactivate(work); 2379 debug_work_deactivate(work);
636 list_del_init(&work->entry); 2380 list_del_init(&work->entry);
2381 cwq_dec_nr_in_flight(get_work_cwq(work),
2382 get_work_color(work));
637 ret = 1; 2383 ret = 1;
638 } 2384 }
639 } 2385 }
640 spin_unlock_irq(&cwq->lock); 2386 spin_unlock_irq(&gcwq->lock);
641 2387
642 return ret; 2388 return ret;
643} 2389}
644 2390
645static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq, 2391static void wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
646 struct work_struct *work)
647{ 2392{
648 struct wq_barrier barr; 2393 struct wq_barrier barr;
649 int running = 0; 2394 struct worker *worker;
650 2395
651 spin_lock_irq(&cwq->lock); 2396 spin_lock_irq(&gcwq->lock);
652 if (unlikely(cwq->current_work == work)) { 2397
653 insert_wq_barrier(cwq, &barr, cwq->worklist.next); 2398 worker = find_worker_executing_work(gcwq, work);
654 running = 1; 2399 if (unlikely(worker))
655 } 2400 insert_wq_barrier(worker->current_cwq, &barr, work, worker);
656 spin_unlock_irq(&cwq->lock);
657 2401
658 if (unlikely(running)) { 2402 spin_unlock_irq(&gcwq->lock);
2403
2404 if (unlikely(worker)) {
659 wait_for_completion(&barr.done); 2405 wait_for_completion(&barr.done);
660 destroy_work_on_stack(&barr.work); 2406 destroy_work_on_stack(&barr.work);
661 } 2407 }
@@ -663,9 +2409,6 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
663 2409
664static void wait_on_work(struct work_struct *work) 2410static void wait_on_work(struct work_struct *work)
665{ 2411{
666 struct cpu_workqueue_struct *cwq;
667 struct workqueue_struct *wq;
668 const struct cpumask *cpu_map;
669 int cpu; 2412 int cpu;
670 2413
671 might_sleep(); 2414 might_sleep();
@@ -673,15 +2416,8 @@ static void wait_on_work(struct work_struct *work)
673 lock_map_acquire(&work->lockdep_map); 2416 lock_map_acquire(&work->lockdep_map);
674 lock_map_release(&work->lockdep_map); 2417 lock_map_release(&work->lockdep_map);
675 2418
676 cwq = get_wq_data(work); 2419 for_each_gcwq_cpu(cpu)
677 if (!cwq) 2420 wait_on_cpu_work(get_gcwq(cpu), work);
678 return;
679
680 wq = cwq->wq;
681 cpu_map = wq_cpu_map(wq);
682
683 for_each_cpu(cpu, cpu_map)
684 wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
685} 2421}
686 2422
687static int __cancel_work_timer(struct work_struct *work, 2423static int __cancel_work_timer(struct work_struct *work,
@@ -696,7 +2432,7 @@ static int __cancel_work_timer(struct work_struct *work,
696 wait_on_work(work); 2432 wait_on_work(work);
697 } while (unlikely(ret < 0)); 2433 } while (unlikely(ret < 0));
698 2434
699 clear_wq_data(work); 2435 clear_work_data(work);
700 return ret; 2436 return ret;
701} 2437}
702 2438
@@ -742,8 +2478,6 @@ int cancel_delayed_work_sync(struct delayed_work *dwork)
742} 2478}
743EXPORT_SYMBOL(cancel_delayed_work_sync); 2479EXPORT_SYMBOL(cancel_delayed_work_sync);
744 2480
745static struct workqueue_struct *keventd_wq __read_mostly;
746
747/** 2481/**
748 * schedule_work - put work task in global workqueue 2482 * schedule_work - put work task in global workqueue
749 * @work: job to be done 2483 * @work: job to be done
@@ -757,7 +2491,7 @@ static struct workqueue_struct *keventd_wq __read_mostly;
757 */ 2491 */
758int schedule_work(struct work_struct *work) 2492int schedule_work(struct work_struct *work)
759{ 2493{
760 return queue_work(keventd_wq, work); 2494 return queue_work(system_wq, work);
761} 2495}
762EXPORT_SYMBOL(schedule_work); 2496EXPORT_SYMBOL(schedule_work);
763 2497
@@ -770,7 +2504,7 @@ EXPORT_SYMBOL(schedule_work);
770 */ 2504 */
771int schedule_work_on(int cpu, struct work_struct *work) 2505int schedule_work_on(int cpu, struct work_struct *work)
772{ 2506{
773 return queue_work_on(cpu, keventd_wq, work); 2507 return queue_work_on(cpu, system_wq, work);
774} 2508}
775EXPORT_SYMBOL(schedule_work_on); 2509EXPORT_SYMBOL(schedule_work_on);
776 2510
@@ -785,7 +2519,7 @@ EXPORT_SYMBOL(schedule_work_on);
785int schedule_delayed_work(struct delayed_work *dwork, 2519int schedule_delayed_work(struct delayed_work *dwork,
786 unsigned long delay) 2520 unsigned long delay)
787{ 2521{
788 return queue_delayed_work(keventd_wq, dwork, delay); 2522 return queue_delayed_work(system_wq, dwork, delay);
789} 2523}
790EXPORT_SYMBOL(schedule_delayed_work); 2524EXPORT_SYMBOL(schedule_delayed_work);
791 2525
@@ -798,9 +2532,8 @@ EXPORT_SYMBOL(schedule_delayed_work);
798void flush_delayed_work(struct delayed_work *dwork) 2532void flush_delayed_work(struct delayed_work *dwork)
799{ 2533{
800 if (del_timer_sync(&dwork->timer)) { 2534 if (del_timer_sync(&dwork->timer)) {
801 struct cpu_workqueue_struct *cwq; 2535 __queue_work(get_cpu(), get_work_cwq(&dwork->work)->wq,
802 cwq = wq_per_cpu(get_wq_data(&dwork->work)->wq, get_cpu()); 2536 &dwork->work);
803 __queue_work(cwq, &dwork->work);
804 put_cpu(); 2537 put_cpu();
805 } 2538 }
806 flush_work(&dwork->work); 2539 flush_work(&dwork->work);
@@ -819,7 +2552,7 @@ EXPORT_SYMBOL(flush_delayed_work);
819int schedule_delayed_work_on(int cpu, 2552int schedule_delayed_work_on(int cpu,
820 struct delayed_work *dwork, unsigned long delay) 2553 struct delayed_work *dwork, unsigned long delay)
821{ 2554{
822 return queue_delayed_work_on(cpu, keventd_wq, dwork, delay); 2555 return queue_delayed_work_on(cpu, system_wq, dwork, delay);
823} 2556}
824EXPORT_SYMBOL(schedule_delayed_work_on); 2557EXPORT_SYMBOL(schedule_delayed_work_on);
825 2558
@@ -835,7 +2568,6 @@ EXPORT_SYMBOL(schedule_delayed_work_on);
835int schedule_on_each_cpu(work_func_t func) 2568int schedule_on_each_cpu(work_func_t func)
836{ 2569{
837 int cpu; 2570 int cpu;
838 int orig = -1;
839 struct work_struct *works; 2571 struct work_struct *works;
840 2572
841 works = alloc_percpu(struct work_struct); 2573 works = alloc_percpu(struct work_struct);
@@ -844,23 +2576,12 @@ int schedule_on_each_cpu(work_func_t func)
844 2576
845 get_online_cpus(); 2577 get_online_cpus();
846 2578
847 /*
848 * When running in keventd don't schedule a work item on
849 * itself. Can just call directly because the work queue is
850 * already bound. This also is faster.
851 */
852 if (current_is_keventd())
853 orig = raw_smp_processor_id();
854
855 for_each_online_cpu(cpu) { 2579 for_each_online_cpu(cpu) {
856 struct work_struct *work = per_cpu_ptr(works, cpu); 2580 struct work_struct *work = per_cpu_ptr(works, cpu);
857 2581
858 INIT_WORK(work, func); 2582 INIT_WORK(work, func);
859 if (cpu != orig) 2583 schedule_work_on(cpu, work);
860 schedule_work_on(cpu, work);
861 } 2584 }
862 if (orig >= 0)
863 func(per_cpu_ptr(works, orig));
864 2585
865 for_each_online_cpu(cpu) 2586 for_each_online_cpu(cpu)
866 flush_work(per_cpu_ptr(works, cpu)); 2587 flush_work(per_cpu_ptr(works, cpu));
@@ -896,7 +2617,7 @@ int schedule_on_each_cpu(work_func_t func)
896 */ 2617 */
897void flush_scheduled_work(void) 2618void flush_scheduled_work(void)
898{ 2619{
899 flush_workqueue(keventd_wq); 2620 flush_workqueue(system_wq);
900} 2621}
901EXPORT_SYMBOL(flush_scheduled_work); 2622EXPORT_SYMBOL(flush_scheduled_work);
902 2623
@@ -928,170 +2649,170 @@ EXPORT_SYMBOL_GPL(execute_in_process_context);
928 2649
929int keventd_up(void) 2650int keventd_up(void)
930{ 2651{
931 return keventd_wq != NULL; 2652 return system_wq != NULL;
932} 2653}
933 2654
934int current_is_keventd(void) 2655static int alloc_cwqs(struct workqueue_struct *wq)
935{ 2656{
936 struct cpu_workqueue_struct *cwq; 2657 /*
937 int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */ 2658 * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
938 int ret = 0; 2659 * Make sure that the alignment isn't lower than that of
939 2660 * unsigned long long.
940 BUG_ON(!keventd_wq); 2661 */
2662 const size_t size = sizeof(struct cpu_workqueue_struct);
2663 const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
2664 __alignof__(unsigned long long));
2665#ifdef CONFIG_SMP
2666 bool percpu = !(wq->flags & WQ_UNBOUND);
2667#else
2668 bool percpu = false;
2669#endif
941 2670
942 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 2671 if (percpu)
943 if (current == cwq->thread) 2672 wq->cpu_wq.pcpu = __alloc_percpu(size, align);
944 ret = 1; 2673 else {
2674 void *ptr;
945 2675
946 return ret; 2676 /*
2677 * Allocate enough room to align cwq and put an extra
2678 * pointer at the end pointing back to the originally
2679 * allocated pointer which will be used for free.
2680 */
2681 ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL);
2682 if (ptr) {
2683 wq->cpu_wq.single = PTR_ALIGN(ptr, align);
2684 *(void **)(wq->cpu_wq.single + 1) = ptr;
2685 }
2686 }
947 2687
2688 /* just in case, make sure it's actually aligned */
2689 BUG_ON(!IS_ALIGNED(wq->cpu_wq.v, align));
2690 return wq->cpu_wq.v ? 0 : -ENOMEM;
948} 2691}
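
The manual alignment trick in alloc_cwqs(), over-allocate, align the pointer, and stash the original pointer just past the object so free_cwqs() can recover it, works the same way in userspace. A self-contained sketch (aligned_alloc() would do this today; the manual form mirrors what the code above does, and align must be a power of two):

	#include <assert.h>
	#include <stdint.h>
	#include <stdlib.h>
	#include <string.h>

	static void *alloc_aligned(size_t size, size_t align)
	{
		void *raw = calloc(1, size + align + sizeof(void *));
		uintptr_t p;

		if (!raw)
			return NULL;
		p = ((uintptr_t)raw + align - 1) & ~(uintptr_t)(align - 1);
		/* stash the original pointer just past the aligned object */
		memcpy((char *)p + size, &raw, sizeof(raw));
		return (void *)p;
	}

	static void free_aligned(void *obj, size_t size)
	{
		void *raw;

		/* recover the original pointer stored by alloc_aligned() */
		memcpy(&raw, (char *)obj + size, sizeof(raw));
		free(raw);
	}

	int main(void)
	{
		void *o = alloc_aligned(192, 256);

		assert(o && ((uintptr_t)o & 255) == 0);
		free_aligned(o, 192);
		return 0;
	}
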
949 2692
950static struct cpu_workqueue_struct * 2693static void free_cwqs(struct workqueue_struct *wq)
951init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
952{ 2694{
953 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 2695#ifdef CONFIG_SMP
954 2696 bool percpu = !(wq->flags & WQ_UNBOUND);
955 cwq->wq = wq; 2697#else
956 spin_lock_init(&cwq->lock); 2698 bool percpu = false;
957 INIT_LIST_HEAD(&cwq->worklist); 2699#endif
958 init_waitqueue_head(&cwq->more_work);
959 2700
960 return cwq; 2701 if (percpu)
2702 free_percpu(wq->cpu_wq.pcpu);
2703 else if (wq->cpu_wq.single) {
2704 /* the pointer to free is stored right after the cwq */
2705 kfree(*(void **)(wq->cpu_wq.single + 1));
2706 }
961} 2707}
962 2708
963static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 2709static int wq_clamp_max_active(int max_active, unsigned int flags,
2710 const char *name)
964{ 2711{
965 struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 }; 2712 int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
966 struct workqueue_struct *wq = cwq->wq;
967 const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
968 struct task_struct *p;
969 2713
970 p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); 2714 if (max_active < 1 || max_active > lim)
971 /* 2715 printk(KERN_WARNING "workqueue: max_active %d requested for %s "
972 * Nobody can add the work_struct to this cwq, 2716 "is out of range, clamping between %d and %d\n",
973 * if (caller is __create_workqueue) 2717 max_active, name, 1, lim);
974 * nobody should see this wq
975 * else // caller is CPU_UP_PREPARE
976 * cpu is not on cpu_online_map
977 * so we can abort safely.
978 */
979 if (IS_ERR(p))
980 return PTR_ERR(p);
981 if (cwq->wq->rt)
982 sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
983 cwq->thread = p;
984
985 trace_workqueue_creation(cwq->thread, cpu);
986 2718
987 return 0; 2719 return clamp_val(max_active, 1, lim);
988} 2720}
989 2721
990static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 2722struct workqueue_struct *__alloc_workqueue_key(const char *name,
2723 unsigned int flags,
2724 int max_active,
2725 struct lock_class_key *key,
2726 const char *lock_name)
991{ 2727{
992 struct task_struct *p = cwq->thread; 2728 struct workqueue_struct *wq;
2729 unsigned int cpu;
993 2730
994 if (p != NULL) { 2731 /*
995 if (cpu >= 0) 2732 * Unbound workqueues aren't concurrency managed and should be
996 kthread_bind(p, cpu); 2733 * dispatched to workers immediately.
997 wake_up_process(p); 2734 */
998 } 2735 if (flags & WQ_UNBOUND)
999} 2736 flags |= WQ_HIGHPRI;
1000 2737
1001struct workqueue_struct *__create_workqueue_key(const char *name, 2738 max_active = max_active ?: WQ_DFL_ACTIVE;
1002 int singlethread, 2739 max_active = wq_clamp_max_active(max_active, flags, name);
1003 int freezeable,
1004 int rt,
1005 struct lock_class_key *key,
1006 const char *lock_name)
1007{
1008 struct workqueue_struct *wq;
1009 struct cpu_workqueue_struct *cwq;
1010 int err = 0, cpu;
1011 2740
1012 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 2741 wq = kzalloc(sizeof(*wq), GFP_KERNEL);
1013 if (!wq) 2742 if (!wq)
1014 return NULL; 2743 goto err;
1015 2744
1016 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 2745 wq->flags = flags;
1017 if (!wq->cpu_wq) { 2746 wq->saved_max_active = max_active;
1018 kfree(wq); 2747 mutex_init(&wq->flush_mutex);
1019 return NULL; 2748 atomic_set(&wq->nr_cwqs_to_flush, 0);
1020 } 2749 INIT_LIST_HEAD(&wq->flusher_queue);
2750 INIT_LIST_HEAD(&wq->flusher_overflow);
1021 2751
1022 wq->name = name; 2752 wq->name = name;
1023 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); 2753 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
1024 wq->singlethread = singlethread;
1025 wq->freezeable = freezeable;
1026 wq->rt = rt;
1027 INIT_LIST_HEAD(&wq->list); 2754 INIT_LIST_HEAD(&wq->list);
1028 2755
1029 if (singlethread) { 2756 if (alloc_cwqs(wq) < 0)
1030 cwq = init_cpu_workqueue(wq, singlethread_cpu); 2757 goto err;
1031 err = create_workqueue_thread(cwq, singlethread_cpu); 2758
1032 start_workqueue_thread(cwq, -1); 2759 for_each_cwq_cpu(cpu, wq) {
1033 } else { 2760 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1034 cpu_maps_update_begin(); 2761 struct global_cwq *gcwq = get_gcwq(cpu);
1035 /* 2762
1036 * We must place this wq on list even if the code below fails. 2763 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
1037 * cpu_down(cpu) can remove cpu from cpu_populated_map before 2764 cwq->gcwq = gcwq;
1038 * destroy_workqueue() takes the lock, in that case we leak 2765 cwq->wq = wq;
1039 * cwq[cpu]->thread. 2766 cwq->flush_color = -1;
1040 */ 2767 cwq->max_active = max_active;
1041 spin_lock(&workqueue_lock); 2768 INIT_LIST_HEAD(&cwq->delayed_works);
1042 list_add(&wq->list, &workqueues);
1043 spin_unlock(&workqueue_lock);
1044 /*
1045 * We must initialize cwqs for each possible cpu even if we
1046 * are going to call destroy_workqueue() finally. Otherwise
1047 * cpu_up() can hit the uninitialized cwq once we drop the
1048 * lock.
1049 */
1050 for_each_possible_cpu(cpu) {
1051 cwq = init_cpu_workqueue(wq, cpu);
1052 if (err || !cpu_online(cpu))
1053 continue;
1054 err = create_workqueue_thread(cwq, cpu);
1055 start_workqueue_thread(cwq, cpu);
1056 }
1057 cpu_maps_update_done();
1058 } 2769 }
1059 2770
1060 if (err) { 2771 if (flags & WQ_RESCUER) {
1061 destroy_workqueue(wq); 2772 struct worker *rescuer;
1062 wq = NULL; 2773
2774 if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))
2775 goto err;
2776
2777 wq->rescuer = rescuer = alloc_worker();
2778 if (!rescuer)
2779 goto err;
2780
2781 rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);
2782 if (IS_ERR(rescuer->task))
2783 goto err;
2784
2785 wq->rescuer = rescuer;
2786 rescuer->task->flags |= PF_THREAD_BOUND;
2787 wake_up_process(rescuer->task);
1063 } 2788 }
1064 return wq;
1065}
1066EXPORT_SYMBOL_GPL(__create_workqueue_key);
1067 2789
1068static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
1069{
1070 /* 2790 /*
1071 * Our caller is either destroy_workqueue() or CPU_POST_DEAD, 2791 * workqueue_lock protects global freeze state and workqueues
1072 * cpu_add_remove_lock protects cwq->thread. 2792 * list. Grab it, set max_active accordingly and add the new
2793 * workqueue to workqueues list.
1073 */ 2794 */
1074 if (cwq->thread == NULL) 2795 spin_lock(&workqueue_lock);
1075 return;
1076 2796
1077 lock_map_acquire(&cwq->wq->lockdep_map); 2797 if (workqueue_freezing && wq->flags & WQ_FREEZEABLE)
1078 lock_map_release(&cwq->wq->lockdep_map); 2798 for_each_cwq_cpu(cpu, wq)
2799 get_cwq(cpu, wq)->max_active = 0;
1079 2800
1080 flush_cpu_workqueue(cwq); 2801 list_add(&wq->list, &workqueues);
1081 /* 2802
1082 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty, 2803 spin_unlock(&workqueue_lock);
1083 * a concurrent flush_workqueue() can insert a barrier after us. 2804
1084 * However, in that case run_workqueue() won't return and check 2805 return wq;
1085 * kthread_should_stop() until it flushes all work_struct's. 2806err:
1086 * When ->worklist becomes empty it is safe to exit because no 2807 if (wq) {
1087 * more work_structs can be queued on this cwq: flush_workqueue 2808 free_cwqs(wq);
1088 * checks list_empty(), and a "normal" queue_work() can't use 2809 free_mayday_mask(wq->mayday_mask);
1089 * a dead CPU. 2810 kfree(wq->rescuer);
1090 */ 2811 kfree(wq);
1091 trace_workqueue_destruction(cwq->thread); 2812 }
1092 kthread_stop(cwq->thread); 2813 return NULL;
1093 cwq->thread = NULL;
1094} 2814}
2815EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
1095 2816
1096/** 2817/**
1097 * destroy_workqueue - safely terminate a workqueue 2818 * destroy_workqueue - safely terminate a workqueue
@@ -1101,72 +2822,516 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
1101 */ 2822 */
1102void destroy_workqueue(struct workqueue_struct *wq) 2823void destroy_workqueue(struct workqueue_struct *wq)
1103{ 2824{
1104 const struct cpumask *cpu_map = wq_cpu_map(wq); 2825 unsigned int cpu;
1105 int cpu; 2826
2827 flush_workqueue(wq);
1106 2828
1107 cpu_maps_update_begin(); 2829 /*
2830 * wq list is used to freeze wq, remove from list after
2831 * flushing is complete in case freeze races us.
2832 */
1108 spin_lock(&workqueue_lock); 2833 spin_lock(&workqueue_lock);
1109 list_del(&wq->list); 2834 list_del(&wq->list);
1110 spin_unlock(&workqueue_lock); 2835 spin_unlock(&workqueue_lock);
1111 2836
1112 for_each_cpu(cpu, cpu_map) 2837 /* sanity check */
1113 cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu)); 2838 for_each_cwq_cpu(cpu, wq) {
1114 cpu_maps_update_done(); 2839 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2840 int i;
2841
2842 for (i = 0; i < WORK_NR_COLORS; i++)
2843 BUG_ON(cwq->nr_in_flight[i]);
2844 BUG_ON(cwq->nr_active);
2845 BUG_ON(!list_empty(&cwq->delayed_works));
2846 }
2847
2848 if (wq->flags & WQ_RESCUER) {
2849 kthread_stop(wq->rescuer->task);
2850 free_mayday_mask(wq->mayday_mask);
2851 }
1115 2852
1116 free_percpu(wq->cpu_wq); 2853 free_cwqs(wq);
1117 kfree(wq); 2854 kfree(wq);
1118} 2855}
1119EXPORT_SYMBOL_GPL(destroy_workqueue); 2856EXPORT_SYMBOL_GPL(destroy_workqueue);
1120 2857
2858/**
2859 * workqueue_set_max_active - adjust max_active of a workqueue
2860 * @wq: target workqueue
2861 * @max_active: new max_active value.
2862 *
2863 * Set max_active of @wq to @max_active.
2864 *
2865 * CONTEXT:
2866 * Don't call from IRQ context.
2867 */
2868void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
2869{
2870 unsigned int cpu;
2871
2872 max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
2873
2874 spin_lock(&workqueue_lock);
2875
2876 wq->saved_max_active = max_active;
2877
2878 for_each_cwq_cpu(cpu, wq) {
2879 struct global_cwq *gcwq = get_gcwq(cpu);
2880
2881 spin_lock_irq(&gcwq->lock);
2882
2883 if (!(wq->flags & WQ_FREEZEABLE) ||
2884 !(gcwq->flags & GCWQ_FREEZING))
2885 get_cwq(gcwq->cpu, wq)->max_active = max_active;
2886
2887 spin_unlock_irq(&gcwq->lock);
2888 }
2889
2890 spin_unlock(&workqueue_lock);
2891}
2892EXPORT_SYMBOL_GPL(workqueue_set_max_active);
2893
2894/**
2895 * workqueue_congested - test whether a workqueue is congested
2896 * @cpu: CPU in question
2897 * @wq: target workqueue
2898 *
2899 * Test whether @wq's cpu workqueue for @cpu is congested. There is
2900 * no synchronization around this function and the test result is
2901 * unreliable and only useful as advisory hints or for debugging.
2902 *
2903 * RETURNS:
2904 * %true if congested, %false otherwise.
2905 */
2906bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq)
2907{
2908 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2909
2910 return !list_empty(&cwq->delayed_works);
2911}
2912EXPORT_SYMBOL_GPL(workqueue_congested);
2913
2914/**
2915 * work_cpu - return the last known associated cpu for @work
2916 * @work: the work of interest
2917 *
2918 * RETURNS:
2919 * CPU number if @work was ever queued. WORK_CPU_NONE otherwise.
2920 */
2921unsigned int work_cpu(struct work_struct *work)
2922{
2923 struct global_cwq *gcwq = get_work_gcwq(work);
2924
2925 return gcwq ? gcwq->cpu : WORK_CPU_NONE;
2926}
2927EXPORT_SYMBOL_GPL(work_cpu);
2928
2929/**
2930 * work_busy - test whether a work is currently pending or running
2931 * @work: the work to be tested
2932 *
2933 * Test whether @work is currently pending or running. There is no
2934 * synchronization around this function and the test result is
2935 * unreliable and only useful as advisory hints or for debugging.
2936 * Especially for reentrant wqs, the pending state might hide the
2937 * running state.
2938 *
2939 * RETURNS:
2940 * OR'd bitmask of WORK_BUSY_* bits.
2941 */
2942unsigned int work_busy(struct work_struct *work)
2943{
2944 struct global_cwq *gcwq = get_work_gcwq(work);
2945 unsigned long flags;
2946 unsigned int ret = 0;
2947
2948 if (!gcwq)
2949 return false;
2950
2951 spin_lock_irqsave(&gcwq->lock, flags);
2952
2953 if (work_pending(work))
2954 ret |= WORK_BUSY_PENDING;
2955 if (find_worker_executing_work(gcwq, work))
2956 ret |= WORK_BUSY_RUNNING;
2957
2958 spin_unlock_irqrestore(&gcwq->lock, flags);
2959
2960 return ret;
2961}
2962EXPORT_SYMBOL_GPL(work_busy);
2963
2964/*
2965 * CPU hotplug.
2966 *
2967 * There are two challenges in supporting CPU hotplug. Firstly, there
2968 * are a lot of assumptions on strong associations among work, cwq and
2969 * gcwq which make migrating pending and scheduled works very
2970 * difficult to implement without impacting hot paths. Secondly,
2971 * gcwqs serve a mix of short, long and very long running works, making
2972 * blocked draining impractical.
2973 *
2974 * This is solved by allowing a gcwq to be detached from its CPU,
2975 * running it with unbound (rogue) workers and allowing it to be
2976 * reattached later if the cpu comes back online. A separate thread is
2977 * created to govern a gcwq in such a state and is called the trustee of the
2978 * gcwq.
2979 *
2980 * Trustee states and their descriptions.
2981 *
2982 * START Command state used on startup. On CPU_DOWN_PREPARE, a
2983 * new trustee is started with this state.
2984 *
2985 * IN_CHARGE Once started, trustee will enter this state after
2986 * assuming the manager role and making all existing
2987 * workers rogue. DOWN_PREPARE waits for trustee to
2988 * enter this state. After reaching IN_CHARGE, trustee
2989 * tries to execute the pending worklist until it's empty
2990 * and the state is set to BUTCHER, or the state is set
2991 * to RELEASE.
2992 *
2993 * BUTCHER Command state which is set by the cpu callback after
2994 * the cpu has gone down. Once this state is set, the trustee
2995 * knows that there will be no new works on the worklist
2996 * and once the worklist is empty it can proceed to
2997 * killing idle workers.
2998 *
2999 * RELEASE Command state which is set by the cpu callback if the
3000 * cpu down has been canceled or it has come online
3001 * again. After recognizing this state, trustee stops
3002 * trying to drain or butcher and clears ROGUE, rebinds
3003 * all remaining workers back to the cpu and releases
3004 * manager role.
3005 *
3006 * DONE Trustee will enter this state after BUTCHER or RELEASE
3007 * is complete.
3008 *
3009 * trustee CPU draining
3010 * took over down complete
3011 * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
3012 * | | ^
3013 * | CPU is back online v return workers |
3014 * ----------------> RELEASE --------------
3015 */
3016
3017/**
3018 * trustee_wait_event_timeout - timed event wait for trustee
3019 * @cond: condition to wait for
3020 * @timeout: timeout in jiffies
3021 *
3022 * wait_event_timeout() for trustee to use. Handles locking and
3023 * checks for RELEASE request.
3024 *
3025 * CONTEXT:
3026 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3027 * multiple times. To be used by trustee.
3028 *
3029 * RETURNS:
3030 * Positive indicating left time if @cond is satisfied, 0 if timed
3031 * out, -1 if canceled.
3032 */
3033#define trustee_wait_event_timeout(cond, timeout) ({ \
3034 long __ret = (timeout); \
3035 while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \
3036 __ret) { \
3037 spin_unlock_irq(&gcwq->lock); \
3038 __wait_event_timeout(gcwq->trustee_wait, (cond) || \
3039 (gcwq->trustee_state == TRUSTEE_RELEASE), \
3040 __ret); \
3041 spin_lock_irq(&gcwq->lock); \
3042 } \
3043 gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret); \
3044})
3045
3046/**
3047 * trustee_wait_event - event wait for trustee
3048 * @cond: condition to wait for
3049 *
3050 * wait_event() for trustee to use. Automatically handles locking and
3051 * checks for RELEASE request.
3052 *
3053 * CONTEXT:
3054 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3055 * multiple times. To be used by trustee.
3056 *
3057 * RETURNS:
3058 * 0 if @cond is satisfied, -1 if canceled.
3059 */
3060#define trustee_wait_event(cond) ({ \
3061 long __ret1; \
3062 __ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
3063 __ret1 < 0 ? -1 : 0; \
3064})
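
The wait macros above follow a classic shape: drop the lock, sleep until either the condition holds or a cancel request arrives, retake the lock, and report cancellation through the return value. A userspace analogue with a pthread condvar, which performs the same unlock/wait/relock dance internally (the statement-expression form is a GCC extension, as in the kernel macros; all names are invented):

	#include <pthread.h>
	#include <stdbool.h>

	static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
	static pthread_cond_t trustee_wait = PTHREAD_COND_INITIALIZER;
	static bool released;

	/* call with @lock held; returns with @lock held, -1 if released */
	#define wait_event_releasable(cond) ({				\
		while (!(cond) && !released)				\
			pthread_cond_wait(&trustee_wait, &lock);	\
		released ? -1 : 0;					\
	})

	int main(void)
	{
		int rc;

		pthread_mutex_lock(&lock);
		released = true;		/* simulate a RELEASE request */
		rc = wait_event_releasable(false);
		pthread_mutex_unlock(&lock);
		return rc == -1 ? 0 : 1;
	}
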
3065
3066static int __cpuinit trustee_thread(void *__gcwq)
3067{
3068 struct global_cwq *gcwq = __gcwq;
3069 struct worker *worker;
3070 struct work_struct *work;
3071 struct hlist_node *pos;
3072 long rc;
3073 int i;
3074
3075 BUG_ON(gcwq->cpu != smp_processor_id());
3076
3077 spin_lock_irq(&gcwq->lock);
3078 /*
3079 * Claim the manager position and make all workers rogue.
3080 * Trustee must be bound to the target cpu and can't be
3081 * cancelled.
3082 */
3083 BUG_ON(gcwq->cpu != smp_processor_id());
3084 rc = trustee_wait_event(!(gcwq->flags & GCWQ_MANAGING_WORKERS));
3085 BUG_ON(rc < 0);
3086
3087 gcwq->flags |= GCWQ_MANAGING_WORKERS;
3088
3089 list_for_each_entry(worker, &gcwq->idle_list, entry)
3090 worker->flags |= WORKER_ROGUE;
3091
3092 for_each_busy_worker(worker, i, pos, gcwq)
3093 worker->flags |= WORKER_ROGUE;
3094
3095 /*
3096 * Call schedule() so that we cross rq->lock and thus can
3097 * guarantee sched callbacks see the rogue flag. This is
3098 * necessary as scheduler callbacks may be invoked from other
3099 * cpus.
3100 */
3101 spin_unlock_irq(&gcwq->lock);
3102 schedule();
3103 spin_lock_irq(&gcwq->lock);
3104
3105 /*
3106 * Sched callbacks are disabled now. Zap nr_running. After
3107 * this, nr_running stays zero and need_more_worker() and
3108 * keep_working() are always true as long as the worklist is
3109 * not empty.
3110 */
3111 atomic_set(get_gcwq_nr_running(gcwq->cpu), 0);
3112
3113 spin_unlock_irq(&gcwq->lock);
3114 del_timer_sync(&gcwq->idle_timer);
3115 spin_lock_irq(&gcwq->lock);
3116
3117 /*
3118 * We're now in charge. Notify and proceed to drain. We need
3119 * to keep the gcwq running during the whole CPU down
3120 * procedure as other cpu hotunplug callbacks may need to
3121 * flush currently running tasks.
3122 */
3123 gcwq->trustee_state = TRUSTEE_IN_CHARGE;
3124 wake_up_all(&gcwq->trustee_wait);
3125
3126 /*
3127 * The original cpu is in the process of dying and may go away
3128 * anytime now. When that happens, we and all workers would
3129 * be migrated to other cpus. Try draining any remaining work. We
3130 * want to get it over with ASAP - spam rescuers, wake up as
3131 * many idlers as necessary and create new ones till the
3132 * worklist is empty. Note that if the gcwq is frozen, there
3133 * may be frozen works in freezeable cwqs. Don't declare
3134 * completion while frozen.
3135 */
+	while (gcwq->nr_workers != gcwq->nr_idle ||
+	       gcwq->flags & GCWQ_FREEZING ||
+	       gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
+		int nr_works = 0;
+
+		list_for_each_entry(work, &gcwq->worklist, entry) {
+			send_mayday(work);
+			nr_works++;
+		}
+
+		list_for_each_entry(worker, &gcwq->idle_list, entry) {
+			if (!nr_works--)
+				break;
+			wake_up_process(worker->task);
+		}
+
+		if (need_to_create_worker(gcwq)) {
+			spin_unlock_irq(&gcwq->lock);
+			worker = create_worker(gcwq, false);
+			spin_lock_irq(&gcwq->lock);
+			if (worker) {
+				worker->flags |= WORKER_ROGUE;
+				start_worker(worker);
+			}
+		}
+
+		/* give a breather */
+		if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
+			break;
+	}
+
+	/*
+	 * Either all works have been scheduled and cpu is down, or
+	 * cpu down has already been canceled.  Wait for and butcher
+	 * all workers till we're canceled.
+	 */
+	do {
+		rc = trustee_wait_event(!list_empty(&gcwq->idle_list));
+		while (!list_empty(&gcwq->idle_list))
+			destroy_worker(list_first_entry(&gcwq->idle_list,
+							struct worker, entry));
+	} while (gcwq->nr_workers && rc >= 0);
+
+	/*
+	 * At this point, either draining has completed and no worker
+	 * is left, or cpu down has been canceled or the cpu is being
+	 * brought back up.  There shouldn't be any idle one left.
+	 * Tell the remaining busy ones to rebind once they finish
+	 * their currently scheduled works by scheduling the
+	 * rebind_work.
+	 */
+	WARN_ON(!list_empty(&gcwq->idle_list));
+
+	for_each_busy_worker(worker, i, pos, gcwq) {
+		struct work_struct *rebind_work = &worker->rebind_work;
+
+		/*
+		 * Rebind_work may race with future cpu hotplug
+		 * operations.  Use a separate flag to mark that
+		 * rebinding is scheduled.
+		 */
+		worker->flags |= WORKER_REBIND;
+		worker->flags &= ~WORKER_ROGUE;
+
+		/* queue rebind_work, wq doesn't matter, use the default one */
+		if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
+				     work_data_bits(rebind_work)))
+			continue;
+
+		debug_work_activate(rebind_work);
+		insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
+			    worker->scheduled.next,
+			    work_color_to_flags(WORK_NO_COLOR));
+	}
+
+	/* relinquish manager role */
+	gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
+
+	/* notify completion */
+	gcwq->trustee = NULL;
+	gcwq->trustee_state = TRUSTEE_DONE;
+	wake_up_all(&gcwq->trustee_wait);
+	spin_unlock_irq(&gcwq->lock);
+	return 0;
+}
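
The user-visible guarantee all of this machinery buys is that work already queued on a CPU which goes down is still executed. A hedged sketch of hypothetical module code exercising that path (my_work_fn and example_queue_on are made-up names):

	#include <linux/workqueue.h>
	#include <linux/smp.h>

	static void my_work_fn(struct work_struct *work)
	{
		pr_info("ran on cpu %d\n", raw_smp_processor_id());
	}
	static DECLARE_WORK(my_work, my_work_fn);

	static void example_queue_on(unsigned int cpu)
	{
		/* runs on @cpu while it is online; if the cpu is
		 * unplugged first, the trustee above drains it on a
		 * rogue worker instead of losing it */
		queue_work_on(cpu, system_wq, &my_work);
	}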
+
+/**
+ * wait_trustee_state - wait for trustee to enter the specified state
+ * @gcwq: gcwq the trustee of interest belongs to
+ * @state: target state to wait for
+ *
+ * Wait for the trustee to reach @state.  DONE is also matched.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by cpu_callback.
+ */
+static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
+{
+	if (!(gcwq->trustee_state == state ||
+	      gcwq->trustee_state == TRUSTEE_DONE)) {
+		spin_unlock_irq(&gcwq->lock);
+		__wait_event(gcwq->trustee_wait,
+			     gcwq->trustee_state == state ||
+			     gcwq->trustee_state == TRUSTEE_DONE);
+		spin_lock_irq(&gcwq->lock);
+	}
+}
+
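Before the hotplug callback below, it helps to have the trustee state machine in one place; this summary is the editor's reading of the code above and below (the states themselves are from this patch):

	/*
	 * TRUSTEE_START     - set at CPU_DOWN_PREPARE before the trustee runs
	 * TRUSTEE_IN_CHARGE - trustee has claimed the gcwq; cpu down proceeds
	 * TRUSTEE_BUTCHER   - set at CPU_POST_DEAD; destroy idle workers
	 * TRUSTEE_RELEASE   - set at CPU_DOWN_FAILED/CPU_ONLINE; abort, rebind
	 * TRUSTEE_DONE      - trustee has exited; wait_trustee_state() unblocks
	 */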
 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 					    unsigned long action,
 					    void *hcpu)
 {
 	unsigned int cpu = (unsigned long)hcpu;
-	struct cpu_workqueue_struct *cwq;
-	struct workqueue_struct *wq;
-	int err = 0;
+	struct global_cwq *gcwq = get_gcwq(cpu);
+	struct task_struct *new_trustee = NULL;
+	struct worker *uninitialized_var(new_worker);
+	unsigned long flags;
 
 	action &= ~CPU_TASKS_FROZEN;
 
 	switch (action) {
+	case CPU_DOWN_PREPARE:
+		new_trustee = kthread_create(trustee_thread, gcwq,
+					     "workqueue_trustee/%d", cpu);
+		if (IS_ERR(new_trustee))
+			return notifier_from_errno(PTR_ERR(new_trustee));
+		kthread_bind(new_trustee, cpu);
+		/* fall through */
 	case CPU_UP_PREPARE:
-		cpumask_set_cpu(cpu, cpu_populated_map);
-	}
-undo:
-	list_for_each_entry(wq, &workqueues, list) {
-		cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
-		switch (action) {
-		case CPU_UP_PREPARE:
-			err = create_workqueue_thread(cwq, cpu);
-			if (!err)
-				break;
-			printk(KERN_ERR "workqueue [%s] for %i failed\n",
-			       wq->name, cpu);
-			action = CPU_UP_CANCELED;
-			err = -ENOMEM;
-			goto undo;
-
-		case CPU_ONLINE:
-			start_workqueue_thread(cwq, cpu);
-			break;
-
-		case CPU_UP_CANCELED:
-			start_workqueue_thread(cwq, -1);
-		case CPU_POST_DEAD:
-			cleanup_workqueue_thread(cwq);
-			break;
-		}
-	}
+		BUG_ON(gcwq->first_idle);
+		new_worker = create_worker(gcwq, false);
+		if (!new_worker) {
+			if (new_trustee)
+				kthread_stop(new_trustee);
+			return NOTIFY_BAD;
+		}
+	}
 
+	/* some are called w/ irq disabled, don't disturb irq status */
+	spin_lock_irqsave(&gcwq->lock, flags);
+
 	switch (action) {
-	case CPU_UP_CANCELED:
+	case CPU_DOWN_PREPARE:
+		/* initialize trustee and tell it to acquire the gcwq */
+		BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
+		gcwq->trustee = new_trustee;
+		gcwq->trustee_state = TRUSTEE_START;
+		wake_up_process(gcwq->trustee);
+		wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
+		/* fall through */
+	case CPU_UP_PREPARE:
+		BUG_ON(gcwq->first_idle);
+		gcwq->first_idle = new_worker;
+		break;
+
+	case CPU_DYING:
+		/*
+		 * Before this, the trustee and all workers except for
+		 * the ones which are still executing works from
+		 * before the last CPU down must be on the cpu.  After
+		 * this, they'll all be diasporas.
+		 */
+		gcwq->flags |= GCWQ_DISASSOCIATED;
+		break;
+
 	case CPU_POST_DEAD:
-		cpumask_clear_cpu(cpu, cpu_populated_map);
+		gcwq->trustee_state = TRUSTEE_BUTCHER;
+		/* fall through */
+	case CPU_UP_CANCELED:
+		destroy_worker(gcwq->first_idle);
+		gcwq->first_idle = NULL;
+		break;
+
+	case CPU_DOWN_FAILED:
+	case CPU_ONLINE:
+		gcwq->flags &= ~GCWQ_DISASSOCIATED;
+		if (gcwq->trustee_state != TRUSTEE_DONE) {
+			gcwq->trustee_state = TRUSTEE_RELEASE;
+			wake_up_process(gcwq->trustee);
+			wait_trustee_state(gcwq, TRUSTEE_DONE);
+		}
+
+		/*
+		 * Trustee is done and there might be no worker left.
+		 * Put the first_idle in and request a real manager to
+		 * take a look.
+		 */
+		spin_unlock_irq(&gcwq->lock);
+		kthread_bind(gcwq->first_idle->task, cpu);
+		spin_lock_irq(&gcwq->lock);
+		gcwq->flags |= GCWQ_MANAGE_WORKERS;
+		start_worker(gcwq->first_idle);
+		gcwq->first_idle = NULL;
+		break;
 	}
 
-	return notifier_from_errno(err);
+	spin_unlock_irqrestore(&gcwq->lock, flags);
+
+	return notifier_from_errno(0);
 }
 
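Note the action &= ~CPU_TASKS_FROZEN near the top: suspend and resume deliver *_FROZEN variants of each notification, and this callback handles both alike. A sketch of the same masking idiom in a hypothetical notifier (my_cpu_callback is made up; registration would go through hotcpu_notifier() as in init_workqueues() below):

	#include <linux/cpu.h>
	#include <linux/notifier.h>

	static int __cpuinit my_cpu_callback(struct notifier_block *nfb,
					     unsigned long action, void *hcpu)
	{
		unsigned int cpu = (unsigned long)hcpu;

		switch (action & ~CPU_TASKS_FROZEN) {
		case CPU_UP_PREPARE:	/* also CPU_UP_PREPARE_FROZEN */
			pr_info("cpu %u coming up\n", cpu);
			break;
		case CPU_POST_DEAD:	/* also CPU_POST_DEAD_FROZEN */
			pr_info("cpu %u fully down\n", cpu);
			break;
		}
		return NOTIFY_OK;
	}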
 #ifdef CONFIG_SMP
@@ -1216,14 +3381,199 @@ long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
 EXPORT_SYMBOL_GPL(work_on_cpu);
 #endif /* CONFIG_SMP */
 
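The elided hunk body keeps work_on_cpu() as-is; for reference, a minimal usage sketch (per_cpu_probe and the choice of cpu 3 are illustrative, and the call blocks until the function has run on that cpu):

	static long per_cpu_probe(void *arg)
	{
		/* executes in a worker bound to the requested cpu */
		return raw_smp_processor_id();
	}

	static void example_work_on_cpu(void)
	{
		long ran_on = work_on_cpu(3, per_cpu_probe, NULL);

		pr_info("probe ran on cpu %ld\n", ran_on);
	}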
-void __init init_workqueues(void)
+#ifdef CONFIG_FREEZER
+
+/**
+ * freeze_workqueues_begin - begin freezing workqueues
+ *
+ * Start freezing workqueues.  After this function returns, all
+ * freezeable workqueues will queue new works to their frozen_works
+ * list instead of gcwq->worklist.
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock and gcwq->lock's.
+ */
+void freeze_workqueues_begin(void)
+{
+	unsigned int cpu;
+
+	spin_lock(&workqueue_lock);
+
+	BUG_ON(workqueue_freezing);
+	workqueue_freezing = true;
+
+	for_each_gcwq_cpu(cpu) {
+		struct global_cwq *gcwq = get_gcwq(cpu);
+		struct workqueue_struct *wq;
+
+		spin_lock_irq(&gcwq->lock);
+
+		BUG_ON(gcwq->flags & GCWQ_FREEZING);
+		gcwq->flags |= GCWQ_FREEZING;
+
+		list_for_each_entry(wq, &workqueues, list) {
+			struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+			if (cwq && wq->flags & WQ_FREEZEABLE)
+				cwq->max_active = 0;
+		}
+
+		spin_unlock_irq(&gcwq->lock);
+	}
+
+	spin_unlock(&workqueue_lock);
+}
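Only workqueues allocated with WQ_FREEZEABLE take part in this. A hypothetical driver opts in at allocation time ("mydrv" and mydrv_wq are illustrative names):

	static struct workqueue_struct *mydrv_wq;

	static int __init mydrv_init(void)
	{
		/* max_active of 0 selects the default; WQ_FREEZEABLE opts in */
		mydrv_wq = alloc_workqueue("mydrv", WQ_FREEZEABLE, 0);
		return mydrv_wq ? 0 : -ENOMEM;
	}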
+
+/**
+ * freeze_workqueues_busy - are freezeable workqueues still busy?
+ *
+ * Check whether freezing is complete.  This function must be called
+ * between freeze_workqueues_begin() and thaw_workqueues().
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock.
+ *
+ * RETURNS:
+ * %true if some freezeable workqueues are still busy.  %false if
+ * freezing is complete.
+ */
+bool freeze_workqueues_busy(void)
+{
+	unsigned int cpu;
+	bool busy = false;
+
+	spin_lock(&workqueue_lock);
+
+	BUG_ON(!workqueue_freezing);
+
+	for_each_gcwq_cpu(cpu) {
+		struct workqueue_struct *wq;
+		/*
+		 * nr_active is monotonically decreasing.  It's safe
+		 * to peek without lock.
+		 */
+		list_for_each_entry(wq, &workqueues, list) {
+			struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+			if (!cwq || !(wq->flags & WQ_FREEZEABLE))
+				continue;
+
+			BUG_ON(cwq->nr_active < 0);
+			if (cwq->nr_active) {
+				busy = true;
+				goto out_unlock;
+			}
+		}
+	}
+out_unlock:
+	spin_unlock(&workqueue_lock);
+	return busy;
+}
+
+/**
+ * thaw_workqueues - thaw workqueues
+ *
+ * Thaw workqueues.  Normal queueing is restored and all collected
+ * frozen works are transferred to their respective gcwq worklists.
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock and gcwq->lock's.
+ */
+void thaw_workqueues(void)
+{
+	unsigned int cpu;
+
+	spin_lock(&workqueue_lock);
+
+	if (!workqueue_freezing)
+		goto out_unlock;
+
+	for_each_gcwq_cpu(cpu) {
+		struct global_cwq *gcwq = get_gcwq(cpu);
+		struct workqueue_struct *wq;
+
+		spin_lock_irq(&gcwq->lock);
+
+		BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
+		gcwq->flags &= ~GCWQ_FREEZING;
+
+		list_for_each_entry(wq, &workqueues, list) {
+			struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+			if (!cwq || !(wq->flags & WQ_FREEZEABLE))
+				continue;
+
+			/* restore max_active and repopulate worklist */
+			cwq->max_active = wq->saved_max_active;
+
+			while (!list_empty(&cwq->delayed_works) &&
+			       cwq->nr_active < cwq->max_active)
+				cwq_activate_first_delayed(cwq);
+		}
+
+		wake_up_worker(gcwq);
+
+		spin_unlock_irq(&gcwq->lock);
+	}
+
+	workqueue_freezing = false;
+out_unlock:
+	spin_unlock(&workqueue_lock);
+}
+#endif /* CONFIG_FREEZER */
+
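These three entry points are driven by the freezer (see the kernel/power/process.c hunk elsewhere in this merge) as a begin / poll / thaw cycle. A simplified sketch (example_freeze_cycle() and its retry budget are made up):

	#include <linux/delay.h>

	static int example_freeze_cycle(void)
	{
		int retries = 100;

		freeze_workqueues_begin();

		/* wait for in-flight works on freezeable wqs to drain */
		while (freeze_workqueues_busy()) {
			if (!--retries) {
				thaw_workqueues();	/* give up, undo */
				return -EBUSY;
			}
			msleep(10);
		}

		/* ... the hibernation image would be written here ... */

		thaw_workqueues();
		return 0;
	}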
+static int __init init_workqueues(void)
 {
-	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
+	unsigned int cpu;
+	int i;
+
+	hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
+
+	/* initialize gcwqs */
+	for_each_gcwq_cpu(cpu) {
+		struct global_cwq *gcwq = get_gcwq(cpu);
+
+		spin_lock_init(&gcwq->lock);
+		INIT_LIST_HEAD(&gcwq->worklist);
+		gcwq->cpu = cpu;
+		if (cpu == WORK_CPU_UNBOUND)
+			gcwq->flags |= GCWQ_DISASSOCIATED;
+
+		INIT_LIST_HEAD(&gcwq->idle_list);
+		for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
+			INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
+
+		init_timer_deferrable(&gcwq->idle_timer);
+		gcwq->idle_timer.function = idle_worker_timeout;
+		gcwq->idle_timer.data = (unsigned long)gcwq;
+
+		setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
+			    (unsigned long)gcwq);
 
-	cpumask_copy(cpu_populated_map, cpu_online_mask);
-	singlethread_cpu = cpumask_first(cpu_possible_mask);
-	cpu_singlethread_map = cpumask_of(singlethread_cpu);
-	hotcpu_notifier(workqueue_cpu_callback, 0);
-	keventd_wq = create_workqueue("events");
-	BUG_ON(!keventd_wq);
+		ida_init(&gcwq->worker_ida);
+
+		gcwq->trustee_state = TRUSTEE_DONE;
+		init_waitqueue_head(&gcwq->trustee_wait);
+	}
+
+	/* create the initial worker */
+	for_each_online_gcwq_cpu(cpu) {
+		struct global_cwq *gcwq = get_gcwq(cpu);
+		struct worker *worker;
+
+		worker = create_worker(gcwq, true);
+		BUG_ON(!worker);
+		spin_lock_irq(&gcwq->lock);
+		start_worker(worker);
+		spin_unlock_irq(&gcwq->lock);
+	}
+
+	system_wq = alloc_workqueue("events", 0, 0);
+	system_long_wq = alloc_workqueue("events_long", 0, 0);
+	system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
+	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
+					    WQ_UNBOUND_MAX_ACTIVE);
+	BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
+	       !system_unbound_wq);
+	return 0;
 }
+early_initcall(init_workqueues);
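
Because init_workqueues() is now an early_initcall(), the system workqueues exist before normal initcalls run, so ordinary users are unaffected. Typical hypothetical module code (my_fn and my_work are made-up names):

	static void my_fn(struct work_struct *work)
	{
		/* runs on a concurrency-managed worker */
	}
	static DECLARE_WORK(my_work, my_fn);

	static int __init my_init(void)
	{
		queue_work(system_wq, &my_work);  /* same as schedule_work() */
		/* system_unbound_wq would run it without cpu affinity */
		return 0;
	}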
diff --git a/kernel/workqueue_sched.h b/kernel/workqueue_sched.h
index af040babb742..2d10fc98dc79 100644
--- a/kernel/workqueue_sched.h
+++ b/kernel/workqueue_sched.h
@@ -4,13 +4,6 @@
  * Scheduler hooks for concurrency managed workqueue.  Only to be
  * included from sched.c and workqueue.c.
  */
-static inline void wq_worker_waking_up(struct task_struct *task,
-				       unsigned int cpu)
-{
-}
-
-static inline struct task_struct *wq_worker_sleeping(struct task_struct *task,
-						      unsigned int cpu)
-{
-	return NULL;
-}
+void wq_worker_waking_up(struct task_struct *task, unsigned int cpu);
+struct task_struct *wq_worker_sleeping(struct task_struct *task,
+				       unsigned int cpu);
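
The stubs become real functions because the scheduler (also touched by this merge) now calls them around worker sleep and wakeup. Roughly, and heavily simplified from the actual sched.c changes (locking elided; wake_up_process() stands in for the rq-locked wakeup the scheduler really performs; PF_WQ_WORKER is the per-task flag this series uses to mark workers):

	/* in schedule(), as a workqueue worker is about to block: */
	if (prev->flags & PF_WQ_WORKER) {
		struct task_struct *to_wakeup;

		to_wakeup = wq_worker_sleeping(prev, cpu);
		if (to_wakeup)
			wake_up_process(to_wakeup);
	}

	/* on the wakeup side, as a worker becomes runnable: */
	if (p->flags & PF_WQ_WORKER)
		wq_worker_waking_up(p, cpu);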