path: root/kernel/slow-work.c
Diffstat (limited to 'kernel/slow-work.c')
-rw-r--r--  kernel/slow-work.c | 494
1 file changed, 451 insertions(+), 43 deletions(-)
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
index 0d31135efbf4..da94f3c101af 100644
--- a/kernel/slow-work.c
+++ b/kernel/slow-work.c
@@ -16,11 +16,8 @@
 #include <linux/kthread.h>
 #include <linux/freezer.h>
 #include <linux/wait.h>
-
-#define SLOW_WORK_CULL_TIMEOUT (5 * HZ)	/* cull threads 5s after running out of
-					 * things to do */
-#define SLOW_WORK_OOM_TIMEOUT (5 * HZ)	/* can't start new threads for 5s after
-					 * OOM */
+#include <linux/proc_fs.h>
+#include "slow-work.h"
 
 static void slow_work_cull_timeout(unsigned long);
 static void slow_work_oom_timeout(unsigned long);
@@ -46,7 +43,7 @@ static unsigned vslow_work_proportion = 50; /* % of threads that may process
 
 #ifdef CONFIG_SYSCTL
 static const int slow_work_min_min_threads = 2;
-static int slow_work_max_max_threads = 255;
+static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
 static const int slow_work_min_vslow = 1;
 static const int slow_work_max_vslow = 99;
 
@@ -98,6 +95,32 @@ static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
 static struct slow_work slow_work_new_thread; /* new thread starter */
 
 /*
+ * slow work ID allocation (use slow_work_queue_lock)
+ */
+static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
+
+/*
+ * Unregistration tracking to prevent put_ref() from disappearing during module
+ * unload
+ */
+#ifdef CONFIG_MODULES
+static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
+static struct module *slow_work_unreg_module;
+static struct slow_work *slow_work_unreg_work_item;
+static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
+static DEFINE_MUTEX(slow_work_unreg_sync_lock);
+#endif
+
+/*
+ * Data for tracking currently executing items for indication through /proc
+ */
+#ifdef CONFIG_SLOW_WORK_PROC
+struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
+pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
+DEFINE_RWLOCK(slow_work_execs_lock);
+#endif
+
+/*
  * The queues of work items and the lock governing access to them. These are
  * shared between all the CPUs. It doesn't make sense to have per-CPU queues
  * as the number of threads bears no relation to the number of CPUs.
@@ -105,9 +128,18 @@ static struct slow_work slow_work_new_thread; /* new thread starter */
  * There are two queues of work items: one for slow work items, and one for
  * very slow work items.
  */
-static LIST_HEAD(slow_work_queue);
-static LIST_HEAD(vslow_work_queue);
-static DEFINE_SPINLOCK(slow_work_queue_lock);
+LIST_HEAD(slow_work_queue);
+LIST_HEAD(vslow_work_queue);
+DEFINE_SPINLOCK(slow_work_queue_lock);
+
+/*
+ * The following are two wait queues that get pinged when a work item is placed
+ * on an empty queue. These allow work items that are hogging a thread by
+ * sleeping in a way that could be deferred to yield their thread and enqueue
+ * themselves.
+ */
+static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
+static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);
 
 /*
  * The thread controls. A variable used to signal to the threads that they
@@ -126,6 +158,20 @@ static DECLARE_COMPLETION(slow_work_last_thread_exited);
 static int slow_work_user_count;
 static DEFINE_MUTEX(slow_work_user_lock);
 
+static inline int slow_work_get_ref(struct slow_work *work)
+{
+	if (work->ops->get_ref)
+		return work->ops->get_ref(work);
+
+	return 0;
+}
+
+static inline void slow_work_put_ref(struct slow_work *work)
+{
+	if (work->ops->put_ref)
+		work->ops->put_ref(work);
+}
+
 /*
  * Calculate the maximum number of active threads in the pool that are
  * permitted to process very slow work items.
@@ -149,8 +195,11 @@ static unsigned slow_work_calc_vsmax(void)
  * Attempt to execute stuff queued on a slow thread. Return true if we managed
  * it, false if there was nothing to do.
  */
-static bool slow_work_execute(void)
+static noinline bool slow_work_execute(int id)
 {
+#ifdef CONFIG_MODULES
+	struct module *module;
+#endif
 	struct slow_work *work = NULL;
 	unsigned vsmax;
 	bool very_slow;
@@ -186,6 +235,16 @@ static bool slow_work_execute(void)
 	} else {
 		very_slow = false; /* avoid the compiler warning */
 	}
+
+#ifdef CONFIG_MODULES
+	if (work)
+		slow_work_thread_processing[id] = work->owner;
+#endif
+	if (work) {
+		slow_work_mark_time(work);
+		slow_work_begin_exec(id, work);
+	}
+
 	spin_unlock_irq(&slow_work_queue_lock);
 
 	if (!work)
@@ -194,12 +253,19 @@ static bool slow_work_execute(void)
 	if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
 		BUG();
 
-	work->ops->execute(work);
+	/* don't execute if the work is in the process of being cancelled */
+	if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
+		work->ops->execute(work);
 
 	if (very_slow)
 		atomic_dec(&vslow_work_executing_count);
 	clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
 
+	/* wake up anyone waiting for this work to be complete */
+	wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
+
+	slow_work_end_exec(id, work);
+
 	/* if someone tried to enqueue the item whilst we were executing it,
 	 * then it'll be left unenqueued to avoid multiple threads trying to
 	 * execute it simultaneously
@@ -219,7 +285,18 @@ static bool slow_work_execute(void)
 		spin_unlock_irq(&slow_work_queue_lock);
 	}
 
-	work->ops->put_ref(work);
+	/* sort out the race between module unloading and put_ref() */
+	slow_work_put_ref(work);
+
+#ifdef CONFIG_MODULES
+	module = slow_work_thread_processing[id];
+	slow_work_thread_processing[id] = NULL;
+	smp_mb();
+	if (slow_work_unreg_work_item == work ||
+	    slow_work_unreg_module == module)
+		wake_up_all(&slow_work_unreg_wq);
+#endif
+
 	return true;
 
 auto_requeue:
@@ -227,15 +304,61 @@ auto_requeue:
 	 * - we transfer our ref on the item back to the appropriate queue
 	 * - don't wake another thread up as we're awake already
 	 */
+	slow_work_mark_time(work);
 	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
 		list_add_tail(&work->link, &vslow_work_queue);
 	else
 		list_add_tail(&work->link, &slow_work_queue);
 	spin_unlock_irq(&slow_work_queue_lock);
+	slow_work_thread_processing[id] = NULL;
 	return true;
 }
 
 /**
+ * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
+ * work: The work item under execution that wants to sleep
+ * _timeout: Scheduler sleep timeout
+ *
+ * Allow a requeueable work item to sleep on a slow-work processor thread until
+ * that thread is needed to do some other work or the sleep is interrupted by
+ * some other event.
+ *
+ * The caller must set up a wake up event before calling this and must have set
+ * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
+ * condition before calling this function as no test is made here.
+ *
+ * False is returned if there is nothing on the queue; true is returned if the
+ * work item should be requeued
+ */
+bool slow_work_sleep_till_thread_needed(struct slow_work *work,
+					signed long *_timeout)
+{
+	wait_queue_head_t *wfo_wq;
+	struct list_head *queue;
+
+	DEFINE_WAIT(wait);
+
+	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
+		wfo_wq = &vslow_work_queue_waits_for_occupation;
+		queue = &vslow_work_queue;
+	} else {
+		wfo_wq = &slow_work_queue_waits_for_occupation;
+		queue = &slow_work_queue;
+	}
+
+	if (!list_empty(queue))
+		return true;
+
+	add_wait_queue_exclusive(wfo_wq, &wait);
+	if (list_empty(queue))
+		*_timeout = schedule_timeout(*_timeout);
+	finish_wait(wfo_wq, &wait);
+
+	return !list_empty(queue);
+}
+EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
+
+/**
  * slow_work_enqueue - Schedule a slow work item for processing
  * @work: The work item to queue
  *
@@ -260,16 +383,22 @@ auto_requeue:
  * allowed to pick items to execute. This ensures that very slow items won't
  * overly block ones that are just ordinarily slow.
  *
- * Returns 0 if successful, -EAGAIN if not.
+ * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if cancelled work is
+ * attempted queued)
  */
 int slow_work_enqueue(struct slow_work *work)
 {
+	wait_queue_head_t *wfo_wq;
+	struct list_head *queue;
 	unsigned long flags;
+	int ret;
+
+	if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
+		return -ECANCELED;
 
 	BUG_ON(slow_work_user_count <= 0);
 	BUG_ON(!work);
 	BUG_ON(!work->ops);
-	BUG_ON(!work->ops->get_ref);
 
 	/* when honouring an enqueue request, we only promise that we will run
 	 * the work function in the future; we do not promise to run it once
@@ -280,8 +409,19 @@ int slow_work_enqueue(struct slow_work *work)
 	 * maintaining our promise
 	 */
 	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
+		if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
+			wfo_wq = &vslow_work_queue_waits_for_occupation;
+			queue = &vslow_work_queue;
+		} else {
+			wfo_wq = &slow_work_queue_waits_for_occupation;
+			queue = &slow_work_queue;
+		}
+
 		spin_lock_irqsave(&slow_work_queue_lock, flags);
 
+		if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
+			goto cancelled;
+
 		/* we promise that we will not attempt to execute the work
 		 * function in more than one thread simultaneously
 		 *
@@ -299,25 +439,221 @@ int slow_work_enqueue(struct slow_work *work)
 		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
 			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
 		} else {
-			if (work->ops->get_ref(work) < 0)
-				goto cant_get_ref;
-			if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
-				list_add_tail(&work->link, &vslow_work_queue);
-			else
-				list_add_tail(&work->link, &slow_work_queue);
+			ret = slow_work_get_ref(work);
+			if (ret < 0)
+				goto failed;
+			slow_work_mark_time(work);
+			list_add_tail(&work->link, queue);
 			wake_up(&slow_work_thread_wq);
+
+			/* if someone who could be requeued is sleeping on a
+			 * thread, then ask them to yield their thread */
+			if (work->link.prev == queue)
+				wake_up(wfo_wq);
 		}
 
 		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
 	}
 	return 0;
 
-cant_get_ref:
+cancelled:
+	ret = -ECANCELED;
+failed:
 	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
-	return -EAGAIN;
+	return ret;
 }
 EXPORT_SYMBOL(slow_work_enqueue);
 
+static int slow_work_wait(void *word)
+{
+	schedule();
+	return 0;
+}
+
+/**
+ * slow_work_cancel - Cancel a slow work item
+ * @work: The work item to cancel
+ *
+ * This function will cancel a previously enqueued work item. If we cannot
+ * cancel the work item, it is guarenteed to have run when this function
+ * returns.
+ */
+void slow_work_cancel(struct slow_work *work)
+{
+	bool wait = true, put = false;
+
+	set_bit(SLOW_WORK_CANCELLING, &work->flags);
+	smp_mb();
+
+	/* if the work item is a delayed work item with an active timer, we
+	 * need to wait for the timer to finish _before_ getting the spinlock,
+	 * lest we deadlock against the timer routine
+	 *
+	 * the timer routine will leave DELAYED set if it notices the
+	 * CANCELLING flag in time
+	 */
+	if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
+		struct delayed_slow_work *dwork =
+			container_of(work, struct delayed_slow_work, work);
+		del_timer_sync(&dwork->timer);
+	}
+
+	spin_lock_irq(&slow_work_queue_lock);
+
+	if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
+		/* the timer routine aborted or never happened, so we are left
+		 * holding the timer's reference on the item and should just
+		 * drop the pending flag and wait for any ongoing execution to
+		 * finish */
+		struct delayed_slow_work *dwork =
+			container_of(work, struct delayed_slow_work, work);
+
+		BUG_ON(timer_pending(&dwork->timer));
+		BUG_ON(!list_empty(&work->link));
+
+		clear_bit(SLOW_WORK_DELAYED, &work->flags);
+		put = true;
+		clear_bit(SLOW_WORK_PENDING, &work->flags);
+
+	} else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
+		   !list_empty(&work->link)) {
+		/* the link in the pending queue holds a reference on the item
+		 * that we will need to release */
+		list_del_init(&work->link);
+		wait = false;
+		put = true;
+		clear_bit(SLOW_WORK_PENDING, &work->flags);
+
+	} else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
+		/* the executor is holding our only reference on the item, so
+		 * we merely need to wait for it to finish executing */
+		clear_bit(SLOW_WORK_PENDING, &work->flags);
+	}
+
+	spin_unlock_irq(&slow_work_queue_lock);
+
+	/* the EXECUTING flag is set by the executor whilst the spinlock is set
+	 * and before the item is dequeued - so assuming the above doesn't
+	 * actually dequeue it, simply waiting for the EXECUTING flag to be
+	 * released here should be sufficient */
+	if (wait)
+		wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
+			    TASK_UNINTERRUPTIBLE);
+
+	clear_bit(SLOW_WORK_CANCELLING, &work->flags);
+	if (put)
+		slow_work_put_ref(work);
+}
+EXPORT_SYMBOL(slow_work_cancel);
+
+/*
+ * Handle expiry of the delay timer, indicating that a delayed slow work item
+ * should now be queued if not cancelled
+ */
+static void delayed_slow_work_timer(unsigned long data)
+{
+	wait_queue_head_t *wfo_wq;
+	struct list_head *queue;
+	struct slow_work *work = (struct slow_work *) data;
+	unsigned long flags;
+	bool queued = false, put = false, first = false;
+
+	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
+		wfo_wq = &vslow_work_queue_waits_for_occupation;
+		queue = &vslow_work_queue;
+	} else {
+		wfo_wq = &slow_work_queue_waits_for_occupation;
+		queue = &slow_work_queue;
+	}
+
+	spin_lock_irqsave(&slow_work_queue_lock, flags);
+	if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
+		clear_bit(SLOW_WORK_DELAYED, &work->flags);
+
+		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
+			/* we discard the reference the timer was holding in
+			 * favour of the one the executor holds */
+			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
+			put = true;
+		} else {
+			slow_work_mark_time(work);
+			list_add_tail(&work->link, queue);
+			queued = true;
+			if (work->link.prev == queue)
+				first = true;
+		}
+	}
+
+	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
+	if (put)
+		slow_work_put_ref(work);
+	if (first)
+		wake_up(wfo_wq);
+	if (queued)
+		wake_up(&slow_work_thread_wq);
+}
+
+/**
+ * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
+ * @dwork: The delayed work item to queue
+ * @delay: When to start executing the work, in jiffies from now
+ *
+ * This is similar to slow_work_enqueue(), but it adds a delay before the work
+ * is actually queued for processing.
+ *
+ * The item can have delayed processing requested on it whilst it is being
+ * executed. The delay will begin immediately, and if it expires before the
+ * item finishes executing, the item will be placed back on the queue when it
+ * has done executing.
+ */
+int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
+			      unsigned long delay)
+{
+	struct slow_work *work = &dwork->work;
+	unsigned long flags;
+	int ret;
+
+	if (delay == 0)
+		return slow_work_enqueue(&dwork->work);
+
+	BUG_ON(slow_work_user_count <= 0);
+	BUG_ON(!work);
+	BUG_ON(!work->ops);
+
+	if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
+		return -ECANCELED;
+
+	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
+		spin_lock_irqsave(&slow_work_queue_lock, flags);
+
+		if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
+			goto cancelled;
+
+		/* the timer holds a reference whilst it is pending */
+		ret = work->ops->get_ref(work);
+		if (ret < 0)
+			goto cant_get_ref;
+
+		if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
+			BUG();
+		dwork->timer.expires = jiffies + delay;
+		dwork->timer.data = (unsigned long) work;
+		dwork->timer.function = delayed_slow_work_timer;
+		add_timer(&dwork->timer);
+
+		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
+	}
+
+	return 0;
+
+cancelled:
+	ret = -ECANCELED;
+cant_get_ref:
+	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
+	return ret;
+}
+EXPORT_SYMBOL(delayed_slow_work_enqueue);
+
 /*
  * Schedule a cull of the thread pool at some time in the near future
  */
@@ -368,13 +704,23 @@ static inline bool slow_work_available(int vsmax)
  */
 static int slow_work_thread(void *_data)
 {
-	int vsmax;
+	int vsmax, id;
 
 	DEFINE_WAIT(wait);
 
 	set_freezable();
 	set_user_nice(current, -5);
 
+	/* allocate ourselves an ID */
+	spin_lock_irq(&slow_work_queue_lock);
+	id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
+	BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
+	__set_bit(id, slow_work_ids);
+	slow_work_set_thread_pid(id, current->pid);
+	spin_unlock_irq(&slow_work_queue_lock);
+
+	sprintf(current->comm, "kslowd%03u", id);
+
 	for (;;) {
 		vsmax = vslow_work_proportion;
 		vsmax *= atomic_read(&slow_work_thread_count);
@@ -395,7 +741,7 @@ static int slow_work_thread(void *_data)
 		vsmax *= atomic_read(&slow_work_thread_count);
 		vsmax /= 100;
 
-		if (slow_work_available(vsmax) && slow_work_execute()) {
+		if (slow_work_available(vsmax) && slow_work_execute(id)) {
 			cond_resched();
 			if (list_empty(&slow_work_queue) &&
 			    list_empty(&vslow_work_queue) &&
@@ -412,6 +758,11 @@ static int slow_work_thread(void *_data)
 			break;
 	}
 
+	spin_lock_irq(&slow_work_queue_lock);
+	slow_work_set_thread_pid(id, 0);
+	__clear_bit(id, slow_work_ids);
+	spin_unlock_irq(&slow_work_queue_lock);
+
 	if (atomic_dec_and_test(&slow_work_thread_count))
 		complete_and_exit(&slow_work_last_thread_exited, 0);
 	return 0;
@@ -427,21 +778,6 @@ static void slow_work_cull_timeout(unsigned long data)
 }
 
 /*
- * Get a reference on slow work thread starter
- */
-static int slow_work_new_thread_get_ref(struct slow_work *work)
-{
-	return 0;
-}
-
-/*
- * Drop a reference on slow work thread starter
- */
-static void slow_work_new_thread_put_ref(struct slow_work *work)
-{
-}
-
-/*
  * Start a new slow work thread
  */
 static void slow_work_new_thread_execute(struct slow_work *work)
@@ -475,9 +811,11 @@ static void slow_work_new_thread_execute(struct slow_work *work)
 }
 
 static const struct slow_work_ops slow_work_new_thread_ops = {
-	.get_ref = slow_work_new_thread_get_ref,
-	.put_ref = slow_work_new_thread_put_ref,
+	.owner = THIS_MODULE,
 	.execute = slow_work_new_thread_execute,
+#ifdef CONFIG_SLOW_WORK_PROC
+	.desc = slow_work_new_thread_desc,
+#endif
 };
 
 /*
@@ -546,12 +884,13 @@ static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
 
 /**
  * slow_work_register_user - Register a user of the facility
+ * @module: The module about to make use of the facility
  *
  * Register a user of the facility, starting up the initial threads if there
  * aren't any other users at this point. This will return 0 if successful, or
  * an error if not.
  */
-int slow_work_register_user(void)
+int slow_work_register_user(struct module *module)
 {
 	struct task_struct *p;
 	int loop;
@@ -598,14 +937,79 @@ error:
 }
 EXPORT_SYMBOL(slow_work_register_user);
 
+/*
+ * wait for all outstanding items from the calling module to complete
+ * - note that more items may be queued whilst we're waiting
+ */
+static void slow_work_wait_for_items(struct module *module)
+{
+	DECLARE_WAITQUEUE(myself, current);
+	struct slow_work *work;
+	int loop;
+
+	mutex_lock(&slow_work_unreg_sync_lock);
+	add_wait_queue(&slow_work_unreg_wq, &myself);
+
+	for (;;) {
+		spin_lock_irq(&slow_work_queue_lock);
+
+		/* first of all, we wait for the last queued item in each list
+		 * to be processed */
+		list_for_each_entry_reverse(work, &vslow_work_queue, link) {
+			if (work->owner == module) {
+				set_current_state(TASK_UNINTERRUPTIBLE);
+				slow_work_unreg_work_item = work;
+				goto do_wait;
+			}
+		}
+		list_for_each_entry_reverse(work, &slow_work_queue, link) {
+			if (work->owner == module) {
+				set_current_state(TASK_UNINTERRUPTIBLE);
+				slow_work_unreg_work_item = work;
+				goto do_wait;
+			}
+		}
+
+		/* then we wait for the items being processed to finish */
+		slow_work_unreg_module = module;
+		smp_mb();
+		for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
+			if (slow_work_thread_processing[loop] == module)
+				goto do_wait;
+		}
+		spin_unlock_irq(&slow_work_queue_lock);
+		break; /* okay, we're done */
+
+	do_wait:
+		spin_unlock_irq(&slow_work_queue_lock);
+		schedule();
+		slow_work_unreg_work_item = NULL;
+		slow_work_unreg_module = NULL;
+	}
+
+	remove_wait_queue(&slow_work_unreg_wq, &myself);
+	mutex_unlock(&slow_work_unreg_sync_lock);
+}
+
 /**
  * slow_work_unregister_user - Unregister a user of the facility
+ * @module: The module whose items should be cleared
  *
  * Unregister a user of the facility, killing all the threads if this was the
  * last one.
+ *
+ * This waits for all the work items belonging to the nominated module to go
+ * away before proceeding.
  */
-void slow_work_unregister_user(void)
+void slow_work_unregister_user(struct module *module)
 {
+	/* first of all, wait for all outstanding items from the calling module
+	 * to complete */
+	if (module)
+		slow_work_wait_for_items(module);
+
+	/* then we can actually go about shutting down the facility if need
+	 * be */
 	mutex_lock(&slow_work_user_lock);
 
 	BUG_ON(slow_work_user_count <= 0);
@@ -639,6 +1043,10 @@ static int __init init_slow_work(void)
 	if (slow_work_max_max_threads < nr_cpus * 2)
 		slow_work_max_max_threads = nr_cpus * 2;
 #endif
+#ifdef CONFIG_SLOW_WORK_PROC
+	proc_create("slow_work_rq", S_IFREG | 0400, NULL,
+		    &slow_work_runqueue_fops);
+#endif
 	return 0;
 }
 
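
For reference, the following is a minimal sketch (not part of the patch above) of how a client module might use the interface after this change: it registers with slow_work_register_user(THIS_MODULE), supplies an ops table carrying .owner so slow_work_unregister_user() can wait for the module's items, and uses the new cancellation path. The my_* names are hypothetical, and slow_work_init() is assumed to come from the companion include/linux/slow-work.h as in mainline.

/*
 * Hypothetical slow-work client after this patch -- a sketch only,
 * not part of the diff above.
 */
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/slow-work.h>

static void my_work_execute(struct slow_work *work)
{
	/* long-running, sleepable processing goes here */
}

static const struct slow_work_ops my_work_ops = {
	.owner		= THIS_MODULE,	/* lets unregistration wait for our items */
	.execute	= my_work_execute,
	/* .get_ref/.put_ref are now optional and omitted here */
};

static struct slow_work my_work;

static int __init my_init(void)
{
	int ret;

	/* start (or share) the thread pool on behalf of this module */
	ret = slow_work_register_user(THIS_MODULE);
	if (ret < 0)
		return ret;

	slow_work_init(&my_work, &my_work_ops);

	/* returns -ECANCELED if the item is currently being cancelled */
	ret = slow_work_enqueue(&my_work);
	if (ret < 0 && ret != -ECANCELED)
		printk(KERN_WARNING "my_work enqueue failed: %d\n", ret);
	return 0;
}

static void __exit my_exit(void)
{
	/* cancel waits for any execution in progress to finish */
	slow_work_cancel(&my_work);

	/* waits for all items owned by this module before shutting down */
	slow_work_unregister_user(THIS_MODULE);
}

module_init(my_init);
module_exit(my_exit);
MODULE_LICENSE("GPL");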
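
Similarly, a hedged sketch of the delayed-enqueue path added above, reusing the hypothetical ops table from the previous sketch; delayed_slow_work_init() is assumed to be provided by the header counterpart of this patch.

/*
 * Sketch of delayed slow work usage -- not part of the diff above.
 */
#include <linux/jiffies.h>
#include <linux/slow-work.h>

static struct delayed_slow_work my_dwork;	/* hypothetical item */

static int kick_off_delayed(void)
{
	int ret;

	delayed_slow_work_init(&my_dwork, &my_work_ops);

	/* the timer holds a reference until it fires and queues the item */
	ret = delayed_slow_work_enqueue(&my_dwork, 5 * HZ);
	if (ret == -ECANCELED)
		; /* a concurrent slow_work_cancel() beat us to it */
	return ret;
}

static void stop_delayed(void)
{
	/* handles the pending-timer, queued and executing cases alike */
	slow_work_cancel(&my_dwork.work);
}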