summary | refs | log | tree | commit | diff | stats
path: root/net/ceph/osdmap.c
diff options
context:
space:
mode:
author: Linus Torvalds <torvalds@linux-foundation.org> 2016-05-26 17:10:32 -0400
committer: Linus Torvalds <torvalds@linux-foundation.org> 2016-05-26 17:10:32 -0400
commit: a10c38a4f385f5d7c173a263ff6bb2d36021b3bb (patch)
tree: 3cbaa916940b36a9fdb27c8a231e1488fbc352d6 /net/ceph/osdmap.c
parent: ea8ea737c46cffa5d0ee74309f81e55a7e5e9c2a (diff)
parent: e536030934aebf049fe6aaebc58dd37aeee21840 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
"This changeset has a few main parts:

 - Ilya has finished a huge refactoring effort to sync up the client-side logic in libceph with the user-space client code, which has evolved significantly over the last couple years, with lots of additional behaviors (e.g., how requests are handled when cluster is full and transitions from full to non-full). This structure of the code is more closely aligned with userspace now such that it will be much easier to maintain going forward when behavior changes take place. There are some locking improvements bundled in as well.

 - Zheng adds multi-filesystem support (multiple namespaces within the same Ceph cluster)

 - Zheng has changed the readdir offsets and directory enumeration so that dentry offsets are hash-based and therefore stable across directory fragmentation events on the MDS.

 - Zheng has a smorgasbord of bug fixes across fs/ceph"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (71 commits)
  ceph: fix wake_up_session_cb()
  ceph: don't use truncate_pagecache() to invalidate read cache
  ceph: SetPageError() for writeback pages if writepages fails
  ceph: handle interrupted ceph_writepage()
  ceph: make ceph_update_writeable_page() uninterruptible
  libceph: make ceph_osdc_wait_request() uninterruptible
  ceph: handle -EAGAIN returned by ceph_update_writeable_page()
  ceph: make fault/page_mkwrite return VM_FAULT_OOM for -ENOMEM
  ceph: block non-fatal signals for fault/page_mkwrite
  ceph: make logical calculation functions return bool
  ceph: tolerate bad i_size for symlink inode
  ceph: improve fragtree change detection
  ceph: keep leaf frag when updating fragtree
  ceph: fix dir_auth check in ceph_fill_dirfrag()
  ceph: don't assume frag tree splits in mds reply are sorted
  ceph: fix inode reference leak
  ceph: using hash value to compose dentry offset
  ceph: don't forbid marking directory complete after forward seek
  ceph: record 'offset' for each entry of readdir result
  ceph: define 'end/complete' in readdir reply as bit flags
  ...
Diffstat (limited to 'net/ceph/osdmap.c')
-rw-r--r-- net/ceph/osdmap.c 651
1 files changed, 489 insertions, 162 deletions
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 243574c8cf33..cde52e94732f 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -380,23 +380,24 @@ bad:
380 return ERR_PTR(err); 380 return ERR_PTR(err);
381} 381}
382 382
383/* 383int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
384 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
385 * to a set of osds) and primary_temp (explicit primary setting)
386 */
387static int pgid_cmp(struct ceph_pg l, struct ceph_pg r)
388{ 384{
389 if (l.pool < r.pool) 385 if (lhs->pool < rhs->pool)
390 return -1; 386 return -1;
391 if (l.pool > r.pool) 387 if (lhs->pool > rhs->pool)
392 return 1; 388 return 1;
393 if (l.seed < r.seed) 389 if (lhs->seed < rhs->seed)
394 return -1; 390 return -1;
395 if (l.seed > r.seed) 391 if (lhs->seed > rhs->seed)
396 return 1; 392 return 1;
393
397 return 0; 394 return 0;
398} 395}
399 396
397/*
398 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
399 * to a set of osds) and primary_temp (explicit primary setting)
400 */
400static int __insert_pg_mapping(struct ceph_pg_mapping *new, 401static int __insert_pg_mapping(struct ceph_pg_mapping *new,
401 struct rb_root *root) 402 struct rb_root *root)
402{ 403{
@@ -409,7 +410,7 @@ static int __insert_pg_mapping(struct ceph_pg_mapping *new,
409 while (*p) { 410 while (*p) {
410 parent = *p; 411 parent = *p;
411 pg = rb_entry(parent, struct ceph_pg_mapping, node); 412 pg = rb_entry(parent, struct ceph_pg_mapping, node);
412 c = pgid_cmp(new->pgid, pg->pgid); 413 c = ceph_pg_compare(&new->pgid, &pg->pgid);
413 if (c < 0) 414 if (c < 0)
414 p = &(*p)->rb_left; 415 p = &(*p)->rb_left;
415 else if (c > 0) 416 else if (c > 0)
@@ -432,7 +433,7 @@ static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root,
432 433
433 while (n) { 434 while (n) {
434 pg = rb_entry(n, struct ceph_pg_mapping, node); 435 pg = rb_entry(n, struct ceph_pg_mapping, node);
435 c = pgid_cmp(pgid, pg->pgid); 436 c = ceph_pg_compare(&pgid, &pg->pgid);
436 if (c < 0) { 437 if (c < 0) {
437 n = n->rb_left; 438 n = n->rb_left;
438 } else if (c > 0) { 439 } else if (c > 0) {
@@ -596,7 +597,9 @@ static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
596 *p += 4; /* skip crash_replay_interval */ 597 *p += 4; /* skip crash_replay_interval */
597 598
598 if (ev >= 7) 599 if (ev >= 7)
599 *p += 1; /* skip min_size */ 600 pi->min_size = ceph_decode_8(p);
601 else
602 pi->min_size = pi->size - pi->size / 2;
600 603
601 if (ev >= 8) 604 if (ev >= 8)
602 *p += 8 + 8; /* skip quota_max_* */ 605 *p += 8 + 8; /* skip quota_max_* */
@@ -616,6 +619,50 @@ static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
616 pi->write_tier = -1; 619 pi->write_tier = -1;
617 } 620 }
618 621
622 if (ev >= 10) {
623 /* skip properties */
624 num = ceph_decode_32(p);
625 while (num--) {
626 len = ceph_decode_32(p);
627 *p += len; /* key */
628 len = ceph_decode_32(p);
629 *p += len; /* val */
630 }
631 }
632
633 if (ev >= 11) {
634 /* skip hit_set_params */
635 *p += 1 + 1; /* versions */
636 len = ceph_decode_32(p);
637 *p += len;
638
639 *p += 4; /* skip hit_set_period */
640 *p += 4; /* skip hit_set_count */
641 }
642
643 if (ev >= 12)
644 *p += 4; /* skip stripe_width */
645
646 if (ev >= 13) {
647 *p += 8; /* skip target_max_bytes */
648 *p += 8; /* skip target_max_objects */
649 *p += 4; /* skip cache_target_dirty_ratio_micro */
650 *p += 4; /* skip cache_target_full_ratio_micro */
651 *p += 4; /* skip cache_min_flush_age */
652 *p += 4; /* skip cache_min_evict_age */
653 }
654
655 if (ev >= 14) {
656 /* skip erasure_code_profile */
657 len = ceph_decode_32(p);
658 *p += len;
659 }
660
661 if (ev >= 15)
662 pi->last_force_request_resend = ceph_decode_32(p);
663 else
664 pi->last_force_request_resend = 0;
665
619 /* ignore the rest */ 666 /* ignore the rest */
620 667
621 *p = pool_end; 668 *p = pool_end;
@@ -660,6 +707,23 @@ bad:
660/* 707/*
661 * osd map 708 * osd map
662 */ 709 */
710struct ceph_osdmap *ceph_osdmap_alloc(void)
711{
712 struct ceph_osdmap *map;
713
714 map = kzalloc(sizeof(*map), GFP_NOIO);
715 if (!map)
716 return NULL;
717
718 map->pg_pools = RB_ROOT;
719 map->pool_max = -1;
720 map->pg_temp = RB_ROOT;
721 map->primary_temp = RB_ROOT;
722 mutex_init(&map->crush_scratch_mutex);
723
724 return map;
725}
726
663void ceph_osdmap_destroy(struct ceph_osdmap *map) 727void ceph_osdmap_destroy(struct ceph_osdmap *map)
664{ 728{
665 dout("osdmap_destroy %p\n", map); 729 dout("osdmap_destroy %p\n", map);
@@ -1183,14 +1247,10 @@ struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
1183 struct ceph_osdmap *map; 1247 struct ceph_osdmap *map;
1184 int ret; 1248 int ret;
1185 1249
1186 map = kzalloc(sizeof(*map), GFP_NOFS); 1250 map = ceph_osdmap_alloc();
1187 if (!map) 1251 if (!map)
1188 return ERR_PTR(-ENOMEM); 1252 return ERR_PTR(-ENOMEM);
1189 1253
1190 map->pg_temp = RB_ROOT;
1191 map->primary_temp = RB_ROOT;
1192 mutex_init(&map->crush_scratch_mutex);
1193
1194 ret = osdmap_decode(p, end, map); 1254 ret = osdmap_decode(p, end, map);
1195 if (ret) { 1255 if (ret) {
1196 ceph_osdmap_destroy(map); 1256 ceph_osdmap_destroy(map);
@@ -1204,8 +1264,7 @@ struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
1204 * decode and apply an incremental map update. 1264 * decode and apply an incremental map update.
1205 */ 1265 */
1206struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, 1266struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
1207 struct ceph_osdmap *map, 1267 struct ceph_osdmap *map)
1208 struct ceph_messenger *msgr)
1209{ 1268{
1210 struct crush_map *newcrush = NULL; 1269 struct crush_map *newcrush = NULL;
1211 struct ceph_fsid fsid; 1270 struct ceph_fsid fsid;
@@ -1381,8 +1440,252 @@ bad:
1381 return ERR_PTR(err); 1440 return ERR_PTR(err);
1382} 1441}
1383 1442
1443void ceph_oid_copy(struct ceph_object_id *dest,
1444 const struct ceph_object_id *src)
1445{
1446 WARN_ON(!ceph_oid_empty(dest));
1447
1448 if (src->name != src->inline_name) {
1449 /* very rare, see ceph_object_id definition */
1450 dest->name = kmalloc(src->name_len + 1,
1451 GFP_NOIO | __GFP_NOFAIL);
1452 }
1453
1454 memcpy(dest->name, src->name, src->name_len + 1);
1455 dest->name_len = src->name_len;
1456}
1457EXPORT_SYMBOL(ceph_oid_copy);
1458
1459static __printf(2, 0)
1460int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap)
1461{
1462 int len;
1463
1464 WARN_ON(!ceph_oid_empty(oid));
1465
1466 len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap);
1467 if (len >= sizeof(oid->inline_name))
1468 return len;
1469
1470 oid->name_len = len;
1471 return 0;
1472}
1473
1474/*
1475 * If oid doesn't fit into inline buffer, BUG.
1476 */
1477void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...)
1478{
1479 va_list ap;
1480
1481 va_start(ap, fmt);
1482 BUG_ON(oid_printf_vargs(oid, fmt, ap));
1483 va_end(ap);
1484}
1485EXPORT_SYMBOL(ceph_oid_printf);
1486
1487static __printf(3, 0)
1488int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
1489 const char *fmt, va_list ap)
1490{
1491 va_list aq;
1492 int len;
1493
1494 va_copy(aq, ap);
1495 len = oid_printf_vargs(oid, fmt, aq);
1496 va_end(aq);
1497
1498 if (len) {
1499 char *external_name;
1500
1501 external_name = kmalloc(len + 1, gfp);
1502 if (!external_name)
1503 return -ENOMEM;
1504
1505 oid->name = external_name;
1506 WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
1507 oid->name_len = len;
1508 }
1509
1510 return 0;
1511}
1512
1513/*
1514 * If oid doesn't fit into inline buffer, allocate.
1515 */
1516int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
1517 const char *fmt, ...)
1518{
1519 va_list ap;
1520 int ret;
1521
1522 va_start(ap, fmt);
1523 ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
1524 va_end(ap);
1525
1526 return ret;
1527}
1528EXPORT_SYMBOL(ceph_oid_aprintf);
1529
1530void ceph_oid_destroy(struct ceph_object_id *oid)
1531{
1532 if (oid->name != oid->inline_name)
1533 kfree(oid->name);
1534}
1535EXPORT_SYMBOL(ceph_oid_destroy);
1536
1537/*
1538 * osds only
1539 */
1540static bool __osds_equal(const struct ceph_osds *lhs,
1541 const struct ceph_osds *rhs)
1542{
1543 if (lhs->size == rhs->size &&
1544 !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
1545 return true;
1546
1547 return false;
1548}
1549
1550/*
1551 * osds + primary
1552 */
1553static bool osds_equal(const struct ceph_osds *lhs,
1554 const struct ceph_osds *rhs)
1555{
1556 if (__osds_equal(lhs, rhs) &&
1557 lhs->primary == rhs->primary)
1558 return true;
1559
1560 return false;
1561}
1562
1563static bool osds_valid(const struct ceph_osds *set)
1564{
1565 /* non-empty set */
1566 if (set->size > 0 && set->primary >= 0)
1567 return true;
1568
1569 /* empty can_shift_osds set */
1570 if (!set->size && set->primary == -1)
1571 return true;
1572
1573 /* empty !can_shift_osds set - all NONE */
1574 if (set->size > 0 && set->primary == -1) {
1575 int i;
1576
1577 for (i = 0; i < set->size; i++) {
1578 if (set->osds[i] != CRUSH_ITEM_NONE)
1579 break;
1580 }
1581 if (i == set->size)
1582 return true;
1583 }
1584
1585 return false;
1586}
1587
1588void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
1589{
1590 memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
1591 dest->size = src->size;
1592 dest->primary = src->primary;
1593}
1594
1595static bool is_split(const struct ceph_pg *pgid,
1596 u32 old_pg_num,
1597 u32 new_pg_num)
1598{
1599 int old_bits = calc_bits_of(old_pg_num);
1600 int old_mask = (1 << old_bits) - 1;
1601 int n;
1602
1603 WARN_ON(pgid->seed >= old_pg_num);
1604 if (new_pg_num <= old_pg_num)
1605 return false;
1606
1607 for (n = 1; ; n++) {
1608 int next_bit = n << (old_bits - 1);
1609 u32 s = next_bit | pgid->seed;
1610
1611 if (s < old_pg_num || s == pgid->seed)
1612 continue;
1613 if (s >= new_pg_num)
1614 break;
1615
1616 s = ceph_stable_mod(s, old_pg_num, old_mask);
1617 if (s == pgid->seed)
1618 return true;
1619 }
1620
1621 return false;
1622}
1623
1624bool ceph_is_new_interval(const struct ceph_osds *old_acting,
1625 const struct ceph_osds *new_acting,
1626 const struct ceph_osds *old_up,
1627 const struct ceph_osds *new_up,
1628 int old_size,
1629 int new_size,
1630 int old_min_size,
1631 int new_min_size,
1632 u32 old_pg_num,
1633 u32 new_pg_num,
1634 bool old_sort_bitwise,
1635 bool new_sort_bitwise,
1636 const struct ceph_pg *pgid)
1637{
1638 return !osds_equal(old_acting, new_acting) ||
1639 !osds_equal(old_up, new_up) ||
1640 old_size != new_size ||
1641 old_min_size != new_min_size ||
1642 is_split(pgid, old_pg_num, new_pg_num) ||
1643 old_sort_bitwise != new_sort_bitwise;
1644}
1645
1646static int calc_pg_rank(int osd, const struct ceph_osds *acting)
1647{
1648 int i;
1649
1650 for (i = 0; i < acting->size; i++) {
1651 if (acting->osds[i] == osd)
1652 return i;
1653 }
1654
1655 return -1;
1656}
1657
1658static bool primary_changed(const struct ceph_osds *old_acting,
1659 const struct ceph_osds *new_acting)
1660{
1661 if (!old_acting->size && !new_acting->size)
1662 return false; /* both still empty */
1384 1663
1664 if (!old_acting->size ^ !new_acting->size)
1665 return true; /* was empty, now not, or vice versa */
1385 1666
1667 if (old_acting->primary != new_acting->primary)
1668 return true; /* primary changed */
1669
1670 if (calc_pg_rank(old_acting->primary, old_acting) !=
1671 calc_pg_rank(new_acting->primary, new_acting))
1672 return true;
1673
1674 return false; /* same primary (tho replicas may have changed) */
1675}
1676
1677bool ceph_osds_changed(const struct ceph_osds *old_acting,
1678 const struct ceph_osds *new_acting,
1679 bool any_change)
1680{
1681 if (primary_changed(old_acting, new_acting))
1682 return true;
1683
1684 if (any_change && !__osds_equal(old_acting, new_acting))
1685 return true;
1686
1687 return false;
1688}
1386 1689
1387/* 1690/*
1388 * calculate file layout from given offset, length. 1691 * calculate file layout from given offset, length.
@@ -1455,30 +1758,71 @@ invalid:
1455EXPORT_SYMBOL(ceph_calc_file_object_mapping); 1758EXPORT_SYMBOL(ceph_calc_file_object_mapping);
1456 1759
1457/* 1760/*
1458 * Calculate mapping of a (oloc, oid) pair to a PG. Should only be 1761 * Map an object into a PG.
1459 * called with target's (oloc, oid), since tiering isn't taken into 1762 *
1460 * account. 1763 * Should only be called with target_oid and target_oloc (as opposed to
1764 * base_oid and base_oloc), since tiering isn't taken into account.
1461 */ 1765 */
1462int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, 1766int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
1463 struct ceph_object_locator *oloc, 1767 struct ceph_object_id *oid,
1464 struct ceph_object_id *oid, 1768 struct ceph_object_locator *oloc,
1465 struct ceph_pg *pg_out) 1769 struct ceph_pg *raw_pgid)
1466{ 1770{
1467 struct ceph_pg_pool_info *pi; 1771 struct ceph_pg_pool_info *pi;
1468 1772
1469 pi = __lookup_pg_pool(&osdmap->pg_pools, oloc->pool); 1773 pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
1470 if (!pi) 1774 if (!pi)
1471 return -EIO; 1775 return -ENOENT;
1472 1776
1473 pg_out->pool = oloc->pool; 1777 raw_pgid->pool = oloc->pool;
1474 pg_out->seed = ceph_str_hash(pi->object_hash, oid->name, 1778 raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
1475 oid->name_len); 1779 oid->name_len);
1476 1780
1477 dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name, 1781 dout("%s %*pE -> raw_pgid %llu.%x\n", __func__, oid->name_len,
1478 pg_out->pool, pg_out->seed); 1782 oid->name, raw_pgid->pool, raw_pgid->seed);
1479 return 0; 1783 return 0;
1480} 1784}
1481EXPORT_SYMBOL(ceph_oloc_oid_to_pg); 1785EXPORT_SYMBOL(ceph_object_locator_to_pg);
1786
1787/*
1788 * Map a raw PG (full precision ps) into an actual PG.
1789 */
1790static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
1791 const struct ceph_pg *raw_pgid,
1792 struct ceph_pg *pgid)
1793{
1794 pgid->pool = raw_pgid->pool;
1795 pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
1796 pi->pg_num_mask);
1797}
1798
1799/*
1800 * Map a raw PG (full precision ps) into a placement ps (placement
1801 * seed). Include pool id in that value so that different pools don't
1802 * use the same seeds.
1803 */
1804static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
1805 const struct ceph_pg *raw_pgid)
1806{
1807 if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
1808 /* hash pool id and seed so that pool PGs do not overlap */
1809 return crush_hash32_2(CRUSH_HASH_RJENKINS1,
1810 ceph_stable_mod(raw_pgid->seed,
1811 pi->pgp_num,
1812 pi->pgp_num_mask),
1813 raw_pgid->pool);
1814 } else {
1815 /*
1816 * legacy behavior: add ps and pool together. this is
1817 * not a great approach because the PGs from each pool
1818 * will overlap on top of each other: 0.5 == 1.4 ==
1819 * 2.3 == ...
1820 */
1821 return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
1822 pi->pgp_num_mask) +
1823 (unsigned)raw_pgid->pool;
1824 }
1825}
1482 1826
1483static int do_crush(struct ceph_osdmap *map, int ruleno, int x, 1827static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
1484 int *result, int result_max, 1828 int *result, int result_max,
@@ -1497,84 +1841,92 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
1497} 1841}
1498 1842
1499/* 1843/*
1500 * Calculate raw (crush) set for given pgid. 1844 * Calculate raw set (CRUSH output) for given PG. The result may
1845 * contain nonexistent OSDs. ->primary is undefined for a raw set.
1501 * 1846 *
1502 * Return raw set length, or error. 1847 * Placement seed (CRUSH input) is returned through @ppps.
1503 */ 1848 */
1504static int pg_to_raw_osds(struct ceph_osdmap *osdmap, 1849static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
1505 struct ceph_pg_pool_info *pool, 1850 struct ceph_pg_pool_info *pi,
1506 struct ceph_pg pgid, u32 pps, int *osds) 1851 const struct ceph_pg *raw_pgid,
1852 struct ceph_osds *raw,
1853 u32 *ppps)
1507{ 1854{
1855 u32 pps = raw_pg_to_pps(pi, raw_pgid);
1508 int ruleno; 1856 int ruleno;
1509 int len; 1857 int len;
1510 1858
1511 /* crush */ 1859 ceph_osds_init(raw);
1512 ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, 1860 if (ppps)
1513 pool->type, pool->size); 1861 *ppps = pps;
1862
1863 ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
1864 pi->size);
1514 if (ruleno < 0) { 1865 if (ruleno < 0) {
1515 pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n", 1866 pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
1516 pgid.pool, pool->crush_ruleset, pool->type, 1867 pi->id, pi->crush_ruleset, pi->type, pi->size);
1517 pool->size); 1868 return;
1518 return -ENOENT;
1519 } 1869 }
1520 1870
1521 len = do_crush(osdmap, ruleno, pps, osds, 1871 len = do_crush(osdmap, ruleno, pps, raw->osds,
1522 min_t(int, pool->size, CEPH_PG_MAX_SIZE), 1872 min_t(int, pi->size, ARRAY_SIZE(raw->osds)),
1523 osdmap->osd_weight, osdmap->max_osd); 1873 osdmap->osd_weight, osdmap->max_osd);
1524 if (len < 0) { 1874 if (len < 0) {
1525 pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n", 1875 pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
1526 len, ruleno, pgid.pool, pool->crush_ruleset, 1876 len, ruleno, pi->id, pi->crush_ruleset, pi->type,
1527 pool->type, pool->size); 1877 pi->size);
1528 return len; 1878 return;
1529 } 1879 }
1530 1880
1531 return len; 1881 raw->size = len;
1532} 1882}
1533 1883
1534/* 1884/*
1535 * Given raw set, calculate up set and up primary. 1885 * Given raw set, calculate up set and up primary. By definition of an
1886 * up set, the result won't contain nonexistent or down OSDs.
1536 * 1887 *
1537 * Return up set length. *primary is set to up primary osd id, or -1 1888 * This is done in-place - on return @set is the up set. If it's
1538 * if up set is empty. 1889 * empty, ->primary will remain undefined.
1539 */ 1890 */
1540static int raw_to_up_osds(struct ceph_osdmap *osdmap, 1891static void raw_to_up_osds(struct ceph_osdmap *osdmap,
1541 struct ceph_pg_pool_info *pool, 1892 struct ceph_pg_pool_info *pi,
1542 int *osds, int len, int *primary) 1893 struct ceph_osds *set)
1543{ 1894{
1544 int up_primary = -1;
1545 int i; 1895 int i;
1546 1896
1547 if (ceph_can_shift_osds(pool)) { 1897 /* ->primary is undefined for a raw set */
1898 BUG_ON(set->primary != -1);
1899
1900 if (ceph_can_shift_osds(pi)) {
1548 int removed = 0; 1901 int removed = 0;
1549 1902
1550 for (i = 0; i < len; i++) { 1903 /* shift left */
1551 if (ceph_osd_is_down(osdmap, osds[i])) { 1904 for (i = 0; i < set->size; i++) {
1905 if (ceph_osd_is_down(osdmap, set->osds[i])) {
1552 removed++; 1906 removed++;
1553 continue; 1907 continue;
1554 } 1908 }
1555 if (removed) 1909 if (removed)
1556 osds[i - removed] = osds[i]; 1910 set->osds[i - removed] = set->osds[i];
1557 } 1911 }
1558 1912 set->size -= removed;
1559 len -= removed; 1913 if (set->size > 0)
1560 if (len > 0) 1914 set->primary = set->osds[0];
1561 up_primary = osds[0];
1562 } else { 1915 } else {
1563 for (i = len - 1; i >= 0; i--) { 1916 /* set down/dne devices to NONE */
1564 if (ceph_osd_is_down(osdmap, osds[i])) 1917 for (i = set->size - 1; i >= 0; i--) {
1565 osds[i] = CRUSH_ITEM_NONE; 1918 if (ceph_osd_is_down(osdmap, set->osds[i]))
1919 set->osds[i] = CRUSH_ITEM_NONE;
1566 else 1920 else
1567 up_primary = osds[i]; 1921 set->primary = set->osds[i];
1568 } 1922 }
1569 } 1923 }
1570
1571 *primary = up_primary;
1572 return len;
1573} 1924}
1574 1925
1575static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps, 1926static void apply_primary_affinity(struct ceph_osdmap *osdmap,
1576 struct ceph_pg_pool_info *pool, 1927 struct ceph_pg_pool_info *pi,
1577 int *osds, int len, int *primary) 1928 u32 pps,
1929 struct ceph_osds *up)
1578{ 1930{
1579 int i; 1931 int i;
1580 int pos = -1; 1932 int pos = -1;
@@ -1586,8 +1938,8 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
1586 if (!osdmap->osd_primary_affinity) 1938 if (!osdmap->osd_primary_affinity)
1587 return; 1939 return;
1588 1940
1589 for (i = 0; i < len; i++) { 1941 for (i = 0; i < up->size; i++) {
1590 int osd = osds[i]; 1942 int osd = up->osds[i];
1591 1943
1592 if (osd != CRUSH_ITEM_NONE && 1944 if (osd != CRUSH_ITEM_NONE &&
1593 osdmap->osd_primary_affinity[osd] != 1945 osdmap->osd_primary_affinity[osd] !=
@@ -1595,7 +1947,7 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
1595 break; 1947 break;
1596 } 1948 }
1597 } 1949 }
1598 if (i == len) 1950 if (i == up->size)
1599 return; 1951 return;
1600 1952
1601 /* 1953 /*
@@ -1603,8 +1955,8 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
1603 * osd into the hash/rng so that a proportional fraction of an 1955 * osd into the hash/rng so that a proportional fraction of an
1604 * osd's pgs get rejected as primary. 1956 * osd's pgs get rejected as primary.
1605 */ 1957 */
1606 for (i = 0; i < len; i++) { 1958 for (i = 0; i < up->size; i++) {
1607 int osd = osds[i]; 1959 int osd = up->osds[i];
1608 u32 aff; 1960 u32 aff;
1609 1961
1610 if (osd == CRUSH_ITEM_NONE) 1962 if (osd == CRUSH_ITEM_NONE)
@@ -1629,135 +1981,110 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
1629 if (pos < 0) 1981 if (pos < 0)
1630 return; 1982 return;
1631 1983
1632 *primary = osds[pos]; 1984 up->primary = up->osds[pos];
1633 1985
1634 if (ceph_can_shift_osds(pool) && pos > 0) { 1986 if (ceph_can_shift_osds(pi) && pos > 0) {
1635 /* move the new primary to the front */ 1987 /* move the new primary to the front */
1636 for (i = pos; i > 0; i--) 1988 for (i = pos; i > 0; i--)
1637 osds[i] = osds[i - 1]; 1989 up->osds[i] = up->osds[i - 1];
1638 osds[0] = *primary; 1990 up->osds[0] = up->primary;
1639 } 1991 }
1640} 1992}
1641 1993
1642/* 1994/*
1643 * Given up set, apply pg_temp and primary_temp mappings. 1995 * Get pg_temp and primary_temp mappings for given PG.
1644 * 1996 *
1645 * Return acting set length. *primary is set to acting primary osd id, 1997 * Note that a PG may have none, only pg_temp, only primary_temp or
1646 * or -1 if acting set is empty. 1998 * both pg_temp and primary_temp mappings. This means @temp isn't
1999 * always a valid OSD set on return: in the "only primary_temp" case,
2000 * @temp will have its ->primary >= 0 but ->size == 0.
1647 */ 2001 */
1648static int apply_temps(struct ceph_osdmap *osdmap, 2002static void get_temp_osds(struct ceph_osdmap *osdmap,
1649 struct ceph_pg_pool_info *pool, struct ceph_pg pgid, 2003 struct ceph_pg_pool_info *pi,
1650 int *osds, int len, int *primary) 2004 const struct ceph_pg *raw_pgid,
2005 struct ceph_osds *temp)
1651{ 2006{
2007 struct ceph_pg pgid;
1652 struct ceph_pg_mapping *pg; 2008 struct ceph_pg_mapping *pg;
1653 int temp_len;
1654 int temp_primary;
1655 int i; 2009 int i;
1656 2010
1657 /* raw_pg -> pg */ 2011 raw_pg_to_pg(pi, raw_pgid, &pgid);
1658 pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, 2012 ceph_osds_init(temp);
1659 pool->pg_num_mask);
1660 2013
1661 /* pg_temp? */ 2014 /* pg_temp? */
1662 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); 2015 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
1663 if (pg) { 2016 if (pg) {
1664 temp_len = 0;
1665 temp_primary = -1;
1666
1667 for (i = 0; i < pg->pg_temp.len; i++) { 2017 for (i = 0; i < pg->pg_temp.len; i++) {
1668 if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) { 2018 if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
1669 if (ceph_can_shift_osds(pool)) 2019 if (ceph_can_shift_osds(pi))
1670 continue; 2020 continue;
1671 else 2021
1672 osds[temp_len++] = CRUSH_ITEM_NONE; 2022 temp->osds[temp->size++] = CRUSH_ITEM_NONE;
1673 } else { 2023 } else {
1674 osds[temp_len++] = pg->pg_temp.osds[i]; 2024 temp->osds[temp->size++] = pg->pg_temp.osds[i];
1675 } 2025 }
1676 } 2026 }
1677 2027
1678 /* apply pg_temp's primary */ 2028 /* apply pg_temp's primary */
1679 for (i = 0; i < temp_len; i++) { 2029 for (i = 0; i < temp->size; i++) {
1680 if (osds[i] != CRUSH_ITEM_NONE) { 2030 if (temp->osds[i] != CRUSH_ITEM_NONE) {
1681 temp_primary = osds[i]; 2031 temp->primary = temp->osds[i];
1682 break; 2032 break;
1683 } 2033 }
1684 } 2034 }
1685 } else {
1686 temp_len = len;
1687 temp_primary = *primary;
1688 } 2035 }
1689 2036
1690 /* primary_temp? */ 2037 /* primary_temp? */
1691 pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid); 2038 pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
1692 if (pg) 2039 if (pg)
1693 temp_primary = pg->primary_temp.osd; 2040 temp->primary = pg->primary_temp.osd;
1694
1695 *primary = temp_primary;
1696 return temp_len;
1697} 2041}
1698 2042
1699/* 2043/*
1700 * Calculate acting set for given pgid. 2044 * Map a PG to its acting set as well as its up set.
1701 * 2045 *
1702 * Return acting set length, or error. *primary is set to acting 2046 * Acting set is used for data mapping purposes, while up set can be
1703 * primary osd id, or -1 if acting set is empty or on error. 2047 * recorded for detecting interval changes and deciding whether to
2048 * resend a request.
1704 */ 2049 */
1705int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, 2050void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
1706 int *osds, int *primary) 2051 const struct ceph_pg *raw_pgid,
2052 struct ceph_osds *up,
2053 struct ceph_osds *acting)
1707{ 2054{
1708 struct ceph_pg_pool_info *pool; 2055 struct ceph_pg_pool_info *pi;
1709 u32 pps; 2056 u32 pps;
1710 int len;
1711 2057
1712 pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); 2058 pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
1713 if (!pool) { 2059 if (!pi) {
1714 *primary = -1; 2060 ceph_osds_init(up);
1715 return -ENOENT; 2061 ceph_osds_init(acting);
2062 goto out;
1716 } 2063 }
1717 2064
1718 if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { 2065 pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
1719 /* hash pool id and seed so that pool PGs do not overlap */ 2066 raw_to_up_osds(osdmap, pi, up);
1720 pps = crush_hash32_2(CRUSH_HASH_RJENKINS1, 2067 apply_primary_affinity(osdmap, pi, pps, up);
1721 ceph_stable_mod(pgid.seed, pool->pgp_num, 2068 get_temp_osds(osdmap, pi, raw_pgid, acting);
1722 pool->pgp_num_mask), 2069 if (!acting->size) {
1723 pgid.pool); 2070 memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
1724 } else { 2071 acting->size = up->size;
1725 /* 2072 if (acting->primary == -1)
1726 * legacy behavior: add ps and pool together. this is 2073 acting->primary = up->primary;
1727 * not a great approach because the PGs from each pool
1728 * will overlap on top of each other: 0.5 == 1.4 ==
1729 * 2.3 == ...
1730 */
1731 pps = ceph_stable_mod(pgid.seed, pool->pgp_num,
1732 pool->pgp_num_mask) +
1733 (unsigned)pgid.pool;
1734 }
1735
1736 len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds);
1737 if (len < 0) {
1738 *primary = -1;
1739 return len;
1740 } 2074 }
1741 2075out:
1742 len = raw_to_up_osds(osdmap, pool, osds, len, primary); 2076 WARN_ON(!osds_valid(up) || !osds_valid(acting));
1743
1744 apply_primary_affinity(osdmap, pps, pool, osds, len, primary);
1745
1746 len = apply_temps(osdmap, pool, pgid, osds, len, primary);
1747
1748 return len;
1749} 2077}
1750 2078
1751/* 2079/*
1752 * Return primary osd for given pgid, or -1 if none. 2080 * Return acting primary for given PG, or -1 if none.
1753 */ 2081 */
1754int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) 2082int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2083 const struct ceph_pg *raw_pgid)
1755{ 2084{
1756 int osds[CEPH_PG_MAX_SIZE]; 2085 struct ceph_osds up, acting;
1757 int primary;
1758
1759 ceph_calc_pg_acting(osdmap, pgid, osds, &primary);
1760 2086
1761 return primary; 2087 ceph_pg_to_up_acting_osds(osdmap, raw_pgid, &up, &acting);
2088 return acting.primary;
1762} 2089}
1763EXPORT_SYMBOL(ceph_calc_pg_primary); 2090EXPORT_SYMBOL(ceph_pg_to_acting_primary);