aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/md/bcache/journal.c
diff options
context:
space:
mode:
authorKent Overstreet <kmo@daterainc.com>2013-10-24 20:07:04 -0400
committerKent Overstreet <kmo@daterainc.com>2013-11-11 00:56:02 -0500
commita34a8bfd4e6358c646928320d37b0425c0762f8a (patch)
tree650dd57be0460f439551baca3514009b4287bb12 /drivers/md/bcache/journal.c
parentcdd972b164be8fc69f6ee8533c5a07b621da74c7 (diff)
bcache: Refactor journalling flow control
Making things less asynchronous that don't need to be - bch_journal() only has to block when the journal or journal entry is full, which is emphatically not a fast path. So make it a normal function that just returns when it finishes, to make the code and control flow easier to follow. Signed-off-by: Kent Overstreet <kmo@daterainc.com>
Diffstat (limited to 'drivers/md/bcache/journal.c')
-rw-r--r--drivers/md/bcache/journal.c213
1 files changed, 100 insertions, 113 deletions
diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c
index 1bdefdb1fa71..940e89e0d706 100644
--- a/drivers/md/bcache/journal.c
+++ b/drivers/md/bcache/journal.c
@@ -318,7 +318,6 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list,
318 bch_keylist_push(&op->keys); 318 bch_keylist_push(&op->keys);
319 319
320 op->journal = i->pin; 320 op->journal = i->pin;
321 atomic_inc(op->journal);
322 321
323 ret = bch_btree_insert(op, s, &op->keys); 322 ret = bch_btree_insert(op, s, &op->keys);
324 if (ret) 323 if (ret)
@@ -357,48 +356,35 @@ static void btree_flush_write(struct cache_set *c)
357 * Try to find the btree node with that references the oldest journal 356 * Try to find the btree node with that references the oldest journal
358 * entry, best is our current candidate and is locked if non NULL: 357 * entry, best is our current candidate and is locked if non NULL:
359 */ 358 */
360 struct btree *b, *best = NULL; 359 struct btree *b, *best;
361 unsigned iter; 360 unsigned i;
361retry:
362 best = NULL;
363
364 for_each_cached_btree(b, c, i)
365 if (btree_current_write(b)->journal) {
366 if (!best)
367 best = b;
368 else if (journal_pin_cmp(c,
369 btree_current_write(best),
370 btree_current_write(b))) {
371 best = b;
372 }
373 }
362 374
363 for_each_cached_btree(b, c, iter) { 375 b = best;
364 if (!down_write_trylock(&b->lock)) 376 if (b) {
365 continue; 377 rw_lock(true, b, b->level);
366 378
367 if (!btree_node_dirty(b) || 379 if (!btree_current_write(b)->journal) {
368 !btree_current_write(b)->journal) {
369 rw_unlock(true, b); 380 rw_unlock(true, b);
370 continue; 381 /* We raced */
382 goto retry;
371 } 383 }
372 384
373 if (!best) 385 bch_btree_node_write(b, NULL);
374 best = b; 386 rw_unlock(true, b);
375 else if (journal_pin_cmp(c,
376 btree_current_write(best),
377 btree_current_write(b))) {
378 rw_unlock(true, best);
379 best = b;
380 } else
381 rw_unlock(true, b);
382 } 387 }
383
384 if (best)
385 goto out;
386
387 /* We can't find the best btree node, just pick the first */
388 list_for_each_entry(b, &c->btree_cache, list)
389 if (!b->level && btree_node_dirty(b)) {
390 best = b;
391 rw_lock(true, best, best->level);
392 goto found;
393 }
394
395out:
396 if (!best)
397 return;
398found:
399 if (btree_node_dirty(best))
400 bch_btree_node_write(best, NULL);
401 rw_unlock(true, best);
402} 388}
403 389
404#define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1) 390#define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1)
@@ -494,7 +480,7 @@ static void journal_reclaim(struct cache_set *c)
494 do_journal_discard(ca); 480 do_journal_discard(ca);
495 481
496 if (c->journal.blocks_free) 482 if (c->journal.blocks_free)
497 return; 483 goto out;
498 484
499 /* 485 /*
500 * Allocate: 486 * Allocate:
@@ -520,7 +506,7 @@ static void journal_reclaim(struct cache_set *c)
520 506
521 if (n) 507 if (n)
522 c->journal.blocks_free = c->sb.bucket_size >> c->block_bits; 508 c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
523 509out:
524 if (!journal_full(&c->journal)) 510 if (!journal_full(&c->journal))
525 __closure_wake_up(&c->journal.wait); 511 __closure_wake_up(&c->journal.wait);
526} 512}
@@ -659,7 +645,7 @@ static void journal_write(struct closure *cl)
659 journal_write_unlocked(cl); 645 journal_write_unlocked(cl);
660} 646}
661 647
662static void __journal_try_write(struct cache_set *c, bool noflush) 648static void journal_try_write(struct cache_set *c)
663 __releases(c->journal.lock) 649 __releases(c->journal.lock)
664{ 650{
665 struct closure *cl = &c->journal.io; 651 struct closure *cl = &c->journal.io;
@@ -667,29 +653,59 @@ static void __journal_try_write(struct cache_set *c, bool noflush)
667 653
668 w->need_write = true; 654 w->need_write = true;
669 655
670 if (!closure_trylock(cl, &c->cl)) 656 if (closure_trylock(cl, &c->cl))
671 spin_unlock(&c->journal.lock);
672 else if (noflush && journal_full(&c->journal)) {
673 spin_unlock(&c->journal.lock);
674 continue_at(cl, journal_write, system_wq);
675 } else
676 journal_write_unlocked(cl); 657 journal_write_unlocked(cl);
658 else
659 spin_unlock(&c->journal.lock);
677} 660}
678 661
679#define journal_try_write(c) __journal_try_write(c, false) 662static struct journal_write *journal_wait_for_write(struct cache_set *c,
680 663 unsigned nkeys)
681void bch_journal_meta(struct cache_set *c, struct closure *cl)
682{ 664{
683 struct journal_write *w; 665 size_t sectors;
666 struct closure cl;
684 667
685 if (CACHE_SYNC(&c->sb)) { 668 closure_init_stack(&cl);
686 spin_lock(&c->journal.lock); 669
687 w = c->journal.cur; 670 spin_lock(&c->journal.lock);
671
672 while (1) {
673 struct journal_write *w = c->journal.cur;
674
675 sectors = __set_blocks(w->data, w->data->keys + nkeys,
676 c) * c->sb.block_size;
677
678 if (sectors <= min_t(size_t,
679 c->journal.blocks_free * c->sb.block_size,
680 PAGE_SECTORS << JSET_BITS))
681 return w;
682
683 /* XXX: tracepoint */
684 if (!journal_full(&c->journal)) {
685 trace_bcache_journal_entry_full(c);
686
687 /*
688 * XXX: If we were inserting so many keys that they
689 * won't fit in an _empty_ journal write, we'll
690 * deadlock. For now, handle this in
691 * bch_keylist_realloc() - but something to think about.
692 */
693 BUG_ON(!w->data->keys);
694
695 closure_wait(&w->wait, &cl);
696 journal_try_write(c); /* unlocks */
697 } else {
698 trace_bcache_journal_full(c);
699
700 closure_wait(&c->journal.wait, &cl);
701 journal_reclaim(c);
702 spin_unlock(&c->journal.lock);
688 703
689 if (cl) 704 btree_flush_write(c);
690 BUG_ON(!closure_wait(&w->wait, cl)); 705 }
691 706
692 __journal_try_write(c, true); 707 closure_sync(&cl);
708 spin_lock(&c->journal.lock);
693 } 709 }
694} 710}
695 711
@@ -708,68 +724,26 @@ static void journal_write_work(struct work_struct *work)
708 * bch_journal() hands those same keys off to btree_insert_async() 724 * bch_journal() hands those same keys off to btree_insert_async()
709 */ 725 */
710 726
711void bch_journal(struct closure *cl) 727atomic_t *bch_journal(struct cache_set *c,
728 struct keylist *keys,
729 struct closure *parent)
712{ 730{
713 struct btree_op *op = container_of(cl, struct btree_op, cl);
714 struct cache_set *c = op->c;
715 struct journal_write *w; 731 struct journal_write *w;
716 size_t sectors, nkeys; 732 atomic_t *ret;
717
718 if (op->type != BTREE_INSERT ||
719 !CACHE_SYNC(&c->sb))
720 goto out;
721
722 /*
723 * If we're looping because we errored, might already be waiting on
724 * another journal write:
725 */
726 while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING)
727 closure_sync(cl->parent);
728
729 spin_lock(&c->journal.lock);
730
731 if (journal_full(&c->journal)) {
732 trace_bcache_journal_full(c);
733
734 closure_wait(&c->journal.wait, cl);
735
736 journal_reclaim(c);
737 spin_unlock(&c->journal.lock);
738
739 btree_flush_write(c);
740 continue_at(cl, bch_journal, bcache_wq);
741 }
742 733
743 w = c->journal.cur; 734 if (!CACHE_SYNC(&c->sb))
744 nkeys = w->data->keys + bch_keylist_nkeys(&op->keys); 735 return NULL;
745 sectors = __set_blocks(w->data, nkeys, c) * c->sb.block_size;
746 736
747 if (sectors > min_t(size_t, 737 w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
748 c->journal.blocks_free * c->sb.block_size,
749 PAGE_SECTORS << JSET_BITS)) {
750 trace_bcache_journal_entry_full(c);
751 738
752 /* 739 memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys));
753 * XXX: If we were inserting so many keys that they won't fit in 740 w->data->keys += bch_keylist_nkeys(keys);
754 * an _empty_ journal write, we'll deadlock. For now, handle
755 * this in bch_keylist_realloc() - but something to think about.
756 */
757 BUG_ON(!w->data->keys);
758 741
759 BUG_ON(!closure_wait(&w->wait, cl)); 742 ret = &fifo_back(&c->journal.pin);
743 atomic_inc(ret);
760 744
761 journal_try_write(c); 745 if (parent) {
762 continue_at(cl, bch_journal, bcache_wq); 746 closure_wait(&w->wait, parent);
763 }
764
765 memcpy(end(w->data), op->keys.keys, bch_keylist_bytes(&op->keys));
766 w->data->keys += bch_keylist_nkeys(&op->keys);
767
768 op->journal = &fifo_back(&c->journal.pin);
769 atomic_inc(op->journal);
770
771 if (op->flush_journal) {
772 closure_wait(&w->wait, cl->parent);
773 journal_try_write(c); 747 journal_try_write(c);
774 } else if (!w->need_write) { 748 } else if (!w->need_write) {
775 schedule_delayed_work(&c->journal.work, 749 schedule_delayed_work(&c->journal.work,
@@ -778,8 +752,21 @@ void bch_journal(struct closure *cl)
778 } else { 752 } else {
779 spin_unlock(&c->journal.lock); 753 spin_unlock(&c->journal.lock);
780 } 754 }
781out: 755
782 bch_btree_insert_async(cl); 756
757 return ret;
758}
759
760void bch_journal_meta(struct cache_set *c, struct closure *cl)
761{
762 struct keylist keys;
763 atomic_t *ref;
764
765 bch_keylist_init(&keys);
766
767 ref = bch_journal(c, &keys, cl);
768 if (ref)
769 atomic_dec_bug(ref);
783} 770}
784 771
785void bch_journal_free(struct cache_set *c) 772void bch_journal_free(struct cache_set *c)