aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/block/rbd.c621
1 file changed, 619 insertions(+), 2 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 668936381ab0..daa0f18f7089 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -181,6 +181,67 @@ struct rbd_req_coll {
181 struct rbd_req_status status[0]; 181 struct rbd_req_status status[0];
182}; 182};
183 183
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH U32_MAX	/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type { OBJ_REQUEST_BIO };	/* More types to come */

/*
 * A request to read or write a single rados object, normally one
 * segment of a larger image request.
 */
struct rbd_obj_request {
	const char		*object_name;	/* NUL-terminated; stored inline after the struct */
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;	/* non-NULL while on an image request's list */
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn image request list; BAD_WHICH when not listed */

	enum obj_request_type	type;		/* only OBJ_REQUEST_BIO so far */
	struct bio		*bio_list;	/* data for the request (bio chain) */

	struct ceph_osd_request	*osd_req;	/* the osd request built for this object */

	u64			xferred;	/* bytes transferred */
	u64			version;	/* osd reassert version on completion */
	s32			result;		/* osd result code (0 or negative errno) */
	atomic_t		done;		/* nonzero once the osd op has completed */

	rbd_obj_callback_t	callback;	/* invoked from rbd_obj_request_complete() */

	struct kref		kref;		/* see rbd_obj_request_get()/put() */
};

/*
 * A request against the rbd image as a whole, carried out as an
 * ordered list of object requests.
 */
struct rbd_img_request {
	struct request		*rq;		/* originating block-layer request (may be NULL) */
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;		/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;	/* index of next obj request to complete */
	rbd_img_callback_t	callback;	/* invoked from rbd_img_request_complete() */

	u32			obj_request_count;	/* number of entries on obj_requests */
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;		/* see rbd_img_request_get()/put() */
};

/* Iterate an image request's object requests (optionally from a start point) */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &ireq->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &ireq->obj_requests, links)
/* Deletion-safe variant; walks the list in reverse */
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)
244
184/* 245/*
185 * a single io request 246 * a single io request
186 */ 247 */
@@ -1031,6 +1092,62 @@ out_err:
1031 return NULL; 1092 return NULL;
1032} 1093}
1033 1094
1095static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1096{
1097 kref_get(&obj_request->kref);
1098}
1099
static void rbd_obj_request_destroy(struct kref *kref);
/*
 * Drop a reference on an object request; the last put destroys it
 * via rbd_obj_request_destroy().
 */
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
1106
1107static void rbd_img_request_get(struct rbd_img_request *img_request)
1108{
1109 kref_get(&img_request->kref);
1110}
1111
static void rbd_img_request_destroy(struct kref *kref);
/*
 * Drop a reference on an image request; the last put destroys it
 * via rbd_img_request_destroy().
 */
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
1118
/*
 * Add an object request to the tail of an image request's list.
 * The list takes its own reference to the object request, and the
 * request records its position ("which") in the image request.
 * NOTE(review): no lock is taken here; the caller is presumably
 * responsible for serializing list updates — confirm at call sites.
 */
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	obj_request->which = img_request->obj_request_count++;
	/* which wrapping to BAD_WHICH would mean 2^32 object requests */
	rbd_assert(obj_request->which != BAD_WHICH);
}
1128
/*
 * Remove an object request from its image request's list and drop
 * the reference the list held.  The request must currently be on a
 * list (which != BAD_WHICH); its which, callback and img_request
 * fields are cleared before the reference is dropped.
 */
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);
	obj_request->which = BAD_WHICH;
	list_del(&obj_request->links);
	rbd_assert(obj_request->img_request == img_request);
	obj_request->callback = NULL;
	obj_request->img_request = NULL;
	rbd_obj_request_put(obj_request);
}
1140
1141static bool obj_request_type_valid(enum obj_request_type type)
1142{
1143 switch (type) {
1144 case OBJ_REQUEST_BIO:
1145 return true;
1146 default:
1147 return false;
1148 }
1149}
1150
1034struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...) 1151struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1035{ 1152{
1036 struct ceph_osd_req_op *op; 1153 struct ceph_osd_req_op *op;
@@ -1395,6 +1512,26 @@ done:
1395 return ret; 1512 return ret;
1396} 1513}
1397 1514
/*
 * Hand an object request's prepared osd request to the osd client.
 * Completion is reported asynchronously through the osd request's
 * r_callback (rbd_osd_req_callback).  Returns the result of
 * ceph_osdc_start_request() (0 on success).
 */
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
1520
/*
 * An image request is done.  If a completion callback was supplied
 * it becomes responsible for the image request's reference;
 * otherwise the reference is dropped here.
 */
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
1528
/*
 * An object request is done; notify its callback if one was set.
 * Unlike rbd_img_request_complete(), no reference is dropped here
 * when there is no callback.
 */
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	if (obj_request->callback)
		obj_request->callback(obj_request);
}
1534
1398/* 1535/*
1399 * Request sync osd read 1536 * Request sync osd read
1400 */ 1537 */
@@ -1618,6 +1755,486 @@ static int rbd_dev_do_request(struct request *rq,
1618 return 0; 1755 return 0;
1619} 1756}
1620 1757
/*
 * Handle completion of an osd READ op.  An -ENOENT result means the
 * backing object does not exist, which for rbd is equivalent to
 * reading all zeroes, so the whole bio chain is zero-filled and the
 * result cleared.  A successful short read has its unread tail
 * zero-filled and is reported as a full-length transfer.  The done
 * flag is set last, after xferred/result are final.
 */
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
				struct ceph_osd_op *op)
{
	u64 xferred;

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	xferred = le64_to_cpu(op->extent.length);
	rbd_assert(xferred < (u64) UINT_MAX);
	if (obj_request->result == (s32) -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
	} else if (xferred < obj_request->length && !obj_request->result) {
		zero_bio_chain(obj_request->bio_list, xferred);
		xferred = obj_request->length;
	}
	obj_request->xferred = xferred;
	atomic_set(&obj_request->done, 1);
}
1779
/*
 * Handle completion of an osd WRITE op: record the number of bytes
 * written and mark the object request done.
 */
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
				struct ceph_osd_op *op)
{
	obj_request->xferred = le64_to_cpu(op->extent.length);
	atomic_set(&obj_request->done, 1);
}
1786
/*
 * Completion callback for every rbd osd request.  Extracts the
 * result, transfer count and version from the reply message, then
 * dispatches to the per-opcode handler, which is responsible for
 * setting the request's done flag.  Only single-op requests are
 * handled for now.
 *
 * NOTE(review): on an unsupported opcode we only warn — done is
 * never set, so the object request is never completed.  Verify this
 * cannot happen for requests we build ourselves.
 */
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	struct ceph_osd_reply_head *reply_head;
	struct ceph_osd_op *op;
	u32 num_ops;
	u16 opcode;

	rbd_assert(osd_req == obj_request->osd_req);
	/* A request is either part of an image request or standalone */
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	/* Provisional transfer count; read/write handlers may refine it */
	obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
	reply_head = msg->front.iov_base;
	obj_request->result = (s32) le32_to_cpu(reply_head->result);
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	num_ops = le32_to_cpu(reply_head->num_ops);
	WARN_ON(num_ops != 1);	/* For now */

	op = &reply_head->ops[0];
	opcode = le16_to_cpu(op->op);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request, op);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request, op);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (atomic_read(&obj_request->done))
		rbd_obj_request_complete(obj_request);
}
1826
/*
 * Build an osd request carrying a single op for an rbd object
 * request.  If the object request belongs to an image request, the
 * snapshot context (writes) or snapshot id (reads) comes from the
 * image request; otherwise writes get no snapshot context and reads
 * use CEPH_NOSNAP.
 *
 * The osd request takes its own reference to the bio chain, and
 * ceph_osdc_build_request() takes one to the snapshot context.  The
 * object request is linked back via r_priv, and completion is
 * routed to rbd_osd_req_callback().  Returns NULL on allocation
 * failure.
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request,
					struct ceph_osd_req_op *op)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct timespec now;
	struct timespec *mtime;
	u64 snap_id = CEPH_NOSNAP;
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;

	if (img_request) {
		rbd_assert(img_request->write_request == write_request);
		if (img_request->write_request)
			snapc = img_request->snapc;
		else
			snap_id = img_request->snap_id;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_BIO:
		rbd_assert(obj_request->bio_list != NULL);
		osd_req->r_bio = obj_request->bio_list;
		bio_get(osd_req->r_bio);
		/* osd client requires "num pages" even for bio */
		osd_req->r_num_pages = calc_pages_for(offset, length);
		break;
	}

	if (write_request) {
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
		now = CURRENT_TIME;
		mtime = &now;
	} else {
		osd_req->r_flags = CEPH_OSD_FLAG_READ;
		mtime = NULL;	/* not needed for reads */
		offset = 0;	/* These are not used... */
		length = 0;	/* ...for osd read requests */
	}

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct copy */

	/* osd_req will get its own reference to snapc (if non-null) */

	ceph_osdc_build_request(osd_req, offset, length, 1, op,
				snapc, snap_id, mtime);

	return osd_req;
}
1896
/* Release our reference to an osd request (freed on the last put) */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1901
1902/* object_name is assumed to be a non-null pointer and NUL-terminated */
1903
1904static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1905 u64 offset, u64 length,
1906 enum obj_request_type type)
1907{
1908 struct rbd_obj_request *obj_request;
1909 size_t size;
1910 char *name;
1911
1912 rbd_assert(obj_request_type_valid(type));
1913
1914 size = strlen(object_name) + 1;
1915 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1916 if (!obj_request)
1917 return NULL;
1918
1919 name = (char *)(obj_request + 1);
1920 obj_request->object_name = memcpy(name, object_name, size);
1921 obj_request->offset = offset;
1922 obj_request->length = length;
1923 obj_request->which = BAD_WHICH;
1924 obj_request->type = type;
1925 INIT_LIST_HEAD(&obj_request->links);
1926 atomic_set(&obj_request->done, 0);
1927 kref_init(&obj_request->kref);
1928
1929 return obj_request;
1930}
1931
/*
 * Destroy an object request when its last reference is dropped.
 * The request must already have been removed from any image
 * request (img_request NULL, which == BAD_WHICH).  Releases the
 * osd request and the bio chain, then frees the request (the
 * object name is stored inline and freed with it).
 */
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	}

	kfree(obj_request);
}
1954
/*
 * Create an image request covering [offset, offset + length) of the
 * image.  For writes a reference to the device's current snapshot
 * context is taken (under header_rwsem); for reads the mapped
 * snapshot id is recorded instead.  Returns NULL on failure.
 *
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	/* NOTE(review): GFP_ATOMIC here, yet down_read() below can
	 * sleep — presumably never called from atomic context; confirm. */
	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	/* Keep rbd_img_request_get() referenced until it has real users */
	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	return img_request;
}
2002
/*
 * Destroy an image request when its last reference is dropped.
 * Unlinks (and puts) every remaining object request, releases the
 * snapshot context taken for writes, and frees the request.
 */
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}
2019
/*
 * Split an image request into per-object requests, cloning the
 * supplied bio chain so each object request carries exactly the
 * byte range it covers.  For each segment an osd request (with a
 * single read or write op) is built and the object request is added
 * to the image request's list.
 *
 * Returns 0 on success or -ENOMEM on failure, in which case any
 * object requests already created are released (requests already on
 * the image request's list keep the list's reference and are torn
 * down when the caller puts the image request).
 */
static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	unsigned int bio_offset;
	u64 image_offset;
	u64 resid;
	u16 opcode;

	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
					    : CEPH_OSD_OP_READ;
	bio_offset = 0;
	image_offset = img_request->offset;
	/* The bio chain must start exactly at the image offset */
	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	while (resid) {
		const char *object_name;
		unsigned int clone_size;
		struct ceph_osd_req_op *op;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, image_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, image_offset);
		length = rbd_segment_length(rbd_dev, image_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		/*
		 * Build up the op to use in building the osd
		 * request.  Note that the contents of the op are
		 * copied by rbd_osd_req_create().
		 */
		op = rbd_osd_req_op_create(opcode, offset, length);
		if (!op)
			goto out_partial;
		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
						img_request->write_request,
						obj_request, op);
		rbd_osd_req_op_destroy(op);
		if (!obj_request->osd_req)
			goto out_partial;
		/* status and version are initially zero-filled */

		rbd_img_obj_request_add(img_request, obj_request);

		image_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	/* The request being built was never added to the list */
	rbd_obj_request_put(obj_request);
out_unwind:
	/* Drop the creation reference on each request already listed */
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
2096
/*
 * Per-object completion callback for image requests.  Object
 * requests may complete out of order, but the block layer must see
 * completions in order, so under completion_lock we only advance
 * when the just-completed request is the next expected one, then
 * sweep forward over any later requests that are already done,
 * ending each one's portion of the block request.  When the final
 * portion has been ended, the image request itself is completed.
 */
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;	/* an earlier request hasn't finished yet */

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!atomic_read(&obj_request->done))
			break;

		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
		xferred = (unsigned int) obj_request->xferred;
		result = (int) obj_request->result;
		if (result)
			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
				img_request->write_request ? "write" : "read",
				result, xferred);

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}
	/* more is false exactly when the last portion was ended */
	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
2143
/*
 * Submit every object request belonging to an image request to the
 * osd client, arranging for rbd_img_obj_callback() to run as each
 * completes.  Returns 0, or the first submission error (requests
 * already submitted are not recalled).
 */
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;

	for_each_obj_request(img_request, obj_request) {
		int ret;

		obj_request->callback = rbd_img_obj_callback;
		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}
2167
/*
 * Block-layer request function.  Called with q->queue_lock held;
 * the lock is dropped while each fetched request is turned into an
 * image request and submitted, and reacquired before ending a
 * failed request or fetching the next one.  Non-filesystem
 * requests, writes to read-only mappings, and requests against a
 * vanished snapshot are rejected up front.
 */
static void rbd_request_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/* Quit early if the snapshot has disappeared */

		if (!atomic_read(&rbd_dev->exists)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		/* Reject a range that would wrap past the end of u64 */
		result = -EINVAL;
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			/* NOTE(review): message says "obj_request" but this
			 * reports the whole block request failing */
			rbd_warn(rbd_dev, "obj_request %s result %d\n",
				write_request ? "write" : "read", result);
			__blk_end_request_all(rq, result);
		}
	}
}
2237
1621/* 2238/*
1622 * block device queue callback 2239 * block device queue callback
1623 */ 2240 */
@@ -1929,8 +2546,8 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1929 disk->fops = &rbd_bd_ops; 2546 disk->fops = &rbd_bd_ops;
1930 disk->private_data = rbd_dev; 2547 disk->private_data = rbd_dev;
1931 2548
1932 /* init rq */ 2549 (void) rbd_rq_fn; /* avoid a warning */
1933 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock); 2550 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
1934 if (!q) 2551 if (!q)
1935 goto out_disk; 2552 goto out_disk;
1936 2553