author		Tejun Heo <tj@kernel.org>	2010-06-29 04:07:12 -0400
committer	Tejun Heo <tj@kernel.org>	2010-06-29 04:07:12 -0400
commit		db7bccf45cb87522096b8f43144e31ca605a9f24 (patch)
tree		d16cc764243fb6feaa4c9dea5398e139c8012124 /kernel
parent		c8e55f360210c1bc49bea5d62bc3939b7ee13483 (diff)
workqueue: reimplement CPU hotplugging support using trustee
Reimplement CPU hotplugging support using trustee thread.  On CPU down, a
trustee thread is created and each step of CPU down is executed by the
trustee and workqueue_cpu_callback() simply drives and waits for trustee
state transitions.

CPU down operation no longer waits for works to be drained but trustee
sticks around till all pending works have been completed.  If CPU is
brought back up while works are still draining, workqueue_cpu_callback()
tells trustee to step down and tell workers to rebind to the cpu.

As it's difficult to tell whether cwqs are empty if it's freezing or
frozen, trustee doesn't consider draining to be complete while a gcwq is
freezing or frozen (tracked by new GCWQ_FREEZING flag).  Also, workers
which get unbound from their cpu are marked with WORKER_ROGUE.

Trustee based implementation doesn't bring any new feature at this point
but it will be used to manage worker pool when dynamic shared worker pool
is implemented.

Signed-off-by: Tejun Heo <tj@kernel.org>
Diffstat (limited to 'kernel')
-rw-r--r--	kernel/workqueue.c	293
1 files changed, 277 insertions, 16 deletions
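Before the patch itself, a rough userspace model of the handshake the commit message describes may help: the hotplug callback starts a trustee, waits for it to take charge, and the trustee drains a backlog before reporting DONE.  This is an illustrative sketch only (pthreads instead of kthreads; every name below is invented for the example and is not taken from the patch).

/* trustee_model.c - illustrative only; build with: cc -pthread trustee_model.c */
#include <pthread.h>
#include <sched.h>
#include <stdio.h>

enum trustee_state { T_START, T_IN_CHARGE, T_BUTCHER, T_RELEASE, T_DONE };

static pthread_mutex_t lock   = PTHREAD_MUTEX_INITIALIZER; /* stands in for gcwq->lock */
static pthread_cond_t  wait_q = PTHREAD_COND_INITIALIZER;  /* stands in for trustee_wait */
static enum trustee_state state = T_DONE;
static int pending_works = 3;                               /* pretend backlog */

static void *trustee_fn(void *unused)
{
	(void)unused;
	pthread_mutex_lock(&lock);
	state = T_IN_CHARGE;               /* workers would be marked rogue here */
	pthread_cond_broadcast(&wait_q);   /* the DOWN_PREPARE side waits for this */

	while (pending_works > 0 && state != T_RELEASE) {
		pending_works--;              /* stand-in for one work item completing */
		pthread_mutex_unlock(&lock);  /* the real trustee drops gcwq->lock while it waits */
		sched_yield();
		pthread_mutex_lock(&lock);
	}

	state = T_DONE;                    /* notify completion */
	pthread_cond_broadcast(&wait_q);
	pthread_mutex_unlock(&lock);
	return NULL;
}

int main(void)
{
	pthread_t trustee;

	/* CPU_DOWN_PREPARE: create the trustee and wait until it is in charge */
	state = T_START;
	pthread_create(&trustee, NULL, trustee_fn, NULL);

	pthread_mutex_lock(&lock);
	while (state != T_IN_CHARGE && state != T_DONE)
		pthread_cond_wait(&wait_q, &lock);
	printf("callback resumed: state=%d, works left=%d\n", state, pending_works);

	/* CPU_POST_DEAD would switch to T_BUTCHER; here we just wait for DONE */
	while (state != T_DONE)
		pthread_cond_wait(&wait_q, &lock);
	pthread_mutex_unlock(&lock);

	pthread_join(trustee, NULL);
	printf("trustee done\n");
	return 0;
}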
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index d64913aa486a..f57855f718d7 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -36,14 +36,27 @@
 #include <linux/idr.h>
 
 enum {
+	/* global_cwq flags */
+	GCWQ_FREEZING		= 1 << 3,	/* freeze in progress */
+
 	/* worker flags */
 	WORKER_STARTED		= 1 << 0,	/* started */
 	WORKER_DIE		= 1 << 1,	/* die die die */
 	WORKER_IDLE		= 1 << 2,	/* is idle */
+	WORKER_ROGUE		= 1 << 4,	/* not bound to any cpu */
+
+	/* gcwq->trustee_state */
+	TRUSTEE_START		= 0,		/* start */
+	TRUSTEE_IN_CHARGE	= 1,		/* trustee in charge of gcwq */
+	TRUSTEE_BUTCHER		= 2,		/* butcher workers */
+	TRUSTEE_RELEASE		= 3,		/* release workers */
+	TRUSTEE_DONE		= 4,		/* trustee is done */
 
 	BUSY_WORKER_HASH_ORDER	= 6,		/* 64 pointers */
 	BUSY_WORKER_HASH_SIZE	= 1 << BUSY_WORKER_HASH_ORDER,
 	BUSY_WORKER_HASH_MASK	= BUSY_WORKER_HASH_SIZE - 1,
+
+	TRUSTEE_COOLDOWN	= HZ / 10,	/* for trustee draining */
 };
 
 /*
@@ -83,6 +96,7 @@ struct worker {
 struct global_cwq {
 	spinlock_t		lock;		/* the gcwq lock */
 	unsigned int		cpu;		/* I: the associated cpu */
+	unsigned int		flags;		/* L: GCWQ_* flags */
 
 	int			nr_workers;	/* L: total number of workers */
 	int			nr_idle;	/* L: currently idle ones */
@@ -93,6 +107,10 @@ struct global_cwq {
 						/* L: hash of busy workers */
 
 	struct ida		worker_ida;	/* L: for worker IDs */
+
+	struct task_struct	*trustee;	/* L: for gcwq shutdown */
+	unsigned int		trustee_state;	/* L: trustee state */
+	wait_queue_head_t	trustee_wait;	/* trustee wait */
 } ____cacheline_aligned_in_smp;
 
 /*
@@ -148,6 +166,10 @@ struct workqueue_struct {
 #endif
 };
 
+#define for_each_busy_worker(worker, i, pos, gcwq)			\
+	for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)			\
+		hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
+
 #ifdef CONFIG_DEBUG_OBJECTS_WORK
 
 static struct debug_obj_descr work_debug_descr;
@@ -546,6 +568,9 @@ static void worker_enter_idle(struct worker *worker)
 
 	/* idle_list is LIFO */
 	list_add(&worker->entry, &gcwq->idle_list);
+
+	if (unlikely(worker->flags & WORKER_ROGUE))
+		wake_up_all(&gcwq->trustee_wait);
 }
 
 /**
@@ -622,8 +647,15 @@ static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
 	if (IS_ERR(worker->task))
 		goto fail;
 
+	/*
+	 * A rogue worker will become a regular one if CPU comes
+	 * online later on.  Make sure every worker has
+	 * PF_THREAD_BOUND set.
+	 */
 	if (bind)
 		kthread_bind(worker->task, gcwq->cpu);
+	else
+		worker->task->flags |= PF_THREAD_BOUND;
 
 	return worker;
 fail:
@@ -882,10 +914,6 @@ static int worker_thread(void *__worker)
 	struct cpu_workqueue_struct *cwq = worker->cwq;
 
 woke_up:
-	if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
-				    get_cpu_mask(gcwq->cpu))))
-		set_cpus_allowed_ptr(worker->task, get_cpu_mask(gcwq->cpu));
-
 	spin_lock_irq(&gcwq->lock);
 
 	/* DIE can be set only while we're idle, checking here is enough */
@@ -895,7 +923,7 @@ woke_up:
 	}
 
 	worker_leave_idle(worker);
-
+recheck:
 	/*
 	 * ->scheduled list can only be filled while a worker is
 	 * preparing to process a work or actually processing it.
@@ -908,6 +936,22 @@ woke_up:
 			list_first_entry(&cwq->worklist,
 					 struct work_struct, entry);
 
+		/*
+		 * The following is a rather inefficient way to close
+		 * race window against cpu hotplug operations.  Will
+		 * be replaced soon.
+		 */
+		if (unlikely(!(worker->flags & WORKER_ROGUE) &&
+			     !cpumask_equal(&worker->task->cpus_allowed,
+					    get_cpu_mask(gcwq->cpu)))) {
+			spin_unlock_irq(&gcwq->lock);
+			set_cpus_allowed_ptr(worker->task,
+					     get_cpu_mask(gcwq->cpu));
+			cpu_relax();
+			spin_lock_irq(&gcwq->lock);
+			goto recheck;
+		}
+
 		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
 			/* optimization path, not strictly necessary */
 			process_one_work(worker, work);
@@ -1812,29 +1856,237 @@ void destroy_workqueue(struct workqueue_struct *wq)
 }
 EXPORT_SYMBOL_GPL(destroy_workqueue);
 
+/*
+ * CPU hotplug.
+ *
+ * CPU hotplug is implemented by allowing cwqs to be detached from
+ * CPU, running with unbound workers and allowing them to be
+ * reattached later if the cpu comes back online.  A separate thread
+ * is created to govern cwqs in such state and is called the trustee.
+ *
+ * Trustee states and their descriptions.
+ *
+ * START	Command state used on startup.  On CPU_DOWN_PREPARE, a
+ *		new trustee is started with this state.
+ *
+ * IN_CHARGE	Once started, trustee will enter this state after
+ *		making all existing workers rogue.  DOWN_PREPARE waits
+ *		for trustee to enter this state.  After reaching
+ *		IN_CHARGE, trustee tries to execute the pending
+ *		worklist until it's empty and the state is set to
+ *		BUTCHER, or the state is set to RELEASE.
+ *
+ * BUTCHER	Command state which is set by the cpu callback after
+ *		the cpu has went down.  Once this state is set trustee
+ *		knows that there will be no new works on the worklist
+ *		and once the worklist is empty it can proceed to
+ *		killing idle workers.
+ *
+ * RELEASE	Command state which is set by the cpu callback if the
+ *		cpu down has been canceled or it has come online
+ *		again.  After recognizing this state, trustee stops
+ *		trying to drain or butcher and transits to DONE.
+ *
+ * DONE		Trustee will enter this state after BUTCHER or RELEASE
+ *		is complete.
+ *
+ *          trustee                 CPU                draining
+ *          took over               down               complete
+ * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
+ *                        |                     |                  ^
+ *                        | CPU is back online  v   return workers |
+ *                         ----------------> RELEASE --------------
+ */
+
+/**
+ * trustee_wait_event_timeout - timed event wait for trustee
+ * @cond: condition to wait for
+ * @timeout: timeout in jiffies
+ *
+ * wait_event_timeout() for trustee to use.  Handles locking and
+ * checks for RELEASE request.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by trustee.
+ *
+ * RETURNS:
+ * Positive indicating left time if @cond is satisfied, 0 if timed
+ * out, -1 if canceled.
+ */
+#define trustee_wait_event_timeout(cond, timeout) ({			\
+	long __ret = (timeout);						\
+	while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \
+	       __ret) {							\
+		spin_unlock_irq(&gcwq->lock);				\
+		__wait_event_timeout(gcwq->trustee_wait, (cond) ||	\
+			(gcwq->trustee_state == TRUSTEE_RELEASE),	\
+			__ret);						\
+		spin_lock_irq(&gcwq->lock);				\
+	}								\
+	gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret);		\
+})
+
+/**
+ * trustee_wait_event - event wait for trustee
+ * @cond: condition to wait for
+ *
+ * wait_event() for trustee to use.  Automatically handles locking and
+ * checks for CANCEL request.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by trustee.
+ *
+ * RETURNS:
+ * 0 if @cond is satisfied, -1 if canceled.
+ */
+#define trustee_wait_event(cond) ({					\
+	long __ret1;							\
+	__ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
+	__ret1 < 0 ? -1 : 0;						\
+})
+
+static int __cpuinit trustee_thread(void *__gcwq)
+{
+	struct global_cwq *gcwq = __gcwq;
+	struct worker *worker;
+	struct hlist_node *pos;
+	int i;
+
+	BUG_ON(gcwq->cpu != smp_processor_id());
+
+	spin_lock_irq(&gcwq->lock);
+	/*
+	 * Make all multithread workers rogue.  Trustee must be bound
+	 * to the target cpu and can't be cancelled.
+	 */
+	BUG_ON(gcwq->cpu != smp_processor_id());
+
+	list_for_each_entry(worker, &gcwq->idle_list, entry)
+		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+			worker->flags |= WORKER_ROGUE;
+
+	for_each_busy_worker(worker, i, pos, gcwq)
+		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+			worker->flags |= WORKER_ROGUE;
+
+	/*
+	 * We're now in charge.  Notify and proceed to drain.  We need
+	 * to keep the gcwq running during the whole CPU down
+	 * procedure as other cpu hotunplug callbacks may need to
+	 * flush currently running tasks.
+	 */
+	gcwq->trustee_state = TRUSTEE_IN_CHARGE;
+	wake_up_all(&gcwq->trustee_wait);
+
+	/*
+	 * The original cpu is in the process of dying and may go away
+	 * anytime now.  When that happens, we and all workers would
+	 * be migrated to other cpus.  Try draining any left work.
+	 * Note that if the gcwq is frozen, there may be frozen works
+	 * in freezeable cwqs.  Don't declare completion while frozen.
+	 */
+	while (gcwq->nr_workers != gcwq->nr_idle ||
+	       gcwq->flags & GCWQ_FREEZING ||
+	       gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
+		/* give a breather */
+		if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
+			break;
+	}
+
+	/* notify completion */
+	gcwq->trustee = NULL;
+	gcwq->trustee_state = TRUSTEE_DONE;
+	wake_up_all(&gcwq->trustee_wait);
+	spin_unlock_irq(&gcwq->lock);
+	return 0;
+}
+
+/**
+ * wait_trustee_state - wait for trustee to enter the specified state
+ * @gcwq: gcwq the trustee of interest belongs to
+ * @state: target state to wait for
+ *
+ * Wait for the trustee to reach @state.  DONE is already matched.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by cpu_callback.
+ */
+static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
+{
+	if (!(gcwq->trustee_state == state ||
+	      gcwq->trustee_state == TRUSTEE_DONE)) {
+		spin_unlock_irq(&gcwq->lock);
+		__wait_event(gcwq->trustee_wait,
+			     gcwq->trustee_state == state ||
+			     gcwq->trustee_state == TRUSTEE_DONE);
+		spin_lock_irq(&gcwq->lock);
+	}
+}
+
 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 					    unsigned long action,
 					    void *hcpu)
 {
 	unsigned int cpu = (unsigned long)hcpu;
-	struct cpu_workqueue_struct *cwq;
-	struct workqueue_struct *wq;
+	struct global_cwq *gcwq = get_gcwq(cpu);
+	struct task_struct *new_trustee = NULL;
+	struct worker *worker;
+	struct hlist_node *pos;
+	unsigned long flags;
+	int i;
 
 	action &= ~CPU_TASKS_FROZEN;
 
-	list_for_each_entry(wq, &workqueues, list) {
-		if (wq->flags & WQ_SINGLE_THREAD)
-			continue;
+	switch (action) {
+	case CPU_DOWN_PREPARE:
+		new_trustee = kthread_create(trustee_thread, gcwq,
+					     "workqueue_trustee/%d\n", cpu);
+		if (IS_ERR(new_trustee))
+			return notifier_from_errno(PTR_ERR(new_trustee));
+		kthread_bind(new_trustee, cpu);
+	}
 
-		cwq = get_cwq(cpu, wq);
+	/* some are called w/ irq disabled, don't disturb irq status */
+	spin_lock_irqsave(&gcwq->lock, flags);
 
-		switch (action) {
-		case CPU_POST_DEAD:
-			flush_workqueue(wq);
-			break;
+	switch (action) {
+	case CPU_DOWN_PREPARE:
+		/* initialize trustee and tell it to acquire the gcwq */
+		BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
+		gcwq->trustee = new_trustee;
+		gcwq->trustee_state = TRUSTEE_START;
+		wake_up_process(gcwq->trustee);
+		wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
+		break;
+
+	case CPU_POST_DEAD:
+		gcwq->trustee_state = TRUSTEE_BUTCHER;
+		break;
+
+	case CPU_DOWN_FAILED:
+	case CPU_ONLINE:
+		if (gcwq->trustee_state != TRUSTEE_DONE) {
+			gcwq->trustee_state = TRUSTEE_RELEASE;
+			wake_up_process(gcwq->trustee);
+			wait_trustee_state(gcwq, TRUSTEE_DONE);
 		}
+
+		/* clear ROGUE from all multithread workers */
+		list_for_each_entry(worker, &gcwq->idle_list, entry)
+			if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+				worker->flags &= ~WORKER_ROGUE;
+
+		for_each_busy_worker(worker, i, pos, gcwq)
+			if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+				worker->flags &= ~WORKER_ROGUE;
+		break;
 	}
 
+	spin_unlock_irqrestore(&gcwq->lock, flags);
+
 	return notifier_from_errno(0);
 }
 
@@ -1912,6 +2164,9 @@ void freeze_workqueues_begin(void)
 
 		spin_lock_irq(&gcwq->lock);
 
+		BUG_ON(gcwq->flags & GCWQ_FREEZING);
+		gcwq->flags |= GCWQ_FREEZING;
+
 		list_for_each_entry(wq, &workqueues, list) {
 			struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
 
@@ -1995,6 +2250,9 @@ void thaw_workqueues(void)
 
 		spin_lock_irq(&gcwq->lock);
 
+		BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
+		gcwq->flags &= ~GCWQ_FREEZING;
+
 		list_for_each_entry(wq, &workqueues, list) {
 			struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
 
@@ -2026,7 +2284,7 @@ void __init init_workqueues(void)
 	int i;
 
 	singlethread_cpu = cpumask_first(cpu_possible_mask);
-	hotcpu_notifier(workqueue_cpu_callback, 0);
+	hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
 
 	/* initialize gcwqs */
 	for_each_possible_cpu(cpu) {
@@ -2040,6 +2298,9 @@ void __init init_workqueues(void)
 			INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
 
 		ida_init(&gcwq->worker_ida);
+
+		gcwq->trustee_state = TRUSTEE_DONE;
+		init_waitqueue_head(&gcwq->trustee_wait);
 	}
 
 	keventd_wq = create_workqueue("events");
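
For readers who want to experiment with the trustee_wait_event_timeout() pattern outside the kernel, here is a loose userspace analogue: wait for a condition or a release request with a timeout, dropping the lock while asleep.  Everything below (the struct, the helper, its return convention) is a hypothetical illustration, not part of the patch; per the patch's own docstring, the kernel macro returns the remaining time if the condition holds, 0 on timeout, and -1 when the trustee has been told to release.

/* wait_or_release.c - illustrative sketch; compile as part of a -pthread program */
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

struct trustee_ctx {
	pthread_mutex_t lock;                 /* plays the role of gcwq->lock */
	pthread_cond_t  wait_q;               /* plays the role of trustee_wait */
	bool            release_requested;    /* models TRUSTEE_RELEASE */
};

/* Caller holds ctx->lock.  Returns 1 if cond_met, 0 on timeout, -1 if released. */
static int wait_event_or_release(struct trustee_ctx *ctx, bool (*cond_met)(void),
				 int timeout_ms)
{
	struct timespec deadline;

	clock_gettime(CLOCK_REALTIME, &deadline);
	deadline.tv_sec  += timeout_ms / 1000;
	deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
	if (deadline.tv_nsec >= 1000000000L) {
		deadline.tv_sec++;
		deadline.tv_nsec -= 1000000000L;
	}

	while (!cond_met() && !ctx->release_requested) {
		/* like the kernel macro, the lock is dropped while sleeping */
		int err = pthread_cond_timedwait(&ctx->wait_q, &ctx->lock, &deadline);
		if (err == ETIMEDOUT)
			return ctx->release_requested ? -1 : 0;
	}
	return ctx->release_requested ? -1 : 1;
}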