aboutsummaryrefslogtreecommitdiffstats
path: root/kernel/slow-work.c
diff options
context:
space:
mode:
Diffstat (limited to 'kernel/slow-work.c')
-rw-r--r--kernel/slow-work.c512
1 files changed, 469 insertions, 43 deletions
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
index 0d31135efbf4..00889bd3c590 100644
--- a/kernel/slow-work.c
+++ b/kernel/slow-work.c
@@ -16,11 +16,8 @@
16#include <linux/kthread.h> 16#include <linux/kthread.h>
17#include <linux/freezer.h> 17#include <linux/freezer.h>
18#include <linux/wait.h> 18#include <linux/wait.h>
19 19#include <linux/debugfs.h>
20#define SLOW_WORK_CULL_TIMEOUT (5 * HZ) /* cull threads 5s after running out of 20#include "slow-work.h"
21 * things to do */
22#define SLOW_WORK_OOM_TIMEOUT (5 * HZ) /* can't start new threads for 5s after
23 * OOM */
24 21
25static void slow_work_cull_timeout(unsigned long); 22static void slow_work_cull_timeout(unsigned long);
26static void slow_work_oom_timeout(unsigned long); 23static void slow_work_oom_timeout(unsigned long);
@@ -46,7 +43,7 @@ static unsigned vslow_work_proportion = 50; /* % of threads that may process
46 43
47#ifdef CONFIG_SYSCTL 44#ifdef CONFIG_SYSCTL
48static const int slow_work_min_min_threads = 2; 45static const int slow_work_min_min_threads = 2;
49static int slow_work_max_max_threads = 255; 46static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
50static const int slow_work_min_vslow = 1; 47static const int slow_work_min_vslow = 1;
51static const int slow_work_max_vslow = 99; 48static const int slow_work_max_vslow = 99;
52 49
@@ -98,6 +95,56 @@ static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
98static struct slow_work slow_work_new_thread; /* new thread starter */ 95static struct slow_work slow_work_new_thread; /* new thread starter */
99 96
100/* 97/*
98 * slow work ID allocation (use slow_work_queue_lock)
99 */
100static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
101
102/*
103 * Unregistration tracking to prevent put_ref() from disappearing during module
104 * unload
105 */
106#ifdef CONFIG_MODULES
107static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
108static struct module *slow_work_unreg_module;
109static struct slow_work *slow_work_unreg_work_item;
110static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
111static DEFINE_MUTEX(slow_work_unreg_sync_lock);
112
113static void slow_work_set_thread_processing(int id, struct slow_work *work)
114{
115 if (work)
116 slow_work_thread_processing[id] = work->owner;
117}
118static void slow_work_done_thread_processing(int id, struct slow_work *work)
119{
120 struct module *module = slow_work_thread_processing[id];
121
122 slow_work_thread_processing[id] = NULL;
123 smp_mb();
124 if (slow_work_unreg_work_item == work ||
125 slow_work_unreg_module == module)
126 wake_up_all(&slow_work_unreg_wq);
127}
128static void slow_work_clear_thread_processing(int id)
129{
130 slow_work_thread_processing[id] = NULL;
131}
132#else
133static void slow_work_set_thread_processing(int id, struct slow_work *work) {}
134static void slow_work_done_thread_processing(int id, struct slow_work *work) {}
135static void slow_work_clear_thread_processing(int id) {}
136#endif
137
138/*
139 * Data for tracking currently executing items for indication through /proc
140 */
141#ifdef CONFIG_SLOW_WORK_DEBUG
142struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
143pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
144DEFINE_RWLOCK(slow_work_execs_lock);
145#endif
146
147/*
101 * The queues of work items and the lock governing access to them. These are 148 * The queues of work items and the lock governing access to them. These are
102 * shared between all the CPUs. It doesn't make sense to have per-CPU queues 149 * shared between all the CPUs. It doesn't make sense to have per-CPU queues
103 * as the number of threads bears no relation to the number of CPUs. 150 * as the number of threads bears no relation to the number of CPUs.
@@ -105,9 +152,18 @@ static struct slow_work slow_work_new_thread; /* new thread starter */
105 * There are two queues of work items: one for slow work items, and one for 152 * There are two queues of work items: one for slow work items, and one for
106 * very slow work items. 153 * very slow work items.
107 */ 154 */
108static LIST_HEAD(slow_work_queue); 155LIST_HEAD(slow_work_queue);
109static LIST_HEAD(vslow_work_queue); 156LIST_HEAD(vslow_work_queue);
110static DEFINE_SPINLOCK(slow_work_queue_lock); 157DEFINE_SPINLOCK(slow_work_queue_lock);
158
159/*
160 * The following are two wait queues that get pinged when a work item is placed
161 * on an empty queue. These allow work items that are hogging a thread by
162 * sleeping in a way that could be deferred to yield their thread and enqueue
163 * themselves.
164 */
165static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
166static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);
111 167
112/* 168/*
113 * The thread controls. A variable used to signal to the threads that they 169 * The thread controls. A variable used to signal to the threads that they
@@ -126,6 +182,20 @@ static DECLARE_COMPLETION(slow_work_last_thread_exited);
126static int slow_work_user_count; 182static int slow_work_user_count;
127static DEFINE_MUTEX(slow_work_user_lock); 183static DEFINE_MUTEX(slow_work_user_lock);
128 184
185static inline int slow_work_get_ref(struct slow_work *work)
186{
187 if (work->ops->get_ref)
188 return work->ops->get_ref(work);
189
190 return 0;
191}
192
193static inline void slow_work_put_ref(struct slow_work *work)
194{
195 if (work->ops->put_ref)
196 work->ops->put_ref(work);
197}
198
129/* 199/*
130 * Calculate the maximum number of active threads in the pool that are 200 * Calculate the maximum number of active threads in the pool that are
131 * permitted to process very slow work items. 201 * permitted to process very slow work items.
@@ -149,7 +219,7 @@ static unsigned slow_work_calc_vsmax(void)
149 * Attempt to execute stuff queued on a slow thread. Return true if we managed 219 * Attempt to execute stuff queued on a slow thread. Return true if we managed
150 * it, false if there was nothing to do. 220 * it, false if there was nothing to do.
151 */ 221 */
152static bool slow_work_execute(void) 222static noinline bool slow_work_execute(int id)
153{ 223{
154 struct slow_work *work = NULL; 224 struct slow_work *work = NULL;
155 unsigned vsmax; 225 unsigned vsmax;
@@ -186,6 +256,13 @@ static bool slow_work_execute(void)
186 } else { 256 } else {
187 very_slow = false; /* avoid the compiler warning */ 257 very_slow = false; /* avoid the compiler warning */
188 } 258 }
259
260 slow_work_set_thread_processing(id, work);
261 if (work) {
262 slow_work_mark_time(work);
263 slow_work_begin_exec(id, work);
264 }
265
189 spin_unlock_irq(&slow_work_queue_lock); 266 spin_unlock_irq(&slow_work_queue_lock);
190 267
191 if (!work) 268 if (!work)
@@ -194,12 +271,19 @@ static bool slow_work_execute(void)
194 if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags)) 271 if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
195 BUG(); 272 BUG();
196 273
197 work->ops->execute(work); 274 /* don't execute if the work is in the process of being cancelled */
275 if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
276 work->ops->execute(work);
198 277
199 if (very_slow) 278 if (very_slow)
200 atomic_dec(&vslow_work_executing_count); 279 atomic_dec(&vslow_work_executing_count);
201 clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags); 280 clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
202 281
282 /* wake up anyone waiting for this work to be complete */
283 wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
284
285 slow_work_end_exec(id, work);
286
203 /* if someone tried to enqueue the item whilst we were executing it, 287 /* if someone tried to enqueue the item whilst we were executing it,
204 * then it'll be left unenqueued to avoid multiple threads trying to 288 * then it'll be left unenqueued to avoid multiple threads trying to
205 * execute it simultaneously 289 * execute it simultaneously
@@ -219,7 +303,10 @@ static bool slow_work_execute(void)
219 spin_unlock_irq(&slow_work_queue_lock); 303 spin_unlock_irq(&slow_work_queue_lock);
220 } 304 }
221 305
222 work->ops->put_ref(work); 306 /* sort out the race between module unloading and put_ref() */
307 slow_work_put_ref(work);
308 slow_work_done_thread_processing(id, work);
309
223 return true; 310 return true;
224 311
225auto_requeue: 312auto_requeue:
@@ -227,15 +314,61 @@ auto_requeue:
227 * - we transfer our ref on the item back to the appropriate queue 314 * - we transfer our ref on the item back to the appropriate queue
228 * - don't wake another thread up as we're awake already 315 * - don't wake another thread up as we're awake already
229 */ 316 */
317 slow_work_mark_time(work);
230 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) 318 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
231 list_add_tail(&work->link, &vslow_work_queue); 319 list_add_tail(&work->link, &vslow_work_queue);
232 else 320 else
233 list_add_tail(&work->link, &slow_work_queue); 321 list_add_tail(&work->link, &slow_work_queue);
234 spin_unlock_irq(&slow_work_queue_lock); 322 spin_unlock_irq(&slow_work_queue_lock);
323 slow_work_clear_thread_processing(id);
235 return true; 324 return true;
236} 325}
237 326
238/** 327/**
328 * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
329 * work: The work item under execution that wants to sleep
330 * _timeout: Scheduler sleep timeout
331 *
332 * Allow a requeueable work item to sleep on a slow-work processor thread until
333 * that thread is needed to do some other work or the sleep is interrupted by
334 * some other event.
335 *
336 * The caller must set up a wake up event before calling this and must have set
337 * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
338 * condition before calling this function as no test is made here.
339 *
340 * False is returned if there is nothing on the queue; true is returned if the
341 * work item should be requeued
342 */
343bool slow_work_sleep_till_thread_needed(struct slow_work *work,
344 signed long *_timeout)
345{
346 wait_queue_head_t *wfo_wq;
347 struct list_head *queue;
348
349 DEFINE_WAIT(wait);
350
351 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
352 wfo_wq = &vslow_work_queue_waits_for_occupation;
353 queue = &vslow_work_queue;
354 } else {
355 wfo_wq = &slow_work_queue_waits_for_occupation;
356 queue = &slow_work_queue;
357 }
358
359 if (!list_empty(queue))
360 return true;
361
362 add_wait_queue_exclusive(wfo_wq, &wait);
363 if (list_empty(queue))
364 *_timeout = schedule_timeout(*_timeout);
365 finish_wait(wfo_wq, &wait);
366
367 return !list_empty(queue);
368}
369EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
370
371/**
239 * slow_work_enqueue - Schedule a slow work item for processing 372 * slow_work_enqueue - Schedule a slow work item for processing
240 * @work: The work item to queue 373 * @work: The work item to queue
241 * 374 *
@@ -260,16 +393,22 @@ auto_requeue:
260 * allowed to pick items to execute. This ensures that very slow items won't 393 * allowed to pick items to execute. This ensures that very slow items won't
261 * overly block ones that are just ordinarily slow. 394 * overly block ones that are just ordinarily slow.
262 * 395 *
263 * Returns 0 if successful, -EAGAIN if not. 396 * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if cancelled work is
397 * attempted queued)
264 */ 398 */
265int slow_work_enqueue(struct slow_work *work) 399int slow_work_enqueue(struct slow_work *work)
266{ 400{
401 wait_queue_head_t *wfo_wq;
402 struct list_head *queue;
267 unsigned long flags; 403 unsigned long flags;
404 int ret;
405
406 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
407 return -ECANCELED;
268 408
269 BUG_ON(slow_work_user_count <= 0); 409 BUG_ON(slow_work_user_count <= 0);
270 BUG_ON(!work); 410 BUG_ON(!work);
271 BUG_ON(!work->ops); 411 BUG_ON(!work->ops);
272 BUG_ON(!work->ops->get_ref);
273 412
274 /* when honouring an enqueue request, we only promise that we will run 413 /* when honouring an enqueue request, we only promise that we will run
275 * the work function in the future; we do not promise to run it once 414 * the work function in the future; we do not promise to run it once
@@ -280,8 +419,19 @@ int slow_work_enqueue(struct slow_work *work)
280 * maintaining our promise 419 * maintaining our promise
281 */ 420 */
282 if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) { 421 if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
422 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
423 wfo_wq = &vslow_work_queue_waits_for_occupation;
424 queue = &vslow_work_queue;
425 } else {
426 wfo_wq = &slow_work_queue_waits_for_occupation;
427 queue = &slow_work_queue;
428 }
429
283 spin_lock_irqsave(&slow_work_queue_lock, flags); 430 spin_lock_irqsave(&slow_work_queue_lock, flags);
284 431
432 if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
433 goto cancelled;
434
285 /* we promise that we will not attempt to execute the work 435 /* we promise that we will not attempt to execute the work
286 * function in more than one thread simultaneously 436 * function in more than one thread simultaneously
287 * 437 *
@@ -299,25 +449,221 @@ int slow_work_enqueue(struct slow_work *work)
299 if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) { 449 if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
300 set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags); 450 set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
301 } else { 451 } else {
302 if (work->ops->get_ref(work) < 0) 452 ret = slow_work_get_ref(work);
303 goto cant_get_ref; 453 if (ret < 0)
304 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) 454 goto failed;
305 list_add_tail(&work->link, &vslow_work_queue); 455 slow_work_mark_time(work);
306 else 456 list_add_tail(&work->link, queue);
307 list_add_tail(&work->link, &slow_work_queue);
308 wake_up(&slow_work_thread_wq); 457 wake_up(&slow_work_thread_wq);
458
459 /* if someone who could be requeued is sleeping on a
460 * thread, then ask them to yield their thread */
461 if (work->link.prev == queue)
462 wake_up(wfo_wq);
309 } 463 }
310 464
311 spin_unlock_irqrestore(&slow_work_queue_lock, flags); 465 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
312 } 466 }
313 return 0; 467 return 0;
314 468
315cant_get_ref: 469cancelled:
470 ret = -ECANCELED;
471failed:
316 spin_unlock_irqrestore(&slow_work_queue_lock, flags); 472 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
317 return -EAGAIN; 473 return ret;
318} 474}
319EXPORT_SYMBOL(slow_work_enqueue); 475EXPORT_SYMBOL(slow_work_enqueue);
320 476
477static int slow_work_wait(void *word)
478{
479 schedule();
480 return 0;
481}
482
483/**
484 * slow_work_cancel - Cancel a slow work item
485 * @work: The work item to cancel
486 *
487 * This function will cancel a previously enqueued work item. If we cannot
488 * cancel the work item, it is guarenteed to have run when this function
489 * returns.
490 */
491void slow_work_cancel(struct slow_work *work)
492{
493 bool wait = true, put = false;
494
495 set_bit(SLOW_WORK_CANCELLING, &work->flags);
496 smp_mb();
497
498 /* if the work item is a delayed work item with an active timer, we
499 * need to wait for the timer to finish _before_ getting the spinlock,
500 * lest we deadlock against the timer routine
501 *
502 * the timer routine will leave DELAYED set if it notices the
503 * CANCELLING flag in time
504 */
505 if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
506 struct delayed_slow_work *dwork =
507 container_of(work, struct delayed_slow_work, work);
508 del_timer_sync(&dwork->timer);
509 }
510
511 spin_lock_irq(&slow_work_queue_lock);
512
513 if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
514 /* the timer routine aborted or never happened, so we are left
515 * holding the timer's reference on the item and should just
516 * drop the pending flag and wait for any ongoing execution to
517 * finish */
518 struct delayed_slow_work *dwork =
519 container_of(work, struct delayed_slow_work, work);
520
521 BUG_ON(timer_pending(&dwork->timer));
522 BUG_ON(!list_empty(&work->link));
523
524 clear_bit(SLOW_WORK_DELAYED, &work->flags);
525 put = true;
526 clear_bit(SLOW_WORK_PENDING, &work->flags);
527
528 } else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
529 !list_empty(&work->link)) {
530 /* the link in the pending queue holds a reference on the item
531 * that we will need to release */
532 list_del_init(&work->link);
533 wait = false;
534 put = true;
535 clear_bit(SLOW_WORK_PENDING, &work->flags);
536
537 } else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
538 /* the executor is holding our only reference on the item, so
539 * we merely need to wait for it to finish executing */
540 clear_bit(SLOW_WORK_PENDING, &work->flags);
541 }
542
543 spin_unlock_irq(&slow_work_queue_lock);
544
545 /* the EXECUTING flag is set by the executor whilst the spinlock is set
546 * and before the item is dequeued - so assuming the above doesn't
547 * actually dequeue it, simply waiting for the EXECUTING flag to be
548 * released here should be sufficient */
549 if (wait)
550 wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
551 TASK_UNINTERRUPTIBLE);
552
553 clear_bit(SLOW_WORK_CANCELLING, &work->flags);
554 if (put)
555 slow_work_put_ref(work);
556}
557EXPORT_SYMBOL(slow_work_cancel);
558
559/*
560 * Handle expiry of the delay timer, indicating that a delayed slow work item
561 * should now be queued if not cancelled
562 */
563static void delayed_slow_work_timer(unsigned long data)
564{
565 wait_queue_head_t *wfo_wq;
566 struct list_head *queue;
567 struct slow_work *work = (struct slow_work *) data;
568 unsigned long flags;
569 bool queued = false, put = false, first = false;
570
571 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
572 wfo_wq = &vslow_work_queue_waits_for_occupation;
573 queue = &vslow_work_queue;
574 } else {
575 wfo_wq = &slow_work_queue_waits_for_occupation;
576 queue = &slow_work_queue;
577 }
578
579 spin_lock_irqsave(&slow_work_queue_lock, flags);
580 if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
581 clear_bit(SLOW_WORK_DELAYED, &work->flags);
582
583 if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
584 /* we discard the reference the timer was holding in
585 * favour of the one the executor holds */
586 set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
587 put = true;
588 } else {
589 slow_work_mark_time(work);
590 list_add_tail(&work->link, queue);
591 queued = true;
592 if (work->link.prev == queue)
593 first = true;
594 }
595 }
596
597 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
598 if (put)
599 slow_work_put_ref(work);
600 if (first)
601 wake_up(wfo_wq);
602 if (queued)
603 wake_up(&slow_work_thread_wq);
604}
605
606/**
607 * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
608 * @dwork: The delayed work item to queue
609 * @delay: When to start executing the work, in jiffies from now
610 *
611 * This is similar to slow_work_enqueue(), but it adds a delay before the work
612 * is actually queued for processing.
613 *
614 * The item can have delayed processing requested on it whilst it is being
615 * executed. The delay will begin immediately, and if it expires before the
616 * item finishes executing, the item will be placed back on the queue when it
617 * has done executing.
618 */
619int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
620 unsigned long delay)
621{
622 struct slow_work *work = &dwork->work;
623 unsigned long flags;
624 int ret;
625
626 if (delay == 0)
627 return slow_work_enqueue(&dwork->work);
628
629 BUG_ON(slow_work_user_count <= 0);
630 BUG_ON(!work);
631 BUG_ON(!work->ops);
632
633 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
634 return -ECANCELED;
635
636 if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
637 spin_lock_irqsave(&slow_work_queue_lock, flags);
638
639 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
640 goto cancelled;
641
642 /* the timer holds a reference whilst it is pending */
643 ret = work->ops->get_ref(work);
644 if (ret < 0)
645 goto cant_get_ref;
646
647 if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
648 BUG();
649 dwork->timer.expires = jiffies + delay;
650 dwork->timer.data = (unsigned long) work;
651 dwork->timer.function = delayed_slow_work_timer;
652 add_timer(&dwork->timer);
653
654 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
655 }
656
657 return 0;
658
659cancelled:
660 ret = -ECANCELED;
661cant_get_ref:
662 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
663 return ret;
664}
665EXPORT_SYMBOL(delayed_slow_work_enqueue);
666
321/* 667/*
322 * Schedule a cull of the thread pool at some time in the near future 668 * Schedule a cull of the thread pool at some time in the near future
323 */ 669 */
@@ -368,13 +714,23 @@ static inline bool slow_work_available(int vsmax)
368 */ 714 */
369static int slow_work_thread(void *_data) 715static int slow_work_thread(void *_data)
370{ 716{
371 int vsmax; 717 int vsmax, id;
372 718
373 DEFINE_WAIT(wait); 719 DEFINE_WAIT(wait);
374 720
375 set_freezable(); 721 set_freezable();
376 set_user_nice(current, -5); 722 set_user_nice(current, -5);
377 723
724 /* allocate ourselves an ID */
725 spin_lock_irq(&slow_work_queue_lock);
726 id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
727 BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
728 __set_bit(id, slow_work_ids);
729 slow_work_set_thread_pid(id, current->pid);
730 spin_unlock_irq(&slow_work_queue_lock);
731
732 sprintf(current->comm, "kslowd%03u", id);
733
378 for (;;) { 734 for (;;) {
379 vsmax = vslow_work_proportion; 735 vsmax = vslow_work_proportion;
380 vsmax *= atomic_read(&slow_work_thread_count); 736 vsmax *= atomic_read(&slow_work_thread_count);
@@ -395,7 +751,7 @@ static int slow_work_thread(void *_data)
395 vsmax *= atomic_read(&slow_work_thread_count); 751 vsmax *= atomic_read(&slow_work_thread_count);
396 vsmax /= 100; 752 vsmax /= 100;
397 753
398 if (slow_work_available(vsmax) && slow_work_execute()) { 754 if (slow_work_available(vsmax) && slow_work_execute(id)) {
399 cond_resched(); 755 cond_resched();
400 if (list_empty(&slow_work_queue) && 756 if (list_empty(&slow_work_queue) &&
401 list_empty(&vslow_work_queue) && 757 list_empty(&vslow_work_queue) &&
@@ -412,6 +768,11 @@ static int slow_work_thread(void *_data)
412 break; 768 break;
413 } 769 }
414 770
771 spin_lock_irq(&slow_work_queue_lock);
772 slow_work_set_thread_pid(id, 0);
773 __clear_bit(id, slow_work_ids);
774 spin_unlock_irq(&slow_work_queue_lock);
775
415 if (atomic_dec_and_test(&slow_work_thread_count)) 776 if (atomic_dec_and_test(&slow_work_thread_count))
416 complete_and_exit(&slow_work_last_thread_exited, 0); 777 complete_and_exit(&slow_work_last_thread_exited, 0);
417 return 0; 778 return 0;
@@ -427,21 +788,6 @@ static void slow_work_cull_timeout(unsigned long data)
427} 788}
428 789
429/* 790/*
430 * Get a reference on slow work thread starter
431 */
432static int slow_work_new_thread_get_ref(struct slow_work *work)
433{
434 return 0;
435}
436
437/*
438 * Drop a reference on slow work thread starter
439 */
440static void slow_work_new_thread_put_ref(struct slow_work *work)
441{
442}
443
444/*
445 * Start a new slow work thread 791 * Start a new slow work thread
446 */ 792 */
447static void slow_work_new_thread_execute(struct slow_work *work) 793static void slow_work_new_thread_execute(struct slow_work *work)
@@ -475,9 +821,11 @@ static void slow_work_new_thread_execute(struct slow_work *work)
475} 821}
476 822
477static const struct slow_work_ops slow_work_new_thread_ops = { 823static const struct slow_work_ops slow_work_new_thread_ops = {
478 .get_ref = slow_work_new_thread_get_ref, 824 .owner = THIS_MODULE,
479 .put_ref = slow_work_new_thread_put_ref,
480 .execute = slow_work_new_thread_execute, 825 .execute = slow_work_new_thread_execute,
826#ifdef CONFIG_SLOW_WORK_DEBUG
827 .desc = slow_work_new_thread_desc,
828#endif
481}; 829};
482 830
483/* 831/*
@@ -546,12 +894,13 @@ static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
546 894
547/** 895/**
548 * slow_work_register_user - Register a user of the facility 896 * slow_work_register_user - Register a user of the facility
897 * @module: The module about to make use of the facility
549 * 898 *
550 * Register a user of the facility, starting up the initial threads if there 899 * Register a user of the facility, starting up the initial threads if there
551 * aren't any other users at this point. This will return 0 if successful, or 900 * aren't any other users at this point. This will return 0 if successful, or
552 * an error if not. 901 * an error if not.
553 */ 902 */
554int slow_work_register_user(void) 903int slow_work_register_user(struct module *module)
555{ 904{
556 struct task_struct *p; 905 struct task_struct *p;
557 int loop; 906 int loop;
@@ -598,14 +947,81 @@ error:
598} 947}
599EXPORT_SYMBOL(slow_work_register_user); 948EXPORT_SYMBOL(slow_work_register_user);
600 949
950/*
951 * wait for all outstanding items from the calling module to complete
952 * - note that more items may be queued whilst we're waiting
953 */
954static void slow_work_wait_for_items(struct module *module)
955{
956#ifdef CONFIG_MODULES
957 DECLARE_WAITQUEUE(myself, current);
958 struct slow_work *work;
959 int loop;
960
961 mutex_lock(&slow_work_unreg_sync_lock);
962 add_wait_queue(&slow_work_unreg_wq, &myself);
963
964 for (;;) {
965 spin_lock_irq(&slow_work_queue_lock);
966
967 /* first of all, we wait for the last queued item in each list
968 * to be processed */
969 list_for_each_entry_reverse(work, &vslow_work_queue, link) {
970 if (work->owner == module) {
971 set_current_state(TASK_UNINTERRUPTIBLE);
972 slow_work_unreg_work_item = work;
973 goto do_wait;
974 }
975 }
976 list_for_each_entry_reverse(work, &slow_work_queue, link) {
977 if (work->owner == module) {
978 set_current_state(TASK_UNINTERRUPTIBLE);
979 slow_work_unreg_work_item = work;
980 goto do_wait;
981 }
982 }
983
984 /* then we wait for the items being processed to finish */
985 slow_work_unreg_module = module;
986 smp_mb();
987 for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
988 if (slow_work_thread_processing[loop] == module)
989 goto do_wait;
990 }
991 spin_unlock_irq(&slow_work_queue_lock);
992 break; /* okay, we're done */
993
994 do_wait:
995 spin_unlock_irq(&slow_work_queue_lock);
996 schedule();
997 slow_work_unreg_work_item = NULL;
998 slow_work_unreg_module = NULL;
999 }
1000
1001 remove_wait_queue(&slow_work_unreg_wq, &myself);
1002 mutex_unlock(&slow_work_unreg_sync_lock);
1003#endif /* CONFIG_MODULES */
1004}
1005
601/** 1006/**
602 * slow_work_unregister_user - Unregister a user of the facility 1007 * slow_work_unregister_user - Unregister a user of the facility
1008 * @module: The module whose items should be cleared
603 * 1009 *
604 * Unregister a user of the facility, killing all the threads if this was the 1010 * Unregister a user of the facility, killing all the threads if this was the
605 * last one. 1011 * last one.
1012 *
1013 * This waits for all the work items belonging to the nominated module to go
1014 * away before proceeding.
606 */ 1015 */
607void slow_work_unregister_user(void) 1016void slow_work_unregister_user(struct module *module)
608{ 1017{
1018 /* first of all, wait for all outstanding items from the calling module
1019 * to complete */
1020 if (module)
1021 slow_work_wait_for_items(module);
1022
1023 /* then we can actually go about shutting down the facility if need
1024 * be */
609 mutex_lock(&slow_work_user_lock); 1025 mutex_lock(&slow_work_user_lock);
610 1026
611 BUG_ON(slow_work_user_count <= 0); 1027 BUG_ON(slow_work_user_count <= 0);
@@ -639,6 +1055,16 @@ static int __init init_slow_work(void)
639 if (slow_work_max_max_threads < nr_cpus * 2) 1055 if (slow_work_max_max_threads < nr_cpus * 2)
640 slow_work_max_max_threads = nr_cpus * 2; 1056 slow_work_max_max_threads = nr_cpus * 2;
641#endif 1057#endif
1058#ifdef CONFIG_SLOW_WORK_DEBUG
1059 {
1060 struct dentry *dbdir;
1061
1062 dbdir = debugfs_create_dir("slow_work", NULL);
1063 if (dbdir && !IS_ERR(dbdir))
1064 debugfs_create_file("runqueue", S_IFREG | 0400, dbdir,
1065 NULL, &slow_work_runqueue_fops);
1066 }
1067#endif
642 return 0; 1068 return 0;
643} 1069}
644 1070