path: root/kernel/workqueue.c
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--  kernel/workqueue.c  1234
1 file changed, 681 insertions(+), 553 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 692d97628a10..d951daa0ca9a 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -58,7 +58,7 @@ enum {
58 * be executing on any CPU. The gcwq behaves as an unbound one. 58 * be executing on any CPU. The gcwq behaves as an unbound one.
59 * 59 *
60 * Note that DISASSOCIATED can be flipped only while holding 60 * Note that DISASSOCIATED can be flipped only while holding
61 * managership of all pools on the gcwq to avoid changing binding 61 * assoc_mutex of all pools on the gcwq to avoid changing binding
62 * state while create_worker() is in progress. 62 * state while create_worker() is in progress.
63 */ 63 */
64 GCWQ_DISASSOCIATED = 1 << 0, /* cpu can't serve workers */ 64 GCWQ_DISASSOCIATED = 1 << 0, /* cpu can't serve workers */
@@ -66,17 +66,17 @@ enum {
66 66
67 /* pool flags */ 67 /* pool flags */
68 POOL_MANAGE_WORKERS = 1 << 0, /* need to manage workers */ 68 POOL_MANAGE_WORKERS = 1 << 0, /* need to manage workers */
69 POOL_MANAGING_WORKERS = 1 << 1, /* managing workers */
69 70
70 /* worker flags */ 71 /* worker flags */
71 WORKER_STARTED = 1 << 0, /* started */ 72 WORKER_STARTED = 1 << 0, /* started */
72 WORKER_DIE = 1 << 1, /* die die die */ 73 WORKER_DIE = 1 << 1, /* die die die */
73 WORKER_IDLE = 1 << 2, /* is idle */ 74 WORKER_IDLE = 1 << 2, /* is idle */
74 WORKER_PREP = 1 << 3, /* preparing to run works */ 75 WORKER_PREP = 1 << 3, /* preparing to run works */
75 WORKER_REBIND = 1 << 5, /* mom is home, come back */
76 WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */ 76 WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
77 WORKER_UNBOUND = 1 << 7, /* worker is unbound */ 77 WORKER_UNBOUND = 1 << 7, /* worker is unbound */
78 78
79 WORKER_NOT_RUNNING = WORKER_PREP | WORKER_REBIND | WORKER_UNBOUND | 79 WORKER_NOT_RUNNING = WORKER_PREP | WORKER_UNBOUND |
80 WORKER_CPU_INTENSIVE, 80 WORKER_CPU_INTENSIVE,
81 81
82 NR_WORKER_POOLS = 2, /* # worker pools per gcwq */ 82 NR_WORKER_POOLS = 2, /* # worker pools per gcwq */
@@ -125,7 +125,6 @@ enum {
125 125
126struct global_cwq; 126struct global_cwq;
127struct worker_pool; 127struct worker_pool;
128struct idle_rebind;
129 128
130/* 129/*
131 * The poor guys doing the actual heavy lifting. All on-duty workers 130 * The poor guys doing the actual heavy lifting. All on-duty workers
@@ -149,7 +148,6 @@ struct worker {
149 int id; /* I: worker id */ 148 int id; /* I: worker id */
150 149
151 /* for rebinding worker to CPU */ 150 /* for rebinding worker to CPU */
152 struct idle_rebind *idle_rebind; /* L: for idle worker */
153 struct work_struct rebind_work; /* L: for busy worker */ 151 struct work_struct rebind_work; /* L: for busy worker */
154}; 152};
155 153
@@ -159,13 +157,15 @@ struct worker_pool {
159 157
160 struct list_head worklist; /* L: list of pending works */ 158 struct list_head worklist; /* L: list of pending works */
161 int nr_workers; /* L: total number of workers */ 159 int nr_workers; /* L: total number of workers */
160
161 /* nr_idle includes the ones off idle_list for rebinding */
162 int nr_idle; /* L: currently idle ones */ 162 int nr_idle; /* L: currently idle ones */
163 163
164 struct list_head idle_list; /* X: list of idle workers */ 164 struct list_head idle_list; /* X: list of idle workers */
165 struct timer_list idle_timer; /* L: worker idle timeout */ 165 struct timer_list idle_timer; /* L: worker idle timeout */
166 struct timer_list mayday_timer; /* L: SOS timer for workers */ 166 struct timer_list mayday_timer; /* L: SOS timer for workers */
167 167
168 struct mutex manager_mutex; /* mutex manager should hold */ 168 struct mutex assoc_mutex; /* protect GCWQ_DISASSOCIATED */
169 struct ida worker_ida; /* L: for worker IDs */ 169 struct ida worker_ida; /* L: for worker IDs */
170}; 170};
171 171
@@ -183,9 +183,8 @@ struct global_cwq {
183 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE]; 183 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
184 /* L: hash of busy workers */ 184 /* L: hash of busy workers */
185 185
186 struct worker_pool pools[2]; /* normal and highpri pools */ 186 struct worker_pool pools[NR_WORKER_POOLS];
187 187 /* normal and highpri pools */
188 wait_queue_head_t rebind_hold; /* rebind hold wait */
189} ____cacheline_aligned_in_smp; 188} ____cacheline_aligned_in_smp;
190 189
191/* 190/*
@@ -268,17 +267,15 @@ struct workqueue_struct {
268}; 267};
269 268
270struct workqueue_struct *system_wq __read_mostly; 269struct workqueue_struct *system_wq __read_mostly;
271struct workqueue_struct *system_long_wq __read_mostly;
272struct workqueue_struct *system_nrt_wq __read_mostly;
273struct workqueue_struct *system_unbound_wq __read_mostly;
274struct workqueue_struct *system_freezable_wq __read_mostly;
275struct workqueue_struct *system_nrt_freezable_wq __read_mostly;
276EXPORT_SYMBOL_GPL(system_wq); 270EXPORT_SYMBOL_GPL(system_wq);
271struct workqueue_struct *system_highpri_wq __read_mostly;
272EXPORT_SYMBOL_GPL(system_highpri_wq);
273struct workqueue_struct *system_long_wq __read_mostly;
277EXPORT_SYMBOL_GPL(system_long_wq); 274EXPORT_SYMBOL_GPL(system_long_wq);
278EXPORT_SYMBOL_GPL(system_nrt_wq); 275struct workqueue_struct *system_unbound_wq __read_mostly;
279EXPORT_SYMBOL_GPL(system_unbound_wq); 276EXPORT_SYMBOL_GPL(system_unbound_wq);
277struct workqueue_struct *system_freezable_wq __read_mostly;
280EXPORT_SYMBOL_GPL(system_freezable_wq); 278EXPORT_SYMBOL_GPL(system_freezable_wq);
281EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
282 279
283#define CREATE_TRACE_POINTS 280#define CREATE_TRACE_POINTS
284#include <trace/events/workqueue.h> 281#include <trace/events/workqueue.h>
@@ -533,18 +530,24 @@ static int work_next_color(int color)
533} 530}
534 531
535/* 532/*
536 * A work's data points to the cwq with WORK_STRUCT_CWQ set while the 533 * While queued, %WORK_STRUCT_CWQ is set and non flag bits of a work's data
537 * work is on queue. Once execution starts, WORK_STRUCT_CWQ is 534 * contain the pointer to the queued cwq. Once execution starts, the flag
538 * cleared and the work data contains the cpu number it was last on. 535 * is cleared and the high bits contain OFFQ flags and CPU number.
539 * 536 *
540 * set_work_{cwq|cpu}() and clear_work_data() can be used to set the 537 * set_work_cwq(), set_work_cpu_and_clear_pending(), mark_work_canceling()
541 * cwq, cpu or clear work->data. These functions should only be 538 * and clear_work_data() can be used to set the cwq, cpu or clear
542 * called while the work is owned - ie. while the PENDING bit is set. 539 * work->data. These functions should only be called while the work is
540 * owned - ie. while the PENDING bit is set.
543 * 541 *
544 * get_work_[g]cwq() can be used to obtain the gcwq or cwq 542 * get_work_[g]cwq() can be used to obtain the gcwq or cwq corresponding to
545 * corresponding to a work. gcwq is available once the work has been 543 * a work. gcwq is available once the work has been queued anywhere after
546 * queued anywhere after initialization. cwq is available only from 544 * initialization until it is sync canceled. cwq is available only while
547 * queueing until execution starts. 545 * the work item is queued.
546 *
547 * %WORK_OFFQ_CANCELING is used to mark a work item which is being
548 * canceled. While being canceled, a work item may have its PENDING set
549 * but stay off timer and worklist for arbitrarily long and nobody should
550 * try to steal the PENDING bit.
548 */ 551 */
549static inline void set_work_data(struct work_struct *work, unsigned long data, 552static inline void set_work_data(struct work_struct *work, unsigned long data,
550 unsigned long flags) 553 unsigned long flags)
@@ -561,13 +564,22 @@ static void set_work_cwq(struct work_struct *work,
561 WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags); 564 WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
562} 565}
563 566
564static void set_work_cpu(struct work_struct *work, unsigned int cpu) 567static void set_work_cpu_and_clear_pending(struct work_struct *work,
568 unsigned int cpu)
565{ 569{
566 set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING); 570 /*
571 * The following wmb is paired with the implied mb in
572 * test_and_set_bit(PENDING) and ensures all updates to @work made
573 * here are visible to and precede any updates by the next PENDING
574 * owner.
575 */
576 smp_wmb();
577 set_work_data(work, (unsigned long)cpu << WORK_OFFQ_CPU_SHIFT, 0);
567} 578}
568 579
569static void clear_work_data(struct work_struct *work) 580static void clear_work_data(struct work_struct *work)
570{ 581{
582 smp_wmb(); /* see set_work_cpu_and_clear_pending() */
571 set_work_data(work, WORK_STRUCT_NO_CPU, 0); 583 set_work_data(work, WORK_STRUCT_NO_CPU, 0);
572} 584}
573 585
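As a reading aid for the new encoding described above: while a work item is off queue, the non-flag bits of work->data hold the last CPU shifted up by WORK_OFFQ_CPU_SHIFT, with OFFQ flags such as WORK_OFFQ_CANCELING below it. The following decode is illustrative only, not part of the patch; it simply mirrors get_work_gcwq() and work_is_canceling() below.

	/* illustrative only: how an off-queue work->data word is interpreted */
	static unsigned int sketch_offq_last_cpu(struct work_struct *work)
	{
		unsigned long data = atomic_long_read(&work->data);

		if (data & WORK_STRUCT_CWQ)		/* still queued: data is a cwq pointer */
			return WORK_CPU_NONE;
		return data >> WORK_OFFQ_CPU_SHIFT;	/* last CPU, or WORK_CPU_NONE */
	}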
@@ -590,7 +602,7 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
590 return ((struct cpu_workqueue_struct *) 602 return ((struct cpu_workqueue_struct *)
591 (data & WORK_STRUCT_WQ_DATA_MASK))->pool->gcwq; 603 (data & WORK_STRUCT_WQ_DATA_MASK))->pool->gcwq;
592 604
593 cpu = data >> WORK_STRUCT_FLAG_BITS; 605 cpu = data >> WORK_OFFQ_CPU_SHIFT;
594 if (cpu == WORK_CPU_NONE) 606 if (cpu == WORK_CPU_NONE)
595 return NULL; 607 return NULL;
596 608
@@ -598,6 +610,22 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
598 return get_gcwq(cpu); 610 return get_gcwq(cpu);
599} 611}
600 612
613static void mark_work_canceling(struct work_struct *work)
614{
615 struct global_cwq *gcwq = get_work_gcwq(work);
616 unsigned long cpu = gcwq ? gcwq->cpu : WORK_CPU_NONE;
617
618 set_work_data(work, (cpu << WORK_OFFQ_CPU_SHIFT) | WORK_OFFQ_CANCELING,
619 WORK_STRUCT_PENDING);
620}
621
622static bool work_is_canceling(struct work_struct *work)
623{
624 unsigned long data = atomic_long_read(&work->data);
625
626 return !(data & WORK_STRUCT_CWQ) && (data & WORK_OFFQ_CANCELING);
627}
628
601/* 629/*
602 * Policy functions. These define the policies on how the global worker 630 * Policy functions. These define the policies on how the global worker
603 * pools are managed. Unless noted otherwise, these functions assume that 631 * pools are managed. Unless noted otherwise, these functions assume that
@@ -652,10 +680,17 @@ static bool need_to_manage_workers(struct worker_pool *pool)
652/* Do we have too many workers and should some go away? */ 680/* Do we have too many workers and should some go away? */
653static bool too_many_workers(struct worker_pool *pool) 681static bool too_many_workers(struct worker_pool *pool)
654{ 682{
655 bool managing = mutex_is_locked(&pool->manager_mutex); 683 bool managing = pool->flags & POOL_MANAGING_WORKERS;
656 int nr_idle = pool->nr_idle + managing; /* manager is considered idle */ 684 int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
657 int nr_busy = pool->nr_workers - nr_idle; 685 int nr_busy = pool->nr_workers - nr_idle;
658 686
687 /*
688 * nr_idle and idle_list may disagree if idle rebinding is in
689 * progress. Never return %true if idle_list is empty.
690 */
691 if (list_empty(&pool->idle_list))
692 return false;
693
659 return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy; 694 return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
660} 695}
661 696
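For scale, MAX_IDLE_WORKERS_RATIO is 4 elsewhere in this file, so with 10 idle and 30 busy workers the check reads (10 - 2) * 4 = 32 >= 30 and the pool is considered over-provisioned, while 5 idle against the same 30 busy ((5 - 2) * 4 = 12) is not. The new list_empty() test only short-circuits this while idle rebinding has temporarily emptied idle_list.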
@@ -902,6 +937,206 @@ static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
902} 937}
903 938
904/** 939/**
940 * move_linked_works - move linked works to a list
941 * @work: start of series of works to be scheduled
942 * @head: target list to append @work to
943 * @nextp: out paramter for nested worklist walking
944 *
945 * Schedule linked works starting from @work to @head. Work series to
946 * be scheduled starts at @work and includes any consecutive work with
947 * WORK_STRUCT_LINKED set in its predecessor.
948 *
949 * If @nextp is not NULL, it's updated to point to the next work of
950 * the last scheduled work. This allows move_linked_works() to be
951 * nested inside outer list_for_each_entry_safe().
952 *
953 * CONTEXT:
954 * spin_lock_irq(gcwq->lock).
955 */
956static void move_linked_works(struct work_struct *work, struct list_head *head,
957 struct work_struct **nextp)
958{
959 struct work_struct *n;
960
961 /*
962 * Linked worklist will always end before the end of the list,
963 * use NULL for list head.
964 */
965 list_for_each_entry_safe_from(work, n, NULL, entry) {
966 list_move_tail(&work->entry, head);
967 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
968 break;
969 }
970
971 /*
972 * If we're already inside safe list traversal and have moved
973 * multiple works to the scheduled queue, the next position
974 * needs to be updated.
975 */
976 if (nextp)
977 *nextp = n;
978}
979
980static void cwq_activate_delayed_work(struct work_struct *work)
981{
982 struct cpu_workqueue_struct *cwq = get_work_cwq(work);
983
984 trace_workqueue_activate_work(work);
985 move_linked_works(work, &cwq->pool->worklist, NULL);
986 __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
987 cwq->nr_active++;
988}
989
990static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
991{
992 struct work_struct *work = list_first_entry(&cwq->delayed_works,
993 struct work_struct, entry);
994
995 cwq_activate_delayed_work(work);
996}
997
998/**
999 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
1000 * @cwq: cwq of interest
1001 * @color: color of work which left the queue
1002 *
1003 * A work either has completed or is removed from pending queue,
1004 * decrement nr_in_flight of its cwq and handle workqueue flushing.
1005 *
1006 * CONTEXT:
1007 * spin_lock_irq(gcwq->lock).
1008 */
1009static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
1010{
1011 /* ignore uncolored works */
1012 if (color == WORK_NO_COLOR)
1013 return;
1014
1015 cwq->nr_in_flight[color]--;
1016
1017 cwq->nr_active--;
1018 if (!list_empty(&cwq->delayed_works)) {
1019 /* one down, submit a delayed one */
1020 if (cwq->nr_active < cwq->max_active)
1021 cwq_activate_first_delayed(cwq);
1022 }
1023
1024 /* is flush in progress and are we at the flushing tip? */
1025 if (likely(cwq->flush_color != color))
1026 return;
1027
1028 /* are there still in-flight works? */
1029 if (cwq->nr_in_flight[color])
1030 return;
1031
1032 /* this cwq is done, clear flush_color */
1033 cwq->flush_color = -1;
1034
1035 /*
1036 * If this was the last cwq, wake up the first flusher. It
1037 * will handle the rest.
1038 */
1039 if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
1040 complete(&cwq->wq->first_flusher->done);
1041}
1042
1043/**
1044 * try_to_grab_pending - steal work item from worklist and disable irq
1045 * @work: work item to steal
1046 * @is_dwork: @work is a delayed_work
1047 * @flags: place to store irq state
1048 *
1049 * Try to grab PENDING bit of @work. This function can handle @work in any
1050 * stable state - idle, on timer or on worklist. Return values are
1051 *
1052 * 1 if @work was pending and we successfully stole PENDING
1053 * 0 if @work was idle and we claimed PENDING
1054 * -EAGAIN if PENDING couldn't be grabbed at the moment, safe to busy-retry
1055 * -ENOENT if someone else is canceling @work, this state may persist
1056 * for arbitrarily long
1057 *
1058 * On >= 0 return, the caller owns @work's PENDING bit. To avoid getting
1059 * interrupted while holding PENDING and @work off queue, irq must be
1060 * disabled on entry. This, combined with delayed_work->timer being
1061 * irqsafe, ensures that we return -EAGAIN for finite short period of time.
1062 *
1063 * On successful return, >= 0, irq is disabled and the caller is
1064 * responsible for releasing it using local_irq_restore(*@flags).
1065 *
1066 * This function is safe to call from any context including IRQ handler.
1067 */
1068static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
1069 unsigned long *flags)
1070{
1071 struct global_cwq *gcwq;
1072
1073 local_irq_save(*flags);
1074
1075 /* try to steal the timer if it exists */
1076 if (is_dwork) {
1077 struct delayed_work *dwork = to_delayed_work(work);
1078
1079 /*
1080 * dwork->timer is irqsafe. If del_timer() fails, it's
1081 * guaranteed that the timer is not queued anywhere and not
1082 * running on the local CPU.
1083 */
1084 if (likely(del_timer(&dwork->timer)))
1085 return 1;
1086 }
1087
1088 /* try to claim PENDING the normal way */
1089 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
1090 return 0;
1091
1092 /*
1093 * The queueing is in progress, or it is already queued. Try to
1094 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
1095 */
1096 gcwq = get_work_gcwq(work);
1097 if (!gcwq)
1098 goto fail;
1099
1100 spin_lock(&gcwq->lock);
1101 if (!list_empty(&work->entry)) {
1102 /*
1103 * This work is queued, but perhaps we locked the wrong gcwq.
1104 * In that case we must see the new value after rmb(), see
1105 * insert_work()->wmb().
1106 */
1107 smp_rmb();
1108 if (gcwq == get_work_gcwq(work)) {
1109 debug_work_deactivate(work);
1110
1111 /*
1112 * A delayed work item cannot be grabbed directly
1113 * because it might have linked NO_COLOR work items
1114 * which, if left on the delayed_list, will confuse
1115 * cwq->nr_active management later on and cause
1116 * stall. Make sure the work item is activated
1117 * before grabbing.
1118 */
1119 if (*work_data_bits(work) & WORK_STRUCT_DELAYED)
1120 cwq_activate_delayed_work(work);
1121
1122 list_del_init(&work->entry);
1123 cwq_dec_nr_in_flight(get_work_cwq(work),
1124 get_work_color(work));
1125
1126 spin_unlock(&gcwq->lock);
1127 return 1;
1128 }
1129 }
1130 spin_unlock(&gcwq->lock);
1131fail:
1132 local_irq_restore(*flags);
1133 if (work_is_canceling(work))
1134 return -ENOENT;
1135 cpu_relax();
1136 return -EAGAIN;
1137}
1138
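For reference, this is the calling convention the new try_to_grab_pending() expects from its users, distilled as a sketch from the __cancel_work_timer() and mod_delayed_work_on() hunks later in this patch (error handling abbreviated):

	unsigned long flags;
	int ret;

	do {
		ret = try_to_grab_pending(work, is_dwork, &flags);
		if (ret == -ENOENT)		/* a concurrent canceler owns PENDING */
			flush_work(work);	/* wait for the same event it waits for */
	} while (ret < 0);

	/* ret >= 0: PENDING is owned and IRQs are off; release when done */
	local_irq_restore(flags);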
1139/**
905 * insert_work - insert a work into gcwq 1140 * insert_work - insert a work into gcwq
906 * @cwq: cwq @work belongs to 1141 * @cwq: cwq @work belongs to
907 * @work: work to insert 1142 * @work: work to insert
@@ -981,7 +1216,15 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
981 struct cpu_workqueue_struct *cwq; 1216 struct cpu_workqueue_struct *cwq;
982 struct list_head *worklist; 1217 struct list_head *worklist;
983 unsigned int work_flags; 1218 unsigned int work_flags;
984 unsigned long flags; 1219 unsigned int req_cpu = cpu;
1220
1221 /*
1222 * While a work item is PENDING && off queue, a task trying to
1223 * steal the PENDING will busy-loop waiting for it to either get
1224 * queued or lose PENDING. Grabbing PENDING and queueing should
1225 * happen with IRQ disabled.
1226 */
1227 WARN_ON_ONCE(!irqs_disabled());
985 1228
986 debug_work_activate(work); 1229 debug_work_activate(work);
987 1230
@@ -994,21 +1237,22 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
994 if (!(wq->flags & WQ_UNBOUND)) { 1237 if (!(wq->flags & WQ_UNBOUND)) {
995 struct global_cwq *last_gcwq; 1238 struct global_cwq *last_gcwq;
996 1239
997 if (unlikely(cpu == WORK_CPU_UNBOUND)) 1240 if (cpu == WORK_CPU_UNBOUND)
998 cpu = raw_smp_processor_id(); 1241 cpu = raw_smp_processor_id();
999 1242
1000 /* 1243 /*
1001 * It's multi cpu. If @wq is non-reentrant and @work 1244 * It's multi cpu. If @work was previously on a different
1002 * was previously on a different cpu, it might still 1245 * cpu, it might still be running there, in which case the
1003 * be running there, in which case the work needs to 1246 * work needs to be queued on that cpu to guarantee
1004 * be queued on that cpu to guarantee non-reentrance. 1247 * non-reentrancy.
1005 */ 1248 */
1006 gcwq = get_gcwq(cpu); 1249 gcwq = get_gcwq(cpu);
1007 if (wq->flags & WQ_NON_REENTRANT && 1250 last_gcwq = get_work_gcwq(work);
1008 (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) { 1251
1252 if (last_gcwq && last_gcwq != gcwq) {
1009 struct worker *worker; 1253 struct worker *worker;
1010 1254
1011 spin_lock_irqsave(&last_gcwq->lock, flags); 1255 spin_lock(&last_gcwq->lock);
1012 1256
1013 worker = find_worker_executing_work(last_gcwq, work); 1257 worker = find_worker_executing_work(last_gcwq, work);
1014 1258
@@ -1016,22 +1260,23 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
1016 gcwq = last_gcwq; 1260 gcwq = last_gcwq;
1017 else { 1261 else {
1018 /* meh... not running there, queue here */ 1262 /* meh... not running there, queue here */
1019 spin_unlock_irqrestore(&last_gcwq->lock, flags); 1263 spin_unlock(&last_gcwq->lock);
1020 spin_lock_irqsave(&gcwq->lock, flags); 1264 spin_lock(&gcwq->lock);
1021 } 1265 }
1022 } else 1266 } else {
1023 spin_lock_irqsave(&gcwq->lock, flags); 1267 spin_lock(&gcwq->lock);
1268 }
1024 } else { 1269 } else {
1025 gcwq = get_gcwq(WORK_CPU_UNBOUND); 1270 gcwq = get_gcwq(WORK_CPU_UNBOUND);
1026 spin_lock_irqsave(&gcwq->lock, flags); 1271 spin_lock(&gcwq->lock);
1027 } 1272 }
1028 1273
1029 /* gcwq determined, get cwq and queue */ 1274 /* gcwq determined, get cwq and queue */
1030 cwq = get_cwq(gcwq->cpu, wq); 1275 cwq = get_cwq(gcwq->cpu, wq);
1031 trace_workqueue_queue_work(cpu, cwq, work); 1276 trace_workqueue_queue_work(req_cpu, cwq, work);
1032 1277
1033 if (WARN_ON(!list_empty(&work->entry))) { 1278 if (WARN_ON(!list_empty(&work->entry))) {
1034 spin_unlock_irqrestore(&gcwq->lock, flags); 1279 spin_unlock(&gcwq->lock);
1035 return; 1280 return;
1036 } 1281 }
1037 1282
@@ -1049,79 +1294,110 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
1049 1294
1050 insert_work(cwq, work, worklist, work_flags); 1295 insert_work(cwq, work, worklist, work_flags);
1051 1296
1052 spin_unlock_irqrestore(&gcwq->lock, flags); 1297 spin_unlock(&gcwq->lock);
1053} 1298}
1054 1299
1055/** 1300/**
1056 * queue_work - queue work on a workqueue 1301 * queue_work_on - queue work on specific cpu
1302 * @cpu: CPU number to execute work on
1057 * @wq: workqueue to use 1303 * @wq: workqueue to use
1058 * @work: work to queue 1304 * @work: work to queue
1059 * 1305 *
1060 * Returns 0 if @work was already on a queue, non-zero otherwise. 1306 * Returns %false if @work was already on a queue, %true otherwise.
1061 * 1307 *
1062 * We queue the work to the CPU on which it was submitted, but if the CPU dies 1308 * We queue the work to a specific CPU, the caller must ensure it
1063 * it can be processed by another CPU. 1309 * can't go away.
1064 */ 1310 */
1065int queue_work(struct workqueue_struct *wq, struct work_struct *work) 1311bool queue_work_on(int cpu, struct workqueue_struct *wq,
1312 struct work_struct *work)
1066{ 1313{
1067 int ret; 1314 bool ret = false;
1315 unsigned long flags;
1068 1316
1069 ret = queue_work_on(get_cpu(), wq, work); 1317 local_irq_save(flags);
1070 put_cpu(); 1318
1319 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1320 __queue_work(cpu, wq, work);
1321 ret = true;
1322 }
1071 1323
1324 local_irq_restore(flags);
1072 return ret; 1325 return ret;
1073} 1326}
1074EXPORT_SYMBOL_GPL(queue_work); 1327EXPORT_SYMBOL_GPL(queue_work_on);
1075 1328
1076/** 1329/**
1077 * queue_work_on - queue work on specific cpu 1330 * queue_work - queue work on a workqueue
1078 * @cpu: CPU number to execute work on
1079 * @wq: workqueue to use 1331 * @wq: workqueue to use
1080 * @work: work to queue 1332 * @work: work to queue
1081 * 1333 *
1082 * Returns 0 if @work was already on a queue, non-zero otherwise. 1334 * Returns %false if @work was already on a queue, %true otherwise.
1083 * 1335 *
1084 * We queue the work to a specific CPU, the caller must ensure it 1336 * We queue the work to the CPU on which it was submitted, but if the CPU dies
1085 * can't go away. 1337 * it can be processed by another CPU.
1086 */ 1338 */
1087int 1339bool queue_work(struct workqueue_struct *wq, struct work_struct *work)
1088queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
1089{ 1340{
1090 int ret = 0; 1341 return queue_work_on(WORK_CPU_UNBOUND, wq, work);
1091
1092 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1093 __queue_work(cpu, wq, work);
1094 ret = 1;
1095 }
1096 return ret;
1097} 1342}
1098EXPORT_SYMBOL_GPL(queue_work_on); 1343EXPORT_SYMBOL_GPL(queue_work);
1099 1344
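With the conversion above, queue_work() is now a thin wrapper over queue_work_on(), returns bool and may be called from any context, including with IRQs already disabled. A hedged usage sketch; my_wq and my_work are hypothetical placeholders, not part of the patch:

	static struct workqueue_struct *my_wq;	/* hypothetical */
	static struct work_struct my_work;	/* hypothetical, INIT_WORK()ed at init */

	static void kick_my_work(void)
	{
		/* %false only means it was already pending; it still runs once */
		if (!queue_work(my_wq, &my_work))
			pr_debug("my_work was already pending\n");
	}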
1100static void delayed_work_timer_fn(unsigned long __data) 1345void delayed_work_timer_fn(unsigned long __data)
1101{ 1346{
1102 struct delayed_work *dwork = (struct delayed_work *)__data; 1347 struct delayed_work *dwork = (struct delayed_work *)__data;
1103 struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work); 1348 struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
1104 1349
1105 __queue_work(smp_processor_id(), cwq->wq, &dwork->work); 1350 /* should have been called from irqsafe timer with irq already off */
1351 __queue_work(dwork->cpu, cwq->wq, &dwork->work);
1106} 1352}
1353EXPORT_SYMBOL_GPL(delayed_work_timer_fn);
1107 1354
1108/** 1355static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
1109 * queue_delayed_work - queue work on a workqueue after delay 1356 struct delayed_work *dwork, unsigned long delay)
1110 * @wq: workqueue to use
1111 * @dwork: delayable work to queue
1112 * @delay: number of jiffies to wait before queueing
1113 *
1114 * Returns 0 if @work was already on a queue, non-zero otherwise.
1115 */
1116int queue_delayed_work(struct workqueue_struct *wq,
1117 struct delayed_work *dwork, unsigned long delay)
1118{ 1357{
1119 if (delay == 0) 1358 struct timer_list *timer = &dwork->timer;
1120 return queue_work(wq, &dwork->work); 1359 struct work_struct *work = &dwork->work;
1360 unsigned int lcpu;
1361
1362 WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
1363 timer->data != (unsigned long)dwork);
1364 BUG_ON(timer_pending(timer));
1365 BUG_ON(!list_empty(&work->entry));
1121 1366
1122 return queue_delayed_work_on(-1, wq, dwork, delay); 1367 timer_stats_timer_set_start_info(&dwork->timer);
1368
1369 /*
1370 * This stores cwq for the moment, for the timer_fn. Note that the
1371 * work's gcwq is preserved to allow reentrance detection for
1372 * delayed works.
1373 */
1374 if (!(wq->flags & WQ_UNBOUND)) {
1375 struct global_cwq *gcwq = get_work_gcwq(work);
1376
1377 /*
1378 * If we cannot get the last gcwq from @work directly,
1379 * select the last CPU such that it avoids unnecessarily
1380 * triggering non-reentrancy check in __queue_work().
1381 */
1382 lcpu = cpu;
1383 if (gcwq)
1384 lcpu = gcwq->cpu;
1385 if (lcpu == WORK_CPU_UNBOUND)
1386 lcpu = raw_smp_processor_id();
1387 } else {
1388 lcpu = WORK_CPU_UNBOUND;
1389 }
1390
1391 set_work_cwq(work, get_cwq(lcpu, wq), 0);
1392
1393 dwork->cpu = cpu;
1394 timer->expires = jiffies + delay;
1395
1396 if (unlikely(cpu != WORK_CPU_UNBOUND))
1397 add_timer_on(timer, cpu);
1398 else
1399 add_timer(timer);
1123} 1400}
1124EXPORT_SYMBOL_GPL(queue_delayed_work);
1125 1401
1126/** 1402/**
1127 * queue_delayed_work_on - queue work on specific CPU after delay 1403 * queue_delayed_work_on - queue work on specific CPU after delay
@@ -1130,53 +1406,100 @@ EXPORT_SYMBOL_GPL(queue_delayed_work);
1130 * @dwork: work to queue 1406 * @dwork: work to queue
1131 * @delay: number of jiffies to wait before queueing 1407 * @delay: number of jiffies to wait before queueing
1132 * 1408 *
1133 * Returns 0 if @work was already on a queue, non-zero otherwise. 1409 * Returns %false if @work was already on a queue, %true otherwise. If
1410 * @delay is zero and @dwork is idle, it will be scheduled for immediate
1411 * execution.
1134 */ 1412 */
1135int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, 1413bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
1136 struct delayed_work *dwork, unsigned long delay) 1414 struct delayed_work *dwork, unsigned long delay)
1137{ 1415{
1138 int ret = 0;
1139 struct timer_list *timer = &dwork->timer;
1140 struct work_struct *work = &dwork->work; 1416 struct work_struct *work = &dwork->work;
1417 bool ret = false;
1418 unsigned long flags;
1141 1419
1142 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { 1420 if (!delay)
1143 unsigned int lcpu; 1421 return queue_work_on(cpu, wq, &dwork->work);
1144 1422
1145 BUG_ON(timer_pending(timer)); 1423 /* read the comment in __queue_work() */
1146 BUG_ON(!list_empty(&work->entry)); 1424 local_irq_save(flags);
1147 1425
1148 timer_stats_timer_set_start_info(&dwork->timer); 1426 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1427 __queue_delayed_work(cpu, wq, dwork, delay);
1428 ret = true;
1429 }
1149 1430
1150 /* 1431 local_irq_restore(flags);
1151 * This stores cwq for the moment, for the timer_fn. 1432 return ret;
1152 * Note that the work's gcwq is preserved to allow 1433}
1153 * reentrance detection for delayed works. 1434EXPORT_SYMBOL_GPL(queue_delayed_work_on);
1154 */
1155 if (!(wq->flags & WQ_UNBOUND)) {
1156 struct global_cwq *gcwq = get_work_gcwq(work);
1157 1435
1158 if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND) 1436/**
1159 lcpu = gcwq->cpu; 1437 * queue_delayed_work - queue work on a workqueue after delay
1160 else 1438 * @wq: workqueue to use
1161 lcpu = raw_smp_processor_id(); 1439 * @dwork: delayable work to queue
1162 } else 1440 * @delay: number of jiffies to wait before queueing
1163 lcpu = WORK_CPU_UNBOUND; 1441 *
1442 * Equivalent to queue_delayed_work_on() but tries to use the local CPU.
1443 */
1444bool queue_delayed_work(struct workqueue_struct *wq,
1445 struct delayed_work *dwork, unsigned long delay)
1446{
1447 return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
1448}
1449EXPORT_SYMBOL_GPL(queue_delayed_work);
1164 1450
1165 set_work_cwq(work, get_cwq(lcpu, wq), 0); 1451/**
1452 * mod_delayed_work_on - modify delay of or queue a delayed work on specific CPU
1453 * @cpu: CPU number to execute work on
1454 * @wq: workqueue to use
1455 * @dwork: work to queue
1456 * @delay: number of jiffies to wait before queueing
1457 *
1458 * If @dwork is idle, equivalent to queue_delayed_work_on(); otherwise,
1459 * modify @dwork's timer so that it expires after @delay. If @delay is
1460 * zero, @work is guaranteed to be scheduled immediately regardless of its
1461 * current state.
1462 *
1463 * Returns %false if @dwork was idle and queued, %true if @dwork was
1464 * pending and its timer was modified.
1465 *
1466 * This function is safe to call from any context including IRQ handler.
1467 * See try_to_grab_pending() for details.
1468 */
1469bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
1470 struct delayed_work *dwork, unsigned long delay)
1471{
1472 unsigned long flags;
1473 int ret;
1166 1474
1167 timer->expires = jiffies + delay; 1475 do {
1168 timer->data = (unsigned long)dwork; 1476 ret = try_to_grab_pending(&dwork->work, true, &flags);
1169 timer->function = delayed_work_timer_fn; 1477 } while (unlikely(ret == -EAGAIN));
1170 1478
1171 if (unlikely(cpu >= 0)) 1479 if (likely(ret >= 0)) {
1172 add_timer_on(timer, cpu); 1480 __queue_delayed_work(cpu, wq, dwork, delay);
1173 else 1481 local_irq_restore(flags);
1174 add_timer(timer);
1175 ret = 1;
1176 } 1482 }
1483
1484 /* -ENOENT from try_to_grab_pending() becomes %true */
1177 return ret; 1485 return ret;
1178} 1486}
1179EXPORT_SYMBOL_GPL(queue_delayed_work_on); 1487EXPORT_SYMBOL_GPL(mod_delayed_work_on);
1488
1489/**
1490 * mod_delayed_work - modify delay of or queue a delayed work
1491 * @wq: workqueue to use
1492 * @dwork: work to queue
1493 * @delay: number of jiffies to wait before queueing
1494 *
1495 * mod_delayed_work_on() on local CPU.
1496 */
1497bool mod_delayed_work(struct workqueue_struct *wq, struct delayed_work *dwork,
1498 unsigned long delay)
1499{
1500 return mod_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
1501}
1502EXPORT_SYMBOL_GPL(mod_delayed_work);
1180 1503
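mod_delayed_work() above is the new "adjust or queue" primitive; it replaces the open-coded cancel_delayed_work() + queue_delayed_work() pattern and, unlike that pattern, is safe from IRQ context. A minimal sketch; my_dwork is a hypothetical delayed_work initialized elsewhere:

	static struct delayed_work my_dwork;	/* hypothetical, INIT_DELAYED_WORK()ed at init */

	static void push_back_timeout(unsigned long delay)
	{
		/*
		 * If my_dwork is idle it is queued to expire after @delay;
		 * if it is already pending, only its timer is pushed back.
		 */
		mod_delayed_work(system_wq, &my_dwork, delay);
	}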
1181/** 1504/**
1182 * worker_enter_idle - enter idle state 1505 * worker_enter_idle - enter idle state
@@ -1304,28 +1627,21 @@ __acquires(&gcwq->lock)
1304 } 1627 }
1305} 1628}
1306 1629
1307struct idle_rebind {
1308 int cnt; /* # workers to be rebound */
1309 struct completion done; /* all workers rebound */
1310};
1311
1312/* 1630/*
1313 * Rebind an idle @worker to its CPU. During CPU onlining, this has to 1631 * Rebind an idle @worker to its CPU. worker_thread() will test
1314 * happen synchronously for idle workers. worker_thread() will test 1632 * list_empty(@worker->entry) before leaving idle and call this function.
1315 * %WORKER_REBIND before leaving idle and call this function.
1316 */ 1633 */
1317static void idle_worker_rebind(struct worker *worker) 1634static void idle_worker_rebind(struct worker *worker)
1318{ 1635{
1319 struct global_cwq *gcwq = worker->pool->gcwq; 1636 struct global_cwq *gcwq = worker->pool->gcwq;
1320 1637
1321 /* CPU must be online at this point */ 1638 /* CPU may go down again inbetween, clear UNBOUND only on success */
1322 WARN_ON(!worker_maybe_bind_and_lock(worker)); 1639 if (worker_maybe_bind_and_lock(worker))
1323 if (!--worker->idle_rebind->cnt) 1640 worker_clr_flags(worker, WORKER_UNBOUND);
1324 complete(&worker->idle_rebind->done);
1325 spin_unlock_irq(&worker->pool->gcwq->lock);
1326 1641
1327 /* we did our part, wait for rebind_workers() to finish up */ 1642 /* rebind complete, become available again */
1328 wait_event(gcwq->rebind_hold, !(worker->flags & WORKER_REBIND)); 1643 list_add(&worker->entry, &worker->pool->idle_list);
1644 spin_unlock_irq(&gcwq->lock);
1329} 1645}
1330 1646
1331/* 1647/*
@@ -1340,7 +1656,7 @@ static void busy_worker_rebind_fn(struct work_struct *work)
1340 struct global_cwq *gcwq = worker->pool->gcwq; 1656 struct global_cwq *gcwq = worker->pool->gcwq;
1341 1657
1342 if (worker_maybe_bind_and_lock(worker)) 1658 if (worker_maybe_bind_and_lock(worker))
1343 worker_clr_flags(worker, WORKER_REBIND); 1659 worker_clr_flags(worker, WORKER_UNBOUND);
1344 1660
1345 spin_unlock_irq(&gcwq->lock); 1661 spin_unlock_irq(&gcwq->lock);
1346} 1662}
@@ -1352,102 +1668,74 @@ static void busy_worker_rebind_fn(struct work_struct *work)
1352 * @gcwq->cpu is coming online. Rebind all workers to the CPU. Rebinding 1668 * @gcwq->cpu is coming online. Rebind all workers to the CPU. Rebinding
1353 * is different for idle and busy ones. 1669 * is different for idle and busy ones.
1354 * 1670 *
1355 * The idle ones should be rebound synchronously and idle rebinding should 1671 * Idle ones will be removed from the idle_list and woken up. They will
1356 * be complete before any worker starts executing work items with 1672 * add themselves back after completing rebind. This ensures that the
1357 * concurrency management enabled; otherwise, scheduler may oops trying to 1673 * idle_list doesn't contain any unbound workers when re-bound busy workers
1358 * wake up non-local idle worker from wq_worker_sleeping(). 1674 * try to perform local wake-ups for concurrency management.
1359 * 1675 *
1360 * This is achieved by repeatedly requesting rebinding until all idle 1676 * Busy workers can rebind after they finish their current work items.
1361 * workers are known to have been rebound under @gcwq->lock and holding all 1677 * Queueing the rebind work item at the head of the scheduled list is
1362 * idle workers from becoming busy until idle rebinding is complete. 1678 * enough. Note that nr_running will be properly bumped as busy workers
1679 * rebind.
1363 * 1680 *
1364 * Once idle workers are rebound, busy workers can be rebound as they 1681 * On return, all non-manager workers are scheduled for rebind - see
1365 * finish executing their current work items. Queueing the rebind work at 1682 * manage_workers() for the manager special case. Any idle worker
1366 * the head of their scheduled lists is enough. Note that nr_running will 1683 * including the manager will not appear on @idle_list until rebind is
1367 * be properbly bumped as busy workers rebind. 1684 * complete, making local wake-ups safe.
1368 *
1369 * On return, all workers are guaranteed to either be bound or have rebind
1370 * work item scheduled.
1371 */ 1685 */
1372static void rebind_workers(struct global_cwq *gcwq) 1686static void rebind_workers(struct global_cwq *gcwq)
1373 __releases(&gcwq->lock) __acquires(&gcwq->lock)
1374{ 1687{
1375 struct idle_rebind idle_rebind;
1376 struct worker_pool *pool; 1688 struct worker_pool *pool;
1377 struct worker *worker; 1689 struct worker *worker, *n;
1378 struct hlist_node *pos; 1690 struct hlist_node *pos;
1379 int i; 1691 int i;
1380 1692
1381 lockdep_assert_held(&gcwq->lock); 1693 lockdep_assert_held(&gcwq->lock);
1382 1694
1383 for_each_worker_pool(pool, gcwq) 1695 for_each_worker_pool(pool, gcwq)
1384 lockdep_assert_held(&pool->manager_mutex); 1696 lockdep_assert_held(&pool->assoc_mutex);
1385 1697
1386 /* 1698 /* dequeue and kick idle ones */
1387 * Rebind idle workers. Interlocked both ways. We wait for
1388 * workers to rebind via @idle_rebind.done. Workers will wait for
1389 * us to finish up by watching %WORKER_REBIND.
1390 */
1391 init_completion(&idle_rebind.done);
1392retry:
1393 idle_rebind.cnt = 1;
1394 INIT_COMPLETION(idle_rebind.done);
1395
1396 /* set REBIND and kick idle ones, we'll wait for these later */
1397 for_each_worker_pool(pool, gcwq) { 1699 for_each_worker_pool(pool, gcwq) {
1398 list_for_each_entry(worker, &pool->idle_list, entry) { 1700 list_for_each_entry_safe(worker, n, &pool->idle_list, entry) {
1399 if (worker->flags & WORKER_REBIND) 1701 /*
1400 continue; 1702 * idle workers should be off @pool->idle_list
1401 1703 * until rebind is complete to avoid receiving
1402 /* morph UNBOUND to REBIND */ 1704 * premature local wake-ups.
1403 worker->flags &= ~WORKER_UNBOUND; 1705 */
1404 worker->flags |= WORKER_REBIND; 1706 list_del_init(&worker->entry);
1405
1406 idle_rebind.cnt++;
1407 worker->idle_rebind = &idle_rebind;
1408 1707
1409 /* worker_thread() will call idle_worker_rebind() */ 1708 /*
1709 * worker_thread() will see the above dequeuing
1710 * and call idle_worker_rebind().
1711 */
1410 wake_up_process(worker->task); 1712 wake_up_process(worker->task);
1411 } 1713 }
1412 } 1714 }
1413 1715
1414 if (--idle_rebind.cnt) {
1415 spin_unlock_irq(&gcwq->lock);
1416 wait_for_completion(&idle_rebind.done);
1417 spin_lock_irq(&gcwq->lock);
1418 /* busy ones might have become idle while waiting, retry */
1419 goto retry;
1420 }
1421
1422 /*
1423 * All idle workers are rebound and waiting for %WORKER_REBIND to
1424 * be cleared inside idle_worker_rebind(). Clear and release.
1425 * Clearing %WORKER_REBIND from this foreign context is safe
1426 * because these workers are still guaranteed to be idle.
1427 */
1428 for_each_worker_pool(pool, gcwq)
1429 list_for_each_entry(worker, &pool->idle_list, entry)
1430 worker->flags &= ~WORKER_REBIND;
1431
1432 wake_up_all(&gcwq->rebind_hold);
1433
1434 /* rebind busy workers */ 1716 /* rebind busy workers */
1435 for_each_busy_worker(worker, i, pos, gcwq) { 1717 for_each_busy_worker(worker, i, pos, gcwq) {
1436 struct work_struct *rebind_work = &worker->rebind_work; 1718 struct work_struct *rebind_work = &worker->rebind_work;
1437 1719 struct workqueue_struct *wq;
1438 /* morph UNBOUND to REBIND */
1439 worker->flags &= ~WORKER_UNBOUND;
1440 worker->flags |= WORKER_REBIND;
1441 1720
1442 if (test_and_set_bit(WORK_STRUCT_PENDING_BIT, 1721 if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
1443 work_data_bits(rebind_work))) 1722 work_data_bits(rebind_work)))
1444 continue; 1723 continue;
1445 1724
1446 /* wq doesn't matter, use the default one */
1447 debug_work_activate(rebind_work); 1725 debug_work_activate(rebind_work);
1448 insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work, 1726
1449 worker->scheduled.next, 1727 /*
1450 work_color_to_flags(WORK_NO_COLOR)); 1728 * wq doesn't really matter but let's keep @worker->pool
1729 * and @cwq->pool consistent for sanity.
1730 */
1731 if (worker_pool_pri(worker->pool))
1732 wq = system_highpri_wq;
1733 else
1734 wq = system_wq;
1735
1736 insert_work(get_cwq(gcwq->cpu, wq), rebind_work,
1737 worker->scheduled.next,
1738 work_color_to_flags(WORK_NO_COLOR));
1451 } 1739 }
1452} 1740}
1453 1741
@@ -1794,9 +2082,45 @@ static bool manage_workers(struct worker *worker)
1794 struct worker_pool *pool = worker->pool; 2082 struct worker_pool *pool = worker->pool;
1795 bool ret = false; 2083 bool ret = false;
1796 2084
1797 if (!mutex_trylock(&pool->manager_mutex)) 2085 if (pool->flags & POOL_MANAGING_WORKERS)
1798 return ret; 2086 return ret;
1799 2087
2088 pool->flags |= POOL_MANAGING_WORKERS;
2089
2090 /*
2091 * To simplify both worker management and CPU hotplug, hold off
2092 * management while hotplug is in progress. CPU hotplug path can't
2093 * grab %POOL_MANAGING_WORKERS to achieve this because that can
2094 * lead to idle worker depletion (all become busy thinking someone
2095 * else is managing) which in turn can result in deadlock under
2096 * extreme circumstances. Use @pool->assoc_mutex to synchronize
2097 * manager against CPU hotplug.
2098 *
2099 * assoc_mutex would always be free unless CPU hotplug is in
2100 * progress. trylock first without dropping @gcwq->lock.
2101 */
2102 if (unlikely(!mutex_trylock(&pool->assoc_mutex))) {
2103 spin_unlock_irq(&pool->gcwq->lock);
2104 mutex_lock(&pool->assoc_mutex);
2105 /*
2106 * CPU hotplug could have happened while we were waiting
2107 * for assoc_mutex. Hotplug itself can't handle us
2108 * because manager isn't either on idle or busy list, and
2109 * @gcwq's state and ours could have deviated.
2110 *
2111 * As hotplug is now excluded via assoc_mutex, we can
2112 * simply try to bind. It will succeed or fail depending
2113 * on @gcwq's current state. Try it and adjust
2114 * %WORKER_UNBOUND accordingly.
2115 */
2116 if (worker_maybe_bind_and_lock(worker))
2117 worker->flags &= ~WORKER_UNBOUND;
2118 else
2119 worker->flags |= WORKER_UNBOUND;
2120
2121 ret = true;
2122 }
2123
1800 pool->flags &= ~POOL_MANAGE_WORKERS; 2124 pool->flags &= ~POOL_MANAGE_WORKERS;
1801 2125
1802 /* 2126 /*
@@ -1806,112 +2130,12 @@ static bool manage_workers(struct worker *worker)
1806 ret |= maybe_destroy_workers(pool); 2130 ret |= maybe_destroy_workers(pool);
1807 ret |= maybe_create_worker(pool); 2131 ret |= maybe_create_worker(pool);
1808 2132
1809 mutex_unlock(&pool->manager_mutex); 2133 pool->flags &= ~POOL_MANAGING_WORKERS;
2134 mutex_unlock(&pool->assoc_mutex);
1810 return ret; 2135 return ret;
1811} 2136}
1812 2137
1813/** 2138/**
1814 * move_linked_works - move linked works to a list
1815 * @work: start of series of works to be scheduled
1816 * @head: target list to append @work to
1817 * @nextp: out paramter for nested worklist walking
1818 *
1819 * Schedule linked works starting from @work to @head. Work series to
1820 * be scheduled starts at @work and includes any consecutive work with
1821 * WORK_STRUCT_LINKED set in its predecessor.
1822 *
1823 * If @nextp is not NULL, it's updated to point to the next work of
1824 * the last scheduled work. This allows move_linked_works() to be
1825 * nested inside outer list_for_each_entry_safe().
1826 *
1827 * CONTEXT:
1828 * spin_lock_irq(gcwq->lock).
1829 */
1830static void move_linked_works(struct work_struct *work, struct list_head *head,
1831 struct work_struct **nextp)
1832{
1833 struct work_struct *n;
1834
1835 /*
1836 * Linked worklist will always end before the end of the list,
1837 * use NULL for list head.
1838 */
1839 list_for_each_entry_safe_from(work, n, NULL, entry) {
1840 list_move_tail(&work->entry, head);
1841 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
1842 break;
1843 }
1844
1845 /*
1846 * If we're already inside safe list traversal and have moved
1847 * multiple works to the scheduled queue, the next position
1848 * needs to be updated.
1849 */
1850 if (nextp)
1851 *nextp = n;
1852}
1853
1854static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
1855{
1856 struct work_struct *work = list_first_entry(&cwq->delayed_works,
1857 struct work_struct, entry);
1858
1859 trace_workqueue_activate_work(work);
1860 move_linked_works(work, &cwq->pool->worklist, NULL);
1861 __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
1862 cwq->nr_active++;
1863}
1864
1865/**
1866 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
1867 * @cwq: cwq of interest
1868 * @color: color of work which left the queue
1869 * @delayed: for a delayed work
1870 *
1871 * A work either has completed or is removed from pending queue,
1872 * decrement nr_in_flight of its cwq and handle workqueue flushing.
1873 *
1874 * CONTEXT:
1875 * spin_lock_irq(gcwq->lock).
1876 */
1877static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color,
1878 bool delayed)
1879{
1880 /* ignore uncolored works */
1881 if (color == WORK_NO_COLOR)
1882 return;
1883
1884 cwq->nr_in_flight[color]--;
1885
1886 if (!delayed) {
1887 cwq->nr_active--;
1888 if (!list_empty(&cwq->delayed_works)) {
1889 /* one down, submit a delayed one */
1890 if (cwq->nr_active < cwq->max_active)
1891 cwq_activate_first_delayed(cwq);
1892 }
1893 }
1894
1895 /* is flush in progress and are we at the flushing tip? */
1896 if (likely(cwq->flush_color != color))
1897 return;
1898
1899 /* are there still in-flight works? */
1900 if (cwq->nr_in_flight[color])
1901 return;
1902
1903 /* this cwq is done, clear flush_color */
1904 cwq->flush_color = -1;
1905
1906 /*
1907 * If this was the last cwq, wake up the first flusher. It
1908 * will handle the rest.
1909 */
1910 if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
1911 complete(&cwq->wq->first_flusher->done);
1912}
1913
1914/**
1915 * process_one_work - process single work 2139 * process_one_work - process single work
1916 * @worker: self 2140 * @worker: self
1917 * @work: work to process 2141 * @work: work to process
@@ -1954,7 +2178,7 @@ __acquires(&gcwq->lock)
1954 * necessary to avoid spurious warnings from rescuers servicing the 2178 * necessary to avoid spurious warnings from rescuers servicing the
1955 * unbound or a disassociated gcwq. 2179 * unbound or a disassociated gcwq.
1956 */ 2180 */
1957 WARN_ON_ONCE(!(worker->flags & (WORKER_UNBOUND | WORKER_REBIND)) && 2181 WARN_ON_ONCE(!(worker->flags & WORKER_UNBOUND) &&
1958 !(gcwq->flags & GCWQ_DISASSOCIATED) && 2182 !(gcwq->flags & GCWQ_DISASSOCIATED) &&
1959 raw_smp_processor_id() != gcwq->cpu); 2183 raw_smp_processor_id() != gcwq->cpu);
1960 2184
@@ -1970,15 +2194,13 @@ __acquires(&gcwq->lock)
1970 return; 2194 return;
1971 } 2195 }
1972 2196
1973 /* claim and process */ 2197 /* claim and dequeue */
1974 debug_work_deactivate(work); 2198 debug_work_deactivate(work);
1975 hlist_add_head(&worker->hentry, bwh); 2199 hlist_add_head(&worker->hentry, bwh);
1976 worker->current_work = work; 2200 worker->current_work = work;
1977 worker->current_cwq = cwq; 2201 worker->current_cwq = cwq;
1978 work_color = get_work_color(work); 2202 work_color = get_work_color(work);
1979 2203
1980 /* record the current cpu number in the work data and dequeue */
1981 set_work_cpu(work, gcwq->cpu);
1982 list_del_init(&work->entry); 2204 list_del_init(&work->entry);
1983 2205
1984 /* 2206 /*
@@ -1995,9 +2217,16 @@ __acquires(&gcwq->lock)
1995 if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool)) 2217 if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool))
1996 wake_up_worker(pool); 2218 wake_up_worker(pool);
1997 2219
2220 /*
2221 * Record the last CPU and clear PENDING which should be the last
2222 * update to @work. Also, do this inside @gcwq->lock so that
2223 * PENDING and queued state changes happen together while IRQ is
2224 * disabled.
2225 */
2226 set_work_cpu_and_clear_pending(work, gcwq->cpu);
2227
1998 spin_unlock_irq(&gcwq->lock); 2228 spin_unlock_irq(&gcwq->lock);
1999 2229
2000 work_clear_pending(work);
2001 lock_map_acquire_read(&cwq->wq->lockdep_map); 2230 lock_map_acquire_read(&cwq->wq->lockdep_map);
2002 lock_map_acquire(&lockdep_map); 2231 lock_map_acquire(&lockdep_map);
2003 trace_workqueue_execute_start(work); 2232 trace_workqueue_execute_start(work);
@@ -2011,11 +2240,9 @@ __acquires(&gcwq->lock)
2011 lock_map_release(&cwq->wq->lockdep_map); 2240 lock_map_release(&cwq->wq->lockdep_map);
2012 2241
2013 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 2242 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
2014 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 2243 pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
2015 "%s/0x%08x/%d\n", 2244 " last function: %pf\n",
2016 current->comm, preempt_count(), task_pid_nr(current)); 2245 current->comm, preempt_count(), task_pid_nr(current), f);
2017 printk(KERN_ERR " last function: ");
2018 print_symbol("%s\n", (unsigned long)f);
2019 debug_show_held_locks(current); 2246 debug_show_held_locks(current);
2020 dump_stack(); 2247 dump_stack();
2021 } 2248 }
@@ -2030,7 +2257,7 @@ __acquires(&gcwq->lock)
2030 hlist_del_init(&worker->hentry); 2257 hlist_del_init(&worker->hentry);
2031 worker->current_work = NULL; 2258 worker->current_work = NULL;
2032 worker->current_cwq = NULL; 2259 worker->current_cwq = NULL;
2033 cwq_dec_nr_in_flight(cwq, work_color, false); 2260 cwq_dec_nr_in_flight(cwq, work_color);
2034} 2261}
2035 2262
2036/** 2263/**
@@ -2075,18 +2302,17 @@ static int worker_thread(void *__worker)
2075woke_up: 2302woke_up:
2076 spin_lock_irq(&gcwq->lock); 2303 spin_lock_irq(&gcwq->lock);
2077 2304
2078 /* 2305 /* we are off idle list if destruction or rebind is requested */
2079 * DIE can be set only while idle and REBIND set while busy has 2306 if (unlikely(list_empty(&worker->entry))) {
2080 * @worker->rebind_work scheduled. Checking here is enough.
2081 */
2082 if (unlikely(worker->flags & (WORKER_REBIND | WORKER_DIE))) {
2083 spin_unlock_irq(&gcwq->lock); 2307 spin_unlock_irq(&gcwq->lock);
2084 2308
2309 /* if DIE is set, destruction is requested */
2085 if (worker->flags & WORKER_DIE) { 2310 if (worker->flags & WORKER_DIE) {
2086 worker->task->flags &= ~PF_WQ_WORKER; 2311 worker->task->flags &= ~PF_WQ_WORKER;
2087 return 0; 2312 return 0;
2088 } 2313 }
2089 2314
2315 /* otherwise, rebind */
2090 idle_worker_rebind(worker); 2316 idle_worker_rebind(worker);
2091 goto woke_up; 2317 goto woke_up;
2092 } 2318 }
@@ -2569,8 +2795,8 @@ reflush:
2569 2795
2570 if (++flush_cnt == 10 || 2796 if (++flush_cnt == 10 ||
2571 (flush_cnt % 100 == 0 && flush_cnt <= 1000)) 2797 (flush_cnt % 100 == 0 && flush_cnt <= 1000))
2572 pr_warning("workqueue %s: flush on destruction isn't complete after %u tries\n", 2798 pr_warn("workqueue %s: flush on destruction isn't complete after %u tries\n",
2573 wq->name, flush_cnt); 2799 wq->name, flush_cnt);
2574 goto reflush; 2800 goto reflush;
2575 } 2801 }
2576 2802
@@ -2581,8 +2807,7 @@ reflush:
2581} 2807}
2582EXPORT_SYMBOL_GPL(drain_workqueue); 2808EXPORT_SYMBOL_GPL(drain_workqueue);
2583 2809
2584static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr, 2810static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
2585 bool wait_executing)
2586{ 2811{
2587 struct worker *worker = NULL; 2812 struct worker *worker = NULL;
2588 struct global_cwq *gcwq; 2813 struct global_cwq *gcwq;
@@ -2604,13 +2829,12 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
2604 cwq = get_work_cwq(work); 2829 cwq = get_work_cwq(work);
2605 if (unlikely(!cwq || gcwq != cwq->pool->gcwq)) 2830 if (unlikely(!cwq || gcwq != cwq->pool->gcwq))
2606 goto already_gone; 2831 goto already_gone;
2607 } else if (wait_executing) { 2832 } else {
2608 worker = find_worker_executing_work(gcwq, work); 2833 worker = find_worker_executing_work(gcwq, work);
2609 if (!worker) 2834 if (!worker)
2610 goto already_gone; 2835 goto already_gone;
2611 cwq = worker->current_cwq; 2836 cwq = worker->current_cwq;
2612 } else 2837 }
2613 goto already_gone;
2614 2838
2615 insert_wq_barrier(cwq, barr, work, worker); 2839 insert_wq_barrier(cwq, barr, work, worker);
2616 spin_unlock_irq(&gcwq->lock); 2840 spin_unlock_irq(&gcwq->lock);
@@ -2637,15 +2861,8 @@ already_gone:
2637 * flush_work - wait for a work to finish executing the last queueing instance 2861 * flush_work - wait for a work to finish executing the last queueing instance
2638 * @work: the work to flush 2862 * @work: the work to flush
2639 * 2863 *
2640 * Wait until @work has finished execution. This function considers 2864 * Wait until @work has finished execution. @work is guaranteed to be idle
2641 * only the last queueing instance of @work. If @work has been 2865 * on return if it hasn't been requeued since flush started.
2642 * enqueued across different CPUs on a non-reentrant workqueue or on
2643 * multiple workqueues, @work might still be executing on return on
2644 * some of the CPUs from earlier queueing.
2645 *
2646 * If @work was queued only on a non-reentrant, ordered or unbound
2647 * workqueue, @work is guaranteed to be idle on return if it hasn't
2648 * been requeued since flush started.
2649 * 2866 *
2650 * RETURNS: 2867 * RETURNS:
2651 * %true if flush_work() waited for the work to finish execution, 2868 * %true if flush_work() waited for the work to finish execution,
@@ -2658,140 +2875,36 @@ bool flush_work(struct work_struct *work)
2658 lock_map_acquire(&work->lockdep_map); 2875 lock_map_acquire(&work->lockdep_map);
2659 lock_map_release(&work->lockdep_map); 2876 lock_map_release(&work->lockdep_map);
2660 2877
2661 if (start_flush_work(work, &barr, true)) { 2878 if (start_flush_work(work, &barr)) {
2662 wait_for_completion(&barr.done); 2879 wait_for_completion(&barr.done);
2663 destroy_work_on_stack(&barr.work); 2880 destroy_work_on_stack(&barr.work);
2664 return true; 2881 return true;
2665 } else 2882 } else {
2666 return false;
2667}
2668EXPORT_SYMBOL_GPL(flush_work);
2669
2670static bool wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
2671{
2672 struct wq_barrier barr;
2673 struct worker *worker;
2674
2675 spin_lock_irq(&gcwq->lock);
2676
2677 worker = find_worker_executing_work(gcwq, work);
2678 if (unlikely(worker))
2679 insert_wq_barrier(worker->current_cwq, &barr, work, worker);
2680
2681 spin_unlock_irq(&gcwq->lock);
2682
2683 if (unlikely(worker)) {
2684 wait_for_completion(&barr.done);
2685 destroy_work_on_stack(&barr.work);
2686 return true;
2687 } else
2688 return false; 2883 return false;
2689}
2690
2691static bool wait_on_work(struct work_struct *work)
2692{
2693 bool ret = false;
2694 int cpu;
2695
2696 might_sleep();
2697
2698 lock_map_acquire(&work->lockdep_map);
2699 lock_map_release(&work->lockdep_map);
2700
2701 for_each_gcwq_cpu(cpu)
2702 ret |= wait_on_cpu_work(get_gcwq(cpu), work);
2703 return ret;
2704}
2705
2706/**
2707 * flush_work_sync - wait until a work has finished execution
2708 * @work: the work to flush
2709 *
2710 * Wait until @work has finished execution. On return, it's
2711 * guaranteed that all queueing instances of @work which happened
2712 * before this function is called are finished. In other words, if
2713 * @work hasn't been requeued since this function was called, @work is
2714 * guaranteed to be idle on return.
2715 *
2716 * RETURNS:
2717 * %true if flush_work_sync() waited for the work to finish execution,
2718 * %false if it was already idle.
2719 */
2720bool flush_work_sync(struct work_struct *work)
2721{
2722 struct wq_barrier barr;
2723 bool pending, waited;
2724
2725 /* we'll wait for executions separately, queue barr only if pending */
2726 pending = start_flush_work(work, &barr, false);
2727
2728 /* wait for executions to finish */
2729 waited = wait_on_work(work);
2730
2731 /* wait for the pending one */
2732 if (pending) {
2733 wait_for_completion(&barr.done);
2734 destroy_work_on_stack(&barr.work);
2735 }
2736
2737 return pending || waited;
2738}
2739EXPORT_SYMBOL_GPL(flush_work_sync);
2740
2741/*
2742 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
2743 * so this work can't be re-armed in any way.
2744 */
2745static int try_to_grab_pending(struct work_struct *work)
2746{
2747 struct global_cwq *gcwq;
2748 int ret = -1;
2749
2750 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
2751 return 0;
2752
2753 /*
2754 * The queueing is in progress, or it is already queued. Try to
2755 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
2756 */
2757 gcwq = get_work_gcwq(work);
2758 if (!gcwq)
2759 return ret;
2760
2761 spin_lock_irq(&gcwq->lock);
2762 if (!list_empty(&work->entry)) {
2763 /*
2764 * This work is queued, but perhaps we locked the wrong gcwq.
2765 * In that case we must see the new value after rmb(), see
2766 * insert_work()->wmb().
2767 */
2768 smp_rmb();
2769 if (gcwq == get_work_gcwq(work)) {
2770 debug_work_deactivate(work);
2771 list_del_init(&work->entry);
2772 cwq_dec_nr_in_flight(get_work_cwq(work),
2773 get_work_color(work),
2774 *work_data_bits(work) & WORK_STRUCT_DELAYED);
2775 ret = 1;
2776 }
2777 } 2884 }
2778 spin_unlock_irq(&gcwq->lock);
2779
2780 return ret;
2781} 2885}
2886EXPORT_SYMBOL_GPL(flush_work);
2782 2887
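[Editor's note: a minimal caller-side sketch, not part of this patch, of the reworked flush_work(); the names my_dev, my_work_fn, my_dev_init and my_dev_quiesce are hypothetical.]

#include <linux/printk.h>
#include <linux/workqueue.h>

struct my_dev {
	struct work_struct work;
};

static void my_work_fn(struct work_struct *work)
{
	/* deferred processing; runs on a gcwq worker */
}

static void my_dev_init(struct my_dev *dev)
{
	INIT_WORK(&dev->work, my_work_fn);
}

static void my_dev_quiesce(struct my_dev *dev)
{
	/* returns true if it had to wait, false if the work was already idle */
	if (flush_work(&dev->work))
		pr_debug("my_dev: waited for pending work\n");
}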
2783static bool __cancel_work_timer(struct work_struct *work, 2888static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
2784 struct timer_list* timer)
2785{ 2889{
2890 unsigned long flags;
2786 int ret; 2891 int ret;
2787 2892
2788 do { 2893 do {
2789 ret = (timer && likely(del_timer(timer))); 2894 ret = try_to_grab_pending(work, is_dwork, &flags);
2790 if (!ret) 2895 /*
2791 ret = try_to_grab_pending(work); 2896 * If someone else is canceling, wait for the same event it
2792 wait_on_work(work); 2897 * would be waiting for before retrying.
2898 */
2899 if (unlikely(ret == -ENOENT))
2900 flush_work(work);
2793 } while (unlikely(ret < 0)); 2901 } while (unlikely(ret < 0));
2794 2902
2903 /* tell other tasks trying to grab @work to back off */
2904 mark_work_canceling(work);
2905 local_irq_restore(flags);
2906
2907 flush_work(work);
2795 clear_work_data(work); 2908 clear_work_data(work);
2796 return ret; 2909 return ret;
2797} 2910}
@@ -2816,7 +2929,7 @@ static bool __cancel_work_timer(struct work_struct *work,
2816 */ 2929 */
2817bool cancel_work_sync(struct work_struct *work) 2930bool cancel_work_sync(struct work_struct *work)
2818{ 2931{
2819 return __cancel_work_timer(work, NULL); 2932 return __cancel_work_timer(work, false);
2820} 2933}
2821EXPORT_SYMBOL_GPL(cancel_work_sync); 2934EXPORT_SYMBOL_GPL(cancel_work_sync);
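[Editor's note: a hedged teardown sketch, not from the patch, illustrating the guarantee cancel_work_sync() keeps after being rerouted through __cancel_work_timer(work, false); my_dev and its members are made up.]

#include <linux/slab.h>
#include <linux/workqueue.h>

struct my_dev {
	struct work_struct work;
	void *buf;
};

static void my_dev_destroy(struct my_dev *dev)
{
	/*
	 * On return the work is neither pending nor running anywhere,
	 * so resources used by the callback can be freed.  Sleeps, so
	 * this must not be called from atomic context.
	 */
	cancel_work_sync(&dev->work);
	kfree(dev->buf);
	kfree(dev);
}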
2822 2935
@@ -2834,33 +2947,44 @@ EXPORT_SYMBOL_GPL(cancel_work_sync);
2834 */ 2947 */
2835bool flush_delayed_work(struct delayed_work *dwork) 2948bool flush_delayed_work(struct delayed_work *dwork)
2836{ 2949{
2950 local_irq_disable();
2837 if (del_timer_sync(&dwork->timer)) 2951 if (del_timer_sync(&dwork->timer))
2838 __queue_work(raw_smp_processor_id(), 2952 __queue_work(dwork->cpu,
2839 get_work_cwq(&dwork->work)->wq, &dwork->work); 2953 get_work_cwq(&dwork->work)->wq, &dwork->work);
2954 local_irq_enable();
2840 return flush_work(&dwork->work); 2955 return flush_work(&dwork->work);
2841} 2956}
2842EXPORT_SYMBOL(flush_delayed_work); 2957EXPORT_SYMBOL(flush_delayed_work);
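[Editor's note: illustrative caller of flush_delayed_work(), not part of the patch; my_dev and my_dev_commit_now are hypothetical.]

#include <linux/workqueue.h>

struct my_dev {
	struct delayed_work dwork;
};

static void my_dev_commit_now(struct my_dev *dev)
{
	/*
	 * If the timer is still pending, the work is queued immediately
	 * (on dwork->cpu with this patch) and then flushed like a plain
	 * work item.
	 */
	flush_delayed_work(&dev->dwork);
}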
2843 2958
2844/** 2959/**
2845 * flush_delayed_work_sync - wait for a dwork to finish 2960 * cancel_delayed_work - cancel a delayed work
2846 * @dwork: the delayed work to flush 2961 * @dwork: delayed_work to cancel
2847 * 2962 *
2848 * Delayed timer is cancelled and the pending work is queued for 2963 * Kill off a pending delayed_work. Returns %true if @dwork was pending
2849 * execution immediately. Other than timer handling, its behavior 2964 * and canceled; %false if wasn't pending. Note that the work callback
2850 * is identical to flush_work_sync(). 2965 * function may still be running on return, unless it returns %true and the
2966 * work doesn't re-arm itself. Explicitly flush or use
2967 * cancel_delayed_work_sync() to wait on it.
2851 * 2968 *
2852 * RETURNS: 2969 * This function is safe to call from any context including IRQ handler.
2853 * %true if flush_work_sync() waited for the work to finish execution,
2854 * %false if it was already idle.
2855 */ 2970 */
2856bool flush_delayed_work_sync(struct delayed_work *dwork) 2971bool cancel_delayed_work(struct delayed_work *dwork)
2857{ 2972{
2858 if (del_timer_sync(&dwork->timer)) 2973 unsigned long flags;
2859 __queue_work(raw_smp_processor_id(), 2974 int ret;
2860 get_work_cwq(&dwork->work)->wq, &dwork->work); 2975
2861 return flush_work_sync(&dwork->work); 2976 do {
2977 ret = try_to_grab_pending(&dwork->work, true, &flags);
2978 } while (unlikely(ret == -EAGAIN));
2979
2980 if (unlikely(ret < 0))
2981 return false;
2982
2983 set_work_cpu_and_clear_pending(&dwork->work, work_cpu(&dwork->work));
2984 local_irq_restore(flags);
2985 return true;
2862} 2986}
2863EXPORT_SYMBOL(flush_delayed_work_sync); 2987EXPORT_SYMBOL(cancel_delayed_work);
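[Editor's note: a sketch, not from the patch, of the new cancel_delayed_work() being used from an interrupt handler, which the updated kernel-doc explicitly allows; my_dev and my_dev_irq are made-up names.]

#include <linux/interrupt.h>
#include <linux/workqueue.h>

struct my_dev {
	struct delayed_work timeout_work;
};

static irqreturn_t my_dev_irq(int irq, void *dev_id)
{
	struct my_dev *dev = dev_id;

	/*
	 * Usable from IRQ context: it only clears the pending state;
	 * the callback may still be running when this returns.
	 */
	cancel_delayed_work(&dev->timeout_work);
	return IRQ_HANDLED;
}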
2864 2988
2865/** 2989/**
2866 * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish 2990 * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish
@@ -2873,54 +2997,39 @@ EXPORT_SYMBOL(flush_delayed_work_sync);
2873 */ 2997 */
2874bool cancel_delayed_work_sync(struct delayed_work *dwork) 2998bool cancel_delayed_work_sync(struct delayed_work *dwork)
2875{ 2999{
2876 return __cancel_work_timer(&dwork->work, &dwork->timer); 3000 return __cancel_work_timer(&dwork->work, true);
2877} 3001}
2878EXPORT_SYMBOL(cancel_delayed_work_sync); 3002EXPORT_SYMBOL(cancel_delayed_work_sync);
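[Editor's note: the synchronous variant for contrast, again a hypothetical caller rather than patch content.]

#include <linux/workqueue.h>

struct my_dev {
	struct delayed_work poll_work;
};

static void my_dev_stop_poll(struct my_dev *dev)
{
	/* also waits for a running callback; may sleep, unlike cancel_delayed_work() */
	cancel_delayed_work_sync(&dev->poll_work);
}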
2879 3003
2880/** 3004/**
2881 * schedule_work - put work task in global workqueue
2882 * @work: job to be done
2883 *
2884 * Returns zero if @work was already on the kernel-global workqueue and
2885 * non-zero otherwise.
2886 *
2887 * This puts a job in the kernel-global workqueue if it was not already
2888 * queued and leaves it in the same position on the kernel-global
2889 * workqueue otherwise.
2890 */
2891int schedule_work(struct work_struct *work)
2892{
2893 return queue_work(system_wq, work);
2894}
2895EXPORT_SYMBOL(schedule_work);
2896
2897/*
2898 * schedule_work_on - put work task on a specific cpu 3005 * schedule_work_on - put work task on a specific cpu
2899 * @cpu: cpu to put the work task on 3006 * @cpu: cpu to put the work task on
2900 * @work: job to be done 3007 * @work: job to be done
2901 * 3008 *
2902 * This puts a job on a specific cpu 3009 * This puts a job on a specific cpu
2903 */ 3010 */
2904int schedule_work_on(int cpu, struct work_struct *work) 3011bool schedule_work_on(int cpu, struct work_struct *work)
2905{ 3012{
2906 return queue_work_on(cpu, system_wq, work); 3013 return queue_work_on(cpu, system_wq, work);
2907} 3014}
2908EXPORT_SYMBOL(schedule_work_on); 3015EXPORT_SYMBOL(schedule_work_on);
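[Editor's note: a per-CPU sketch, not in the patch, of schedule_work_on() with its new bool return; my_stats_work and my_flush_stats_all_cpus are hypothetical and the per-CPU works are assumed to have been initialised elsewhere.]

#include <linux/cpu.h>
#include <linux/percpu.h>
#include <linux/workqueue.h>

/* assumes each per-CPU work was set up with INIT_WORK() elsewhere */
static DEFINE_PER_CPU(struct work_struct, my_stats_work);

static void my_flush_stats_all_cpus(void)
{
	int cpu;

	get_online_cpus();
	for_each_online_cpu(cpu)
		/* now returns bool: %false if already pending on that CPU */
		schedule_work_on(cpu, &per_cpu(my_stats_work, cpu));
	put_online_cpus();
}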
2909 3016
2910/** 3017/**
2911 * schedule_delayed_work - put work task in global workqueue after delay 3018 * schedule_work - put work task in global workqueue
2912 * @dwork: job to be done 3019 * @work: job to be done
2913 * @delay: number of jiffies to wait or 0 for immediate execution
2914 * 3020 *
2915 * After waiting for a given time this puts a job in the kernel-global 3021 * Returns %false if @work was already on the kernel-global workqueue and
2916 * workqueue. 3022 * %true otherwise.
3023 *
3024 * This puts a job in the kernel-global workqueue if it was not already
3025 * queued and leaves it in the same position on the kernel-global
3026 * workqueue otherwise.
2917 */ 3027 */
2918int schedule_delayed_work(struct delayed_work *dwork, 3028bool schedule_work(struct work_struct *work)
2919 unsigned long delay)
2920{ 3029{
2921 return queue_delayed_work(system_wq, dwork, delay); 3030 return queue_work(system_wq, work);
2922} 3031}
2923EXPORT_SYMBOL(schedule_delayed_work); 3032EXPORT_SYMBOL(schedule_work);
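[Editor's note: a small sketch, not part of the patch, showing the %true/%false semantics the relocated schedule_work() kernel-doc describes; my_event_* names are made up.]

#include <linux/printk.h>
#include <linux/workqueue.h>

static void my_event_fn(struct work_struct *work);
static DECLARE_WORK(my_event_work, my_event_fn);

static void my_event_fn(struct work_struct *work)
{
	/* handle the event from process context on system_wq */
}

static void my_event_kick(void)
{
	/* %false means the work was already pending and stays in place */
	if (!schedule_work(&my_event_work))
		pr_debug("event work already queued\n");
}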
2924 3033
2925/** 3034/**
2926 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 3035 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
@@ -2931,14 +3040,28 @@ EXPORT_SYMBOL(schedule_delayed_work);
2931 * After waiting for a given time this puts a job in the kernel-global 3040 * After waiting for a given time this puts a job in the kernel-global
2932 * workqueue on the specified CPU. 3041 * workqueue on the specified CPU.
2933 */ 3042 */
2934int schedule_delayed_work_on(int cpu, 3043bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork,
2935 struct delayed_work *dwork, unsigned long delay) 3044 unsigned long delay)
2936{ 3045{
2937 return queue_delayed_work_on(cpu, system_wq, dwork, delay); 3046 return queue_delayed_work_on(cpu, system_wq, dwork, delay);
2938} 3047}
2939EXPORT_SYMBOL(schedule_delayed_work_on); 3048EXPORT_SYMBOL(schedule_delayed_work_on);
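[Editor's note: hypothetical use of schedule_delayed_work_on(), not from the patch; my_local_* names are invented.]

#include <linux/jiffies.h>
#include <linux/workqueue.h>

static void my_local_fn(struct work_struct *work);
static DECLARE_DELAYED_WORK(my_local_dwork, my_local_fn);

static void my_local_fn(struct work_struct *work)
{
	/* runs roughly 100ms later on the CPU it was queued for */
}

static void my_arm_on(int cpu)
{
	schedule_delayed_work_on(cpu, &my_local_dwork, msecs_to_jiffies(100));
}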
2940 3049
2941/** 3050/**
3051 * schedule_delayed_work - put work task in global workqueue after delay
3052 * @dwork: job to be done
3053 * @delay: number of jiffies to wait or 0 for immediate execution
3054 *
3055 * After waiting for a given time this puts a job in the kernel-global
3056 * workqueue.
3057 */
3058bool schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)
3059{
3060 return queue_delayed_work(system_wq, dwork, delay);
3061}
3062EXPORT_SYMBOL(schedule_delayed_work);
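[Editor's note: the classic self-rearming poller as a sketch, not patch content; my_poll_* names are hypothetical.]

#include <linux/jiffies.h>
#include <linux/workqueue.h>

static void my_poll_fn(struct work_struct *work);
static DECLARE_DELAYED_WORK(my_poll_dwork, my_poll_fn);

static void my_poll_fn(struct work_struct *work)
{
	/* ... poll something ... */

	/* re-arm; stop the cycle with cancel_delayed_work_sync() */
	schedule_delayed_work(&my_poll_dwork, 5 * HZ);
}

static void my_poll_start(void)
{
	schedule_delayed_work(&my_poll_dwork, 5 * HZ);
}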
3063
3064/**
2942 * schedule_on_each_cpu - execute a function synchronously on each online CPU 3065 * schedule_on_each_cpu - execute a function synchronously on each online CPU
2943 * @func: the function to call 3066 * @func: the function to call
2944 * 3067 *
@@ -3085,9 +3208,8 @@ static int wq_clamp_max_active(int max_active, unsigned int flags,
3085 int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE; 3208 int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
3086 3209
3087 if (max_active < 1 || max_active > lim) 3210 if (max_active < 1 || max_active > lim)
3088 printk(KERN_WARNING "workqueue: max_active %d requested for %s " 3211 pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n",
3089 "is out of range, clamping between %d and %d\n", 3212 max_active, name, 1, lim);
3090 max_active, name, 1, lim);
3091 3213
3092 return clamp_val(max_active, 1, lim); 3214 return clamp_val(max_active, 1, lim);
3093} 3215}
@@ -3243,6 +3365,26 @@ void destroy_workqueue(struct workqueue_struct *wq)
3243EXPORT_SYMBOL_GPL(destroy_workqueue); 3365EXPORT_SYMBOL_GPL(destroy_workqueue);
3244 3366
3245/** 3367/**
3368 * cwq_set_max_active - adjust max_active of a cwq
3369 * @cwq: target cpu_workqueue_struct
3370 * @max_active: new max_active value.
3371 *
3372 * Set @cwq->max_active to @max_active and activate delayed works if
3373 * increased.
3374 *
3375 * CONTEXT:
3376 * spin_lock_irq(gcwq->lock).
3377 */
3378static void cwq_set_max_active(struct cpu_workqueue_struct *cwq, int max_active)
3379{
3380 cwq->max_active = max_active;
3381
3382 while (!list_empty(&cwq->delayed_works) &&
3383 cwq->nr_active < cwq->max_active)
3384 cwq_activate_first_delayed(cwq);
3385}
3386
3387/**
3246 * workqueue_set_max_active - adjust max_active of a workqueue 3388 * workqueue_set_max_active - adjust max_active of a workqueue
3247 * @wq: target workqueue 3389 * @wq: target workqueue
3248 * @max_active: new max_active value. 3390 * @max_active: new max_active value.
@@ -3269,7 +3411,7 @@ void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
3269 3411
3270 if (!(wq->flags & WQ_FREEZABLE) || 3412 if (!(wq->flags & WQ_FREEZABLE) ||
3271 !(gcwq->flags & GCWQ_FREEZING)) 3413 !(gcwq->flags & GCWQ_FREEZING))
3272 get_cwq(gcwq->cpu, wq)->max_active = max_active; 3414 cwq_set_max_active(get_cwq(gcwq->cpu, wq), max_active);
3273 3415
3274 spin_unlock_irq(&gcwq->lock); 3416 spin_unlock_irq(&gcwq->lock);
3275 } 3417 }
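[Editor's note: a hypothetical caller, not in the patch, showing what the cwq_set_max_active() factoring changes for workqueue_set_max_active(); my_wq and my_wq_init are made-up names.]

#include <linux/errno.h>
#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;

static int my_wq_init(void)
{
	my_wq = alloc_workqueue("my_wq", 0, 1);	/* max_active = 1 */
	if (!my_wq)
		return -ENOMEM;

	/*
	 * Raising max_active now also activates queued-but-delayed works
	 * right away via cwq_set_max_active(), instead of waiting for the
	 * next work item to complete.
	 */
	workqueue_set_max_active(my_wq, 16);
	return 0;
}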
@@ -3364,23 +3506,23 @@ EXPORT_SYMBOL_GPL(work_busy);
3364 */ 3506 */
3365 3507
3366/* claim manager positions of all pools */ 3508/* claim manager positions of all pools */
3367static void gcwq_claim_management_and_lock(struct global_cwq *gcwq) 3509static void gcwq_claim_assoc_and_lock(struct global_cwq *gcwq)
3368{ 3510{
3369 struct worker_pool *pool; 3511 struct worker_pool *pool;
3370 3512
3371 for_each_worker_pool(pool, gcwq) 3513 for_each_worker_pool(pool, gcwq)
3372 mutex_lock_nested(&pool->manager_mutex, pool - gcwq->pools); 3514 mutex_lock_nested(&pool->assoc_mutex, pool - gcwq->pools);
3373 spin_lock_irq(&gcwq->lock); 3515 spin_lock_irq(&gcwq->lock);
3374} 3516}
3375 3517
3376/* release manager positions */ 3518/* release manager positions */
3377static void gcwq_release_management_and_unlock(struct global_cwq *gcwq) 3519static void gcwq_release_assoc_and_unlock(struct global_cwq *gcwq)
3378{ 3520{
3379 struct worker_pool *pool; 3521 struct worker_pool *pool;
3380 3522
3381 spin_unlock_irq(&gcwq->lock); 3523 spin_unlock_irq(&gcwq->lock);
3382 for_each_worker_pool(pool, gcwq) 3524 for_each_worker_pool(pool, gcwq)
3383 mutex_unlock(&pool->manager_mutex); 3525 mutex_unlock(&pool->assoc_mutex);
3384} 3526}
3385 3527
3386static void gcwq_unbind_fn(struct work_struct *work) 3528static void gcwq_unbind_fn(struct work_struct *work)
@@ -3393,7 +3535,7 @@ static void gcwq_unbind_fn(struct work_struct *work)
3393 3535
3394 BUG_ON(gcwq->cpu != smp_processor_id()); 3536 BUG_ON(gcwq->cpu != smp_processor_id());
3395 3537
3396 gcwq_claim_management_and_lock(gcwq); 3538 gcwq_claim_assoc_and_lock(gcwq);
3397 3539
3398 /* 3540 /*
3399 * We've claimed all manager positions. Make all workers unbound 3541 * We've claimed all manager positions. Make all workers unbound
@@ -3410,7 +3552,7 @@ static void gcwq_unbind_fn(struct work_struct *work)
3410 3552
3411 gcwq->flags |= GCWQ_DISASSOCIATED; 3553 gcwq->flags |= GCWQ_DISASSOCIATED;
3412 3554
3413 gcwq_release_management_and_unlock(gcwq); 3555 gcwq_release_assoc_and_unlock(gcwq);
3414 3556
3415 /* 3557 /*
3416 * Call schedule() so that we cross rq->lock and thus can guarantee 3558 * Call schedule() so that we cross rq->lock and thus can guarantee
@@ -3438,7 +3580,7 @@ static void gcwq_unbind_fn(struct work_struct *work)
3438 * Workqueues should be brought up before normal priority CPU notifiers. 3580 * Workqueues should be brought up before normal priority CPU notifiers.
3439 * This will be registered high priority CPU notifier. 3581 * This will be registered high priority CPU notifier.
3440 */ 3582 */
3441static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb, 3583static int __cpuinit workqueue_cpu_up_callback(struct notifier_block *nfb,
3442 unsigned long action, 3584 unsigned long action,
3443 void *hcpu) 3585 void *hcpu)
3444{ 3586{
@@ -3466,10 +3608,10 @@ static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb,
3466 3608
3467 case CPU_DOWN_FAILED: 3609 case CPU_DOWN_FAILED:
3468 case CPU_ONLINE: 3610 case CPU_ONLINE:
3469 gcwq_claim_management_and_lock(gcwq); 3611 gcwq_claim_assoc_and_lock(gcwq);
3470 gcwq->flags &= ~GCWQ_DISASSOCIATED; 3612 gcwq->flags &= ~GCWQ_DISASSOCIATED;
3471 rebind_workers(gcwq); 3613 rebind_workers(gcwq);
3472 gcwq_release_management_and_unlock(gcwq); 3614 gcwq_release_assoc_and_unlock(gcwq);
3473 break; 3615 break;
3474 } 3616 }
3475 return NOTIFY_OK; 3617 return NOTIFY_OK;
@@ -3479,7 +3621,7 @@ static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb,
3479 * Workqueues should be brought down after normal priority CPU notifiers. 3621 * Workqueues should be brought down after normal priority CPU notifiers.
3480 * This will be registered as low priority CPU notifier. 3622 * This will be registered as low priority CPU notifier.
3481 */ 3623 */
3482static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb, 3624static int __cpuinit workqueue_cpu_down_callback(struct notifier_block *nfb,
3483 unsigned long action, 3625 unsigned long action,
3484 void *hcpu) 3626 void *hcpu)
3485{ 3627{
@@ -3490,7 +3632,7 @@ static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb,
3490 case CPU_DOWN_PREPARE: 3632 case CPU_DOWN_PREPARE:
3491 /* unbinding should happen on the local CPU */ 3633 /* unbinding should happen on the local CPU */
3492 INIT_WORK_ONSTACK(&unbind_work, gcwq_unbind_fn); 3634 INIT_WORK_ONSTACK(&unbind_work, gcwq_unbind_fn);
3493 schedule_work_on(cpu, &unbind_work); 3635 queue_work_on(cpu, system_highpri_wq, &unbind_work);
3494 flush_work(&unbind_work); 3636 flush_work(&unbind_work);
3495 break; 3637 break;
3496 } 3638 }
@@ -3500,18 +3642,17 @@ static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb,
3500#ifdef CONFIG_SMP 3642#ifdef CONFIG_SMP
3501 3643
3502struct work_for_cpu { 3644struct work_for_cpu {
3503 struct completion completion; 3645 struct work_struct work;
3504 long (*fn)(void *); 3646 long (*fn)(void *);
3505 void *arg; 3647 void *arg;
3506 long ret; 3648 long ret;
3507}; 3649};
3508 3650
3509static int do_work_for_cpu(void *_wfc) 3651static void work_for_cpu_fn(struct work_struct *work)
3510{ 3652{
3511 struct work_for_cpu *wfc = _wfc; 3653 struct work_for_cpu *wfc = container_of(work, struct work_for_cpu, work);
3654
3512 wfc->ret = wfc->fn(wfc->arg); 3655 wfc->ret = wfc->fn(wfc->arg);
3513 complete(&wfc->completion);
3514 return 0;
3515} 3656}
3516 3657
3517/** 3658/**
@@ -3526,19 +3667,11 @@ static int do_work_for_cpu(void *_wfc)
3526 */ 3667 */
3527long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg) 3668long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
3528{ 3669{
3529 struct task_struct *sub_thread; 3670 struct work_for_cpu wfc = { .fn = fn, .arg = arg };
3530 struct work_for_cpu wfc = {
3531 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
3532 .fn = fn,
3533 .arg = arg,
3534 };
3535 3671
3536 sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu"); 3672 INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn);
3537 if (IS_ERR(sub_thread)) 3673 schedule_work_on(cpu, &wfc.work);
3538 return PTR_ERR(sub_thread); 3674 flush_work(&wfc.work);
3539 kthread_bind(sub_thread, cpu);
3540 wake_up_process(sub_thread);
3541 wait_for_completion(&wfc.completion);
3542 return wfc.ret; 3675 return wfc.ret;
3543} 3676}
3544EXPORT_SYMBOL_GPL(work_on_cpu); 3677EXPORT_SYMBOL_GPL(work_on_cpu);
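[Editor's note: a caller-side sketch of work_on_cpu(), not part of the patch; my_query_fn and my_query_on_cpu are hypothetical.]

#include <linux/workqueue.h>

static long my_query_fn(void *arg)
{
	/* executes from a work item pinned to the requested CPU */
	return 0;
}

static long my_query_on_cpu(unsigned int cpu)
{
	/*
	 * Sleeps until my_query_fn() has run on @cpu; with this patch the
	 * plumbing is schedule_work_on() + flush_work() rather than a
	 * one-off kthread.
	 */
	return work_on_cpu(cpu, my_query_fn, NULL);
}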
@@ -3668,11 +3801,7 @@ void thaw_workqueues(void)
3668 continue; 3801 continue;
3669 3802
3670 /* restore max_active and repopulate worklist */ 3803 /* restore max_active and repopulate worklist */
3671 cwq->max_active = wq->saved_max_active; 3804 cwq_set_max_active(cwq, wq->saved_max_active);
3672
3673 while (!list_empty(&cwq->delayed_works) &&
3674 cwq->nr_active < cwq->max_active)
3675 cwq_activate_first_delayed(cwq);
3676 } 3805 }
3677 3806
3678 for_each_worker_pool(pool, gcwq) 3807 for_each_worker_pool(pool, gcwq)
@@ -3692,8 +3821,12 @@ static int __init init_workqueues(void)
3692 unsigned int cpu; 3821 unsigned int cpu;
3693 int i; 3822 int i;
3694 3823
3824 /* make sure we have enough bits for OFFQ CPU number */
3825 BUILD_BUG_ON((1LU << (BITS_PER_LONG - WORK_OFFQ_CPU_SHIFT)) <
3826 WORK_CPU_LAST);
3827
3695 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); 3828 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
3696 cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); 3829 hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
3697 3830
3698 /* initialize gcwqs */ 3831 /* initialize gcwqs */
3699 for_each_gcwq_cpu(cpu) { 3832 for_each_gcwq_cpu(cpu) {
@@ -3719,11 +3852,9 @@ static int __init init_workqueues(void)
3719 setup_timer(&pool->mayday_timer, gcwq_mayday_timeout, 3852 setup_timer(&pool->mayday_timer, gcwq_mayday_timeout,
3720 (unsigned long)pool); 3853 (unsigned long)pool);
3721 3854
3722 mutex_init(&pool->manager_mutex); 3855 mutex_init(&pool->assoc_mutex);
3723 ida_init(&pool->worker_ida); 3856 ida_init(&pool->worker_ida);
3724 } 3857 }
3725
3726 init_waitqueue_head(&gcwq->rebind_hold);
3727 } 3858 }
3728 3859
3729 /* create the initial worker */ 3860 /* create the initial worker */
@@ -3746,17 +3877,14 @@ static int __init init_workqueues(void)
3746 } 3877 }
3747 3878
3748 system_wq = alloc_workqueue("events", 0, 0); 3879 system_wq = alloc_workqueue("events", 0, 0);
3880 system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
3749 system_long_wq = alloc_workqueue("events_long", 0, 0); 3881 system_long_wq = alloc_workqueue("events_long", 0, 0);
3750 system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
3751 system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, 3882 system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
3752 WQ_UNBOUND_MAX_ACTIVE); 3883 WQ_UNBOUND_MAX_ACTIVE);
3753 system_freezable_wq = alloc_workqueue("events_freezable", 3884 system_freezable_wq = alloc_workqueue("events_freezable",
3754 WQ_FREEZABLE, 0); 3885 WQ_FREEZABLE, 0);
3755 system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable", 3886 BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
3756 WQ_NON_REENTRANT | WQ_FREEZABLE, 0); 3887 !system_unbound_wq || !system_freezable_wq);
3757 BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
3758 !system_unbound_wq || !system_freezable_wq ||
3759 !system_nrt_freezable_wq);
3760 return 0; 3888 return 0;
3761} 3889}
3762early_initcall(init_workqueues); 3890early_initcall(init_workqueues);
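[Editor's note: the series adds system_highpri_wq, used above for the unbind work. A hypothetical external caller might look like the sketch below, assuming system_highpri_wq is declared for such use in workqueue.h (its declaration/export is not shown in this diff); the my_urgent_* names are made up.]

#include <linux/workqueue.h>

static void my_urgent_fn(struct work_struct *work);
static DECLARE_WORK(my_urgent_work, my_urgent_fn);

static void my_urgent_fn(struct work_struct *work)
{
	/* serviced by the highpri worker pool of the local gcwq */
}

static void my_urgent_kick(void)
{
	queue_work(system_highpri_wq, &my_urgent_work);
}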