aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
author: Sage Weil <sage@inktank.com> 2013-02-25 19:11:12 -0500
committer: Sage Weil <sage@inktank.com> 2013-02-26 18:02:50 -0500
commit: 1b83bef24c6746a146d39915a18fb5425f2facb0 (patch)
tree: a765aeb136f4c7e354c01314e5fdfb776d503fb7 /net/ceph
parent: 2169aea649c08374bec7d220a3b8f64712275356 (diff)
libceph: update osd request/reply encoding
Use the new version of the encoding for osd requests and replies. In the process, update the way we are tracking request ops and reply lengths and results in the struct ceph_osd_request. Update the rbd and fs/ceph users appropriately.

The main changes are:
 - we keep pointers into the request memory for fields we need to update each time the request is sent out over the wire
 - we keep information about the result in an array in the request struct where the users can easily get at it.

Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Alex Elder <elder@inktank.com>
Diffstat (limited to 'net/ceph')
-rw-r--r-- net/ceph/debugfs.c 18
-rw-r--r-- net/ceph/osd_client.c 233
2 files changed, 174 insertions, 77 deletions
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index f4d4b27d6026..00d051f4894e 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -123,10 +123,7 @@ static int osdc_show(struct seq_file *s, void *pp)
123 mutex_lock(&osdc->request_mutex); 123 mutex_lock(&osdc->request_mutex);
124 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 124 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
125 struct ceph_osd_request *req; 125 struct ceph_osd_request *req;
126 struct ceph_osd_request_head *head; 126 int opcode;
127 struct ceph_osd_op *op;
128 int num_ops;
129 int opcode, olen;
130 int i; 127 int i;
131 128
132 req = rb_entry(p, struct ceph_osd_request, r_node); 129 req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -135,13 +132,7 @@ static int osdc_show(struct seq_file *s, void *pp)
135 req->r_osd ? req->r_osd->o_osd : -1, 132 req->r_osd ? req->r_osd->o_osd : -1,
136 req->r_pgid.pool, req->r_pgid.seed); 133 req->r_pgid.pool, req->r_pgid.seed);
137 134
138 head = req->r_request->front.iov_base; 135 seq_printf(s, "%.*s", req->r_oid_len, req->r_oid);
139 op = (void *)(head + 1);
140
141 num_ops = le16_to_cpu(head->num_ops);
142 olen = le32_to_cpu(head->object_len);
143 seq_printf(s, "%.*s", olen,
144 (const char *)(head->ops + num_ops));
145 136
146 if (req->r_reassert_version.epoch) 137 if (req->r_reassert_version.epoch)
147 seq_printf(s, "\t%u'%llu", 138 seq_printf(s, "\t%u'%llu",
@@ -150,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp)
150 else 141 else
151 seq_printf(s, "\t"); 142 seq_printf(s, "\t");
152 143
153 for (i = 0; i < num_ops; i++) { 144 for (i = 0; i < req->r_num_ops; i++) {
154 opcode = le16_to_cpu(op->op); 145 opcode = le16_to_cpu(req->r_request_ops[i].op);
155 seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); 146 seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
156 op++;
157 } 147 }
158 148
159 seq_printf(s, "\n"); 149 seq_printf(s, "\n");
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5584f0a08e28..d730dd4d8eb2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -146,15 +146,23 @@ EXPORT_SYMBOL(ceph_osdc_release_request);
146 146
147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
148 struct ceph_snap_context *snapc, 148 struct ceph_snap_context *snapc,
149 unsigned int num_op, 149 unsigned int num_ops,
150 bool use_mempool, 150 bool use_mempool,
151 gfp_t gfp_flags) 151 gfp_t gfp_flags)
152{ 152{
153 struct ceph_osd_request *req; 153 struct ceph_osd_request *req;
154 struct ceph_msg *msg; 154 struct ceph_msg *msg;
155 size_t msg_size = sizeof(struct ceph_osd_request_head); 155 size_t msg_size;
156 156
157 msg_size += num_op*sizeof(struct ceph_osd_op); 157 msg_size = 4 + 4 + 8 + 8 + 4+8;
158 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
159 msg_size += 1 + 8 + 4 + 4; /* pg_t */
160 msg_size += 4 + MAX_OBJ_NAME_SIZE;
161 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
162 msg_size += 8; /* snapid */
163 msg_size += 8; /* snap_seq */
164 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
165 msg_size += 4;
158 166
159 if (use_mempool) { 167 if (use_mempool) {
160 req = mempool_alloc(osdc->req_mempool, gfp_flags); 168 req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -193,9 +201,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193 ceph_pagelist_init(&req->r_trail); 201 ceph_pagelist_init(&req->r_trail);
194 202
195 /* create request message; allow space for oid */ 203 /* create request message; allow space for oid */
196 msg_size += MAX_OBJ_NAME_SIZE;
197 if (snapc)
198 msg_size += sizeof(u64) * snapc->num_snaps;
199 if (use_mempool) 204 if (use_mempool)
200 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 205 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
201 else 206 else
@@ -324,55 +329,80 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
324 * 329 *
325 */ 330 */
326void ceph_osdc_build_request(struct ceph_osd_request *req, 331void ceph_osdc_build_request(struct ceph_osd_request *req,
327 u64 off, u64 len, unsigned int num_op, 332 u64 off, u64 len, unsigned int num_ops,
328 struct ceph_osd_req_op *src_ops, 333 struct ceph_osd_req_op *src_ops,
329 struct ceph_snap_context *snapc, u64 snap_id, 334 struct ceph_snap_context *snapc, u64 snap_id,
330 struct timespec *mtime) 335 struct timespec *mtime)
331{ 336{
332 struct ceph_msg *msg = req->r_request; 337 struct ceph_msg *msg = req->r_request;
333 struct ceph_osd_request_head *head;
334 struct ceph_osd_req_op *src_op; 338 struct ceph_osd_req_op *src_op;
335 struct ceph_osd_op *op;
336 void *p; 339 void *p;
337 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 340 size_t msg_size;
338 int flags = req->r_flags; 341 int flags = req->r_flags;
339 u64 data_len; 342 u64 data_len;
340 int i; 343 int i;
341 344
342 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 345 req->r_num_ops = num_ops;
343 346 req->r_snapid = snap_id;
344 head = msg->front.iov_base;
345 head->snapid = cpu_to_le64(snap_id);
346 op = (void *)(head + 1);
347 p = (void *)(op + num_op);
348
349 req->r_snapc = ceph_get_snap_context(snapc); 347 req->r_snapc = ceph_get_snap_context(snapc);
350 348
351 head->client_inc = cpu_to_le32(1); /* always, for now. */ 349 /* encode request */
352 head->flags = cpu_to_le32(flags); 350 msg->hdr.version = cpu_to_le16(4);
353 if (flags & CEPH_OSD_FLAG_WRITE)
354 ceph_encode_timespec(&head->mtime, mtime);
355 BUG_ON(num_op > (unsigned int) ((u16) -1));
356 head->num_ops = cpu_to_le16(num_op);
357 351
358 /* fill in oid */ 352 p = msg->front.iov_base;
359 head->object_len = cpu_to_le32(req->r_oid_len); 353 ceph_encode_32(&p, 1); /* client_inc is always 1 */
354 req->r_request_osdmap_epoch = p;
355 p += 4;
356 req->r_request_flags = p;
357 p += 4;
358 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359 ceph_encode_timespec(p, mtime);
360 p += sizeof(struct ceph_timespec);
361 req->r_request_reassert_version = p;
362 p += sizeof(struct ceph_eversion); /* will get filled in */
363
364 /* oloc */
365 ceph_encode_8(&p, 4);
366 ceph_encode_8(&p, 4);
367 ceph_encode_32(&p, 8 + 4 + 4);
368 req->r_request_pool = p;
369 p += 8;
370 ceph_encode_32(&p, -1); /* preferred */
371 ceph_encode_32(&p, 0); /* key len */
372
373 ceph_encode_8(&p, 1);
374 req->r_request_pgid = p;
375 p += 8 + 4;
376 ceph_encode_32(&p, -1); /* preferred */
377
378 /* oid */
379 ceph_encode_32(&p, req->r_oid_len);
360 memcpy(p, req->r_oid, req->r_oid_len); 380 memcpy(p, req->r_oid, req->r_oid_len);
381 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
361 p += req->r_oid_len; 382 p += req->r_oid_len;
362 383
384 /* ops */
385 ceph_encode_16(&p, num_ops);
363 src_op = src_ops; 386 src_op = src_ops;
364 while (num_op--) 387 req->r_request_ops = p;
365 osd_req_encode_op(req, op++, src_op++); 388 for (i = 0; i < num_ops; i++, src_op++) {
389 osd_req_encode_op(req, p, src_op);
390 p += sizeof(struct ceph_osd_op);
391 }
366 392
367 if (snapc) { 393 /* snaps */
368 head->snap_seq = cpu_to_le64(snapc->seq); 394 ceph_encode_64(&p, req->r_snapid);
369 head->num_snaps = cpu_to_le32(snapc->num_snaps); 395 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
396 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
397 if (req->r_snapc) {
370 for (i = 0; i < snapc->num_snaps; i++) { 398 for (i = 0; i < snapc->num_snaps; i++) {
371 put_unaligned_le64(snapc->snaps[i], p); 399 ceph_encode_64(&p, req->r_snapc->snaps[i]);
372 p += sizeof(u64);
373 } 400 }
374 } 401 }
375 402
403 req->r_request_attempts = p;
404 p += 4;
405
376 data_len = req->r_trail.length; 406 data_len = req->r_trail.length;
377 if (flags & CEPH_OSD_FLAG_WRITE) { 407 if (flags & CEPH_OSD_FLAG_WRITE) {
378 req->r_request->hdr.data_off = cpu_to_le16(off); 408 req->r_request->hdr.data_off = cpu_to_le16(off);
@@ -385,6 +415,9 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
385 msg_size = p - msg->front.iov_base; 415 msg_size = p - msg->front.iov_base;
386 msg->front.iov_len = msg_size; 416 msg->front.iov_len = msg_size;
387 msg->hdr.front_len = cpu_to_le32(msg_size); 417 msg->hdr.front_len = cpu_to_le32(msg_size);
418
419 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
420 num_ops);
388 return; 421 return;
389} 422}
390EXPORT_SYMBOL(ceph_osdc_build_request); 423EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -991,21 +1024,22 @@ out:
991static void __send_request(struct ceph_osd_client *osdc, 1024static void __send_request(struct ceph_osd_client *osdc,
992 struct ceph_osd_request *req) 1025 struct ceph_osd_request *req)
993{ 1026{
994 struct ceph_osd_request_head *reqhead; 1027 void *p;
995
996 dout("send_request %p tid %llu to osd%d flags %d\n",
997 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
998
999 reqhead = req->r_request->front.iov_base;
1000 reqhead->snapid = cpu_to_le64(req->r_snapid);
1001 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
1002 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
1003 reqhead->reassert_version = req->r_reassert_version;
1004 1028
1005 reqhead->layout.ol_pgid.ps = cpu_to_le16(req->r_pgid.seed); 1029 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1006 reqhead->layout.ol_pgid.pool = cpu_to_le32(req->r_pgid.pool); 1030 req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1007 reqhead->layout.ol_pgid.preferred = cpu_to_le16(-1); 1031 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1008 reqhead->layout.ol_stripe_unit = 0; 1032
1033 /* fill in message content that changes each time we send it */
1034 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1035 put_unaligned_le32(req->r_flags, req->r_request_flags);
1036 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
1037 p = req->r_request_pgid;
1038 ceph_encode_64(&p, req->r_pgid.pool);
1039 ceph_encode_32(&p, req->r_pgid.seed);
1040 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
1041 memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1042 sizeof(req->r_reassert_version));
1009 1043
1010 req->r_stamp = jiffies; 1044 req->r_stamp = jiffies;
1011 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1045 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1105,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
1105 complete_all(&req->r_safe_completion); /* fsync waiter */ 1139 complete_all(&req->r_safe_completion); /* fsync waiter */
1106} 1140}
1107 1141
1142static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143{
1144 __u8 v;
1145
1146 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147 v = ceph_decode_8(p);
1148 if (v > 1) {
1149 pr_warning("do not understand pg encoding %d > 1", v);
1150 return -EINVAL;
1151 }
1152 pgid->pool = ceph_decode_64(p);
1153 pgid->seed = ceph_decode_32(p);
1154 *p += 4;
1155 return 0;
1156
1157bad:
1158 pr_warning("incomplete pg encoding");
1159 return -EINVAL;
1160}
1161
1108/* 1162/*
1109 * handle osd op reply. either call the callback if it is specified, 1163 * handle osd op reply. either call the callback if it is specified,
1110 * or do the completion to wake up the waiting thread. 1164 * or do the completion to wake up the waiting thread.
@@ -1112,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
1112static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1166static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1113 struct ceph_connection *con) 1167 struct ceph_connection *con)
1114{ 1168{
1115 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 1169 void *p, *end;
1116 struct ceph_osd_request *req; 1170 struct ceph_osd_request *req;
1117 u64 tid; 1171 u64 tid;
1118 int numops, object_len, flags; 1172 int object_len;
1173 int numops, payload_len, flags;
1119 s32 result; 1174 s32 result;
1175 s32 retry_attempt;
1176 struct ceph_pg pg;
1177 int err;
1178 u32 reassert_epoch;
1179 u64 reassert_version;
1180 u32 osdmap_epoch;
1181 int i;
1120 1182
1121 tid = le64_to_cpu(msg->hdr.tid); 1183 tid = le64_to_cpu(msg->hdr.tid);
1122 if (msg->front.iov_len < sizeof(*rhead)) 1184 dout("handle_reply %p tid %llu\n", msg, tid);
1123 goto bad; 1185
1124 numops = le32_to_cpu(rhead->num_ops); 1186 p = msg->front.iov_base;
1125 object_len = le32_to_cpu(rhead->object_len); 1187 end = p + msg->front.iov_len;
1126 result = le32_to_cpu(rhead->result); 1188
1127 if (msg->front.iov_len != sizeof(*rhead) + object_len + 1189 ceph_decode_need(&p, end, 4, bad);
1128 numops * sizeof(struct ceph_osd_op)) 1190 object_len = ceph_decode_32(&p);
1191 ceph_decode_need(&p, end, object_len, bad);
1192 p += object_len;
1193
1194 err = __decode_pgid(&p, end, &pg);
1195 if (err)
1129 goto bad; 1196 goto bad;
1130 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1197
1198 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1199 flags = ceph_decode_64(&p);
1200 result = ceph_decode_32(&p);
1201 reassert_epoch = ceph_decode_32(&p);
1202 reassert_version = ceph_decode_64(&p);
1203 osdmap_epoch = ceph_decode_32(&p);
1204
1131 /* lookup */ 1205 /* lookup */
1132 mutex_lock(&osdc->request_mutex); 1206 mutex_lock(&osdc->request_mutex);
1133 req = __lookup_request(osdc, tid); 1207 req = __lookup_request(osdc, tid);
@@ -1137,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1137 return; 1211 return;
1138 } 1212 }
1139 ceph_osdc_get_request(req); 1213 ceph_osdc_get_request(req);
1140 flags = le32_to_cpu(rhead->flags); 1214
1215 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1216 req, result);
1217
1218 ceph_decode_need(&p, end, 4, bad);
1219 numops = ceph_decode_32(&p);
1220 if (numops > CEPH_OSD_MAX_OP)
1221 goto bad_put;
1222 if (numops != req->r_num_ops)
1223 goto bad_put;
1224 payload_len = 0;
1225 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
1226 for (i = 0; i < numops; i++) {
1227 struct ceph_osd_op *op = p;
1228 int len;
1229
1230 len = le32_to_cpu(op->payload_len);
1231 req->r_reply_op_len[i] = len;
1232 dout(" op %d has %d bytes\n", i, len);
1233 payload_len += len;
1234 p += sizeof(*op);
1235 }
1236 if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
1237 pr_warning("sum of op payload lens %d != data_len %d",
1238 payload_len, le32_to_cpu(msg->hdr.data_len));
1239 goto bad_put;
1240 }
1241
1242 ceph_decode_need(&p, end, 4 + numops * 4, bad);
1243 retry_attempt = ceph_decode_32(&p);
1244 for (i = 0; i < numops; i++)
1245 req->r_reply_op_result[i] = ceph_decode_32(&p);
1141 1246
1142 /* 1247 /*
1143 * if this connection filled our message, drop our reference now, to 1248 * if this connection filled our message, drop our reference now, to
@@ -1152,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1152 if (!req->r_got_reply) { 1257 if (!req->r_got_reply) {
1153 unsigned int bytes; 1258 unsigned int bytes;
1154 1259
1155 req->r_result = le32_to_cpu(rhead->result); 1260 req->r_result = result;
1156 bytes = le32_to_cpu(msg->hdr.data_len); 1261 bytes = le32_to_cpu(msg->hdr.data_len);
1157 dout("handle_reply result %d bytes %d\n", req->r_result, 1262 dout("handle_reply result %d bytes %d\n", req->r_result,
1158 bytes); 1263 bytes);
@@ -1160,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1160 req->r_result = bytes; 1265 req->r_result = bytes;
1161 1266
1162 /* in case this is a write and we need to replay, */ 1267 /* in case this is a write and we need to replay, */
1163 req->r_reassert_version = rhead->reassert_version; 1268 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1269 req->r_reassert_version.version = cpu_to_le64(reassert_version);
1164 1270
1165 req->r_got_reply = 1; 1271 req->r_got_reply = 1;
1166 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1272 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1195,10 +1301,11 @@ done:
1195 ceph_osdc_put_request(req); 1301 ceph_osdc_put_request(req);
1196 return; 1302 return;
1197 1303
1304bad_put:
1305 ceph_osdc_put_request(req);
1198bad: 1306bad:
1199 pr_err("corrupt osd_op_reply got %d %d expected %d\n", 1307 pr_err("corrupt osd_op_reply got %d %d\n",
1200 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), 1308 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1201 (int)sizeof(*rhead));
1202 ceph_msg_dump(msg); 1309 ceph_msg_dump(msg);
1203} 1310}
1204 1311