aboutsummaryrefslogtreecommitdiffstats
path: root/kernel/workqueue.c
diff options
context:
space:
mode:
authorTejun Heo <tj@kernel.org>2010-06-29 04:07:11 -0400
committerTejun Heo <tj@kernel.org>2010-06-29 04:07:11 -0400
commit73f53c4aa732eced5fcb1844d3d452c30905f20f (patch)
tree6185ebab8337d33de9ca9c3d19fc5217bc4ee6bc /kernel/workqueue.c
parent0f900049cbe2767d47c2a62b54f0e822e1d66840 (diff)
workqueue: reimplement workqueue flushing using color coded works
Reimplement workqueue flushing using color coded works. wq has the current work color which is painted on the works being issued via cwqs. Flushing a workqueue is achieved by advancing the current work colors of cwqs and waiting for all the works which have any of the previous colors to drain. Currently there are 16 possible colors, one is reserved for no color and 15 colors are useable allowing 14 concurrent flushes. When color space gets full, flush attempts are batched up and processed together when color frees up, so even with many concurrent flushers, the new implementation won't build up huge queue of flushers which has to be processed one after another. Only works which are queued via __queue_work() are colored. Works which are directly put on queue using insert_work() use NO_COLOR and don't participate in workqueue flushing. Currently only works used for work-specific flush fall in this category. This new implementation leaves only cleanup_workqueue_thread() as the user of flush_cpu_workqueue(). Just make its users use flush_workqueue() and kthread_stop() directly and kill cleanup_workqueue_thread(). As workqueue flushing doesn't use barrier request anymore, the comment describing the complex synchronization around it in cleanup_workqueue_thread() is removed together with the function. This new implementation is to allow having and sharing multiple workers per cpu. Please note that one more bit is reserved for a future work flag by this patch. This is to avoid shifting bits and updating comments later. Signed-off-by: Tejun Heo <tj@kernel.org>
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--kernel/workqueue.c355
1 files changed, 303 insertions, 52 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 74a38499b19a..56e47c59d73b 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -41,6 +41,8 @@
41 * 41 *
42 * L: cwq->lock protected. Access with cwq->lock held. 42 * L: cwq->lock protected. Access with cwq->lock held.
43 * 43 *
44 * F: wq->flush_mutex protected.
45 *
44 * W: workqueue_lock protected. 46 * W: workqueue_lock protected.
45 */ 47 */
46 48
@@ -60,10 +62,23 @@ struct cpu_workqueue_struct {
60 unsigned int cpu; 62 unsigned int cpu;
61 63
62 struct workqueue_struct *wq; /* I: the owning workqueue */ 64 struct workqueue_struct *wq; /* I: the owning workqueue */
65 int work_color; /* L: current color */
66 int flush_color; /* L: flushing color */
67 int nr_in_flight[WORK_NR_COLORS];
68 /* L: nr of in_flight works */
63 struct task_struct *thread; 69 struct task_struct *thread;
64}; 70};
65 71
66/* 72/*
73 * Structure used to wait for workqueue flush.
74 */
75struct wq_flusher {
76 struct list_head list; /* F: list of flushers */
77 int flush_color; /* F: flush color waiting for */
78 struct completion done; /* flush completion */
79};
80
81/*
67 * The externally visible workqueue abstraction is an array of 82 * The externally visible workqueue abstraction is an array of
68 * per-CPU workqueues: 83 * per-CPU workqueues:
69 */ 84 */
@@ -71,6 +86,15 @@ struct workqueue_struct {
71 unsigned int flags; /* I: WQ_* flags */ 86 unsigned int flags; /* I: WQ_* flags */
72 struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */ 87 struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */
73 struct list_head list; /* W: list of all workqueues */ 88 struct list_head list; /* W: list of all workqueues */
89
90 struct mutex flush_mutex; /* protects wq flushing */
91 int work_color; /* F: current work color */
92 int flush_color; /* F: current flush color */
93 atomic_t nr_cwqs_to_flush; /* flush in progress */
94 struct wq_flusher *first_flusher; /* F: first flusher */
95 struct list_head flusher_queue; /* F: flush waiters */
96 struct list_head flusher_overflow; /* F: flush overflow list */
97
74 const char *name; /* I: workqueue name */ 98 const char *name; /* I: workqueue name */
75#ifdef CONFIG_LOCKDEP 99#ifdef CONFIG_LOCKDEP
76 struct lockdep_map lockdep_map; 100 struct lockdep_map lockdep_map;
@@ -207,6 +231,22 @@ static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
207 return get_cwq(cpu, wq); 231 return get_cwq(cpu, wq);
208} 232}
209 233
234static unsigned int work_color_to_flags(int color)
235{
236 return color << WORK_STRUCT_COLOR_SHIFT;
237}
238
239static int get_work_color(struct work_struct *work)
240{
241 return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
242 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
243}
244
245static int work_next_color(int color)
246{
247 return (color + 1) % WORK_NR_COLORS;
248}
249
210/* 250/*
211 * Set the workqueue on which a work item is to be run 251 * Set the workqueue on which a work item is to be run
212 * - Must *only* be called if the pending flag is set 252 * - Must *only* be called if the pending flag is set
@@ -273,7 +313,9 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
273 debug_work_activate(work); 313 debug_work_activate(work);
274 spin_lock_irqsave(&cwq->lock, flags); 314 spin_lock_irqsave(&cwq->lock, flags);
275 BUG_ON(!list_empty(&work->entry)); 315 BUG_ON(!list_empty(&work->entry));
276 insert_work(cwq, work, &cwq->worklist, 0); 316 cwq->nr_in_flight[cwq->work_color]++;
317 insert_work(cwq, work, &cwq->worklist,
318 work_color_to_flags(cwq->work_color));
277 spin_unlock_irqrestore(&cwq->lock, flags); 319 spin_unlock_irqrestore(&cwq->lock, flags);
278} 320}
279 321
@@ -387,6 +429,44 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
387EXPORT_SYMBOL_GPL(queue_delayed_work_on); 429EXPORT_SYMBOL_GPL(queue_delayed_work_on);
388 430
389/** 431/**
432 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
433 * @cwq: cwq of interest
434 * @color: color of work which left the queue
435 *
436 * A work either has completed or is removed from pending queue,
437 * decrement nr_in_flight of its cwq and handle workqueue flushing.
438 *
439 * CONTEXT:
440 * spin_lock_irq(cwq->lock).
441 */
442static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
443{
444 /* ignore uncolored works */
445 if (color == WORK_NO_COLOR)
446 return;
447
448 cwq->nr_in_flight[color]--;
449
450 /* is flush in progress and are we at the flushing tip? */
451 if (likely(cwq->flush_color != color))
452 return;
453
454 /* are there still in-flight works? */
455 if (cwq->nr_in_flight[color])
456 return;
457
458 /* this cwq is done, clear flush_color */
459 cwq->flush_color = -1;
460
461 /*
462 * If this was the last cwq, wake up the first flusher. It
463 * will handle the rest.
464 */
465 if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
466 complete(&cwq->wq->first_flusher->done);
467}
468
469/**
390 * process_one_work - process single work 470 * process_one_work - process single work
391 * @cwq: cwq to process work for 471 * @cwq: cwq to process work for
392 * @work: work to process 472 * @work: work to process
@@ -404,6 +484,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
404 struct work_struct *work) 484 struct work_struct *work)
405{ 485{
406 work_func_t f = work->func; 486 work_func_t f = work->func;
487 int work_color;
407#ifdef CONFIG_LOCKDEP 488#ifdef CONFIG_LOCKDEP
408 /* 489 /*
409 * It is permissible to free the struct work_struct from 490 * It is permissible to free the struct work_struct from
@@ -417,6 +498,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
417 /* claim and process */ 498 /* claim and process */
418 debug_work_deactivate(work); 499 debug_work_deactivate(work);
419 cwq->current_work = work; 500 cwq->current_work = work;
501 work_color = get_work_color(work);
420 list_del_init(&work->entry); 502 list_del_init(&work->entry);
421 503
422 spin_unlock_irq(&cwq->lock); 504 spin_unlock_irq(&cwq->lock);
@@ -443,6 +525,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
443 525
444 /* we're done with it, release */ 526 /* we're done with it, release */
445 cwq->current_work = NULL; 527 cwq->current_work = NULL;
528 cwq_dec_nr_in_flight(cwq, work_color);
446} 529}
447 530
448static void run_workqueue(struct cpu_workqueue_struct *cwq) 531static void run_workqueue(struct cpu_workqueue_struct *cwq)
@@ -529,29 +612,78 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
529 init_completion(&barr->done); 612 init_completion(&barr->done);
530 613
531 debug_work_activate(&barr->work); 614 debug_work_activate(&barr->work);
532 insert_work(cwq, &barr->work, head, 0); 615 insert_work(cwq, &barr->work, head, work_color_to_flags(WORK_NO_COLOR));
533} 616}
534 617
535static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 618/**
619 * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
620 * @wq: workqueue being flushed
621 * @flush_color: new flush color, < 0 for no-op
622 * @work_color: new work color, < 0 for no-op
623 *
624 * Prepare cwqs for workqueue flushing.
625 *
626 * If @flush_color is non-negative, flush_color on all cwqs should be
627 * -1. If no cwq has in-flight commands at the specified color, all
628 * cwq->flush_color's stay at -1 and %false is returned. If any cwq
629 * has in flight commands, its cwq->flush_color is set to
630 * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
631 * wakeup logic is armed and %true is returned.
632 *
633 * The caller should have initialized @wq->first_flusher prior to
634 * calling this function with non-negative @flush_color. If
635 * @flush_color is negative, no flush color update is done and %false
636 * is returned.
637 *
638 * If @work_color is non-negative, all cwqs should have the same
639 * work_color which is previous to @work_color and all will be
640 * advanced to @work_color.
641 *
642 * CONTEXT:
643 * mutex_lock(wq->flush_mutex).
644 *
645 * RETURNS:
646 * %true if @flush_color >= 0 and there's something to flush. %false
647 * otherwise.
648 */
649static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
650 int flush_color, int work_color)
536{ 651{
537 int active = 0; 652 bool wait = false;
538 struct wq_barrier barr; 653 unsigned int cpu;
539 654
540 WARN_ON(cwq->thread == current); 655 if (flush_color >= 0) {
541 656 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
542 spin_lock_irq(&cwq->lock); 657 atomic_set(&wq->nr_cwqs_to_flush, 1);
543 if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
544 insert_wq_barrier(cwq, &barr, &cwq->worklist);
545 active = 1;
546 } 658 }
547 spin_unlock_irq(&cwq->lock);
548 659
549 if (active) { 660 for_each_possible_cpu(cpu) {
550 wait_for_completion(&barr.done); 661 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
551 destroy_work_on_stack(&barr.work); 662
663 spin_lock_irq(&cwq->lock);
664
665 if (flush_color >= 0) {
666 BUG_ON(cwq->flush_color != -1);
667
668 if (cwq->nr_in_flight[flush_color]) {
669 cwq->flush_color = flush_color;
670 atomic_inc(&wq->nr_cwqs_to_flush);
671 wait = true;
672 }
673 }
674
675 if (work_color >= 0) {
676 BUG_ON(work_color != work_next_color(cwq->work_color));
677 cwq->work_color = work_color;
678 }
679
680 spin_unlock_irq(&cwq->lock);
552 } 681 }
553 682
554 return active; 683 if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
684 complete(&wq->first_flusher->done);
685
686 return wait;
555} 687}
556 688
557/** 689/**
@@ -566,13 +698,143 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
566 */ 698 */
567void flush_workqueue(struct workqueue_struct *wq) 699void flush_workqueue(struct workqueue_struct *wq)
568{ 700{
569 int cpu; 701 struct wq_flusher this_flusher = {
702 .list = LIST_HEAD_INIT(this_flusher.list),
703 .flush_color = -1,
704 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
705 };
706 int next_color;
570 707
571 might_sleep();
572 lock_map_acquire(&wq->lockdep_map); 708 lock_map_acquire(&wq->lockdep_map);
573 lock_map_release(&wq->lockdep_map); 709 lock_map_release(&wq->lockdep_map);
574 for_each_possible_cpu(cpu) 710
575 flush_cpu_workqueue(get_cwq(cpu, wq)); 711 mutex_lock(&wq->flush_mutex);
712
713 /*
714 * Start-to-wait phase
715 */
716 next_color = work_next_color(wq->work_color);
717
718 if (next_color != wq->flush_color) {
719 /*
720 * Color space is not full. The current work_color
721 * becomes our flush_color and work_color is advanced
722 * by one.
723 */
724 BUG_ON(!list_empty(&wq->flusher_overflow));
725 this_flusher.flush_color = wq->work_color;
726 wq->work_color = next_color;
727
728 if (!wq->first_flusher) {
729 /* no flush in progress, become the first flusher */
730 BUG_ON(wq->flush_color != this_flusher.flush_color);
731
732 wq->first_flusher = &this_flusher;
733
734 if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
735 wq->work_color)) {
736 /* nothing to flush, done */
737 wq->flush_color = next_color;
738 wq->first_flusher = NULL;
739 goto out_unlock;
740 }
741 } else {
742 /* wait in queue */
743 BUG_ON(wq->flush_color == this_flusher.flush_color);
744 list_add_tail(&this_flusher.list, &wq->flusher_queue);
745 flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
746 }
747 } else {
748 /*
749 * Oops, color space is full, wait on overflow queue.
750 * The next flush completion will assign us
751 * flush_color and transfer to flusher_queue.
752 */
753 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
754 }
755
756 mutex_unlock(&wq->flush_mutex);
757
758 wait_for_completion(&this_flusher.done);
759
760 /*
761 * Wake-up-and-cascade phase
762 *
763 * First flushers are responsible for cascading flushes and
764 * handling overflow. Non-first flushers can simply return.
765 */
766 if (wq->first_flusher != &this_flusher)
767 return;
768
769 mutex_lock(&wq->flush_mutex);
770
771 wq->first_flusher = NULL;
772
773 BUG_ON(!list_empty(&this_flusher.list));
774 BUG_ON(wq->flush_color != this_flusher.flush_color);
775
776 while (true) {
777 struct wq_flusher *next, *tmp;
778
779 /* complete all the flushers sharing the current flush color */
780 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
781 if (next->flush_color != wq->flush_color)
782 break;
783 list_del_init(&next->list);
784 complete(&next->done);
785 }
786
787 BUG_ON(!list_empty(&wq->flusher_overflow) &&
788 wq->flush_color != work_next_color(wq->work_color));
789
790 /* this flush_color is finished, advance by one */
791 wq->flush_color = work_next_color(wq->flush_color);
792
793 /* one color has been freed, handle overflow queue */
794 if (!list_empty(&wq->flusher_overflow)) {
795 /*
796 * Assign the same color to all overflowed
797 * flushers, advance work_color and append to
798 * flusher_queue. This is the start-to-wait
799 * phase for these overflowed flushers.
800 */
801 list_for_each_entry(tmp, &wq->flusher_overflow, list)
802 tmp->flush_color = wq->work_color;
803
804 wq->work_color = work_next_color(wq->work_color);
805
806 list_splice_tail_init(&wq->flusher_overflow,
807 &wq->flusher_queue);
808 flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
809 }
810
811 if (list_empty(&wq->flusher_queue)) {
812 BUG_ON(wq->flush_color != wq->work_color);
813 break;
814 }
815
816 /*
817 * Need to flush more colors. Make the next flusher
818 * the new first flusher and arm cwqs.
819 */
820 BUG_ON(wq->flush_color == wq->work_color);
821 BUG_ON(wq->flush_color != next->flush_color);
822
823 list_del_init(&next->list);
824 wq->first_flusher = next;
825
826 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
827 break;
828
829 /*
830 * Meh... this color is already done, clear first
831 * flusher and repeat cascading.
832 */
833 wq->first_flusher = NULL;
834 }
835
836out_unlock:
837 mutex_unlock(&wq->flush_mutex);
576} 838}
577EXPORT_SYMBOL_GPL(flush_workqueue); 839EXPORT_SYMBOL_GPL(flush_workqueue);
578 840
@@ -659,6 +921,7 @@ static int try_to_grab_pending(struct work_struct *work)
659 if (cwq == get_wq_data(work)) { 921 if (cwq == get_wq_data(work)) {
660 debug_work_deactivate(work); 922 debug_work_deactivate(work);
661 list_del_init(&work->entry); 923 list_del_init(&work->entry);
924 cwq_dec_nr_in_flight(cwq, get_work_color(work));
662 ret = 1; 925 ret = 1;
663 } 926 }
664 } 927 }
@@ -1066,6 +1329,10 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1066 goto err; 1329 goto err;
1067 1330
1068 wq->flags = flags; 1331 wq->flags = flags;
1332 mutex_init(&wq->flush_mutex);
1333 atomic_set(&wq->nr_cwqs_to_flush, 0);
1334 INIT_LIST_HEAD(&wq->flusher_queue);
1335 INIT_LIST_HEAD(&wq->flusher_overflow);
1069 wq->name = name; 1336 wq->name = name;
1070 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); 1337 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
1071 INIT_LIST_HEAD(&wq->list); 1338 INIT_LIST_HEAD(&wq->list);
@@ -1083,6 +1350,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1083 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK); 1350 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
1084 cwq->wq = wq; 1351 cwq->wq = wq;
1085 cwq->cpu = cpu; 1352 cwq->cpu = cpu;
1353 cwq->flush_color = -1;
1086 spin_lock_init(&cwq->lock); 1354 spin_lock_init(&cwq->lock);
1087 INIT_LIST_HEAD(&cwq->worklist); 1355 INIT_LIST_HEAD(&cwq->worklist);
1088 init_waitqueue_head(&cwq->more_work); 1356 init_waitqueue_head(&cwq->more_work);
@@ -1116,33 +1384,6 @@ err:
1116} 1384}
1117EXPORT_SYMBOL_GPL(__create_workqueue_key); 1385EXPORT_SYMBOL_GPL(__create_workqueue_key);
1118 1386
1119static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
1120{
1121 /*
1122 * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
1123 * cpu_add_remove_lock protects cwq->thread.
1124 */
1125 if (cwq->thread == NULL)
1126 return;
1127
1128 lock_map_acquire(&cwq->wq->lockdep_map);
1129 lock_map_release(&cwq->wq->lockdep_map);
1130
1131 flush_cpu_workqueue(cwq);
1132 /*
1133 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
1134 * a concurrent flush_workqueue() can insert a barrier after us.
1135 * However, in that case run_workqueue() won't return and check
1136 * kthread_should_stop() until it flushes all work_struct's.
1137 * When ->worklist becomes empty it is safe to exit because no
1138 * more work_structs can be queued on this cwq: flush_workqueue
1139 * checks list_empty(), and a "normal" queue_work() can't use
1140 * a dead CPU.
1141 */
1142 kthread_stop(cwq->thread);
1143 cwq->thread = NULL;
1144}
1145
1146/** 1387/**
1147 * destroy_workqueue - safely terminate a workqueue 1388 * destroy_workqueue - safely terminate a workqueue
1148 * @wq: target workqueue 1389 * @wq: target workqueue
@@ -1159,8 +1400,20 @@ void destroy_workqueue(struct workqueue_struct *wq)
1159 spin_unlock(&workqueue_lock); 1400 spin_unlock(&workqueue_lock);
1160 cpu_maps_update_done(); 1401 cpu_maps_update_done();
1161 1402
1162 for_each_possible_cpu(cpu) 1403 flush_workqueue(wq);
1163 cleanup_workqueue_thread(get_cwq(cpu, wq)); 1404
1405 for_each_possible_cpu(cpu) {
1406 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1407 int i;
1408
1409 if (cwq->thread) {
1410 kthread_stop(cwq->thread);
1411 cwq->thread = NULL;
1412 }
1413
1414 for (i = 0; i < WORK_NR_COLORS; i++)
1415 BUG_ON(cwq->nr_in_flight[i]);
1416 }
1164 1417
1165 free_cwqs(wq->cpu_wq); 1418 free_cwqs(wq->cpu_wq);
1166 kfree(wq); 1419 kfree(wq);
@@ -1185,9 +1438,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
1185 1438
1186 switch (action) { 1439 switch (action) {
1187 case CPU_POST_DEAD: 1440 case CPU_POST_DEAD:
1188 lock_map_acquire(&cwq->wq->lockdep_map); 1441 flush_workqueue(wq);
1189 lock_map_release(&cwq->wq->lockdep_map);
1190 flush_cpu_workqueue(cwq);
1191 break; 1442 break;
1192 } 1443 }
1193 } 1444 }