aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--net/ceph/osd_client.c283
1 files changed, 255 insertions, 28 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2b4b32aaa893..010ff3bd58ad 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
338 msg_size = 4 + 4 + 8 + 8 + 4+8; 338 msg_size = 4 + 4 + 8 + 8 + 4+8;
339 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 339 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
340 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 340 msg_size += 1 + 8 + 4 + 4; /* pg_t */
341 msg_size += 4 + MAX_OBJ_NAME_SIZE; 341 msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
342 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 342 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
343 msg_size += 8; /* snapid */ 343 msg_size += 8; /* snapid */
344 msg_size += 8; /* snap_seq */ 344 msg_size += 8; /* snap_seq */
@@ -368,6 +368,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
368 INIT_LIST_HEAD(&req->r_req_lru_item); 368 INIT_LIST_HEAD(&req->r_req_lru_item);
369 INIT_LIST_HEAD(&req->r_osd_item); 369 INIT_LIST_HEAD(&req->r_osd_item);
370 370
371 req->r_base_oloc.pool = -1;
372 req->r_target_oloc.pool = -1;
373
371 /* create reply message */ 374 /* create reply message */
372 if (use_mempool) 375 if (use_mempool)
373 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 376 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -761,11 +764,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
761 if (num_ops > 1) 764 if (num_ops > 1)
762 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); 765 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
763 766
764 req->r_file_layout = *layout; /* keep a copy */ 767 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
765 768
766 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", 769 snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
767 vino.ino, objnum); 770 "%llx.%08llx", vino.ino, objnum);
768 req->r_oid_len = strlen(req->r_oid); 771 req->r_base_oid.name_len = strlen(req->r_base_oid.name);
769 772
770 return req; 773 return req;
771} 774}
@@ -1044,8 +1047,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1044 !ceph_con_opened(&osd->o_con)) { 1047 !ceph_con_opened(&osd->o_con)) {
1045 struct ceph_osd_request *req; 1048 struct ceph_osd_request *req;
1046 1049
1047 dout(" osd addr hasn't changed and connection never opened," 1050 dout("osd addr hasn't changed and connection never opened, "
1048 " letting msgr retry"); 1051 "letting msgr retry\n");
1049 /* touch each r_stamp for handle_timeout()'s benfit */ 1052 /* touch each r_stamp for handle_timeout()'s benfit */
1050 list_for_each_entry(req, &osd->o_requests, r_osd_item) 1053 list_for_each_entry(req, &osd->o_requests, r_osd_item)
1051 req->r_stamp = jiffies; 1054 req->r_stamp = jiffies;
@@ -1232,6 +1235,61 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
1232EXPORT_SYMBOL(ceph_osdc_set_request_linger); 1235EXPORT_SYMBOL(ceph_osdc_set_request_linger);
1233 1236
1234/* 1237/*
1238 * Returns whether a request should be blocked from being sent
1239 * based on the current osdmap and osd_client settings.
1240 *
1241 * Caller should hold map_sem for read.
1242 */
1243static bool __req_should_be_paused(struct ceph_osd_client *osdc,
1244 struct ceph_osd_request *req)
1245{
1246 bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
1247 bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
1248 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
1249 return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
1250 (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
1251}
1252
1253/*
1254 * Calculate mapping of a request to a PG. Takes tiering into account.
1255 */
1256static int __calc_request_pg(struct ceph_osdmap *osdmap,
1257 struct ceph_osd_request *req,
1258 struct ceph_pg *pg_out)
1259{
1260 bool need_check_tiering;
1261
1262 need_check_tiering = false;
1263 if (req->r_target_oloc.pool == -1) {
1264 req->r_target_oloc = req->r_base_oloc; /* struct */
1265 need_check_tiering = true;
1266 }
1267 if (req->r_target_oid.name_len == 0) {
1268 ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
1269 need_check_tiering = true;
1270 }
1271
1272 if (need_check_tiering &&
1273 (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1274 struct ceph_pg_pool_info *pi;
1275
1276 pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
1277 if (pi) {
1278 if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
1279 pi->read_tier >= 0)
1280 req->r_target_oloc.pool = pi->read_tier;
1281 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
1282 pi->write_tier >= 0)
1283 req->r_target_oloc.pool = pi->write_tier;
1284 }
1285 /* !pi is caught in ceph_oloc_oid_to_pg() */
1286 }
1287
1288 return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
1289 &req->r_target_oid, pg_out);
1290}
1291
1292/*
1235 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1293 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
1236 * (as needed), and set the request r_osd appropriately. If there is 1294 * (as needed), and set the request r_osd appropriately. If there is
1237 * no up osd, set r_osd to NULL. Move the request to the appropriate list 1295 * no up osd, set r_osd to NULL. Move the request to the appropriate list
@@ -1248,10 +1306,11 @@ static int __map_request(struct ceph_osd_client *osdc,
1248 int acting[CEPH_PG_MAX_SIZE]; 1306 int acting[CEPH_PG_MAX_SIZE];
1249 int o = -1, num = 0; 1307 int o = -1, num = 0;
1250 int err; 1308 int err;
1309 bool was_paused;
1251 1310
1252 dout("map_request %p tid %lld\n", req, req->r_tid); 1311 dout("map_request %p tid %lld\n", req, req->r_tid);
1253 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, 1312
1254 ceph_file_layout_pg_pool(req->r_file_layout)); 1313 err = __calc_request_pg(osdc->osdmap, req, &pgid);
1255 if (err) { 1314 if (err) {
1256 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1315 list_move(&req->r_req_lru_item, &osdc->req_notarget);
1257 return err; 1316 return err;
@@ -1264,12 +1323,18 @@ static int __map_request(struct ceph_osd_client *osdc,
1264 num = err; 1323 num = err;
1265 } 1324 }
1266 1325
1326 was_paused = req->r_paused;
1327 req->r_paused = __req_should_be_paused(osdc, req);
1328 if (was_paused && !req->r_paused)
1329 force_resend = 1;
1330
1267 if ((!force_resend && 1331 if ((!force_resend &&
1268 req->r_osd && req->r_osd->o_osd == o && 1332 req->r_osd && req->r_osd->o_osd == o &&
1269 req->r_sent >= req->r_osd->o_incarnation && 1333 req->r_sent >= req->r_osd->o_incarnation &&
1270 req->r_num_pg_osds == num && 1334 req->r_num_pg_osds == num &&
1271 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1335 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
1272 (req->r_osd == NULL && o == -1)) 1336 (req->r_osd == NULL && o == -1) ||
1337 req->r_paused)
1273 return 0; /* no change */ 1338 return 0; /* no change */
1274 1339
1275 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1340 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
@@ -1331,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
1331 /* fill in message content that changes each time we send it */ 1396 /* fill in message content that changes each time we send it */
1332 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1397 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1333 put_unaligned_le32(req->r_flags, req->r_request_flags); 1398 put_unaligned_le32(req->r_flags, req->r_request_flags);
1334 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); 1399 put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
1335 p = req->r_request_pgid; 1400 p = req->r_request_pgid;
1336 ceph_encode_64(&p, req->r_pgid.pool); 1401 ceph_encode_64(&p, req->r_pgid.pool);
1337 ceph_encode_32(&p, req->r_pgid.seed); 1402 ceph_encode_32(&p, req->r_pgid.seed);
@@ -1432,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work)
1432 round_jiffies_relative(delay)); 1497 round_jiffies_relative(delay));
1433} 1498}
1434 1499
1500static int ceph_oloc_decode(void **p, void *end,
1501 struct ceph_object_locator *oloc)
1502{
1503 u8 struct_v, struct_cv;
1504 u32 len;
1505 void *struct_end;
1506 int ret = 0;
1507
1508 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1509 struct_v = ceph_decode_8(p);
1510 struct_cv = ceph_decode_8(p);
1511 if (struct_v < 3) {
1512 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
1513 struct_v, struct_cv);
1514 goto e_inval;
1515 }
1516 if (struct_cv > 6) {
1517 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
1518 struct_v, struct_cv);
1519 goto e_inval;
1520 }
1521 len = ceph_decode_32(p);
1522 ceph_decode_need(p, end, len, e_inval);
1523 struct_end = *p + len;
1524
1525 oloc->pool = ceph_decode_64(p);
1526 *p += 4; /* skip preferred */
1527
1528 len = ceph_decode_32(p);
1529 if (len > 0) {
1530 pr_warn("ceph_object_locator::key is set\n");
1531 goto e_inval;
1532 }
1533
1534 if (struct_v >= 5) {
1535 len = ceph_decode_32(p);
1536 if (len > 0) {
1537 pr_warn("ceph_object_locator::nspace is set\n");
1538 goto e_inval;
1539 }
1540 }
1541
1542 if (struct_v >= 6) {
1543 s64 hash = ceph_decode_64(p);
1544 if (hash != -1) {
1545 pr_warn("ceph_object_locator::hash is set\n");
1546 goto e_inval;
1547 }
1548 }
1549
1550 /* skip the rest */
1551 *p = struct_end;
1552out:
1553 return ret;
1554
1555e_inval:
1556 ret = -EINVAL;
1557 goto out;
1558}
1559
1560static int ceph_redirect_decode(void **p, void *end,
1561 struct ceph_request_redirect *redir)
1562{
1563 u8 struct_v, struct_cv;
1564 u32 len;
1565 void *struct_end;
1566 int ret;
1567
1568 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1569 struct_v = ceph_decode_8(p);
1570 struct_cv = ceph_decode_8(p);
1571 if (struct_cv > 1) {
1572 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
1573 struct_v, struct_cv);
1574 goto e_inval;
1575 }
1576 len = ceph_decode_32(p);
1577 ceph_decode_need(p, end, len, e_inval);
1578 struct_end = *p + len;
1579
1580 ret = ceph_oloc_decode(p, end, &redir->oloc);
1581 if (ret)
1582 goto out;
1583
1584 len = ceph_decode_32(p);
1585 if (len > 0) {
1586 pr_warn("ceph_request_redirect::object_name is set\n");
1587 goto e_inval;
1588 }
1589
1590 len = ceph_decode_32(p);
1591 *p += len; /* skip osd_instructions */
1592
1593 /* skip the rest */
1594 *p = struct_end;
1595out:
1596 return ret;
1597
1598e_inval:
1599 ret = -EINVAL;
1600 goto out;
1601}
1602
1435static void complete_request(struct ceph_osd_request *req) 1603static void complete_request(struct ceph_osd_request *req)
1436{ 1604{
1437 complete_all(&req->r_safe_completion); /* fsync waiter */ 1605 complete_all(&req->r_safe_completion); /* fsync waiter */
@@ -1446,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1446{ 1614{
1447 void *p, *end; 1615 void *p, *end;
1448 struct ceph_osd_request *req; 1616 struct ceph_osd_request *req;
1617 struct ceph_request_redirect redir;
1449 u64 tid; 1618 u64 tid;
1450 int object_len; 1619 int object_len;
1451 unsigned int numops; 1620 unsigned int numops;
@@ -1525,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1525 for (i = 0; i < numops; i++) 1694 for (i = 0; i < numops; i++)
1526 req->r_reply_op_result[i] = ceph_decode_32(&p); 1695 req->r_reply_op_result[i] = ceph_decode_32(&p);
1527 1696
1528 already_completed = req->r_got_reply; 1697 if (le16_to_cpu(msg->hdr.version) >= 6) {
1698 p += 8 + 4; /* skip replay_version */
1699 p += 8; /* skip user_version */
1529 1700
1530 if (!req->r_got_reply) { 1701 err = ceph_redirect_decode(&p, end, &redir);
1702 if (err)
1703 goto bad_put;
1704 } else {
1705 redir.oloc.pool = -1;
1706 }
1707
1708 if (redir.oloc.pool != -1) {
1709 dout("redirect pool %lld\n", redir.oloc.pool);
1710
1711 __unregister_request(osdc, req);
1712 mutex_unlock(&osdc->request_mutex);
1713
1714 req->r_target_oloc = redir.oloc; /* struct */
1715
1716 /*
1717 * Start redirect requests with nofail=true. If
1718 * mapping fails, request will end up on the notarget
1719 * list, waiting for the new osdmap (which can take
1720 * a while), even though the original request mapped
1721 * successfully. In the future we might want to follow
1722 * original request's nofail setting here.
1723 */
1724 err = ceph_osdc_start_request(osdc, req, true);
1725 BUG_ON(err);
1531 1726
1727 goto done;
1728 }
1729
1730 already_completed = req->r_got_reply;
1731 if (!req->r_got_reply) {
1532 req->r_result = result; 1732 req->r_result = result;
1533 dout("handle_reply result %d bytes %d\n", req->r_result, 1733 dout("handle_reply result %d bytes %d\n", req->r_result,
1534 bytes); 1734 bytes);
@@ -1581,6 +1781,13 @@ done:
1581 return; 1781 return;
1582 1782
1583bad_put: 1783bad_put:
1784 req->r_result = -EIO;
1785 __unregister_request(osdc, req);
1786 if (req->r_callback)
1787 req->r_callback(req, msg);
1788 else
1789 complete_all(&req->r_completion);
1790 complete_request(req);
1584 ceph_osdc_put_request(req); 1791 ceph_osdc_put_request(req);
1585bad_mutex: 1792bad_mutex:
1586 mutex_unlock(&osdc->request_mutex); 1793 mutex_unlock(&osdc->request_mutex);
@@ -1613,14 +1820,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
1613 * 1820 *
1614 * Caller should hold map_sem for read. 1821 * Caller should hold map_sem for read.
1615 */ 1822 */
1616static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1823static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
1824 bool force_resend_writes)
1617{ 1825{
1618 struct ceph_osd_request *req, *nreq; 1826 struct ceph_osd_request *req, *nreq;
1619 struct rb_node *p; 1827 struct rb_node *p;
1620 int needmap = 0; 1828 int needmap = 0;
1621 int err; 1829 int err;
1830 bool force_resend_req;
1622 1831
1623 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1832 dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
1833 force_resend_writes ? " (force resend writes)" : "");
1624 mutex_lock(&osdc->request_mutex); 1834 mutex_lock(&osdc->request_mutex);
1625 for (p = rb_first(&osdc->requests); p; ) { 1835 for (p = rb_first(&osdc->requests); p; ) {
1626 req = rb_entry(p, struct ceph_osd_request, r_node); 1836 req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -1645,7 +1855,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1645 continue; 1855 continue;
1646 } 1856 }
1647 1857
1648 err = __map_request(osdc, req, force_resend); 1858 force_resend_req = force_resend ||
1859 (force_resend_writes &&
1860 req->r_flags & CEPH_OSD_FLAG_WRITE);
1861 err = __map_request(osdc, req, force_resend_req);
1649 if (err < 0) 1862 if (err < 0)
1650 continue; /* error */ 1863 continue; /* error */
1651 if (req->r_osd == NULL) { 1864 if (req->r_osd == NULL) {
@@ -1665,7 +1878,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1665 r_linger_item) { 1878 r_linger_item) {
1666 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1879 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1667 1880
1668 err = __map_request(osdc, req, force_resend); 1881 err = __map_request(osdc, req,
1882 force_resend || force_resend_writes);
1669 dout("__map_request returned %d\n", err); 1883 dout("__map_request returned %d\n", err);
1670 if (err == 0) 1884 if (err == 0)
1671 continue; /* no change and no osd was specified */ 1885 continue; /* no change and no osd was specified */
@@ -1707,6 +1921,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1707 struct ceph_osdmap *newmap = NULL, *oldmap; 1921 struct ceph_osdmap *newmap = NULL, *oldmap;
1708 int err; 1922 int err;
1709 struct ceph_fsid fsid; 1923 struct ceph_fsid fsid;
1924 bool was_full;
1710 1925
1711 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1926 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1712 p = msg->front.iov_base; 1927 p = msg->front.iov_base;
@@ -1720,6 +1935,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1720 1935
1721 down_write(&osdc->map_sem); 1936 down_write(&osdc->map_sem);
1722 1937
1938 was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
1939
1723 /* incremental maps */ 1940 /* incremental maps */
1724 ceph_decode_32_safe(&p, end, nr_maps, bad); 1941 ceph_decode_32_safe(&p, end, nr_maps, bad);
1725 dout(" %d inc maps\n", nr_maps); 1942 dout(" %d inc maps\n", nr_maps);
@@ -1744,7 +1961,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1744 ceph_osdmap_destroy(osdc->osdmap); 1961 ceph_osdmap_destroy(osdc->osdmap);
1745 osdc->osdmap = newmap; 1962 osdc->osdmap = newmap;
1746 } 1963 }
1747 kick_requests(osdc, 0); 1964 was_full = was_full ||
1965 ceph_osdmap_flag(osdc->osdmap,
1966 CEPH_OSDMAP_FULL);
1967 kick_requests(osdc, 0, was_full);
1748 } else { 1968 } else {
1749 dout("ignoring incremental map %u len %d\n", 1969 dout("ignoring incremental map %u len %d\n",
1750 epoch, maplen); 1970 epoch, maplen);
@@ -1787,7 +2007,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1787 skipped_map = 1; 2007 skipped_map = 1;
1788 ceph_osdmap_destroy(oldmap); 2008 ceph_osdmap_destroy(oldmap);
1789 } 2009 }
1790 kick_requests(osdc, skipped_map); 2010 was_full = was_full ||
2011 ceph_osdmap_flag(osdc->osdmap,
2012 CEPH_OSDMAP_FULL);
2013 kick_requests(osdc, skipped_map, was_full);
1791 } 2014 }
1792 p += maplen; 2015 p += maplen;
1793 nr_maps--; 2016 nr_maps--;
@@ -1804,7 +2027,9 @@ done:
1804 * we find out when we are no longer full and stop returning 2027 * we find out when we are no longer full and stop returning
1805 * ENOSPC. 2028 * ENOSPC.
1806 */ 2029 */
1807 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 2030 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
2031 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
2032 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
1808 ceph_monc_request_next_osdmap(&osdc->client->monc); 2033 ceph_monc_request_next_osdmap(&osdc->client->monc);
1809 2034
1810 mutex_lock(&osdc->request_mutex); 2035 mutex_lock(&osdc->request_mutex);
@@ -2068,10 +2293,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
2068 ceph_encode_32(&p, -1); /* preferred */ 2293 ceph_encode_32(&p, -1); /* preferred */
2069 2294
2070 /* oid */ 2295 /* oid */
2071 ceph_encode_32(&p, req->r_oid_len); 2296 ceph_encode_32(&p, req->r_base_oid.name_len);
2072 memcpy(p, req->r_oid, req->r_oid_len); 2297 memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
2073 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); 2298 dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
2074 p += req->r_oid_len; 2299 req->r_base_oid.name, req->r_base_oid.name_len);
2300 p += req->r_base_oid.name_len;
2075 2301
2076 /* ops--can imply data */ 2302 /* ops--can imply data */
2077 ceph_encode_16(&p, (u16)req->r_num_ops); 2303 ceph_encode_16(&p, (u16)req->r_num_ops);
@@ -2454,7 +2680,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2454 struct ceph_osd_client *osdc = osd->o_osdc; 2680 struct ceph_osd_client *osdc = osd->o_osdc;
2455 struct ceph_msg *m; 2681 struct ceph_msg *m;
2456 struct ceph_osd_request *req; 2682 struct ceph_osd_request *req;
2457 int front = le32_to_cpu(hdr->front_len); 2683 int front_len = le32_to_cpu(hdr->front_len);
2458 int data_len = le32_to_cpu(hdr->data_len); 2684 int data_len = le32_to_cpu(hdr->data_len);
2459 u64 tid; 2685 u64 tid;
2460 2686
@@ -2474,12 +2700,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2474 req->r_reply, req->r_reply->con); 2700 req->r_reply, req->r_reply->con);
2475 ceph_msg_revoke_incoming(req->r_reply); 2701 ceph_msg_revoke_incoming(req->r_reply);
2476 2702
2477 if (front > req->r_reply->front.iov_len) { 2703 if (front_len > req->r_reply->front_alloc_len) {
2478 pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", 2704 pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
2479 front, (int)req->r_reply->front.iov_len, 2705 front_len, req->r_reply->front_alloc_len,
2480 (unsigned int)con->peer_name.type, 2706 (unsigned int)con->peer_name.type,
2481 le64_to_cpu(con->peer_name.num)); 2707 le64_to_cpu(con->peer_name.num));
2482 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); 2708 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
2709 false);
2483 if (!m) 2710 if (!m)
2484 goto out; 2711 goto out;
2485 ceph_msg_put(req->r_reply); 2712 ceph_msg_put(req->r_reply);