path: root/kernel/workqueue.c
author     Tejun Heo <tj@kernel.org>    2010-06-29 04:07:14 -0400
committer  Tejun Heo <tj@kernel.org>    2010-06-29 04:07:14 -0400
commit     e22bee782b3b00bd4534ae9b1c5fb2e8e6573c5c (patch)
tree       9854d22294699d9ec27e28f70c05f479e5640abd /kernel/workqueue.c
parent     d302f0178223802a1e496ba90c66193b7721c9c1 (diff)
workqueue: implement concurrency managed dynamic worker pool
Instead of creating a worker for each cwq and putting it into the shared pool, manage per-cpu workers dynamically.

Works aren't supposed to be cpu cycle hogs, and maintaining just enough concurrency to prevent work processing from stalling due to lack of processing context is optimal. gcwq keeps the number of concurrently active workers to the minimum necessary, but no lower. As long as there is one or more running workers on the cpu, no new worker is scheduled so that works can be processed in batch as much as possible, but when the last running worker blocks, gcwq immediately schedules a new worker so that the cpu doesn't sit idle while there are works to be processed.

gcwq always keeps at least a single idle worker around. When a new worker is necessary and that worker is the last idle one, it assumes the role of "manager" and manages the worker pool - i.e. creates another worker. Forward progress is guaranteed by having dedicated rescue workers for workqueues whose works may be needed while creating a new worker. When the manager has trouble creating a new worker, the mayday timer activates and rescue workers are summoned to the cpu to execute works which might be necessary to create new workers.

The trustee is expanded to serve the role of manager while a CPU is being taken down and stays down. As no new works are supposed to be queued on a dead cpu, it just needs to drain all the existing ones. The trustee continues to try to create new workers and summon rescuers as long as there are pending works. If the CPU is brought back up while the trustee is still trying to drain the gcwq from the previous offlining, the trustee kills all idle workers, tells the ones which are still busy to rebind to the cpu, and passes control over to the gcwq, which assumes the manager role as necessary.

The concurrency managed worker pool reduces the number of workers drastically. Only workers which are necessary to keep the processing going are created and kept. It also reduces cache footprint by avoiding unnecessary context switches between different workers.

Please note that this patch does not increase max_active of any workqueue. All workqueues can still only process one work per cpu.

Signed-off-by: Tejun Heo <tj@kernel.org>
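The heart of the scheme is a per-cpu count of workers that are actually running (not blocked), kept up to date from scheduler hooks and consulted both when queueing a work and when a worker goes to sleep. Below is a stripped-down, single-threaded model of that policy; every name in it is made up for the illustration, and locking, memory barriers, flags and hotplug handling are deliberately left out. The real logic is in the functions this patch adds: need_more_worker(), keep_working(), wq_worker_waking_up(), wq_worker_sleeping() and the updated insert_work().

    #include <stdbool.h>

    /* Illustrative model only -- not kernel code. */
    struct pool_model {
            int nr_running;     /* workers currently executing, not blocked */
            int nr_pending;     /* works sitting on the gcwq worklist */
    };

    /* queueing side: wake an idle worker only if nothing is running */
    bool queueing_needs_wakeup(const struct pool_model *p)
    {
            return p->nr_pending > 0 && p->nr_running == 0;
    }

    /* scheduler hook, worker about to block: pass the baton if it was
     * the last runner and work is still pending */
    bool sleeping_worker_wakes_peer(struct pool_model *p)
    {
            p->nr_running--;
            return p->nr_running == 0 && p->nr_pending > 0;
    }

    /* scheduler hook, worker waking up again */
    void worker_woken(struct pool_model *p)
    {
            p->nr_running++;
    }

    /* worker loop: keep batching works while we're the only runner */
    bool keep_batching(const struct pool_model *p)
    {
            return p->nr_pending > 0 && p->nr_running <= 1;
    }

The result is the behaviour described above: works are batched through a single worker as long as it keeps running, and a fresh worker is woken the moment the last runner blocks with work still queued.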
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--  kernel/workqueue.c  |  936
1 file changed, 833 insertions, 103 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 4c31fde092c..0ad46523b42 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -34,17 +34,25 @@
34#include <linux/debug_locks.h> 34#include <linux/debug_locks.h>
35#include <linux/lockdep.h> 35#include <linux/lockdep.h>
36#include <linux/idr.h> 36#include <linux/idr.h>
37#include <linux/delay.h> 37
38#include "workqueue_sched.h"
38 39
39enum { 40enum {
40 /* global_cwq flags */ 41 /* global_cwq flags */
42 GCWQ_MANAGE_WORKERS = 1 << 0, /* need to manage workers */
43 GCWQ_MANAGING_WORKERS = 1 << 1, /* managing workers */
44 GCWQ_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
41 GCWQ_FREEZING = 1 << 3, /* freeze in progress */ 45 GCWQ_FREEZING = 1 << 3, /* freeze in progress */
42 46
43 /* worker flags */ 47 /* worker flags */
44 WORKER_STARTED = 1 << 0, /* started */ 48 WORKER_STARTED = 1 << 0, /* started */
45 WORKER_DIE = 1 << 1, /* die die die */ 49 WORKER_DIE = 1 << 1, /* die die die */
46 WORKER_IDLE = 1 << 2, /* is idle */ 50 WORKER_IDLE = 1 << 2, /* is idle */
51 WORKER_PREP = 1 << 3, /* preparing to run works */
47 WORKER_ROGUE = 1 << 4, /* not bound to any cpu */ 52 WORKER_ROGUE = 1 << 4, /* not bound to any cpu */
53 WORKER_REBIND = 1 << 5, /* mom is home, come back */
54
55 WORKER_NOT_RUNNING = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND,
48 56
49 /* gcwq->trustee_state */ 57 /* gcwq->trustee_state */
50 TRUSTEE_START = 0, /* start */ 58 TRUSTEE_START = 0, /* start */
@@ -57,7 +65,19 @@ enum {
57 BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER, 65 BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER,
58 BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1, 66 BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1,
59 67
68 MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
69 IDLE_WORKER_TIMEOUT = 300 * HZ, /* keep idle ones for 5 mins */
70
71 MAYDAY_INITIAL_TIMEOUT = HZ / 100, /* call for help after 10ms */
72 MAYDAY_INTERVAL = HZ / 10, /* and then every 100ms */
73 CREATE_COOLDOWN = HZ, /* time to breath after fail */
60 TRUSTEE_COOLDOWN = HZ / 10, /* for trustee draining */ 74 TRUSTEE_COOLDOWN = HZ / 10, /* for trustee draining */
75
76 /*
77 * Rescue workers are used only on emergencies and shared by
78 * all cpus. Give -20.
79 */
80 RESCUER_NICE_LEVEL = -20,
61}; 81};
62 82
63/* 83/*
@@ -65,8 +85,16 @@ enum {
65 * 85 *
66 * I: Set during initialization and read-only afterwards. 86 * I: Set during initialization and read-only afterwards.
67 * 87 *
88 * P: Preemption protected. Disabling preemption is enough and should
89 * only be modified and accessed from the local cpu.
90 *
68 * L: gcwq->lock protected. Access with gcwq->lock held. 91 * L: gcwq->lock protected. Access with gcwq->lock held.
69 * 92 *
93 * X: During normal operation, modification requires gcwq->lock and
94 * should be done only from local cpu. Either disabling preemption
95 * on local cpu or grabbing gcwq->lock is enough for read access.
96 * While trustee is in charge, it's identical to L.
97 *
70 * F: wq->flush_mutex protected. 98 * F: wq->flush_mutex protected.
71 * 99 *
72 * W: workqueue_lock protected. 100 * W: workqueue_lock protected.
@@ -74,6 +102,10 @@ enum {
74 102
75struct global_cwq; 103struct global_cwq;
76 104
105/*
106 * The poor guys doing the actual heavy lifting. All on-duty workers
107 * are either serving the manager role, on idle list or on busy hash.
108 */
77struct worker { 109struct worker {
78 /* on idle list while idle, on busy hash table while busy */ 110 /* on idle list while idle, on busy hash table while busy */
79 union { 111 union {
@@ -86,12 +118,17 @@ struct worker {
86 struct list_head scheduled; /* L: scheduled works */ 118 struct list_head scheduled; /* L: scheduled works */
87 struct task_struct *task; /* I: worker task */ 119 struct task_struct *task; /* I: worker task */
88 struct global_cwq *gcwq; /* I: the associated gcwq */ 120 struct global_cwq *gcwq; /* I: the associated gcwq */
89 unsigned int flags; /* L: flags */ 121 /* 64 bytes boundary on 64bit, 32 on 32bit */
122 unsigned long last_active; /* L: last active timestamp */
123 unsigned int flags; /* X: flags */
90 int id; /* I: worker id */ 124 int id; /* I: worker id */
125 struct work_struct rebind_work; /* L: rebind worker to cpu */
91}; 126};
92 127
93/* 128/*
94 * Global per-cpu workqueue. 129 * Global per-cpu workqueue. There's one and only one for each cpu
130 * and all works are queued and processed here regardless of their
131 * target workqueues.
95 */ 132 */
96struct global_cwq { 133struct global_cwq {
97 spinlock_t lock; /* the gcwq lock */ 134 spinlock_t lock; /* the gcwq lock */
@@ -103,15 +140,19 @@ struct global_cwq {
103 int nr_idle; /* L: currently idle ones */ 140 int nr_idle; /* L: currently idle ones */
104 141
105 /* workers are chained either in the idle_list or busy_hash */ 142 /* workers are chained either in the idle_list or busy_hash */
106 struct list_head idle_list; /* L: list of idle workers */ 143 struct list_head idle_list; /* X: list of idle workers */
107 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE]; 144 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
108 /* L: hash of busy workers */ 145 /* L: hash of busy workers */
109 146
147 struct timer_list idle_timer; /* L: worker idle timeout */
148 struct timer_list mayday_timer; /* L: SOS timer for dworkers */
149
110 struct ida worker_ida; /* L: for worker IDs */ 150 struct ida worker_ida; /* L: for worker IDs */
111 151
112 struct task_struct *trustee; /* L: for gcwq shutdown */ 152 struct task_struct *trustee; /* L: for gcwq shutdown */
113 unsigned int trustee_state; /* L: trustee state */ 153 unsigned int trustee_state; /* L: trustee state */
114 wait_queue_head_t trustee_wait; /* trustee wait */ 154 wait_queue_head_t trustee_wait; /* trustee wait */
155 struct worker *first_idle; /* L: first idle worker */
115} ____cacheline_aligned_in_smp; 156} ____cacheline_aligned_in_smp;
116 157
117/* 158/*
@@ -121,7 +162,6 @@ struct global_cwq {
121 */ 162 */
122struct cpu_workqueue_struct { 163struct cpu_workqueue_struct {
123 struct global_cwq *gcwq; /* I: the associated gcwq */ 164 struct global_cwq *gcwq; /* I: the associated gcwq */
124 struct worker *worker;
125 struct workqueue_struct *wq; /* I: the owning workqueue */ 165 struct workqueue_struct *wq; /* I: the owning workqueue */
126 int work_color; /* L: current color */ 166 int work_color; /* L: current color */
127 int flush_color; /* L: flushing color */ 167 int flush_color; /* L: flushing color */
@@ -160,6 +200,9 @@ struct workqueue_struct {
160 200
161 unsigned long single_cpu; /* cpu for single cpu wq */ 201 unsigned long single_cpu; /* cpu for single cpu wq */
162 202
203 cpumask_var_t mayday_mask; /* cpus requesting rescue */
204 struct worker *rescuer; /* I: rescue worker */
205
163 int saved_max_active; /* I: saved cwq max_active */ 206 int saved_max_active; /* I: saved cwq max_active */
164 const char *name; /* I: workqueue name */ 207 const char *name; /* I: workqueue name */
165#ifdef CONFIG_LOCKDEP 208#ifdef CONFIG_LOCKDEP
@@ -286,7 +329,13 @@ static DEFINE_SPINLOCK(workqueue_lock);
286static LIST_HEAD(workqueues); 329static LIST_HEAD(workqueues);
287static bool workqueue_freezing; /* W: have wqs started freezing? */ 330static bool workqueue_freezing; /* W: have wqs started freezing? */
288 331
332/*
333 * The almighty global cpu workqueues. nr_running is the only field
334 * which is expected to be used frequently by other cpus via
335 * try_to_wake_up(). Put it in a separate cacheline.
336 */
289static DEFINE_PER_CPU(struct global_cwq, global_cwq); 337static DEFINE_PER_CPU(struct global_cwq, global_cwq);
338static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
290 339
291static int worker_thread(void *__worker); 340static int worker_thread(void *__worker);
292 341
@@ -295,6 +344,11 @@ static struct global_cwq *get_gcwq(unsigned int cpu)
295 return &per_cpu(global_cwq, cpu); 344 return &per_cpu(global_cwq, cpu);
296} 345}
297 346
347static atomic_t *get_gcwq_nr_running(unsigned int cpu)
348{
349 return &per_cpu(gcwq_nr_running, cpu);
350}
351
298static struct cpu_workqueue_struct *get_cwq(unsigned int cpu, 352static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
299 struct workqueue_struct *wq) 353 struct workqueue_struct *wq)
300{ 354{
@@ -385,6 +439,63 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
385 return get_gcwq(cpu); 439 return get_gcwq(cpu);
386} 440}
387 441
442/*
443 * Policy functions. These define the policies on how the global
444 * worker pool is managed. Unless noted otherwise, these functions
445 * assume that they're being called with gcwq->lock held.
446 */
447
448/*
449 * Need to wake up a worker? Called from anything but currently
450 * running workers.
451 */
452static bool need_more_worker(struct global_cwq *gcwq)
453{
454 atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
455
456 return !list_empty(&gcwq->worklist) && !atomic_read(nr_running);
457}
458
459/* Can I start working? Called from busy but !running workers. */
460static bool may_start_working(struct global_cwq *gcwq)
461{
462 return gcwq->nr_idle;
463}
464
465/* Do I need to keep working? Called from currently running workers. */
466static bool keep_working(struct global_cwq *gcwq)
467{
468 atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
469
470 return !list_empty(&gcwq->worklist) && atomic_read(nr_running) <= 1;
471}
472
473/* Do we need a new worker? Called from manager. */
474static bool need_to_create_worker(struct global_cwq *gcwq)
475{
476 return need_more_worker(gcwq) && !may_start_working(gcwq);
477}
478
479/* Do I need to be the manager? */
480static bool need_to_manage_workers(struct global_cwq *gcwq)
481{
482 return need_to_create_worker(gcwq) || gcwq->flags & GCWQ_MANAGE_WORKERS;
483}
484
485/* Do we have too many workers and should some go away? */
486static bool too_many_workers(struct global_cwq *gcwq)
487{
488 bool managing = gcwq->flags & GCWQ_MANAGING_WORKERS;
489 int nr_idle = gcwq->nr_idle + managing; /* manager is considered idle */
490 int nr_busy = gcwq->nr_workers - nr_idle;
491
492 return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
493}
494
495/*
496 * Wake up functions.
497 */
498
388/* Return the first worker. Safe with preemption disabled */ 499/* Return the first worker. Safe with preemption disabled */
389static struct worker *first_worker(struct global_cwq *gcwq) 500static struct worker *first_worker(struct global_cwq *gcwq)
390{ 501{
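To make the new idle-reaping threshold above concrete (illustrative numbers only, not taken from the patch): too_many_workers() fires once the idle workers beyond the always-tolerated first two reach a quarter of the busy count, at which point the idle timer starts retiring workers that have been idle for IDLE_WORKER_TIMEOUT.

    /* nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy, ratio == 4 */
    /* nr_busy = 8, nr_idle = 3:  (3 - 2) * 4 =  4 <  8   -> not too many, idlers stay   */
    /* nr_busy = 8, nr_idle = 4:  (4 - 2) * 4 =  8 >= 8   -> too many, idle timer armed  */
    /* nr_busy = 0, nr_idle = 2:  nr_idle > 2 is false    -> two idle workers always kept */

Note that the manager, if any, is counted as idle for this check (the "+ managing" term in too_many_workers()).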
@@ -412,12 +523,77 @@ static void wake_up_worker(struct global_cwq *gcwq)
412} 523}
413 524
414/** 525/**
415 * worker_set_flags - set worker flags 526 * wq_worker_waking_up - a worker is waking up
527 * @task: task waking up
528 * @cpu: CPU @task is waking up to
529 *
530 * This function is called during try_to_wake_up() when a worker is
531 * being awoken.
532 *
533 * CONTEXT:
534 * spin_lock_irq(rq->lock)
535 */
536void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
537{
538 struct worker *worker = kthread_data(task);
539
540 if (likely(!(worker->flags & WORKER_NOT_RUNNING)))
541 atomic_inc(get_gcwq_nr_running(cpu));
542}
543
544/**
545 * wq_worker_sleeping - a worker is going to sleep
546 * @task: task going to sleep
547 * @cpu: CPU in question, must be the current CPU number
548 *
549 * This function is called during schedule() when a busy worker is
550 * going to sleep. Worker on the same cpu can be woken up by
551 * returning pointer to its task.
552 *
553 * CONTEXT:
554 * spin_lock_irq(rq->lock)
555 *
556 * RETURNS:
557 * Worker task on @cpu to wake up, %NULL if none.
558 */
559struct task_struct *wq_worker_sleeping(struct task_struct *task,
560 unsigned int cpu)
561{
562 struct worker *worker = kthread_data(task), *to_wakeup = NULL;
563 struct global_cwq *gcwq = get_gcwq(cpu);
564 atomic_t *nr_running = get_gcwq_nr_running(cpu);
565
566 if (unlikely(worker->flags & WORKER_NOT_RUNNING))
567 return NULL;
568
569 /* this can only happen on the local cpu */
570 BUG_ON(cpu != raw_smp_processor_id());
571
572 /*
573 * The counterpart of the following dec_and_test, implied mb,
574 * worklist not empty test sequence is in insert_work().
575 * Please read comment there.
576 *
577 * NOT_RUNNING is clear. This means that trustee is not in
578 * charge and we're running on the local cpu w/ rq lock held
579 * and preemption disabled, which in turn means that none else
580 * could be manipulating idle_list, so dereferencing idle_list
581 * without gcwq lock is safe.
582 */
583 if (atomic_dec_and_test(nr_running) && !list_empty(&gcwq->worklist))
584 to_wakeup = first_worker(gcwq);
585 return to_wakeup ? to_wakeup->task : NULL;
586}
587
588/**
589 * worker_set_flags - set worker flags and adjust nr_running accordingly
416 * @worker: worker to set flags for 590 * @worker: worker to set flags for
417 * @flags: flags to set 591 * @flags: flags to set
418 * @wakeup: wakeup an idle worker if necessary 592 * @wakeup: wakeup an idle worker if necessary
419 * 593 *
420 * Set @flags in @worker->flags. 594 * Set @flags in @worker->flags and adjust nr_running accordingly. If
595 * nr_running becomes zero and @wakeup is %true, an idle worker is
596 * woken up.
421 * 597 *
422 * LOCKING: 598 * LOCKING:
423 * spin_lock_irq(gcwq->lock). 599 * spin_lock_irq(gcwq->lock).
@@ -425,22 +601,49 @@ static void wake_up_worker(struct global_cwq *gcwq)
425static inline void worker_set_flags(struct worker *worker, unsigned int flags, 601static inline void worker_set_flags(struct worker *worker, unsigned int flags,
426 bool wakeup) 602 bool wakeup)
427{ 603{
604 struct global_cwq *gcwq = worker->gcwq;
605
606 /*
607 * If transitioning into NOT_RUNNING, adjust nr_running and
608 * wake up an idle worker as necessary if requested by
609 * @wakeup.
610 */
611 if ((flags & WORKER_NOT_RUNNING) &&
612 !(worker->flags & WORKER_NOT_RUNNING)) {
613 atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
614
615 if (wakeup) {
616 if (atomic_dec_and_test(nr_running) &&
617 !list_empty(&gcwq->worklist))
618 wake_up_worker(gcwq);
619 } else
620 atomic_dec(nr_running);
621 }
622
428 worker->flags |= flags; 623 worker->flags |= flags;
429} 624}
430 625
431/** 626/**
432 * worker_clr_flags - clear worker flags 627 * worker_clr_flags - clear worker flags and adjust nr_running accordingly
433 * @worker: worker to set flags for 628 * @worker: worker to set flags for
434 * @flags: flags to clear 629 * @flags: flags to clear
435 * 630 *
436 * Clear @flags in @worker->flags. 631 * Clear @flags in @worker->flags and adjust nr_running accordingly.
437 * 632 *
438 * LOCKING: 633 * LOCKING:
439 * spin_lock_irq(gcwq->lock). 634 * spin_lock_irq(gcwq->lock).
440 */ 635 */
441static inline void worker_clr_flags(struct worker *worker, unsigned int flags) 636static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
442{ 637{
638 struct global_cwq *gcwq = worker->gcwq;
639 unsigned int oflags = worker->flags;
640
443 worker->flags &= ~flags; 641 worker->flags &= ~flags;
642
643 /* if transitioning out of NOT_RUNNING, increment nr_running */
644 if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
645 if (!(worker->flags & WORKER_NOT_RUNNING))
646 atomic_inc(get_gcwq_nr_running(gcwq->cpu));
444} 647}
445 648
446/** 649/**
@@ -540,6 +743,8 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
540 struct work_struct *work, struct list_head *head, 743 struct work_struct *work, struct list_head *head,
541 unsigned int extra_flags) 744 unsigned int extra_flags)
542{ 745{
746 struct global_cwq *gcwq = cwq->gcwq;
747
543 /* we own @work, set data and link */ 748 /* we own @work, set data and link */
544 set_work_cwq(work, cwq, extra_flags); 749 set_work_cwq(work, cwq, extra_flags);
545 750
@@ -550,7 +755,16 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
550 smp_wmb(); 755 smp_wmb();
551 756
552 list_add_tail(&work->entry, head); 757 list_add_tail(&work->entry, head);
553 wake_up_worker(cwq->gcwq); 758
759 /*
760 * Ensure either worker_sched_deactivated() sees the above
761 * list_add_tail() or we see zero nr_running to avoid workers
762 * lying around lazily while there are works to be processed.
763 */
764 smp_mb();
765
766 if (!atomic_read(get_gcwq_nr_running(gcwq->cpu)))
767 wake_up_worker(gcwq);
554} 768}
555 769
556/** 770/**
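The subtle point in the queueing path just above is how it pairs with wq_worker_sleeping(): either the queueing cpu sees nr_running already at zero after its smp_mb() and wakes a worker itself, or the worker going to sleep brings nr_running to zero with atomic_dec_and_test() (which implies a full barrier), then sees the freshly added entry on the worklist and hands an idle worker back to the scheduler to wake. Condensed, the two sides are (a restatement of the hunks above, not new code):

    /* queueing side, insert_work() */
    list_add_tail(&work->entry, head);
    smp_mb();                                /* pairs with the implied barrier below */
    if (!atomic_read(nr_running))
            wake_up_worker(gcwq);

    /* sleeping side, wq_worker_sleeping() */
    if (atomic_dec_and_test(nr_running) &&   /* full barrier implied */
        !list_empty(&gcwq->worklist))
            to_wakeup = first_worker(gcwq);

Whichever side observes the other's update second performs the wakeup, so a work queued while the last runner is falling asleep can never be stranded with nobody woken.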
@@ -810,11 +1024,16 @@ static void worker_enter_idle(struct worker *worker)
810 1024
811 worker_set_flags(worker, WORKER_IDLE, false); 1025 worker_set_flags(worker, WORKER_IDLE, false);
812 gcwq->nr_idle++; 1026 gcwq->nr_idle++;
1027 worker->last_active = jiffies;
813 1028
814 /* idle_list is LIFO */ 1029 /* idle_list is LIFO */
815 list_add(&worker->entry, &gcwq->idle_list); 1030 list_add(&worker->entry, &gcwq->idle_list);
816 1031
817 if (unlikely(worker->flags & WORKER_ROGUE)) 1032 if (likely(!(worker->flags & WORKER_ROGUE))) {
1033 if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
1034 mod_timer(&gcwq->idle_timer,
1035 jiffies + IDLE_WORKER_TIMEOUT);
1036 } else
818 wake_up_all(&gcwq->trustee_wait); 1037 wake_up_all(&gcwq->trustee_wait);
819} 1038}
820 1039
@@ -837,6 +1056,81 @@ static void worker_leave_idle(struct worker *worker)
837 list_del_init(&worker->entry); 1056 list_del_init(&worker->entry);
838} 1057}
839 1058
1059/**
1060 * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
1061 * @worker: self
1062 *
1063 * Works which are scheduled while the cpu is online must at least be
1064 * scheduled to a worker which is bound to the cpu so that if they are
1065 * flushed from cpu callbacks while cpu is going down, they are
1066 * guaranteed to execute on the cpu.
1067 *
1068 * This function is to be used by rogue workers and rescuers to bind
1069 * themselves to the target cpu and may race with cpu going down or
1070 * coming online. kthread_bind() can't be used because it may put the
1071 * worker to already dead cpu and set_cpus_allowed_ptr() can't be used
1072 * verbatim as it's best effort and blocking and gcwq may be
1073 * [dis]associated in the meantime.
1074 *
1075 * This function tries set_cpus_allowed() and locks gcwq and verifies
1076 * the binding against GCWQ_DISASSOCIATED which is set during
1077 * CPU_DYING and cleared during CPU_ONLINE, so if the worker enters
1078 * idle state or fetches works without dropping lock, it can guarantee
1079 * the scheduling requirement described in the first paragraph.
1080 *
1081 * CONTEXT:
1082 * Might sleep. Called without any lock but returns with gcwq->lock
1083 * held.
1084 *
1085 * RETURNS:
1086 * %true if the associated gcwq is online (@worker is successfully
1087 * bound), %false if offline.
1088 */
1089static bool worker_maybe_bind_and_lock(struct worker *worker)
1090{
1091 struct global_cwq *gcwq = worker->gcwq;
1092 struct task_struct *task = worker->task;
1093
1094 while (true) {
1095 /*
1096 * The following call may fail, succeed or succeed
1097 * without actually migrating the task to the cpu if
1098 * it races with cpu hotunplug operation. Verify
1099 * against GCWQ_DISASSOCIATED.
1100 */
1101 set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
1102
1103 spin_lock_irq(&gcwq->lock);
1104 if (gcwq->flags & GCWQ_DISASSOCIATED)
1105 return false;
1106 if (task_cpu(task) == gcwq->cpu &&
1107 cpumask_equal(&current->cpus_allowed,
1108 get_cpu_mask(gcwq->cpu)))
1109 return true;
1110 spin_unlock_irq(&gcwq->lock);
1111
1112 /* CPU has come up inbetween, retry migration */
1113 cpu_relax();
1114 }
1115}
1116
1117/*
1118 * Function for worker->rebind_work used to rebind rogue busy workers
1119 * to the associated cpu which is coming back online. This is
1120 * scheduled by cpu up but can race with other cpu hotplug operations
1121 * and may be executed twice without intervening cpu down.
1122 */
1123static void worker_rebind_fn(struct work_struct *work)
1124{
1125 struct worker *worker = container_of(work, struct worker, rebind_work);
1126 struct global_cwq *gcwq = worker->gcwq;
1127
1128 if (worker_maybe_bind_and_lock(worker))
1129 worker_clr_flags(worker, WORKER_REBIND);
1130
1131 spin_unlock_irq(&gcwq->lock);
1132}
1133
840static struct worker *alloc_worker(void) 1134static struct worker *alloc_worker(void)
841{ 1135{
842 struct worker *worker; 1136 struct worker *worker;
@@ -845,6 +1139,9 @@ static struct worker *alloc_worker(void)
845 if (worker) { 1139 if (worker) {
846 INIT_LIST_HEAD(&worker->entry); 1140 INIT_LIST_HEAD(&worker->entry);
847 INIT_LIST_HEAD(&worker->scheduled); 1141 INIT_LIST_HEAD(&worker->scheduled);
1142 INIT_WORK(&worker->rebind_work, worker_rebind_fn);
1143 /* on creation a worker is in !idle && prep state */
1144 worker->flags = WORKER_PREP;
848 } 1145 }
849 return worker; 1146 return worker;
850} 1147}
@@ -963,6 +1260,220 @@ static void destroy_worker(struct worker *worker)
963 ida_remove(&gcwq->worker_ida, id); 1260 ida_remove(&gcwq->worker_ida, id);
964} 1261}
965 1262
1263static void idle_worker_timeout(unsigned long __gcwq)
1264{
1265 struct global_cwq *gcwq = (void *)__gcwq;
1266
1267 spin_lock_irq(&gcwq->lock);
1268
1269 if (too_many_workers(gcwq)) {
1270 struct worker *worker;
1271 unsigned long expires;
1272
1273 /* idle_list is kept in LIFO order, check the last one */
1274 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1275 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1276
1277 if (time_before(jiffies, expires))
1278 mod_timer(&gcwq->idle_timer, expires);
1279 else {
1280 /* it's been idle for too long, wake up manager */
1281 gcwq->flags |= GCWQ_MANAGE_WORKERS;
1282 wake_up_worker(gcwq);
1283 }
1284 }
1285
1286 spin_unlock_irq(&gcwq->lock);
1287}
1288
1289static bool send_mayday(struct work_struct *work)
1290{
1291 struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1292 struct workqueue_struct *wq = cwq->wq;
1293
1294 if (!(wq->flags & WQ_RESCUER))
1295 return false;
1296
1297 /* mayday mayday mayday */
1298 if (!cpumask_test_and_set_cpu(cwq->gcwq->cpu, wq->mayday_mask))
1299 wake_up_process(wq->rescuer->task);
1300 return true;
1301}
1302
1303static void gcwq_mayday_timeout(unsigned long __gcwq)
1304{
1305 struct global_cwq *gcwq = (void *)__gcwq;
1306 struct work_struct *work;
1307
1308 spin_lock_irq(&gcwq->lock);
1309
1310 if (need_to_create_worker(gcwq)) {
1311 /*
1312 * We've been trying to create a new worker but
1313 * haven't been successful. We might be hitting an
1314 * allocation deadlock. Send distress signals to
1315 * rescuers.
1316 */
1317 list_for_each_entry(work, &gcwq->worklist, entry)
1318 send_mayday(work);
1319 }
1320
1321 spin_unlock_irq(&gcwq->lock);
1322
1323 mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL);
1324}
1325
1326/**
1327 * maybe_create_worker - create a new worker if necessary
1328 * @gcwq: gcwq to create a new worker for
1329 *
1330 * Create a new worker for @gcwq if necessary. @gcwq is guaranteed to
1331 * have at least one idle worker on return from this function. If
1332 * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
1333 * sent to all rescuers with works scheduled on @gcwq to resolve
1334 * possible allocation deadlock.
1335 *
1336 * On return, need_to_create_worker() is guaranteed to be false and
1337 * may_start_working() true.
1338 *
1339 * LOCKING:
1340 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1341 * multiple times. Does GFP_KERNEL allocations. Called only from
1342 * manager.
1343 *
1344 * RETURNS:
1345 * false if no action was taken and gcwq->lock stayed locked, true
1346 * otherwise.
1347 */
1348static bool maybe_create_worker(struct global_cwq *gcwq)
1349{
1350 if (!need_to_create_worker(gcwq))
1351 return false;
1352restart:
1353 /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
1354 mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
1355
1356 while (true) {
1357 struct worker *worker;
1358
1359 spin_unlock_irq(&gcwq->lock);
1360
1361 worker = create_worker(gcwq, true);
1362 if (worker) {
1363 del_timer_sync(&gcwq->mayday_timer);
1364 spin_lock_irq(&gcwq->lock);
1365 start_worker(worker);
1366 BUG_ON(need_to_create_worker(gcwq));
1367 return true;
1368 }
1369
1370 if (!need_to_create_worker(gcwq))
1371 break;
1372
1373 spin_unlock_irq(&gcwq->lock);
1374 __set_current_state(TASK_INTERRUPTIBLE);
1375 schedule_timeout(CREATE_COOLDOWN);
1376 spin_lock_irq(&gcwq->lock);
1377 if (!need_to_create_worker(gcwq))
1378 break;
1379 }
1380
1381 spin_unlock_irq(&gcwq->lock);
1382 del_timer_sync(&gcwq->mayday_timer);
1383 spin_lock_irq(&gcwq->lock);
1384 if (need_to_create_worker(gcwq))
1385 goto restart;
1386 return true;
1387}
1388
1389/**
1390 * maybe_destroy_worker - destroy workers which have been idle for a while
1391 * @gcwq: gcwq to destroy workers for
1392 *
1393 * Destroy @gcwq workers which have been idle for longer than
1394 * IDLE_WORKER_TIMEOUT.
1395 *
1396 * LOCKING:
1397 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1398 * multiple times. Called only from manager.
1399 *
1400 * RETURNS:
1401 * false if no action was taken and gcwq->lock stayed locked, true
1402 * otherwise.
1403 */
1404static bool maybe_destroy_workers(struct global_cwq *gcwq)
1405{
1406 bool ret = false;
1407
1408 while (too_many_workers(gcwq)) {
1409 struct worker *worker;
1410 unsigned long expires;
1411
1412 worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
1413 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1414
1415 if (time_before(jiffies, expires)) {
1416 mod_timer(&gcwq->idle_timer, expires);
1417 break;
1418 }
1419
1420 destroy_worker(worker);
1421 ret = true;
1422 }
1423
1424 return ret;
1425}
1426
1427/**
1428 * manage_workers - manage worker pool
1429 * @worker: self
1430 *
1431 * Assume the manager role and manage gcwq worker pool @worker belongs
1432 * to. At any given time, there can be only zero or one manager per
1433 * gcwq. The exclusion is handled automatically by this function.
1434 *
1435 * The caller can safely start processing works on false return. On
1436 * true return, it's guaranteed that need_to_create_worker() is false
1437 * and may_start_working() is true.
1438 *
1439 * CONTEXT:
1440 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1441 * multiple times. Does GFP_KERNEL allocations.
1442 *
1443 * RETURNS:
1444 * false if no action was taken and gcwq->lock stayed locked, true if
1445 * some action was taken.
1446 */
1447static bool manage_workers(struct worker *worker)
1448{
1449 struct global_cwq *gcwq = worker->gcwq;
1450 bool ret = false;
1451
1452 if (gcwq->flags & GCWQ_MANAGING_WORKERS)
1453 return ret;
1454
1455 gcwq->flags &= ~GCWQ_MANAGE_WORKERS;
1456 gcwq->flags |= GCWQ_MANAGING_WORKERS;
1457
1458 /*
1459 * Destroy and then create so that may_start_working() is true
1460 * on return.
1461 */
1462 ret |= maybe_destroy_workers(gcwq);
1463 ret |= maybe_create_worker(gcwq);
1464
1465 gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
1466
1467 /*
1468 * The trustee might be waiting to take over the manager
1469 * position, tell it we're done.
1470 */
1471 if (unlikely(gcwq->trustee))
1472 wake_up_all(&gcwq->trustee_wait);
1473
1474 return ret;
1475}
1476
966/** 1477/**
967 * move_linked_works - move linked works to a list 1478 * move_linked_works - move linked works to a list
968 * @work: start of series of works to be scheduled 1479 * @work: start of series of works to be scheduled
@@ -1169,24 +1680,39 @@ static void process_scheduled_works(struct worker *worker)
1169 * worker_thread - the worker thread function 1680 * worker_thread - the worker thread function
1170 * @__worker: self 1681 * @__worker: self
1171 * 1682 *
1172 * The cwq worker thread function. 1683 * The gcwq worker thread function. There's a single dynamic pool of
1684 * these per each cpu. These workers process all works regardless of
1685 * their specific target workqueue. The only exception is works which
1686 * belong to workqueues with a rescuer which will be explained in
1687 * rescuer_thread().
1173 */ 1688 */
1174static int worker_thread(void *__worker) 1689static int worker_thread(void *__worker)
1175{ 1690{
1176 struct worker *worker = __worker; 1691 struct worker *worker = __worker;
1177 struct global_cwq *gcwq = worker->gcwq; 1692 struct global_cwq *gcwq = worker->gcwq;
1178 1693
1694 /* tell the scheduler that this is a workqueue worker */
1695 worker->task->flags |= PF_WQ_WORKER;
1179woke_up: 1696woke_up:
1180 spin_lock_irq(&gcwq->lock); 1697 spin_lock_irq(&gcwq->lock);
1181 1698
1182 /* DIE can be set only while we're idle, checking here is enough */ 1699 /* DIE can be set only while we're idle, checking here is enough */
1183 if (worker->flags & WORKER_DIE) { 1700 if (worker->flags & WORKER_DIE) {
1184 spin_unlock_irq(&gcwq->lock); 1701 spin_unlock_irq(&gcwq->lock);
1702 worker->task->flags &= ~PF_WQ_WORKER;
1185 return 0; 1703 return 0;
1186 } 1704 }
1187 1705
1188 worker_leave_idle(worker); 1706 worker_leave_idle(worker);
1189recheck: 1707recheck:
1708 /* no more worker necessary? */
1709 if (!need_more_worker(gcwq))
1710 goto sleep;
1711
1712 /* do we need to manage? */
1713 if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
1714 goto recheck;
1715
1190 /* 1716 /*
1191 * ->scheduled list can only be filled while a worker is 1717 * ->scheduled list can only be filled while a worker is
1192 * preparing to process a work or actually processing it. 1718 * preparing to process a work or actually processing it.
@@ -1194,27 +1720,18 @@ recheck:
1194 */ 1720 */
1195 BUG_ON(!list_empty(&worker->scheduled)); 1721 BUG_ON(!list_empty(&worker->scheduled));
1196 1722
1197 while (!list_empty(&gcwq->worklist)) { 1723 /*
1724 * When control reaches this point, we're guaranteed to have
1725 * at least one idle worker or that someone else has already
1726 * assumed the manager role.
1727 */
1728 worker_clr_flags(worker, WORKER_PREP);
1729
1730 do {
1198 struct work_struct *work = 1731 struct work_struct *work =
1199 list_first_entry(&gcwq->worklist, 1732 list_first_entry(&gcwq->worklist,
1200 struct work_struct, entry); 1733 struct work_struct, entry);
1201 1734
1202 /*
1203 * The following is a rather inefficient way to close
1204 * race window against cpu hotplug operations. Will
1205 * be replaced soon.
1206 */
1207 if (unlikely(!(worker->flags & WORKER_ROGUE) &&
1208 !cpumask_equal(&worker->task->cpus_allowed,
1209 get_cpu_mask(gcwq->cpu)))) {
1210 spin_unlock_irq(&gcwq->lock);
1211 set_cpus_allowed_ptr(worker->task,
1212 get_cpu_mask(gcwq->cpu));
1213 cpu_relax();
1214 spin_lock_irq(&gcwq->lock);
1215 goto recheck;
1216 }
1217
1218 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) { 1735 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
1219 /* optimization path, not strictly necessary */ 1736 /* optimization path, not strictly necessary */
1220 process_one_work(worker, work); 1737 process_one_work(worker, work);
@@ -1224,13 +1741,19 @@ recheck:
1224 move_linked_works(work, &worker->scheduled, NULL); 1741 move_linked_works(work, &worker->scheduled, NULL);
1225 process_scheduled_works(worker); 1742 process_scheduled_works(worker);
1226 } 1743 }
1227 } 1744 } while (keep_working(gcwq));
1745
1746 worker_set_flags(worker, WORKER_PREP, false);
1228 1747
1748 if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))
1749 goto recheck;
1750sleep:
1229 /* 1751 /*
1230 * gcwq->lock is held and there's no work to process, sleep. 1752 * gcwq->lock is held and there's no work to process and no
1231 * Workers are woken up only while holding gcwq->lock, so 1753 * need to manage, sleep. Workers are woken up only while
1232 * setting the current state before releasing gcwq->lock is 1754 * holding gcwq->lock or from local cpu, so setting the
1233 * enough to prevent losing any event. 1755 * current state before releasing gcwq->lock is enough to
1756 * prevent losing any event.
1234 */ 1757 */
1235 worker_enter_idle(worker); 1758 worker_enter_idle(worker);
1236 __set_current_state(TASK_INTERRUPTIBLE); 1759 __set_current_state(TASK_INTERRUPTIBLE);
@@ -1239,6 +1762,68 @@ recheck:
1239 goto woke_up; 1762 goto woke_up;
1240} 1763}
1241 1764
1765/**
1766 * rescuer_thread - the rescuer thread function
1767 * @__wq: the associated workqueue
1768 *
1769 * Workqueue rescuer thread function. There's one rescuer for each
1770 * workqueue which has WQ_RESCUER set.
1771 *
1772 * Regular work processing on a gcwq may block trying to create a new
1773 * worker which uses GFP_KERNEL allocation which has slight chance of
1774 * developing into deadlock if some works currently on the same queue
1775 * need to be processed to satisfy the GFP_KERNEL allocation. This is
1776 * the problem rescuer solves.
1777 *
1778 * When such condition is possible, the gcwq summons rescuers of all
1779 * workqueues which have works queued on the gcwq and let them process
1780 * those works so that forward progress can be guaranteed.
1781 *
1782 * This should happen rarely.
1783 */
1784static int rescuer_thread(void *__wq)
1785{
1786 struct workqueue_struct *wq = __wq;
1787 struct worker *rescuer = wq->rescuer;
1788 struct list_head *scheduled = &rescuer->scheduled;
1789 unsigned int cpu;
1790
1791 set_user_nice(current, RESCUER_NICE_LEVEL);
1792repeat:
1793 set_current_state(TASK_INTERRUPTIBLE);
1794
1795 if (kthread_should_stop())
1796 return 0;
1797
1798 for_each_cpu(cpu, wq->mayday_mask) {
1799 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1800 struct global_cwq *gcwq = cwq->gcwq;
1801 struct work_struct *work, *n;
1802
1803 __set_current_state(TASK_RUNNING);
1804 cpumask_clear_cpu(cpu, wq->mayday_mask);
1805
1806 /* migrate to the target cpu if possible */
1807 rescuer->gcwq = gcwq;
1808 worker_maybe_bind_and_lock(rescuer);
1809
1810 /*
1811 * Slurp in all works issued via this workqueue and
1812 * process'em.
1813 */
1814 BUG_ON(!list_empty(&rescuer->scheduled));
1815 list_for_each_entry_safe(work, n, &gcwq->worklist, entry)
1816 if (get_work_cwq(work) == cwq)
1817 move_linked_works(work, scheduled, &n);
1818
1819 process_scheduled_works(rescuer);
1820 spin_unlock_irq(&gcwq->lock);
1821 }
1822
1823 schedule();
1824 goto repeat;
1825}
1826
1242struct wq_barrier { 1827struct wq_barrier {
1243 struct work_struct work; 1828 struct work_struct work;
1244 struct completion done; 1829 struct completion done;
@@ -1998,7 +2583,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1998 const char *lock_name) 2583 const char *lock_name)
1999{ 2584{
2000 struct workqueue_struct *wq; 2585 struct workqueue_struct *wq;
2001 bool failed = false;
2002 unsigned int cpu; 2586 unsigned int cpu;
2003 2587
2004 max_active = clamp_val(max_active, 1, INT_MAX); 2588 max_active = clamp_val(max_active, 1, INT_MAX);
@@ -2023,13 +2607,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
2023 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); 2607 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
2024 INIT_LIST_HEAD(&wq->list); 2608 INIT_LIST_HEAD(&wq->list);
2025 2609
2026 cpu_maps_update_begin();
2027 /*
2028 * We must initialize cwqs for each possible cpu even if we
2029 * are going to call destroy_workqueue() finally. Otherwise
2030 * cpu_up() can hit the uninitialized cwq once we drop the
2031 * lock.
2032 */
2033 for_each_possible_cpu(cpu) { 2610 for_each_possible_cpu(cpu) {
2034 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 2611 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2035 struct global_cwq *gcwq = get_gcwq(cpu); 2612 struct global_cwq *gcwq = get_gcwq(cpu);
@@ -2040,14 +2617,25 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
2040 cwq->flush_color = -1; 2617 cwq->flush_color = -1;
2041 cwq->max_active = max_active; 2618 cwq->max_active = max_active;
2042 INIT_LIST_HEAD(&cwq->delayed_works); 2619 INIT_LIST_HEAD(&cwq->delayed_works);
2620 }
2043 2621
2044 if (failed) 2622 if (flags & WQ_RESCUER) {
2045 continue; 2623 struct worker *rescuer;
2046 cwq->worker = create_worker(gcwq, cpu_online(cpu)); 2624
2047 if (cwq->worker) 2625 if (!alloc_cpumask_var(&wq->mayday_mask, GFP_KERNEL))
2048 start_worker(cwq->worker); 2626 goto err;
2049 else 2627
2050 failed = true; 2628 wq->rescuer = rescuer = alloc_worker();
2629 if (!rescuer)
2630 goto err;
2631
2632 rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);
2633 if (IS_ERR(rescuer->task))
2634 goto err;
2635
2636 wq->rescuer = rescuer;
2637 rescuer->task->flags |= PF_THREAD_BOUND;
2638 wake_up_process(rescuer->task);
2051 } 2639 }
2052 2640
2053 /* 2641 /*
@@ -2065,16 +2653,12 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
2065 2653
2066 spin_unlock(&workqueue_lock); 2654 spin_unlock(&workqueue_lock);
2067 2655
2068 cpu_maps_update_done();
2069
2070 if (failed) {
2071 destroy_workqueue(wq);
2072 wq = NULL;
2073 }
2074 return wq; 2656 return wq;
2075err: 2657err:
2076 if (wq) { 2658 if (wq) {
2077 free_cwqs(wq->cpu_wq); 2659 free_cwqs(wq->cpu_wq);
2660 free_cpumask_var(wq->mayday_mask);
2661 kfree(wq->rescuer);
2078 kfree(wq); 2662 kfree(wq);
2079 } 2663 }
2080 return NULL; 2664 return NULL;
@@ -2097,42 +2681,26 @@ void destroy_workqueue(struct workqueue_struct *wq)
2097 * wq list is used to freeze wq, remove from list after 2681 * wq list is used to freeze wq, remove from list after
2098 * flushing is complete in case freeze races us. 2682 * flushing is complete in case freeze races us.
2099 */ 2683 */
2100 cpu_maps_update_begin();
2101 spin_lock(&workqueue_lock); 2684 spin_lock(&workqueue_lock);
2102 list_del(&wq->list); 2685 list_del(&wq->list);
2103 spin_unlock(&workqueue_lock); 2686 spin_unlock(&workqueue_lock);
2104 cpu_maps_update_done();
2105 2687
2688 /* sanity check */
2106 for_each_possible_cpu(cpu) { 2689 for_each_possible_cpu(cpu) {
2107 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 2690 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2108 struct global_cwq *gcwq = cwq->gcwq;
2109 int i; 2691 int i;
2110 2692
2111 if (cwq->worker) {
2112 retry:
2113 spin_lock_irq(&gcwq->lock);
2114 /*
2115 * Worker can only be destroyed while idle.
2116 * Wait till it becomes idle. This is ugly
2117 * and prone to starvation. It will go away
2118 * once dynamic worker pool is implemented.
2119 */
2120 if (!(cwq->worker->flags & WORKER_IDLE)) {
2121 spin_unlock_irq(&gcwq->lock);
2122 msleep(100);
2123 goto retry;
2124 }
2125 destroy_worker(cwq->worker);
2126 cwq->worker = NULL;
2127 spin_unlock_irq(&gcwq->lock);
2128 }
2129
2130 for (i = 0; i < WORK_NR_COLORS; i++) 2693 for (i = 0; i < WORK_NR_COLORS; i++)
2131 BUG_ON(cwq->nr_in_flight[i]); 2694 BUG_ON(cwq->nr_in_flight[i]);
2132 BUG_ON(cwq->nr_active); 2695 BUG_ON(cwq->nr_active);
2133 BUG_ON(!list_empty(&cwq->delayed_works)); 2696 BUG_ON(!list_empty(&cwq->delayed_works));
2134 } 2697 }
2135 2698
2699 if (wq->flags & WQ_RESCUER) {
2700 kthread_stop(wq->rescuer->task);
2701 free_cpumask_var(wq->mayday_mask);
2702 }
2703
2136 free_cwqs(wq->cpu_wq); 2704 free_cwqs(wq->cpu_wq);
2137 kfree(wq); 2705 kfree(wq);
2138} 2706}
@@ -2141,10 +2709,18 @@ EXPORT_SYMBOL_GPL(destroy_workqueue);
2141/* 2709/*
2142 * CPU hotplug. 2710 * CPU hotplug.
2143 * 2711 *
2144 * CPU hotplug is implemented by allowing cwqs to be detached from 2712 * There are two challenges in supporting CPU hotplug. Firstly, there
2145 * CPU, running with unbound workers and allowing them to be 2713 * are a lot of assumptions on strong associations among work, cwq and
2146 * reattached later if the cpu comes back online. A separate thread 2714 * gcwq which make migrating pending and scheduled works very
2147 * is created to govern cwqs in such state and is called the trustee. 2715 * difficult to implement without impacting hot paths. Secondly,
2716 * gcwqs serve mix of short, long and very long running works making
2717 * blocked draining impractical.
2718 *
2719 * This is solved by allowing a gcwq to be detached from CPU, running
2720 * it with unbound (rogue) workers and allowing it to be reattached
2721 * later if the cpu comes back online. A separate thread is created
2722 * to govern a gcwq in such state and is called the trustee of the
2723 * gcwq.
2148 * 2724 *
2149 * Trustee states and their descriptions. 2725 * Trustee states and their descriptions.
2150 * 2726 *
@@ -2152,11 +2728,12 @@ EXPORT_SYMBOL_GPL(destroy_workqueue);
2152 * new trustee is started with this state. 2728 * new trustee is started with this state.
2153 * 2729 *
2154 * IN_CHARGE Once started, trustee will enter this state after 2730 * IN_CHARGE Once started, trustee will enter this state after
2155 * making all existing workers rogue. DOWN_PREPARE waits 2731 * assuming the manager role and making all existing
2156 * for trustee to enter this state. After reaching 2732 * workers rogue. DOWN_PREPARE waits for trustee to
2157 * IN_CHARGE, trustee tries to execute the pending 2733 * enter this state. After reaching IN_CHARGE, trustee
2158 * worklist until it's empty and the state is set to 2734 * tries to execute the pending worklist until it's empty
2159 * BUTCHER, or the state is set to RELEASE. 2735 * and the state is set to BUTCHER, or the state is set
2736 * to RELEASE.
2160 * 2737 *
2161 * BUTCHER Command state which is set by the cpu callback after 2738 * BUTCHER Command state which is set by the cpu callback after
2162 * the cpu has went down. Once this state is set trustee 2739 * the cpu has went down. Once this state is set trustee
@@ -2167,7 +2744,9 @@ EXPORT_SYMBOL_GPL(destroy_workqueue);
2167 * RELEASE Command state which is set by the cpu callback if the 2744 * RELEASE Command state which is set by the cpu callback if the
2168 * cpu down has been canceled or it has come online 2745 * cpu down has been canceled or it has come online
2169 * again. After recognizing this state, trustee stops 2746 * again. After recognizing this state, trustee stops
2170 * trying to drain or butcher and transits to DONE. 2747 * trying to drain or butcher and clears ROGUE, rebinds
2748 * all remaining workers back to the cpu and releases
2749 * manager role.
2171 * 2750 *
2172 * DONE Trustee will enter this state after BUTCHER or RELEASE 2751 * DONE Trustee will enter this state after BUTCHER or RELEASE
2173 * is complete. 2752 * is complete.
@@ -2233,17 +2812,24 @@ static int __cpuinit trustee_thread(void *__gcwq)
2233{ 2812{
2234 struct global_cwq *gcwq = __gcwq; 2813 struct global_cwq *gcwq = __gcwq;
2235 struct worker *worker; 2814 struct worker *worker;
2815 struct work_struct *work;
2236 struct hlist_node *pos; 2816 struct hlist_node *pos;
2817 long rc;
2237 int i; 2818 int i;
2238 2819
2239 BUG_ON(gcwq->cpu != smp_processor_id()); 2820 BUG_ON(gcwq->cpu != smp_processor_id());
2240 2821
2241 spin_lock_irq(&gcwq->lock); 2822 spin_lock_irq(&gcwq->lock);
2242 /* 2823 /*
2243 * Make all workers rogue. Trustee must be bound to the 2824 * Claim the manager position and make all workers rogue.
2244 * target cpu and can't be cancelled. 2825 * Trustee must be bound to the target cpu and can't be
2826 * cancelled.
2245 */ 2827 */
2246 BUG_ON(gcwq->cpu != smp_processor_id()); 2828 BUG_ON(gcwq->cpu != smp_processor_id());
2829 rc = trustee_wait_event(!(gcwq->flags & GCWQ_MANAGING_WORKERS));
2830 BUG_ON(rc < 0);
2831
2832 gcwq->flags |= GCWQ_MANAGING_WORKERS;
2247 2833
2248 list_for_each_entry(worker, &gcwq->idle_list, entry) 2834 list_for_each_entry(worker, &gcwq->idle_list, entry)
2249 worker_set_flags(worker, WORKER_ROGUE, false); 2835 worker_set_flags(worker, WORKER_ROGUE, false);
@@ -2252,6 +2838,28 @@ static int __cpuinit trustee_thread(void *__gcwq)
2252 worker_set_flags(worker, WORKER_ROGUE, false); 2838 worker_set_flags(worker, WORKER_ROGUE, false);
2253 2839
2254 /* 2840 /*
2841 * Call schedule() so that we cross rq->lock and thus can
2842 * guarantee sched callbacks see the rogue flag. This is
2843 * necessary as scheduler callbacks may be invoked from other
2844 * cpus.
2845 */
2846 spin_unlock_irq(&gcwq->lock);
2847 schedule();
2848 spin_lock_irq(&gcwq->lock);
2849
2850 /*
2851 * Sched callbacks are disabled now. gcwq->nr_running should
2852 * be zero and will stay that way, making need_more_worker()
2853 * and keep_working() always return true as long as the
2854 * worklist is not empty.
2855 */
2856 WARN_ON_ONCE(atomic_read(get_gcwq_nr_running(gcwq->cpu)) != 0);
2857
2858 spin_unlock_irq(&gcwq->lock);
2859 del_timer_sync(&gcwq->idle_timer);
2860 spin_lock_irq(&gcwq->lock);
2861
2862 /*
2255 * We're now in charge. Notify and proceed to drain. We need 2863 * We're now in charge. Notify and proceed to drain. We need
2256 * to keep the gcwq running during the whole CPU down 2864 * to keep the gcwq running during the whole CPU down
2257 * procedure as other cpu hotunplug callbacks may need to 2865 * procedure as other cpu hotunplug callbacks may need to
@@ -2263,18 +2871,90 @@ static int __cpuinit trustee_thread(void *__gcwq)
2263 /* 2871 /*
2264 * The original cpu is in the process of dying and may go away 2872 * The original cpu is in the process of dying and may go away
2265 * anytime now. When that happens, we and all workers would 2873 * anytime now. When that happens, we and all workers would
2266 * be migrated to other cpus. Try draining any left work. 2874 * be migrated to other cpus. Try draining any left work. We
2267 * Note that if the gcwq is frozen, there may be frozen works 2875 * want to get it over with ASAP - spam rescuers, wake up as
2268 * in freezeable cwqs. Don't declare completion while frozen. 2876 * many idlers as necessary and create new ones till the
2877 * worklist is empty. Note that if the gcwq is frozen, there
2878 * may be frozen works in freezeable cwqs. Don't declare
2879 * completion while frozen.
2269 */ 2880 */
2270 while (gcwq->nr_workers != gcwq->nr_idle || 2881 while (gcwq->nr_workers != gcwq->nr_idle ||
2271 gcwq->flags & GCWQ_FREEZING || 2882 gcwq->flags & GCWQ_FREEZING ||
2272 gcwq->trustee_state == TRUSTEE_IN_CHARGE) { 2883 gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
2884 int nr_works = 0;
2885
2886 list_for_each_entry(work, &gcwq->worklist, entry) {
2887 send_mayday(work);
2888 nr_works++;
2889 }
2890
2891 list_for_each_entry(worker, &gcwq->idle_list, entry) {
2892 if (!nr_works--)
2893 break;
2894 wake_up_process(worker->task);
2895 }
2896
2897 if (need_to_create_worker(gcwq)) {
2898 spin_unlock_irq(&gcwq->lock);
2899 worker = create_worker(gcwq, false);
2900 spin_lock_irq(&gcwq->lock);
2901 if (worker) {
2902 worker_set_flags(worker, WORKER_ROGUE, false);
2903 start_worker(worker);
2904 }
2905 }
2906
2273 /* give a breather */ 2907 /* give a breather */
2274 if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0) 2908 if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
2275 break; 2909 break;
2276 } 2910 }
2277 2911
2912 /*
2913 * Either all works have been scheduled and cpu is down, or
2914 * cpu down has already been canceled. Wait for and butcher
2915 * all workers till we're canceled.
2916 */
2917 do {
2918 rc = trustee_wait_event(!list_empty(&gcwq->idle_list));
2919 while (!list_empty(&gcwq->idle_list))
2920 destroy_worker(list_first_entry(&gcwq->idle_list,
2921 struct worker, entry));
2922 } while (gcwq->nr_workers && rc >= 0);
2923
2924 /*
2925 * At this point, either draining has completed and no worker
2926 * is left, or cpu down has been canceled or the cpu is being
2927 * brought back up. There shouldn't be any idle one left.
2928 * Tell the remaining busy ones to rebind once it finishes the
2929 * currently scheduled works by scheduling the rebind_work.
2930 */
2931 WARN_ON(!list_empty(&gcwq->idle_list));
2932
2933 for_each_busy_worker(worker, i, pos, gcwq) {
2934 struct work_struct *rebind_work = &worker->rebind_work;
2935
2936 /*
2937 * Rebind_work may race with future cpu hotplug
2938 * operations. Use a separate flag to mark that
2939 * rebinding is scheduled.
2940 */
2941 worker_set_flags(worker, WORKER_REBIND, false);
2942 worker_clr_flags(worker, WORKER_ROGUE);
2943
2944 /* queue rebind_work, wq doesn't matter, use the default one */
2945 if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
2946 work_data_bits(rebind_work)))
2947 continue;
2948
2949 debug_work_activate(rebind_work);
2950 insert_work(get_cwq(gcwq->cpu, keventd_wq), rebind_work,
2951 worker->scheduled.next,
2952 work_color_to_flags(WORK_NO_COLOR));
2953 }
2954
2955 /* relinquish manager role */
2956 gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
2957
2278 /* notify completion */ 2958 /* notify completion */
2279 gcwq->trustee = NULL; 2959 gcwq->trustee = NULL;
2280 gcwq->trustee_state = TRUSTEE_DONE; 2960 gcwq->trustee_state = TRUSTEE_DONE;
@@ -2313,10 +2993,8 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
2313 unsigned int cpu = (unsigned long)hcpu; 2993 unsigned int cpu = (unsigned long)hcpu;
2314 struct global_cwq *gcwq = get_gcwq(cpu); 2994 struct global_cwq *gcwq = get_gcwq(cpu);
2315 struct task_struct *new_trustee = NULL; 2995 struct task_struct *new_trustee = NULL;
2316 struct worker *worker; 2996 struct worker *uninitialized_var(new_worker);
2317 struct hlist_node *pos;
2318 unsigned long flags; 2997 unsigned long flags;
2319 int i;
2320 2998
2321 action &= ~CPU_TASKS_FROZEN; 2999 action &= ~CPU_TASKS_FROZEN;
2322 3000
@@ -2327,6 +3005,15 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
2327 if (IS_ERR(new_trustee)) 3005 if (IS_ERR(new_trustee))
2328 return notifier_from_errno(PTR_ERR(new_trustee)); 3006 return notifier_from_errno(PTR_ERR(new_trustee));
2329 kthread_bind(new_trustee, cpu); 3007 kthread_bind(new_trustee, cpu);
3008 /* fall through */
3009 case CPU_UP_PREPARE:
3010 BUG_ON(gcwq->first_idle);
3011 new_worker = create_worker(gcwq, false);
3012 if (!new_worker) {
3013 if (new_trustee)
3014 kthread_stop(new_trustee);
3015 return NOTIFY_BAD;
3016 }
2330 } 3017 }
2331 3018
2332 /* some are called w/ irq disabled, don't disturb irq status */ 3019 /* some are called w/ irq disabled, don't disturb irq status */
@@ -2340,26 +3027,50 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
2340 gcwq->trustee_state = TRUSTEE_START; 3027 gcwq->trustee_state = TRUSTEE_START;
2341 wake_up_process(gcwq->trustee); 3028 wake_up_process(gcwq->trustee);
2342 wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE); 3029 wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
3030 /* fall through */
3031 case CPU_UP_PREPARE:
3032 BUG_ON(gcwq->first_idle);
3033 gcwq->first_idle = new_worker;
3034 break;
3035
3036 case CPU_DYING:
3037 /*
3038 * Before this, the trustee and all workers except for
3039 * the ones which are still executing works from
3040 * before the last CPU down must be on the cpu. After
3041 * this, they'll all be diasporas.
3042 */
3043 gcwq->flags |= GCWQ_DISASSOCIATED;
2343 break; 3044 break;
2344 3045
2345 case CPU_POST_DEAD: 3046 case CPU_POST_DEAD:
2346 gcwq->trustee_state = TRUSTEE_BUTCHER; 3047 gcwq->trustee_state = TRUSTEE_BUTCHER;
3048 /* fall through */
3049 case CPU_UP_CANCELED:
3050 destroy_worker(gcwq->first_idle);
3051 gcwq->first_idle = NULL;
2347 break; 3052 break;
2348 3053
2349 case CPU_DOWN_FAILED: 3054 case CPU_DOWN_FAILED:
2350 case CPU_ONLINE: 3055 case CPU_ONLINE:
3056 gcwq->flags &= ~GCWQ_DISASSOCIATED;
2351 if (gcwq->trustee_state != TRUSTEE_DONE) { 3057 if (gcwq->trustee_state != TRUSTEE_DONE) {
2352 gcwq->trustee_state = TRUSTEE_RELEASE; 3058 gcwq->trustee_state = TRUSTEE_RELEASE;
2353 wake_up_process(gcwq->trustee); 3059 wake_up_process(gcwq->trustee);
2354 wait_trustee_state(gcwq, TRUSTEE_DONE); 3060 wait_trustee_state(gcwq, TRUSTEE_DONE);
2355 } 3061 }
2356 3062
2357 /* clear ROGUE from all workers */ 3063 /*
2358 list_for_each_entry(worker, &gcwq->idle_list, entry) 3064 * Trustee is done and there might be no worker left.
2359 worker_clr_flags(worker, WORKER_ROGUE); 3065 * Put the first_idle in and request a real manager to
2360 3066 * take a look.
2361 for_each_busy_worker(worker, i, pos, gcwq) 3067 */
2362 worker_clr_flags(worker, WORKER_ROGUE); 3068 spin_unlock_irq(&gcwq->lock);
3069 kthread_bind(gcwq->first_idle->task, cpu);
3070 spin_lock_irq(&gcwq->lock);
3071 gcwq->flags |= GCWQ_MANAGE_WORKERS;
3072 start_worker(gcwq->first_idle);
3073 gcwq->first_idle = NULL;
2363 break; 3074 break;
2364 } 3075 }
2365 3076
@@ -2548,10 +3259,10 @@ void thaw_workqueues(void)
2548 if (wq->single_cpu == gcwq->cpu && 3259 if (wq->single_cpu == gcwq->cpu &&
2549 !cwq->nr_active && list_empty(&cwq->delayed_works)) 3260 !cwq->nr_active && list_empty(&cwq->delayed_works))
2550 cwq_unbind_single_cpu(cwq); 3261 cwq_unbind_single_cpu(cwq);
2551
2552 wake_up_process(cwq->worker->task);
2553 } 3262 }
2554 3263
3264 wake_up_worker(gcwq);
3265
2555 spin_unlock_irq(&gcwq->lock); 3266 spin_unlock_irq(&gcwq->lock);
2556 } 3267 }
2557 3268
@@ -2588,12 +3299,31 @@ void __init init_workqueues(void)
2588 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) 3299 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
2589 INIT_HLIST_HEAD(&gcwq->busy_hash[i]); 3300 INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
2590 3301
3302 init_timer_deferrable(&gcwq->idle_timer);
3303 gcwq->idle_timer.function = idle_worker_timeout;
3304 gcwq->idle_timer.data = (unsigned long)gcwq;
3305
3306 setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
3307 (unsigned long)gcwq);
3308
2591 ida_init(&gcwq->worker_ida); 3309 ida_init(&gcwq->worker_ida);
2592 3310
2593 gcwq->trustee_state = TRUSTEE_DONE; 3311 gcwq->trustee_state = TRUSTEE_DONE;
2594 init_waitqueue_head(&gcwq->trustee_wait); 3312 init_waitqueue_head(&gcwq->trustee_wait);
2595 } 3313 }
2596 3314
3315 /* create the initial worker */
3316 for_each_online_cpu(cpu) {
3317 struct global_cwq *gcwq = get_gcwq(cpu);
3318 struct worker *worker;
3319
3320 worker = create_worker(gcwq, true);
3321 BUG_ON(!worker);
3322 spin_lock_irq(&gcwq->lock);
3323 start_worker(worker);
3324 spin_unlock_irq(&gcwq->lock);
3325 }
3326
2597 keventd_wq = create_workqueue("events"); 3327 keventd_wq = create_workqueue("events");
2598 BUG_ON(!keventd_wq); 3328 BUG_ON(!keventd_wq);
2599} 3329}