aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJens Axboe <axboe@kernel.dk>2018-08-26 12:10:05 -0400
committerJens Axboe <axboe@kernel.dk>2018-08-27 13:27:26 -0400
commit38cfb5a45ee013bfab5d1ae4c4738815e744b440 (patch)
treea85aa2ef3699f6a42256efbf23705993f6da6d98
parent061a5427530633de93ace4ef001b99961984af62 (diff)
blk-wbt: improve waking of tasks
We have two potential issues: 1) After commit 2887e41b910b, we only wake one process at a time when we finish an IO. We really want to wake up as many tasks as can queue IO. Before this commit, we woke up everyone, which could cause a thundering herd issue. 2) A task can potentially consume two wakeups, causing us to (in practice) miss a wakeup. Fix both by providing our own wakeup function, which stops __wake_up_common() from waking up more tasks if we fail to get a queueing token. With the strict ordering we have on the wait list, this wakes the right tasks and the right amount of tasks. Based on a patch from Jianchao Wang <jianchao.w.wang@oracle.com>. Tested-by: Agarwal, Anchal <anchalag@amazon.com> Signed-off-by: Jens Axboe <axboe@kernel.dk>
-rw-r--r--block/blk-wbt.c63
1 file changed, 56 insertions, 7 deletions
diff --git a/block/blk-wbt.c b/block/blk-wbt.c
index 4575b4650370..bfb0d21d19ce 100644
--- a/block/blk-wbt.c
+++ b/block/blk-wbt.c
@@ -161,7 +161,7 @@ static void wbt_rqw_done(struct rq_wb *rwb, struct rq_wait *rqw,
161 int diff = limit - inflight; 161 int diff = limit - inflight;
162 162
163 if (!inflight || diff >= rwb->wb_background / 2) 163 if (!inflight || diff >= rwb->wb_background / 2)
164 wake_up(&rqw->wait); 164 wake_up_all(&rqw->wait);
165 } 165 }
166} 166}
167 167
@@ -488,6 +488,34 @@ static inline unsigned int get_limit(struct rq_wb *rwb, unsigned long rw)
488 return limit; 488 return limit;
489} 489}
490 490
491struct wbt_wait_data {
492 struct wait_queue_entry wq;
493 struct task_struct *task;
494 struct rq_wb *rwb;
495 struct rq_wait *rqw;
496 unsigned long rw;
497 bool got_token;
498};
499
500static int wbt_wake_function(struct wait_queue_entry *curr, unsigned int mode,
501 int wake_flags, void *key)
502{
503 struct wbt_wait_data *data = container_of(curr, struct wbt_wait_data,
504 wq);
505
506 /*
507 * If we fail to get a budget, return -1 to interrupt the wake up
508 * loop in __wake_up_common.
509 */
510 if (!rq_wait_inc_below(data->rqw, get_limit(data->rwb, data->rw)))
511 return -1;
512
513 data->got_token = true;
514 list_del_init(&curr->entry);
515 wake_up_process(data->task);
516 return 1;
517}
518
491/* 519/*
492 * Block if we will exceed our limit, or if we are currently waiting for 520 * Block if we will exceed our limit, or if we are currently waiting for
493 * the timer to kick off queuing again. 521 * the timer to kick off queuing again.
@@ -498,19 +526,40 @@ static void __wbt_wait(struct rq_wb *rwb, enum wbt_flags wb_acct,
498 __acquires(lock) 526 __acquires(lock)
499{ 527{
500 struct rq_wait *rqw = get_rq_wait(rwb, wb_acct); 528 struct rq_wait *rqw = get_rq_wait(rwb, wb_acct);
501 DECLARE_WAITQUEUE(wait, current); 529 struct wbt_wait_data data = {
530 .wq = {
531 .func = wbt_wake_function,
532 .entry = LIST_HEAD_INIT(data.wq.entry),
533 },
534 .task = current,
535 .rwb = rwb,
536 .rqw = rqw,
537 .rw = rw,
538 };
502 bool has_sleeper; 539 bool has_sleeper;
503 540
504 has_sleeper = wq_has_sleeper(&rqw->wait); 541 has_sleeper = wq_has_sleeper(&rqw->wait);
505 if (!has_sleeper && rq_wait_inc_below(rqw, get_limit(rwb, rw))) 542 if (!has_sleeper && rq_wait_inc_below(rqw, get_limit(rwb, rw)))
506 return; 543 return;
507 544
508 add_wait_queue_exclusive(&rqw->wait, &wait); 545 prepare_to_wait_exclusive(&rqw->wait, &data.wq, TASK_UNINTERRUPTIBLE);
509 do { 546 do {
510 set_current_state(TASK_UNINTERRUPTIBLE); 547 if (data.got_token)
548 break;
511 549
512 if (!has_sleeper && rq_wait_inc_below(rqw, get_limit(rwb, rw))) 550 if (!has_sleeper &&
551 rq_wait_inc_below(rqw, get_limit(rwb, rw))) {
552 finish_wait(&rqw->wait, &data.wq);
553
554 /*
555 * We raced with wbt_wake_function() getting a token,
556 * which means we now have two. Put our local token
557 * and wake anyone else potentially waiting for one.
558 */
559 if (data.got_token)
560 wbt_rqw_done(rwb, rqw, wb_acct);
513 break; 561 break;
562 }
514 563
515 if (lock) { 564 if (lock) {
516 spin_unlock_irq(lock); 565 spin_unlock_irq(lock);
@@ -518,11 +567,11 @@ static void __wbt_wait(struct rq_wb *rwb, enum wbt_flags wb_acct,
518 spin_lock_irq(lock); 567 spin_lock_irq(lock);
519 } else 568 } else
520 io_schedule(); 569 io_schedule();
570
521 has_sleeper = false; 571 has_sleeper = false;
522 } while (1); 572 } while (1);
523 573
524 __set_current_state(TASK_RUNNING); 574 finish_wait(&rqw->wait, &data.wq);
525 remove_wait_queue(&rqw->wait, &wait);
526} 575}
527 576
528static inline bool wbt_should_throttle(struct rq_wb *rwb, struct bio *bio) 577static inline bool wbt_should_throttle(struct rq_wb *rwb, struct bio *bio)