aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
author Ilya Dryomov <ilya.dryomov@inktank.com> 2014-01-27 10:40:20 -0500
committer Ilya Dryomov <ilya.dryomov@inktank.com> 2014-01-27 16:57:53 -0500
commit 205ee1187a671c3b067d7f1e974903b44036f270 (patch)
tree 2a310516bcf7fdfb769c360a9fcfce85f501f57a /net/ceph
parent 3c972c95c68f455d80ff185aa440857be046bbe0 (diff)
libceph: follow redirect replies from osds
Follow redirect replies from osds; for details see ceph.git commit fbbe3ad1220799b7bb00ea30fce581c5eadaf034. The v1 (current) version of the redirect reply consists of oloc and oid, which expand to pool, key, nspace, hash and oid. However, server-side code that would populate anything other than pool doesn't exist yet, and hence this commit adds support for pool redirects only. To make sure that future server-side updates don't break us, we decode all fields and, if any of key, nspace, hash or oid has a non-default value, error out with a "corrupt osd_op_reply ..." message. Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com> Reviewed-by: Sage Weil <sage@inktank.com>
Diffstat (limited to 'net/ceph')
-rw-r--r-- net/ceph/osd_client.c 167
1 files changed, 158 insertions, 9 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3997a87c4f51..010ff3bd58ad 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -369,6 +369,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
369 INIT_LIST_HEAD(&req->r_osd_item); 369 INIT_LIST_HEAD(&req->r_osd_item);
370 370
371 req->r_base_oloc.pool = -1; 371 req->r_base_oloc.pool = -1;
372 req->r_target_oloc.pool = -1;
372 373
373 /* create reply message */ 374 /* create reply message */
374 if (use_mempool) 375 if (use_mempool)
@@ -1256,23 +1257,36 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
1256 struct ceph_osd_request *req, 1257 struct ceph_osd_request *req,
1257 struct ceph_pg *pg_out) 1258 struct ceph_pg *pg_out)
1258{ 1259{
1259 if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { 1260 bool need_check_tiering;
1261
1262 need_check_tiering = false;
1263 if (req->r_target_oloc.pool == -1) {
1264 req->r_target_oloc = req->r_base_oloc; /* struct */
1265 need_check_tiering = true;
1266 }
1267 if (req->r_target_oid.name_len == 0) {
1268 ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
1269 need_check_tiering = true;
1270 }
1271
1272 if (need_check_tiering &&
1273 (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1260 struct ceph_pg_pool_info *pi; 1274 struct ceph_pg_pool_info *pi;
1261 1275
1262 pi = ceph_pg_pool_by_id(osdmap, req->r_base_oloc.pool); 1276 pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
1263 if (pi) { 1277 if (pi) {
1264 if ((req->r_flags & CEPH_OSD_FLAG_READ) && 1278 if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
1265 pi->read_tier >= 0) 1279 pi->read_tier >= 0)
1266 req->r_base_oloc.pool = pi->read_tier; 1280 req->r_target_oloc.pool = pi->read_tier;
1267 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 1281 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
1268 pi->write_tier >= 0) 1282 pi->write_tier >= 0)
1269 req->r_base_oloc.pool = pi->write_tier; 1283 req->r_target_oloc.pool = pi->write_tier;
1270 } 1284 }
1271 /* !pi is caught in ceph_oloc_oid_to_pg() */ 1285 /* !pi is caught in ceph_oloc_oid_to_pg() */
1272 } 1286 }
1273 1287
1274 return ceph_oloc_oid_to_pg(osdmap, &req->r_base_oloc, 1288 return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
1275 &req->r_base_oid, pg_out); 1289 &req->r_target_oid, pg_out);
1276} 1290}
1277 1291
1278/* 1292/*
@@ -1382,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
1382 /* fill in message content that changes each time we send it */ 1396 /* fill in message content that changes each time we send it */
1383 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1397 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1384 put_unaligned_le32(req->r_flags, req->r_request_flags); 1398 put_unaligned_le32(req->r_flags, req->r_request_flags);
1385 put_unaligned_le64(req->r_base_oloc.pool, req->r_request_pool); 1399 put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
1386 p = req->r_request_pgid; 1400 p = req->r_request_pgid;
1387 ceph_encode_64(&p, req->r_pgid.pool); 1401 ceph_encode_64(&p, req->r_pgid.pool);
1388 ceph_encode_32(&p, req->r_pgid.seed); 1402 ceph_encode_32(&p, req->r_pgid.seed);
@@ -1483,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work)
1483 round_jiffies_relative(delay)); 1497 round_jiffies_relative(delay));
1484} 1498}
1485 1499
1500static int ceph_oloc_decode(void **p, void *end,
1501 struct ceph_object_locator *oloc)
1502{
1503 u8 struct_v, struct_cv;
1504 u32 len;
1505 void *struct_end;
1506 int ret = 0;
1507
1508 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1509 struct_v = ceph_decode_8(p);
1510 struct_cv = ceph_decode_8(p);
1511 if (struct_v < 3) {
1512 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
1513 struct_v, struct_cv);
1514 goto e_inval;
1515 }
1516 if (struct_cv > 6) {
1517 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
1518 struct_v, struct_cv);
1519 goto e_inval;
1520 }
1521 len = ceph_decode_32(p);
1522 ceph_decode_need(p, end, len, e_inval);
1523 struct_end = *p + len;
1524
1525 oloc->pool = ceph_decode_64(p);
1526 *p += 4; /* skip preferred */
1527
1528 len = ceph_decode_32(p);
1529 if (len > 0) {
1530 pr_warn("ceph_object_locator::key is set\n");
1531 goto e_inval;
1532 }
1533
1534 if (struct_v >= 5) {
1535 len = ceph_decode_32(p);
1536 if (len > 0) {
1537 pr_warn("ceph_object_locator::nspace is set\n");
1538 goto e_inval;
1539 }
1540 }
1541
1542 if (struct_v >= 6) {
1543 s64 hash = ceph_decode_64(p);
1544 if (hash != -1) {
1545 pr_warn("ceph_object_locator::hash is set\n");
1546 goto e_inval;
1547 }
1548 }
1549
1550 /* skip the rest */
1551 *p = struct_end;
1552out:
1553 return ret;
1554
1555e_inval:
1556 ret = -EINVAL;
1557 goto out;
1558}
1559
1560static int ceph_redirect_decode(void **p, void *end,
1561 struct ceph_request_redirect *redir)
1562{
1563 u8 struct_v, struct_cv;
1564 u32 len;
1565 void *struct_end;
1566 int ret;
1567
1568 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1569 struct_v = ceph_decode_8(p);
1570 struct_cv = ceph_decode_8(p);
1571 if (struct_cv > 1) {
1572 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
1573 struct_v, struct_cv);
1574 goto e_inval;
1575 }
1576 len = ceph_decode_32(p);
1577 ceph_decode_need(p, end, len, e_inval);
1578 struct_end = *p + len;
1579
1580 ret = ceph_oloc_decode(p, end, &redir->oloc);
1581 if (ret)
1582 goto out;
1583
1584 len = ceph_decode_32(p);
1585 if (len > 0) {
1586 pr_warn("ceph_request_redirect::object_name is set\n");
1587 goto e_inval;
1588 }
1589
1590 len = ceph_decode_32(p);
1591 *p += len; /* skip osd_instructions */
1592
1593 /* skip the rest */
1594 *p = struct_end;
1595out:
1596 return ret;
1597
1598e_inval:
1599 ret = -EINVAL;
1600 goto out;
1601}
1602
1486static void complete_request(struct ceph_osd_request *req) 1603static void complete_request(struct ceph_osd_request *req)
1487{ 1604{
1488 complete_all(&req->r_safe_completion); /* fsync waiter */ 1605 complete_all(&req->r_safe_completion); /* fsync waiter */
@@ -1497,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1497{ 1614{
1498 void *p, *end; 1615 void *p, *end;
1499 struct ceph_osd_request *req; 1616 struct ceph_osd_request *req;
1617 struct ceph_request_redirect redir;
1500 u64 tid; 1618 u64 tid;
1501 int object_len; 1619 int object_len;
1502 unsigned int numops; 1620 unsigned int numops;
@@ -1576,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1576 for (i = 0; i < numops; i++) 1694 for (i = 0; i < numops; i++)
1577 req->r_reply_op_result[i] = ceph_decode_32(&p); 1695 req->r_reply_op_result[i] = ceph_decode_32(&p);
1578 1696
1579 already_completed = req->r_got_reply; 1697 if (le16_to_cpu(msg->hdr.version) >= 6) {
1698 p += 8 + 4; /* skip replay_version */
1699 p += 8; /* skip user_version */
1580 1700
1581 if (!req->r_got_reply) { 1701 err = ceph_redirect_decode(&p, end, &redir);
1702 if (err)
1703 goto bad_put;
1704 } else {
1705 redir.oloc.pool = -1;
1706 }
1582 1707
1708 if (redir.oloc.pool != -1) {
1709 dout("redirect pool %lld\n", redir.oloc.pool);
1710
1711 __unregister_request(osdc, req);
1712 mutex_unlock(&osdc->request_mutex);
1713
1714 req->r_target_oloc = redir.oloc; /* struct */
1715
1716 /*
1717 * Start redirect requests with nofail=true. If
1718 * mapping fails, request will end up on the notarget
1719 * list, waiting for the new osdmap (which can take
1720 * a while), even though the original request mapped
1721 * successfully. In the future we might want to follow
1722 * original request's nofail setting here.
1723 */
1724 err = ceph_osdc_start_request(osdc, req, true);
1725 BUG_ON(err);
1726
1727 goto done;
1728 }
1729
1730 already_completed = req->r_got_reply;
1731 if (!req->r_got_reply) {
1583 req->r_result = result; 1732 req->r_result = result;
1584 dout("handle_reply result %d bytes %d\n", req->r_result, 1733 dout("handle_reply result %d bytes %d\n", req->r_result,
1585 bytes); 1734 bytes);