aboutsummaryrefslogtreecommitdiffstats
path: root/kernel
diff options
context:
space:
mode:
authorTejun Heo <tj@kernel.org>2012-07-14 01:16:45 -0400
committerTejun Heo <tj@kernel.org>2012-07-14 01:24:45 -0400
commit3270476a6c0ce322354df8679652f060d66526dc (patch)
treedb58846beb7c5e1c1b50b7e2f1c2538320408c26 /kernel
parent4ce62e9e30cacc26885cab133ad1de358dd79f21 (diff)
workqueue: reimplement WQ_HIGHPRI using a separate worker_pool
WQ_HIGHPRI was implemented by queueing highpri work items at the head of the global worklist. Other than queueing at the head, they weren't handled differently; unfortunately, this could lead to execution latency of a few seconds on heavily loaded systems. Now that workqueue code has been updated to deal with multiple worker_pools per global_cwq, this patch reimplements WQ_HIGHPRI using a separate worker_pool. NR_WORKER_POOLS is bumped to two and gcwq->pools[0] is used for normal pri work items and ->pools[1] for highpri. Highpri workers get a -20 nice level and have an 'H' suffix in their names. Note that this change increases the number of kworkers per cpu. POOL_HIGHPRI_PENDING, pool_determine_ins_pos() and highpri chain wakeup code in process_one_work() are no longer used and removed. This allows proper prioritization of highpri work items and removes high execution latency of highpri work items. v2: nr_running indexing bug in get_pool_nr_running() fixed. v3: Refreshed for the get_pool_nr_running() update in the previous patch. Signed-off-by: Tejun Heo <tj@kernel.org> Reported-by: Josh Hunt <joshhunt00@gmail.com> LKML-Reference: <CAKA=qzaHqwZ8eqpLNFjxnO2fX-tgAOjmpvxgBFjv6dJeQaOW1w@mail.gmail.com> Cc: Tony Luck <tony.luck@intel.com> Cc: Fengguang Wu <fengguang.wu@intel.com>
Diffstat (limited to 'kernel')
-rw-r--r--kernel/workqueue.c100
1 files changed, 27 insertions, 73 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index b0daaea44ea..4fa9e3552f1 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -52,7 +52,6 @@ enum {
52 /* pool flags */ 52 /* pool flags */
53 POOL_MANAGE_WORKERS = 1 << 0, /* need to manage workers */ 53 POOL_MANAGE_WORKERS = 1 << 0, /* need to manage workers */
54 POOL_MANAGING_WORKERS = 1 << 1, /* managing workers */ 54 POOL_MANAGING_WORKERS = 1 << 1, /* managing workers */
55 POOL_HIGHPRI_PENDING = 1 << 2, /* highpri works on queue */
56 55
57 /* worker flags */ 56 /* worker flags */
58 WORKER_STARTED = 1 << 0, /* started */ 57 WORKER_STARTED = 1 << 0, /* started */
@@ -74,7 +73,7 @@ enum {
74 TRUSTEE_RELEASE = 3, /* release workers */ 73 TRUSTEE_RELEASE = 3, /* release workers */
75 TRUSTEE_DONE = 4, /* trustee is done */ 74 TRUSTEE_DONE = 4, /* trustee is done */
76 75
77 NR_WORKER_POOLS = 1, /* # worker pools per gcwq */ 76 NR_WORKER_POOLS = 2, /* # worker pools per gcwq */
78 77
79 BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */ 78 BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */
80 BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER, 79 BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER,
@@ -95,6 +94,7 @@ enum {
95 * all cpus. Give -20. 94 * all cpus. Give -20.
96 */ 95 */
97 RESCUER_NICE_LEVEL = -20, 96 RESCUER_NICE_LEVEL = -20,
97 HIGHPRI_NICE_LEVEL = -20,
98}; 98};
99 99
100/* 100/*
@@ -174,7 +174,7 @@ struct global_cwq {
174 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE]; 174 struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
175 /* L: hash of busy workers */ 175 /* L: hash of busy workers */
176 176
177 struct worker_pool pool; /* the worker pools */ 177 struct worker_pool pools[2]; /* normal and highpri pools */
178 178
179 struct task_struct *trustee; /* L: for gcwq shutdown */ 179 struct task_struct *trustee; /* L: for gcwq shutdown */
180 unsigned int trustee_state; /* L: trustee state */ 180 unsigned int trustee_state; /* L: trustee state */
@@ -277,7 +277,8 @@ EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
277#include <trace/events/workqueue.h> 277#include <trace/events/workqueue.h>
278 278
279#define for_each_worker_pool(pool, gcwq) \ 279#define for_each_worker_pool(pool, gcwq) \
280 for ((pool) = &(gcwq)->pool; (pool); (pool) = NULL) 280 for ((pool) = &(gcwq)->pools[0]; \
281 (pool) < &(gcwq)->pools[NR_WORKER_POOLS]; (pool)++)
281 282
282#define for_each_busy_worker(worker, i, pos, gcwq) \ 283#define for_each_busy_worker(worker, i, pos, gcwq) \
283 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \ 284 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \
@@ -473,6 +474,11 @@ static atomic_t unbound_pool_nr_running[NR_WORKER_POOLS] = {
473 474
474static int worker_thread(void *__worker); 475static int worker_thread(void *__worker);
475 476
477static int worker_pool_pri(struct worker_pool *pool)
478{
479 return pool - pool->gcwq->pools;
480}
481
476static struct global_cwq *get_gcwq(unsigned int cpu) 482static struct global_cwq *get_gcwq(unsigned int cpu)
477{ 483{
478 if (cpu != WORK_CPU_UNBOUND) 484 if (cpu != WORK_CPU_UNBOUND)
@@ -484,7 +490,7 @@ static struct global_cwq *get_gcwq(unsigned int cpu)
484static atomic_t *get_pool_nr_running(struct worker_pool *pool) 490static atomic_t *get_pool_nr_running(struct worker_pool *pool)
485{ 491{
486 int cpu = pool->gcwq->cpu; 492 int cpu = pool->gcwq->cpu;
487 int idx = 0; 493 int idx = worker_pool_pri(pool);
488 494
489 if (cpu != WORK_CPU_UNBOUND) 495 if (cpu != WORK_CPU_UNBOUND)
490 return &per_cpu(pool_nr_running, cpu)[idx]; 496 return &per_cpu(pool_nr_running, cpu)[idx];
@@ -586,15 +592,14 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
586} 592}
587 593
588/* 594/*
589 * Policy functions. These define the policies on how the global 595 * Policy functions. These define the policies on how the global worker
590 * worker pool is managed. Unless noted otherwise, these functions 596 * pools are managed. Unless noted otherwise, these functions assume that
591 * assume that they're being called with gcwq->lock held. 597 * they're being called with gcwq->lock held.
592 */ 598 */
593 599
594static bool __need_more_worker(struct worker_pool *pool) 600static bool __need_more_worker(struct worker_pool *pool)
595{ 601{
596 return !atomic_read(get_pool_nr_running(pool)) || 602 return !atomic_read(get_pool_nr_running(pool));
597 (pool->flags & POOL_HIGHPRI_PENDING);
598} 603}
599 604
600/* 605/*
@@ -621,9 +626,7 @@ static bool keep_working(struct worker_pool *pool)
621{ 626{
622 atomic_t *nr_running = get_pool_nr_running(pool); 627 atomic_t *nr_running = get_pool_nr_running(pool);
623 628
624 return !list_empty(&pool->worklist) && 629 return !list_empty(&pool->worklist) && atomic_read(nr_running) <= 1;
625 (atomic_read(nr_running) <= 1 ||
626 (pool->flags & POOL_HIGHPRI_PENDING));
627} 630}
628 631
629/* Do we need a new worker? Called from manager. */ 632/* Do we need a new worker? Called from manager. */
@@ -892,43 +895,6 @@ static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
892} 895}
893 896
894/** 897/**
895 * pool_determine_ins_pos - find insertion position
896 * @pool: pool of interest
897 * @cwq: cwq a work is being queued for
898 *
899 * A work for @cwq is about to be queued on @pool, determine insertion
900 * position for the work. If @cwq is for HIGHPRI wq, the work is
901 * queued at the head of the queue but in FIFO order with respect to
902 * other HIGHPRI works; otherwise, at the end of the queue. This
903 * function also sets POOL_HIGHPRI_PENDING flag to hint @pool that
904 * there are HIGHPRI works pending.
905 *
906 * CONTEXT:
907 * spin_lock_irq(gcwq->lock).
908 *
909 * RETURNS:
910 * Pointer to inserstion position.
911 */
912static inline struct list_head *pool_determine_ins_pos(struct worker_pool *pool,
913 struct cpu_workqueue_struct *cwq)
914{
915 struct work_struct *twork;
916
917 if (likely(!(cwq->wq->flags & WQ_HIGHPRI)))
918 return &pool->worklist;
919
920 list_for_each_entry(twork, &pool->worklist, entry) {
921 struct cpu_workqueue_struct *tcwq = get_work_cwq(twork);
922
923 if (!(tcwq->wq->flags & WQ_HIGHPRI))
924 break;
925 }
926
927 pool->flags |= POOL_HIGHPRI_PENDING;
928 return &twork->entry;
929}
930
931/**
932 * insert_work - insert a work into gcwq 898 * insert_work - insert a work into gcwq
933 * @cwq: cwq @work belongs to 899 * @cwq: cwq @work belongs to
934 * @work: work to insert 900 * @work: work to insert
@@ -1068,7 +1034,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
1068 if (likely(cwq->nr_active < cwq->max_active)) { 1034 if (likely(cwq->nr_active < cwq->max_active)) {
1069 trace_workqueue_activate_work(work); 1035 trace_workqueue_activate_work(work);
1070 cwq->nr_active++; 1036 cwq->nr_active++;
1071 worklist = pool_determine_ins_pos(cwq->pool, cwq); 1037 worklist = &cwq->pool->worklist;
1072 } else { 1038 } else {
1073 work_flags |= WORK_STRUCT_DELAYED; 1039 work_flags |= WORK_STRUCT_DELAYED;
1074 worklist = &cwq->delayed_works; 1040 worklist = &cwq->delayed_works;
@@ -1385,6 +1351,7 @@ static struct worker *create_worker(struct worker_pool *pool, bool bind)
1385{ 1351{
1386 struct global_cwq *gcwq = pool->gcwq; 1352 struct global_cwq *gcwq = pool->gcwq;
1387 bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND; 1353 bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND;
1354 const char *pri = worker_pool_pri(pool) ? "H" : "";
1388 struct worker *worker = NULL; 1355 struct worker *worker = NULL;
1389 int id = -1; 1356 int id = -1;
1390 1357
@@ -1406,15 +1373,17 @@ static struct worker *create_worker(struct worker_pool *pool, bool bind)
1406 1373
1407 if (!on_unbound_cpu) 1374 if (!on_unbound_cpu)
1408 worker->task = kthread_create_on_node(worker_thread, 1375 worker->task = kthread_create_on_node(worker_thread,
1409 worker, 1376 worker, cpu_to_node(gcwq->cpu),
1410 cpu_to_node(gcwq->cpu), 1377 "kworker/%u:%d%s", gcwq->cpu, id, pri);
1411 "kworker/%u:%d", gcwq->cpu, id);
1412 else 1378 else
1413 worker->task = kthread_create(worker_thread, worker, 1379 worker->task = kthread_create(worker_thread, worker,
1414 "kworker/u:%d", id); 1380 "kworker/u:%d%s", id, pri);
1415 if (IS_ERR(worker->task)) 1381 if (IS_ERR(worker->task))
1416 goto fail; 1382 goto fail;
1417 1383
1384 if (worker_pool_pri(pool))
1385 set_user_nice(worker->task, HIGHPRI_NICE_LEVEL);
1386
1418 /* 1387 /*
1419 * A rogue worker will become a regular one if CPU comes 1388 * A rogue worker will become a regular one if CPU comes
1420 * online later on. Make sure every worker has 1389 * online later on. Make sure every worker has
@@ -1761,10 +1730,9 @@ static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
1761{ 1730{
1762 struct work_struct *work = list_first_entry(&cwq->delayed_works, 1731 struct work_struct *work = list_first_entry(&cwq->delayed_works,
1763 struct work_struct, entry); 1732 struct work_struct, entry);
1764 struct list_head *pos = pool_determine_ins_pos(cwq->pool, cwq);
1765 1733
1766 trace_workqueue_activate_work(work); 1734 trace_workqueue_activate_work(work);
1767 move_linked_works(work, pos, NULL); 1735 move_linked_works(work, &cwq->pool->worklist, NULL);
1768 __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work)); 1736 __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
1769 cwq->nr_active++; 1737 cwq->nr_active++;
1770} 1738}
@@ -1880,21 +1848,6 @@ __acquires(&gcwq->lock)
1880 list_del_init(&work->entry); 1848 list_del_init(&work->entry);
1881 1849
1882 /* 1850 /*
1883 * If HIGHPRI_PENDING, check the next work, and, if HIGHPRI,
1884 * wake up another worker; otherwise, clear HIGHPRI_PENDING.
1885 */
1886 if (unlikely(pool->flags & POOL_HIGHPRI_PENDING)) {
1887 struct work_struct *nwork = list_first_entry(&pool->worklist,
1888 struct work_struct, entry);
1889
1890 if (!list_empty(&pool->worklist) &&
1891 get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI)
1892 wake_up_worker(pool);
1893 else
1894 pool->flags &= ~POOL_HIGHPRI_PENDING;
1895 }
1896
1897 /*
1898 * CPU intensive works don't participate in concurrency 1851 * CPU intensive works don't participate in concurrency
1899 * management. They're the scheduler's responsibility. 1852 * management. They're the scheduler's responsibility.
1900 */ 1853 */
@@ -3047,9 +3000,10 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
3047 for_each_cwq_cpu(cpu, wq) { 3000 for_each_cwq_cpu(cpu, wq) {
3048 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3001 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3049 struct global_cwq *gcwq = get_gcwq(cpu); 3002 struct global_cwq *gcwq = get_gcwq(cpu);
3003 int pool_idx = (bool)(flags & WQ_HIGHPRI);
3050 3004
3051 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK); 3005 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
3052 cwq->pool = &gcwq->pool; 3006 cwq->pool = &gcwq->pools[pool_idx];
3053 cwq->wq = wq; 3007 cwq->wq = wq;
3054 cwq->flush_color = -1; 3008 cwq->flush_color = -1;
3055 cwq->max_active = max_active; 3009 cwq->max_active = max_active;