Diffstat (limited to 'drivers/md/bcache/journal.c')
 -rw-r--r--  drivers/md/bcache/journal.c  293
 1 file changed, 143 insertions, 150 deletions
diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c
index 8435f81e5d85..ecdaa671bd50 100644
--- a/drivers/md/bcache/journal.c
+++ b/drivers/md/bcache/journal.c
@@ -7,7 +7,6 @@
7#include "bcache.h" 7#include "bcache.h"
8#include "btree.h" 8#include "btree.h"
9#include "debug.h" 9#include "debug.h"
10#include "request.h"
11 10
12#include <trace/events/bcache.h> 11#include <trace/events/bcache.h>
13 12
@@ -31,17 +30,20 @@ static void journal_read_endio(struct bio *bio, int error)
 }
 
 static int journal_read_bucket(struct cache *ca, struct list_head *list,
-			       struct btree_op *op, unsigned bucket_index)
+			       unsigned bucket_index)
 {
 	struct journal_device *ja = &ca->journal;
 	struct bio *bio = &ja->bio;
 
 	struct journal_replay *i;
 	struct jset *j, *data = ca->set->journal.w[0].data;
+	struct closure cl;
 	unsigned len, left, offset = 0;
 	int ret = 0;
 	sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);
 
+	closure_init_stack(&cl);
+
 	pr_debug("reading %llu", (uint64_t) bucket);
 
 	while (offset < ca->sb.bucket_size) {
@@ -55,11 +57,11 @@ reread: left = ca->sb.bucket_size - offset;
 		bio->bi_size = len << 9;
 
 		bio->bi_end_io = journal_read_endio;
-		bio->bi_private = &op->cl;
+		bio->bi_private = &cl;
 		bch_bio_map(bio, data);
 
-		closure_bio_submit(bio, &op->cl, ca);
-		closure_sync(&op->cl);
+		closure_bio_submit(bio, &cl, ca);
+		closure_sync(&cl);
 
 		/* This function could be simpler now since we no longer write
 		 * journal entries that overlap bucket boundaries; this means
@@ -72,7 +74,7 @@ reread: left = ca->sb.bucket_size - offset;
 			struct list_head *where;
 			size_t blocks, bytes = set_bytes(j);
 
-			if (j->magic != jset_magic(ca->set))
+			if (j->magic != jset_magic(&ca->sb))
 				return ret;
 
 			if (bytes > left << 9)
@@ -129,12 +131,11 @@ next_set:
 	return ret;
 }
 
-int bch_journal_read(struct cache_set *c, struct list_head *list,
-		     struct btree_op *op)
+int bch_journal_read(struct cache_set *c, struct list_head *list)
 {
 #define read_bucket(b) \
 	({ \
-		int ret = journal_read_bucket(ca, list, op, b); \
+		int ret = journal_read_bucket(ca, list, b); \
 		__set_bit(b, bitmap); \
 		if (ret < 0) \
 			return ret; \
@@ -292,8 +293,7 @@ void bch_journal_mark(struct cache_set *c, struct list_head *list)
 	}
 }
 
-int bch_journal_replay(struct cache_set *s, struct list_head *list,
-		       struct btree_op *op)
+int bch_journal_replay(struct cache_set *s, struct list_head *list)
 {
 	int ret = 0, keys = 0, entries = 0;
 	struct bkey *k;
@@ -301,31 +301,30 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list,
 		list_entry(list->prev, struct journal_replay, list);
 
 	uint64_t start = i->j.last_seq, end = i->j.seq, n = start;
+	struct keylist keylist;
+
+	bch_keylist_init(&keylist);
 
 	list_for_each_entry(i, list, list) {
 		BUG_ON(i->pin && atomic_read(i->pin) != 1);
 
-		if (n != i->j.seq)
-			pr_err(
-		"journal entries %llu-%llu missing! (replaying %llu-%llu)\n",
-				n, i->j.seq - 1, start, end);
+		cache_set_err_on(n != i->j.seq, s,
+"bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)",
+				 n, i->j.seq - 1, start, end);
 
 		for (k = i->j.start;
 		     k < end(&i->j);
 		     k = bkey_next(k)) {
 			trace_bcache_journal_replay_key(k);
 
-			bkey_copy(op->keys.top, k);
-			bch_keylist_push(&op->keys);
-
-			op->journal = i->pin;
-			atomic_inc(op->journal);
+			bkey_copy(keylist.top, k);
+			bch_keylist_push(&keylist);
 
-			ret = bch_btree_insert(op, s);
+			ret = bch_btree_insert(s, &keylist, i->pin, NULL);
 			if (ret)
 				goto err;
 
-			BUG_ON(!bch_keylist_empty(&op->keys));
+			BUG_ON(!bch_keylist_empty(&keylist));
 			keys++;
 
 			cond_resched();
@@ -339,14 +338,13 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list,
 
 	pr_info("journal replay done, %i keys in %i entries, seq %llu",
 		keys, entries, end);
-
+err:
 	while (!list_empty(list)) {
 		i = list_first_entry(list, struct journal_replay, list);
 		list_del(&i->list);
 		kfree(i);
 	}
-err:
-	closure_sync(&op->cl);
+
 	return ret;
 }
 
@@ -358,48 +356,35 @@ static void btree_flush_write(struct cache_set *c)
 	 * Try to find the btree node with that references the oldest journal
 	 * entry, best is our current candidate and is locked if non NULL:
 	 */
-	struct btree *b, *best = NULL;
-	unsigned iter;
+	struct btree *b, *best;
+	unsigned i;
+retry:
+	best = NULL;
+
+	for_each_cached_btree(b, c, i)
+		if (btree_current_write(b)->journal) {
+			if (!best)
+				best = b;
+			else if (journal_pin_cmp(c,
+					btree_current_write(best)->journal,
+					btree_current_write(b)->journal)) {
+				best = b;
+			}
+		}
 
-	for_each_cached_btree(b, c, iter) {
-		if (!down_write_trylock(&b->lock))
-			continue;
+	b = best;
+	if (b) {
+		rw_lock(true, b, b->level);
 
-		if (!btree_node_dirty(b) ||
-		    !btree_current_write(b)->journal) {
+		if (!btree_current_write(b)->journal) {
 			rw_unlock(true, b);
-			continue;
+			/* We raced */
+			goto retry;
 		}
 
-		if (!best)
-			best = b;
-		else if (journal_pin_cmp(c,
-					 btree_current_write(best),
-					 btree_current_write(b))) {
-			rw_unlock(true, best);
-			best = b;
-		} else
-			rw_unlock(true, b);
+		bch_btree_node_write(b, NULL);
+		rw_unlock(true, b);
 	}
-
-	if (best)
-		goto out;
-
-	/* We can't find the best btree node, just pick the first */
-	list_for_each_entry(b, &c->btree_cache, list)
-		if (!b->level && btree_node_dirty(b)) {
-			best = b;
-			rw_lock(true, best, best->level);
-			goto found;
-		}
-
-out:
-	if (!best)
-		return;
-found:
-	if (btree_node_dirty(best))
-		bch_btree_node_write(best, NULL);
-	rw_unlock(true, best);
 }
 
 #define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1)
@@ -495,7 +480,7 @@ static void journal_reclaim(struct cache_set *c)
 		do_journal_discard(ca);
 
 	if (c->journal.blocks_free)
-		return;
+		goto out;
 
 	/*
 	 * Allocate:
@@ -521,7 +506,7 @@ static void journal_reclaim(struct cache_set *c)
 
 	if (n)
 		c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
-
+out:
 	if (!journal_full(&c->journal))
 		__closure_wake_up(&c->journal.wait);
 }
@@ -554,32 +539,26 @@ static void journal_write_endio(struct bio *bio, int error)
 	struct journal_write *w = bio->bi_private;
 
 	cache_set_err_on(error, w->c, "journal io error");
-	closure_put(&w->c->journal.io.cl);
+	closure_put(&w->c->journal.io);
 }
 
 static void journal_write(struct closure *);
 
 static void journal_write_done(struct closure *cl)
 {
-	struct journal *j = container_of(cl, struct journal, io.cl);
-	struct cache_set *c = container_of(j, struct cache_set, journal);
-
+	struct journal *j = container_of(cl, struct journal, io);
 	struct journal_write *w = (j->cur == j->w)
 		? &j->w[1]
 		: &j->w[0];
 
 	__closure_wake_up(&w->wait);
-
-	if (c->journal_delay_ms)
-		closure_delay(&j->io, msecs_to_jiffies(c->journal_delay_ms));
-
-	continue_at(cl, journal_write, system_wq);
+	continue_at_nobarrier(cl, journal_write, system_wq);
 }
 
 static void journal_write_unlocked(struct closure *cl)
 	__releases(c->journal.lock)
 {
-	struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl);
+	struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 	struct cache *ca;
 	struct journal_write *w = c->journal.cur;
 	struct bkey *k = &c->journal.key;
@@ -617,7 +596,7 @@ static void journal_write_unlocked(struct closure *cl)
 	for_each_cache(ca, c, i)
 		w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];
 
-	w->data->magic = jset_magic(c);
+	w->data->magic = jset_magic(&c->sb);
 	w->data->version = BCACHE_JSET_VERSION;
 	w->data->last_seq = last_seq(&c->journal);
 	w->data->csum = csum_set(w->data);
@@ -660,121 +639,134 @@ static void journal_write_unlocked(struct closure *cl)
 
 static void journal_write(struct closure *cl)
 {
-	struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl);
+	struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 
 	spin_lock(&c->journal.lock);
 	journal_write_unlocked(cl);
 }
 
-static void __journal_try_write(struct cache_set *c, bool noflush)
+static void journal_try_write(struct cache_set *c)
 	__releases(c->journal.lock)
 {
-	struct closure *cl = &c->journal.io.cl;
+	struct closure *cl = &c->journal.io;
+	struct journal_write *w = c->journal.cur;
 
-	if (!closure_trylock(cl, &c->cl))
-		spin_unlock(&c->journal.lock);
-	else if (noflush && journal_full(&c->journal)) {
-		spin_unlock(&c->journal.lock);
-		continue_at(cl, journal_write, system_wq);
-	} else
+	w->need_write = true;
+
+	if (closure_trylock(cl, &c->cl))
 		journal_write_unlocked(cl);
+	else
+		spin_unlock(&c->journal.lock);
 }
 
-#define journal_try_write(c) __journal_try_write(c, false)
-
-void bch_journal_meta(struct cache_set *c, struct closure *cl)
+static struct journal_write *journal_wait_for_write(struct cache_set *c,
+						    unsigned nkeys)
 {
-	struct journal_write *w;
+	size_t sectors;
+	struct closure cl;
 
-	if (CACHE_SYNC(&c->sb)) {
-		spin_lock(&c->journal.lock);
+	closure_init_stack(&cl);
+
+	spin_lock(&c->journal.lock);
 
-		w = c->journal.cur;
-		w->need_write = true;
+	while (1) {
+		struct journal_write *w = c->journal.cur;
 
-		if (cl)
-			BUG_ON(!closure_wait(&w->wait, cl));
+		sectors = __set_blocks(w->data, w->data->keys + nkeys,
+				       c) * c->sb.block_size;
 
-		closure_flush(&c->journal.io);
-		__journal_try_write(c, true);
+		if (sectors <= min_t(size_t,
+				     c->journal.blocks_free * c->sb.block_size,
+				     PAGE_SECTORS << JSET_BITS))
+			return w;
+
+		/* XXX: tracepoint */
+		if (!journal_full(&c->journal)) {
+			trace_bcache_journal_entry_full(c);
+
+			/*
+			 * XXX: If we were inserting so many keys that they
+			 * won't fit in an _empty_ journal write, we'll
+			 * deadlock. For now, handle this in
+			 * bch_keylist_realloc() - but something to think about.
+			 */
+			BUG_ON(!w->data->keys);
+
+			closure_wait(&w->wait, &cl);
+			journal_try_write(c); /* unlocks */
+		} else {
+			trace_bcache_journal_full(c);
+
+			closure_wait(&c->journal.wait, &cl);
+			journal_reclaim(c);
+			spin_unlock(&c->journal.lock);
+
+			btree_flush_write(c);
+		}
+
+		closure_sync(&cl);
+		spin_lock(&c->journal.lock);
 	}
 }
 
+static void journal_write_work(struct work_struct *work)
+{
+	struct cache_set *c = container_of(to_delayed_work(work),
+					   struct cache_set,
+					   journal.work);
+	spin_lock(&c->journal.lock);
+	journal_try_write(c);
+}
+
 /*
  * Entry point to the journalling code - bio_insert() and btree_invalidate()
  * pass bch_journal() a list of keys to be journalled, and then
  * bch_journal() hands those same keys off to btree_insert_async()
  */
 
-void bch_journal(struct closure *cl)
+atomic_t *bch_journal(struct cache_set *c,
+		      struct keylist *keys,
+		      struct closure *parent)
 {
-	struct btree_op *op = container_of(cl, struct btree_op, cl);
-	struct cache_set *c = op->c;
 	struct journal_write *w;
-	size_t b, n = ((uint64_t *) op->keys.top) - op->keys.list;
-
-	if (op->type != BTREE_INSERT ||
-	    !CACHE_SYNC(&c->sb))
-		goto out;
+	atomic_t *ret;
 
-	/*
-	 * If we're looping because we errored, might already be waiting on
-	 * another journal write:
-	 */
-	while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING)
-		closure_sync(cl->parent);
+	if (!CACHE_SYNC(&c->sb))
+		return NULL;
 
-	spin_lock(&c->journal.lock);
+	w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
 
-	if (journal_full(&c->journal)) {
-		trace_bcache_journal_full(c);
+	memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys));
+	w->data->keys += bch_keylist_nkeys(keys);
 
-		closure_wait(&c->journal.wait, cl);
+	ret = &fifo_back(&c->journal.pin);
+	atomic_inc(ret);
 
-		journal_reclaim(c);
+	if (parent) {
+		closure_wait(&w->wait, parent);
+		journal_try_write(c);
+	} else if (!w->need_write) {
+		schedule_delayed_work(&c->journal.work,
+				      msecs_to_jiffies(c->journal_delay_ms));
+		spin_unlock(&c->journal.lock);
+	} else {
 		spin_unlock(&c->journal.lock);
-
-		btree_flush_write(c);
-		continue_at(cl, bch_journal, bcache_wq);
 	}
 
-	w = c->journal.cur;
-	w->need_write = true;
-	b = __set_blocks(w->data, w->data->keys + n, c);
-
-	if (b * c->sb.block_size > PAGE_SECTORS << JSET_BITS ||
-	    b > c->journal.blocks_free) {
-		trace_bcache_journal_entry_full(c);
-
-		/*
-		 * XXX: If we were inserting so many keys that they won't fit in
-		 * an _empty_ journal write, we'll deadlock. For now, handle
-		 * this in bch_keylist_realloc() - but something to think about.
-		 */
-		BUG_ON(!w->data->keys);
-
-		BUG_ON(!closure_wait(&w->wait, cl));
-
-		closure_flush(&c->journal.io);
 
-		journal_try_write(c);
-		continue_at(cl, bch_journal, bcache_wq);
-	}
-
-	memcpy(end(w->data), op->keys.list, n * sizeof(uint64_t));
-	w->data->keys += n;
+	return ret;
+}
 
-	op->journal = &fifo_back(&c->journal.pin);
-	atomic_inc(op->journal);
+void bch_journal_meta(struct cache_set *c, struct closure *cl)
+{
+	struct keylist keys;
+	atomic_t *ref;
 
-	if (op->flush_journal) {
-		closure_flush(&c->journal.io);
-		closure_wait(&w->wait, cl->parent);
-	}
+	bch_keylist_init(&keys);
 
-	journal_try_write(c);
-out:
-	bch_btree_insert_async(cl);
+	ref = bch_journal(c, &keys, cl);
+	if (ref)
+		atomic_dec_bug(ref);
 }
 
 void bch_journal_free(struct cache_set *c)
@@ -790,6 +782,7 @@ int bch_journal_alloc(struct cache_set *c)
 
 	closure_init_unlocked(&j->io);
 	spin_lock_init(&j->lock);
+	INIT_DELAYED_WORK(&j->work, journal_write_work);
 
 	c->journal_delay_ms = 100;
 