author		Tejun Heo <tj@kernel.org>	2013-03-12 14:30:03 -0400
committer	Tejun Heo <tj@kernel.org>	2013-03-12 14:30:03 -0400
commit		29c91e9912bed7060df6116af90286500f5a700d (patch)
tree		f5de35e3da85b2f90bda13f7bfb5ea98fbd4d86d /kernel/workqueue.c
parent		7a4e344c5675eefbde93ed9a98ef45e0e4957bc2 (diff)
workqueue: implement attribute-based unbound worker_pool management
This patch makes unbound worker_pools reference counted and dynamically
created and destroyed as workqueues needing them come and go.  All unbound
worker_pools are hashed on unbound_pool_hash which is keyed by the content
of worker_pool->attrs.

When an unbound workqueue is allocated, get_unbound_pool() is called with
the attributes of the workqueue.  If there already is a matching
worker_pool, the reference count is bumped and the pool is returned.  If
not, a new worker_pool with matching attributes is created and returned.

When an unbound workqueue is destroyed, put_unbound_pool() is called which
decrements the reference count of the associated worker_pool.  If the
refcnt reaches zero, the worker_pool is destroyed in sched-RCU safe way.

Note that the standard unbound worker_pools - normal and highpri ones with
no specific cpumask affinity - are no longer created explicitly during
init_workqueues().  init_workqueues() only initializes workqueue_attrs to
be used for standard unbound pools - unbound_std_wq_attrs[].  The pools
are spawned on demand as workqueues are created.

v2: - Comment added to init_worker_pool() explaining that @pool should be
      in a condition which can be passed to put_unbound_pool() even on
      failure.

    - pool->refcnt reaching zero and the pool being removed from
      unbound_pool_hash should be dynamic.  pool->refcnt is converted to
      int from atomic_t and now manipulated inside workqueue_lock.

    - Removed an incorrect sanity check on nr_idle in put_unbound_pool()
      which may trigger spuriously.

    All changes were suggested by Lai Jiangshan.

Signed-off-by: Tejun Heo <tj@kernel.org>
Reviewed-by: Lai Jiangshan <laijs@cn.fujitsu.com>
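
In short, unbound workqueue allocation and destruction now route through the
refcounted pool cache described above.  The sketch below is a heavily
simplified paraphrase of the get_unbound_pool()/put_unbound_pool() flow, for
illustration only: it drops the create_mutex/workqueue_lock serialization,
the sanity checks, and all error handling found in the real code in the diff,
and alloc_pool_for() is a hypothetical stand-in for the kzalloc +
init_worker_pool() + first-worker steps, not a function this patch adds.

	/* Sketch only: share or create an unbound pool keyed by @attrs. */
	static struct worker_pool *get_unbound_pool_sketch(const struct workqueue_attrs *attrs)
	{
		u32 hash = wqattrs_hash(attrs);		/* jhash of nice level + cpumask */
		struct worker_pool *pool;

		/* reuse an existing pool whose attrs compare equal */
		hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
			if (wqattrs_equal(pool->attrs, attrs)) {
				pool->refcnt++;
				return pool;
			}
		}

		/* none found: placeholder for kzalloc + init + initial worker */
		pool = alloc_pool_for(attrs);		/* hypothetical helper, refcnt starts at 1 */
		hash_add(unbound_pool_hash, &pool->hash_node, hash);
		return pool;
	}

	/* Sketch only: last put unhashes the pool and frees it after sched-RCU. */
	static void put_unbound_pool_sketch(struct worker_pool *pool)
	{
		if (--pool->refcnt)
			return;
		hash_del(&pool->hash_node);
		call_rcu_sched(&pool->rcu, rcu_free_pool);
	}

The diff below contains the full versions, including the locking, ID
management, worker teardown and sched-RCU protected destruction.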
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--	kernel/workqueue.c	237
1 files changed, 224 insertions, 13 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index b0d3cbb83f63..3fe2c79bf166 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -41,6 +41,7 @@
 #include <linux/debug_locks.h>
 #include <linux/lockdep.h>
 #include <linux/idr.h>
+#include <linux/jhash.h>
 #include <linux/hashtable.h>
 #include <linux/rculist.h>
 
@@ -80,6 +81,7 @@ enum {
 
 	NR_STD_WORKER_POOLS	= 2,		/* # standard pools per cpu */
 
+	UNBOUND_POOL_HASH_ORDER	= 6,		/* hashed by pool->attrs */
 	BUSY_WORKER_HASH_ORDER	= 6,		/* 64 pointers */
 
 	MAX_IDLE_WORKERS_RATIO	= 4,		/* 1/4 of busy can be idle */
@@ -149,6 +151,8 @@ struct worker_pool {
 	struct ida		worker_ida;	/* L: for worker IDs */
 
 	struct workqueue_attrs	*attrs;		/* I: worker attributes */
+	struct hlist_node	hash_node;	/* R: unbound_pool_hash node */
+	int			refcnt;		/* refcnt for unbound pools */
 
 	/*
 	 * The current concurrency level.  As it's likely to be accessed
@@ -156,6 +160,12 @@ struct worker_pool {
 	 * cacheline.
 	 */
 	atomic_t		nr_running ____cacheline_aligned_in_smp;
+
+	/*
+	 * Destruction of pool is sched-RCU protected to allow dereferences
+	 * from get_work_pool().
+	 */
+	struct rcu_head		rcu;
 } ____cacheline_aligned_in_smp;
 
 /*
@@ -218,6 +228,11 @@ struct workqueue_struct {
 
 static struct kmem_cache *pwq_cache;
 
+/* hash of all unbound pools keyed by pool->attrs */
+static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
+
+static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
+
 struct workqueue_struct *system_wq __read_mostly;
 EXPORT_SYMBOL_GPL(system_wq);
 struct workqueue_struct *system_highpri_wq __read_mostly;
@@ -1742,7 +1757,7 @@ static struct worker *create_worker(struct worker_pool *pool)
 	worker->pool = pool;
 	worker->id = id;
 
-	if (pool->cpu != WORK_CPU_UNBOUND)
+	if (pool->cpu >= 0)
 		worker->task = kthread_create_on_node(worker_thread,
 					worker, cpu_to_node(pool->cpu),
 					"kworker/%d:%d%s", pool->cpu, id, pri);
@@ -3161,16 +3176,68 @@ fail:
 	return NULL;
 }
 
+static void copy_workqueue_attrs(struct workqueue_attrs *to,
+				 const struct workqueue_attrs *from)
+{
+	to->nice = from->nice;
+	cpumask_copy(to->cpumask, from->cpumask);
+}
+
+/*
+ * Hacky implementation of jhash of bitmaps which only considers the
+ * specified number of bits.  We probably want a proper implementation in
+ * include/linux/jhash.h.
+ */
+static u32 jhash_bitmap(const unsigned long *bitmap, int bits, u32 hash)
+{
+	int nr_longs = bits / BITS_PER_LONG;
+	int nr_leftover = bits % BITS_PER_LONG;
+	unsigned long leftover = 0;
+
+	if (nr_longs)
+		hash = jhash(bitmap, nr_longs * sizeof(long), hash);
+	if (nr_leftover) {
+		bitmap_copy(&leftover, bitmap + nr_longs, nr_leftover);
+		hash = jhash(&leftover, sizeof(long), hash);
+	}
+	return hash;
+}
+
+/* hash value of the content of @attr */
+static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
+{
+	u32 hash = 0;
+
+	hash = jhash_1word(attrs->nice, hash);
+	hash = jhash_bitmap(cpumask_bits(attrs->cpumask), nr_cpu_ids, hash);
+	return hash;
+}
+
+/* content equality test */
+static bool wqattrs_equal(const struct workqueue_attrs *a,
+			  const struct workqueue_attrs *b)
+{
+	if (a->nice != b->nice)
+		return false;
+	if (!cpumask_equal(a->cpumask, b->cpumask))
+		return false;
+	return true;
+}
+
 /**
  * init_worker_pool - initialize a newly zalloc'd worker_pool
  * @pool: worker_pool to initialize
  *
  * Initiailize a newly zalloc'd @pool.  It also allocates @pool->attrs.
- * Returns 0 on success, -errno on failure.
+ * Returns 0 on success, -errno on failure.  Even on failure, all fields
+ * inside @pool proper are initialized and put_unbound_pool() can be called
+ * on @pool safely to release it.
  */
 static int init_worker_pool(struct worker_pool *pool)
 {
 	spin_lock_init(&pool->lock);
+	pool->id = -1;
+	pool->cpu = -1;
 	pool->flags |= POOL_DISASSOCIATED;
 	INIT_LIST_HEAD(&pool->worklist);
 	INIT_LIST_HEAD(&pool->idle_list);
@@ -3187,12 +3254,136 @@ static int init_worker_pool(struct worker_pool *pool)
 	mutex_init(&pool->assoc_mutex);
 	ida_init(&pool->worker_ida);
 
+	INIT_HLIST_NODE(&pool->hash_node);
+	pool->refcnt = 1;
+
+	/* shouldn't fail above this point */
 	pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
 	if (!pool->attrs)
 		return -ENOMEM;
 	return 0;
 }
 
+static void rcu_free_pool(struct rcu_head *rcu)
+{
+	struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
+
+	ida_destroy(&pool->worker_ida);
+	free_workqueue_attrs(pool->attrs);
+	kfree(pool);
+}
+
+/**
+ * put_unbound_pool - put a worker_pool
+ * @pool: worker_pool to put
+ *
+ * Put @pool.  If its refcnt reaches zero, it gets destroyed in sched-RCU
+ * safe manner.
+ */
+static void put_unbound_pool(struct worker_pool *pool)
+{
+	struct worker *worker;
+
+	spin_lock_irq(&workqueue_lock);
+	if (--pool->refcnt) {
+		spin_unlock_irq(&workqueue_lock);
+		return;
+	}
+
+	/* sanity checks */
+	if (WARN_ON(!(pool->flags & POOL_DISASSOCIATED)) ||
+	    WARN_ON(!list_empty(&pool->worklist))) {
+		spin_unlock_irq(&workqueue_lock);
+		return;
+	}
+
+	/* release id and unhash */
+	if (pool->id >= 0)
+		idr_remove(&worker_pool_idr, pool->id);
+	hash_del(&pool->hash_node);
+
+	spin_unlock_irq(&workqueue_lock);
+
+	/* lock out manager and destroy all workers */
+	mutex_lock(&pool->manager_arb);
+	spin_lock_irq(&pool->lock);
+
+	while ((worker = first_worker(pool)))
+		destroy_worker(worker);
+	WARN_ON(pool->nr_workers || pool->nr_idle);
+
+	spin_unlock_irq(&pool->lock);
+	mutex_unlock(&pool->manager_arb);
+
+	/* shut down the timers */
+	del_timer_sync(&pool->idle_timer);
+	del_timer_sync(&pool->mayday_timer);
+
+	/* sched-RCU protected to allow dereferences from get_work_pool() */
+	call_rcu_sched(&pool->rcu, rcu_free_pool);
+}
+
+/**
+ * get_unbound_pool - get a worker_pool with the specified attributes
+ * @attrs: the attributes of the worker_pool to get
+ *
+ * Obtain a worker_pool which has the same attributes as @attrs, bump the
+ * reference count and return it.  If there already is a matching
+ * worker_pool, it will be used; otherwise, this function attempts to
+ * create a new one.  On failure, returns NULL.
+ */
+static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
+{
+	static DEFINE_MUTEX(create_mutex);
+	u32 hash = wqattrs_hash(attrs);
+	struct worker_pool *pool;
+	struct worker *worker;
+
+	mutex_lock(&create_mutex);
+
+	/* do we already have a matching pool? */
+	spin_lock_irq(&workqueue_lock);
+	hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
+		if (wqattrs_equal(pool->attrs, attrs)) {
+			pool->refcnt++;
+			goto out_unlock;
+		}
+	}
+	spin_unlock_irq(&workqueue_lock);
+
+	/* nope, create a new one */
+	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+	if (!pool || init_worker_pool(pool) < 0)
+		goto fail;
+
+	copy_workqueue_attrs(pool->attrs, attrs);
+
+	if (worker_pool_assign_id(pool) < 0)
+		goto fail;
+
+	/* create and start the initial worker */
+	worker = create_worker(pool);
+	if (!worker)
+		goto fail;
+
+	spin_lock_irq(&pool->lock);
+	start_worker(worker);
+	spin_unlock_irq(&pool->lock);
+
+	/* install */
+	spin_lock_irq(&workqueue_lock);
+	hash_add(unbound_pool_hash, &pool->hash_node, hash);
+out_unlock:
+	spin_unlock_irq(&workqueue_lock);
+	mutex_unlock(&create_mutex);
+	return pool;
+fail:
+	mutex_unlock(&create_mutex);
+	if (pool)
+		put_unbound_pool(pool);
+	return NULL;
+}
+
 static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 {
 	bool highpri = wq->flags & WQ_HIGHPRI;
@@ -3217,7 +3408,12 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 		if (!pwq)
 			return -ENOMEM;
 
-		pwq->pool = get_std_worker_pool(WORK_CPU_UNBOUND, highpri);
+		pwq->pool = get_unbound_pool(unbound_std_wq_attrs[highpri]);
+		if (!pwq->pool) {
+			kmem_cache_free(pwq_cache, pwq);
+			return -ENOMEM;
+		}
+
 		list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs);
 	}
 
@@ -3395,6 +3591,15 @@ void destroy_workqueue(struct workqueue_struct *wq)
 		kfree(wq->rescuer);
 	}
 
+	/*
+	 * We're the sole accessor of @wq at this point.  Directly access
+	 * the first pwq and put its pool.
+	 */
+	if (wq->flags & WQ_UNBOUND) {
+		pwq = list_first_entry(&wq->pwqs, struct pool_workqueue,
+				       pwqs_node);
+		put_unbound_pool(pwq->pool);
+	}
 	free_pwqs(wq);
 	kfree(wq);
 }
@@ -3857,19 +4062,14 @@ static int __init init_workqueues(void)
 	hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
 
 	/* initialize CPU pools */
-	for_each_wq_cpu(cpu) {
+	for_each_possible_cpu(cpu) {
 		struct worker_pool *pool;
 
 		i = 0;
 		for_each_std_worker_pool(pool, cpu) {
 			BUG_ON(init_worker_pool(pool));
 			pool->cpu = cpu;
-
-			if (cpu != WORK_CPU_UNBOUND)
-				cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
-			else
-				cpumask_setall(pool->attrs->cpumask);
-
+			cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
 			pool->attrs->nice = std_nice[i++];
 
 			/* alloc pool ID */
@@ -3878,14 +4078,13 @@ static int __init init_workqueues(void)
 	}
 
 	/* create the initial worker */
-	for_each_online_wq_cpu(cpu) {
+	for_each_online_cpu(cpu) {
 		struct worker_pool *pool;
 
 		for_each_std_worker_pool(pool, cpu) {
 			struct worker *worker;
 
-			if (cpu != WORK_CPU_UNBOUND)
-				pool->flags &= ~POOL_DISASSOCIATED;
+			pool->flags &= ~POOL_DISASSOCIATED;
 
 			worker = create_worker(pool);
 			BUG_ON(!worker);
@@ -3895,6 +4094,18 @@ static int __init init_workqueues(void)
 		}
 	}
 
+	/* create default unbound wq attrs */
+	for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
+		struct workqueue_attrs *attrs;
+
+		BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
+
+		attrs->nice = std_nice[i];
+		cpumask_setall(attrs->cpumask);
+
+		unbound_std_wq_attrs[i] = attrs;
+	}
+
 	system_wq = alloc_workqueue("events", 0, 0);
 	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
 	system_long_wq = alloc_workqueue("events_long", 0, 0);