about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Alex Elder <elder@inktank.com> 2013-01-30 08:54:34 -0500
committer Alex Elder <elder@inktank.com> 2013-01-30 08:54:34 -0500
commit 969e5aa3b0162a02c4f287d48ff58ca2145acf1b (patch)
tree 1af8e8e47e7352c6d3b4abfdb4aea6bd9458666f
parent 949db153b6466c6f7cad5a427ecea94985927311 (diff)
parent 1ec3911dbd19076bcdfe5540096ff67f91a6ec02 (diff)
Merge branch 'testing' of github.com:ceph/ceph-client into v3.8-rc5-testing
-rw-r--r-- drivers/block/rbd.c 855
-rw-r--r-- fs/ceph/caps.c 32
-rw-r--r-- fs/ceph/file.c 6
-rw-r--r-- fs/ceph/ioctl.c 2
-rw-r--r-- fs/ceph/mds_client.c 33
-rw-r--r-- fs/ceph/mds_client.h 6
-rw-r--r-- include/linux/ceph/ceph_features.h 8
-rw-r--r-- include/linux/ceph/decode.h 29
-rw-r--r-- include/linux/ceph/osd_client.h 24
-rw-r--r-- include/linux/ceph/osdmap.h 2
-rw-r--r-- include/linux/crush/crush.h 2
-rw-r--r-- net/ceph/crush/mapper.c 15
-rw-r--r-- net/ceph/osd_client.c 206
-rw-r--r-- net/ceph/osdmap.c 43
14 files changed, 652 insertions, 611 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 89576a0b3f2e..668936381ab0 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -52,9 +52,12 @@
52#define SECTOR_SHIFT 9 52#define SECTOR_SHIFT 9
53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT) 53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54 54
55/* It might be useful to have this defined elsewhere too */ 55/* It might be useful to have these defined elsewhere */
56 56
57#define U64_MAX ((u64) (~0ULL)) 57#define U8_MAX ((u8) (~0U))
58#define U16_MAX ((u16) (~0U))
59#define U32_MAX ((u32) (~0U))
60#define U64_MAX ((u64) (~0ULL))
58 61
59#define RBD_DRV_NAME "rbd" 62#define RBD_DRV_NAME "rbd"
60#define RBD_DRV_NAME_LONG "rbd (rados block device)" 63#define RBD_DRV_NAME_LONG "rbd (rados block device)"
@@ -66,7 +69,6 @@
66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) 69 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67 70
68#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 71#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69#define RBD_MAX_OPT_LEN 1024
70 72
71#define RBD_SNAP_HEAD_NAME "-" 73#define RBD_SNAP_HEAD_NAME "-"
72 74
@@ -93,8 +95,6 @@
93#define DEV_NAME_LEN 32 95#define DEV_NAME_LEN 32
94#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) 96#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
95 97
96#define RBD_READ_ONLY_DEFAULT false
97
98/* 98/*
99 * block device image metadata (in-memory version) 99 * block device image metadata (in-memory version)
100 */ 100 */
@@ -119,16 +119,33 @@ struct rbd_image_header {
119 * An rbd image specification. 119 * An rbd image specification.
120 * 120 *
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely 121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122 * identify an image. 122 * identify an image. Each rbd_dev structure includes a pointer to
123 * an rbd_spec structure that encapsulates this identity.
124 *
125 * Each of the id's in an rbd_spec has an associated name. For a
126 * user-mapped image, the names are supplied and the id's associated
127 * with them are looked up. For a layered image, a parent image is
128 * defined by the tuple, and the names are looked up.
129 *
130 * An rbd_dev structure contains a parent_spec pointer which is
131 * non-null if the image it represents is a child in a layered
132 * image. This pointer will refer to the rbd_spec structure used
133 * by the parent rbd_dev for its own identity (i.e., the structure
134 * is shared between the parent and child).
135 *
136 * Since these structures are populated once, during the discovery
137 * phase of image construction, they are effectively immutable so
138 * we make no effort to synchronize access to them.
139 *
140 * Note that code herein does not assume the image name is known (it
141 * could be a null pointer).
123 */ 142 */
124struct rbd_spec { 143struct rbd_spec {
125 u64 pool_id; 144 u64 pool_id;
126 char *pool_name; 145 char *pool_name;
127 146
128 char *image_id; 147 char *image_id;
129 size_t image_id_len;
130 char *image_name; 148 char *image_name;
131 size_t image_name_len;
132 149
133 u64 snap_id; 150 u64 snap_id;
134 char *snap_name; 151 char *snap_name;
@@ -136,10 +153,6 @@ struct rbd_spec {
136 struct kref kref; 153 struct kref kref;
137}; 154};
138 155
139struct rbd_options {
140 bool read_only;
141};
142
143/* 156/*
144 * an instance of the client. multiple devices may share an rbd client. 157 * an instance of the client. multiple devices may share an rbd client.
145 */ 158 */
@@ -154,7 +167,7 @@ struct rbd_client {
154 */ 167 */
155struct rbd_req_status { 168struct rbd_req_status {
156 int done; 169 int done;
157 int rc; 170 s32 rc;
158 u64 bytes; 171 u64 bytes;
159}; 172};
160 173
@@ -212,11 +225,13 @@ struct rbd_device {
212 spinlock_t lock; /* queue lock */ 225 spinlock_t lock; /* queue lock */
213 226
214 struct rbd_image_header header; 227 struct rbd_image_header header;
215 bool exists; 228 atomic_t exists;
216 struct rbd_spec *spec; 229 struct rbd_spec *spec;
217 230
218 char *header_name; 231 char *header_name;
219 232
233 struct ceph_file_layout layout;
234
220 struct ceph_osd_event *watch_event; 235 struct ceph_osd_event *watch_event;
221 struct ceph_osd_request *watch_request; 236 struct ceph_osd_request *watch_request;
222 237
@@ -277,6 +292,33 @@ static struct device rbd_root_dev = {
277 .release = rbd_root_dev_release, 292 .release = rbd_root_dev_release,
278}; 293};
279 294
295static __printf(2, 3)
296void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
297{
298 struct va_format vaf;
299 va_list args;
300
301 va_start(args, fmt);
302 vaf.fmt = fmt;
303 vaf.va = &args;
304
305 if (!rbd_dev)
306 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
307 else if (rbd_dev->disk)
308 printk(KERN_WARNING "%s: %s: %pV\n",
309 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
310 else if (rbd_dev->spec && rbd_dev->spec->image_name)
311 printk(KERN_WARNING "%s: image %s: %pV\n",
312 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
313 else if (rbd_dev->spec && rbd_dev->spec->image_id)
314 printk(KERN_WARNING "%s: id %s: %pV\n",
315 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
316 else /* punt */
317 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
318 RBD_DRV_NAME, rbd_dev, &vaf);
319 va_end(args);
320}
321
280#ifdef RBD_DEBUG 322#ifdef RBD_DEBUG
281#define rbd_assert(expr) \ 323#define rbd_assert(expr) \
282 if (unlikely(!(expr))) { \ 324 if (unlikely(!(expr))) { \
@@ -426,6 +468,12 @@ static match_table_t rbd_opts_tokens = {
426 {-1, NULL} 468 {-1, NULL}
427}; 469};
428 470
471struct rbd_options {
472 bool read_only;
473};
474
475#define RBD_READ_ONLY_DEFAULT false
476
429static int parse_rbd_opts_token(char *c, void *private) 477static int parse_rbd_opts_token(char *c, void *private)
430{ 478{
431 struct rbd_options *rbd_opts = private; 479 struct rbd_options *rbd_opts = private;
@@ -707,7 +755,7 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
707 goto done; 755 goto done;
708 rbd_dev->mapping.read_only = true; 756 rbd_dev->mapping.read_only = true;
709 } 757 }
710 rbd_dev->exists = true; 758 atomic_set(&rbd_dev->exists, 1);
711done: 759done:
712 return ret; 760 return ret;
713} 761}
@@ -724,7 +772,7 @@ static void rbd_header_free(struct rbd_image_header *header)
724 header->snapc = NULL; 772 header->snapc = NULL;
725} 773}
726 774
727static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) 775static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
728{ 776{
729 char *name; 777 char *name;
730 u64 segment; 778 u64 segment;
@@ -772,6 +820,7 @@ static int rbd_get_num_segments(struct rbd_image_header *header,
772{ 820{
773 u64 start_seg; 821 u64 start_seg;
774 u64 end_seg; 822 u64 end_seg;
823 u64 result;
775 824
776 if (!len) 825 if (!len)
777 return 0; 826 return 0;
@@ -781,7 +830,11 @@ static int rbd_get_num_segments(struct rbd_image_header *header,
781 start_seg = ofs >> header->obj_order; 830 start_seg = ofs >> header->obj_order;
782 end_seg = (ofs + len - 1) >> header->obj_order; 831 end_seg = (ofs + len - 1) >> header->obj_order;
783 832
784 return end_seg - start_seg + 1; 833 result = end_seg - start_seg + 1;
834 if (result > (u64) INT_MAX)
835 return -ERANGE;
836
837 return (int) result;
785} 838}
786 839
787/* 840/*
@@ -949,8 +1002,10 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src,
949 unsigned int bi_size; 1002 unsigned int bi_size;
950 struct bio *bio; 1003 struct bio *bio;
951 1004
952 if (!bi) 1005 if (!bi) {
1006 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
953 goto out_err; /* EINVAL; ran out of bio's */ 1007 goto out_err; /* EINVAL; ran out of bio's */
1008 }
954 bi_size = min_t(unsigned int, bi->bi_size - off, len); 1009 bi_size = min_t(unsigned int, bi->bi_size - off, len);
955 bio = bio_clone_range(bi, off, bi_size, gfpmask); 1010 bio = bio_clone_range(bi, off, bi_size, gfpmask);
956 if (!bio) 1011 if (!bio)
@@ -976,44 +1031,84 @@ out_err:
976 return NULL; 1031 return NULL;
977} 1032}
978 1033
979/* 1034struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
980 * helpers for osd request op vectors.
981 */
982static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
983 int opcode, u32 payload_len)
984{ 1035{
985 struct ceph_osd_req_op *ops; 1036 struct ceph_osd_req_op *op;
1037 va_list args;
1038 size_t size;
986 1039
987 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO); 1040 op = kzalloc(sizeof (*op), GFP_NOIO);
988 if (!ops) 1041 if (!op)
989 return NULL; 1042 return NULL;
1043 op->op = opcode;
1044 va_start(args, opcode);
1045 switch (opcode) {
1046 case CEPH_OSD_OP_READ:
1047 case CEPH_OSD_OP_WRITE:
1048 /* rbd_osd_req_op_create(READ, offset, length) */
1049 /* rbd_osd_req_op_create(WRITE, offset, length) */
1050 op->extent.offset = va_arg(args, u64);
1051 op->extent.length = va_arg(args, u64);
1052 if (opcode == CEPH_OSD_OP_WRITE)
1053 op->payload_len = op->extent.length;
1054 break;
1055 case CEPH_OSD_OP_CALL:
1056 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1057 op->cls.class_name = va_arg(args, char *);
1058 size = strlen(op->cls.class_name);
1059 rbd_assert(size <= (size_t) U8_MAX);
1060 op->cls.class_len = size;
1061 op->payload_len = size;
1062
1063 op->cls.method_name = va_arg(args, char *);
1064 size = strlen(op->cls.method_name);
1065 rbd_assert(size <= (size_t) U8_MAX);
1066 op->cls.method_len = size;
1067 op->payload_len += size;
1068
1069 op->cls.argc = 0;
1070 op->cls.indata = va_arg(args, void *);
1071 size = va_arg(args, size_t);
1072 rbd_assert(size <= (size_t) U32_MAX);
1073 op->cls.indata_len = (u32) size;
1074 op->payload_len += size;
1075 break;
1076 case CEPH_OSD_OP_NOTIFY_ACK:
1077 case CEPH_OSD_OP_WATCH:
1078 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1079 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1080 op->watch.cookie = va_arg(args, u64);
1081 op->watch.ver = va_arg(args, u64);
1082 op->watch.ver = cpu_to_le64(op->watch.ver);
1083 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1084 op->watch.flag = (u8) 1;
1085 break;
1086 default:
1087 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1088 kfree(op);
1089 op = NULL;
1090 break;
1091 }
1092 va_end(args);
990 1093
991 ops[0].op = opcode; 1094 return op;
992
993 /*
994 * op extent offset and length will be set later on
995 * in calc_raw_layout()
996 */
997 ops[0].payload_len = payload_len;
998
999 return ops;
1000} 1095}
1001 1096
1002static void rbd_destroy_ops(struct ceph_osd_req_op *ops) 1097static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
1003{ 1098{
1004 kfree(ops); 1099 kfree(op);
1005} 1100}
1006 1101
1007static void rbd_coll_end_req_index(struct request *rq, 1102static void rbd_coll_end_req_index(struct request *rq,
1008 struct rbd_req_coll *coll, 1103 struct rbd_req_coll *coll,
1009 int index, 1104 int index,
1010 int ret, u64 len) 1105 s32 ret, u64 len)
1011{ 1106{
1012 struct request_queue *q; 1107 struct request_queue *q;
1013 int min, max, i; 1108 int min, max, i;
1014 1109
1015 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n", 1110 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1016 coll, index, ret, (unsigned long long) len); 1111 coll, index, (int)ret, (unsigned long long)len);
1017 1112
1018 if (!rq) 1113 if (!rq)
1019 return; 1114 return;
@@ -1034,7 +1129,7 @@ static void rbd_coll_end_req_index(struct request *rq,
1034 max++; 1129 max++;
1035 1130
1036 for (i = min; i<max; i++) { 1131 for (i = min; i<max; i++) {
1037 __blk_end_request(rq, coll->status[i].rc, 1132 __blk_end_request(rq, (int)coll->status[i].rc,
1038 coll->status[i].bytes); 1133 coll->status[i].bytes);
1039 coll->num_done++; 1134 coll->num_done++;
1040 kref_put(&coll->kref, rbd_coll_release); 1135 kref_put(&coll->kref, rbd_coll_release);
@@ -1042,10 +1137,12 @@ static void rbd_coll_end_req_index(struct request *rq,
1042 spin_unlock_irq(q->queue_lock); 1137 spin_unlock_irq(q->queue_lock);
1043} 1138}
1044 1139
1045static void rbd_coll_end_req(struct rbd_request *req, 1140static void rbd_coll_end_req(struct rbd_request *rbd_req,
1046 int ret, u64 len) 1141 s32 ret, u64 len)
1047{ 1142{
1048 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len); 1143 rbd_coll_end_req_index(rbd_req->rq,
1144 rbd_req->coll, rbd_req->coll_index,
1145 ret, len);
1049} 1146}
1050 1147
1051/* 1148/*
@@ -1060,117 +1157,102 @@ static int rbd_do_request(struct request *rq,
1060 struct page **pages, 1157 struct page **pages,
1061 int num_pages, 1158 int num_pages,
1062 int flags, 1159 int flags,
1063 struct ceph_osd_req_op *ops, 1160 struct ceph_osd_req_op *op,
1064 struct rbd_req_coll *coll, 1161 struct rbd_req_coll *coll,
1065 int coll_index, 1162 int coll_index,
1066 void (*rbd_cb)(struct ceph_osd_request *req, 1163 void (*rbd_cb)(struct ceph_osd_request *,
1067 struct ceph_msg *msg), 1164 struct ceph_msg *),
1068 struct ceph_osd_request **linger_req,
1069 u64 *ver) 1165 u64 *ver)
1070{ 1166{
1071 struct ceph_osd_request *req;
1072 struct ceph_file_layout *layout;
1073 int ret;
1074 u64 bno;
1075 struct timespec mtime = CURRENT_TIME;
1076 struct rbd_request *req_data;
1077 struct ceph_osd_request_head *reqhead;
1078 struct ceph_osd_client *osdc; 1167 struct ceph_osd_client *osdc;
1079 1168 struct ceph_osd_request *osd_req;
1080 req_data = kzalloc(sizeof(*req_data), GFP_NOIO); 1169 struct rbd_request *rbd_req = NULL;
1081 if (!req_data) { 1170 struct timespec mtime = CURRENT_TIME;
1082 if (coll) 1171 int ret;
1083 rbd_coll_end_req_index(rq, coll, coll_index,
1084 -ENOMEM, len);
1085 return -ENOMEM;
1086 }
1087
1088 if (coll) {
1089 req_data->coll = coll;
1090 req_data->coll_index = coll_index;
1091 }
1092 1172
1093 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n", 1173 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1094 object_name, (unsigned long long) ofs, 1174 object_name, (unsigned long long) ofs,
1095 (unsigned long long) len, coll, coll_index); 1175 (unsigned long long) len, coll, coll_index);
1096 1176
1097 osdc = &rbd_dev->rbd_client->client->osdc; 1177 osdc = &rbd_dev->rbd_client->client->osdc;
1098 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, 1178 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
1099 false, GFP_NOIO, pages, bio); 1179 if (!osd_req)
1100 if (!req) { 1180 return -ENOMEM;
1101 ret = -ENOMEM;
1102 goto done_pages;
1103 }
1104
1105 req->r_callback = rbd_cb;
1106 1181
1107 req_data->rq = rq; 1182 osd_req->r_flags = flags;
1108 req_data->bio = bio; 1183 osd_req->r_pages = pages;
1109 req_data->pages = pages; 1184 if (bio) {
1110 req_data->len = len; 1185 osd_req->r_bio = bio;
1186 bio_get(osd_req->r_bio);
1187 }
1111 1188
1112 req->r_priv = req_data; 1189 if (coll) {
1190 ret = -ENOMEM;
1191 rbd_req = kmalloc(sizeof(*rbd_req), GFP_NOIO);
1192 if (!rbd_req)
1193 goto done_osd_req;
1194
1195 rbd_req->rq = rq;
1196 rbd_req->bio = bio;
1197 rbd_req->pages = pages;
1198 rbd_req->len = len;
1199 rbd_req->coll = coll;
1200 rbd_req->coll_index = coll_index;
1201 }
1113 1202
1114 reqhead = req->r_request->front.iov_base; 1203 osd_req->r_callback = rbd_cb;
1115 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); 1204 osd_req->r_priv = rbd_req;
1116 1205
1117 strncpy(req->r_oid, object_name, sizeof(req->r_oid)); 1206 strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
1118 req->r_oid_len = strlen(req->r_oid); 1207 osd_req->r_oid_len = strlen(osd_req->r_oid);
1119 1208
1120 layout = &req->r_file_layout; 1209 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1121 memset(layout, 0, sizeof(*layout)); 1210 osd_req->r_num_pages = calc_pages_for(ofs, len);
1122 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 1211 osd_req->r_page_alignment = ofs & ~PAGE_MASK;
1123 layout->fl_stripe_count = cpu_to_le32(1);
1124 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1125 layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
1126 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1127 req, ops);
1128 rbd_assert(ret == 0);
1129 1212
1130 ceph_osdc_build_request(req, ofs, &len, 1213 ceph_osdc_build_request(osd_req, ofs, len, 1, op,
1131 ops, 1214 snapc, snapid, &mtime);
1132 snapc,
1133 &mtime,
1134 req->r_oid, req->r_oid_len);
1135 1215
1136 if (linger_req) { 1216 if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
1137 ceph_osdc_set_request_linger(osdc, req); 1217 ceph_osdc_set_request_linger(osdc, osd_req);
1138 *linger_req = req; 1218 rbd_dev->watch_request = osd_req;
1139 } 1219 }
1140 1220
1141 ret = ceph_osdc_start_request(osdc, req, false); 1221 ret = ceph_osdc_start_request(osdc, osd_req, false);
1142 if (ret < 0) 1222 if (ret < 0)
1143 goto done_err; 1223 goto done_err;
1144 1224
1145 if (!rbd_cb) { 1225 if (!rbd_cb) {
1146 ret = ceph_osdc_wait_request(osdc, req); 1226 u64 version;
1227
1228 ret = ceph_osdc_wait_request(osdc, osd_req);
1229 version = le64_to_cpu(osd_req->r_reassert_version.version);
1147 if (ver) 1230 if (ver)
1148 *ver = le64_to_cpu(req->r_reassert_version.version); 1231 *ver = version;
1149 dout("reassert_ver=%llu\n", 1232 dout("reassert_ver=%llu\n", (unsigned long long) version);
1150 (unsigned long long) 1233 ceph_osdc_put_request(osd_req);
1151 le64_to_cpu(req->r_reassert_version.version));
1152 ceph_osdc_put_request(req);
1153 } 1234 }
1154 return ret; 1235 return ret;
1155 1236
1156done_err: 1237done_err:
1157 bio_chain_put(req_data->bio); 1238 if (bio)
1158 ceph_osdc_put_request(req); 1239 bio_chain_put(osd_req->r_bio);
1159done_pages: 1240 kfree(rbd_req);
1160 rbd_coll_end_req(req_data, ret, len); 1241done_osd_req:
1161 kfree(req_data); 1242 ceph_osdc_put_request(osd_req);
1243
1162 return ret; 1244 return ret;
1163} 1245}
1164 1246
1165/* 1247/*
1166 * Ceph osd op callback 1248 * Ceph osd op callback
1167 */ 1249 */
1168static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) 1250static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
1169{ 1251{
1170 struct rbd_request *req_data = req->r_priv; 1252 struct rbd_request *rbd_req = osd_req->r_priv;
1171 struct ceph_osd_reply_head *replyhead; 1253 struct ceph_osd_reply_head *replyhead;
1172 struct ceph_osd_op *op; 1254 struct ceph_osd_op *op;
1173 __s32 rc; 1255 s32 rc;
1174 u64 bytes; 1256 u64 bytes;
1175 int read_op; 1257 int read_op;
1176 1258
@@ -1178,68 +1260,66 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1178 replyhead = msg->front.iov_base; 1260 replyhead = msg->front.iov_base;
1179 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); 1261 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1180 op = (void *)(replyhead + 1); 1262 op = (void *)(replyhead + 1);
1181 rc = le32_to_cpu(replyhead->result); 1263 rc = (s32)le32_to_cpu(replyhead->result);
1182 bytes = le64_to_cpu(op->extent.length); 1264 bytes = le64_to_cpu(op->extent.length);
1183 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ); 1265 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1184 1266
1185 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n", 1267 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1186 (unsigned long long) bytes, read_op, (int) rc); 1268 (unsigned long long) bytes, read_op, (int) rc);
1187 1269
1188 if (rc == -ENOENT && read_op) { 1270 if (rc == (s32)-ENOENT && read_op) {
1189 zero_bio_chain(req_data->bio, 0); 1271 zero_bio_chain(rbd_req->bio, 0);
1190 rc = 0; 1272 rc = 0;
1191 } else if (rc == 0 && read_op && bytes < req_data->len) { 1273 } else if (rc == 0 && read_op && bytes < rbd_req->len) {
1192 zero_bio_chain(req_data->bio, bytes); 1274 zero_bio_chain(rbd_req->bio, bytes);
1193 bytes = req_data->len; 1275 bytes = rbd_req->len;
1194 } 1276 }
1195 1277
1196 rbd_coll_end_req(req_data, rc, bytes); 1278 rbd_coll_end_req(rbd_req, rc, bytes);
1197 1279
1198 if (req_data->bio) 1280 if (rbd_req->bio)
1199 bio_chain_put(req_data->bio); 1281 bio_chain_put(rbd_req->bio);
1200 1282
1201 ceph_osdc_put_request(req); 1283 ceph_osdc_put_request(osd_req);
1202 kfree(req_data); 1284 kfree(rbd_req);
1203} 1285}
1204 1286
1205static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) 1287static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
1288 struct ceph_msg *msg)
1206{ 1289{
1207 ceph_osdc_put_request(req); 1290 ceph_osdc_put_request(osd_req);
1208} 1291}
1209 1292
1210/* 1293/*
1211 * Do a synchronous ceph osd operation 1294 * Do a synchronous ceph osd operation
1212 */ 1295 */
1213static int rbd_req_sync_op(struct rbd_device *rbd_dev, 1296static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1214 struct ceph_snap_context *snapc,
1215 u64 snapid,
1216 int flags, 1297 int flags,
1217 struct ceph_osd_req_op *ops, 1298 struct ceph_osd_req_op *op,
1218 const char *object_name, 1299 const char *object_name,
1219 u64 ofs, u64 inbound_size, 1300 u64 ofs, u64 inbound_size,
1220 char *inbound, 1301 char *inbound,
1221 struct ceph_osd_request **linger_req,
1222 u64 *ver) 1302 u64 *ver)
1223{ 1303{
1224 int ret; 1304 int ret;
1225 struct page **pages; 1305 struct page **pages;
1226 int num_pages; 1306 int num_pages;
1227 1307
1228 rbd_assert(ops != NULL); 1308 rbd_assert(op != NULL);
1229 1309
1230 num_pages = calc_pages_for(ofs, inbound_size); 1310 num_pages = calc_pages_for(ofs, inbound_size);
1231 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 1311 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1232 if (IS_ERR(pages)) 1312 if (IS_ERR(pages))
1233 return PTR_ERR(pages); 1313 return PTR_ERR(pages);
1234 1314
1235 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid, 1315 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1236 object_name, ofs, inbound_size, NULL, 1316 object_name, ofs, inbound_size, NULL,
1237 pages, num_pages, 1317 pages, num_pages,
1238 flags, 1318 flags,
1239 ops, 1319 op,
1240 NULL, 0, 1320 NULL, 0,
1241 NULL, 1321 NULL,
1242 linger_req, ver); 1322 ver);
1243 if (ret < 0) 1323 if (ret < 0)
1244 goto done; 1324 goto done;
1245 1325
@@ -1262,12 +1342,11 @@ static int rbd_do_op(struct request *rq,
1262 struct rbd_req_coll *coll, 1342 struct rbd_req_coll *coll,
1263 int coll_index) 1343 int coll_index)
1264{ 1344{
1265 char *seg_name; 1345 const char *seg_name;
1266 u64 seg_ofs; 1346 u64 seg_ofs;
1267 u64 seg_len; 1347 u64 seg_len;
1268 int ret; 1348 int ret;
1269 struct ceph_osd_req_op *ops; 1349 struct ceph_osd_req_op *op;
1270 u32 payload_len;
1271 int opcode; 1350 int opcode;
1272 int flags; 1351 int flags;
1273 u64 snapid; 1352 u64 snapid;
@@ -1282,18 +1361,16 @@ static int rbd_do_op(struct request *rq,
1282 opcode = CEPH_OSD_OP_WRITE; 1361 opcode = CEPH_OSD_OP_WRITE;
1283 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK; 1362 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1284 snapid = CEPH_NOSNAP; 1363 snapid = CEPH_NOSNAP;
1285 payload_len = seg_len;
1286 } else { 1364 } else {
1287 opcode = CEPH_OSD_OP_READ; 1365 opcode = CEPH_OSD_OP_READ;
1288 flags = CEPH_OSD_FLAG_READ; 1366 flags = CEPH_OSD_FLAG_READ;
1289 snapc = NULL; 1367 rbd_assert(!snapc);
1290 snapid = rbd_dev->spec->snap_id; 1368 snapid = rbd_dev->spec->snap_id;
1291 payload_len = 0;
1292 } 1369 }
1293 1370
1294 ret = -ENOMEM; 1371 ret = -ENOMEM;
1295 ops = rbd_create_rw_ops(1, opcode, payload_len); 1372 op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
1296 if (!ops) 1373 if (!op)
1297 goto done; 1374 goto done;
1298 1375
1299 /* we've taken care of segment sizes earlier when we 1376 /* we've taken care of segment sizes earlier when we
@@ -1306,11 +1383,13 @@ static int rbd_do_op(struct request *rq,
1306 bio, 1383 bio,
1307 NULL, 0, 1384 NULL, 0,
1308 flags, 1385 flags,
1309 ops, 1386 op,
1310 coll, coll_index, 1387 coll, coll_index,
1311 rbd_req_cb, 0, NULL); 1388 rbd_req_cb, NULL);
1312 1389 if (ret < 0)
1313 rbd_destroy_ops(ops); 1390 rbd_coll_end_req_index(rq, coll, coll_index,
1391 (s32)ret, seg_len);
1392 rbd_osd_req_op_destroy(op);
1314done: 1393done:
1315 kfree(seg_name); 1394 kfree(seg_name);
1316 return ret; 1395 return ret;
@@ -1320,24 +1399,21 @@ done:
1320 * Request sync osd read 1399 * Request sync osd read
1321 */ 1400 */
1322static int rbd_req_sync_read(struct rbd_device *rbd_dev, 1401static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1323 u64 snapid,
1324 const char *object_name, 1402 const char *object_name,
1325 u64 ofs, u64 len, 1403 u64 ofs, u64 len,
1326 char *buf, 1404 char *buf,
1327 u64 *ver) 1405 u64 *ver)
1328{ 1406{
1329 struct ceph_osd_req_op *ops; 1407 struct ceph_osd_req_op *op;
1330 int ret; 1408 int ret;
1331 1409
1332 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0); 1410 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, ofs, len);
1333 if (!ops) 1411 if (!op)
1334 return -ENOMEM; 1412 return -ENOMEM;
1335 1413
1336 ret = rbd_req_sync_op(rbd_dev, NULL, 1414 ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
1337 snapid, 1415 op, object_name, ofs, len, buf, ver);
1338 CEPH_OSD_FLAG_READ, 1416 rbd_osd_req_op_destroy(op);
1339 ops, object_name, ofs, len, buf, NULL, ver);
1340 rbd_destroy_ops(ops);
1341 1417
1342 return ret; 1418 return ret;
1343} 1419}
@@ -1349,26 +1425,23 @@ static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1349 u64 ver, 1425 u64 ver,
1350 u64 notify_id) 1426 u64 notify_id)
1351{ 1427{
1352 struct ceph_osd_req_op *ops; 1428 struct ceph_osd_req_op *op;
1353 int ret; 1429 int ret;
1354 1430
1355 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0); 1431 op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1356 if (!ops) 1432 if (!op)
1357 return -ENOMEM; 1433 return -ENOMEM;
1358 1434
1359 ops[0].watch.ver = cpu_to_le64(ver);
1360 ops[0].watch.cookie = notify_id;
1361 ops[0].watch.flag = 0;
1362
1363 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP, 1435 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1364 rbd_dev->header_name, 0, 0, NULL, 1436 rbd_dev->header_name, 0, 0, NULL,
1365 NULL, 0, 1437 NULL, 0,
1366 CEPH_OSD_FLAG_READ, 1438 CEPH_OSD_FLAG_READ,
1367 ops, 1439 op,
1368 NULL, 0, 1440 NULL, 0,
1369 rbd_simple_req_cb, 0, NULL); 1441 rbd_simple_req_cb, NULL);
1442
1443 rbd_osd_req_op_destroy(op);
1370 1444
1371 rbd_destroy_ops(ops);
1372 return ret; 1445 return ret;
1373} 1446}
1374 1447
@@ -1386,83 +1459,51 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1386 (unsigned int) opcode); 1459 (unsigned int) opcode);
1387 rc = rbd_dev_refresh(rbd_dev, &hver); 1460 rc = rbd_dev_refresh(rbd_dev, &hver);
1388 if (rc) 1461 if (rc)
1389 pr_warning(RBD_DRV_NAME "%d got notification but failed to " 1462 rbd_warn(rbd_dev, "got notification but failed to "
1390 " update snaps: %d\n", rbd_dev->major, rc); 1463 " update snaps: %d\n", rc);
1391 1464
1392 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id); 1465 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1393} 1466}
1394 1467
1395/* 1468/*
1396 * Request sync osd watch 1469 * Request sync osd watch/unwatch. The value of "start" determines
1470 * whether a watch request is being initiated or torn down.
1397 */ 1471 */
1398static int rbd_req_sync_watch(struct rbd_device *rbd_dev) 1472static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
1399{ 1473{
1400 struct ceph_osd_req_op *ops; 1474 struct ceph_osd_req_op *op;
1401 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1475 int ret = 0;
1402 int ret;
1403 1476
1404 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0); 1477 rbd_assert(start ^ !!rbd_dev->watch_event);
1405 if (!ops) 1478 rbd_assert(start ^ !!rbd_dev->watch_request);
1406 return -ENOMEM;
1407 1479
1408 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, 1480 if (start) {
1409 (void *)rbd_dev, &rbd_dev->watch_event); 1481 struct ceph_osd_client *osdc;
1410 if (ret < 0)
1411 goto fail;
1412 1482
1413 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version); 1483 osdc = &rbd_dev->rbd_client->client->osdc;
1414 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie); 1484 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
1415 ops[0].watch.flag = 1; 1485 &rbd_dev->watch_event);
1486 if (ret < 0)
1487 return ret;
1488 }
1416 1489
1417 ret = rbd_req_sync_op(rbd_dev, NULL, 1490 op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1418 CEPH_NOSNAP, 1491 rbd_dev->watch_event->cookie,
1492 rbd_dev->header.obj_version, start);
1493 if (op)
1494 ret = rbd_req_sync_op(rbd_dev,
1419 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1495 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1420 ops, 1496 op, rbd_dev->header_name,
1421 rbd_dev->header_name, 1497 0, 0, NULL, NULL);
1422 0, 0, NULL,
1423 &rbd_dev->watch_request, NULL);
1424 1498
1425 if (ret < 0) 1499 /* Cancel the event if we're tearing down, or on error */
1426 goto fail_event;
1427
1428 rbd_destroy_ops(ops);
1429 return 0;
1430
1431fail_event:
1432 ceph_osdc_cancel_event(rbd_dev->watch_event);
1433 rbd_dev->watch_event = NULL;
1434fail:
1435 rbd_destroy_ops(ops);
1436 return ret;
1437}
1438
1439/*
1440 * Request sync osd unwatch
1441 */
1442static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1443{
1444 struct ceph_osd_req_op *ops;
1445 int ret;
1446
1447 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1448 if (!ops)
1449 return -ENOMEM;
1450
1451 ops[0].watch.ver = 0;
1452 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1453 ops[0].watch.flag = 0;
1454
1455 ret = rbd_req_sync_op(rbd_dev, NULL,
1456 CEPH_NOSNAP,
1457 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1458 ops,
1459 rbd_dev->header_name,
1460 0, 0, NULL, NULL, NULL);
1461 1500
1501 if (!start || !op || ret < 0) {
1502 ceph_osdc_cancel_event(rbd_dev->watch_event);
1503 rbd_dev->watch_event = NULL;
1504 }
1505 rbd_osd_req_op_destroy(op);
1462 1506
1463 rbd_destroy_ops(ops);
1464 ceph_osdc_cancel_event(rbd_dev->watch_event);
1465 rbd_dev->watch_event = NULL;
1466 return ret; 1507 return ret;
1467} 1508}
1468 1509
@@ -1477,13 +1518,9 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1477 size_t outbound_size, 1518 size_t outbound_size,
1478 char *inbound, 1519 char *inbound,
1479 size_t inbound_size, 1520 size_t inbound_size,
1480 int flags,
1481 u64 *ver) 1521 u64 *ver)
1482{ 1522{
1483 struct ceph_osd_req_op *ops; 1523 struct ceph_osd_req_op *op;
1484 int class_name_len = strlen(class_name);
1485 int method_name_len = strlen(method_name);
1486 int payload_size;
1487 int ret; 1524 int ret;
1488 1525
1489 /* 1526 /*
@@ -1494,26 +1531,16 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1494 * the perspective of the server side) in the OSD request 1531 * the perspective of the server side) in the OSD request
1495 * operation. 1532 * operation.
1496 */ 1533 */
1497 payload_size = class_name_len + method_name_len + outbound_size; 1534 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1498 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size); 1535 method_name, outbound, outbound_size);
1499 if (!ops) 1536 if (!op)
1500 return -ENOMEM; 1537 return -ENOMEM;
1501 1538
1502 ops[0].cls.class_name = class_name; 1539 ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
1503 ops[0].cls.class_len = (__u8) class_name_len;
1504 ops[0].cls.method_name = method_name;
1505 ops[0].cls.method_len = (__u8) method_name_len;
1506 ops[0].cls.argc = 0;
1507 ops[0].cls.indata = outbound;
1508 ops[0].cls.indata_len = outbound_size;
1509
1510 ret = rbd_req_sync_op(rbd_dev, NULL,
1511 CEPH_NOSNAP,
1512 flags, ops,
1513 object_name, 0, inbound_size, inbound, 1540 object_name, 0, inbound_size, inbound,
1514 NULL, ver); 1541 ver);
1515 1542
1516 rbd_destroy_ops(ops); 1543 rbd_osd_req_op_destroy(op);
1517 1544
1518 dout("cls_exec returned %d\n", ret); 1545 dout("cls_exec returned %d\n", ret);
1519 return ret; 1546 return ret;
@@ -1533,113 +1560,123 @@ static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1533 return coll; 1560 return coll;
1534} 1561}
1535 1562
1563static int rbd_dev_do_request(struct request *rq,
1564 struct rbd_device *rbd_dev,
1565 struct ceph_snap_context *snapc,
1566 u64 ofs, unsigned int size,
1567 struct bio *bio_chain)
1568{
1569 int num_segs;
1570 struct rbd_req_coll *coll;
1571 unsigned int bio_offset;
1572 int cur_seg = 0;
1573
1574 dout("%s 0x%x bytes at 0x%llx\n",
1575 rq_data_dir(rq) == WRITE ? "write" : "read",
1576 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1577
1578 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1579 if (num_segs <= 0)
1580 return num_segs;
1581
1582 coll = rbd_alloc_coll(num_segs);
1583 if (!coll)
1584 return -ENOMEM;
1585
1586 bio_offset = 0;
1587 do {
1588 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1589 unsigned int clone_size;
1590 struct bio *bio_clone;
1591
1592 BUG_ON(limit > (u64)UINT_MAX);
1593 clone_size = (unsigned int)limit;
1594 dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
1595
1596 kref_get(&coll->kref);
1597
1598 /* Pass a cloned bio chain via an osd request */
1599
1600 bio_clone = bio_chain_clone_range(&bio_chain,
1601 &bio_offset, clone_size,
1602 GFP_ATOMIC);
1603 if (bio_clone)
1604 (void)rbd_do_op(rq, rbd_dev, snapc,
1605 ofs, clone_size,
1606 bio_clone, coll, cur_seg);
1607 else
1608 rbd_coll_end_req_index(rq, coll, cur_seg,
1609 (s32)-ENOMEM,
1610 clone_size);
1611 size -= clone_size;
1612 ofs += clone_size;
1613
1614 cur_seg++;
1615 } while (size > 0);
1616 kref_put(&coll->kref, rbd_coll_release);
1617
1618 return 0;
1619}
1620
1536/* 1621/*
1537 * block device queue callback 1622 * block device queue callback
1538 */ 1623 */
1539static void rbd_rq_fn(struct request_queue *q) 1624static void rbd_rq_fn(struct request_queue *q)
1540{ 1625{
1541 struct rbd_device *rbd_dev = q->queuedata; 1626 struct rbd_device *rbd_dev = q->queuedata;
1627 bool read_only = rbd_dev->mapping.read_only;
1542 struct request *rq; 1628 struct request *rq;
1543 1629
1544 while ((rq = blk_fetch_request(q))) { 1630 while ((rq = blk_fetch_request(q))) {
1545 struct bio *bio; 1631 struct ceph_snap_context *snapc = NULL;
1546 bool do_write; 1632 unsigned int size = 0;
1547 unsigned int size; 1633 int result;
1548 u64 ofs;
1549 int num_segs, cur_seg = 0;
1550 struct rbd_req_coll *coll;
1551 struct ceph_snap_context *snapc;
1552 unsigned int bio_offset;
1553 1634
1554 dout("fetched request\n"); 1635 dout("fetched request\n");
1555 1636
1556 /* filter out block requests we don't understand */ 1637 /* Filter out block requests we don't understand */
1638
1557 if ((rq->cmd_type != REQ_TYPE_FS)) { 1639 if ((rq->cmd_type != REQ_TYPE_FS)) {
1558 __blk_end_request_all(rq, 0); 1640 __blk_end_request_all(rq, 0);
1559 continue; 1641 continue;
1560 } 1642 }
1561
1562 /* deduce our operation (read, write) */
1563 do_write = (rq_data_dir(rq) == WRITE);
1564 if (do_write && rbd_dev->mapping.read_only) {
1565 __blk_end_request_all(rq, -EROFS);
1566 continue;
1567 }
1568
1569 spin_unlock_irq(q->queue_lock); 1643 spin_unlock_irq(q->queue_lock);
1570 1644
1571 down_read(&rbd_dev->header_rwsem); 1645 /* Write requests need a reference to the snapshot context */
1572 1646
1573 if (!rbd_dev->exists) { 1647 if (rq_data_dir(rq) == WRITE) {
1574 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 1648 result = -EROFS;
1649 if (read_only) /* Can't write to a read-only device */
1650 goto out_end_request;
1651
1652 /*
1653 * Note that each osd request will take its
1654 * own reference to the snapshot context
1655 * supplied. The reference we take here
1656 * just guarantees the one we provide stays
1657 * valid.
1658 */
1659 down_read(&rbd_dev->header_rwsem);
1660 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1575 up_read(&rbd_dev->header_rwsem); 1661 up_read(&rbd_dev->header_rwsem);
1662 rbd_assert(snapc != NULL);
1663 } else if (!atomic_read(&rbd_dev->exists)) {
1664 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1576 dout("request for non-existent snapshot"); 1665 dout("request for non-existent snapshot");
1577 spin_lock_irq(q->queue_lock); 1666 result = -ENXIO;
1578 __blk_end_request_all(rq, -ENXIO); 1667 goto out_end_request;
1579 continue;
1580 } 1668 }
1581 1669
1582 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1583
1584 up_read(&rbd_dev->header_rwsem);
1585
1586 size = blk_rq_bytes(rq); 1670 size = blk_rq_bytes(rq);
1587 ofs = blk_rq_pos(rq) * SECTOR_SIZE; 1671 result = rbd_dev_do_request(rq, rbd_dev, snapc,
1588 bio = rq->bio; 1672 blk_rq_pos(rq) * SECTOR_SIZE,
1589 1673 size, rq->bio);
1590 dout("%s 0x%x bytes at 0x%llx\n", 1674out_end_request:
1591 do_write ? "write" : "read", 1675 if (snapc)
1592 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1593
1594 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1595 if (num_segs <= 0) {
1596 spin_lock_irq(q->queue_lock);
1597 __blk_end_request_all(rq, num_segs);
1598 ceph_put_snap_context(snapc); 1676 ceph_put_snap_context(snapc);
1599 continue;
1600 }
1601 coll = rbd_alloc_coll(num_segs);
1602 if (!coll) {
1603 spin_lock_irq(q->queue_lock);
1604 __blk_end_request_all(rq, -ENOMEM);
1605 ceph_put_snap_context(snapc);
1606 continue;
1607 }
1608
1609 bio_offset = 0;
1610 do {
1611 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1612 unsigned int chain_size;
1613 struct bio *bio_chain;
1614
1615 BUG_ON(limit > (u64) UINT_MAX);
1616 chain_size = (unsigned int) limit;
1617 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1618
1619 kref_get(&coll->kref);
1620
1621 /* Pass a cloned bio chain via an osd request */
1622
1623 bio_chain = bio_chain_clone_range(&bio,
1624 &bio_offset, chain_size,
1625 GFP_ATOMIC);
1626 if (bio_chain)
1627 (void) rbd_do_op(rq, rbd_dev, snapc,
1628 ofs, chain_size,
1629 bio_chain, coll, cur_seg);
1630 else
1631 rbd_coll_end_req_index(rq, coll, cur_seg,
1632 -ENOMEM, chain_size);
1633 size -= chain_size;
1634 ofs += chain_size;
1635
1636 cur_seg++;
1637 } while (size > 0);
1638 kref_put(&coll->kref, rbd_coll_release);
1639
1640 spin_lock_irq(q->queue_lock); 1677 spin_lock_irq(q->queue_lock);
1641 1678 if (!size || result < 0)
1642 ceph_put_snap_context(snapc); 1679 __blk_end_request_all(rq, result);
1643 } 1680 }
1644} 1681}
1645 1682
@@ -1741,8 +1778,7 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1741 if (!ondisk) 1778 if (!ondisk)
1742 return ERR_PTR(-ENOMEM); 1779 return ERR_PTR(-ENOMEM);
1743 1780
1744 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP, 1781 ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
1745 rbd_dev->header_name,
1746 0, size, 1782 0, size,
1747 (char *) ondisk, version); 1783 (char *) ondisk, version);
1748 1784
@@ -1750,15 +1786,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1750 goto out_err; 1786 goto out_err;
1751 if (WARN_ON((size_t) ret < size)) { 1787 if (WARN_ON((size_t) ret < size)) {
1752 ret = -ENXIO; 1788 ret = -ENXIO;
1753 pr_warning("short header read for image %s" 1789 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
1754 " (want %zd got %d)\n", 1790 size, ret);
1755 rbd_dev->spec->image_name, size, ret);
1756 goto out_err; 1791 goto out_err;
1757 } 1792 }
1758 if (!rbd_dev_ondisk_valid(ondisk)) { 1793 if (!rbd_dev_ondisk_valid(ondisk)) {
1759 ret = -ENXIO; 1794 ret = -ENXIO;
1760 pr_warning("invalid header for image %s\n", 1795 rbd_warn(rbd_dev, "invalid header");
1761 rbd_dev->spec->image_name);
1762 goto out_err; 1796 goto out_err;
1763 } 1797 }
1764 1798
@@ -2243,6 +2277,7 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2243 return NULL; 2277 return NULL;
2244 2278
2245 spin_lock_init(&rbd_dev->lock); 2279 spin_lock_init(&rbd_dev->lock);
2280 atomic_set(&rbd_dev->exists, 0);
2246 INIT_LIST_HEAD(&rbd_dev->node); 2281 INIT_LIST_HEAD(&rbd_dev->node);
2247 INIT_LIST_HEAD(&rbd_dev->snaps); 2282 INIT_LIST_HEAD(&rbd_dev->snaps);
2248 init_rwsem(&rbd_dev->header_rwsem); 2283 init_rwsem(&rbd_dev->header_rwsem);
@@ -2250,6 +2285,13 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2250 rbd_dev->spec = spec; 2285 rbd_dev->spec = spec;
2251 rbd_dev->rbd_client = rbdc; 2286 rbd_dev->rbd_client = rbdc;
2252 2287
2288 /* Initialize the layout used for all rbd requests */
2289
2290 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2291 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2292 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2293 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2294
2253 return rbd_dev; 2295 return rbd_dev;
2254} 2296}
2255 2297
@@ -2363,8 +2405,7 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2363 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2405 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2364 "rbd", "get_size", 2406 "rbd", "get_size",
2365 (char *) &snapid, sizeof (snapid), 2407 (char *) &snapid, sizeof (snapid),
2366 (char *) &size_buf, sizeof (size_buf), 2408 (char *) &size_buf, sizeof (size_buf), NULL);
2367 CEPH_OSD_FLAG_READ, NULL);
2368 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2409 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2369 if (ret < 0) 2410 if (ret < 0)
2370 return ret; 2411 return ret;
@@ -2399,8 +2440,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2399 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2440 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2400 "rbd", "get_object_prefix", 2441 "rbd", "get_object_prefix",
2401 NULL, 0, 2442 NULL, 0,
2402 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, 2443 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2403 CEPH_OSD_FLAG_READ, NULL);
2404 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2444 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2405 if (ret < 0) 2445 if (ret < 0)
2406 goto out; 2446 goto out;
@@ -2439,7 +2479,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2439 "rbd", "get_features", 2479 "rbd", "get_features",
2440 (char *) &snapid, sizeof (snapid), 2480 (char *) &snapid, sizeof (snapid),
2441 (char *) &features_buf, sizeof (features_buf), 2481 (char *) &features_buf, sizeof (features_buf),
2442 CEPH_OSD_FLAG_READ, NULL); 2482 NULL);
2443 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2483 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2444 if (ret < 0) 2484 if (ret < 0)
2445 return ret; 2485 return ret;
@@ -2474,7 +2514,6 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2474 void *end; 2514 void *end;
2475 char *image_id; 2515 char *image_id;
2476 u64 overlap; 2516 u64 overlap;
2477 size_t len = 0;
2478 int ret; 2517 int ret;
2479 2518
2480 parent_spec = rbd_spec_alloc(); 2519 parent_spec = rbd_spec_alloc();
@@ -2495,8 +2534,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2495 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2534 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2496 "rbd", "get_parent", 2535 "rbd", "get_parent",
2497 (char *) &snapid, sizeof (snapid), 2536 (char *) &snapid, sizeof (snapid),
2498 (char *) reply_buf, size, 2537 (char *) reply_buf, size, NULL);
2499 CEPH_OSD_FLAG_READ, NULL);
2500 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2538 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2501 if (ret < 0) 2539 if (ret < 0)
2502 goto out_err; 2540 goto out_err;
@@ -2508,13 +2546,18 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2508 if (parent_spec->pool_id == CEPH_NOPOOL) 2546 if (parent_spec->pool_id == CEPH_NOPOOL)
2509 goto out; /* No parent? No problem. */ 2547 goto out; /* No parent? No problem. */
2510 2548
2511 image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 2549 /* The ceph file layout needs to fit pool id in 32 bits */
2550
2551 ret = -EIO;
2552 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2553 goto out;
2554
2555 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2512 if (IS_ERR(image_id)) { 2556 if (IS_ERR(image_id)) {
2513 ret = PTR_ERR(image_id); 2557 ret = PTR_ERR(image_id);
2514 goto out_err; 2558 goto out_err;
2515 } 2559 }
2516 parent_spec->image_id = image_id; 2560 parent_spec->image_id = image_id;
2517 parent_spec->image_id_len = len;
2518 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); 2561 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2519 ceph_decode_64_safe(&p, end, overlap, out_err); 2562 ceph_decode_64_safe(&p, end, overlap, out_err);
2520 2563
@@ -2544,15 +2587,15 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2544 2587
2545 rbd_assert(!rbd_dev->spec->image_name); 2588 rbd_assert(!rbd_dev->spec->image_name);
2546 2589
2547 image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len; 2590 len = strlen(rbd_dev->spec->image_id);
2591 image_id_size = sizeof (__le32) + len;
2548 image_id = kmalloc(image_id_size, GFP_KERNEL); 2592 image_id = kmalloc(image_id_size, GFP_KERNEL);
2549 if (!image_id) 2593 if (!image_id)
2550 return NULL; 2594 return NULL;
2551 2595
2552 p = image_id; 2596 p = image_id;
2553 end = (char *) image_id + image_id_size; 2597 end = (char *) image_id + image_id_size;
2554 ceph_encode_string(&p, end, rbd_dev->spec->image_id, 2598 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2555 (u32) rbd_dev->spec->image_id_len);
2556 2599
2557 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 2600 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2558 reply_buf = kmalloc(size, GFP_KERNEL); 2601 reply_buf = kmalloc(size, GFP_KERNEL);
@@ -2562,8 +2605,7 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2562 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY, 2605 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2563 "rbd", "dir_get_name", 2606 "rbd", "dir_get_name",
2564 image_id, image_id_size, 2607 image_id, image_id_size,
2565 (char *) reply_buf, size, 2608 (char *) reply_buf, size, NULL);
2566 CEPH_OSD_FLAG_READ, NULL);
2567 if (ret < 0) 2609 if (ret < 0)
2568 goto out; 2610 goto out;
2569 p = reply_buf; 2611 p = reply_buf;
@@ -2602,8 +2644,11 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2602 2644
2603 osdc = &rbd_dev->rbd_client->client->osdc; 2645 osdc = &rbd_dev->rbd_client->client->osdc;
2604 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); 2646 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2605 if (!name) 2647 if (!name) {
2606 return -EIO; /* pool id too large (>= 2^31) */ 2648 rbd_warn(rbd_dev, "there is no pool with id %llu",
2649 rbd_dev->spec->pool_id); /* Really a BUG() */
2650 return -EIO;
2651 }
2607 2652
2608 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); 2653 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2609 if (!rbd_dev->spec->pool_name) 2654 if (!rbd_dev->spec->pool_name)
@@ -2612,19 +2657,17 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2612 /* Fetch the image name; tolerate failure here */ 2657 /* Fetch the image name; tolerate failure here */
2613 2658
2614 name = rbd_dev_image_name(rbd_dev); 2659 name = rbd_dev_image_name(rbd_dev);
2615 if (name) { 2660 if (name)
2616 rbd_dev->spec->image_name_len = strlen(name);
2617 rbd_dev->spec->image_name = (char *) name; 2661 rbd_dev->spec->image_name = (char *) name;
2618 } else { 2662 else
2619 pr_warning(RBD_DRV_NAME "%d " 2663 rbd_warn(rbd_dev, "unable to get image name");
2620 "unable to get image name for image id %s\n",
2621 rbd_dev->major, rbd_dev->spec->image_id);
2622 }
2623 2664
2624 /* Look up the snapshot name. */ 2665 /* Look up the snapshot name. */
2625 2666
2626 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); 2667 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2627 if (!name) { 2668 if (!name) {
2669 rbd_warn(rbd_dev, "no snapshot with id %llu",
2670 rbd_dev->spec->snap_id); /* Really a BUG() */
2628 ret = -EIO; 2671 ret = -EIO;
2629 goto out_err; 2672 goto out_err;
2630 } 2673 }
@@ -2668,8 +2711,7 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2668 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2711 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2669 "rbd", "get_snapcontext", 2712 "rbd", "get_snapcontext",
2670 NULL, 0, 2713 NULL, 0,
2671 reply_buf, size, 2714 reply_buf, size, ver);
2672 CEPH_OSD_FLAG_READ, ver);
2673 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2715 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2674 if (ret < 0) 2716 if (ret < 0)
2675 goto out; 2717 goto out;
@@ -2738,8 +2780,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2738 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2780 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2739 "rbd", "get_snapshot_name", 2781 "rbd", "get_snapshot_name",
2740 (char *) &snap_id, sizeof (snap_id), 2782 (char *) &snap_id, sizeof (snap_id),
2741 reply_buf, size, 2783 reply_buf, size, NULL);
2742 CEPH_OSD_FLAG_READ, NULL);
2743 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2784 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2744 if (ret < 0) 2785 if (ret < 0)
2745 goto out; 2786 goto out;
@@ -2766,7 +2807,7 @@ out:
2766static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which, 2807static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2767 u64 *snap_size, u64 *snap_features) 2808 u64 *snap_size, u64 *snap_features)
2768{ 2809{
2769 __le64 snap_id; 2810 u64 snap_id;
2770 u8 order; 2811 u8 order;
2771 int ret; 2812 int ret;
2772 2813
@@ -2868,7 +2909,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2868 /* Existing snapshot not in the new snap context */ 2909 /* Existing snapshot not in the new snap context */
2869 2910
2870 if (rbd_dev->spec->snap_id == snap->id) 2911 if (rbd_dev->spec->snap_id == snap->id)
2871 rbd_dev->exists = false; 2912 atomic_set(&rbd_dev->exists, 0);
2872 rbd_remove_snap_dev(snap); 2913 rbd_remove_snap_dev(snap);
2873 dout("%ssnap id %llu has been removed\n", 2914 dout("%ssnap id %llu has been removed\n",
2874 rbd_dev->spec->snap_id == snap->id ? 2915 rbd_dev->spec->snap_id == snap->id ?
@@ -2983,22 +3024,6 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2983 device_unregister(&rbd_dev->dev); 3024 device_unregister(&rbd_dev->dev);
2984} 3025}
2985 3026
2986static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2987{
2988 int ret, rc;
2989
2990 do {
2991 ret = rbd_req_sync_watch(rbd_dev);
2992 if (ret == -ERANGE) {
2993 rc = rbd_dev_refresh(rbd_dev, NULL);
2994 if (rc < 0)
2995 return rc;
2996 }
2997 } while (ret == -ERANGE);
2998
2999 return ret;
3000}
3001
3002static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); 3027static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3003 3028
3004/* 3029/*
@@ -3138,11 +3163,9 @@ static inline char *dup_token(const char **buf, size_t *lenp)
3138 size_t len; 3163 size_t len;
3139 3164
3140 len = next_token(buf); 3165 len = next_token(buf);
3141 dup = kmalloc(len + 1, GFP_KERNEL); 3166 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3142 if (!dup) 3167 if (!dup)
3143 return NULL; 3168 return NULL;
3144
3145 memcpy(dup, *buf, len);
3146 *(dup + len) = '\0'; 3169 *(dup + len) = '\0';
3147 *buf += len; 3170 *buf += len;
3148 3171
@@ -3210,8 +3233,10 @@ static int rbd_add_parse_args(const char *buf,
3210 /* The first four tokens are required */ 3233 /* The first four tokens are required */
3211 3234
3212 len = next_token(&buf); 3235 len = next_token(&buf);
3213 if (!len) 3236 if (!len) {
3214 return -EINVAL; /* Missing monitor address(es) */ 3237 rbd_warn(NULL, "no monitor address(es) provided");
3238 return -EINVAL;
3239 }
3215 mon_addrs = buf; 3240 mon_addrs = buf;
3216 mon_addrs_size = len + 1; 3241 mon_addrs_size = len + 1;
3217 buf += len; 3242 buf += len;
@@ -3220,8 +3245,10 @@ static int rbd_add_parse_args(const char *buf,
3220 options = dup_token(&buf, NULL); 3245 options = dup_token(&buf, NULL);
3221 if (!options) 3246 if (!options)
3222 return -ENOMEM; 3247 return -ENOMEM;
3223 if (!*options) 3248 if (!*options) {
3224 goto out_err; /* Missing options */ 3249 rbd_warn(NULL, "no options provided");
3250 goto out_err;
3251 }
3225 3252
3226 spec = rbd_spec_alloc(); 3253 spec = rbd_spec_alloc();
3227 if (!spec) 3254 if (!spec)
@@ -3230,14 +3257,18 @@ static int rbd_add_parse_args(const char *buf,
3230 spec->pool_name = dup_token(&buf, NULL); 3257 spec->pool_name = dup_token(&buf, NULL);
3231 if (!spec->pool_name) 3258 if (!spec->pool_name)
3232 goto out_mem; 3259 goto out_mem;
3233 if (!*spec->pool_name) 3260 if (!*spec->pool_name) {
3234 goto out_err; /* Missing pool name */ 3261 rbd_warn(NULL, "no pool name provided");
3262 goto out_err;
3263 }
3235 3264
3236 spec->image_name = dup_token(&buf, &spec->image_name_len); 3265 spec->image_name = dup_token(&buf, NULL);
3237 if (!spec->image_name) 3266 if (!spec->image_name)
3238 goto out_mem; 3267 goto out_mem;
3239 if (!*spec->image_name) 3268 if (!*spec->image_name) {
3240 goto out_err; /* Missing image name */ 3269 rbd_warn(NULL, "no image name provided");
3270 goto out_err;
3271 }
3241 3272
3242 /* 3273 /*
3243 * Snapshot name is optional; default is to use "-" 3274 * Snapshot name is optional; default is to use "-"
@@ -3251,10 +3282,9 @@ static int rbd_add_parse_args(const char *buf,
3251 ret = -ENAMETOOLONG; 3282 ret = -ENAMETOOLONG;
3252 goto out_err; 3283 goto out_err;
3253 } 3284 }
3254 spec->snap_name = kmalloc(len + 1, GFP_KERNEL); 3285 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3255 if (!spec->snap_name) 3286 if (!spec->snap_name)
3256 goto out_mem; 3287 goto out_mem;
3257 memcpy(spec->snap_name, buf, len);
3258 *(spec->snap_name + len) = '\0'; 3288 *(spec->snap_name + len) = '\0';
3259 3289
3260 /* Initialize all rbd options to the defaults */ 3290 /* Initialize all rbd options to the defaults */
@@ -3323,7 +3353,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3323 * First, see if the format 2 image id file exists, and if 3353 * First, see if the format 2 image id file exists, and if
3324 * so, get the image's persistent id from it. 3354 * so, get the image's persistent id from it.
3325 */ 3355 */
3326 size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len; 3356 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3327 object_name = kmalloc(size, GFP_NOIO); 3357 object_name = kmalloc(size, GFP_NOIO);
3328 if (!object_name) 3358 if (!object_name)
3329 return -ENOMEM; 3359 return -ENOMEM;
@@ -3342,8 +3372,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3342 ret = rbd_req_sync_exec(rbd_dev, object_name, 3372 ret = rbd_req_sync_exec(rbd_dev, object_name,
3343 "rbd", "get_id", 3373 "rbd", "get_id",
3344 NULL, 0, 3374 NULL, 0,
3345 response, RBD_IMAGE_ID_LEN_MAX, 3375 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3346 CEPH_OSD_FLAG_READ, NULL);
3347 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 3376 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3348 if (ret < 0) 3377 if (ret < 0)
3349 goto out; 3378 goto out;
@@ -3352,8 +3381,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3352 p = response; 3381 p = response;
3353 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p, 3382 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3354 p + RBD_IMAGE_ID_LEN_MAX, 3383 p + RBD_IMAGE_ID_LEN_MAX,
3355 &rbd_dev->spec->image_id_len, 3384 NULL, GFP_NOIO);
3356 GFP_NOIO);
3357 if (IS_ERR(rbd_dev->spec->image_id)) { 3385 if (IS_ERR(rbd_dev->spec->image_id)) {
3358 ret = PTR_ERR(rbd_dev->spec->image_id); 3386 ret = PTR_ERR(rbd_dev->spec->image_id);
3359 rbd_dev->spec->image_id = NULL; 3387 rbd_dev->spec->image_id = NULL;
@@ -3377,11 +3405,10 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3377 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL); 3405 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3378 if (!rbd_dev->spec->image_id) 3406 if (!rbd_dev->spec->image_id)
3379 return -ENOMEM; 3407 return -ENOMEM;
3380 rbd_dev->spec->image_id_len = 0;
3381 3408
3382 /* Record the header object name for this rbd image. */ 3409 /* Record the header object name for this rbd image. */
3383 3410
3384 size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX); 3411 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3385 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3412 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3386 if (!rbd_dev->header_name) { 3413 if (!rbd_dev->header_name) {
3387 ret = -ENOMEM; 3414 ret = -ENOMEM;
@@ -3427,7 +3454,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3427 * Image id was filled in by the caller. Record the header 3454 * Image id was filled in by the caller. Record the header
3428 * object name for this rbd image. 3455 * object name for this rbd image.
3429 */ 3456 */
3430 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len; 3457 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3431 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3458 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3432 if (!rbd_dev->header_name) 3459 if (!rbd_dev->header_name)
3433 return -ENOMEM; 3460 return -ENOMEM;
@@ -3542,7 +3569,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3542 if (ret) 3569 if (ret)
3543 goto err_out_bus; 3570 goto err_out_bus;
3544 3571
3545 ret = rbd_init_watch_dev(rbd_dev); 3572 ret = rbd_req_sync_watch(rbd_dev, 1);
3546 if (ret) 3573 if (ret)
3547 goto err_out_bus; 3574 goto err_out_bus;
3548 3575
@@ -3638,6 +3665,13 @@ static ssize_t rbd_add(struct bus_type *bus,
3638 goto err_out_client; 3665 goto err_out_client;
3639 spec->pool_id = (u64) rc; 3666 spec->pool_id = (u64) rc;
3640 3667
3668 /* The ceph file layout needs to fit pool id in 32 bits */
3669
3670 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
3671 rc = -EIO;
3672 goto err_out_client;
3673 }
3674
3641 rbd_dev = rbd_dev_create(rbdc, spec); 3675 rbd_dev = rbd_dev_create(rbdc, spec);
3642 if (!rbd_dev) 3676 if (!rbd_dev)
3643 goto err_out_client; 3677 goto err_out_client;
@@ -3698,8 +3732,7 @@ static void rbd_dev_release(struct device *dev)
3698 rbd_dev->watch_request); 3732 rbd_dev->watch_request);
3699 } 3733 }
3700 if (rbd_dev->watch_event) 3734 if (rbd_dev->watch_event)
3701 rbd_req_sync_unwatch(rbd_dev); 3735 rbd_req_sync_watch(rbd_dev, 0);
3702
3703 3736
3704 /* clean up and free blkdev */ 3737 /* clean up and free blkdev */
3705 rbd_free_disk(rbd_dev); 3738 rbd_free_disk(rbd_dev);
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index a1d9bb30c1bf..1e1e02055a2b 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -611,8 +611,16 @@ retry:
611 611
612 if (flags & CEPH_CAP_FLAG_AUTH) 612 if (flags & CEPH_CAP_FLAG_AUTH)
613 ci->i_auth_cap = cap; 613 ci->i_auth_cap = cap;
614 else if (ci->i_auth_cap == cap) 614 else if (ci->i_auth_cap == cap) {
615 ci->i_auth_cap = NULL; 615 ci->i_auth_cap = NULL;
616 spin_lock(&mdsc->cap_dirty_lock);
617 if (!list_empty(&ci->i_dirty_item)) {
618 dout(" moving %p to cap_dirty_migrating\n", inode);
619 list_move(&ci->i_dirty_item,
620 &mdsc->cap_dirty_migrating);
621 }
622 spin_unlock(&mdsc->cap_dirty_lock);
623 }
616 624
617 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", 625 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
618 inode, ceph_vinop(inode), cap, ceph_cap_string(issued), 626 inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
@@ -1460,7 +1468,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
1460 struct ceph_mds_client *mdsc = fsc->mdsc; 1468 struct ceph_mds_client *mdsc = fsc->mdsc;
1461 struct inode *inode = &ci->vfs_inode; 1469 struct inode *inode = &ci->vfs_inode;
1462 struct ceph_cap *cap; 1470 struct ceph_cap *cap;
1463 int file_wanted, used; 1471 int file_wanted, used, cap_used;
1464 int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ 1472 int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
1465 int issued, implemented, want, retain, revoking, flushing = 0; 1473 int issued, implemented, want, retain, revoking, flushing = 0;
1466 int mds = -1; /* keep track of how far we've gone through i_caps list 1474 int mds = -1; /* keep track of how far we've gone through i_caps list
@@ -1563,9 +1571,14 @@ retry_locked:
1563 1571
1564 /* NOTE: no side-effects allowed, until we take s_mutex */ 1572 /* NOTE: no side-effects allowed, until we take s_mutex */
1565 1573
1574 cap_used = used;
1575 if (ci->i_auth_cap && cap != ci->i_auth_cap)
1576 cap_used &= ~ci->i_auth_cap->issued;
1577
1566 revoking = cap->implemented & ~cap->issued; 1578 revoking = cap->implemented & ~cap->issued;
1567 dout(" mds%d cap %p issued %s implemented %s revoking %s\n", 1579 dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
1568 cap->mds, cap, ceph_cap_string(cap->issued), 1580 cap->mds, cap, ceph_cap_string(cap->issued),
1581 ceph_cap_string(cap_used),
1569 ceph_cap_string(cap->implemented), 1582 ceph_cap_string(cap->implemented),
1570 ceph_cap_string(revoking)); 1583 ceph_cap_string(revoking));
1571 1584
@@ -1593,7 +1606,7 @@ retry_locked:
1593 } 1606 }
1594 1607
1595 /* completed revocation? going down and there are no caps? */ 1608 /* completed revocation? going down and there are no caps? */
1596 if (revoking && (revoking & used) == 0) { 1609 if (revoking && (revoking & cap_used) == 0) {
1597 dout("completed revocation of %s\n", 1610 dout("completed revocation of %s\n",
1598 ceph_cap_string(cap->implemented & ~cap->issued)); 1611 ceph_cap_string(cap->implemented & ~cap->issued));
1599 goto ack; 1612 goto ack;
@@ -1670,8 +1683,8 @@ ack:
1670 sent++; 1683 sent++;
1671 1684
1672 /* __send_cap drops i_ceph_lock */ 1685 /* __send_cap drops i_ceph_lock */
1673 delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want, 1686 delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
1674 retain, flushing, NULL); 1687 want, retain, flushing, NULL);
1675 goto retry; /* retake i_ceph_lock and restart our cap scan. */ 1688 goto retry; /* retake i_ceph_lock and restart our cap scan. */
1676 } 1689 }
1677 1690
@@ -2416,7 +2429,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2416 dout("mds wanted %s -> %s\n", 2429 dout("mds wanted %s -> %s\n",
2417 ceph_cap_string(le32_to_cpu(grant->wanted)), 2430 ceph_cap_string(le32_to_cpu(grant->wanted)),
2418 ceph_cap_string(wanted)); 2431 ceph_cap_string(wanted));
2419 grant->wanted = cpu_to_le32(wanted); 2432 /* imported cap may not have correct mds_wanted */
2433 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
2434 check_caps = 1;
2420 } 2435 }
2421 2436
2422 cap->seq = seq; 2437 cap->seq = seq;
@@ -2820,6 +2835,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2820 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 2835 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
2821 (unsigned)seq); 2836 (unsigned)seq);
2822 2837
2838 if (op == CEPH_CAP_OP_IMPORT)
2839 ceph_add_cap_releases(mdsc, session);
2840
2823 /* lookup ino */ 2841 /* lookup ino */
2824 inode = ceph_find_inode(sb, vino); 2842 inode = ceph_find_inode(sb, vino);
2825 ci = ceph_inode(inode); 2843 ci = ceph_inode(inode);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index e51558fca3a3..a1e5b81e8118 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -243,6 +243,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
243 err = ceph_mdsc_do_request(mdsc, 243 err = ceph_mdsc_do_request(mdsc,
244 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, 244 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
245 req); 245 req);
246 if (err)
247 goto out_err;
248
246 err = ceph_handle_snapdir(req, dentry, err); 249 err = ceph_handle_snapdir(req, dentry, err);
247 if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) 250 if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
248 err = ceph_handle_notrace_create(dir, dentry); 251 err = ceph_handle_notrace_create(dir, dentry);
@@ -263,6 +266,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
263 err = finish_no_open(file, dn); 266 err = finish_no_open(file, dn);
264 } else { 267 } else {
265 dout("atomic_open finish_open on dn %p\n", dn); 268 dout("atomic_open finish_open on dn %p\n", dn);
269 if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
270 *opened |= FILE_CREATED;
271 }
266 err = finish_open(file, dentry, ceph_open, opened); 272 err = finish_open(file, dentry, ceph_open, opened);
267 } 273 }
268 274
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 36549a46e311..3b22150d3e19 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -194,7 +194,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
194 return -EFAULT; 194 return -EFAULT;
195 195
196 down_read(&osdc->map_sem); 196 down_read(&osdc->map_sem);
197 r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len, 197 r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
198 &dl.object_no, &dl.object_offset, 198 &dl.object_no, &dl.object_offset,
199 &olen); 199 &olen);
200 if (r < 0) 200 if (r < 0)
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 9165eb8309eb..d95842036c8b 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -233,6 +233,30 @@ bad:
233} 233}
234 234
235/* 235/*
236 * parse create results
237 */
238static int parse_reply_info_create(void **p, void *end,
239 struct ceph_mds_reply_info_parsed *info,
240 int features)
241{
242 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
243 if (*p == end) {
244 info->has_create_ino = false;
245 } else {
246 info->has_create_ino = true;
247 info->ino = ceph_decode_64(p);
248 }
249 }
250
251 if (unlikely(*p != end))
252 goto bad;
253 return 0;
254
255bad:
256 return -EIO;
257}
258
259/*
236 * parse extra results 260 * parse extra results
237 */ 261 */
238static int parse_reply_info_extra(void **p, void *end, 262static int parse_reply_info_extra(void **p, void *end,
@@ -241,8 +265,12 @@ static int parse_reply_info_extra(void **p, void *end,
241{ 265{
242 if (info->head->op == CEPH_MDS_OP_GETFILELOCK) 266 if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
243 return parse_reply_info_filelock(p, end, info, features); 267 return parse_reply_info_filelock(p, end, info, features);
244 else 268 else if (info->head->op == CEPH_MDS_OP_READDIR)
245 return parse_reply_info_dir(p, end, info, features); 269 return parse_reply_info_dir(p, end, info, features);
270 else if (info->head->op == CEPH_MDS_OP_CREATE)
271 return parse_reply_info_create(p, end, info, features);
272 else
273 return -EIO;
246} 274}
247 275
248/* 276/*
@@ -2170,7 +2198,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2170 mutex_lock(&req->r_fill_mutex); 2198 mutex_lock(&req->r_fill_mutex);
2171 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2199 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2172 if (err == 0) { 2200 if (err == 0) {
2173 if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK && 2201 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2202 req->r_op == CEPH_MDS_OP_LSSNAP) &&
2174 rinfo->dir_nr) 2203 rinfo->dir_nr)
2175 ceph_readdir_prepopulate(req, req->r_session); 2204 ceph_readdir_prepopulate(req, req->r_session);
2176 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2205 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index dd26846dd71d..567f7c60354e 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -74,6 +74,12 @@ struct ceph_mds_reply_info_parsed {
74 struct ceph_mds_reply_info_in *dir_in; 74 struct ceph_mds_reply_info_in *dir_in;
75 u8 dir_complete, dir_end; 75 u8 dir_complete, dir_end;
76 }; 76 };
77
78 /* for create results */
79 struct {
80 bool has_create_ino;
81 u64 ino;
82 };
77 }; 83 };
78 84
79 /* encoded blob describing snapshot contexts for certain 85 /* encoded blob describing snapshot contexts for certain
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index dad579b0c0e6..2160aab482f6 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -14,13 +14,19 @@
14#define CEPH_FEATURE_DIRLAYOUTHASH (1<<7) 14#define CEPH_FEATURE_DIRLAYOUTHASH (1<<7)
15/* bits 8-17 defined by user-space; not supported yet here */ 15/* bits 8-17 defined by user-space; not supported yet here */
16#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18) 16#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
17/* bits 19-24 defined by user-space; not supported yet here */
18#define CEPH_FEATURE_CRUSH_TUNABLES2 (1<<25)
19/* bit 26 defined by user-space; not supported yet here */
20#define CEPH_FEATURE_REPLY_CREATE_INODE (1<<27)
17 21
18/* 22/*
19 * Features supported. 23 * Features supported.
20 */ 24 */
21#define CEPH_FEATURES_SUPPORTED_DEFAULT \ 25#define CEPH_FEATURES_SUPPORTED_DEFAULT \
22 (CEPH_FEATURE_NOSRCADDR | \ 26 (CEPH_FEATURE_NOSRCADDR | \
23 CEPH_FEATURE_CRUSH_TUNABLES) 27 CEPH_FEATURE_CRUSH_TUNABLES | \
28 CEPH_FEATURE_CRUSH_TUNABLES2 | \
29 CEPH_FEATURE_REPLY_CREATE_INODE)
24 30
25#define CEPH_FEATURES_REQUIRED_DEFAULT \ 31#define CEPH_FEATURES_REQUIRED_DEFAULT \
26 (CEPH_FEATURE_NOSRCADDR) 32 (CEPH_FEATURE_NOSRCADDR)
diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index 63d092822bad..360d9d08ca9e 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -52,10 +52,10 @@ static inline int ceph_has_room(void **p, void *end, size_t n)
52 return end >= *p && n <= end - *p; 52 return end >= *p && n <= end - *p;
53} 53}
54 54
55#define ceph_decode_need(p, end, n, bad) \ 55#define ceph_decode_need(p, end, n, bad) \
56 do { \ 56 do { \
57 if (!likely(ceph_has_room(p, end, n))) \ 57 if (!likely(ceph_has_room(p, end, n))) \
58 goto bad; \ 58 goto bad; \
59 } while (0) 59 } while (0)
60 60
61#define ceph_decode_64_safe(p, end, v, bad) \ 61#define ceph_decode_64_safe(p, end, v, bad) \
@@ -99,8 +99,8 @@ static inline int ceph_has_room(void **p, void *end, size_t n)
99 * 99 *
100 * There are two possible failures: 100 * There are two possible failures:
101 * - converting the string would require accessing memory at or 101 * - converting the string would require accessing memory at or
102 * beyond the "end" pointer provided (-E 102 * beyond the "end" pointer provided (-ERANGE)
103 * - memory could not be allocated for the result 103 * - memory could not be allocated for the result (-ENOMEM)
104 */ 104 */
105static inline char *ceph_extract_encoded_string(void **p, void *end, 105static inline char *ceph_extract_encoded_string(void **p, void *end,
106 size_t *lenp, gfp_t gfp) 106 size_t *lenp, gfp_t gfp)
@@ -217,10 +217,10 @@ static inline void ceph_encode_string(void **p, void *end,
217 *p += len; 217 *p += len;
218} 218}
219 219
220#define ceph_encode_need(p, end, n, bad) \ 220#define ceph_encode_need(p, end, n, bad) \
221 do { \ 221 do { \
222 if (!likely(ceph_has_room(p, end, n))) \ 222 if (!likely(ceph_has_room(p, end, n))) \
223 goto bad; \ 223 goto bad; \
224 } while (0) 224 } while (0)
225 225
226#define ceph_encode_64_safe(p, end, v, bad) \ 226#define ceph_encode_64_safe(p, end, v, bad) \
@@ -231,12 +231,17 @@ static inline void ceph_encode_string(void **p, void *end,
231#define ceph_encode_32_safe(p, end, v, bad) \ 231#define ceph_encode_32_safe(p, end, v, bad) \
232 do { \ 232 do { \
233 ceph_encode_need(p, end, sizeof(u32), bad); \ 233 ceph_encode_need(p, end, sizeof(u32), bad); \
234 ceph_encode_32(p, v); \ 234 ceph_encode_32(p, v); \
235 } while (0) 235 } while (0)
236#define ceph_encode_16_safe(p, end, v, bad) \ 236#define ceph_encode_16_safe(p, end, v, bad) \
237 do { \ 237 do { \
238 ceph_encode_need(p, end, sizeof(u16), bad); \ 238 ceph_encode_need(p, end, sizeof(u16), bad); \
239 ceph_encode_16(p, v); \ 239 ceph_encode_16(p, v); \
240 } while (0)
241#define ceph_encode_8_safe(p, end, v, bad) \
242 do { \
243 ceph_encode_need(p, end, sizeof(u8), bad); \
244 ceph_encode_8(p, v); \
240 } while (0) 245 } while (0)
241 246
242#define ceph_encode_copy_safe(p, end, pv, n, bad) \ 247#define ceph_encode_copy_safe(p, end, pv, n, bad) \
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index d9b880e977e6..69287ccfe68a 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -10,6 +10,7 @@
10#include <linux/ceph/osdmap.h> 10#include <linux/ceph/osdmap.h>
11#include <linux/ceph/messenger.h> 11#include <linux/ceph/messenger.h>
12#include <linux/ceph/auth.h> 12#include <linux/ceph/auth.h>
13#include <linux/ceph/pagelist.h>
13 14
14/* 15/*
15 * Maximum object name size 16 * Maximum object name size
@@ -22,7 +23,6 @@ struct ceph_snap_context;
22struct ceph_osd_request; 23struct ceph_osd_request;
23struct ceph_osd_client; 24struct ceph_osd_client;
24struct ceph_authorizer; 25struct ceph_authorizer;
25struct ceph_pagelist;
26 26
27/* 27/*
28 * completion callback for async writepages 28 * completion callback for async writepages
@@ -95,7 +95,7 @@ struct ceph_osd_request {
95 struct bio *r_bio; /* instead of pages */ 95 struct bio *r_bio; /* instead of pages */
96#endif 96#endif
97 97
98 struct ceph_pagelist *r_trail; /* trailing part of the data */ 98 struct ceph_pagelist r_trail; /* trailing part of the data */
99}; 99};
100 100
101struct ceph_osd_event { 101struct ceph_osd_event {
@@ -157,7 +157,6 @@ struct ceph_osd_client {
157 157
158struct ceph_osd_req_op { 158struct ceph_osd_req_op {
159 u16 op; /* CEPH_OSD_OP_* */ 159 u16 op; /* CEPH_OSD_OP_* */
160 u32 flags; /* CEPH_OSD_FLAG_* */
161 union { 160 union {
162 struct { 161 struct {
163 u64 offset, length; 162 u64 offset, length;
@@ -207,29 +206,24 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
207extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, 206extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
208 struct ceph_msg *msg); 207 struct ceph_msg *msg);
209 208
210extern int ceph_calc_raw_layout(struct ceph_osd_client *osdc, 209extern int ceph_calc_raw_layout(struct ceph_file_layout *layout,
211 struct ceph_file_layout *layout,
212 u64 snapid,
213 u64 off, u64 *plen, u64 *bno, 210 u64 off, u64 *plen, u64 *bno,
214 struct ceph_osd_request *req, 211 struct ceph_osd_request *req,
215 struct ceph_osd_req_op *op); 212 struct ceph_osd_req_op *op);
216 213
217extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 214extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
218 int flags,
219 struct ceph_snap_context *snapc, 215 struct ceph_snap_context *snapc,
220 struct ceph_osd_req_op *ops, 216 unsigned int num_op,
221 bool use_mempool, 217 bool use_mempool,
222 gfp_t gfp_flags, 218 gfp_t gfp_flags);
223 struct page **pages,
224 struct bio *bio);
225 219
226extern void ceph_osdc_build_request(struct ceph_osd_request *req, 220extern void ceph_osdc_build_request(struct ceph_osd_request *req,
227 u64 off, u64 *plen, 221 u64 off, u64 len,
222 unsigned int num_op,
228 struct ceph_osd_req_op *src_ops, 223 struct ceph_osd_req_op *src_ops,
229 struct ceph_snap_context *snapc, 224 struct ceph_snap_context *snapc,
230 struct timespec *mtime, 225 u64 snap_id,
231 const char *oid, 226 struct timespec *mtime);
232 int oid_len);
233 227
234extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, 228extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
235 struct ceph_file_layout *layout, 229 struct ceph_file_layout *layout,
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 10a417f9f76f..c83a838f89f5 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -110,7 +110,7 @@ extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
110 110
111/* calculate mapping of a file extent to an object */ 111/* calculate mapping of a file extent to an object */
112extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, 112extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
113 u64 off, u64 *plen, 113 u64 off, u64 len,
114 u64 *bno, u64 *oxoff, u64 *oxlen); 114 u64 *bno, u64 *oxoff, u64 *oxlen);
115 115
116/* calculate mapping of object to a placement group */ 116/* calculate mapping of object to a placement group */
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index 25baa287cff7..6a1101f24cfb 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -162,6 +162,8 @@ struct crush_map {
162 __u32 choose_local_fallback_tries; 162 __u32 choose_local_fallback_tries;
163 /* choose attempts before giving up */ 163 /* choose attempts before giving up */
164 __u32 choose_total_tries; 164 __u32 choose_total_tries;
165 /* attempt chooseleaf inner descent once; on failure retry outer descent */
166 __u32 chooseleaf_descend_once;
165}; 167};
166 168
167 169
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 35fce755ce10..cbd06a91941c 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
287 * @outpos: our position in that vector 287 * @outpos: our position in that vector
288 * @firstn: true if choosing "first n" items, false if choosing "indep" 288 * @firstn: true if choosing "first n" items, false if choosing "indep"
289 * @recurse_to_leaf: true if we want one device under each item of given type 289 * @recurse_to_leaf: true if we want one device under each item of given type
290 * @descend_once: true if we should only try one descent before giving up
290 * @out2: second output vector for leaf items (if @recurse_to_leaf) 291 * @out2: second output vector for leaf items (if @recurse_to_leaf)
291 */ 292 */
292static int crush_choose(const struct crush_map *map, 293static int crush_choose(const struct crush_map *map,
@@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map,
295 int x, int numrep, int type, 296 int x, int numrep, int type,
296 int *out, int outpos, 297 int *out, int outpos,
297 int firstn, int recurse_to_leaf, 298 int firstn, int recurse_to_leaf,
298 int *out2) 299 int descend_once, int *out2)
299{ 300{
300 int rep; 301 int rep;
301 unsigned int ftotal, flocal; 302 unsigned int ftotal, flocal;
@@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map,
391 } 392 }
392 393
393 reject = 0; 394 reject = 0;
394 if (recurse_to_leaf) { 395 if (!collide && recurse_to_leaf) {
395 if (item < 0) { 396 if (item < 0) {
396 if (crush_choose(map, 397 if (crush_choose(map,
397 map->buckets[-1-item], 398 map->buckets[-1-item],
@@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map,
399 x, outpos+1, 0, 400 x, outpos+1, 0,
400 out2, outpos, 401 out2, outpos,
401 firstn, 0, 402 firstn, 0,
403 map->chooseleaf_descend_once,
402 NULL) <= outpos) 404 NULL) <= outpos)
403 /* didn't get leaf */ 405 /* didn't get leaf */
404 reject = 1; 406 reject = 1;
@@ -422,7 +424,10 @@ reject:
422 ftotal++; 424 ftotal++;
423 flocal++; 425 flocal++;
424 426
425 if (collide && flocal <= map->choose_local_tries) 427 if (reject && descend_once)
428 /* let outer call try again */
429 skip_rep = 1;
430 else if (collide && flocal <= map->choose_local_tries)
426 /* retry locally a few times */ 431 /* retry locally a few times */
427 retry_bucket = 1; 432 retry_bucket = 1;
428 else if (map->choose_local_fallback_tries > 0 && 433 else if (map->choose_local_fallback_tries > 0 &&
@@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map,
485 int i, j; 490 int i, j;
486 int numrep; 491 int numrep;
487 int firstn; 492 int firstn;
493 const int descend_once = 0;
488 494
489 if ((__u32)ruleno >= map->max_rules) { 495 if ((__u32)ruleno >= map->max_rules) {
490 dprintk(" bad ruleno %d\n", ruleno); 496 dprintk(" bad ruleno %d\n", ruleno);
@@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map,
544 curstep->arg2, 550 curstep->arg2,
545 o+osize, j, 551 o+osize, j,
546 firstn, 552 firstn,
547 recurse_to_leaf, c+osize); 553 recurse_to_leaf,
554 descend_once, c+osize);
548 } 555 }
549 556
550 if (recurse_to_leaf) 557 if (recurse_to_leaf)
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index eb9a44478764..500ae8b49321 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -32,52 +32,43 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
32static void __send_request(struct ceph_osd_client *osdc, 32static void __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req); 33 struct ceph_osd_request *req);
34 34
35static int op_needs_trail(int op)
36{
37 switch (op) {
38 case CEPH_OSD_OP_GETXATTR:
39 case CEPH_OSD_OP_SETXATTR:
40 case CEPH_OSD_OP_CMPXATTR:
41 case CEPH_OSD_OP_CALL:
42 case CEPH_OSD_OP_NOTIFY:
43 return 1;
44 default:
45 return 0;
46 }
47}
48
49static int op_has_extent(int op) 35static int op_has_extent(int op)
50{ 36{
51 return (op == CEPH_OSD_OP_READ || 37 return (op == CEPH_OSD_OP_READ ||
52 op == CEPH_OSD_OP_WRITE); 38 op == CEPH_OSD_OP_WRITE);
53} 39}
54 40
55int ceph_calc_raw_layout(struct ceph_osd_client *osdc, 41int ceph_calc_raw_layout(struct ceph_file_layout *layout,
56 struct ceph_file_layout *layout,
57 u64 snapid,
58 u64 off, u64 *plen, u64 *bno, 42 u64 off, u64 *plen, u64 *bno,
59 struct ceph_osd_request *req, 43 struct ceph_osd_request *req,
60 struct ceph_osd_req_op *op) 44 struct ceph_osd_req_op *op)
61{ 45{
62 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
63 u64 orig_len = *plen; 46 u64 orig_len = *plen;
64 u64 objoff, objlen; /* extent in object */ 47 u64 objoff, objlen; /* extent in object */
65 int r; 48 int r;
66 49
67 reqhead->snapid = cpu_to_le64(snapid);
68
69 /* object extent? */ 50 /* object extent? */
70 r = ceph_calc_file_object_mapping(layout, off, plen, bno, 51 r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
71 &objoff, &objlen); 52 &objoff, &objlen);
72 if (r < 0) 53 if (r < 0)
73 return r; 54 return r;
74 if (*plen < orig_len) 55 if (objlen < orig_len) {
56 *plen = objlen;
75 dout(" skipping last %llu, final file extent %llu~%llu\n", 57 dout(" skipping last %llu, final file extent %llu~%llu\n",
76 orig_len - *plen, off, *plen); 58 orig_len - *plen, off, *plen);
59 }
77 60
78 if (op_has_extent(op->op)) { 61 if (op_has_extent(op->op)) {
62 u32 osize = le32_to_cpu(layout->fl_object_size);
79 op->extent.offset = objoff; 63 op->extent.offset = objoff;
80 op->extent.length = objlen; 64 op->extent.length = objlen;
65 if (op->extent.truncate_size <= off - objoff) {
66 op->extent.truncate_size = 0;
67 } else {
68 op->extent.truncate_size -= off - objoff;
69 if (op->extent.truncate_size > osize)
70 op->extent.truncate_size = osize;
71 }
81 } 72 }
82 req->r_num_pages = calc_pages_for(off, *plen); 73 req->r_num_pages = calc_pages_for(off, *plen);
83 req->r_page_alignment = off & ~PAGE_MASK; 74 req->r_page_alignment = off & ~PAGE_MASK;
@@ -115,8 +106,7 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
115 * 106 *
116 * fill osd op in request message. 107 * fill osd op in request message.
117 */ 108 */
118static int calc_layout(struct ceph_osd_client *osdc, 109static int calc_layout(struct ceph_vino vino,
119 struct ceph_vino vino,
120 struct ceph_file_layout *layout, 110 struct ceph_file_layout *layout,
121 u64 off, u64 *plen, 111 u64 off, u64 *plen,
122 struct ceph_osd_request *req, 112 struct ceph_osd_request *req,
@@ -125,8 +115,7 @@ static int calc_layout(struct ceph_osd_client *osdc,
125 u64 bno; 115 u64 bno;
126 int r; 116 int r;
127 117
128 r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, 118 r = ceph_calc_raw_layout(layout, off, plen, &bno, req, op);
129 plen, &bno, req, op);
130 if (r < 0) 119 if (r < 0)
131 return r; 120 return r;
132 121
@@ -163,10 +152,7 @@ void ceph_osdc_release_request(struct kref *kref)
163 bio_put(req->r_bio); 152 bio_put(req->r_bio);
164#endif 153#endif
165 ceph_put_snap_context(req->r_snapc); 154 ceph_put_snap_context(req->r_snapc);
166 if (req->r_trail) { 155 ceph_pagelist_release(&req->r_trail);
167 ceph_pagelist_release(req->r_trail);
168 kfree(req->r_trail);
169 }
170 if (req->r_mempool) 156 if (req->r_mempool)
171 mempool_free(req, req->r_osdc->req_mempool); 157 mempool_free(req, req->r_osdc->req_mempool);
172 else 158 else
@@ -174,34 +160,14 @@ void ceph_osdc_release_request(struct kref *kref)
174} 160}
175EXPORT_SYMBOL(ceph_osdc_release_request); 161EXPORT_SYMBOL(ceph_osdc_release_request);
176 162
177static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
178{
179 int i = 0;
180
181 if (needs_trail)
182 *needs_trail = 0;
183 while (ops[i].op) {
184 if (needs_trail && op_needs_trail(ops[i].op))
185 *needs_trail = 1;
186 i++;
187 }
188
189 return i;
190}
191
192struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 163struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193 int flags,
194 struct ceph_snap_context *snapc, 164 struct ceph_snap_context *snapc,
195 struct ceph_osd_req_op *ops, 165 unsigned int num_op,
196 bool use_mempool, 166 bool use_mempool,
197 gfp_t gfp_flags, 167 gfp_t gfp_flags)
198 struct page **pages,
199 struct bio *bio)
200{ 168{
201 struct ceph_osd_request *req; 169 struct ceph_osd_request *req;
202 struct ceph_msg *msg; 170 struct ceph_msg *msg;
203 int needs_trail;
204 int num_op = get_num_ops(ops, &needs_trail);
205 size_t msg_size = sizeof(struct ceph_osd_request_head); 171 size_t msg_size = sizeof(struct ceph_osd_request_head);
206 172
207 msg_size += num_op*sizeof(struct ceph_osd_op); 173 msg_size += num_op*sizeof(struct ceph_osd_op);
@@ -228,10 +194,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
228 INIT_LIST_HEAD(&req->r_req_lru_item); 194 INIT_LIST_HEAD(&req->r_req_lru_item);
229 INIT_LIST_HEAD(&req->r_osd_item); 195 INIT_LIST_HEAD(&req->r_osd_item);
230 196
231 req->r_flags = flags;
232
233 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
234
235 /* create reply message */ 197 /* create reply message */
236 if (use_mempool) 198 if (use_mempool)
237 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 199 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,15 +206,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
244 } 206 }
245 req->r_reply = msg; 207 req->r_reply = msg;
246 208
247 /* allocate space for the trailing data */ 209 ceph_pagelist_init(&req->r_trail);
248 if (needs_trail) {
249 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
250 if (!req->r_trail) {
251 ceph_osdc_put_request(req);
252 return NULL;
253 }
254 ceph_pagelist_init(req->r_trail);
255 }
256 210
257 /* create request message; allow space for oid */ 211 /* create request message; allow space for oid */
258 msg_size += MAX_OBJ_NAME_SIZE; 212 msg_size += MAX_OBJ_NAME_SIZE;
@@ -270,13 +224,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
270 memset(msg->front.iov_base, 0, msg->front.iov_len); 224 memset(msg->front.iov_base, 0, msg->front.iov_len);
271 225
272 req->r_request = msg; 226 req->r_request = msg;
273 req->r_pages = pages;
274#ifdef CONFIG_BLOCK
275 if (bio) {
276 req->r_bio = bio;
277 bio_get(req->r_bio);
278 }
279#endif
280 227
281 return req; 228 return req;
282} 229}
@@ -304,29 +251,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
304 case CEPH_OSD_OP_GETXATTR: 251 case CEPH_OSD_OP_GETXATTR:
305 case CEPH_OSD_OP_SETXATTR: 252 case CEPH_OSD_OP_SETXATTR:
306 case CEPH_OSD_OP_CMPXATTR: 253 case CEPH_OSD_OP_CMPXATTR:
307 BUG_ON(!req->r_trail);
308
309 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 254 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
310 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 255 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
311 dst->xattr.cmp_op = src->xattr.cmp_op; 256 dst->xattr.cmp_op = src->xattr.cmp_op;
312 dst->xattr.cmp_mode = src->xattr.cmp_mode; 257 dst->xattr.cmp_mode = src->xattr.cmp_mode;
313 ceph_pagelist_append(req->r_trail, src->xattr.name, 258 ceph_pagelist_append(&req->r_trail, src->xattr.name,
314 src->xattr.name_len); 259 src->xattr.name_len);
315 ceph_pagelist_append(req->r_trail, src->xattr.val, 260 ceph_pagelist_append(&req->r_trail, src->xattr.val,
316 src->xattr.value_len); 261 src->xattr.value_len);
317 break; 262 break;
318 case CEPH_OSD_OP_CALL: 263 case CEPH_OSD_OP_CALL:
319 BUG_ON(!req->r_trail);
320
321 dst->cls.class_len = src->cls.class_len; 264 dst->cls.class_len = src->cls.class_len;
322 dst->cls.method_len = src->cls.method_len; 265 dst->cls.method_len = src->cls.method_len;
323 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 266 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
324 267
325 ceph_pagelist_append(req->r_trail, src->cls.class_name, 268 ceph_pagelist_append(&req->r_trail, src->cls.class_name,
326 src->cls.class_len); 269 src->cls.class_len);
327 ceph_pagelist_append(req->r_trail, src->cls.method_name, 270 ceph_pagelist_append(&req->r_trail, src->cls.method_name,
328 src->cls.method_len); 271 src->cls.method_len);
329 ceph_pagelist_append(req->r_trail, src->cls.indata, 272 ceph_pagelist_append(&req->r_trail, src->cls.indata,
330 src->cls.indata_len); 273 src->cls.indata_len);
331 break; 274 break;
332 case CEPH_OSD_OP_ROLLBACK: 275 case CEPH_OSD_OP_ROLLBACK:
@@ -339,11 +282,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
339 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); 282 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
340 __le32 timeout = cpu_to_le32(src->watch.timeout); 283 __le32 timeout = cpu_to_le32(src->watch.timeout);
341 284
342 BUG_ON(!req->r_trail); 285 ceph_pagelist_append(&req->r_trail,
343
344 ceph_pagelist_append(req->r_trail,
345 &prot_ver, sizeof(prot_ver)); 286 &prot_ver, sizeof(prot_ver));
346 ceph_pagelist_append(req->r_trail, 287 ceph_pagelist_append(&req->r_trail,
347 &timeout, sizeof(timeout)); 288 &timeout, sizeof(timeout));
348 } 289 }
349 case CEPH_OSD_OP_NOTIFY_ACK: 290 case CEPH_OSD_OP_NOTIFY_ACK:
@@ -365,25 +306,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
365 * 306 *
366 */ 307 */
367void ceph_osdc_build_request(struct ceph_osd_request *req, 308void ceph_osdc_build_request(struct ceph_osd_request *req,
368 u64 off, u64 *plen, 309 u64 off, u64 len, unsigned int num_op,
369 struct ceph_osd_req_op *src_ops, 310 struct ceph_osd_req_op *src_ops,
370 struct ceph_snap_context *snapc, 311 struct ceph_snap_context *snapc, u64 snap_id,
371 struct timespec *mtime, 312 struct timespec *mtime)
372 const char *oid,
373 int oid_len)
374{ 313{
375 struct ceph_msg *msg = req->r_request; 314 struct ceph_msg *msg = req->r_request;
376 struct ceph_osd_request_head *head; 315 struct ceph_osd_request_head *head;
377 struct ceph_osd_req_op *src_op; 316 struct ceph_osd_req_op *src_op;
378 struct ceph_osd_op *op; 317 struct ceph_osd_op *op;
379 void *p; 318 void *p;
380 int num_op = get_num_ops(src_ops, NULL);
381 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 319 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
382 int flags = req->r_flags; 320 int flags = req->r_flags;
383 u64 data_len = 0; 321 u64 data_len = 0;
384 int i; 322 int i;
385 323
324 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
325
386 head = msg->front.iov_base; 326 head = msg->front.iov_base;
327 head->snapid = cpu_to_le64(snap_id);
387 op = (void *)(head + 1); 328 op = (void *)(head + 1);
388 p = (void *)(op + num_op); 329 p = (void *)(op + num_op);
389 330
@@ -393,23 +334,19 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
393 head->flags = cpu_to_le32(flags); 334 head->flags = cpu_to_le32(flags);
394 if (flags & CEPH_OSD_FLAG_WRITE) 335 if (flags & CEPH_OSD_FLAG_WRITE)
395 ceph_encode_timespec(&head->mtime, mtime); 336 ceph_encode_timespec(&head->mtime, mtime);
337 BUG_ON(num_op > (unsigned int) ((u16) -1));
396 head->num_ops = cpu_to_le16(num_op); 338 head->num_ops = cpu_to_le16(num_op);
397 339
398
399 /* fill in oid */ 340 /* fill in oid */
400 head->object_len = cpu_to_le32(oid_len); 341 head->object_len = cpu_to_le32(req->r_oid_len);
401 memcpy(p, oid, oid_len); 342 memcpy(p, req->r_oid, req->r_oid_len);
402 p += oid_len; 343 p += req->r_oid_len;
403 344
404 src_op = src_ops; 345 src_op = src_ops;
405 while (src_op->op) { 346 while (num_op--)
406 osd_req_encode_op(req, op, src_op); 347 osd_req_encode_op(req, op++, src_op++);
407 src_op++;
408 op++;
409 }
410 348
411 if (req->r_trail) 349 data_len += req->r_trail.length;
412 data_len += req->r_trail->length;
413 350
414 if (snapc) { 351 if (snapc) {
415 head->snap_seq = cpu_to_le64(snapc->seq); 352 head->snap_seq = cpu_to_le64(snapc->seq);
@@ -422,7 +359,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
422 359
423 if (flags & CEPH_OSD_FLAG_WRITE) { 360 if (flags & CEPH_OSD_FLAG_WRITE) {
424 req->r_request->hdr.data_off = cpu_to_le16(off); 361 req->r_request->hdr.data_off = cpu_to_le16(off);
425 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); 362 req->r_request->hdr.data_len = cpu_to_le32(len + data_len);
426 } else if (data_len) { 363 } else if (data_len) {
427 req->r_request->hdr.data_off = 0; 364 req->r_request->hdr.data_off = 0;
428 req->r_request->hdr.data_len = cpu_to_le32(data_len); 365 req->r_request->hdr.data_len = cpu_to_le32(data_len);
@@ -462,31 +399,30 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
462 bool use_mempool, int num_reply, 399 bool use_mempool, int num_reply,
463 int page_align) 400 int page_align)
464{ 401{
465 struct ceph_osd_req_op ops[3]; 402 struct ceph_osd_req_op ops[2];
466 struct ceph_osd_request *req; 403 struct ceph_osd_request *req;
404 unsigned int num_op = 1;
467 int r; 405 int r;
468 406
407 memset(&ops, 0, sizeof ops);
408
469 ops[0].op = opcode; 409 ops[0].op = opcode;
470 ops[0].extent.truncate_seq = truncate_seq; 410 ops[0].extent.truncate_seq = truncate_seq;
471 ops[0].extent.truncate_size = truncate_size; 411 ops[0].extent.truncate_size = truncate_size;
472 ops[0].payload_len = 0;
473 412
474 if (do_sync) { 413 if (do_sync) {
475 ops[1].op = CEPH_OSD_OP_STARTSYNC; 414 ops[1].op = CEPH_OSD_OP_STARTSYNC;
476 ops[1].payload_len = 0; 415 num_op++;
477 ops[2].op = 0; 416 }
478 } else 417
479 ops[1].op = 0; 418 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
480 419 GFP_NOFS);
481 req = ceph_osdc_alloc_request(osdc, flags,
482 snapc, ops,
483 use_mempool,
484 GFP_NOFS, NULL, NULL);
485 if (!req) 420 if (!req)
486 return ERR_PTR(-ENOMEM); 421 return ERR_PTR(-ENOMEM);
422 req->r_flags = flags;
487 423
488 /* calculate max write size */ 424 /* calculate max write size */
489 r = calc_layout(osdc, vino, layout, off, plen, req, ops); 425 r = calc_layout(vino, layout, off, plen, req, ops);
490 if (r < 0) 426 if (r < 0)
491 return ERR_PTR(r); 427 return ERR_PTR(r);
492 req->r_file_layout = *layout; /* keep a copy */ 428 req->r_file_layout = *layout; /* keep a copy */
@@ -496,10 +432,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
496 req->r_num_pages = calc_pages_for(page_align, *plen); 432 req->r_num_pages = calc_pages_for(page_align, *plen);
497 req->r_page_alignment = page_align; 433 req->r_page_alignment = page_align;
498 434
499 ceph_osdc_build_request(req, off, plen, ops, 435 ceph_osdc_build_request(req, off, *plen, num_op, ops,
500 snapc, 436 snapc, vino.snap, mtime);
501 mtime,
502 req->r_oid, req->r_oid_len);
503 437
504 return req; 438 return req;
505} 439}
@@ -739,31 +673,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
739 */ 673 */
740static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 674static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
741{ 675{
742 struct ceph_osd_request *req; 676 struct ceph_entity_addr *peer_addr;
743 int ret = 0;
744 677
745 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 678 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
746 if (list_empty(&osd->o_requests) && 679 if (list_empty(&osd->o_requests) &&
747 list_empty(&osd->o_linger_requests)) { 680 list_empty(&osd->o_linger_requests)) {
748 __remove_osd(osdc, osd); 681 __remove_osd(osdc, osd);
749 ret = -ENODEV; 682
750 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 683 return -ENODEV;
751 &osd->o_con.peer_addr, 684 }
752 sizeof(osd->o_con.peer_addr)) == 0 && 685
753 !ceph_con_opened(&osd->o_con)) { 686 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
687 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
688 !ceph_con_opened(&osd->o_con)) {
689 struct ceph_osd_request *req;
690
754 dout(" osd addr hasn't changed and connection never opened," 691 dout(" osd addr hasn't changed and connection never opened,"
755 " letting msgr retry"); 692 " letting msgr retry");
756 /* touch each r_stamp for handle_timeout()'s benfit */ 693 /* touch each r_stamp for handle_timeout()'s benfit */
757 list_for_each_entry(req, &osd->o_requests, r_osd_item) 694 list_for_each_entry(req, &osd->o_requests, r_osd_item)
758 req->r_stamp = jiffies; 695 req->r_stamp = jiffies;
759 ret = -EAGAIN; 696
760 } else { 697 return -EAGAIN;
761 ceph_con_close(&osd->o_con);
762 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
763 &osdc->osdmap->osd_addr[osd->o_osd]);
764 osd->o_incarnation++;
765 } 698 }
766 return ret; 699
700 ceph_con_close(&osd->o_con);
701 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
702 osd->o_incarnation++;
703
704 return 0;
767} 705}
768 706
769static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 707static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -1706,7 +1644,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1706#ifdef CONFIG_BLOCK 1644#ifdef CONFIG_BLOCK
1707 req->r_request->bio = req->r_bio; 1645 req->r_request->bio = req->r_bio;
1708#endif 1646#endif
1709 req->r_request->trail = req->r_trail; 1647 req->r_request->trail = &req->r_trail;
1710 1648
1711 register_request(osdc, req); 1649 register_request(osdc, req);
1712 1650
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index de73214b5d26..3c61e21611d3 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -13,26 +13,18 @@
13 13
14char *ceph_osdmap_state_str(char *str, int len, int state) 14char *ceph_osdmap_state_str(char *str, int len, int state)
15{ 15{
16 int flag = 0;
17
18 if (!len) 16 if (!len)
19 goto done; 17 return str;
20 18
21 *str = '\0'; 19 if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
22 if (state) { 20 snprintf(str, len, "exists, up");
23 if (state & CEPH_OSD_EXISTS) { 21 else if (state & CEPH_OSD_EXISTS)
24 snprintf(str, len, "exists"); 22 snprintf(str, len, "exists");
25 flag = 1; 23 else if (state & CEPH_OSD_UP)
26 } 24 snprintf(str, len, "up");
27 if (state & CEPH_OSD_UP) { 25 else
28 snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""),
29 "up");
30 flag = 1;
31 }
32 } else {
33 snprintf(str, len, "doesn't exist"); 26 snprintf(str, len, "doesn't exist");
34 } 27
35done:
36 return str; 28 return str;
37} 29}
38 30
@@ -170,6 +162,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
170 c->choose_local_tries = 2; 162 c->choose_local_tries = 2;
171 c->choose_local_fallback_tries = 5; 163 c->choose_local_fallback_tries = 5;
172 c->choose_total_tries = 19; 164 c->choose_total_tries = 19;
165 c->chooseleaf_descend_once = 0;
173 166
174 ceph_decode_need(p, end, 4*sizeof(u32), bad); 167 ceph_decode_need(p, end, 4*sizeof(u32), bad);
175 magic = ceph_decode_32(p); 168 magic = ceph_decode_32(p);
@@ -336,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
336 dout("crush decode tunable choose_total_tries = %d", 329 dout("crush decode tunable choose_total_tries = %d",
337 c->choose_total_tries); 330 c->choose_total_tries);
338 331
332 ceph_decode_need(p, end, sizeof(u32), done);
333 c->chooseleaf_descend_once = ceph_decode_32(p);
334 dout("crush decode tunable chooseleaf_descend_once = %d",
335 c->chooseleaf_descend_once);
336
339done: 337done:
340 dout("crush_decode success\n"); 338 dout("crush_decode success\n");
341 return c; 339 return c;
@@ -1010,7 +1008,7 @@ bad:
1010 * pass a stride back to the caller. 1008 * pass a stride back to the caller.
1011 */ 1009 */
1012int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, 1010int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1013 u64 off, u64 *plen, 1011 u64 off, u64 len,
1014 u64 *ono, 1012 u64 *ono,
1015 u64 *oxoff, u64 *oxlen) 1013 u64 *oxoff, u64 *oxlen)
1016{ 1014{
@@ -1021,7 +1019,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1021 u32 su_per_object; 1019 u32 su_per_object;
1022 u64 t, su_offset; 1020 u64 t, su_offset;
1023 1021
1024 dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen, 1022 dout("mapping %llu~%llu osize %u fl_su %u\n", off, len,
1025 osize, su); 1023 osize, su);
1026 if (su == 0 || sc == 0) 1024 if (su == 0 || sc == 0)
1027 goto invalid; 1025 goto invalid;
@@ -1054,11 +1052,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1054 1052
1055 /* 1053 /*
1056 * Calculate the length of the extent being written to the selected 1054 * Calculate the length of the extent being written to the selected
1057 * object. This is the minimum of the full length requested (plen) or 1055 * object. This is the minimum of the full length requested (len) or
1058 * the remainder of the current stripe being written to. 1056 * the remainder of the current stripe being written to.
1059 */ 1057 */
1060 *oxlen = min_t(u64, *plen, su - su_offset); 1058 *oxlen = min_t(u64, len, su - su_offset);
1061 *plen = *oxlen;
1062 1059
1063 dout(" obj extent %llu~%llu\n", *oxoff, *oxlen); 1060 dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
1064 return 0; 1061 return 0;