aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTejun Heo <tj@kernel.org>2010-07-02 04:03:52 -0400
committerTejun Heo <tj@kernel.org>2010-07-14 05:29:46 -0400
commit083b804c4d3e1e3d0eace56bdbc0f674946d2847 (patch)
treef09c2ca4f4e14edbf99030ed50a553ba0dadec47
parentc7fc77f78f16d138ca997ce096a62f46e2e9420a (diff)
async: use workqueue for worker pool
Replace private worker pool with system_unbound_wq. Signed-off-by: Tejun Heo <tj@kernel.org> Acked-by: Arjan van de Ven <arjan@infradead.org>
-rw-r--r--kernel/async.c141
1 files changed, 22 insertions, 119 deletions
diff --git a/kernel/async.c b/kernel/async.c
index 15319d6c18fe..cd9dbb913c77 100644
--- a/kernel/async.c
+++ b/kernel/async.c
@@ -49,40 +49,33 @@ asynchronous and synchronous parts of the kernel.
49*/ 49*/
50 50
51#include <linux/async.h> 51#include <linux/async.h>
52#include <linux/bug.h>
53#include <linux/module.h> 52#include <linux/module.h>
54#include <linux/wait.h> 53#include <linux/wait.h>
55#include <linux/sched.h> 54#include <linux/sched.h>
56#include <linux/init.h>
57#include <linux/kthread.h>
58#include <linux/delay.h>
59#include <linux/slab.h> 55#include <linux/slab.h>
56#include <linux/workqueue.h>
60#include <asm/atomic.h> 57#include <asm/atomic.h>
61 58
62static async_cookie_t next_cookie = 1; 59static async_cookie_t next_cookie = 1;
63 60
64#define MAX_THREADS 256
65#define MAX_WORK 32768 61#define MAX_WORK 32768
66 62
67static LIST_HEAD(async_pending); 63static LIST_HEAD(async_pending);
68static LIST_HEAD(async_running); 64static LIST_HEAD(async_running);
69static DEFINE_SPINLOCK(async_lock); 65static DEFINE_SPINLOCK(async_lock);
70 66
71static int async_enabled = 0;
72
73struct async_entry { 67struct async_entry {
74 struct list_head list; 68 struct list_head list;
75 async_cookie_t cookie; 69 struct work_struct work;
76 async_func_ptr *func; 70 async_cookie_t cookie;
77 void *data; 71 async_func_ptr *func;
78 struct list_head *running; 72 void *data;
73 struct list_head *running;
79}; 74};
80 75
81static DECLARE_WAIT_QUEUE_HEAD(async_done); 76static DECLARE_WAIT_QUEUE_HEAD(async_done);
82static DECLARE_WAIT_QUEUE_HEAD(async_new);
83 77
84static atomic_t entry_count; 78static atomic_t entry_count;
85static atomic_t thread_count;
86 79
87extern int initcall_debug; 80extern int initcall_debug;
88 81
@@ -117,27 +110,23 @@ static async_cookie_t lowest_in_progress(struct list_head *running)
117 spin_unlock_irqrestore(&async_lock, flags); 110 spin_unlock_irqrestore(&async_lock, flags);
118 return ret; 111 return ret;
119} 112}
113
120/* 114/*
121 * pick the first pending entry and run it 115 * pick the first pending entry and run it
122 */ 116 */
123static void run_one_entry(void) 117static void async_run_entry_fn(struct work_struct *work)
124{ 118{
119 struct async_entry *entry =
120 container_of(work, struct async_entry, work);
125 unsigned long flags; 121 unsigned long flags;
126 struct async_entry *entry;
127 ktime_t calltime, delta, rettime; 122 ktime_t calltime, delta, rettime;
128 123
129 /* 1) pick one task from the pending queue */ 124 /* 1) move self to the running queue */
130
131 spin_lock_irqsave(&async_lock, flags); 125 spin_lock_irqsave(&async_lock, flags);
132 if (list_empty(&async_pending))
133 goto out;
134 entry = list_first_entry(&async_pending, struct async_entry, list);
135
136 /* 2) move it to the running queue */
137 list_move_tail(&entry->list, entry->running); 126 list_move_tail(&entry->list, entry->running);
138 spin_unlock_irqrestore(&async_lock, flags); 127 spin_unlock_irqrestore(&async_lock, flags);
139 128
140 /* 3) run it (and print duration)*/ 129 /* 2) run (and print duration) */
141 if (initcall_debug && system_state == SYSTEM_BOOTING) { 130 if (initcall_debug && system_state == SYSTEM_BOOTING) {
142 printk("calling %lli_%pF @ %i\n", (long long)entry->cookie, 131 printk("calling %lli_%pF @ %i\n", (long long)entry->cookie,
143 entry->func, task_pid_nr(current)); 132 entry->func, task_pid_nr(current));
@@ -153,31 +142,25 @@ static void run_one_entry(void)
153 (long long)ktime_to_ns(delta) >> 10); 142 (long long)ktime_to_ns(delta) >> 10);
154 } 143 }
155 144
156 /* 4) remove it from the running queue */ 145 /* 3) remove self from the running queue */
157 spin_lock_irqsave(&async_lock, flags); 146 spin_lock_irqsave(&async_lock, flags);
158 list_del(&entry->list); 147 list_del(&entry->list);
159 148
160 /* 5) free the entry */ 149 /* 4) free the entry */
161 kfree(entry); 150 kfree(entry);
162 atomic_dec(&entry_count); 151 atomic_dec(&entry_count);
163 152
164 spin_unlock_irqrestore(&async_lock, flags); 153 spin_unlock_irqrestore(&async_lock, flags);
165 154
166 /* 6) wake up any waiters. */ 155 /* 5) wake up any waiters */
167 wake_up(&async_done); 156 wake_up(&async_done);
168 return;
169
170out:
171 spin_unlock_irqrestore(&async_lock, flags);
172} 157}
173 158
174
175static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct list_head *running) 159static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct list_head *running)
176{ 160{
177 struct async_entry *entry; 161 struct async_entry *entry;
178 unsigned long flags; 162 unsigned long flags;
179 async_cookie_t newcookie; 163 async_cookie_t newcookie;
180
181 164
182 /* allow irq-off callers */ 165 /* allow irq-off callers */
183 entry = kzalloc(sizeof(struct async_entry), GFP_ATOMIC); 166 entry = kzalloc(sizeof(struct async_entry), GFP_ATOMIC);
@@ -186,7 +169,7 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l
186 * If we're out of memory or if there's too much work 169 * If we're out of memory or if there's too much work
187 * pending already, we execute synchronously. 170 * pending already, we execute synchronously.
188 */ 171 */
189 if (!async_enabled || !entry || atomic_read(&entry_count) > MAX_WORK) { 172 if (!entry || atomic_read(&entry_count) > MAX_WORK) {
190 kfree(entry); 173 kfree(entry);
191 spin_lock_irqsave(&async_lock, flags); 174 spin_lock_irqsave(&async_lock, flags);
192 newcookie = next_cookie++; 175 newcookie = next_cookie++;
@@ -196,6 +179,7 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l
196 ptr(data, newcookie); 179 ptr(data, newcookie);
197 return newcookie; 180 return newcookie;
198 } 181 }
182 INIT_WORK(&entry->work, async_run_entry_fn);
199 entry->func = ptr; 183 entry->func = ptr;
200 entry->data = data; 184 entry->data = data;
201 entry->running = running; 185 entry->running = running;
@@ -205,7 +189,10 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l
205 list_add_tail(&entry->list, &async_pending); 189 list_add_tail(&entry->list, &async_pending);
206 atomic_inc(&entry_count); 190 atomic_inc(&entry_count);
207 spin_unlock_irqrestore(&async_lock, flags); 191 spin_unlock_irqrestore(&async_lock, flags);
208 wake_up(&async_new); 192
193 /* schedule for execution */
194 queue_work(system_unbound_wq, &entry->work);
195
209 return newcookie; 196 return newcookie;
210} 197}
211 198
@@ -312,87 +299,3 @@ void async_synchronize_cookie(async_cookie_t cookie)
312 async_synchronize_cookie_domain(cookie, &async_running); 299 async_synchronize_cookie_domain(cookie, &async_running);
313} 300}
314EXPORT_SYMBOL_GPL(async_synchronize_cookie); 301EXPORT_SYMBOL_GPL(async_synchronize_cookie);
315
316
317static int async_thread(void *unused)
318{
319 DECLARE_WAITQUEUE(wq, current);
320 add_wait_queue(&async_new, &wq);
321
322 while (!kthread_should_stop()) {
323 int ret = HZ;
324 set_current_state(TASK_INTERRUPTIBLE);
325 /*
326 * check the list head without lock.. false positives
327 * are dealt with inside run_one_entry() while holding
328 * the lock.
329 */
330 rmb();
331 if (!list_empty(&async_pending))
332 run_one_entry();
333 else
334 ret = schedule_timeout(HZ);
335
336 if (ret == 0) {
337 /*
338 * we timed out, this means we as thread are redundant.
339 * we sign off and die, but we to avoid any races there
340 * is a last-straw check to see if work snuck in.
341 */
342 atomic_dec(&thread_count);
343 wmb(); /* manager must see our departure first */
344 if (list_empty(&async_pending))
345 break;
346 /*
347 * woops work came in between us timing out and us
348 * signing off; we need to stay alive and keep working.
349 */
350 atomic_inc(&thread_count);
351 }
352 }
353 remove_wait_queue(&async_new, &wq);
354
355 return 0;
356}
357
358static int async_manager_thread(void *unused)
359{
360 DECLARE_WAITQUEUE(wq, current);
361 add_wait_queue(&async_new, &wq);
362
363 while (!kthread_should_stop()) {
364 int tc, ec;
365
366 set_current_state(TASK_INTERRUPTIBLE);
367
368 tc = atomic_read(&thread_count);
369 rmb();
370 ec = atomic_read(&entry_count);
371
372 while (tc < ec && tc < MAX_THREADS) {
373 if (IS_ERR(kthread_run(async_thread, NULL, "async/%i",
374 tc))) {
375 msleep(100);
376 continue;
377 }
378 atomic_inc(&thread_count);
379 tc++;
380 }
381
382 schedule();
383 }
384 remove_wait_queue(&async_new, &wq);
385
386 return 0;
387}
388
389static int __init async_init(void)
390{
391 async_enabled =
392 !IS_ERR(kthread_run(async_manager_thread, NULL, "async/mgr"));
393
394 WARN_ON(!async_enabled);
395 return 0;
396}
397
398core_initcall(async_init);