about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author: Ilya Dryomov <idryomov@gmail.com> 2018-05-04 10:57:31 -0400
committer: Ilya Dryomov <idryomov@gmail.com> 2018-05-10 04:15:12 -0400
commit: fc218544fbc800d1c91348ec834cacfb257348f7 (patch)
tree: 7a9216495d534db12d61093fc8f762798f1982f7
parent: 0010f7052d6cb71c4b120238e28cd3fa413913d1 (diff)
ceph: fix iov_iter issues in ceph_direct_read_write()
dio_get_pagev_size() and dio_get_pages_alloc() introduced in commit b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD request") assume that the passed iov_iter is ITER_IOVEC. This isn't the case with splice where it ends up poking into the guts of ITER_BVEC or ITER_PIPE iterators, causing lockups and crashes easily reproduced with generic/095. Rather than trying to figure out gap alignment and stuff pages into a page vector, add a helper for going from iov_iter to a bio_vec array and make use of the new CEPH_OSD_DATA_TYPE_BVECS code. Fixes: b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD request") Link: http://tracker.ceph.com/issues/18130 Signed-off-by: Ilya Dryomov <idryomov@gmail.com> Reviewed-by: Jeff Layton <jlayton@redhat.com> Reviewed-by: "Yan, Zheng" <zyan@redhat.com> Tested-by: Luis Henriques <lhenriques@suse.com>
-rw-r--r-- fs/ceph/file.c | 195
1 file changed, 117 insertions(+), 78 deletions(-)
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 8ce7849f3fbd..cf0e45b10121 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -70,69 +70,104 @@ static __le32 ceph_flags_sys2wire(u32 flags)
70 */ 70 */
71 71
72/* 72/*
73 * Calculate the length sum of direct io vectors that can 73 * How many pages to get in one call to iov_iter_get_pages(). This
74 * be combined into one page vector. 74 * determines the size of the on-stack array used as a buffer.
75 */ 75 */
76static size_t dio_get_pagev_size(const struct iov_iter *it) 76#define ITER_GET_BVECS_PAGES 64
77
78static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
79 struct bio_vec *bvecs)
77{ 80{
78 const struct iovec *iov = it->iov; 81 size_t size = 0;
79 const struct iovec *iovend = iov + it->nr_segs; 82 int bvec_idx = 0;
80 size_t size; 83
81 84 if (maxsize > iov_iter_count(iter))
82 size = iov->iov_len - it->iov_offset; 85 maxsize = iov_iter_count(iter);
83 /* 86
84 * An iov can be page vectored when both the current tail 87 while (size < maxsize) {
85 * and the next base are page aligned. 88 struct page *pages[ITER_GET_BVECS_PAGES];
86 */ 89 ssize_t bytes;
87 while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) && 90 size_t start;
88 (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) { 91 int idx = 0;
89 size += iov->iov_len; 92
90 } 93 bytes = iov_iter_get_pages(iter, pages, maxsize - size,
91 dout("dio_get_pagevlen len = %zu\n", size); 94 ITER_GET_BVECS_PAGES, &start);
92 return size; 95 if (bytes < 0)
96 return size ?: bytes;
97
98 iov_iter_advance(iter, bytes);
99 size += bytes;
100
101 for ( ; bytes; idx++, bvec_idx++) {
102 struct bio_vec bv = {
103 .bv_page = pages[idx],
104 .bv_len = min_t(int, bytes, PAGE_SIZE - start),
105 .bv_offset = start,
106 };
107
108 bvecs[bvec_idx] = bv;
109 bytes -= bv.bv_len;
110 start = 0;
111 }
112 }
113
114 return size;
93} 115}
94 116
95/* 117/*
96 * Allocate a page vector based on (@it, @nbytes). 118 * iov_iter_get_pages() only considers one iov_iter segment, no matter
97 * The return value is the tuple describing a page vector, 119 * what maxsize or maxpages are given. For ITER_BVEC that is a single
98 * that is (@pages, @page_align, @num_pages). 120 * page.
121 *
122 * Attempt to get up to @maxsize bytes worth of pages from @iter.
123 * Return the number of bytes in the created bio_vec array, or an error.
99 */ 124 */
100static struct page ** 125static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
101dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes, 126 struct bio_vec **bvecs, int *num_bvecs)
102 size_t *page_align, int *num_pages)
103{ 127{
104 struct iov_iter tmp_it = *it; 128 struct bio_vec *bv;
105 size_t align; 129 size_t orig_count = iov_iter_count(iter);
106 struct page **pages; 130 ssize_t bytes;
107 int ret = 0, idx, npages; 131 int npages;
108 132
109 align = (unsigned long)(it->iov->iov_base + it->iov_offset) & 133 iov_iter_truncate(iter, maxsize);
110 (PAGE_SIZE - 1); 134 npages = iov_iter_npages(iter, INT_MAX);
111 npages = calc_pages_for(align, nbytes); 135 iov_iter_reexpand(iter, orig_count);
112 pages = kvmalloc(sizeof(*pages) * npages, GFP_KERNEL);
113 if (!pages)
114 return ERR_PTR(-ENOMEM);
115 136
116 for (idx = 0; idx < npages; ) { 137 /*
117 size_t start; 138 * __iter_get_bvecs() may populate only part of the array -- zero it
118 ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes, 139 * out.
119 npages - idx, &start); 140 */
120 if (ret < 0) 141 bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
121 goto fail; 142 if (!bv)
143 return -ENOMEM;
122 144
123 iov_iter_advance(&tmp_it, ret); 145 bytes = __iter_get_bvecs(iter, maxsize, bv);
124 nbytes -= ret; 146 if (bytes < 0) {
125 idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE; 147 /*
148 * No pages were pinned -- just free the array.
149 */
150 kvfree(bv);
151 return bytes;
126 } 152 }
127 153
128 BUG_ON(nbytes != 0); 154 *bvecs = bv;
129 *num_pages = npages; 155 *num_bvecs = npages;
130 *page_align = align; 156 return bytes;
131 dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align); 157}
132 return pages; 158
133fail: 159static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
134 ceph_put_page_vector(pages, idx, false); 160{
135 return ERR_PTR(ret); 161 int i;
162
163 for (i = 0; i < num_bvecs; i++) {
164 if (bvecs[i].bv_page) {
165 if (should_dirty)
166 set_page_dirty_lock(bvecs[i].bv_page);
167 put_page(bvecs[i].bv_page);
168 }
169 }
170 kvfree(bvecs);
136} 171}
137 172
138/* 173/*
@@ -746,11 +781,12 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
746 struct inode *inode = req->r_inode; 781 struct inode *inode = req->r_inode;
747 struct ceph_aio_request *aio_req = req->r_priv; 782 struct ceph_aio_request *aio_req = req->r_priv;
748 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); 783 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
749 int num_pages = calc_pages_for((u64)osd_data->alignment,
750 osd_data->length);
751 784
752 dout("ceph_aio_complete_req %p rc %d bytes %llu\n", 785 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
753 inode, rc, osd_data->length); 786 BUG_ON(!osd_data->num_bvecs);
787
788 dout("ceph_aio_complete_req %p rc %d bytes %u\n",
789 inode, rc, osd_data->bvec_pos.iter.bi_size);
754 790
755 if (rc == -EOLDSNAPC) { 791 if (rc == -EOLDSNAPC) {
756 struct ceph_aio_work *aio_work; 792 struct ceph_aio_work *aio_work;
@@ -768,9 +804,10 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
768 } else if (!aio_req->write) { 804 } else if (!aio_req->write) {
769 if (rc == -ENOENT) 805 if (rc == -ENOENT)
770 rc = 0; 806 rc = 0;
771 if (rc >= 0 && osd_data->length > rc) { 807 if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) {
772 int zoff = osd_data->alignment + rc; 808 struct iov_iter i;
773 int zlen = osd_data->length - rc; 809 int zlen = osd_data->bvec_pos.iter.bi_size - rc;
810
774 /* 811 /*
775 * If read is satisfied by single OSD request, 812 * If read is satisfied by single OSD request,
776 * it can pass EOF. Otherwise read is within 813 * it can pass EOF. Otherwise read is within
@@ -785,13 +822,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
785 aio_req->total_len = rc + zlen; 822 aio_req->total_len = rc + zlen;
786 } 823 }
787 824
788 if (zlen > 0) 825 iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs,
789 ceph_zero_page_vector_range(zoff, zlen, 826 osd_data->num_bvecs,
790 osd_data->pages); 827 osd_data->bvec_pos.iter.bi_size);
828 iov_iter_advance(&i, rc);
829 iov_iter_zero(zlen, &i);
791 } 830 }
792 } 831 }
793 832
794 ceph_put_page_vector(osd_data->pages, num_pages, aio_req->should_dirty); 833 put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
834 aio_req->should_dirty);
795 ceph_osdc_put_request(req); 835 ceph_osdc_put_request(req);
796 836
797 if (rc < 0) 837 if (rc < 0)
@@ -879,7 +919,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
879 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 919 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
880 struct ceph_vino vino; 920 struct ceph_vino vino;
881 struct ceph_osd_request *req; 921 struct ceph_osd_request *req;
882 struct page **pages; 922 struct bio_vec *bvecs;
883 struct ceph_aio_request *aio_req = NULL; 923 struct ceph_aio_request *aio_req = NULL;
884 int num_pages = 0; 924 int num_pages = 0;
885 int flags; 925 int flags;
@@ -914,8 +954,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
914 } 954 }
915 955
916 while (iov_iter_count(iter) > 0) { 956 while (iov_iter_count(iter) > 0) {
917 u64 size = dio_get_pagev_size(iter); 957 u64 size = iov_iter_count(iter);
918 size_t start = 0;
919 ssize_t len; 958 ssize_t len;
920 959
921 if (write) 960 if (write)
@@ -938,13 +977,14 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
938 break; 977 break;
939 } 978 }
940 979
941 len = size; 980 len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages);
942 pages = dio_get_pages_alloc(iter, len, &start, &num_pages); 981 if (len < 0) {
943 if (IS_ERR(pages)) {
944 ceph_osdc_put_request(req); 982 ceph_osdc_put_request(req);
945 ret = PTR_ERR(pages); 983 ret = len;
946 break; 984 break;
947 } 985 }
986 if (len != size)
987 osd_req_op_extent_update(req, 0, len);
948 988
949 /* 989 /*
950 * To simplify error handling, allow AIO when IO within i_size 990 * To simplify error handling, allow AIO when IO within i_size
@@ -977,8 +1017,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
977 req->r_mtime = mtime; 1017 req->r_mtime = mtime;
978 } 1018 }
979 1019
980 osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, 1020 osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
981 false, false);
982 1021
983 if (aio_req) { 1022 if (aio_req) {
984 aio_req->total_len += len; 1023 aio_req->total_len += len;
@@ -991,7 +1030,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
991 list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); 1030 list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
992 1031
993 pos += len; 1032 pos += len;
994 iov_iter_advance(iter, len);
995 continue; 1033 continue;
996 } 1034 }
997 1035
@@ -1004,25 +1042,26 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
1004 if (ret == -ENOENT) 1042 if (ret == -ENOENT)
1005 ret = 0; 1043 ret = 0;
1006 if (ret >= 0 && ret < len && pos + ret < size) { 1044 if (ret >= 0 && ret < len && pos + ret < size) {
1045 struct iov_iter i;
1007 int zlen = min_t(size_t, len - ret, 1046 int zlen = min_t(size_t, len - ret,
1008 size - pos - ret); 1047 size - pos - ret);
1009 ceph_zero_page_vector_range(start + ret, zlen, 1048
1010 pages); 1049 iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages,
1050 len);
1051 iov_iter_advance(&i, ret);
1052 iov_iter_zero(zlen, &i);
1011 ret += zlen; 1053 ret += zlen;
1012 } 1054 }
1013 if (ret >= 0) 1055 if (ret >= 0)
1014 len = ret; 1056 len = ret;
1015 } 1057 }
1016 1058
1017 ceph_put_page_vector(pages, num_pages, should_dirty); 1059 put_bvecs(bvecs, num_pages, should_dirty);
1018
1019 ceph_osdc_put_request(req); 1060 ceph_osdc_put_request(req);
1020 if (ret < 0) 1061 if (ret < 0)
1021 break; 1062 break;
1022 1063
1023 pos += len; 1064 pos += len;
1024 iov_iter_advance(iter, len);
1025
1026 if (!write && pos >= size) 1065 if (!write && pos >= size)
1027 break; 1066 break;
1028 1067