author     Tejun Heo <tj@kernel.org>  2010-06-29 04:07:12 -0400
committer  Tejun Heo <tj@kernel.org>  2010-06-29 04:07:12 -0400
commit     affee4b294a0fc97d67c8a77dc080c4dd262a79e
tree       5b3fd79640ad7940e0abbed193a192d3919f259d
parent     c34056a3fdde777c079cc8a70785c2602f2586cb
workqueue: reimplement work flushing using linked works
A work is linked to the next one by having the WORK_STRUCT_LINKED bit set, and these links can be chained. When a linked work is dispatched to a worker, all linked works are dispatched to the worker's newly added ->scheduled queue and processed back-to-back.

Currently, as there's only a single worker per cwq, having linked works doesn't make any visible behavior difference. This change is to prepare for multiple shared workers per cpu.

Signed-off-by: Tejun Heo <tj@kernel.org>
 include/linux/workqueue.h |   4
 kernel/workqueue.c        | 152
 2 files changed, 134 insertions(+), 22 deletions(-)
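The core idea is small enough to model outside the kernel. The sketch below is a plain userspace analogy (the struct, names, and helper are hypothetical, not kernel API): a work whose predecessor carries the LINKED flag belongs to the same series, and dispatching the first work of a series drags every linked follower onto the worker's scheduled queue, which is what move_linked_works() in the patch does with list_for_each_entry_safe_from().

#include <stdbool.h>
#include <stdio.h>

/* hypothetical stand-in for struct work_struct: just a name and a LINKED flag */
struct fake_work {
        const char *name;
        bool linked;            /* models WORK_STRUCT_LINKED: the next work belongs to this series */
};

/*
 * Move the series starting at index 'start' from the worklist to the
 * scheduled queue: keep taking works while the predecessor had LINKED set.
 * Returns the index of the first work *not* taken (the "nextp" idea).
 */
static int take_linked_series(struct fake_work *worklist, int n, int start,
                              struct fake_work *scheduled, int *nscheduled)
{
        int i;

        for (i = start; i < n; i++) {
                scheduled[(*nscheduled)++] = worklist[i];
                if (!worklist[i].linked)        /* the chain ends at the first unlinked work */
                        break;
        }
        return i + 1;
}

int main(void)
{
        struct fake_work worklist[] = {
                { "A", true }, { "B", true }, { "C", false }, { "D", false },
        };
        struct fake_work scheduled[4];
        int nscheduled = 0;

        /* dispatching "A" drags "B" and "C" along; "D" stays behind */
        int next = take_linked_series(worklist, 4, 0, scheduled, &nscheduled);

        for (int i = 0; i < nscheduled; i++)
                printf("scheduled: %s\n", scheduled[i].name);
        printf("next worklist index: %d\n", next);
        return 0;
}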
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 8762f62103d8..4f4fdba722c3 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -24,8 +24,9 @@ typedef void (*work_func_t)(struct work_struct *work);
 
 enum {
         WORK_STRUCT_PENDING_BIT = 0,    /* work item is pending execution */
+        WORK_STRUCT_LINKED_BIT  = 1,    /* next work is linked to this one */
 #ifdef CONFIG_DEBUG_OBJECTS_WORK
-        WORK_STRUCT_STATIC_BIT  = 1,    /* static initializer (debugobjects) */
+        WORK_STRUCT_STATIC_BIT  = 2,    /* static initializer (debugobjects) */
         WORK_STRUCT_COLOR_SHIFT = 3,    /* color for workqueue flushing */
 #else
         WORK_STRUCT_COLOR_SHIFT = 2,    /* color for workqueue flushing */
@@ -34,6 +35,7 @@ enum {
         WORK_STRUCT_COLOR_BITS  = 4,
 
         WORK_STRUCT_PENDING     = 1 << WORK_STRUCT_PENDING_BIT,
+        WORK_STRUCT_LINKED      = 1 << WORK_STRUCT_LINKED_BIT,
 #ifdef CONFIG_DEBUG_OBJECTS_WORK
         WORK_STRUCT_STATIC      = 1 << WORK_STRUCT_STATIC_BIT,
 #else
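The header change only shuffles bit positions: LINKED takes bit 1, which pushes the debugobjects STATIC bit to 2, while the flush color still starts at bit 3 (or at bit 2 without debugobjects). A minimal restatement of the new layout, with shortened names, assuming CONFIG_DEBUG_OBJECTS_WORK is enabled and ignoring the cwq pointer that shares the data word:

#include <assert.h>
#include <stdio.h>

/* shortened mirror of the new layout, assuming CONFIG_DEBUG_OBJECTS_WORK=y */
enum {
        PENDING_BIT = 0,
        LINKED_BIT  = 1,        /* new: next work is linked to this one */
        STATIC_BIT  = 2,        /* bumped from 1 to make room for LINKED */
        COLOR_SHIFT = 3,

        PENDING = 1 << PENDING_BIT,     /* 0x1 */
        LINKED  = 1 << LINKED_BIT,      /* 0x2 */
        STATIC  = 1 << STATIC_BIT,      /* 0x4 */
};

int main(void)
{
        unsigned long data = PENDING | LINKED | (5UL << COLOR_SHIFT);

        /* the flag bits and the color field occupy disjoint bits */
        assert((data & LINKED) && ((data >> COLOR_SHIFT) == 5));
        printf("flags=%#lx color=%lu\n",
               data & ((1UL << COLOR_SHIFT) - 1), data >> COLOR_SHIFT);
        return 0;
}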
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 600db10a4dbf..9953d3c7bd10 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -51,6 +51,7 @@ struct cpu_workqueue_struct;
 
 struct worker {
         struct work_struct      *current_work;  /* L: work being processed */
+        struct list_head        scheduled;      /* L: scheduled works */
         struct task_struct      *task;          /* I: worker task */
         struct cpu_workqueue_struct *cwq;       /* I: the associated cwq */
         int                     id;             /* I: worker id */
@@ -445,6 +446,8 @@ static struct worker *alloc_worker(void)
         struct worker *worker;
 
         worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+        if (worker)
+                INIT_LIST_HEAD(&worker->scheduled);
         return worker;
 }
 
@@ -530,6 +533,7 @@ static void destroy_worker(struct worker *worker)
 
         /* sanity check frenzy */
         BUG_ON(worker->current_work);
+        BUG_ON(!list_empty(&worker->scheduled));
 
         kthread_stop(worker->task);
         kfree(worker);
@@ -540,6 +544,47 @@ static void destroy_worker(struct worker *worker)
 }
 
 /**
+ * move_linked_works - move linked works to a list
+ * @work: start of series of works to be scheduled
+ * @head: target list to append @work to
+ * @nextp: out paramter for nested worklist walking
+ *
+ * Schedule linked works starting from @work to @head.  Work series to
+ * be scheduled starts at @work and includes any consecutive work with
+ * WORK_STRUCT_LINKED set in its predecessor.
+ *
+ * If @nextp is not NULL, it's updated to point to the next work of
+ * the last scheduled work.  This allows move_linked_works() to be
+ * nested inside outer list_for_each_entry_safe().
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void move_linked_works(struct work_struct *work, struct list_head *head,
+                              struct work_struct **nextp)
+{
+        struct work_struct *n;
+
+        /*
+         * Linked worklist will always end before the end of the list,
+         * use NULL for list head.
+         */
+        list_for_each_entry_safe_from(work, n, NULL, entry) {
+                list_move_tail(&work->entry, head);
+                if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
+                        break;
+        }
+
+        /*
+         * If we're already inside safe list traversal and have moved
+         * multiple works to the scheduled queue, the next position
+         * needs to be updated.
+         */
+        if (nextp)
+                *nextp = n;
+}
+
+/**
  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
  * @cwq: cwq of interest
  * @color: color of work which left the queue
@@ -639,17 +684,25 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
         cwq_dec_nr_in_flight(cwq, work_color);
 }
 
-static void run_workqueue(struct worker *worker)
+/**
+ * process_scheduled_works - process scheduled works
+ * @worker: self
+ *
+ * Process all scheduled works.  Please note that the scheduled list
+ * may change while processing a work, so this function repeatedly
+ * fetches a work from the top and executes it.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock) which may be released and regrabbed
+ * multiple times.
+ */
+static void process_scheduled_works(struct worker *worker)
 {
-        struct cpu_workqueue_struct *cwq = worker->cwq;
-
-        spin_lock_irq(&cwq->lock);
-        while (!list_empty(&cwq->worklist)) {
-                struct work_struct *work = list_entry(cwq->worklist.next,
+        while (!list_empty(&worker->scheduled)) {
+                struct work_struct *work = list_first_entry(&worker->scheduled,
                                                 struct work_struct, entry);
                 process_one_work(worker, work);
         }
-        spin_unlock_irq(&cwq->lock);
 }
 
 /**
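process_scheduled_works() deliberately re-reads the head of ->scheduled on every iteration instead of walking a snapshot, because running a work can append more entries (a flush barrier, for instance) behind it. A toy userspace illustration of that loop shape, with made-up names:

#include <stdio.h>

#define MAXQ 8

static const char *queue[MAXQ];
static int head, tail;

/* made-up work function: running "A" schedules a follow-up entry */
static void process(const char *name)
{
        printf("processing %s\n", name);
        if (name[0] == 'A' && tail < MAXQ)
                queue[tail++] = "barrier-after-A";
}

int main(void)
{
        queue[tail++] = "A";
        queue[tail++] = "B";

        /* always fetch the current first entry; the queue can grow meanwhile */
        while (head != tail)
                process(queue[head++]);
        return 0;
}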
@@ -684,7 +737,28 @@ static int worker_thread(void *__worker)
                                        get_cpu_mask(cwq->cpu))))
                         set_cpus_allowed_ptr(worker->task,
                                              get_cpu_mask(cwq->cpu));
-                run_workqueue(worker);
+
+                spin_lock_irq(&cwq->lock);
+
+                while (!list_empty(&cwq->worklist)) {
+                        struct work_struct *work =
+                                list_first_entry(&cwq->worklist,
+                                                 struct work_struct, entry);
+
+                        if (likely(!(*work_data_bits(work) &
+                                     WORK_STRUCT_LINKED))) {
+                                /* optimization path, not strictly necessary */
+                                process_one_work(worker, work);
+                                if (unlikely(!list_empty(&worker->scheduled)))
+                                        process_scheduled_works(worker);
+                        } else {
+                                move_linked_works(work, &worker->scheduled,
+                                                  NULL);
+                                process_scheduled_works(worker);
+                        }
+                }
+
+                spin_unlock_irq(&cwq->lock);
         }
 
         return 0;
@@ -705,16 +779,33 @@ static void wq_barrier_func(struct work_struct *work)
  * insert_wq_barrier - insert a barrier work
  * @cwq: cwq to insert barrier into
  * @barr: wq_barrier to insert
- * @head: insertion point
+ * @target: target work to attach @barr to
+ * @worker: worker currently executing @target, NULL if @target is not executing
  *
- * Insert barrier @barr into @cwq before @head.
+ * @barr is linked to @target such that @barr is completed only after
+ * @target finishes execution.  Please note that the ordering
+ * guarantee is observed only with respect to @target and on the local
+ * cpu.
+ *
+ * Currently, a queued barrier can't be canceled.  This is because
+ * try_to_grab_pending() can't determine whether the work to be
+ * grabbed is at the head of the queue and thus can't clear LINKED
+ * flag of the previous work while there must be a valid next work
+ * after a work with LINKED flag set.
+ *
+ * Note that when @worker is non-NULL, @target may be modified
+ * underneath us, so we can't reliably determine cwq from @target.
  *
  * CONTEXT:
  * spin_lock_irq(cwq->lock).
  */
 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
-                              struct wq_barrier *barr, struct list_head *head)
+                              struct wq_barrier *barr,
+                              struct work_struct *target, struct worker *worker)
 {
+        struct list_head *head;
+        unsigned int linked = 0;
+
         /*
          * debugobject calls are safe here even with cwq->lock locked
          * as we know for sure that this will not trigger any of the
@@ -725,8 +816,24 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
         __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
         init_completion(&barr->done);
 
+        /*
+         * If @target is currently being executed, schedule the
+         * barrier to the worker; otherwise, put it after @target.
+         */
+        if (worker)
+                head = worker->scheduled.next;
+        else {
+                unsigned long *bits = work_data_bits(target);
+
+                head = target->entry.next;
+                /* there can already be other linked works, inherit and set */
+                linked = *bits & WORK_STRUCT_LINKED;
+                __set_bit(WORK_STRUCT_LINKED_BIT, bits);
+        }
+
         debug_work_activate(&barr->work);
-        insert_work(cwq, &barr->work, head, work_color_to_flags(WORK_NO_COLOR));
+        insert_work(cwq, &barr->work, head,
+                    work_color_to_flags(WORK_NO_COLOR) | linked);
 }
 
 /**
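The barrier placement has two cases: if a worker is already executing the target, the barrier goes to the front of that worker's ->scheduled list; if the target is still queued, the barrier is inserted right after it and the target gets the LINKED flag (inheriting onto the barrier any LINKED flag the target already carried), so both are dispatched together. A simplified userspace sketch of just that decision, with hypothetical types:

#include <stdbool.h>
#include <stdio.h>

/* hypothetical, simplified stand-ins for work items on a queue */
struct node {
        struct node *next;
        const char *name;
        bool linked;                    /* models WORK_STRUCT_LINKED */
};

/*
 * Decide where a barrier for 'target' goes.  Mirrors the two branches in
 * insert_wq_barrier(): in front of the executing worker's scheduled list,
 * or directly behind the still-queued target with LINKED set on it.
 */
static struct node *barrier_position(struct node *target,
                                     struct node *scheduled_first,
                                     bool target_is_executing,
                                     bool *barrier_linked)
{
        if (target_is_executing) {
                *barrier_linked = false;        /* nothing follows the barrier */
                return scheduled_first;         /* head of worker->scheduled */
        }
        *barrier_linked = target->linked;       /* keep an existing chain intact */
        target->linked = true;                  /* barrier must follow target */
        return target->next;                    /* insert before this node */
}

int main(void)
{
        struct node b = { NULL, "B", false };
        struct node a = { &b, "A", false };     /* queue: A -> B */
        bool linked;

        struct node *pos = barrier_position(&a, NULL, false, &linked);
        printf("barrier goes before %s; A.linked=%d; barrier.linked=%d\n",
               pos ? pos->name : "(end)", a.linked, linked);
        return 0;
}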
@@ -964,8 +1071,8 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
  */
 int flush_work(struct work_struct *work)
 {
+        struct worker *worker = NULL;
         struct cpu_workqueue_struct *cwq;
-        struct list_head *prev;
         struct wq_barrier barr;
 
         might_sleep();
@@ -985,14 +1092,14 @@ int flush_work(struct work_struct *work)
                 smp_rmb();
                 if (unlikely(cwq != get_wq_data(work)))
                         goto already_gone;
-                prev = &work->entry;
         } else {
-                if (!cwq->worker || cwq->worker->current_work != work)
+                if (cwq->worker && cwq->worker->current_work == work)
+                        worker = cwq->worker;
+                if (!worker)
                         goto already_gone;
-                prev = &cwq->worklist;
         }
-        insert_wq_barrier(cwq, &barr, prev->next);
 
+        insert_wq_barrier(cwq, &barr, work, worker);
         spin_unlock_irq(&cwq->lock);
         wait_for_completion(&barr.done);
         destroy_work_on_stack(&barr.work);
@@ -1048,16 +1155,19 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
                              struct work_struct *work)
 {
         struct wq_barrier barr;
-        int running = 0;
+        struct worker *worker;
 
         spin_lock_irq(&cwq->lock);
+
+        worker = NULL;
         if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
-                insert_wq_barrier(cwq, &barr, cwq->worklist.next);
-                running = 1;
+                worker = cwq->worker;
+                insert_wq_barrier(cwq, &barr, work, worker);
         }
+
         spin_unlock_irq(&cwq->lock);
 
-        if (unlikely(running)) {
+        if (unlikely(worker)) {
                 wait_for_completion(&barr.done);
                 destroy_work_on_stack(&barr.work);
         }
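None of this changes how callers use the API: flush_work() still just waits for the given work item; the wait is merely implemented through a linked barrier now. A minimal, hypothetical module sketching the caller side, using only the standard workqueue interface:

#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/workqueue.h>

static void my_handler(struct work_struct *work)
{
        pr_info("deferred work ran\n");     /* runs in a workqueue worker */
}

static DECLARE_WORK(my_work, my_handler);

static int __init flush_demo_init(void)
{
        schedule_work(&my_work);    /* queue on the shared per-cpu workqueue */
        flush_work(&my_work);       /* block until the queued instance finishes;
                                     * with this patch the wait is driven by a
                                     * barrier work linked behind my_work */
        return 0;
}

static void __exit flush_demo_exit(void)
{
        flush_work(&my_work);       /* make sure nothing is still running */
}

module_init(flush_demo_init);
module_exit(flush_demo_exit);
MODULE_LICENSE("GPL");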