Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--  kernel/workqueue.c  1217
1 files changed, 639 insertions, 578 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 3c5a79e2134c..d951daa0ca9a 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -58,7 +58,7 @@ enum {
58 * be executing on any CPU. The gcwq behaves as an unbound one. 58 * be executing on any CPU. The gcwq behaves as an unbound one.
59 * 59 *
60 * Note that DISASSOCIATED can be flipped only while holding 60 * Note that DISASSOCIATED can be flipped only while holding
61 * managership of all pools on the gcwq to avoid changing binding 61 * assoc_mutex of all pools on the gcwq to avoid changing binding
62 * state while create_worker() is in progress. 62 * state while create_worker() is in progress.
63 */ 63 */
64 GCWQ_DISASSOCIATED = 1 << 0, /* cpu can't serve workers */ 64 GCWQ_DISASSOCIATED = 1 << 0, /* cpu can't serve workers */
@@ -73,11 +73,10 @@ enum {
73 WORKER_DIE = 1 << 1, /* die die die */ 73 WORKER_DIE = 1 << 1, /* die die die */
74 WORKER_IDLE = 1 << 2, /* is idle */ 74 WORKER_IDLE = 1 << 2, /* is idle */
75 WORKER_PREP = 1 << 3, /* preparing to run works */ 75 WORKER_PREP = 1 << 3, /* preparing to run works */
76 WORKER_REBIND = 1 << 5, /* mom is home, come back */
77 WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */ 76 WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
78 WORKER_UNBOUND = 1 << 7, /* worker is unbound */ 77 WORKER_UNBOUND = 1 << 7, /* worker is unbound */
79 78
80 WORKER_NOT_RUNNING = WORKER_PREP | WORKER_REBIND | WORKER_UNBOUND | 79 WORKER_NOT_RUNNING = WORKER_PREP | WORKER_UNBOUND |
81 WORKER_CPU_INTENSIVE, 80 WORKER_CPU_INTENSIVE,
82 81
83 NR_WORKER_POOLS = 2, /* # worker pools per gcwq */ 82 NR_WORKER_POOLS = 2, /* # worker pools per gcwq */
@@ -126,7 +125,6 @@ enum {
126 125
127struct global_cwq; 126struct global_cwq;
128struct worker_pool; 127struct worker_pool;
129struct idle_rebind;
130 128
131/* 129/*
132 * The poor guys doing the actual heavy lifting. All on-duty workers 130 * The poor guys doing the actual heavy lifting. All on-duty workers
@@ -150,7 +148,6 @@ struct worker {
150 int id; /* I: worker id */ 148 int id; /* I: worker id */
151 149
152 /* for rebinding worker to CPU */ 150 /* for rebinding worker to CPU */
153 struct idle_rebind *idle_rebind; /* L: for idle worker */
154 struct work_struct rebind_work; /* L: for busy worker */ 151 struct work_struct rebind_work; /* L: for busy worker */
155}; 152};
156 153
@@ -160,13 +157,15 @@ struct worker_pool {
160 157
161 struct list_head worklist; /* L: list of pending works */ 158 struct list_head worklist; /* L: list of pending works */
162 int nr_workers; /* L: total number of workers */ 159 int nr_workers; /* L: total number of workers */
160
161 /* nr_idle includes the ones off idle_list for rebinding */
163 int nr_idle; /* L: currently idle ones */ 162 int nr_idle; /* L: currently idle ones */
164 163
165 struct list_head idle_list; /* X: list of idle workers */ 164 struct list_head idle_list; /* X: list of idle workers */
166 struct timer_list idle_timer; /* L: worker idle timeout */ 165 struct timer_list idle_timer; /* L: worker idle timeout */
167 struct timer_list mayday_timer; /* L: SOS timer for workers */ 166 struct timer_list mayday_timer; /* L: SOS timer for workers */
168 167
169 struct mutex manager_mutex; /* mutex manager should hold */ 168 struct mutex assoc_mutex; /* protect GCWQ_DISASSOCIATED */
170 struct ida worker_ida; /* L: for worker IDs */ 169 struct ida worker_ida; /* L: for worker IDs */
171}; 170};
172 171
@@ -184,9 +183,8 @@ struct global_cwq {
184 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE]; 183 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
185 /* L: hash of busy workers */ 184 /* L: hash of busy workers */
186 185
187 struct worker_pool pools[2]; /* normal and highpri pools */ 186 struct worker_pool pools[NR_WORKER_POOLS];
188 187 /* normal and highpri pools */
189 wait_queue_head_t rebind_hold; /* rebind hold wait */
190} ____cacheline_aligned_in_smp; 188} ____cacheline_aligned_in_smp;
191 189
192/* 190/*
@@ -269,17 +267,15 @@ struct workqueue_struct {
269}; 267};
270 268
271struct workqueue_struct *system_wq __read_mostly; 269struct workqueue_struct *system_wq __read_mostly;
272struct workqueue_struct *system_long_wq __read_mostly;
273struct workqueue_struct *system_nrt_wq __read_mostly;
274struct workqueue_struct *system_unbound_wq __read_mostly;
275struct workqueue_struct *system_freezable_wq __read_mostly;
276struct workqueue_struct *system_nrt_freezable_wq __read_mostly;
277EXPORT_SYMBOL_GPL(system_wq); 270EXPORT_SYMBOL_GPL(system_wq);
271struct workqueue_struct *system_highpri_wq __read_mostly;
272EXPORT_SYMBOL_GPL(system_highpri_wq);
273struct workqueue_struct *system_long_wq __read_mostly;
278EXPORT_SYMBOL_GPL(system_long_wq); 274EXPORT_SYMBOL_GPL(system_long_wq);
279EXPORT_SYMBOL_GPL(system_nrt_wq); 275struct workqueue_struct *system_unbound_wq __read_mostly;
280EXPORT_SYMBOL_GPL(system_unbound_wq); 276EXPORT_SYMBOL_GPL(system_unbound_wq);
277struct workqueue_struct *system_freezable_wq __read_mostly;
281EXPORT_SYMBOL_GPL(system_freezable_wq); 278EXPORT_SYMBOL_GPL(system_freezable_wq);
282EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
283 279
284#define CREATE_TRACE_POINTS 280#define CREATE_TRACE_POINTS
285#include <trace/events/workqueue.h> 281#include <trace/events/workqueue.h>
@@ -534,18 +530,24 @@ static int work_next_color(int color)
534} 530}
535 531
536/* 532/*
537 * A work's data points to the cwq with WORK_STRUCT_CWQ set while the 533 * While queued, %WORK_STRUCT_CWQ is set and non flag bits of a work's data
538 * work is on queue. Once execution starts, WORK_STRUCT_CWQ is 534 * contain the pointer to the queued cwq. Once execution starts, the flag
539 * cleared and the work data contains the cpu number it was last on. 535 * is cleared and the high bits contain OFFQ flags and CPU number.
540 * 536 *
541 * set_work_{cwq|cpu}() and clear_work_data() can be used to set the 537 * set_work_cwq(), set_work_cpu_and_clear_pending(), mark_work_canceling()
542 * cwq, cpu or clear work->data. These functions should only be 538 * and clear_work_data() can be used to set the cwq, cpu or clear
543 * called while the work is owned - ie. while the PENDING bit is set. 539 * work->data. These functions should only be called while the work is
540 * owned - ie. while the PENDING bit is set.
544 * 541 *
545 * get_work_[g]cwq() can be used to obtain the gcwq or cwq 542 * get_work_[g]cwq() can be used to obtain the gcwq or cwq corresponding to
546 * corresponding to a work. gcwq is available once the work has been 543 * a work. gcwq is available once the work has been queued anywhere after
547 * queued anywhere after initialization. cwq is available only from 544 * initialization until it is sync canceled. cwq is available only while
548 * queueing until execution starts. 545 * the work item is queued.
546 *
547 * %WORK_OFFQ_CANCELING is used to mark a work item which is being
548 * canceled. While being canceled, a work item may have its PENDING set
549 * but stay off timer and worklist for arbitrarily long and nobody should
550 * try to steal the PENDING bit.
549 */ 551 */
550static inline void set_work_data(struct work_struct *work, unsigned long data, 552static inline void set_work_data(struct work_struct *work, unsigned long data,
551 unsigned long flags) 553 unsigned long flags)
@@ -562,13 +564,22 @@ static void set_work_cwq(struct work_struct *work,
562 WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags); 564 WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
563} 565}
564 566
565static void set_work_cpu(struct work_struct *work, unsigned int cpu) 567static void set_work_cpu_and_clear_pending(struct work_struct *work,
568 unsigned int cpu)
566{ 569{
567 set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING); 570 /*
571 * The following wmb is paired with the implied mb in
572 * test_and_set_bit(PENDING) and ensures all updates to @work made
573 * here are visible to and precede any updates by the next PENDING
574 * owner.
575 */
576 smp_wmb();
577 set_work_data(work, (unsigned long)cpu << WORK_OFFQ_CPU_SHIFT, 0);
568} 578}
569 579
570static void clear_work_data(struct work_struct *work) 580static void clear_work_data(struct work_struct *work)
571{ 581{
582 smp_wmb(); /* see set_work_cpu_and_clear_pending() */
572 set_work_data(work, WORK_STRUCT_NO_CPU, 0); 583 set_work_data(work, WORK_STRUCT_NO_CPU, 0);
573} 584}
574 585
@@ -591,7 +602,7 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
591 return ((struct cpu_workqueue_struct *) 602 return ((struct cpu_workqueue_struct *)
592 (data & WORK_STRUCT_WQ_DATA_MASK))->pool->gcwq; 603 (data & WORK_STRUCT_WQ_DATA_MASK))->pool->gcwq;
593 604
594 cpu = data >> WORK_STRUCT_FLAG_BITS; 605 cpu = data >> WORK_OFFQ_CPU_SHIFT;
595 if (cpu == WORK_CPU_NONE) 606 if (cpu == WORK_CPU_NONE)
596 return NULL; 607 return NULL;
597 608
@@ -599,6 +610,22 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
599 return get_gcwq(cpu); 610 return get_gcwq(cpu);
600} 611}
601 612
613static void mark_work_canceling(struct work_struct *work)
614{
615 struct global_cwq *gcwq = get_work_gcwq(work);
616 unsigned long cpu = gcwq ? gcwq->cpu : WORK_CPU_NONE;
617
618 set_work_data(work, (cpu << WORK_OFFQ_CPU_SHIFT) | WORK_OFFQ_CANCELING,
619 WORK_STRUCT_PENDING);
620}
621
622static bool work_is_canceling(struct work_struct *work)
623{
624 unsigned long data = atomic_long_read(&work->data);
625
626 return !(data & WORK_STRUCT_CWQ) && (data & WORK_OFFQ_CANCELING);
627}
628
602/* 629/*
603 * Policy functions. These define the policies on how the global worker 630 * Policy functions. These define the policies on how the global worker
604 * pools are managed. Unless noted otherwise, these functions assume that 631 * pools are managed. Unless noted otherwise, these functions assume that
@@ -657,6 +684,13 @@ static bool too_many_workers(struct worker_pool *pool)
657 int nr_idle = pool->nr_idle + managing; /* manager is considered idle */ 684 int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
658 int nr_busy = pool->nr_workers - nr_idle; 685 int nr_busy = pool->nr_workers - nr_idle;
659 686
687 /*
688 * nr_idle and idle_list may disagree if idle rebinding is in
689 * progress. Never return %true if idle_list is empty.
690 */
691 if (list_empty(&pool->idle_list))
692 return false;
693
660 return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy; 694 return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
661} 695}
662 696
@@ -903,6 +937,206 @@ static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
903} 937}
904 938
905/** 939/**
940 * move_linked_works - move linked works to a list
941 * @work: start of series of works to be scheduled
942 * @head: target list to append @work to
943 * @nextp: out paramter for nested worklist walking
944 *
945 * Schedule linked works starting from @work to @head. Work series to
946 * be scheduled starts at @work and includes any consecutive work with
947 * WORK_STRUCT_LINKED set in its predecessor.
948 *
949 * If @nextp is not NULL, it's updated to point to the next work of
950 * the last scheduled work. This allows move_linked_works() to be
951 * nested inside outer list_for_each_entry_safe().
952 *
953 * CONTEXT:
954 * spin_lock_irq(gcwq->lock).
955 */
956static void move_linked_works(struct work_struct *work, struct list_head *head,
957 struct work_struct **nextp)
958{
959 struct work_struct *n;
960
961 /*
962 * Linked worklist will always end before the end of the list,
963 * use NULL for list head.
964 */
965 list_for_each_entry_safe_from(work, n, NULL, entry) {
966 list_move_tail(&work->entry, head);
967 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
968 break;
969 }
970
971 /*
972 * If we're already inside safe list traversal and have moved
973 * multiple works to the scheduled queue, the next position
974 * needs to be updated.
975 */
976 if (nextp)
977 *nextp = n;
978}
979
980static void cwq_activate_delayed_work(struct work_struct *work)
981{
982 struct cpu_workqueue_struct *cwq = get_work_cwq(work);
983
984 trace_workqueue_activate_work(work);
985 move_linked_works(work, &cwq->pool->worklist, NULL);
986 __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
987 cwq->nr_active++;
988}
989
990static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
991{
992 struct work_struct *work = list_first_entry(&cwq->delayed_works,
993 struct work_struct, entry);
994
995 cwq_activate_delayed_work(work);
996}
997
998/**
999 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
1000 * @cwq: cwq of interest
1001 * @color: color of work which left the queue
1002 *
1003 * A work either has completed or is removed from pending queue,
1004 * decrement nr_in_flight of its cwq and handle workqueue flushing.
1005 *
1006 * CONTEXT:
1007 * spin_lock_irq(gcwq->lock).
1008 */
1009static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
1010{
1011 /* ignore uncolored works */
1012 if (color == WORK_NO_COLOR)
1013 return;
1014
1015 cwq->nr_in_flight[color]--;
1016
1017 cwq->nr_active--;
1018 if (!list_empty(&cwq->delayed_works)) {
1019 /* one down, submit a delayed one */
1020 if (cwq->nr_active < cwq->max_active)
1021 cwq_activate_first_delayed(cwq);
1022 }
1023
1024 /* is flush in progress and are we at the flushing tip? */
1025 if (likely(cwq->flush_color != color))
1026 return;
1027
1028 /* are there still in-flight works? */
1029 if (cwq->nr_in_flight[color])
1030 return;
1031
1032 /* this cwq is done, clear flush_color */
1033 cwq->flush_color = -1;
1034
1035 /*
1036 * If this was the last cwq, wake up the first flusher. It
1037 * will handle the rest.
1038 */
1039 if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
1040 complete(&cwq->wq->first_flusher->done);
1041}
1042
1043/**
1044 * try_to_grab_pending - steal work item from worklist and disable irq
1045 * @work: work item to steal
1046 * @is_dwork: @work is a delayed_work
1047 * @flags: place to store irq state
1048 *
1049 * Try to grab PENDING bit of @work. This function can handle @work in any
1050 * stable state - idle, on timer or on worklist. Return values are
1051 *
1052 * 1 if @work was pending and we successfully stole PENDING
1053 * 0 if @work was idle and we claimed PENDING
1054 * -EAGAIN if PENDING couldn't be grabbed at the moment, safe to busy-retry
1055 * -ENOENT if someone else is canceling @work, this state may persist
1056 * for arbitrarily long
1057 *
1058 * On >= 0 return, the caller owns @work's PENDING bit. To avoid getting
1059 * interrupted while holding PENDING and @work off queue, irq must be
1060 * disabled on entry. This, combined with delayed_work->timer being
1061 * irqsafe, ensures that we return -EAGAIN for finite short period of time.
1062 *
1063 * On successful return, >= 0, irq is disabled and the caller is
1064 * responsible for releasing it using local_irq_restore(*@flags).
1065 *
1066 * This function is safe to call from any context including IRQ handler.
1067 */
1068static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
1069 unsigned long *flags)
1070{
1071 struct global_cwq *gcwq;
1072
1073 local_irq_save(*flags);
1074
1075 /* try to steal the timer if it exists */
1076 if (is_dwork) {
1077 struct delayed_work *dwork = to_delayed_work(work);
1078
1079 /*
1080 * dwork->timer is irqsafe. If del_timer() fails, it's
1081 * guaranteed that the timer is not queued anywhere and not
1082 * running on the local CPU.
1083 */
1084 if (likely(del_timer(&dwork->timer)))
1085 return 1;
1086 }
1087
1088 /* try to claim PENDING the normal way */
1089 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
1090 return 0;
1091
1092 /*
1093 * The queueing is in progress, or it is already queued. Try to
1094 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
1095 */
1096 gcwq = get_work_gcwq(work);
1097 if (!gcwq)
1098 goto fail;
1099
1100 spin_lock(&gcwq->lock);
1101 if (!list_empty(&work->entry)) {
1102 /*
1103 * This work is queued, but perhaps we locked the wrong gcwq.
1104 * In that case we must see the new value after rmb(), see
1105 * insert_work()->wmb().
1106 */
1107 smp_rmb();
1108 if (gcwq == get_work_gcwq(work)) {
1109 debug_work_deactivate(work);
1110
1111 /*
1112 * A delayed work item cannot be grabbed directly
1113 * because it might have linked NO_COLOR work items
1114 * which, if left on the delayed_list, will confuse
1115 * cwq->nr_active management later on and cause
1116 * stall. Make sure the work item is activated
1117 * before grabbing.
1118 */
1119 if (*work_data_bits(work) & WORK_STRUCT_DELAYED)
1120 cwq_activate_delayed_work(work);
1121
1122 list_del_init(&work->entry);
1123 cwq_dec_nr_in_flight(get_work_cwq(work),
1124 get_work_color(work));
1125
1126 spin_unlock(&gcwq->lock);
1127 return 1;
1128 }
1129 }
1130 spin_unlock(&gcwq->lock);
1131fail:
1132 local_irq_restore(*flags);
1133 if (work_is_canceling(work))
1134 return -ENOENT;
1135 cpu_relax();
1136 return -EAGAIN;
1137}
1138
1139/**
906 * insert_work - insert a work into gcwq 1140 * insert_work - insert a work into gcwq
907 * @cwq: cwq @work belongs to 1141 * @cwq: cwq @work belongs to
908 * @work: work to insert 1142 * @work: work to insert
@@ -982,7 +1216,15 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
982 struct cpu_workqueue_struct *cwq; 1216 struct cpu_workqueue_struct *cwq;
983 struct list_head *worklist; 1217 struct list_head *worklist;
984 unsigned int work_flags; 1218 unsigned int work_flags;
985 unsigned long flags; 1219 unsigned int req_cpu = cpu;
1220
1221 /*
1222 * While a work item is PENDING && off queue, a task trying to
1223 * steal the PENDING will busy-loop waiting for it to either get
1224 * queued or lose PENDING. Grabbing PENDING and queueing should
1225 * happen with IRQ disabled.
1226 */
1227 WARN_ON_ONCE(!irqs_disabled());
986 1228
987 debug_work_activate(work); 1229 debug_work_activate(work);
988 1230
@@ -995,21 +1237,22 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
995 if (!(wq->flags & WQ_UNBOUND)) { 1237 if (!(wq->flags & WQ_UNBOUND)) {
996 struct global_cwq *last_gcwq; 1238 struct global_cwq *last_gcwq;
997 1239
998 if (unlikely(cpu == WORK_CPU_UNBOUND)) 1240 if (cpu == WORK_CPU_UNBOUND)
999 cpu = raw_smp_processor_id(); 1241 cpu = raw_smp_processor_id();
1000 1242
1001 /* 1243 /*
1002 * It's multi cpu. If @wq is non-reentrant and @work 1244 * It's multi cpu. If @work was previously on a different
1003 * was previously on a different cpu, it might still 1245 * cpu, it might still be running there, in which case the
1004 * be running there, in which case the work needs to 1246 * work needs to be queued on that cpu to guarantee
1005 * be queued on that cpu to guarantee non-reentrance. 1247 * non-reentrancy.
1006 */ 1248 */
1007 gcwq = get_gcwq(cpu); 1249 gcwq = get_gcwq(cpu);
1008 if (wq->flags & WQ_NON_REENTRANT && 1250 last_gcwq = get_work_gcwq(work);
1009 (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) { 1251
1252 if (last_gcwq && last_gcwq != gcwq) {
1010 struct worker *worker; 1253 struct worker *worker;
1011 1254
1012 spin_lock_irqsave(&last_gcwq->lock, flags); 1255 spin_lock(&last_gcwq->lock);
1013 1256
1014 worker = find_worker_executing_work(last_gcwq, work); 1257 worker = find_worker_executing_work(last_gcwq, work);
1015 1258
@@ -1017,22 +1260,23 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
1017 gcwq = last_gcwq; 1260 gcwq = last_gcwq;
1018 else { 1261 else {
1019 /* meh... not running there, queue here */ 1262 /* meh... not running there, queue here */
1020 spin_unlock_irqrestore(&last_gcwq->lock, flags); 1263 spin_unlock(&last_gcwq->lock);
1021 spin_lock_irqsave(&gcwq->lock, flags); 1264 spin_lock(&gcwq->lock);
1022 } 1265 }
1023 } else 1266 } else {
1024 spin_lock_irqsave(&gcwq->lock, flags); 1267 spin_lock(&gcwq->lock);
1268 }
1025 } else { 1269 } else {
1026 gcwq = get_gcwq(WORK_CPU_UNBOUND); 1270 gcwq = get_gcwq(WORK_CPU_UNBOUND);
1027 spin_lock_irqsave(&gcwq->lock, flags); 1271 spin_lock(&gcwq->lock);
1028 } 1272 }
1029 1273
1030 /* gcwq determined, get cwq and queue */ 1274 /* gcwq determined, get cwq and queue */
1031 cwq = get_cwq(gcwq->cpu, wq); 1275 cwq = get_cwq(gcwq->cpu, wq);
1032 trace_workqueue_queue_work(cpu, cwq, work); 1276 trace_workqueue_queue_work(req_cpu, cwq, work);
1033 1277
1034 if (WARN_ON(!list_empty(&work->entry))) { 1278 if (WARN_ON(!list_empty(&work->entry))) {
1035 spin_unlock_irqrestore(&gcwq->lock, flags); 1279 spin_unlock(&gcwq->lock);
1036 return; 1280 return;
1037 } 1281 }
1038 1282
@@ -1050,79 +1294,110 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
1050 1294
1051 insert_work(cwq, work, worklist, work_flags); 1295 insert_work(cwq, work, worklist, work_flags);
1052 1296
1053 spin_unlock_irqrestore(&gcwq->lock, flags); 1297 spin_unlock(&gcwq->lock);
1054} 1298}
1055 1299
1056/** 1300/**
1057 * queue_work - queue work on a workqueue 1301 * queue_work_on - queue work on specific cpu
1302 * @cpu: CPU number to execute work on
1058 * @wq: workqueue to use 1303 * @wq: workqueue to use
1059 * @work: work to queue 1304 * @work: work to queue
1060 * 1305 *
1061 * Returns 0 if @work was already on a queue, non-zero otherwise. 1306 * Returns %false if @work was already on a queue, %true otherwise.
1062 * 1307 *
1063 * We queue the work to the CPU on which it was submitted, but if the CPU dies 1308 * We queue the work to a specific CPU, the caller must ensure it
1064 * it can be processed by another CPU. 1309 * can't go away.
1065 */ 1310 */
1066int queue_work(struct workqueue_struct *wq, struct work_struct *work) 1311bool queue_work_on(int cpu, struct workqueue_struct *wq,
1312 struct work_struct *work)
1067{ 1313{
1068 int ret; 1314 bool ret = false;
1315 unsigned long flags;
1069 1316
1070 ret = queue_work_on(get_cpu(), wq, work); 1317 local_irq_save(flags);
1071 put_cpu(); 1318
1319 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1320 __queue_work(cpu, wq, work);
1321 ret = true;
1322 }
1072 1323
1324 local_irq_restore(flags);
1073 return ret; 1325 return ret;
1074} 1326}
1075EXPORT_SYMBOL_GPL(queue_work); 1327EXPORT_SYMBOL_GPL(queue_work_on);
1076 1328
1077/** 1329/**
1078 * queue_work_on - queue work on specific cpu 1330 * queue_work - queue work on a workqueue
1079 * @cpu: CPU number to execute work on
1080 * @wq: workqueue to use 1331 * @wq: workqueue to use
1081 * @work: work to queue 1332 * @work: work to queue
1082 * 1333 *
1083 * Returns 0 if @work was already on a queue, non-zero otherwise. 1334 * Returns %false if @work was already on a queue, %true otherwise.
1084 * 1335 *
1085 * We queue the work to a specific CPU, the caller must ensure it 1336 * We queue the work to the CPU on which it was submitted, but if the CPU dies
1086 * can't go away. 1337 * it can be processed by another CPU.
1087 */ 1338 */
1088int 1339bool queue_work(struct workqueue_struct *wq, struct work_struct *work)
1089queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
1090{ 1340{
1091 int ret = 0; 1341 return queue_work_on(WORK_CPU_UNBOUND, wq, work);
1092
1093 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1094 __queue_work(cpu, wq, work);
1095 ret = 1;
1096 }
1097 return ret;
1098} 1342}
1099EXPORT_SYMBOL_GPL(queue_work_on); 1343EXPORT_SYMBOL_GPL(queue_work);
1100 1344
1101static void delayed_work_timer_fn(unsigned long __data) 1345void delayed_work_timer_fn(unsigned long __data)
1102{ 1346{
1103 struct delayed_work *dwork = (struct delayed_work *)__data; 1347 struct delayed_work *dwork = (struct delayed_work *)__data;
1104 struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work); 1348 struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
1105 1349
1106 __queue_work(smp_processor_id(), cwq->wq, &dwork->work); 1350 /* should have been called from irqsafe timer with irq already off */
1351 __queue_work(dwork->cpu, cwq->wq, &dwork->work);
1107} 1352}
1353EXPORT_SYMBOL_GPL(delayed_work_timer_fn);
1108 1354
1109/** 1355static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
1110 * queue_delayed_work - queue work on a workqueue after delay 1356 struct delayed_work *dwork, unsigned long delay)
1111 * @wq: workqueue to use
1112 * @dwork: delayable work to queue
1113 * @delay: number of jiffies to wait before queueing
1114 *
1115 * Returns 0 if @work was already on a queue, non-zero otherwise.
1116 */
1117int queue_delayed_work(struct workqueue_struct *wq,
1118 struct delayed_work *dwork, unsigned long delay)
1119{ 1357{
1120 if (delay == 0) 1358 struct timer_list *timer = &dwork->timer;
1121 return queue_work(wq, &dwork->work); 1359 struct work_struct *work = &dwork->work;
1360 unsigned int lcpu;
1361
1362 WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
1363 timer->data != (unsigned long)dwork);
1364 BUG_ON(timer_pending(timer));
1365 BUG_ON(!list_empty(&work->entry));
1366
1367 timer_stats_timer_set_start_info(&dwork->timer);
1368
1369 /*
1370 * This stores cwq for the moment, for the timer_fn. Note that the
1371 * work's gcwq is preserved to allow reentrance detection for
1372 * delayed works.
1373 */
1374 if (!(wq->flags & WQ_UNBOUND)) {
1375 struct global_cwq *gcwq = get_work_gcwq(work);
1122 1376
1123 return queue_delayed_work_on(-1, wq, dwork, delay); 1377 /*
1378 * If we cannot get the last gcwq from @work directly,
1379 * select the last CPU such that it avoids unnecessarily
1380 * triggering non-reentrancy check in __queue_work().
1381 */
1382 lcpu = cpu;
1383 if (gcwq)
1384 lcpu = gcwq->cpu;
1385 if (lcpu == WORK_CPU_UNBOUND)
1386 lcpu = raw_smp_processor_id();
1387 } else {
1388 lcpu = WORK_CPU_UNBOUND;
1389 }
1390
1391 set_work_cwq(work, get_cwq(lcpu, wq), 0);
1392
1393 dwork->cpu = cpu;
1394 timer->expires = jiffies + delay;
1395
1396 if (unlikely(cpu != WORK_CPU_UNBOUND))
1397 add_timer_on(timer, cpu);
1398 else
1399 add_timer(timer);
1124} 1400}
1125EXPORT_SYMBOL_GPL(queue_delayed_work);
1126 1401
1127/** 1402/**
1128 * queue_delayed_work_on - queue work on specific CPU after delay 1403 * queue_delayed_work_on - queue work on specific CPU after delay
@@ -1131,53 +1406,100 @@ EXPORT_SYMBOL_GPL(queue_delayed_work);
1131 * @dwork: work to queue 1406 * @dwork: work to queue
1132 * @delay: number of jiffies to wait before queueing 1407 * @delay: number of jiffies to wait before queueing
1133 * 1408 *
1134 * Returns 0 if @work was already on a queue, non-zero otherwise. 1409 * Returns %false if @work was already on a queue, %true otherwise. If
1410 * @delay is zero and @dwork is idle, it will be scheduled for immediate
1411 * execution.
1135 */ 1412 */
1136int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, 1413bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
1137 struct delayed_work *dwork, unsigned long delay) 1414 struct delayed_work *dwork, unsigned long delay)
1138{ 1415{
1139 int ret = 0;
1140 struct timer_list *timer = &dwork->timer;
1141 struct work_struct *work = &dwork->work; 1416 struct work_struct *work = &dwork->work;
1417 bool ret = false;
1418 unsigned long flags;
1142 1419
1143 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { 1420 if (!delay)
1144 unsigned int lcpu; 1421 return queue_work_on(cpu, wq, &dwork->work);
1145 1422
1146 BUG_ON(timer_pending(timer)); 1423 /* read the comment in __queue_work() */
1147 BUG_ON(!list_empty(&work->entry)); 1424 local_irq_save(flags);
1148 1425
1149 timer_stats_timer_set_start_info(&dwork->timer); 1426 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1427 __queue_delayed_work(cpu, wq, dwork, delay);
1428 ret = true;
1429 }
1150 1430
1151 /* 1431 local_irq_restore(flags);
1152 * This stores cwq for the moment, for the timer_fn. 1432 return ret;
1153 * Note that the work's gcwq is preserved to allow 1433}
1154 * reentrance detection for delayed works. 1434EXPORT_SYMBOL_GPL(queue_delayed_work_on);
1155 */
1156 if (!(wq->flags & WQ_UNBOUND)) {
1157 struct global_cwq *gcwq = get_work_gcwq(work);
1158 1435
1159 if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND) 1436/**
1160 lcpu = gcwq->cpu; 1437 * queue_delayed_work - queue work on a workqueue after delay
1161 else 1438 * @wq: workqueue to use
1162 lcpu = raw_smp_processor_id(); 1439 * @dwork: delayable work to queue
1163 } else 1440 * @delay: number of jiffies to wait before queueing
1164 lcpu = WORK_CPU_UNBOUND; 1441 *
1442 * Equivalent to queue_delayed_work_on() but tries to use the local CPU.
1443 */
1444bool queue_delayed_work(struct workqueue_struct *wq,
1445 struct delayed_work *dwork, unsigned long delay)
1446{
1447 return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
1448}
1449EXPORT_SYMBOL_GPL(queue_delayed_work);
1165 1450
1166 set_work_cwq(work, get_cwq(lcpu, wq), 0); 1451/**
1452 * mod_delayed_work_on - modify delay of or queue a delayed work on specific CPU
1453 * @cpu: CPU number to execute work on
1454 * @wq: workqueue to use
1455 * @dwork: work to queue
1456 * @delay: number of jiffies to wait before queueing
1457 *
1458 * If @dwork is idle, equivalent to queue_delayed_work_on(); otherwise,
1459 * modify @dwork's timer so that it expires after @delay. If @delay is
1460 * zero, @work is guaranteed to be scheduled immediately regardless of its
1461 * current state.
1462 *
1463 * Returns %false if @dwork was idle and queued, %true if @dwork was
1464 * pending and its timer was modified.
1465 *
1466 * This function is safe to call from any context including IRQ handler.
1467 * See try_to_grab_pending() for details.
1468 */
1469bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
1470 struct delayed_work *dwork, unsigned long delay)
1471{
1472 unsigned long flags;
1473 int ret;
1167 1474
1168 timer->expires = jiffies + delay; 1475 do {
1169 timer->data = (unsigned long)dwork; 1476 ret = try_to_grab_pending(&dwork->work, true, &flags);
1170 timer->function = delayed_work_timer_fn; 1477 } while (unlikely(ret == -EAGAIN));
1171 1478
1172 if (unlikely(cpu >= 0)) 1479 if (likely(ret >= 0)) {
1173 add_timer_on(timer, cpu); 1480 __queue_delayed_work(cpu, wq, dwork, delay);
1174 else 1481 local_irq_restore(flags);
1175 add_timer(timer);
1176 ret = 1;
1177 } 1482 }
1483
1484 /* -ENOENT from try_to_grab_pending() becomes %true */
1178 return ret; 1485 return ret;
1179} 1486}
1180EXPORT_SYMBOL_GPL(queue_delayed_work_on); 1487EXPORT_SYMBOL_GPL(mod_delayed_work_on);
1488
1489/**
1490 * mod_delayed_work - modify delay of or queue a delayed work
1491 * @wq: workqueue to use
1492 * @dwork: work to queue
1493 * @delay: number of jiffies to wait before queueing
1494 *
1495 * mod_delayed_work_on() on local CPU.
1496 */
1497bool mod_delayed_work(struct workqueue_struct *wq, struct delayed_work *dwork,
1498 unsigned long delay)
1499{
1500 return mod_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
1501}
1502EXPORT_SYMBOL_GPL(mod_delayed_work);
1181 1503
1182/** 1504/**
1183 * worker_enter_idle - enter idle state 1505 * worker_enter_idle - enter idle state
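As a usage sketch (not part of the diff above; the function and variable names are hypothetical), the reworked queueing API lets an interrupt handler re-arm a delayed timeout directly with mod_delayed_work() instead of the old cancel-then-requeue sequence, since PENDING is now grabbed and queued with IRQs disabled:

	#include <linux/interrupt.h>
	#include <linux/jiffies.h>
	#include <linux/workqueue.h>

	static void my_timeout_fn(struct work_struct *work)
	{
		/* runs from a worker thread if no activity arrived for 1s */
	}
	static DECLARE_DELAYED_WORK(my_timeout, my_timeout_fn);

	static irqreturn_t my_irq_handler(int irq, void *dev_id)
	{
		/*
		 * Safe in hard-IRQ context: try_to_grab_pending() keeps IRQs
		 * off while it owns PENDING.  Queues the work if it was idle,
		 * otherwise just pushes the timer back by HZ jiffies.
		 */
		mod_delayed_work(system_wq, &my_timeout, HZ);
		return IRQ_HANDLED;
	}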
@@ -1305,37 +1627,21 @@ __acquires(&gcwq->lock)
1305 } 1627 }
1306} 1628}
1307 1629
1308struct idle_rebind {
1309 int cnt; /* # workers to be rebound */
1310 struct completion done; /* all workers rebound */
1311};
1312
1313/* 1630/*
1314 * Rebind an idle @worker to its CPU. During CPU onlining, this has to 1631 * Rebind an idle @worker to its CPU. worker_thread() will test
1315 * happen synchronously for idle workers. worker_thread() will test 1632 * list_empty(@worker->entry) before leaving idle and call this function.
1316 * %WORKER_REBIND before leaving idle and call this function.
1317 */ 1633 */
1318static void idle_worker_rebind(struct worker *worker) 1634static void idle_worker_rebind(struct worker *worker)
1319{ 1635{
1320 struct global_cwq *gcwq = worker->pool->gcwq; 1636 struct global_cwq *gcwq = worker->pool->gcwq;
1321 1637
1322 /* CPU must be online at this point */ 1638 /* CPU may go down again inbetween, clear UNBOUND only on success */
1323 WARN_ON(!worker_maybe_bind_and_lock(worker)); 1639 if (worker_maybe_bind_and_lock(worker))
1324 if (!--worker->idle_rebind->cnt) 1640 worker_clr_flags(worker, WORKER_UNBOUND);
1325 complete(&worker->idle_rebind->done);
1326 spin_unlock_irq(&worker->pool->gcwq->lock);
1327 1641
1328 /* we did our part, wait for rebind_workers() to finish up */ 1642 /* rebind complete, become available again */
1329 wait_event(gcwq->rebind_hold, !(worker->flags & WORKER_REBIND)); 1643 list_add(&worker->entry, &worker->pool->idle_list);
1330 1644 spin_unlock_irq(&gcwq->lock);
1331 /*
1332 * rebind_workers() shouldn't finish until all workers passed the
1333 * above WORKER_REBIND wait. Tell it when done.
1334 */
1335 spin_lock_irq(&worker->pool->gcwq->lock);
1336 if (!--worker->idle_rebind->cnt)
1337 complete(&worker->idle_rebind->done);
1338 spin_unlock_irq(&worker->pool->gcwq->lock);
1339} 1645}
1340 1646
1341/* 1647/*
@@ -1349,16 +1655,8 @@ static void busy_worker_rebind_fn(struct work_struct *work)
1349 struct worker *worker = container_of(work, struct worker, rebind_work); 1655 struct worker *worker = container_of(work, struct worker, rebind_work);
1350 struct global_cwq *gcwq = worker->pool->gcwq; 1656 struct global_cwq *gcwq = worker->pool->gcwq;
1351 1657
1352 worker_maybe_bind_and_lock(worker); 1658 if (worker_maybe_bind_and_lock(worker))
1353 1659 worker_clr_flags(worker, WORKER_UNBOUND);
1354 /*
1355 * %WORKER_REBIND must be cleared even if the above binding failed;
1356 * otherwise, we may confuse the next CPU_UP cycle or oops / get
1357 * stuck by calling idle_worker_rebind() prematurely. If CPU went
1358 * down again inbetween, %WORKER_UNBOUND would be set, so clearing
1359 * %WORKER_REBIND is always safe.
1360 */
1361 worker_clr_flags(worker, WORKER_REBIND);
1362 1660
1363 spin_unlock_irq(&gcwq->lock); 1661 spin_unlock_irq(&gcwq->lock);
1364} 1662}
@@ -1370,123 +1668,74 @@ static void busy_worker_rebind_fn(struct work_struct *work)
1370 * @gcwq->cpu is coming online. Rebind all workers to the CPU. Rebinding 1668 * @gcwq->cpu is coming online. Rebind all workers to the CPU. Rebinding
1371 * is different for idle and busy ones. 1669 * is different for idle and busy ones.
1372 * 1670 *
1373 * The idle ones should be rebound synchronously and idle rebinding should 1671 * Idle ones will be removed from the idle_list and woken up. They will
1374 * be complete before any worker starts executing work items with 1672 * add themselves back after completing rebind. This ensures that the
1375 * concurrency management enabled; otherwise, scheduler may oops trying to 1673 * idle_list doesn't contain any unbound workers when re-bound busy workers
1376 * wake up non-local idle worker from wq_worker_sleeping(). 1674 * try to perform local wake-ups for concurrency management.
1377 * 1675 *
1378 * This is achieved by repeatedly requesting rebinding until all idle 1676 * Busy workers can rebind after they finish their current work items.
1379 * workers are known to have been rebound under @gcwq->lock and holding all 1677 * Queueing the rebind work item at the head of the scheduled list is
1380 * idle workers from becoming busy until idle rebinding is complete. 1678 * enough. Note that nr_running will be properly bumped as busy workers
1679 * rebind.
1381 * 1680 *
1382 * Once idle workers are rebound, busy workers can be rebound as they 1681 * On return, all non-manager workers are scheduled for rebind - see
1383 * finish executing their current work items. Queueing the rebind work at 1682 * manage_workers() for the manager special case. Any idle worker
1384 * the head of their scheduled lists is enough. Note that nr_running will 1683 * including the manager will not appear on @idle_list until rebind is
1385 * be properbly bumped as busy workers rebind. 1684 * complete, making local wake-ups safe.
1386 *
1387 * On return, all workers are guaranteed to either be bound or have rebind
1388 * work item scheduled.
1389 */ 1685 */
1390static void rebind_workers(struct global_cwq *gcwq) 1686static void rebind_workers(struct global_cwq *gcwq)
1391 __releases(&gcwq->lock) __acquires(&gcwq->lock)
1392{ 1687{
1393 struct idle_rebind idle_rebind;
1394 struct worker_pool *pool; 1688 struct worker_pool *pool;
1395 struct worker *worker; 1689 struct worker *worker, *n;
1396 struct hlist_node *pos; 1690 struct hlist_node *pos;
1397 int i; 1691 int i;
1398 1692
1399 lockdep_assert_held(&gcwq->lock); 1693 lockdep_assert_held(&gcwq->lock);
1400 1694
1401 for_each_worker_pool(pool, gcwq) 1695 for_each_worker_pool(pool, gcwq)
1402 lockdep_assert_held(&pool->manager_mutex); 1696 lockdep_assert_held(&pool->assoc_mutex);
1403 1697
1404 /* 1698 /* dequeue and kick idle ones */
1405 * Rebind idle workers. Interlocked both ways. We wait for
1406 * workers to rebind via @idle_rebind.done. Workers will wait for
1407 * us to finish up by watching %WORKER_REBIND.
1408 */
1409 init_completion(&idle_rebind.done);
1410retry:
1411 idle_rebind.cnt = 1;
1412 INIT_COMPLETION(idle_rebind.done);
1413
1414 /* set REBIND and kick idle ones, we'll wait for these later */
1415 for_each_worker_pool(pool, gcwq) { 1699 for_each_worker_pool(pool, gcwq) {
1416 list_for_each_entry(worker, &pool->idle_list, entry) { 1700 list_for_each_entry_safe(worker, n, &pool->idle_list, entry) {
1417 unsigned long worker_flags = worker->flags; 1701 /*
1418 1702 * idle workers should be off @pool->idle_list
1419 if (worker->flags & WORKER_REBIND) 1703 * until rebind is complete to avoid receiving
1420 continue; 1704 * premature local wake-ups.
1421 1705 */
1422 /* morph UNBOUND to REBIND atomically */ 1706 list_del_init(&worker->entry);
1423 worker_flags &= ~WORKER_UNBOUND;
1424 worker_flags |= WORKER_REBIND;
1425 ACCESS_ONCE(worker->flags) = worker_flags;
1426
1427 idle_rebind.cnt++;
1428 worker->idle_rebind = &idle_rebind;
1429 1707
1430 /* worker_thread() will call idle_worker_rebind() */ 1708 /*
1709 * worker_thread() will see the above dequeuing
1710 * and call idle_worker_rebind().
1711 */
1431 wake_up_process(worker->task); 1712 wake_up_process(worker->task);
1432 } 1713 }
1433 } 1714 }
1434 1715
1435 if (--idle_rebind.cnt) { 1716 /* rebind busy workers */
1436 spin_unlock_irq(&gcwq->lock);
1437 wait_for_completion(&idle_rebind.done);
1438 spin_lock_irq(&gcwq->lock);
1439 /* busy ones might have become idle while waiting, retry */
1440 goto retry;
1441 }
1442
1443 /* all idle workers are rebound, rebind busy workers */
1444 for_each_busy_worker(worker, i, pos, gcwq) { 1717 for_each_busy_worker(worker, i, pos, gcwq) {
1445 struct work_struct *rebind_work = &worker->rebind_work; 1718 struct work_struct *rebind_work = &worker->rebind_work;
1446 unsigned long worker_flags = worker->flags; 1719 struct workqueue_struct *wq;
1447
1448 /* morph UNBOUND to REBIND atomically */
1449 worker_flags &= ~WORKER_UNBOUND;
1450 worker_flags |= WORKER_REBIND;
1451 ACCESS_ONCE(worker->flags) = worker_flags;
1452 1720
1453 if (test_and_set_bit(WORK_STRUCT_PENDING_BIT, 1721 if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
1454 work_data_bits(rebind_work))) 1722 work_data_bits(rebind_work)))
1455 continue; 1723 continue;
1456 1724
1457 /* wq doesn't matter, use the default one */
1458 debug_work_activate(rebind_work); 1725 debug_work_activate(rebind_work);
1459 insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
1460 worker->scheduled.next,
1461 work_color_to_flags(WORK_NO_COLOR));
1462 }
1463
1464 /*
1465 * All idle workers are rebound and waiting for %WORKER_REBIND to
1466 * be cleared inside idle_worker_rebind(). Clear and release.
1467 * Clearing %WORKER_REBIND from this foreign context is safe
1468 * because these workers are still guaranteed to be idle.
1469 *
1470 * We need to make sure all idle workers passed WORKER_REBIND wait
1471 * in idle_worker_rebind() before returning; otherwise, workers can
1472 * get stuck at the wait if hotplug cycle repeats.
1473 */
1474 idle_rebind.cnt = 1;
1475 INIT_COMPLETION(idle_rebind.done);
1476
1477 for_each_worker_pool(pool, gcwq) {
1478 list_for_each_entry(worker, &pool->idle_list, entry) {
1479 worker->flags &= ~WORKER_REBIND;
1480 idle_rebind.cnt++;
1481 }
1482 }
1483 1726
1484 wake_up_all(&gcwq->rebind_hold); 1727 /*
1728 * wq doesn't really matter but let's keep @worker->pool
1729 * and @cwq->pool consistent for sanity.
1730 */
1731 if (worker_pool_pri(worker->pool))
1732 wq = system_highpri_wq;
1733 else
1734 wq = system_wq;
1485 1735
1486 if (--idle_rebind.cnt) { 1736 insert_work(get_cwq(gcwq->cpu, wq), rebind_work,
1487 spin_unlock_irq(&gcwq->lock); 1737 worker->scheduled.next,
1488 wait_for_completion(&idle_rebind.done); 1738 work_color_to_flags(WORK_NO_COLOR));
1489 spin_lock_irq(&gcwq->lock);
1490 } 1739 }
1491} 1740}
1492 1741
@@ -1844,22 +2093,22 @@ static bool manage_workers(struct worker *worker)
1844 * grab %POOL_MANAGING_WORKERS to achieve this because that can 2093 * grab %POOL_MANAGING_WORKERS to achieve this because that can
1845 * lead to idle worker depletion (all become busy thinking someone 2094 * lead to idle worker depletion (all become busy thinking someone
1846 * else is managing) which in turn can result in deadlock under 2095 * else is managing) which in turn can result in deadlock under
1847 * extreme circumstances. Use @pool->manager_mutex to synchronize 2096 * extreme circumstances. Use @pool->assoc_mutex to synchronize
1848 * manager against CPU hotplug. 2097 * manager against CPU hotplug.
1849 * 2098 *
1850 * manager_mutex would always be free unless CPU hotplug is in 2099 * assoc_mutex would always be free unless CPU hotplug is in
1851 * progress. trylock first without dropping @gcwq->lock. 2100 * progress. trylock first without dropping @gcwq->lock.
1852 */ 2101 */
1853 if (unlikely(!mutex_trylock(&pool->manager_mutex))) { 2102 if (unlikely(!mutex_trylock(&pool->assoc_mutex))) {
1854 spin_unlock_irq(&pool->gcwq->lock); 2103 spin_unlock_irq(&pool->gcwq->lock);
1855 mutex_lock(&pool->manager_mutex); 2104 mutex_lock(&pool->assoc_mutex);
1856 /* 2105 /*
1857 * CPU hotplug could have happened while we were waiting 2106 * CPU hotplug could have happened while we were waiting
1858 * for manager_mutex. Hotplug itself can't handle us 2107 * for assoc_mutex. Hotplug itself can't handle us
1859 * because manager isn't either on idle or busy list, and 2108 * because manager isn't either on idle or busy list, and
1860 * @gcwq's state and ours could have deviated. 2109 * @gcwq's state and ours could have deviated.
1861 * 2110 *
1862 * As hotplug is now excluded via manager_mutex, we can 2111 * As hotplug is now excluded via assoc_mutex, we can
1863 * simply try to bind. It will succeed or fail depending 2112 * simply try to bind. It will succeed or fail depending
1864 * on @gcwq's current state. Try it and adjust 2113 * on @gcwq's current state. Try it and adjust
1865 * %WORKER_UNBOUND accordingly. 2114 * %WORKER_UNBOUND accordingly.
@@ -1882,112 +2131,11 @@ static bool manage_workers(struct worker *worker)
1882 ret |= maybe_create_worker(pool); 2131 ret |= maybe_create_worker(pool);
1883 2132
1884 pool->flags &= ~POOL_MANAGING_WORKERS; 2133 pool->flags &= ~POOL_MANAGING_WORKERS;
1885 mutex_unlock(&pool->manager_mutex); 2134 mutex_unlock(&pool->assoc_mutex);
1886 return ret; 2135 return ret;
1887} 2136}
1888 2137
1889/** 2138/**
1890 * move_linked_works - move linked works to a list
1891 * @work: start of series of works to be scheduled
1892 * @head: target list to append @work to
1893 * @nextp: out paramter for nested worklist walking
1894 *
1895 * Schedule linked works starting from @work to @head. Work series to
1896 * be scheduled starts at @work and includes any consecutive work with
1897 * WORK_STRUCT_LINKED set in its predecessor.
1898 *
1899 * If @nextp is not NULL, it's updated to point to the next work of
1900 * the last scheduled work. This allows move_linked_works() to be
1901 * nested inside outer list_for_each_entry_safe().
1902 *
1903 * CONTEXT:
1904 * spin_lock_irq(gcwq->lock).
1905 */
1906static void move_linked_works(struct work_struct *work, struct list_head *head,
1907 struct work_struct **nextp)
1908{
1909 struct work_struct *n;
1910
1911 /*
1912 * Linked worklist will always end before the end of the list,
1913 * use NULL for list head.
1914 */
1915 list_for_each_entry_safe_from(work, n, NULL, entry) {
1916 list_move_tail(&work->entry, head);
1917 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
1918 break;
1919 }
1920
1921 /*
1922 * If we're already inside safe list traversal and have moved
1923 * multiple works to the scheduled queue, the next position
1924 * needs to be updated.
1925 */
1926 if (nextp)
1927 *nextp = n;
1928}
1929
1930static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
1931{
1932 struct work_struct *work = list_first_entry(&cwq->delayed_works,
1933 struct work_struct, entry);
1934
1935 trace_workqueue_activate_work(work);
1936 move_linked_works(work, &cwq->pool->worklist, NULL);
1937 __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
1938 cwq->nr_active++;
1939}
1940
1941/**
1942 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
1943 * @cwq: cwq of interest
1944 * @color: color of work which left the queue
1945 * @delayed: for a delayed work
1946 *
1947 * A work either has completed or is removed from pending queue,
1948 * decrement nr_in_flight of its cwq and handle workqueue flushing.
1949 *
1950 * CONTEXT:
1951 * spin_lock_irq(gcwq->lock).
1952 */
1953static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color,
1954 bool delayed)
1955{
1956 /* ignore uncolored works */
1957 if (color == WORK_NO_COLOR)
1958 return;
1959
1960 cwq->nr_in_flight[color]--;
1961
1962 if (!delayed) {
1963 cwq->nr_active--;
1964 if (!list_empty(&cwq->delayed_works)) {
1965 /* one down, submit a delayed one */
1966 if (cwq->nr_active < cwq->max_active)
1967 cwq_activate_first_delayed(cwq);
1968 }
1969 }
1970
1971 /* is flush in progress and are we at the flushing tip? */
1972 if (likely(cwq->flush_color != color))
1973 return;
1974
1975 /* are there still in-flight works? */
1976 if (cwq->nr_in_flight[color])
1977 return;
1978
1979 /* this cwq is done, clear flush_color */
1980 cwq->flush_color = -1;
1981
1982 /*
1983 * If this was the last cwq, wake up the first flusher. It
1984 * will handle the rest.
1985 */
1986 if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
1987 complete(&cwq->wq->first_flusher->done);
1988}
1989
1990/**
1991 * process_one_work - process single work 2139 * process_one_work - process single work
1992 * @worker: self 2140 * @worker: self
1993 * @work: work to process 2141 * @work: work to process
@@ -2030,7 +2178,7 @@ __acquires(&gcwq->lock)
2030 * necessary to avoid spurious warnings from rescuers servicing the 2178 * necessary to avoid spurious warnings from rescuers servicing the
2031 * unbound or a disassociated gcwq. 2179 * unbound or a disassociated gcwq.
2032 */ 2180 */
2033 WARN_ON_ONCE(!(worker->flags & (WORKER_UNBOUND | WORKER_REBIND)) && 2181 WARN_ON_ONCE(!(worker->flags & WORKER_UNBOUND) &&
2034 !(gcwq->flags & GCWQ_DISASSOCIATED) && 2182 !(gcwq->flags & GCWQ_DISASSOCIATED) &&
2035 raw_smp_processor_id() != gcwq->cpu); 2183 raw_smp_processor_id() != gcwq->cpu);
2036 2184
@@ -2046,15 +2194,13 @@ __acquires(&gcwq->lock)
2046 return; 2194 return;
2047 } 2195 }
2048 2196
2049 /* claim and process */ 2197 /* claim and dequeue */
2050 debug_work_deactivate(work); 2198 debug_work_deactivate(work);
2051 hlist_add_head(&worker->hentry, bwh); 2199 hlist_add_head(&worker->hentry, bwh);
2052 worker->current_work = work; 2200 worker->current_work = work;
2053 worker->current_cwq = cwq; 2201 worker->current_cwq = cwq;
2054 work_color = get_work_color(work); 2202 work_color = get_work_color(work);
2055 2203
2056 /* record the current cpu number in the work data and dequeue */
2057 set_work_cpu(work, gcwq->cpu);
2058 list_del_init(&work->entry); 2204 list_del_init(&work->entry);
2059 2205
2060 /* 2206 /*
@@ -2071,9 +2217,16 @@ __acquires(&gcwq->lock)
2071 if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool)) 2217 if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool))
2072 wake_up_worker(pool); 2218 wake_up_worker(pool);
2073 2219
2220 /*
2221 * Record the last CPU and clear PENDING which should be the last
2222 * update to @work. Also, do this inside @gcwq->lock so that
2223 * PENDING and queued state changes happen together while IRQ is
2224 * disabled.
2225 */
2226 set_work_cpu_and_clear_pending(work, gcwq->cpu);
2227
2074 spin_unlock_irq(&gcwq->lock); 2228 spin_unlock_irq(&gcwq->lock);
2075 2229
2076 work_clear_pending(work);
2077 lock_map_acquire_read(&cwq->wq->lockdep_map); 2230 lock_map_acquire_read(&cwq->wq->lockdep_map);
2078 lock_map_acquire(&lockdep_map); 2231 lock_map_acquire(&lockdep_map);
2079 trace_workqueue_execute_start(work); 2232 trace_workqueue_execute_start(work);
@@ -2087,11 +2240,9 @@ __acquires(&gcwq->lock)
2087 lock_map_release(&cwq->wq->lockdep_map); 2240 lock_map_release(&cwq->wq->lockdep_map);
2088 2241
2089 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 2242 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
2090 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 2243 pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
2091 "%s/0x%08x/%d\n", 2244 " last function: %pf\n",
2092 current->comm, preempt_count(), task_pid_nr(current)); 2245 current->comm, preempt_count(), task_pid_nr(current), f);
2093 printk(KERN_ERR " last function: ");
2094 print_symbol("%s\n", (unsigned long)f);
2095 debug_show_held_locks(current); 2246 debug_show_held_locks(current);
2096 dump_stack(); 2247 dump_stack();
2097 } 2248 }
@@ -2106,7 +2257,7 @@ __acquires(&gcwq->lock)
2106 hlist_del_init(&worker->hentry); 2257 hlist_del_init(&worker->hentry);
2107 worker->current_work = NULL; 2258 worker->current_work = NULL;
2108 worker->current_cwq = NULL; 2259 worker->current_cwq = NULL;
2109 cwq_dec_nr_in_flight(cwq, work_color, false); 2260 cwq_dec_nr_in_flight(cwq, work_color);
2110} 2261}
2111 2262
2112/** 2263/**
@@ -2151,18 +2302,17 @@ static int worker_thread(void *__worker)
2151woke_up: 2302woke_up:
2152 spin_lock_irq(&gcwq->lock); 2303 spin_lock_irq(&gcwq->lock);
2153 2304
2154 /* 2305 /* we are off idle list if destruction or rebind is requested */
2155 * DIE can be set only while idle and REBIND set while busy has 2306 if (unlikely(list_empty(&worker->entry))) {
2156 * @worker->rebind_work scheduled. Checking here is enough.
2157 */
2158 if (unlikely(worker->flags & (WORKER_REBIND | WORKER_DIE))) {
2159 spin_unlock_irq(&gcwq->lock); 2307 spin_unlock_irq(&gcwq->lock);
2160 2308
2309 /* if DIE is set, destruction is requested */
2161 if (worker->flags & WORKER_DIE) { 2310 if (worker->flags & WORKER_DIE) {
2162 worker->task->flags &= ~PF_WQ_WORKER; 2311 worker->task->flags &= ~PF_WQ_WORKER;
2163 return 0; 2312 return 0;
2164 } 2313 }
2165 2314
2315 /* otherwise, rebind */
2166 idle_worker_rebind(worker); 2316 idle_worker_rebind(worker);
2167 goto woke_up; 2317 goto woke_up;
2168 } 2318 }
@@ -2645,8 +2795,8 @@ reflush:
2645 2795
2646 if (++flush_cnt == 10 || 2796 if (++flush_cnt == 10 ||
2647 (flush_cnt % 100 == 0 && flush_cnt <= 1000)) 2797 (flush_cnt % 100 == 0 && flush_cnt <= 1000))
2648 pr_warning("workqueue %s: flush on destruction isn't complete after %u tries\n", 2798 pr_warn("workqueue %s: flush on destruction isn't complete after %u tries\n",
2649 wq->name, flush_cnt); 2799 wq->name, flush_cnt);
2650 goto reflush; 2800 goto reflush;
2651 } 2801 }
2652 2802
@@ -2657,8 +2807,7 @@ reflush:
2657} 2807}
2658EXPORT_SYMBOL_GPL(drain_workqueue); 2808EXPORT_SYMBOL_GPL(drain_workqueue);
2659 2809
2660static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr, 2810static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
2661 bool wait_executing)
2662{ 2811{
2663 struct worker *worker = NULL; 2812 struct worker *worker = NULL;
2664 struct global_cwq *gcwq; 2813 struct global_cwq *gcwq;
@@ -2680,13 +2829,12 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
2680 cwq = get_work_cwq(work); 2829 cwq = get_work_cwq(work);
2681 if (unlikely(!cwq || gcwq != cwq->pool->gcwq)) 2830 if (unlikely(!cwq || gcwq != cwq->pool->gcwq))
2682 goto already_gone; 2831 goto already_gone;
2683 } else if (wait_executing) { 2832 } else {
2684 worker = find_worker_executing_work(gcwq, work); 2833 worker = find_worker_executing_work(gcwq, work);
2685 if (!worker) 2834 if (!worker)
2686 goto already_gone; 2835 goto already_gone;
2687 cwq = worker->current_cwq; 2836 cwq = worker->current_cwq;
2688 } else 2837 }
2689 goto already_gone;
2690 2838
2691 insert_wq_barrier(cwq, barr, work, worker); 2839 insert_wq_barrier(cwq, barr, work, worker);
2692 spin_unlock_irq(&gcwq->lock); 2840 spin_unlock_irq(&gcwq->lock);
@@ -2713,15 +2861,8 @@ already_gone:
2713 * flush_work - wait for a work to finish executing the last queueing instance 2861 * flush_work - wait for a work to finish executing the last queueing instance
2714 * @work: the work to flush 2862 * @work: the work to flush
2715 * 2863 *
2716 * Wait until @work has finished execution. This function considers 2864 * Wait until @work has finished execution. @work is guaranteed to be idle
2717 * only the last queueing instance of @work. If @work has been 2865 * on return if it hasn't been requeued since flush started.
2718 * enqueued across different CPUs on a non-reentrant workqueue or on
2719 * multiple workqueues, @work might still be executing on return on
2720 * some of the CPUs from earlier queueing.
2721 *
2722 * If @work was queued only on a non-reentrant, ordered or unbound
2723 * workqueue, @work is guaranteed to be idle on return if it hasn't
2724 * been requeued since flush started.
2725 * 2866 *
2726 * RETURNS: 2867 * RETURNS:
2727 * %true if flush_work() waited for the work to finish execution, 2868 * %true if flush_work() waited for the work to finish execution,
@@ -2734,140 +2875,36 @@ bool flush_work(struct work_struct *work)
2734 lock_map_acquire(&work->lockdep_map); 2875 lock_map_acquire(&work->lockdep_map);
2735 lock_map_release(&work->lockdep_map); 2876 lock_map_release(&work->lockdep_map);
2736 2877
2737 if (start_flush_work(work, &barr, true)) { 2878 if (start_flush_work(work, &barr)) {
2738 wait_for_completion(&barr.done); 2879 wait_for_completion(&barr.done);
2739 destroy_work_on_stack(&barr.work); 2880 destroy_work_on_stack(&barr.work);
2740 return true; 2881 return true;
2741 } else 2882 } else {
2742 return false;
2743}
2744EXPORT_SYMBOL_GPL(flush_work);
2745
2746static bool wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
2747{
2748 struct wq_barrier barr;
2749 struct worker *worker;
2750
2751 spin_lock_irq(&gcwq->lock);
2752
2753 worker = find_worker_executing_work(gcwq, work);
2754 if (unlikely(worker))
2755 insert_wq_barrier(worker->current_cwq, &barr, work, worker);
2756
2757 spin_unlock_irq(&gcwq->lock);
2758
2759 if (unlikely(worker)) {
2760 wait_for_completion(&barr.done);
2761 destroy_work_on_stack(&barr.work);
2762 return true;
2763 } else
2764 return false; 2883 return false;
2765}
2766
2767static bool wait_on_work(struct work_struct *work)
2768{
2769 bool ret = false;
2770 int cpu;
2771
2772 might_sleep();
2773
2774 lock_map_acquire(&work->lockdep_map);
2775 lock_map_release(&work->lockdep_map);
2776
2777 for_each_gcwq_cpu(cpu)
2778 ret |= wait_on_cpu_work(get_gcwq(cpu), work);
2779 return ret;
2780}
2781
2782/**
2783 * flush_work_sync - wait until a work has finished execution
2784 * @work: the work to flush
2785 *
2786 * Wait until @work has finished execution. On return, it's
2787 * guaranteed that all queueing instances of @work which happened
2788 * before this function is called are finished. In other words, if
2789 * @work hasn't been requeued since this function was called, @work is
2790 * guaranteed to be idle on return.
2791 *
2792 * RETURNS:
2793 * %true if flush_work_sync() waited for the work to finish execution,
2794 * %false if it was already idle.
2795 */
2796bool flush_work_sync(struct work_struct *work)
2797{
2798 struct wq_barrier barr;
2799 bool pending, waited;
2800
2801 /* we'll wait for executions separately, queue barr only if pending */
2802 pending = start_flush_work(work, &barr, false);
2803
2804 /* wait for executions to finish */
2805 waited = wait_on_work(work);
2806
2807 /* wait for the pending one */
2808 if (pending) {
2809 wait_for_completion(&barr.done);
2810 destroy_work_on_stack(&barr.work);
2811 } 2884 }
2812
2813 return pending || waited;
2814}
2815EXPORT_SYMBOL_GPL(flush_work_sync);
2816
2817/*
2818 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
2819 * so this work can't be re-armed in any way.
2820 */
2821static int try_to_grab_pending(struct work_struct *work)
2822{
2823 struct global_cwq *gcwq;
2824 int ret = -1;
2825
2826 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
2827 return 0;
2828
2829 /*
2830 * The queueing is in progress, or it is already queued. Try to
2831 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
2832 */
2833 gcwq = get_work_gcwq(work);
2834 if (!gcwq)
2835 return ret;
2836
2837 spin_lock_irq(&gcwq->lock);
2838 if (!list_empty(&work->entry)) {
2839 /*
2840 * This work is queued, but perhaps we locked the wrong gcwq.
2841 * In that case we must see the new value after rmb(), see
2842 * insert_work()->wmb().
2843 */
2844 smp_rmb();
2845 if (gcwq == get_work_gcwq(work)) {
2846 debug_work_deactivate(work);
2847 list_del_init(&work->entry);
2848 cwq_dec_nr_in_flight(get_work_cwq(work),
2849 get_work_color(work),
2850 *work_data_bits(work) & WORK_STRUCT_DELAYED);
2851 ret = 1;
2852 }
2853 }
2854 spin_unlock_irq(&gcwq->lock);
2855
2856 return ret;
2857} 2885}
2886EXPORT_SYMBOL_GPL(flush_work);
2858 2887
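As a caller-side illustration of the simplified flush_work() semantics documented above (only the last queueing instance matters), a minimal sketch; the context structure and function names are hypothetical, not part of this patch:

#include <linux/kernel.h>
#include <linux/workqueue.h>

/* hypothetical driver state, not from this file */
struct my_rx_ctx {
        struct work_struct rx_work;
};

static void my_rx_quiesce(struct my_rx_ctx *ctx)
{
        /*
         * Waits for the last queueing instance of rx_work; unless it is
         * requeued after this call starts, rx_work is idle on return.
         */
        if (flush_work(&ctx->rx_work))
                pr_debug("rx_work was pending or running; now finished\n");
}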
2859static bool __cancel_work_timer(struct work_struct *work, 2888static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
2860 struct timer_list* timer)
2861{ 2889{
2890 unsigned long flags;
2862 int ret; 2891 int ret;
2863 2892
2864 do { 2893 do {
2865 ret = (timer && likely(del_timer(timer))); 2894 ret = try_to_grab_pending(work, is_dwork, &flags);
2866 if (!ret) 2895 /*
2867 ret = try_to_grab_pending(work); 2896 * If someone else is canceling, wait for the same event it
2868 wait_on_work(work); 2897 * would be waiting for before retrying.
2898 */
2899 if (unlikely(ret == -ENOENT))
2900 flush_work(work);
2869 } while (unlikely(ret < 0)); 2901 } while (unlikely(ret < 0));
2870 2902
2903 /* tell other tasks trying to grab @work to back off */
2904 mark_work_canceling(work);
2905 local_irq_restore(flags);
2906
2907 flush_work(work);
2871 clear_work_data(work); 2908 clear_work_data(work);
2872 return ret; 2909 return ret;
2873} 2910}
@@ -2892,7 +2929,7 @@ static bool __cancel_work_timer(struct work_struct *work,
2892 */ 2929 */
2893bool cancel_work_sync(struct work_struct *work) 2930bool cancel_work_sync(struct work_struct *work)
2894{ 2931{
2895 return __cancel_work_timer(work, NULL); 2932 return __cancel_work_timer(work, false);
2896} 2933}
2897EXPORT_SYMBOL_GPL(cancel_work_sync); 2934EXPORT_SYMBOL_GPL(cancel_work_sync);
2898 2935
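A usage sketch for cancel_work_sync() on top of the reworked __cancel_work_timer() path; the self-requeueing work item and names are hypothetical:

#include <linux/workqueue.h>

/* hypothetical self-requeueing scan, not from this file */
static void my_scan_fn(struct work_struct *work)
{
        /* scan some state (omitted), then re-arm */
        schedule_work(work);
}
static DECLARE_WORK(my_scan_work, my_scan_fn);

static void my_scan_stop(void)
{
        /*
         * Grabs WORK_STRUCT_PENDING so the re-arm in my_scan_fn() fails,
         * marks the work canceling for competing cancelers, then waits
         * for any in-flight execution via flush_work().
         */
        cancel_work_sync(&my_scan_work);
}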
@@ -2910,33 +2947,44 @@ EXPORT_SYMBOL_GPL(cancel_work_sync);
2910 */ 2947 */
2911bool flush_delayed_work(struct delayed_work *dwork) 2948bool flush_delayed_work(struct delayed_work *dwork)
2912{ 2949{
2950 local_irq_disable();
2913 if (del_timer_sync(&dwork->timer)) 2951 if (del_timer_sync(&dwork->timer))
2914 __queue_work(raw_smp_processor_id(), 2952 __queue_work(dwork->cpu,
2915 get_work_cwq(&dwork->work)->wq, &dwork->work); 2953 get_work_cwq(&dwork->work)->wq, &dwork->work);
2954 local_irq_enable();
2916 return flush_work(&dwork->work); 2955 return flush_work(&dwork->work);
2917} 2956}
2918EXPORT_SYMBOL(flush_delayed_work); 2957EXPORT_SYMBOL(flush_delayed_work);
2919 2958
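A sketch of the typical flush_delayed_work() caller, e.g. forcing a deferred writeback out before suspend; the names are hypothetical:

#include <linux/workqueue.h>

/* hypothetical deferred writeback, not from this file */
static void my_writeback_fn(struct work_struct *work)
{
        /* push cached settings to the hardware (omitted) */
}
static DECLARE_DELAYED_WORK(my_writeback, my_writeback_fn);

static void my_writeback_now(void)
{
        /*
         * If the timer is still pending, the work is queued right away
         * (on dwork->cpu with this patch) and then flushed.
         */
        flush_delayed_work(&my_writeback);
}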
2920/** 2959/**
2921 * flush_delayed_work_sync - wait for a dwork to finish 2960 * cancel_delayed_work - cancel a delayed work
2922 * @dwork: the delayed work to flush 2961 * @dwork: delayed_work to cancel
2923 * 2962 *
2924 * Delayed timer is cancelled and the pending work is queued for 2963 * Kill off a pending delayed_work. Returns %true if @dwork was pending
2925 * execution immediately. Other than timer handling, its behavior 2964 * and canceled; %false if wasn't pending. Note that the work callback
2926 * is identical to flush_work_sync(). 2965 * function may still be running on return, unless it returns %true and the
2966 * work doesn't re-arm itself. Explicitly flush or use
2967 * cancel_delayed_work_sync() to wait on it.
2927 * 2968 *
2928 * RETURNS: 2969 * This function is safe to call from any context including IRQ handler.
2929 * %true if flush_work_sync() waited for the work to finish execution,
2930 * %false if it was already idle.
2931 */ 2970 */
2932bool flush_delayed_work_sync(struct delayed_work *dwork) 2971bool cancel_delayed_work(struct delayed_work *dwork)
2933{ 2972{
2934 if (del_timer_sync(&dwork->timer)) 2973 unsigned long flags;
2935 __queue_work(raw_smp_processor_id(), 2974 int ret;
2936 get_work_cwq(&dwork->work)->wq, &dwork->work); 2975
2937 return flush_work_sync(&dwork->work); 2976 do {
2977 ret = try_to_grab_pending(&dwork->work, true, &flags);
2978 } while (unlikely(ret == -EAGAIN));
2979
2980 if (unlikely(ret < 0))
2981 return false;
2982
2983 set_work_cpu_and_clear_pending(&dwork->work, work_cpu(&dwork->work));
2984 local_irq_restore(flags);
2985 return true;
2938} 2986}
2939EXPORT_SYMBOL(flush_delayed_work_sync); 2987EXPORT_SYMBOL(cancel_delayed_work);
2940 2988
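Since the kernel-doc above stresses that the reintroduced cancel_delayed_work() may be called from any context including IRQ handlers, a sketch under that assumption; the timeout logic and names are hypothetical:

#include <linux/interrupt.h>
#include <linux/workqueue.h>

/* hypothetical command timeout, armed elsewhere when a command is sent */
static void my_cmd_timeout_fn(struct work_struct *work)
{
        /* no reply within the deadline (handling omitted) */
}
static DECLARE_DELAYED_WORK(my_cmd_timeout, my_cmd_timeout_fn);

static irqreturn_t my_cmd_irq(int irq, void *data)
{
        /*
         * Reply arrived; kill the pending timeout.  Safe in hard IRQ
         * context because try_to_grab_pending() only disables IRQs and
         * takes gcwq->lock.  The callback may still be running if it had
         * already started; that race is harmless here.
         */
        cancel_delayed_work(&my_cmd_timeout);
        return IRQ_HANDLED;
}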
2941/** 2989/**
2942 * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish 2990 * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish
@@ -2949,54 +2997,39 @@ EXPORT_SYMBOL(flush_delayed_work_sync);
2949 */ 2997 */
2950bool cancel_delayed_work_sync(struct delayed_work *dwork) 2998bool cancel_delayed_work_sync(struct delayed_work *dwork)
2951{ 2999{
2952 return __cancel_work_timer(&dwork->work, &dwork->timer); 3000 return __cancel_work_timer(&dwork->work, true);
2953} 3001}
2954EXPORT_SYMBOL(cancel_delayed_work_sync); 3002EXPORT_SYMBOL(cancel_delayed_work_sync);
2955 3003
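By contrast, the _sync variant may sleep; a teardown sketch with hypothetical names:

#include <linux/workqueue.h>

/* hypothetical device state, not from this file */
struct my_hb_ctx {
        struct delayed_work heartbeat;
};

static void my_hb_remove(struct my_hb_ctx *ctx)
{
        /*
         * Unlike cancel_delayed_work(), this also waits for a running
         * instance and prevents re-arming; it must not be called from
         * IRQ or atomic context.
         */
        cancel_delayed_work_sync(&ctx->heartbeat);
}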
2956/** 3004/**
2957 * schedule_work - put work task in global workqueue
2958 * @work: job to be done
2959 *
2960 * Returns zero if @work was already on the kernel-global workqueue and
2961 * non-zero otherwise.
2962 *
2963 * This puts a job in the kernel-global workqueue if it was not already
2964 * queued and leaves it in the same position on the kernel-global
2965 * workqueue otherwise.
2966 */
2967int schedule_work(struct work_struct *work)
2968{
2969 return queue_work(system_wq, work);
2970}
2971EXPORT_SYMBOL(schedule_work);
2972
2973/*
2974 * schedule_work_on - put work task on a specific cpu 3005 * schedule_work_on - put work task on a specific cpu
2975 * @cpu: cpu to put the work task on 3006 * @cpu: cpu to put the work task on
2976 * @work: job to be done 3007 * @work: job to be done
2977 * 3008 *
2978 * This puts a job on a specific cpu. 3009 * This puts a job on a specific cpu.
2979 */ 3010 */
2980int schedule_work_on(int cpu, struct work_struct *work) 3011bool schedule_work_on(int cpu, struct work_struct *work)
2981{ 3012{
2982 return queue_work_on(cpu, system_wq, work); 3013 return queue_work_on(cpu, system_wq, work);
2983} 3014}
2984EXPORT_SYMBOL(schedule_work_on); 3015EXPORT_SYMBOL(schedule_work_on);
2985 3016
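A sketch of the usual schedule_work_on() pattern, kicking a per-CPU work item on every online CPU; the per-CPU buffers and names are hypothetical:

#include <linux/cpu.h>
#include <linux/percpu.h>
#include <linux/workqueue.h>

/* hypothetical per-CPU drain, not from this file */
static DEFINE_PER_CPU(struct work_struct, my_drain_work);

static void my_drain_fn(struct work_struct *work)
{
        /* drain this CPU's private buffer (omitted) */
}

static void my_drain_init(void)
{
        int cpu;

        for_each_possible_cpu(cpu)
                INIT_WORK(&per_cpu(my_drain_work, cpu), my_drain_fn);
}

static void my_drain_all_cpus(void)
{
        int cpu;

        get_online_cpus();
        for_each_online_cpu(cpu)
                schedule_work_on(cpu, &per_cpu(my_drain_work, cpu));
        put_online_cpus();
}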
2986/** 3017/**
2987 * schedule_delayed_work - put work task in global workqueue after delay 3018 * schedule_work - put work task in global workqueue
2988 * @dwork: job to be done 3019 * @work: job to be done
2989 * @delay: number of jiffies to wait or 0 for immediate execution
2990 * 3020 *
2991 * After waiting for a given time this puts a job in the kernel-global 3021 * Returns %false if @work was already on the kernel-global workqueue and
2992 * workqueue. 3022 * %true otherwise.
3023 *
3024 * This puts a job in the kernel-global workqueue if it was not already
3025 * queued and leaves it in the same position on the kernel-global
3026 * workqueue otherwise.
2993 */ 3027 */
2994int schedule_delayed_work(struct delayed_work *dwork, 3028bool schedule_work(struct work_struct *work)
2995 unsigned long delay)
2996{ 3029{
2997 return queue_delayed_work(system_wq, dwork, delay); 3030 return queue_work(system_wq, work);
2998} 3031}
2999EXPORT_SYMBOL(schedule_delayed_work); 3032EXPORT_SYMBOL(schedule_work);
3000 3033
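The %true/%false return documented above is what makes schedule_work() convenient for event coalescing; a sketch with hypothetical names:

#include <linux/kernel.h>
#include <linux/workqueue.h>

/* hypothetical event coalescing, not from this file */
static void my_event_fn(struct work_struct *work)
{
        /* process all accumulated events in one pass (omitted) */
}
static DECLARE_WORK(my_event_work, my_event_fn);

static void my_note_event(void)
{
        /*
         * %false: the work was already pending and the new event rides
         * along with the queued run.  %true: a fresh run was queued.
         */
        if (!schedule_work(&my_event_work))
                pr_debug("coalesced into the pending run\n");
}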
3001/** 3034/**
3002 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 3035 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
@@ -3007,14 +3040,28 @@ EXPORT_SYMBOL(schedule_delayed_work);
3007 * After waiting for a given time this puts a job in the kernel-global 3040 * After waiting for a given time this puts a job in the kernel-global
3008 * workqueue on the specified CPU. 3041 * workqueue on the specified CPU.
3009 */ 3042 */
3010int schedule_delayed_work_on(int cpu, 3043bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork,
3011 struct delayed_work *dwork, unsigned long delay) 3044 unsigned long delay)
3012{ 3045{
3013 return queue_delayed_work_on(cpu, system_wq, dwork, delay); 3046 return queue_delayed_work_on(cpu, system_wq, dwork, delay);
3014} 3047}
3015EXPORT_SYMBOL(schedule_delayed_work_on); 3048EXPORT_SYMBOL(schedule_delayed_work_on);
3016 3049
3017/** 3050/**
3051 * schedule_delayed_work - put work task in global workqueue after delay
3052 * @dwork: job to be done
3053 * @delay: number of jiffies to wait or 0 for immediate execution
3054 *
3055 * After waiting for a given time this puts a job in the kernel-global
3056 * workqueue.
3057 */
3058bool schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)
3059{
3060 return queue_delayed_work(system_wq, dwork, delay);
3061}
3062EXPORT_SYMBOL(schedule_delayed_work);
3063
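A sketch of the common self-rearming poll built on schedule_delayed_work(); the interval and names are hypothetical:

#include <linux/jiffies.h>
#include <linux/workqueue.h>

/* hypothetical periodic poll, not from this file */
static void my_poll_fn(struct work_struct *work);
static DECLARE_DELAYED_WORK(my_poll_work, my_poll_fn);

static void my_poll_fn(struct work_struct *work)
{
        /* sample hardware state (omitted), then re-arm */
        schedule_delayed_work(&my_poll_work, msecs_to_jiffies(500));
}

static void my_poll_start(void)
{
        /* a zero delay queues the work for immediate execution */
        schedule_delayed_work(&my_poll_work, 0);
}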
3064/**
3018 * schedule_on_each_cpu - execute a function synchronously on each online CPU 3065 * schedule_on_each_cpu - execute a function synchronously on each online CPU
3019 * @func: the function to call 3066 * @func: the function to call
3020 * 3067 *
@@ -3161,9 +3208,8 @@ static int wq_clamp_max_active(int max_active, unsigned int flags,
3161 int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE; 3208 int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
3162 3209
3163 if (max_active < 1 || max_active > lim) 3210 if (max_active < 1 || max_active > lim)
3164 printk(KERN_WARNING "workqueue: max_active %d requested for %s " 3211 pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n",
3165 "is out of range, clamping between %d and %d\n", 3212 max_active, name, 1, lim);
3166 max_active, name, 1, lim);
3167 3213
3168 return clamp_val(max_active, 1, lim); 3214 return clamp_val(max_active, 1, lim);
3169} 3215}
@@ -3319,6 +3365,26 @@ void destroy_workqueue(struct workqueue_struct *wq)
3319EXPORT_SYMBOL_GPL(destroy_workqueue); 3365EXPORT_SYMBOL_GPL(destroy_workqueue);
3320 3366
3321/** 3367/**
3368 * cwq_set_max_active - adjust max_active of a cwq
3369 * @cwq: target cpu_workqueue_struct
3370 * @max_active: new max_active value.
3371 *
3372 * Set @cwq->max_active to @max_active and activate delayed works if
3373 * increased.
3374 *
3375 * CONTEXT:
3376 * spin_lock_irq(gcwq->lock).
3377 */
3378static void cwq_set_max_active(struct cpu_workqueue_struct *cwq, int max_active)
3379{
3380 cwq->max_active = max_active;
3381
3382 while (!list_empty(&cwq->delayed_works) &&
3383 cwq->nr_active < cwq->max_active)
3384 cwq_activate_first_delayed(cwq);
3385}
3386
3387/**
3322 * workqueue_set_max_active - adjust max_active of a workqueue 3388 * workqueue_set_max_active - adjust max_active of a workqueue
3323 * @wq: target workqueue 3389 * @wq: target workqueue
3324 * @max_active: new max_active value. 3390 * @max_active: new max_active value.
@@ -3345,7 +3411,7 @@ void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
3345 3411
3346 if (!(wq->flags & WQ_FREEZABLE) || 3412 if (!(wq->flags & WQ_FREEZABLE) ||
3347 !(gcwq->flags & GCWQ_FREEZING)) 3413 !(gcwq->flags & GCWQ_FREEZING))
3348 get_cwq(gcwq->cpu, wq)->max_active = max_active; 3414 cwq_set_max_active(get_cwq(gcwq->cpu, wq), max_active);
3349 3415
3350 spin_unlock_irq(&gcwq->lock); 3416 spin_unlock_irq(&gcwq->lock);
3351 } 3417 }
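With cwq_set_max_active() in place, a runtime increase through workqueue_set_max_active() also releases works parked on cwq->delayed_works; a hypothetical throttling helper:

#include <linux/workqueue.h>

/* hypothetical I/O workqueue, not from this file */
static struct workqueue_struct *my_io_wq;

static void my_set_io_concurrency(int level)
{
        /*
         * Raising the limit immediately activates delayed works via
         * cwq_set_max_active(); lowering it does not stop works that
         * are already active.
         */
        workqueue_set_max_active(my_io_wq, level);
}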
@@ -3440,23 +3506,23 @@ EXPORT_SYMBOL_GPL(work_busy);
3440 */ 3506 */
3441 3507
3442/* claim manager positions of all pools */ 3508/* claim manager positions of all pools */
3443static void gcwq_claim_management_and_lock(struct global_cwq *gcwq) 3509static void gcwq_claim_assoc_and_lock(struct global_cwq *gcwq)
3444{ 3510{
3445 struct worker_pool *pool; 3511 struct worker_pool *pool;
3446 3512
3447 for_each_worker_pool(pool, gcwq) 3513 for_each_worker_pool(pool, gcwq)
3448 mutex_lock_nested(&pool->manager_mutex, pool - gcwq->pools); 3514 mutex_lock_nested(&pool->assoc_mutex, pool - gcwq->pools);
3449 spin_lock_irq(&gcwq->lock); 3515 spin_lock_irq(&gcwq->lock);
3450} 3516}
3451 3517
3452/* release manager positions */ 3518/* release manager positions */
3453static void gcwq_release_management_and_unlock(struct global_cwq *gcwq) 3519static void gcwq_release_assoc_and_unlock(struct global_cwq *gcwq)
3454{ 3520{
3455 struct worker_pool *pool; 3521 struct worker_pool *pool;
3456 3522
3457 spin_unlock_irq(&gcwq->lock); 3523 spin_unlock_irq(&gcwq->lock);
3458 for_each_worker_pool(pool, gcwq) 3524 for_each_worker_pool(pool, gcwq)
3459 mutex_unlock(&pool->manager_mutex); 3525 mutex_unlock(&pool->assoc_mutex);
3460} 3526}
3461 3527
3462static void gcwq_unbind_fn(struct work_struct *work) 3528static void gcwq_unbind_fn(struct work_struct *work)
@@ -3469,7 +3535,7 @@ static void gcwq_unbind_fn(struct work_struct *work)
3469 3535
3470 BUG_ON(gcwq->cpu != smp_processor_id()); 3536 BUG_ON(gcwq->cpu != smp_processor_id());
3471 3537
3472 gcwq_claim_management_and_lock(gcwq); 3538 gcwq_claim_assoc_and_lock(gcwq);
3473 3539
3474 /* 3540 /*
3475 * We've claimed all manager positions. Make all workers unbound 3541 * We've claimed all manager positions. Make all workers unbound
@@ -3486,7 +3552,7 @@ static void gcwq_unbind_fn(struct work_struct *work)
3486 3552
3487 gcwq->flags |= GCWQ_DISASSOCIATED; 3553 gcwq->flags |= GCWQ_DISASSOCIATED;
3488 3554
3489 gcwq_release_management_and_unlock(gcwq); 3555 gcwq_release_assoc_and_unlock(gcwq);
3490 3556
3491 /* 3557 /*
3492 * Call schedule() so that we cross rq->lock and thus can guarantee 3558 * Call schedule() so that we cross rq->lock and thus can guarantee
@@ -3514,7 +3580,7 @@ static void gcwq_unbind_fn(struct work_struct *work)
3514 * Workqueues should be brought up before normal priority CPU notifiers. 3580 * Workqueues should be brought up before normal priority CPU notifiers.
3515 * This will be registered high priority CPU notifier. 3581 * This will be registered high priority CPU notifier.
3516 */ 3582 */
3517static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb, 3583static int __cpuinit workqueue_cpu_up_callback(struct notifier_block *nfb,
3518 unsigned long action, 3584 unsigned long action,
3519 void *hcpu) 3585 void *hcpu)
3520{ 3586{
@@ -3542,10 +3608,10 @@ static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb,
3542 3608
3543 case CPU_DOWN_FAILED: 3609 case CPU_DOWN_FAILED:
3544 case CPU_ONLINE: 3610 case CPU_ONLINE:
3545 gcwq_claim_management_and_lock(gcwq); 3611 gcwq_claim_assoc_and_lock(gcwq);
3546 gcwq->flags &= ~GCWQ_DISASSOCIATED; 3612 gcwq->flags &= ~GCWQ_DISASSOCIATED;
3547 rebind_workers(gcwq); 3613 rebind_workers(gcwq);
3548 gcwq_release_management_and_unlock(gcwq); 3614 gcwq_release_assoc_and_unlock(gcwq);
3549 break; 3615 break;
3550 } 3616 }
3551 return NOTIFY_OK; 3617 return NOTIFY_OK;
@@ -3555,7 +3621,7 @@ static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb,
3555 * Workqueues should be brought down after normal priority CPU notifiers. 3621 * Workqueues should be brought down after normal priority CPU notifiers.
3556 * This will be registered as low priority CPU notifier. 3622 * This will be registered as low priority CPU notifier.
3557 */ 3623 */
3558static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb, 3624static int __cpuinit workqueue_cpu_down_callback(struct notifier_block *nfb,
3559 unsigned long action, 3625 unsigned long action,
3560 void *hcpu) 3626 void *hcpu)
3561{ 3627{
@@ -3566,7 +3632,7 @@ static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb,
3566 case CPU_DOWN_PREPARE: 3632 case CPU_DOWN_PREPARE:
3567 /* unbinding should happen on the local CPU */ 3633 /* unbinding should happen on the local CPU */
3568 INIT_WORK_ONSTACK(&unbind_work, gcwq_unbind_fn); 3634 INIT_WORK_ONSTACK(&unbind_work, gcwq_unbind_fn);
3569 schedule_work_on(cpu, &unbind_work); 3635 queue_work_on(cpu, system_highpri_wq, &unbind_work);
3570 flush_work(&unbind_work); 3636 flush_work(&unbind_work);
3571 break; 3637 break;
3572 } 3638 }
@@ -3735,11 +3801,7 @@ void thaw_workqueues(void)
3735 continue; 3801 continue;
3736 3802
3737 /* restore max_active and repopulate worklist */ 3803 /* restore max_active and repopulate worklist */
3738 cwq->max_active = wq->saved_max_active; 3804 cwq_set_max_active(cwq, wq->saved_max_active);
3739
3740 while (!list_empty(&cwq->delayed_works) &&
3741 cwq->nr_active < cwq->max_active)
3742 cwq_activate_first_delayed(cwq);
3743 } 3805 }
3744 3806
3745 for_each_worker_pool(pool, gcwq) 3807 for_each_worker_pool(pool, gcwq)
@@ -3759,8 +3821,12 @@ static int __init init_workqueues(void)
3759 unsigned int cpu; 3821 unsigned int cpu;
3760 int i; 3822 int i;
3761 3823
3824 /* make sure we have enough bits for OFFQ CPU number */
3825 BUILD_BUG_ON((1LU << (BITS_PER_LONG - WORK_OFFQ_CPU_SHIFT)) <
3826 WORK_CPU_LAST);
3827
3762 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); 3828 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
3763 cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); 3829 hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
3764 3830
3765 /* initialize gcwqs */ 3831 /* initialize gcwqs */
3766 for_each_gcwq_cpu(cpu) { 3832 for_each_gcwq_cpu(cpu) {
@@ -3786,11 +3852,9 @@ static int __init init_workqueues(void)
3786 setup_timer(&pool->mayday_timer, gcwq_mayday_timeout, 3852 setup_timer(&pool->mayday_timer, gcwq_mayday_timeout,
3787 (unsigned long)pool); 3853 (unsigned long)pool);
3788 3854
3789 mutex_init(&pool->manager_mutex); 3855 mutex_init(&pool->assoc_mutex);
3790 ida_init(&pool->worker_ida); 3856 ida_init(&pool->worker_ida);
3791 } 3857 }
3792
3793 init_waitqueue_head(&gcwq->rebind_hold);
3794 } 3858 }
3795 3859
3796 /* create the initial worker */ 3860 /* create the initial worker */
@@ -3813,17 +3877,14 @@ static int __init init_workqueues(void)
3813 } 3877 }
3814 3878
3815 system_wq = alloc_workqueue("events", 0, 0); 3879 system_wq = alloc_workqueue("events", 0, 0);
3880 system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
3816 system_long_wq = alloc_workqueue("events_long", 0, 0); 3881 system_long_wq = alloc_workqueue("events_long", 0, 0);
3817 system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
3818 system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, 3882 system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
3819 WQ_UNBOUND_MAX_ACTIVE); 3883 WQ_UNBOUND_MAX_ACTIVE);
3820 system_freezable_wq = alloc_workqueue("events_freezable", 3884 system_freezable_wq = alloc_workqueue("events_freezable",
3821 WQ_FREEZABLE, 0); 3885 WQ_FREEZABLE, 0);
3822 system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable", 3886 BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
3823 WQ_NON_REENTRANT | WQ_FREEZABLE, 0); 3887 !system_unbound_wq || !system_freezable_wq);
3824 BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
3825 !system_unbound_wq || !system_freezable_wq ||
3826 !system_nrt_freezable_wq);
3827 return 0; 3888 return 0;
3828} 3889}
3829early_initcall(init_workqueues); 3890early_initcall(init_workqueues);
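With system_nrt_wq and system_nrt_freezable_wq gone (non-reentrancy is the default behaviour in this series) and system_highpri_wq added, picking a system workqueue looks roughly like the following; the work items are hypothetical:

#include <linux/workqueue.h>

/* hypothetical work items, not from this file */
static void my_urgent_fn(struct work_struct *work)
{
        /* latency-sensitive handling (omitted) */
}
static void my_batch_fn(struct work_struct *work)
{
        /* long-running batch processing (omitted) */
}
static DECLARE_WORK(my_urgent_work, my_urgent_fn);
static DECLARE_WORK(my_batch_work, my_batch_fn);

static void my_dispatch(void)
{
        /* served by the new WQ_HIGHPRI per-cpu pools */
        queue_work(system_highpri_wq, &my_urgent_work);

        /* may run long; keep it off the regular per-cpu pools */
        queue_work(system_unbound_wq, &my_batch_work);
}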