aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-13 21:50:00 -0400
committerSage Weil <sage@inktank.com>2013-05-02 00:17:45 -0400
commit33803f3300265661b5c5d20a9811c6a2a157d545 (patch)
tree192ee4eb2d726fee466eb667f6fe938ba4a41a1e
parenta8dd0a37bc016cfb3ac75cf8484428573bb8d862 (diff)
libceph: define source request op functions
The rbd code has a function that allocates and populates a ceph_osd_req_op structure (the in-core version of an osd request operation). When reviewed, Josh suggested two things: that the big varargs function might be better split into type-specific functions; and that this functionality really belongs in the osd client rather than rbd. This patch implements both of Josh's suggestions. It breaks up the rbd function into separate functions and defines them in the osd client module as exported interfaces. Unlike the rbd version, however, the functions don't allocate an osd_req_op structure; they are provided the address of one and that is initialized instead. The rbd function has been eliminated and calls to it have been replaced by calls to the new routines. The rbd code now now use a stack (struct) variable to hold the op rather than allocating and freeing it each time. For now only the capabilities used by rbd are implemented. Implementing all the other osd op types, and making the rest of the code use it will be done separately, in the next few patches. Note that only the extent, cls, and watch portions of the ceph_osd_req_op structure are currently used. Delete the others (xattr, pgls, and snap) from its definition so nobody thinks it's actually implemented or needed. We can add it back again later if needed, when we know it's been tested. This (and a few follow-on patches) resolves: http://tracker.ceph.com/issues/3861 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
-rw-r--r--drivers/block/rbd.c117
-rw-r--r--include/linux/ceph/osd_client.h26
-rw-r--r--net/ceph/osd_client.c84
3 files changed, 111 insertions, 116 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 6ed508bd363a..f04d45b6b563 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1134,76 +1134,6 @@ static bool obj_request_type_valid(enum obj_request_type type)
1134 } 1134 }
1135} 1135}
1136 1136
1137static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1138{
1139 struct ceph_osd_req_op *op;
1140 va_list args;
1141 size_t size;
1142
1143 op = kzalloc(sizeof (*op), GFP_NOIO);
1144 if (!op)
1145 return NULL;
1146 op->op = opcode;
1147 va_start(args, opcode);
1148 switch (opcode) {
1149 case CEPH_OSD_OP_READ:
1150 case CEPH_OSD_OP_WRITE:
1151 /* rbd_osd_req_op_create(READ, offset, length) */
1152 /* rbd_osd_req_op_create(WRITE, offset, length) */
1153 op->extent.offset = va_arg(args, u64);
1154 op->extent.length = va_arg(args, u64);
1155 if (opcode == CEPH_OSD_OP_WRITE)
1156 op->payload_len = op->extent.length;
1157 break;
1158 case CEPH_OSD_OP_STAT:
1159 break;
1160 case CEPH_OSD_OP_CALL:
1161 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1162 op->cls.class_name = va_arg(args, char *);
1163 size = strlen(op->cls.class_name);
1164 rbd_assert(size <= (size_t) U8_MAX);
1165 op->cls.class_len = size;
1166 op->payload_len = size;
1167
1168 op->cls.method_name = va_arg(args, char *);
1169 size = strlen(op->cls.method_name);
1170 rbd_assert(size <= (size_t) U8_MAX);
1171 op->cls.method_len = size;
1172 op->payload_len += size;
1173
1174 op->cls.argc = 0;
1175 op->cls.indata = va_arg(args, void *);
1176 size = va_arg(args, size_t);
1177 rbd_assert(size <= (size_t) U32_MAX);
1178 op->cls.indata_len = (u32) size;
1179 op->payload_len += size;
1180 break;
1181 case CEPH_OSD_OP_NOTIFY_ACK:
1182 case CEPH_OSD_OP_WATCH:
1183 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1184 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1185 op->watch.cookie = va_arg(args, u64);
1186 op->watch.ver = va_arg(args, u64);
1187 op->watch.ver = cpu_to_le64(op->watch.ver);
1188 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1189 op->watch.flag = (u8) 1;
1190 break;
1191 default:
1192 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1193 kfree(op);
1194 op = NULL;
1195 break;
1196 }
1197 va_end(args);
1198
1199 return op;
1200}
1201
1202static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
1203{
1204 kfree(op);
1205}
1206
1207static int rbd_obj_request_submit(struct ceph_osd_client *osdc, 1137static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1208 struct rbd_obj_request *obj_request) 1138 struct rbd_obj_request *obj_request)
1209{ 1139{
@@ -1628,7 +1558,7 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1628 while (resid) { 1558 while (resid) {
1629 const char *object_name; 1559 const char *object_name;
1630 unsigned int clone_size; 1560 unsigned int clone_size;
1631 struct ceph_osd_req_op *op; 1561 struct ceph_osd_req_op op;
1632 u64 offset; 1562 u64 offset;
1633 u64 length; 1563 u64 length;
1634 1564
@@ -1657,13 +1587,10 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1657 * request. Note that the contents of the op are 1587 * request. Note that the contents of the op are
1658 * copied by rbd_osd_req_create(). 1588 * copied by rbd_osd_req_create().
1659 */ 1589 */
1660 op = rbd_osd_req_op_create(opcode, offset, length); 1590 osd_req_op_extent_init(&op, opcode, offset, length, 0, 0);
1661 if (!op)
1662 goto out_partial;
1663 obj_request->osd_req = rbd_osd_req_create(rbd_dev, 1591 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1664 img_request->write_request, 1592 img_request->write_request,
1665 obj_request, op); 1593 obj_request, &op);
1666 rbd_osd_req_op_destroy(op);
1667 if (!obj_request->osd_req) 1594 if (!obj_request->osd_req)
1668 goto out_partial; 1595 goto out_partial;
1669 /* status and version are initially zero-filled */ 1596 /* status and version are initially zero-filled */
@@ -1766,7 +1693,7 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1766 u64 ver, u64 notify_id) 1693 u64 ver, u64 notify_id)
1767{ 1694{
1768 struct rbd_obj_request *obj_request; 1695 struct rbd_obj_request *obj_request;
1769 struct ceph_osd_req_op *op; 1696 struct ceph_osd_req_op op;
1770 struct ceph_osd_client *osdc; 1697 struct ceph_osd_client *osdc;
1771 int ret; 1698 int ret;
1772 1699
@@ -1776,12 +1703,9 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1776 return -ENOMEM; 1703 return -ENOMEM;
1777 1704
1778 ret = -ENOMEM; 1705 ret = -ENOMEM;
1779 op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver); 1706 osd_req_op_watch_init(&op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0);
1780 if (!op)
1781 goto out;
1782 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1707 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1783 obj_request, op); 1708 obj_request, &op);
1784 rbd_osd_req_op_destroy(op);
1785 if (!obj_request->osd_req) 1709 if (!obj_request->osd_req)
1786 goto out; 1710 goto out;
1787 1711
@@ -1823,7 +1747,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1823{ 1747{
1824 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1748 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1825 struct rbd_obj_request *obj_request; 1749 struct rbd_obj_request *obj_request;
1826 struct ceph_osd_req_op *op; 1750 struct ceph_osd_req_op op;
1827 int ret; 1751 int ret;
1828 1752
1829 rbd_assert(start ^ !!rbd_dev->watch_event); 1753 rbd_assert(start ^ !!rbd_dev->watch_event);
@@ -1843,14 +1767,11 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1843 if (!obj_request) 1767 if (!obj_request)
1844 goto out_cancel; 1768 goto out_cancel;
1845 1769
1846 op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH, 1770 osd_req_op_watch_init(&op, CEPH_OSD_OP_WATCH,
1847 rbd_dev->watch_event->cookie, 1771 rbd_dev->watch_event->cookie,
1848 rbd_dev->header.obj_version, start); 1772 rbd_dev->header.obj_version, start);
1849 if (!op)
1850 goto out_cancel;
1851 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1773 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
1852 obj_request, op); 1774 obj_request, &op);
1853 rbd_osd_req_op_destroy(op);
1854 if (!obj_request->osd_req) 1775 if (!obj_request->osd_req)
1855 goto out_cancel; 1776 goto out_cancel;
1856 1777
@@ -1912,7 +1833,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1912{ 1833{
1913 struct rbd_obj_request *obj_request; 1834 struct rbd_obj_request *obj_request;
1914 struct ceph_osd_client *osdc; 1835 struct ceph_osd_client *osdc;
1915 struct ceph_osd_req_op *op; 1836 struct ceph_osd_req_op op;
1916 struct page **pages; 1837 struct page **pages;
1917 u32 page_count; 1838 u32 page_count;
1918 int ret; 1839 int ret;
@@ -1939,13 +1860,10 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1939 obj_request->pages = pages; 1860 obj_request->pages = pages;
1940 obj_request->page_count = page_count; 1861 obj_request->page_count = page_count;
1941 1862
1942 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name, 1863 osd_req_op_cls_init(&op, CEPH_OSD_OP_CALL, class_name, method_name,
1943 method_name, outbound, outbound_size); 1864 outbound, outbound_size);
1944 if (!op)
1945 goto out;
1946 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1865 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1947 obj_request, op); 1866 obj_request, &op);
1948 rbd_osd_req_op_destroy(op);
1949 if (!obj_request->osd_req) 1867 if (!obj_request->osd_req)
1950 goto out; 1868 goto out;
1951 1869
@@ -2125,7 +2043,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2125 char *buf, u64 *version) 2043 char *buf, u64 *version)
2126 2044
2127{ 2045{
2128 struct ceph_osd_req_op *op; 2046 struct ceph_osd_req_op op;
2129 struct rbd_obj_request *obj_request; 2047 struct rbd_obj_request *obj_request;
2130 struct ceph_osd_client *osdc; 2048 struct ceph_osd_client *osdc;
2131 struct page **pages = NULL; 2049 struct page **pages = NULL;
@@ -2147,12 +2065,9 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2147 obj_request->pages = pages; 2065 obj_request->pages = pages;
2148 obj_request->page_count = page_count; 2066 obj_request->page_count = page_count;
2149 2067
2150 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length); 2068 osd_req_op_extent_init(&op, CEPH_OSD_OP_READ, offset, length, 0, 0);
2151 if (!op)
2152 goto out;
2153 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 2069 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2154 obj_request, op); 2070 obj_request, &op);
2155 rbd_osd_req_op_destroy(op);
2156 if (!obj_request->osd_req) 2071 if (!obj_request->osd_req)
2157 goto out; 2072 goto out;
2158 2073
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 1dab291b2dc6..5fd2cbfcfd91 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -202,14 +202,6 @@ struct ceph_osd_req_op {
202 u32 truncate_seq; 202 u32 truncate_seq;
203 } extent; 203 } extent;
204 struct { 204 struct {
205 const char *name;
206 const void *val;
207 u32 name_len;
208 u32 value_len;
209 __u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */
210 __u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */
211 } xattr;
212 struct {
213 const char *class_name; 205 const char *class_name;
214 const char *method_name; 206 const char *method_name;
215 const void *indata; 207 const void *indata;
@@ -220,13 +212,6 @@ struct ceph_osd_req_op {
220 } cls; 212 } cls;
221 struct { 213 struct {
222 u64 cookie; 214 u64 cookie;
223 u64 count;
224 } pgls;
225 struct {
226 u64 snapid;
227 } snap;
228 struct {
229 u64 cookie;
230 u64 ver; 215 u64 ver;
231 u32 prot_ver; 216 u32 prot_ver;
232 u32 timeout; 217 u32 timeout;
@@ -244,6 +229,17 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
244extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, 229extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
245 struct ceph_msg *msg); 230 struct ceph_msg *msg);
246 231
232extern void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode);
233extern void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
234 u64 offset, u64 length,
235 u64 truncate_size, u32 truncate_seq);
236extern void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
237 const char *class, const char *method,
238 const void *request_data,
239 size_t request_data_size);
240extern void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode,
241 u64 cookie, u64 version, int flag);
242
247extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 243extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
248 struct ceph_snap_context *snapc, 244 struct ceph_snap_context *snapc,
249 unsigned int num_op, 245 unsigned int num_op,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 4e5c0438ea35..02ed72820479 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -289,6 +289,90 @@ static bool osd_req_opcode_valid(u16 opcode)
289 } 289 }
290} 290}
291 291
292/*
293 * This is an osd op init function for opcodes that have no data or
294 * other information associated with them. It also serves as a
295 * common init routine for all the other init functions, below.
296 */
297void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode)
298{
299 BUG_ON(!osd_req_opcode_valid(opcode));
300
301 memset(op, 0, sizeof (*op));
302
303 op->op = opcode;
304}
305
306void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
307 u64 offset, u64 length,
308 u64 truncate_size, u32 truncate_seq)
309{
310 size_t payload_len = 0;
311
312 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
313
314 osd_req_op_init(op, opcode);
315
316 op->extent.offset = offset;
317 op->extent.length = length;
318 op->extent.truncate_size = truncate_size;
319 op->extent.truncate_seq = truncate_seq;
320 if (opcode == CEPH_OSD_OP_WRITE)
321 payload_len += length;
322
323 op->payload_len = payload_len;
324}
325EXPORT_SYMBOL(osd_req_op_extent_init);
326
327void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
328 const char *class, const char *method,
329 const void *request_data, size_t request_data_size)
330{
331 size_t payload_len = 0;
332 size_t size;
333
334 BUG_ON(opcode != CEPH_OSD_OP_CALL);
335
336 osd_req_op_init(op, opcode);
337
338 op->cls.class_name = class;
339 size = strlen(class);
340 BUG_ON(size > (size_t) U8_MAX);
341 op->cls.class_len = size;
342 payload_len += size;
343
344 op->cls.method_name = method;
345 size = strlen(method);
346 BUG_ON(size > (size_t) U8_MAX);
347 op->cls.method_len = size;
348 payload_len += size;
349
350 op->cls.indata = request_data;
351 BUG_ON(request_data_size > (size_t) U32_MAX);
352 op->cls.indata_len = (u32) request_data_size;
353 payload_len += request_data_size;
354
355 op->cls.argc = 0; /* currently unused */
356
357 op->payload_len = payload_len;
358}
359EXPORT_SYMBOL(osd_req_op_cls_init);
360
361void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode,
362 u64 cookie, u64 version, int flag)
363{
364 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
365
366 osd_req_op_init(op, opcode);
367
368 op->watch.cookie = cookie;
369 /* op->watch.ver = version; */ /* XXX 3847 */
370 op->watch.ver = cpu_to_le64(version);
371 if (opcode == CEPH_OSD_OP_WATCH && flag)
372 op->watch.flag = (u8) 1;
373}
374EXPORT_SYMBOL(osd_req_op_watch_init);
375
292static u64 osd_req_encode_op(struct ceph_osd_request *req, 376static u64 osd_req_encode_op(struct ceph_osd_request *req,
293 struct ceph_osd_op *dst, 377 struct ceph_osd_op *dst,
294 struct ceph_osd_req_op *src) 378 struct ceph_osd_req_op *src)