aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--net/ceph/osd_client.c233
1 files changed, 170 insertions, 63 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5584f0a08e28..d730dd4d8eb2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -146,15 +146,23 @@ EXPORT_SYMBOL(ceph_osdc_release_request);
146 146
147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
148 struct ceph_snap_context *snapc, 148 struct ceph_snap_context *snapc,
149 unsigned int num_op, 149 unsigned int num_ops,
150 bool use_mempool, 150 bool use_mempool,
151 gfp_t gfp_flags) 151 gfp_t gfp_flags)
152{ 152{
153 struct ceph_osd_request *req; 153 struct ceph_osd_request *req;
154 struct ceph_msg *msg; 154 struct ceph_msg *msg;
155 size_t msg_size = sizeof(struct ceph_osd_request_head); 155 size_t msg_size;
156 156
157 msg_size += num_op*sizeof(struct ceph_osd_op); 157 msg_size = 4 + 4 + 8 + 8 + 4+8;
158 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
159 msg_size += 1 + 8 + 4 + 4; /* pg_t */
160 msg_size += 4 + MAX_OBJ_NAME_SIZE;
161 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
162 msg_size += 8; /* snapid */
163 msg_size += 8; /* snap_seq */
164 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
165 msg_size += 4;
158 166
159 if (use_mempool) { 167 if (use_mempool) {
160 req = mempool_alloc(osdc->req_mempool, gfp_flags); 168 req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -193,9 +201,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193 ceph_pagelist_init(&req->r_trail); 201 ceph_pagelist_init(&req->r_trail);
194 202
195 /* create request message; allow space for oid */ 203 /* create request message; allow space for oid */
196 msg_size += MAX_OBJ_NAME_SIZE;
197 if (snapc)
198 msg_size += sizeof(u64) * snapc->num_snaps;
199 if (use_mempool) 204 if (use_mempool)
200 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 205 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
201 else 206 else
@@ -324,55 +329,80 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
324 * 329 *
325 */ 330 */
326void ceph_osdc_build_request(struct ceph_osd_request *req, 331void ceph_osdc_build_request(struct ceph_osd_request *req,
327 u64 off, u64 len, unsigned int num_op, 332 u64 off, u64 len, unsigned int num_ops,
328 struct ceph_osd_req_op *src_ops, 333 struct ceph_osd_req_op *src_ops,
329 struct ceph_snap_context *snapc, u64 snap_id, 334 struct ceph_snap_context *snapc, u64 snap_id,
330 struct timespec *mtime) 335 struct timespec *mtime)
331{ 336{
332 struct ceph_msg *msg = req->r_request; 337 struct ceph_msg *msg = req->r_request;
333 struct ceph_osd_request_head *head;
334 struct ceph_osd_req_op *src_op; 338 struct ceph_osd_req_op *src_op;
335 struct ceph_osd_op *op;
336 void *p; 339 void *p;
337 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 340 size_t msg_size;
338 int flags = req->r_flags; 341 int flags = req->r_flags;
339 u64 data_len; 342 u64 data_len;
340 int i; 343 int i;
341 344
342 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 345 req->r_num_ops = num_ops;
343 346 req->r_snapid = snap_id;
344 head = msg->front.iov_base;
345 head->snapid = cpu_to_le64(snap_id);
346 op = (void *)(head + 1);
347 p = (void *)(op + num_op);
348
349 req->r_snapc = ceph_get_snap_context(snapc); 347 req->r_snapc = ceph_get_snap_context(snapc);
350 348
351 head->client_inc = cpu_to_le32(1); /* always, for now. */ 349 /* encode request */
352 head->flags = cpu_to_le32(flags); 350 msg->hdr.version = cpu_to_le16(4);
353 if (flags & CEPH_OSD_FLAG_WRITE)
354 ceph_encode_timespec(&head->mtime, mtime);
355 BUG_ON(num_op > (unsigned int) ((u16) -1));
356 head->num_ops = cpu_to_le16(num_op);
357 351
358 /* fill in oid */ 352 p = msg->front.iov_base;
359 head->object_len = cpu_to_le32(req->r_oid_len); 353 ceph_encode_32(&p, 1); /* client_inc is always 1 */
354 req->r_request_osdmap_epoch = p;
355 p += 4;
356 req->r_request_flags = p;
357 p += 4;
358 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359 ceph_encode_timespec(p, mtime);
360 p += sizeof(struct ceph_timespec);
361 req->r_request_reassert_version = p;
362 p += sizeof(struct ceph_eversion); /* will get filled in */
363
364 /* oloc */
365 ceph_encode_8(&p, 4);
366 ceph_encode_8(&p, 4);
367 ceph_encode_32(&p, 8 + 4 + 4);
368 req->r_request_pool = p;
369 p += 8;
370 ceph_encode_32(&p, -1); /* preferred */
371 ceph_encode_32(&p, 0); /* key len */
372
373 ceph_encode_8(&p, 1);
374 req->r_request_pgid = p;
375 p += 8 + 4;
376 ceph_encode_32(&p, -1); /* preferred */
377
378 /* oid */
379 ceph_encode_32(&p, req->r_oid_len);
360 memcpy(p, req->r_oid, req->r_oid_len); 380 memcpy(p, req->r_oid, req->r_oid_len);
381 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
361 p += req->r_oid_len; 382 p += req->r_oid_len;
362 383
384 /* ops */
385 ceph_encode_16(&p, num_ops);
363 src_op = src_ops; 386 src_op = src_ops;
364 while (num_op--) 387 req->r_request_ops = p;
365 osd_req_encode_op(req, op++, src_op++); 388 for (i = 0; i < num_ops; i++, src_op++) {
389 osd_req_encode_op(req, p, src_op);
390 p += sizeof(struct ceph_osd_op);
391 }
366 392
367 if (snapc) { 393 /* snaps */
368 head->snap_seq = cpu_to_le64(snapc->seq); 394 ceph_encode_64(&p, req->r_snapid);
369 head->num_snaps = cpu_to_le32(snapc->num_snaps); 395 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
396 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
397 if (req->r_snapc) {
370 for (i = 0; i < snapc->num_snaps; i++) { 398 for (i = 0; i < snapc->num_snaps; i++) {
371 put_unaligned_le64(snapc->snaps[i], p); 399 ceph_encode_64(&p, req->r_snapc->snaps[i]);
372 p += sizeof(u64);
373 } 400 }
374 } 401 }
375 402
403 req->r_request_attempts = p;
404 p += 4;
405
376 data_len = req->r_trail.length; 406 data_len = req->r_trail.length;
377 if (flags & CEPH_OSD_FLAG_WRITE) { 407 if (flags & CEPH_OSD_FLAG_WRITE) {
378 req->r_request->hdr.data_off = cpu_to_le16(off); 408 req->r_request->hdr.data_off = cpu_to_le16(off);
@@ -385,6 +415,9 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
385 msg_size = p - msg->front.iov_base; 415 msg_size = p - msg->front.iov_base;
386 msg->front.iov_len = msg_size; 416 msg->front.iov_len = msg_size;
387 msg->hdr.front_len = cpu_to_le32(msg_size); 417 msg->hdr.front_len = cpu_to_le32(msg_size);
418
419 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
420 num_ops);
388 return; 421 return;
389} 422}
390EXPORT_SYMBOL(ceph_osdc_build_request); 423EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -991,21 +1024,22 @@ out:
991static void __send_request(struct ceph_osd_client *osdc, 1024static void __send_request(struct ceph_osd_client *osdc,
992 struct ceph_osd_request *req) 1025 struct ceph_osd_request *req)
993{ 1026{
994 struct ceph_osd_request_head *reqhead; 1027 void *p;
995
996 dout("send_request %p tid %llu to osd%d flags %d\n",
997 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
998
999 reqhead = req->r_request->front.iov_base;
1000 reqhead->snapid = cpu_to_le64(req->r_snapid);
1001 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
1002 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
1003 reqhead->reassert_version = req->r_reassert_version;
1004 1028
1005 reqhead->layout.ol_pgid.ps = cpu_to_le16(req->r_pgid.seed); 1029 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1006 reqhead->layout.ol_pgid.pool = cpu_to_le32(req->r_pgid.pool); 1030 req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1007 reqhead->layout.ol_pgid.preferred = cpu_to_le16(-1); 1031 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1008 reqhead->layout.ol_stripe_unit = 0; 1032
1033 /* fill in message content that changes each time we send it */
1034 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1035 put_unaligned_le32(req->r_flags, req->r_request_flags);
1036 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
1037 p = req->r_request_pgid;
1038 ceph_encode_64(&p, req->r_pgid.pool);
1039 ceph_encode_32(&p, req->r_pgid.seed);
1040 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
1041 memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1042 sizeof(req->r_reassert_version));
1009 1043
1010 req->r_stamp = jiffies; 1044 req->r_stamp = jiffies;
1011 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1045 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1105,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
1105 complete_all(&req->r_safe_completion); /* fsync waiter */ 1139 complete_all(&req->r_safe_completion); /* fsync waiter */
1106} 1140}
1107 1141
1142static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143{
1144 __u8 v;
1145
1146 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147 v = ceph_decode_8(p);
1148 if (v > 1) {
1149 pr_warning("do not understand pg encoding %d > 1", v);
1150 return -EINVAL;
1151 }
1152 pgid->pool = ceph_decode_64(p);
1153 pgid->seed = ceph_decode_32(p);
1154 *p += 4;
1155 return 0;
1156
1157bad:
1158 pr_warning("incomplete pg encoding");
1159 return -EINVAL;
1160}
1161
1108/* 1162/*
1109 * handle osd op reply. either call the callback if it is specified, 1163 * handle osd op reply. either call the callback if it is specified,
1110 * or do the completion to wake up the waiting thread. 1164 * or do the completion to wake up the waiting thread.
@@ -1112,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
1112static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1166static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1113 struct ceph_connection *con) 1167 struct ceph_connection *con)
1114{ 1168{
1115 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 1169 void *p, *end;
1116 struct ceph_osd_request *req; 1170 struct ceph_osd_request *req;
1117 u64 tid; 1171 u64 tid;
1118 int numops, object_len, flags; 1172 int object_len;
1173 int numops, payload_len, flags;
1119 s32 result; 1174 s32 result;
1175 s32 retry_attempt;
1176 struct ceph_pg pg;
1177 int err;
1178 u32 reassert_epoch;
1179 u64 reassert_version;
1180 u32 osdmap_epoch;
1181 int i;
1120 1182
1121 tid = le64_to_cpu(msg->hdr.tid); 1183 tid = le64_to_cpu(msg->hdr.tid);
1122 if (msg->front.iov_len < sizeof(*rhead)) 1184 dout("handle_reply %p tid %llu\n", msg, tid);
1123 goto bad; 1185
1124 numops = le32_to_cpu(rhead->num_ops); 1186 p = msg->front.iov_base;
1125 object_len = le32_to_cpu(rhead->object_len); 1187 end = p + msg->front.iov_len;
1126 result = le32_to_cpu(rhead->result); 1188
1127 if (msg->front.iov_len != sizeof(*rhead) + object_len + 1189 ceph_decode_need(&p, end, 4, bad);
1128 numops * sizeof(struct ceph_osd_op)) 1190 object_len = ceph_decode_32(&p);
1191 ceph_decode_need(&p, end, object_len, bad);
1192 p += object_len;
1193
1194 err = __decode_pgid(&p, end, &pg);
1195 if (err)
1129 goto bad; 1196 goto bad;
1130 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1197
1198 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1199 flags = ceph_decode_64(&p);
1200 result = ceph_decode_32(&p);
1201 reassert_epoch = ceph_decode_32(&p);
1202 reassert_version = ceph_decode_64(&p);
1203 osdmap_epoch = ceph_decode_32(&p);
1204
1131 /* lookup */ 1205 /* lookup */
1132 mutex_lock(&osdc->request_mutex); 1206 mutex_lock(&osdc->request_mutex);
1133 req = __lookup_request(osdc, tid); 1207 req = __lookup_request(osdc, tid);
@@ -1137,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1137 return; 1211 return;
1138 } 1212 }
1139 ceph_osdc_get_request(req); 1213 ceph_osdc_get_request(req);
1140 flags = le32_to_cpu(rhead->flags); 1214
1215 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1216 req, result);
1217
1218 ceph_decode_need(&p, end, 4, bad);
1219 numops = ceph_decode_32(&p);
1220 if (numops > CEPH_OSD_MAX_OP)
1221 goto bad_put;
1222 if (numops != req->r_num_ops)
1223 goto bad_put;
1224 payload_len = 0;
1225 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
1226 for (i = 0; i < numops; i++) {
1227 struct ceph_osd_op *op = p;
1228 int len;
1229
1230 len = le32_to_cpu(op->payload_len);
1231 req->r_reply_op_len[i] = len;
1232 dout(" op %d has %d bytes\n", i, len);
1233 payload_len += len;
1234 p += sizeof(*op);
1235 }
1236 if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
1237 pr_warning("sum of op payload lens %d != data_len %d",
1238 payload_len, le32_to_cpu(msg->hdr.data_len));
1239 goto bad_put;
1240 }
1241
1242 ceph_decode_need(&p, end, 4 + numops * 4, bad);
1243 retry_attempt = ceph_decode_32(&p);
1244 for (i = 0; i < numops; i++)
1245 req->r_reply_op_result[i] = ceph_decode_32(&p);
1141 1246
1142 /* 1247 /*
1143 * if this connection filled our message, drop our reference now, to 1248 * if this connection filled our message, drop our reference now, to
@@ -1152,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1152 if (!req->r_got_reply) { 1257 if (!req->r_got_reply) {
1153 unsigned int bytes; 1258 unsigned int bytes;
1154 1259
1155 req->r_result = le32_to_cpu(rhead->result); 1260 req->r_result = result;
1156 bytes = le32_to_cpu(msg->hdr.data_len); 1261 bytes = le32_to_cpu(msg->hdr.data_len);
1157 dout("handle_reply result %d bytes %d\n", req->r_result, 1262 dout("handle_reply result %d bytes %d\n", req->r_result,
1158 bytes); 1263 bytes);
@@ -1160,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1160 req->r_result = bytes; 1265 req->r_result = bytes;
1161 1266
1162 /* in case this is a write and we need to replay, */ 1267 /* in case this is a write and we need to replay, */
1163 req->r_reassert_version = rhead->reassert_version; 1268 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1269 req->r_reassert_version.version = cpu_to_le64(reassert_version);
1164 1270
1165 req->r_got_reply = 1; 1271 req->r_got_reply = 1;
1166 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1272 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1195,10 +1301,11 @@ done:
1195 ceph_osdc_put_request(req); 1301 ceph_osdc_put_request(req);
1196 return; 1302 return;
1197 1303
1304bad_put:
1305 ceph_osdc_put_request(req);
1198bad: 1306bad:
1199 pr_err("corrupt osd_op_reply got %d %d expected %d\n", 1307 pr_err("corrupt osd_op_reply got %d %d\n",
1200 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), 1308 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1201 (int)sizeof(*rhead));
1202 ceph_msg_dump(msg); 1309 ceph_msg_dump(msg);
1203} 1310}
1204 1311