Diffstat (limited to 'drivers/md/dm-raid1.c')
-rw-r--r--  drivers/md/dm-raid1.c  664
1 files changed, 576 insertions, 88 deletions
diff --git a/drivers/md/dm-raid1.c b/drivers/md/dm-raid1.c
index 31123d4a6b9c..edc057f5cdcc 100644
--- a/drivers/md/dm-raid1.c
+++ b/drivers/md/dm-raid1.c
@@ -6,6 +6,7 @@
 
 #include "dm.h"
 #include "dm-bio-list.h"
+#include "dm-bio-record.h"
 #include "dm-io.h"
 #include "dm-log.h"
 #include "kcopyd.h"
@@ -20,6 +21,7 @@
 #include <linux/vmalloc.h>
 #include <linux/workqueue.h>
 #include <linux/log2.h>
+#include <linux/hardirq.h>
 
 #define DM_MSG_PREFIX "raid1"
 #define DM_IO_PAGES 64
@@ -113,9 +115,16 @@ struct region {
 /*-----------------------------------------------------------------
  * Mirror set structures.
  *---------------------------------------------------------------*/
+enum dm_raid1_error {
+	DM_RAID1_WRITE_ERROR,
+	DM_RAID1_SYNC_ERROR,
+	DM_RAID1_READ_ERROR
+};
+
 struct mirror {
 	struct mirror_set *ms;
 	atomic_t error_count;
+	uint32_t error_type;
 	struct dm_dev *dev;
 	sector_t offset;
 };
@@ -127,21 +136,25 @@ struct mirror_set {
 	struct kcopyd_client *kcopyd_client;
 	uint64_t features;
 
-	spinlock_t lock;	/* protects the next two lists */
+	spinlock_t lock;	/* protects the lists */
 	struct bio_list reads;
 	struct bio_list writes;
+	struct bio_list failures;
 
 	struct dm_io_client *io_client;
+	mempool_t *read_record_pool;
 
 	/* recovery */
 	region_t nr_regions;
 	int in_sync;
 	int log_failure;
+	atomic_t suspend;
 
-	struct mirror *default_mirror;	/* Default mirror */
+	atomic_t default_mirror;	/* Default mirror */
 
 	struct workqueue_struct *kmirrord_wq;
 	struct work_struct kmirrord_work;
+	struct work_struct trigger_event;
 
 	unsigned int nr_mirrors;
 	struct mirror mirror[0];
@@ -362,6 +375,16 @@ static void complete_resync_work(struct region *reg, int success)
 	struct region_hash *rh = reg->rh;
 
 	rh->log->type->set_region_sync(rh->log, reg->key, success);
+
+	/*
+	 * Dispatch the bios before we call 'wake_up_all'.
+	 * This is important because if we are suspending,
+	 * we want to know that recovery is complete and
+	 * the work queue is flushed. If we wake_up_all
+	 * before we dispatch_bios (queue bios and call wake()),
+	 * then we risk suspending before the work queue
+	 * has been properly flushed.
+	 */
 	dispatch_bios(rh->ms, &reg->delayed_bios);
 	if (atomic_dec_and_test(&rh->recovery_in_flight))
 		wake_up_all(&_kmirrord_recovery_stopped);
@@ -626,24 +649,101 @@ static void rh_start_recovery(struct region_hash *rh)
 	wake(rh->ms);
 }
 
+#define MIN_READ_RECORDS 20
+struct dm_raid1_read_record {
+	struct mirror *m;
+	struct dm_bio_details details;
+};
+
 /*
  * Every mirror should look like this one.
  */
 #define DEFAULT_MIRROR 0
 
 /*
- * This is yucky. We squirrel the mirror_set struct away inside
- * bi_next for write buffers. This is safe since the bh
+ * This is yucky. We squirrel the mirror struct away inside
+ * bi_next for read/write buffers. This is safe since the bh
  * doesn't get submitted to the lower levels of block layer.
  */
-static struct mirror_set *bio_get_ms(struct bio *bio)
+static struct mirror *bio_get_m(struct bio *bio)
+{
+	return (struct mirror *) bio->bi_next;
+}
+
+static void bio_set_m(struct bio *bio, struct mirror *m)
+{
+	bio->bi_next = (struct bio *) m;
+}
+
+static struct mirror *get_default_mirror(struct mirror_set *ms)
 {
-	return (struct mirror_set *) bio->bi_next;
+	return &ms->mirror[atomic_read(&ms->default_mirror)];
 }
 
-static void bio_set_ms(struct bio *bio, struct mirror_set *ms)
+static void set_default_mirror(struct mirror *m)
 {
-	bio->bi_next = (struct bio *) ms;
+	struct mirror_set *ms = m->ms;
+	struct mirror *m0 = &(ms->mirror[0]);
+
+	atomic_set(&ms->default_mirror, m - m0);
+}
+
+/* fail_mirror
+ * @m: mirror device to fail
+ * @error_type: one of the enum's, DM_RAID1_*_ERROR
+ *
+ * If errors are being handled, record the type of
+ * error encountered for this device. If this type
+ * of error has already been recorded, we can return;
+ * otherwise, we must signal userspace by triggering
+ * an event. Additionally, if the device is the
+ * primary device, we must choose a new primary, but
+ * only if the mirror is in-sync.
+ *
+ * This function must not block.
+ */
+static void fail_mirror(struct mirror *m, enum dm_raid1_error error_type)
+{
+	struct mirror_set *ms = m->ms;
+	struct mirror *new;
+
+	if (!errors_handled(ms))
+		return;
+
+	/*
+	 * error_count is used for nothing more than a
+	 * simple way to tell if a device has encountered
+	 * errors.
+	 */
+	atomic_inc(&m->error_count);
+
+	if (test_and_set_bit(error_type, &m->error_type))
+		return;
+
+	if (m != get_default_mirror(ms))
+		goto out;
+
+	if (!ms->in_sync) {
+		/*
+		 * Better to issue requests to same failing device
+		 * than to risk returning corrupt data.
+		 */
+		DMERR("Primary mirror (%s) failed while out-of-sync: "
+		      "Reads may fail.", m->dev->name);
+		goto out;
+	}
+
+	for (new = ms->mirror; new < ms->mirror + ms->nr_mirrors; new++)
+		if (!atomic_read(&new->error_count)) {
+			set_default_mirror(new);
+			break;
+		}
+
+	if (unlikely(new == ms->mirror + ms->nr_mirrors))
+		DMWARN("All sides of mirror have failed.");
+
+out:
+	schedule_work(&ms->trigger_event);
 }
 
 /*-----------------------------------------------------------------
@@ -656,15 +756,32 @@ static void bio_set_ms(struct bio *bio, struct mirror_set *ms)
 static void recovery_complete(int read_err, unsigned int write_err,
 			      void *context)
 {
-	struct region *reg = (struct region *) context;
+	struct region *reg = (struct region *)context;
+	struct mirror_set *ms = reg->rh->ms;
+	int m, bit = 0;
 
-	if (read_err)
+	if (read_err) {
 		/* Read error means the failure of default mirror. */
 		DMERR_LIMIT("Unable to read primary mirror during recovery");
+		fail_mirror(get_default_mirror(ms), DM_RAID1_SYNC_ERROR);
+	}
 
-	if (write_err)
+	if (write_err) {
 		DMERR_LIMIT("Write error during recovery (error = 0x%x)",
 			    write_err);
+		/*
+		 * Bits correspond to devices (excluding default mirror).
+		 * The default mirror cannot change during recovery.
+		 */
+		for (m = 0; m < ms->nr_mirrors; m++) {
+			if (&ms->mirror[m] == get_default_mirror(ms))
+				continue;
+			if (test_bit(bit, &write_err))
+				fail_mirror(ms->mirror + m,
+					    DM_RAID1_SYNC_ERROR);
+			bit++;
+		}
+	}
 
 	rh_recovery_end(reg, !(read_err || write_err));
 }
@@ -678,7 +795,7 @@ static int recover(struct mirror_set *ms, struct region *reg)
 	unsigned long flags = 0;
 
 	/* fill in the source */
-	m = ms->default_mirror;
+	m = get_default_mirror(ms);
 	from.bdev = m->dev->bdev;
 	from.sector = m->offset + region_to_sector(reg->rh, reg->key);
 	if (reg->key == (ms->nr_regions - 1)) {
@@ -694,7 +811,7 @@ static int recover(struct mirror_set *ms, struct region *reg)
 
 	/* fill in the destinations */
 	for (i = 0, dest = to; i < ms->nr_mirrors; i++) {
-		if (&ms->mirror[i] == ms->default_mirror)
+		if (&ms->mirror[i] == get_default_mirror(ms))
 			continue;
 
 		m = ms->mirror + i;
@@ -748,17 +865,105 @@ static void do_recovery(struct mirror_set *ms)
  *---------------------------------------------------------------*/
 static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector)
 {
-	/* FIXME: add read balancing */
-	return ms->default_mirror;
+	struct mirror *m = get_default_mirror(ms);
+
+	do {
+		if (likely(!atomic_read(&m->error_count)))
+			return m;
+
+		if (m-- == ms->mirror)
+			m += ms->nr_mirrors;
+	} while (m != get_default_mirror(ms));
+
+	return NULL;
+}
+
+static int default_ok(struct mirror *m)
+{
+	struct mirror *default_mirror = get_default_mirror(m->ms);
+
+	return !atomic_read(&default_mirror->error_count);
+}
+
+static int mirror_available(struct mirror_set *ms, struct bio *bio)
+{
+	region_t region = bio_to_region(&ms->rh, bio);
+
+	if (ms->rh.log->type->in_sync(ms->rh.log, region, 0))
+		return choose_mirror(ms, bio->bi_sector) ? 1 : 0;
+
+	return 0;
 }
 
 /*
  * remap a buffer to a particular mirror.
  */
-static void map_bio(struct mirror_set *ms, struct mirror *m, struct bio *bio)
+static sector_t map_sector(struct mirror *m, struct bio *bio)
+{
+	return m->offset + (bio->bi_sector - m->ms->ti->begin);
+}
+
+static void map_bio(struct mirror *m, struct bio *bio)
 {
 	bio->bi_bdev = m->dev->bdev;
-	bio->bi_sector = m->offset + (bio->bi_sector - ms->ti->begin);
+	bio->bi_sector = map_sector(m, bio);
+}
+
+static void map_region(struct io_region *io, struct mirror *m,
+		       struct bio *bio)
+{
+	io->bdev = m->dev->bdev;
+	io->sector = map_sector(m, bio);
+	io->count = bio->bi_size >> 9;
+}
+
+/*-----------------------------------------------------------------
+ * Reads
+ *---------------------------------------------------------------*/
+static void read_callback(unsigned long error, void *context)
+{
+	struct bio *bio = context;
+	struct mirror *m;
+
+	m = bio_get_m(bio);
+	bio_set_m(bio, NULL);
+
+	if (likely(!error)) {
+		bio_endio(bio, 0);
+		return;
+	}
+
+	fail_mirror(m, DM_RAID1_READ_ERROR);
+
+	if (likely(default_ok(m)) || mirror_available(m->ms, bio)) {
+		DMWARN_LIMIT("Read failure on mirror device %s. "
+			     "Trying alternative device.",
+			     m->dev->name);
+		queue_bio(m->ms, bio, bio_rw(bio));
+		return;
+	}
+
+	DMERR_LIMIT("Read failure on mirror device %s. Failing I/O.",
+		    m->dev->name);
+	bio_endio(bio, -EIO);
+}
+
+/* Asynchronous read. */
+static void read_async_bio(struct mirror *m, struct bio *bio)
+{
+	struct io_region io;
+	struct dm_io_request io_req = {
+		.bi_rw = READ,
+		.mem.type = DM_IO_BVEC,
+		.mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
+		.notify.fn = read_callback,
+		.notify.context = bio,
+		.client = m->ms->io_client,
+	};
+
+	map_region(&io, m, bio);
+	bio_set_m(bio, m);
+	(void) dm_io(&io_req, 1, &io, NULL);
 }
 
 static void do_reads(struct mirror_set *ms, struct bio_list *reads)
@@ -769,17 +974,20 @@ static void do_reads(struct mirror_set *ms, struct bio_list *reads)
 
 	while ((bio = bio_list_pop(reads))) {
 		region = bio_to_region(&ms->rh, bio);
+		m = get_default_mirror(ms);
 
 		/*
 		 * We can only read balance if the region is in sync.
 		 */
-		if (rh_in_sync(&ms->rh, region, 1))
+		if (likely(rh_in_sync(&ms->rh, region, 1)))
 			m = choose_mirror(ms, bio->bi_sector);
-		else
-			m = ms->default_mirror;
+		else if (m && atomic_read(&m->error_count))
+			m = NULL;
 
-		map_bio(ms, m, bio);
-		generic_make_request(bio);
+		if (likely(m))
+			read_async_bio(m, bio);
+		else
+			bio_endio(bio, -EIO);
 	}
 }
 
@@ -793,15 +1001,70 @@ static void do_reads(struct mirror_set *ms, struct bio_list *reads)
  * RECOVERING:	delay the io until recovery completes
  * NOSYNC:	increment pending, just write to the default mirror
  *---------------------------------------------------------------*/
+
+/* __bio_mark_nosync
+ * @ms
+ * @bio
+ * @done
+ * @error
+ *
+ * The bio was written on some mirror(s) but failed on other mirror(s).
+ * We can successfully endio the bio but should avoid the region being
+ * marked clean by setting the state RH_NOSYNC.
+ *
+ * This function is _not_ safe in interrupt context!
+ */
+static void __bio_mark_nosync(struct mirror_set *ms,
+			      struct bio *bio, unsigned done, int error)
+{
+	unsigned long flags;
+	struct region_hash *rh = &ms->rh;
+	struct dirty_log *log = ms->rh.log;
+	struct region *reg;
+	region_t region = bio_to_region(rh, bio);
+	int recovering = 0;
+
+	/* We must inform the log that the sync count has changed. */
+	log->type->set_region_sync(log, region, 0);
+	ms->in_sync = 0;
+
+	read_lock(&rh->hash_lock);
+	reg = __rh_find(rh, region);
+	read_unlock(&rh->hash_lock);
+
+	/* region hash entry should exist because write was in-flight */
+	BUG_ON(!reg);
+	BUG_ON(!list_empty(&reg->list));
+
+	spin_lock_irqsave(&rh->region_lock, flags);
+	/*
+	 * Possible cases:
+	 *   1) RH_DIRTY
+	 *   2) RH_NOSYNC: was dirty, other preceeding writes failed
+	 *   3) RH_RECOVERING: flushing pending writes
+	 * Either case, the region should have not been connected to list.
+	 */
+	recovering = (reg->state == RH_RECOVERING);
+	reg->state = RH_NOSYNC;
+	BUG_ON(!list_empty(&reg->list));
+	spin_unlock_irqrestore(&rh->region_lock, flags);
+
+	bio_endio(bio, error);
+	if (recovering)
+		complete_resync_work(reg, 0);
+}
+
 static void write_callback(unsigned long error, void *context)
 {
-	unsigned int i;
-	int uptodate = 1;
+	unsigned i, ret = 0;
 	struct bio *bio = (struct bio *) context;
 	struct mirror_set *ms;
+	int uptodate = 0;
+	int should_wake = 0;
+	unsigned long flags;
 
-	ms = bio_get_ms(bio);
-	bio_set_ms(bio, NULL);
+	ms = bio_get_m(bio)->ms;
+	bio_set_m(bio, NULL);
 
 	/*
 	 * NOTE: We don't decrement the pending count here,
@@ -809,26 +1072,42 @@ static void write_callback(unsigned long error, void *context)
 	 * This way we handle both writes to SYNC and NOSYNC
 	 * regions with the same code.
 	 */
+	if (likely(!error))
+		goto out;
+
+	for (i = 0; i < ms->nr_mirrors; i++)
+		if (test_bit(i, &error))
+			fail_mirror(ms->mirror + i, DM_RAID1_WRITE_ERROR);
+		else
+			uptodate = 1;
 
-	if (error) {
+	if (unlikely(!uptodate)) {
+		DMERR("All replicated volumes dead, failing I/O");
+		/* None of the writes succeeded, fail the I/O. */
+		ret = -EIO;
+	} else if (errors_handled(ms)) {
 		/*
-		 * only error the io if all mirrors failed.
-		 * FIXME: bogus
+		 * Need to raise event. Since raising
+		 * events can block, we need to do it in
+		 * the main thread.
 		 */
-		uptodate = 0;
-		for (i = 0; i < ms->nr_mirrors; i++)
-			if (!test_bit(i, &error)) {
-				uptodate = 1;
-				break;
-			}
+		spin_lock_irqsave(&ms->lock, flags);
+		if (!ms->failures.head)
+			should_wake = 1;
+		bio_list_add(&ms->failures, bio);
+		spin_unlock_irqrestore(&ms->lock, flags);
+		if (should_wake)
+			wake(ms);
+		return;
 	}
-	bio_endio(bio, 0);
+out:
+	bio_endio(bio, ret);
 }
 
 static void do_write(struct mirror_set *ms, struct bio *bio)
 {
 	unsigned int i;
-	struct io_region io[KCOPYD_MAX_REGIONS+1];
+	struct io_region io[ms->nr_mirrors], *dest = io;
 	struct mirror *m;
 	struct dm_io_request io_req = {
 		.bi_rw = WRITE,
@@ -839,15 +1118,14 @@ static void do_write(struct mirror_set *ms, struct bio *bio)
 		.client = ms->io_client,
 	};
 
-	for (i = 0; i < ms->nr_mirrors; i++) {
-		m = ms->mirror + i;
-
-		io[i].bdev = m->dev->bdev;
-		io[i].sector = m->offset + (bio->bi_sector - ms->ti->begin);
-		io[i].count = bio->bi_size >> 9;
-	}
+	for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++)
+		map_region(dest++, m, bio);
 
-	bio_set_ms(bio, ms);
+	/*
+	 * Use default mirror because we only need it to retrieve the reference
+	 * to the mirror set in write_callback().
+	 */
+	bio_set_m(bio, get_default_mirror(ms));
 
 	(void) dm_io(&io_req, ms->nr_mirrors, io, NULL);
 }
@@ -900,43 +1178,125 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
 	/*
 	 * Dispatch io.
 	 */
-	if (unlikely(ms->log_failure))
+	if (unlikely(ms->log_failure)) {
+		spin_lock_irq(&ms->lock);
+		bio_list_merge(&ms->failures, &sync);
+		spin_unlock_irq(&ms->lock);
+	} else
 		while ((bio = bio_list_pop(&sync)))
-			bio_endio(bio, -EIO);
-	else while ((bio = bio_list_pop(&sync)))
-		do_write(ms, bio);
+			do_write(ms, bio);
 
 	while ((bio = bio_list_pop(&recover)))
 		rh_delay(&ms->rh, bio);
 
 	while ((bio = bio_list_pop(&nosync))) {
-		map_bio(ms, ms->default_mirror, bio);
+		map_bio(get_default_mirror(ms), bio);
 		generic_make_request(bio);
 	}
 }
 
+static void do_failures(struct mirror_set *ms, struct bio_list *failures)
+{
+	struct bio *bio;
+
+	if (!failures->head)
+		return;
+
+	if (!ms->log_failure) {
+		while ((bio = bio_list_pop(failures)))
+			__bio_mark_nosync(ms, bio, bio->bi_size, 0);
+		return;
+	}
+
+	/*
+	 * If the log has failed, unattempted writes are being
+	 * put on the failures list. We can't issue those writes
+	 * until a log has been marked, so we must store them.
+	 *
+	 * If a 'noflush' suspend is in progress, we can requeue
+	 * the I/O's to the core. This give userspace a chance
+	 * to reconfigure the mirror, at which point the core
+	 * will reissue the writes. If the 'noflush' flag is
+	 * not set, we have no choice but to return errors.
+	 *
+	 * Some writes on the failures list may have been
+	 * submitted before the log failure and represent a
+	 * failure to write to one of the devices. It is ok
+	 * for us to treat them the same and requeue them
+	 * as well.
+	 */
+	if (dm_noflush_suspending(ms->ti)) {
+		while ((bio = bio_list_pop(failures)))
+			bio_endio(bio, DM_ENDIO_REQUEUE);
+		return;
+	}
+
+	if (atomic_read(&ms->suspend)) {
+		while ((bio = bio_list_pop(failures)))
+			bio_endio(bio, -EIO);
+		return;
+	}
+
+	spin_lock_irq(&ms->lock);
+	bio_list_merge(&ms->failures, failures);
+	spin_unlock_irq(&ms->lock);
+
+	wake(ms);
+}
+
+static void trigger_event(struct work_struct *work)
+{
+	struct mirror_set *ms =
+		container_of(work, struct mirror_set, trigger_event);
+
+	dm_table_event(ms->ti->table);
+}
+
 /*-----------------------------------------------------------------
  * kmirrord
  *---------------------------------------------------------------*/
-static void do_mirror(struct work_struct *work)
+static int _do_mirror(struct work_struct *work)
 {
 	struct mirror_set *ms =container_of(work, struct mirror_set,
 					     kmirrord_work);
-	struct bio_list reads, writes;
+	struct bio_list reads, writes, failures;
+	unsigned long flags;
 
-	spin_lock(&ms->lock);
+	spin_lock_irqsave(&ms->lock, flags);
 	reads = ms->reads;
 	writes = ms->writes;
+	failures = ms->failures;
 	bio_list_init(&ms->reads);
 	bio_list_init(&ms->writes);
-	spin_unlock(&ms->lock);
+	bio_list_init(&ms->failures);
+	spin_unlock_irqrestore(&ms->lock, flags);
 
 	rh_update_states(&ms->rh);
 	do_recovery(ms);
 	do_reads(ms, &reads);
 	do_writes(ms, &writes);
+	do_failures(ms, &failures);
+
+	return (ms->failures.head) ? 1 : 0;
 }
 
+static void do_mirror(struct work_struct *work)
+{
+	/*
+	 * If _do_mirror returns 1, we give it
+	 * another shot. This helps for cases like
+	 * 'suspend' where we call flush_workqueue
+	 * and expect all work to be finished. If
+	 * a failure happens during a suspend, we
+	 * couldn't issue a 'wake' because it would
+	 * not be honored. Therefore, we return '1'
+	 * from _do_mirror, and retry here.
+	 */
+	while (_do_mirror(work))
+		schedule();
+}
+
+
 /*-----------------------------------------------------------------
  * Target functions
  *---------------------------------------------------------------*/
@@ -965,11 +1325,23 @@ static struct mirror_set *alloc_context(unsigned int nr_mirrors,
 	ms->nr_mirrors = nr_mirrors;
 	ms->nr_regions = dm_sector_div_up(ti->len, region_size);
 	ms->in_sync = 0;
-	ms->default_mirror = &ms->mirror[DEFAULT_MIRROR];
+	ms->log_failure = 0;
+	atomic_set(&ms->suspend, 0);
+	atomic_set(&ms->default_mirror, DEFAULT_MIRROR);
+
+	len = sizeof(struct dm_raid1_read_record);
+	ms->read_record_pool = mempool_create_kmalloc_pool(MIN_READ_RECORDS,
+							   len);
+	if (!ms->read_record_pool) {
+		ti->error = "Error creating mirror read_record_pool";
+		kfree(ms);
+		return NULL;
+	}
 
 	ms->io_client = dm_io_client_create(DM_IO_PAGES);
 	if (IS_ERR(ms->io_client)) {
 		ti->error = "Error creating dm_io client";
+		mempool_destroy(ms->read_record_pool);
 		kfree(ms);
 		return NULL;
 	}
@@ -977,6 +1349,7 @@ static struct mirror_set *alloc_context(unsigned int nr_mirrors,
 	if (rh_init(&ms->rh, ms, dl, region_size, ms->nr_regions)) {
 		ti->error = "Error creating dirty region hash";
 		dm_io_client_destroy(ms->io_client);
+		mempool_destroy(ms->read_record_pool);
 		kfree(ms);
 		return NULL;
 	}
@@ -992,6 +1365,7 @@ static void free_context(struct mirror_set *ms, struct dm_target *ti,
 
 	dm_io_client_destroy(ms->io_client);
 	rh_exit(&ms->rh);
+	mempool_destroy(ms->read_record_pool);
 	kfree(ms);
 }
 
@@ -1019,6 +1393,8 @@ static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
 	}
 
 	ms->mirror[mirror].ms = ms;
+	atomic_set(&(ms->mirror[mirror].error_count), 0);
+	ms->mirror[mirror].error_type = 0;
 	ms->mirror[mirror].offset = offset;
 
 	return 0;
@@ -1171,6 +1547,7 @@ static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 		goto err_free_context;
 	}
 	INIT_WORK(&ms->kmirrord_work, do_mirror);
+	INIT_WORK(&ms->trigger_event, trigger_event);
 
 	r = parse_features(ms, argc, argv, &args_used);
 	if (r)
@@ -1220,14 +1597,15 @@ static void mirror_dtr(struct dm_target *ti)
 
 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
 {
+	unsigned long flags;
 	int should_wake = 0;
 	struct bio_list *bl;
 
 	bl = (rw == WRITE) ? &ms->writes : &ms->reads;
-	spin_lock(&ms->lock);
+	spin_lock_irqsave(&ms->lock, flags);
 	should_wake = !(bl->head);
 	bio_list_add(bl, bio);
-	spin_unlock(&ms->lock);
+	spin_unlock_irqrestore(&ms->lock, flags);
 
 	if (should_wake)
 		wake(ms);
@@ -1242,10 +1620,11 @@ static int mirror_map(struct dm_target *ti, struct bio *bio,
 	int r, rw = bio_rw(bio);
 	struct mirror *m;
 	struct mirror_set *ms = ti->private;
-
-	map_context->ll = bio_to_region(&ms->rh, bio);
+	struct dm_raid1_read_record *read_record = NULL;
 
 	if (rw == WRITE) {
+		/* Save region for mirror_end_io() handler */
+		map_context->ll = bio_to_region(&ms->rh, bio);
 		queue_bio(ms, bio, rw);
 		return DM_MAPIO_SUBMITTED;
 	}
@@ -1255,28 +1634,34 @@ static int mirror_map(struct dm_target *ti, struct bio *bio,
 	if (r < 0 && r != -EWOULDBLOCK)
 		return r;
 
-	if (r == -EWOULDBLOCK)	/* FIXME: ugly */
-		r = DM_MAPIO_SUBMITTED;
-
 	/*
-	 * We don't want to fast track a recovery just for a read
-	 * ahead. So we just let it silently fail.
-	 * FIXME: get rid of this.
+	 * If region is not in-sync queue the bio.
 	 */
-	if (!r && rw == READA)
-		return -EIO;
+	if (!r || (r == -EWOULDBLOCK)) {
+		if (rw == READA)
+			return -EWOULDBLOCK;
 
-	if (!r) {
-		/* Pass this io over to the daemon */
 		queue_bio(ms, bio, rw);
 		return DM_MAPIO_SUBMITTED;
 	}
 
+	/*
+	 * The region is in-sync and we can perform reads directly.
+	 * Store enough information so we can retry if it fails.
+	 */
 	m = choose_mirror(ms, bio->bi_sector);
-	if (!m)
+	if (unlikely(!m))
 		return -EIO;
 
-	map_bio(ms, m, bio);
+	read_record = mempool_alloc(ms->read_record_pool, GFP_NOIO);
+	if (likely(read_record)) {
+		dm_bio_record(&read_record->details, bio);
+		map_context->ptr = read_record;
+		read_record->m = m;
+	}
+
+	map_bio(m, bio);
+
 	return DM_MAPIO_REMAPPED;
 }
 
@@ -1285,71 +1670,173 @@ static int mirror_end_io(struct dm_target *ti, struct bio *bio,
 {
 	int rw = bio_rw(bio);
 	struct mirror_set *ms = (struct mirror_set *) ti->private;
-	region_t region = map_context->ll;
+	struct mirror *m = NULL;
+	struct dm_bio_details *bd = NULL;
+	struct dm_raid1_read_record *read_record = map_context->ptr;
 
 	/*
 	 * We need to dec pending if this was a write.
 	 */
-	if (rw == WRITE)
-		rh_dec(&ms->rh, region);
+	if (rw == WRITE) {
+		rh_dec(&ms->rh, map_context->ll);
+		return error;
+	}
 
-	return 0;
+	if (error == -EOPNOTSUPP)
+		goto out;
+
+	if ((error == -EWOULDBLOCK) && bio_rw_ahead(bio))
+		goto out;
+
+	if (unlikely(error)) {
+		if (!read_record) {
+			/*
+			 * There wasn't enough memory to record necessary
+			 * information for a retry or there was no other
+			 * mirror in-sync.
+			 */
+			DMERR_LIMIT("Mirror read failed from %s.",
+				    m->dev->name);
+			return -EIO;
+		}
+		DMERR("Mirror read failed from %s. Trying alternative device.",
+		      m->dev->name);
+
+		m = read_record->m;
+		fail_mirror(m, DM_RAID1_READ_ERROR);
+
+		/*
+		 * A failed read is requeued for another attempt using an intact
+		 * mirror.
+		 */
+		if (default_ok(m) || mirror_available(ms, bio)) {
+			bd = &read_record->details;
+
+			dm_bio_restore(bd, bio);
+			mempool_free(read_record, ms->read_record_pool);
+			map_context->ptr = NULL;
+			queue_bio(ms, bio, rw);
+			return 1;
+		}
+		DMERR("All replicated volumes dead, failing I/O");
+	}
+
+out:
+	if (read_record) {
+		mempool_free(read_record, ms->read_record_pool);
+		map_context->ptr = NULL;
+	}
+
+	return error;
 }
 
-static void mirror_postsuspend(struct dm_target *ti)
+static void mirror_presuspend(struct dm_target *ti)
 {
 	struct mirror_set *ms = (struct mirror_set *) ti->private;
 	struct dirty_log *log = ms->rh.log;
 
+	atomic_set(&ms->suspend, 1);
+
+	/*
+	 * We must finish up all the work that we've
+	 * generated (i.e. recovery work).
+	 */
 	rh_stop_recovery(&ms->rh);
 
-	/* Wait for all I/O we generated to complete */
 	wait_event(_kmirrord_recovery_stopped,
 		   !atomic_read(&ms->rh.recovery_in_flight));
 
+	if (log->type->presuspend && log->type->presuspend(log))
+		/* FIXME: need better error handling */
+		DMWARN("log presuspend failed");
+
+	/*
+	 * Now that recovery is complete/stopped and the
+	 * delayed bios are queued, we need to wait for
+	 * the worker thread to complete. This way,
+	 * we know that all of our I/O has been pushed.
+	 */
+	flush_workqueue(ms->kmirrord_wq);
+}
+
+static void mirror_postsuspend(struct dm_target *ti)
+{
+	struct mirror_set *ms = ti->private;
+	struct dirty_log *log = ms->rh.log;
+
 	if (log->type->postsuspend && log->type->postsuspend(log))
 		/* FIXME: need better error handling */
-		DMWARN("log suspend failed");
+		DMWARN("log postsuspend failed");
 }
 
 static void mirror_resume(struct dm_target *ti)
 {
-	struct mirror_set *ms = (struct mirror_set *) ti->private;
+	struct mirror_set *ms = ti->private;
 	struct dirty_log *log = ms->rh.log;
+
+	atomic_set(&ms->suspend, 0);
 	if (log->type->resume && log->type->resume(log))
 		/* FIXME: need better error handling */
 		DMWARN("log resume failed");
 	rh_start_recovery(&ms->rh);
 }
 
+/*
+ * device_status_char
+ * @m: mirror device/leg we want the status of
+ *
+ * We return one character representing the most severe error
+ * we have encountered.
+ * A => Alive - No failures
+ * D => Dead - A write failure occurred leaving mirror out-of-sync
+ * S => Sync - A sychronization failure occurred, mirror out-of-sync
+ * R => Read - A read failure occurred, mirror data unaffected
+ *
+ * Returns: <char>
+ */
+static char device_status_char(struct mirror *m)
+{
+	if (!atomic_read(&(m->error_count)))
+		return 'A';
+
+	return (test_bit(DM_RAID1_WRITE_ERROR, &(m->error_type))) ? 'D' :
+		(test_bit(DM_RAID1_SYNC_ERROR, &(m->error_type))) ? 'S' :
+		(test_bit(DM_RAID1_READ_ERROR, &(m->error_type))) ? 'R' : 'U';
+}
+
+
 static int mirror_status(struct dm_target *ti, status_type_t type,
 			 char *result, unsigned int maxlen)
 {
 	unsigned int m, sz = 0;
 	struct mirror_set *ms = (struct mirror_set *) ti->private;
+	struct dirty_log *log = ms->rh.log;
+	char buffer[ms->nr_mirrors + 1];
 
 	switch (type) {
 	case STATUSTYPE_INFO:
 		DMEMIT("%d ", ms->nr_mirrors);
-		for (m = 0; m < ms->nr_mirrors; m++)
+		for (m = 0; m < ms->nr_mirrors; m++) {
 			DMEMIT("%s ", ms->mirror[m].dev->name);
+			buffer[m] = device_status_char(&(ms->mirror[m]));
+		}
+		buffer[m] = '\0';
 
-		DMEMIT("%llu/%llu 0 ",
-		       (unsigned long long)ms->rh.log->type->
-				get_sync_count(ms->rh.log),
-		       (unsigned long long)ms->nr_regions);
+		DMEMIT("%llu/%llu 1 %s ",
+		       (unsigned long long)log->type->get_sync_count(ms->rh.log),
+		       (unsigned long long)ms->nr_regions, buffer);
 
-		sz += ms->rh.log->type->status(ms->rh.log, type, result+sz, maxlen-sz);
+		sz += log->type->status(ms->rh.log, type, result+sz, maxlen-sz);
 
 		break;
 
 	case STATUSTYPE_TABLE:
-		sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen);
+		sz = log->type->status(ms->rh.log, type, result, maxlen);
 
 		DMEMIT("%d", ms->nr_mirrors);
 		for (m = 0; m < ms->nr_mirrors; m++)
 			DMEMIT(" %s %llu", ms->mirror[m].dev->name,
 			       (unsigned long long)ms->mirror[m].offset);
 
 		if (ms->features & DM_RAID1_HANDLE_ERRORS)
 			DMEMIT(" 1 handle_errors");
@@ -1360,12 +1847,13 @@ static int mirror_status(struct dm_target *ti, status_type_t type,
 
 static struct target_type mirror_target = {
 	.name = "mirror",
-	.version = {1, 0, 3},
+	.version = {1, 0, 20},
 	.module = THIS_MODULE,
 	.ctr = mirror_ctr,
 	.dtr = mirror_dtr,
 	.map = mirror_map,
 	.end_io = mirror_end_io,
+	.presuspend = mirror_presuspend,
 	.postsuspend = mirror_postsuspend,
 	.resume = mirror_resume,
 	.status = mirror_status,