aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-04-07 14:09:13 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2014-04-07 14:09:13 -0400
commit240cd6a817bd855e3f1e615ed9ae16407f8cfce6 (patch)
treeda7d6267d549cd0fbdff3f30032720b416d1ff3d /net/ceph
parent3021112598d2b722eee54d8a662fea2089abbdbc (diff)
parenta30be7cb2ccb995ad5e67fd4b548f11fe37fc8b1 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil: "The biggest chunk is a series of patches from Ilya that add support for new Ceph osd and crush map features, including some new tunables, primary affinity, and the new encoding that is needed for erasure coding support. This brings things into parity with the server side and the looming firefly release. There is also support for allocation hints in RBD that help limit fragmentation on the server side. There is also a series of patches from Zheng fixing NFS reexport, directory fragmentation support, flock vs fcntl behavior, and some issues with clustered MDS. Finally, there are some miscellaneous fixes from Yunchuan Wen for fscache, Fabian Frederick for ACLs, and from me for fsync(dirfd) behavior" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (79 commits) ceph: skip invalid dentry during dcache readdir libceph: dump pool {read,write}_tier to debugfs libceph: output primary affinity values on osdmap updates ceph: flush cap release queue when trimming session caps ceph: don't grabs open file reference for aborted request ceph: drop extra open file reference in ceph_atomic_open() ceph: preallocate buffer for readdir reply libceph: enable PRIMARY_AFFINITY feature bit libceph: redo ceph_calc_pg_primary() in terms of ceph_calc_pg_acting() libceph: add support for osd primary affinity libceph: add support for primary_temp mappings libceph: return primary from ceph_calc_pg_acting() libceph: switch ceph_calc_pg_acting() to new helpers libceph: introduce apply_temps() helper libceph: introduce pg_to_raw_osds() and raw_to_up_osds() helpers libceph: ceph_can_shift_osds(pool) and pool type defines libceph: ceph_osd_{exists,is_up,is_down}(osd) definitions libceph: enable OSDMAP_ENC feature bit libceph: primary_affinity decode bits libceph: primary_affinity infrastructure ...
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/crush/mapper.c85
-rw-r--r--net/ceph/debugfs.c55
-rw-r--r--net/ceph/messenger.c6
-rw-r--r--net/ceph/osd_client.c41
-rw-r--r--net/ceph/osdmap.c993
5 files changed, 864 insertions, 316 deletions
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index b703790b4e44..a1ef53c04415 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -292,10 +292,12 @@ static int is_out(const struct crush_map *map,
292 * @outpos: our position in that vector 292 * @outpos: our position in that vector
293 * @tries: number of attempts to make 293 * @tries: number of attempts to make
294 * @recurse_tries: number of attempts to have recursive chooseleaf make 294 * @recurse_tries: number of attempts to have recursive chooseleaf make
295 * @local_tries: localized retries 295 * @local_retries: localized retries
296 * @local_fallback_tries: localized fallback retries 296 * @local_fallback_retries: localized fallback retries
297 * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose) 297 * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose)
298 * @vary_r: pass r to recursive calls
298 * @out2: second output vector for leaf items (if @recurse_to_leaf) 299 * @out2: second output vector for leaf items (if @recurse_to_leaf)
300 * @parent_r: r value passed from the parent
299 */ 301 */
300static int crush_choose_firstn(const struct crush_map *map, 302static int crush_choose_firstn(const struct crush_map *map,
301 struct crush_bucket *bucket, 303 struct crush_bucket *bucket,
@@ -304,10 +306,12 @@ static int crush_choose_firstn(const struct crush_map *map,
304 int *out, int outpos, 306 int *out, int outpos,
305 unsigned int tries, 307 unsigned int tries,
306 unsigned int recurse_tries, 308 unsigned int recurse_tries,
307 unsigned int local_tries, 309 unsigned int local_retries,
308 unsigned int local_fallback_tries, 310 unsigned int local_fallback_retries,
309 int recurse_to_leaf, 311 int recurse_to_leaf,
310 int *out2) 312 unsigned int vary_r,
313 int *out2,
314 int parent_r)
311{ 315{
312 int rep; 316 int rep;
313 unsigned int ftotal, flocal; 317 unsigned int ftotal, flocal;
@@ -319,8 +323,11 @@ static int crush_choose_firstn(const struct crush_map *map,
319 int itemtype; 323 int itemtype;
320 int collide, reject; 324 int collide, reject;
321 325
322 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", 326 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d recurse_tries %d local_retries %d local_fallback_retries %d parent_r %d\n",
323 bucket->id, x, outpos, numrep); 327 recurse_to_leaf ? "_LEAF" : "",
328 bucket->id, x, outpos, numrep,
329 tries, recurse_tries, local_retries, local_fallback_retries,
330 parent_r);
324 331
325 for (rep = outpos; rep < numrep; rep++) { 332 for (rep = outpos; rep < numrep; rep++) {
326 /* keep trying until we get a non-out, non-colliding item */ 333 /* keep trying until we get a non-out, non-colliding item */
@@ -335,7 +342,7 @@ static int crush_choose_firstn(const struct crush_map *map,
335 do { 342 do {
336 collide = 0; 343 collide = 0;
337 retry_bucket = 0; 344 retry_bucket = 0;
338 r = rep; 345 r = rep + parent_r;
339 /* r' = r + f_total */ 346 /* r' = r + f_total */
340 r += ftotal; 347 r += ftotal;
341 348
@@ -344,9 +351,9 @@ static int crush_choose_firstn(const struct crush_map *map,
344 reject = 1; 351 reject = 1;
345 goto reject; 352 goto reject;
346 } 353 }
347 if (local_fallback_tries > 0 && 354 if (local_fallback_retries > 0 &&
348 flocal >= (in->size>>1) && 355 flocal >= (in->size>>1) &&
349 flocal > local_fallback_tries) 356 flocal > local_fallback_retries)
350 item = bucket_perm_choose(in, x, r); 357 item = bucket_perm_choose(in, x, r);
351 else 358 else
352 item = crush_bucket_choose(in, x, r); 359 item = crush_bucket_choose(in, x, r);
@@ -387,16 +394,23 @@ static int crush_choose_firstn(const struct crush_map *map,
387 reject = 0; 394 reject = 0;
388 if (!collide && recurse_to_leaf) { 395 if (!collide && recurse_to_leaf) {
389 if (item < 0) { 396 if (item < 0) {
397 int sub_r;
398 if (vary_r)
399 sub_r = r >> (vary_r-1);
400 else
401 sub_r = 0;
390 if (crush_choose_firstn(map, 402 if (crush_choose_firstn(map,
391 map->buckets[-1-item], 403 map->buckets[-1-item],
392 weight, weight_max, 404 weight, weight_max,
393 x, outpos+1, 0, 405 x, outpos+1, 0,
394 out2, outpos, 406 out2, outpos,
395 recurse_tries, 0, 407 recurse_tries, 0,
396 local_tries, 408 local_retries,
397 local_fallback_tries, 409 local_fallback_retries,
398 0, 410 0,
399 NULL) <= outpos) 411 vary_r,
412 NULL,
413 sub_r) <= outpos)
400 /* didn't get leaf */ 414 /* didn't get leaf */
401 reject = 1; 415 reject = 1;
402 } else { 416 } else {
@@ -420,14 +434,14 @@ reject:
420 ftotal++; 434 ftotal++;
421 flocal++; 435 flocal++;
422 436
423 if (collide && flocal <= local_tries) 437 if (collide && flocal <= local_retries)
424 /* retry locally a few times */ 438 /* retry locally a few times */
425 retry_bucket = 1; 439 retry_bucket = 1;
426 else if (local_fallback_tries > 0 && 440 else if (local_fallback_retries > 0 &&
427 flocal <= in->size + local_fallback_tries) 441 flocal <= in->size + local_fallback_retries)
428 /* exhaustive bucket search */ 442 /* exhaustive bucket search */
429 retry_bucket = 1; 443 retry_bucket = 1;
430 else if (ftotal <= tries) 444 else if (ftotal < tries)
431 /* then retry descent */ 445 /* then retry descent */
432 retry_descent = 1; 446 retry_descent = 1;
433 else 447 else
@@ -640,10 +654,20 @@ int crush_do_rule(const struct crush_map *map,
640 __u32 step; 654 __u32 step;
641 int i, j; 655 int i, j;
642 int numrep; 656 int numrep;
643 int choose_tries = map->choose_total_tries; 657 /*
644 int choose_local_tries = map->choose_local_tries; 658 * the original choose_total_tries value was off by one (it
645 int choose_local_fallback_tries = map->choose_local_fallback_tries; 659 * counted "retries" and not "tries"). add one.
660 */
661 int choose_tries = map->choose_total_tries + 1;
646 int choose_leaf_tries = 0; 662 int choose_leaf_tries = 0;
663 /*
664 * the local tries values were counted as "retries", though,
665 * and need no adjustment
666 */
667 int choose_local_retries = map->choose_local_tries;
668 int choose_local_fallback_retries = map->choose_local_fallback_tries;
669
670 int vary_r = map->chooseleaf_vary_r;
647 671
648 if ((__u32)ruleno >= map->max_rules) { 672 if ((__u32)ruleno >= map->max_rules) {
649 dprintk(" bad ruleno %d\n", ruleno); 673 dprintk(" bad ruleno %d\n", ruleno);
@@ -676,13 +700,18 @@ int crush_do_rule(const struct crush_map *map,
676 break; 700 break;
677 701
678 case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES: 702 case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES:
679 if (curstep->arg1 > 0) 703 if (curstep->arg1 >= 0)
680 choose_local_tries = curstep->arg1; 704 choose_local_retries = curstep->arg1;
681 break; 705 break;
682 706
683 case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES: 707 case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES:
684 if (curstep->arg1 > 0) 708 if (curstep->arg1 >= 0)
685 choose_local_fallback_tries = curstep->arg1; 709 choose_local_fallback_retries = curstep->arg1;
710 break;
711
712 case CRUSH_RULE_SET_CHOOSELEAF_VARY_R:
713 if (curstep->arg1 >= 0)
714 vary_r = curstep->arg1;
686 break; 715 break;
687 716
688 case CRUSH_RULE_CHOOSELEAF_FIRSTN: 717 case CRUSH_RULE_CHOOSELEAF_FIRSTN:
@@ -734,10 +763,12 @@ int crush_do_rule(const struct crush_map *map,
734 o+osize, j, 763 o+osize, j,
735 choose_tries, 764 choose_tries,
736 recurse_tries, 765 recurse_tries,
737 choose_local_tries, 766 choose_local_retries,
738 choose_local_fallback_tries, 767 choose_local_fallback_retries,
739 recurse_to_leaf, 768 recurse_to_leaf,
740 c+osize); 769 vary_r,
770 c+osize,
771 0);
741 } else { 772 } else {
742 crush_choose_indep( 773 crush_choose_indep(
743 map, 774 map,
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 258a382e75ed..10421a4b76f8 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -53,34 +53,55 @@ static int osdmap_show(struct seq_file *s, void *p)
53{ 53{
54 int i; 54 int i;
55 struct ceph_client *client = s->private; 55 struct ceph_client *client = s->private;
56 struct ceph_osdmap *map = client->osdc.osdmap;
56 struct rb_node *n; 57 struct rb_node *n;
57 58
58 if (client->osdc.osdmap == NULL) 59 if (map == NULL)
59 return 0; 60 return 0;
60 seq_printf(s, "epoch %d\n", client->osdc.osdmap->epoch); 61
62 seq_printf(s, "epoch %d\n", map->epoch);
61 seq_printf(s, "flags%s%s\n", 63 seq_printf(s, "flags%s%s\n",
62 (client->osdc.osdmap->flags & CEPH_OSDMAP_NEARFULL) ? 64 (map->flags & CEPH_OSDMAP_NEARFULL) ? " NEARFULL" : "",
63 " NEARFULL" : "", 65 (map->flags & CEPH_OSDMAP_FULL) ? " FULL" : "");
64 (client->osdc.osdmap->flags & CEPH_OSDMAP_FULL) ? 66
65 " FULL" : ""); 67 for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) {
66 for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) {
67 struct ceph_pg_pool_info *pool = 68 struct ceph_pg_pool_info *pool =
68 rb_entry(n, struct ceph_pg_pool_info, node); 69 rb_entry(n, struct ceph_pg_pool_info, node);
69 seq_printf(s, "pg_pool %llu pg_num %d / %d\n", 70
70 (unsigned long long)pool->id, pool->pg_num, 71 seq_printf(s, "pool %lld pg_num %u (%d) read_tier %lld write_tier %lld\n",
71 pool->pg_num_mask); 72 pool->id, pool->pg_num, pool->pg_num_mask,
73 pool->read_tier, pool->write_tier);
72 } 74 }
73 for (i = 0; i < client->osdc.osdmap->max_osd; i++) { 75 for (i = 0; i < map->max_osd; i++) {
74 struct ceph_entity_addr *addr = 76 struct ceph_entity_addr *addr = &map->osd_addr[i];
75 &client->osdc.osdmap->osd_addr[i]; 77 int state = map->osd_state[i];
76 int state = client->osdc.osdmap->osd_state[i];
77 char sb[64]; 78 char sb[64];
78 79
79 seq_printf(s, "\tosd%d\t%s\t%3d%%\t(%s)\n", 80 seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
80 i, ceph_pr_addr(&addr->in_addr), 81 i, ceph_pr_addr(&addr->in_addr),
81 ((client->osdc.osdmap->osd_weight[i]*100) >> 16), 82 ((map->osd_weight[i]*100) >> 16),
82 ceph_osdmap_state_str(sb, sizeof(sb), state)); 83 ceph_osdmap_state_str(sb, sizeof(sb), state),
84 ((ceph_get_primary_affinity(map, i)*100) >> 16));
85 }
86 for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) {
87 struct ceph_pg_mapping *pg =
88 rb_entry(n, struct ceph_pg_mapping, node);
89
90 seq_printf(s, "pg_temp %llu.%x [", pg->pgid.pool,
91 pg->pgid.seed);
92 for (i = 0; i < pg->pg_temp.len; i++)
93 seq_printf(s, "%s%d", (i == 0 ? "" : ","),
94 pg->pg_temp.osds[i]);
95 seq_printf(s, "]\n");
83 } 96 }
97 for (n = rb_first(&map->primary_temp); n; n = rb_next(n)) {
98 struct ceph_pg_mapping *pg =
99 rb_entry(n, struct ceph_pg_mapping, node);
100
101 seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool,
102 pg->pgid.seed, pg->primary_temp.osd);
103 }
104
84 return 0; 105 return 0;
85} 106}
86 107
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 30efc5c18622..4f55f9ce63fa 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -919,6 +919,9 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
919 if (!bytes || cursor->page_offset) 919 if (!bytes || cursor->page_offset)
920 return false; /* more bytes to process in the current page */ 920 return false; /* more bytes to process in the current page */
921 921
922 if (!cursor->resid)
923 return false; /* no more data */
924
922 /* Move on to the next page; offset is already at 0 */ 925 /* Move on to the next page; offset is already at 0 */
923 926
924 BUG_ON(cursor->page_index >= cursor->page_count); 927 BUG_ON(cursor->page_index >= cursor->page_count);
@@ -1004,6 +1007,9 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
1004 if (!bytes || cursor->offset & ~PAGE_MASK) 1007 if (!bytes || cursor->offset & ~PAGE_MASK)
1005 return false; /* more bytes to process in the current page */ 1008 return false; /* more bytes to process in the current page */
1006 1009
1010 if (!cursor->resid)
1011 return false; /* no more data */
1012
1007 /* Move on to the next page */ 1013 /* Move on to the next page */
1008 1014
1009 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); 1015 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 82750f915865..b0dfce77656a 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -436,6 +436,7 @@ static bool osd_req_opcode_valid(u16 opcode)
436 case CEPH_OSD_OP_OMAPCLEAR: 436 case CEPH_OSD_OP_OMAPCLEAR:
437 case CEPH_OSD_OP_OMAPRMKEYS: 437 case CEPH_OSD_OP_OMAPRMKEYS:
438 case CEPH_OSD_OP_OMAP_CMP: 438 case CEPH_OSD_OP_OMAP_CMP:
439 case CEPH_OSD_OP_SETALLOCHINT:
439 case CEPH_OSD_OP_CLONERANGE: 440 case CEPH_OSD_OP_CLONERANGE:
440 case CEPH_OSD_OP_ASSERT_SRC_VERSION: 441 case CEPH_OSD_OP_ASSERT_SRC_VERSION:
441 case CEPH_OSD_OP_SRC_CMPXATTR: 442 case CEPH_OSD_OP_SRC_CMPXATTR:
@@ -591,6 +592,26 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
591} 592}
592EXPORT_SYMBOL(osd_req_op_watch_init); 593EXPORT_SYMBOL(osd_req_op_watch_init);
593 594
595void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
596 unsigned int which,
597 u64 expected_object_size,
598 u64 expected_write_size)
599{
600 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
601 CEPH_OSD_OP_SETALLOCHINT);
602
603 op->alloc_hint.expected_object_size = expected_object_size;
604 op->alloc_hint.expected_write_size = expected_write_size;
605
606 /*
607 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
608 * not worth a feature bit. Set FAILOK per-op flag to make
609 * sure older osds don't trip over an unsupported opcode.
610 */
611 op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
612}
613EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
614
594static void ceph_osdc_msg_data_add(struct ceph_msg *msg, 615static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
595 struct ceph_osd_data *osd_data) 616 struct ceph_osd_data *osd_data)
596{ 617{
@@ -681,6 +702,12 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
681 dst->watch.ver = cpu_to_le64(src->watch.ver); 702 dst->watch.ver = cpu_to_le64(src->watch.ver);
682 dst->watch.flag = src->watch.flag; 703 dst->watch.flag = src->watch.flag;
683 break; 704 break;
705 case CEPH_OSD_OP_SETALLOCHINT:
706 dst->alloc_hint.expected_object_size =
707 cpu_to_le64(src->alloc_hint.expected_object_size);
708 dst->alloc_hint.expected_write_size =
709 cpu_to_le64(src->alloc_hint.expected_write_size);
710 break;
684 default: 711 default:
685 pr_err("unsupported osd opcode %s\n", 712 pr_err("unsupported osd opcode %s\n",
686 ceph_osd_op_name(src->op)); 713 ceph_osd_op_name(src->op));
@@ -688,7 +715,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
688 715
689 return 0; 716 return 0;
690 } 717 }
718
691 dst->op = cpu_to_le16(src->op); 719 dst->op = cpu_to_le16(src->op);
720 dst->flags = cpu_to_le32(src->flags);
692 dst->payload_len = cpu_to_le32(src->payload_len); 721 dst->payload_len = cpu_to_le32(src->payload_len);
693 722
694 return request_data_len; 723 return request_data_len;
@@ -1304,7 +1333,7 @@ static int __map_request(struct ceph_osd_client *osdc,
1304{ 1333{
1305 struct ceph_pg pgid; 1334 struct ceph_pg pgid;
1306 int acting[CEPH_PG_MAX_SIZE]; 1335 int acting[CEPH_PG_MAX_SIZE];
1307 int o = -1, num = 0; 1336 int num, o;
1308 int err; 1337 int err;
1309 bool was_paused; 1338 bool was_paused;
1310 1339
@@ -1317,11 +1346,9 @@ static int __map_request(struct ceph_osd_client *osdc,
1317 } 1346 }
1318 req->r_pgid = pgid; 1347 req->r_pgid = pgid;
1319 1348
1320 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 1349 num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
1321 if (err > 0) { 1350 if (num < 0)
1322 o = acting[0]; 1351 num = 0;
1323 num = err;
1324 }
1325 1352
1326 was_paused = req->r_paused; 1353 was_paused = req->r_paused;
1327 req->r_paused = __req_should_be_paused(osdc, req); 1354 req->r_paused = __req_should_be_paused(osdc, req);
@@ -2033,7 +2060,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
2033 int skipped_map = 0; 2060 int skipped_map = 0;
2034 2061
2035 dout("taking full map %u len %d\n", epoch, maplen); 2062 dout("taking full map %u len %d\n", epoch, maplen);
2036 newmap = osdmap_decode(&p, p+maplen); 2063 newmap = ceph_osdmap_decode(&p, p+maplen);
2037 if (IS_ERR(newmap)) { 2064 if (IS_ERR(newmap)) {
2038 err = PTR_ERR(newmap); 2065 err = PTR_ERR(newmap);
2039 goto bad; 2066 goto bad;
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index aade4a5c1c07..e632b5a52f5b 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -343,7 +343,7 @@ bad:
343 343
344/* 344/*
345 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid 345 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
346 * to a set of osds) 346 * to a set of osds) and primary_temp (explicit primary setting)
347 */ 347 */
348static int pgid_cmp(struct ceph_pg l, struct ceph_pg r) 348static int pgid_cmp(struct ceph_pg l, struct ceph_pg r)
349{ 349{
@@ -506,7 +506,7 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
506 kfree(pi); 506 kfree(pi);
507} 507}
508 508
509static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) 509static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
510{ 510{
511 u8 ev, cv; 511 u8 ev, cv;
512 unsigned len, num; 512 unsigned len, num;
@@ -587,7 +587,7 @@ bad:
587 return -EINVAL; 587 return -EINVAL;
588} 588}
589 589
590static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) 590static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
591{ 591{
592 struct ceph_pg_pool_info *pi; 592 struct ceph_pg_pool_info *pi;
593 u32 num, len; 593 u32 num, len;
@@ -633,6 +633,13 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
633 rb_erase(&pg->node, &map->pg_temp); 633 rb_erase(&pg->node, &map->pg_temp);
634 kfree(pg); 634 kfree(pg);
635 } 635 }
636 while (!RB_EMPTY_ROOT(&map->primary_temp)) {
637 struct ceph_pg_mapping *pg =
638 rb_entry(rb_first(&map->primary_temp),
639 struct ceph_pg_mapping, node);
640 rb_erase(&pg->node, &map->primary_temp);
641 kfree(pg);
642 }
636 while (!RB_EMPTY_ROOT(&map->pg_pools)) { 643 while (!RB_EMPTY_ROOT(&map->pg_pools)) {
637 struct ceph_pg_pool_info *pi = 644 struct ceph_pg_pool_info *pi =
638 rb_entry(rb_first(&map->pg_pools), 645 rb_entry(rb_first(&map->pg_pools),
@@ -642,186 +649,516 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
642 kfree(map->osd_state); 649 kfree(map->osd_state);
643 kfree(map->osd_weight); 650 kfree(map->osd_weight);
644 kfree(map->osd_addr); 651 kfree(map->osd_addr);
652 kfree(map->osd_primary_affinity);
645 kfree(map); 653 kfree(map);
646} 654}
647 655
648/* 656/*
649 * adjust max osd value. reallocate arrays. 657 * Adjust max_osd value, (re)allocate arrays.
658 *
659 * The new elements are properly initialized.
650 */ 660 */
651static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) 661static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
652{ 662{
653 u8 *state; 663 u8 *state;
654 struct ceph_entity_addr *addr;
655 u32 *weight; 664 u32 *weight;
665 struct ceph_entity_addr *addr;
666 int i;
656 667
657 state = kcalloc(max, sizeof(*state), GFP_NOFS); 668 state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
658 addr = kcalloc(max, sizeof(*addr), GFP_NOFS); 669 weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
659 weight = kcalloc(max, sizeof(*weight), GFP_NOFS); 670 addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
660 if (state == NULL || addr == NULL || weight == NULL) { 671 if (!state || !weight || !addr) {
661 kfree(state); 672 kfree(state);
662 kfree(addr);
663 kfree(weight); 673 kfree(weight);
674 kfree(addr);
675
664 return -ENOMEM; 676 return -ENOMEM;
665 } 677 }
666 678
667 /* copy old? */ 679 for (i = map->max_osd; i < max; i++) {
668 if (map->osd_state) { 680 state[i] = 0;
669 memcpy(state, map->osd_state, map->max_osd*sizeof(*state)); 681 weight[i] = CEPH_OSD_OUT;
670 memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr)); 682 memset(addr + i, 0, sizeof(*addr));
671 memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight));
672 kfree(map->osd_state);
673 kfree(map->osd_addr);
674 kfree(map->osd_weight);
675 } 683 }
676 684
677 map->osd_state = state; 685 map->osd_state = state;
678 map->osd_weight = weight; 686 map->osd_weight = weight;
679 map->osd_addr = addr; 687 map->osd_addr = addr;
688
689 if (map->osd_primary_affinity) {
690 u32 *affinity;
691
692 affinity = krealloc(map->osd_primary_affinity,
693 max*sizeof(*affinity), GFP_NOFS);
694 if (!affinity)
695 return -ENOMEM;
696
697 for (i = map->max_osd; i < max; i++)
698 affinity[i] = CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
699
700 map->osd_primary_affinity = affinity;
701 }
702
680 map->max_osd = max; 703 map->max_osd = max;
704
681 return 0; 705 return 0;
682} 706}
683 707
708#define OSDMAP_WRAPPER_COMPAT_VER 7
709#define OSDMAP_CLIENT_DATA_COMPAT_VER 1
710
684/* 711/*
685 * decode a full map. 712 * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps,
713 * to struct_v of the client_data section for new (v7 and above)
714 * osdmaps.
686 */ 715 */
687struct ceph_osdmap *osdmap_decode(void **p, void *end) 716static int get_osdmap_client_data_v(void **p, void *end,
717 const char *prefix, u8 *v)
688{ 718{
689 struct ceph_osdmap *map; 719 u8 struct_v;
690 u16 version; 720
691 u32 len, max, i; 721 ceph_decode_8_safe(p, end, struct_v, e_inval);
692 int err = -EINVAL; 722 if (struct_v >= 7) {
693 void *start = *p; 723 u8 struct_compat;
694 struct ceph_pg_pool_info *pi; 724
725 ceph_decode_8_safe(p, end, struct_compat, e_inval);
726 if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
727 pr_warning("got v %d cv %d > %d of %s ceph_osdmap\n",
728 struct_v, struct_compat,
729 OSDMAP_WRAPPER_COMPAT_VER, prefix);
730 return -EINVAL;
731 }
732 *p += 4; /* ignore wrapper struct_len */
733
734 ceph_decode_8_safe(p, end, struct_v, e_inval);
735 ceph_decode_8_safe(p, end, struct_compat, e_inval);
736 if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
737 pr_warning("got v %d cv %d > %d of %s ceph_osdmap client data\n",
738 struct_v, struct_compat,
739 OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
740 return -EINVAL;
741 }
742 *p += 4; /* ignore client data struct_len */
743 } else {
744 u16 version;
745
746 *p -= 1;
747 ceph_decode_16_safe(p, end, version, e_inval);
748 if (version < 6) {
749 pr_warning("got v %d < 6 of %s ceph_osdmap\n", version,
750 prefix);
751 return -EINVAL;
752 }
695 753
696 dout("osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p)); 754 /* old osdmap encoding */
755 struct_v = 0;
756 }
697 757
698 map = kzalloc(sizeof(*map), GFP_NOFS); 758 *v = struct_v;
699 if (map == NULL) 759 return 0;
700 return ERR_PTR(-ENOMEM);
701 map->pg_temp = RB_ROOT;
702 760
703 ceph_decode_16_safe(p, end, version, bad); 761e_inval:
704 if (version > 6) { 762 return -EINVAL;
705 pr_warning("got unknown v %d > 6 of osdmap\n", version); 763}
706 goto bad; 764
765static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
766 bool incremental)
767{
768 u32 n;
769
770 ceph_decode_32_safe(p, end, n, e_inval);
771 while (n--) {
772 struct ceph_pg_pool_info *pi;
773 u64 pool;
774 int ret;
775
776 ceph_decode_64_safe(p, end, pool, e_inval);
777
778 pi = __lookup_pg_pool(&map->pg_pools, pool);
779 if (!incremental || !pi) {
780 pi = kzalloc(sizeof(*pi), GFP_NOFS);
781 if (!pi)
782 return -ENOMEM;
783
784 pi->id = pool;
785
786 ret = __insert_pg_pool(&map->pg_pools, pi);
787 if (ret) {
788 kfree(pi);
789 return ret;
790 }
791 }
792
793 ret = decode_pool(p, end, pi);
794 if (ret)
795 return ret;
707 } 796 }
708 if (version < 6) { 797
709 pr_warning("got old v %d < 6 of osdmap\n", version); 798 return 0;
710 goto bad; 799
800e_inval:
801 return -EINVAL;
802}
803
804static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
805{
806 return __decode_pools(p, end, map, false);
807}
808
809static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
810{
811 return __decode_pools(p, end, map, true);
812}
813
814static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map,
815 bool incremental)
816{
817 u32 n;
818
819 ceph_decode_32_safe(p, end, n, e_inval);
820 while (n--) {
821 struct ceph_pg pgid;
822 u32 len, i;
823 int ret;
824
825 ret = ceph_decode_pgid(p, end, &pgid);
826 if (ret)
827 return ret;
828
829 ceph_decode_32_safe(p, end, len, e_inval);
830
831 ret = __remove_pg_mapping(&map->pg_temp, pgid);
832 BUG_ON(!incremental && ret != -ENOENT);
833
834 if (!incremental || len > 0) {
835 struct ceph_pg_mapping *pg;
836
837 ceph_decode_need(p, end, len*sizeof(u32), e_inval);
838
839 if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
840 return -EINVAL;
841
842 pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS);
843 if (!pg)
844 return -ENOMEM;
845
846 pg->pgid = pgid;
847 pg->pg_temp.len = len;
848 for (i = 0; i < len; i++)
849 pg->pg_temp.osds[i] = ceph_decode_32(p);
850
851 ret = __insert_pg_mapping(pg, &map->pg_temp);
852 if (ret) {
853 kfree(pg);
854 return ret;
855 }
856 }
711 } 857 }
712 858
713 ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), bad); 859 return 0;
860
861e_inval:
862 return -EINVAL;
863}
864
865static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
866{
867 return __decode_pg_temp(p, end, map, false);
868}
869
870static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
871{
872 return __decode_pg_temp(p, end, map, true);
873}
874
875static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map,
876 bool incremental)
877{
878 u32 n;
879
880 ceph_decode_32_safe(p, end, n, e_inval);
881 while (n--) {
882 struct ceph_pg pgid;
883 u32 osd;
884 int ret;
885
886 ret = ceph_decode_pgid(p, end, &pgid);
887 if (ret)
888 return ret;
889
890 ceph_decode_32_safe(p, end, osd, e_inval);
891
892 ret = __remove_pg_mapping(&map->primary_temp, pgid);
893 BUG_ON(!incremental && ret != -ENOENT);
894
895 if (!incremental || osd != (u32)-1) {
896 struct ceph_pg_mapping *pg;
897
898 pg = kzalloc(sizeof(*pg), GFP_NOFS);
899 if (!pg)
900 return -ENOMEM;
901
902 pg->pgid = pgid;
903 pg->primary_temp.osd = osd;
904
905 ret = __insert_pg_mapping(pg, &map->primary_temp);
906 if (ret) {
907 kfree(pg);
908 return ret;
909 }
910 }
911 }
912
913 return 0;
914
915e_inval:
916 return -EINVAL;
917}
918
919static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
920{
921 return __decode_primary_temp(p, end, map, false);
922}
923
924static int decode_new_primary_temp(void **p, void *end,
925 struct ceph_osdmap *map)
926{
927 return __decode_primary_temp(p, end, map, true);
928}
929
930u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
931{
932 BUG_ON(osd >= map->max_osd);
933
934 if (!map->osd_primary_affinity)
935 return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
936
937 return map->osd_primary_affinity[osd];
938}
939
940static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
941{
942 BUG_ON(osd >= map->max_osd);
943
944 if (!map->osd_primary_affinity) {
945 int i;
946
947 map->osd_primary_affinity = kmalloc(map->max_osd*sizeof(u32),
948 GFP_NOFS);
949 if (!map->osd_primary_affinity)
950 return -ENOMEM;
951
952 for (i = 0; i < map->max_osd; i++)
953 map->osd_primary_affinity[i] =
954 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
955 }
956
957 map->osd_primary_affinity[osd] = aff;
958
959 return 0;
960}
961
962static int decode_primary_affinity(void **p, void *end,
963 struct ceph_osdmap *map)
964{
965 u32 len, i;
966
967 ceph_decode_32_safe(p, end, len, e_inval);
968 if (len == 0) {
969 kfree(map->osd_primary_affinity);
970 map->osd_primary_affinity = NULL;
971 return 0;
972 }
973 if (len != map->max_osd)
974 goto e_inval;
975
976 ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);
977
978 for (i = 0; i < map->max_osd; i++) {
979 int ret;
980
981 ret = set_primary_affinity(map, i, ceph_decode_32(p));
982 if (ret)
983 return ret;
984 }
985
986 return 0;
987
988e_inval:
989 return -EINVAL;
990}
991
992static int decode_new_primary_affinity(void **p, void *end,
993 struct ceph_osdmap *map)
994{
995 u32 n;
996
997 ceph_decode_32_safe(p, end, n, e_inval);
998 while (n--) {
999 u32 osd, aff;
1000 int ret;
1001
1002 ceph_decode_32_safe(p, end, osd, e_inval);
1003 ceph_decode_32_safe(p, end, aff, e_inval);
1004
1005 ret = set_primary_affinity(map, osd, aff);
1006 if (ret)
1007 return ret;
1008
1009 pr_info("osd%d primary-affinity 0x%x\n", osd, aff);
1010 }
1011
1012 return 0;
1013
1014e_inval:
1015 return -EINVAL;
1016}
1017
1018/*
1019 * decode a full map.
1020 */
1021static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
1022{
1023 u8 struct_v;
1024 u32 epoch = 0;
1025 void *start = *p;
1026 u32 max;
1027 u32 len, i;
1028 int err;
1029
1030 dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1031
1032 err = get_osdmap_client_data_v(p, end, "full", &struct_v);
1033 if (err)
1034 goto bad;
1035
1036 /* fsid, epoch, created, modified */
1037 ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
1038 sizeof(map->created) + sizeof(map->modified), e_inval);
714 ceph_decode_copy(p, &map->fsid, sizeof(map->fsid)); 1039 ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
715 map->epoch = ceph_decode_32(p); 1040 epoch = map->epoch = ceph_decode_32(p);
716 ceph_decode_copy(p, &map->created, sizeof(map->created)); 1041 ceph_decode_copy(p, &map->created, sizeof(map->created));
717 ceph_decode_copy(p, &map->modified, sizeof(map->modified)); 1042 ceph_decode_copy(p, &map->modified, sizeof(map->modified));
718 1043
719 ceph_decode_32_safe(p, end, max, bad); 1044 /* pools */
720 while (max--) { 1045 err = decode_pools(p, end, map);
721 ceph_decode_need(p, end, 8 + 2, bad); 1046 if (err)
722 err = -ENOMEM; 1047 goto bad;
723 pi = kzalloc(sizeof(*pi), GFP_NOFS);
724 if (!pi)
725 goto bad;
726 pi->id = ceph_decode_64(p);
727 err = __decode_pool(p, end, pi);
728 if (err < 0) {
729 kfree(pi);
730 goto bad;
731 }
732 __insert_pg_pool(&map->pg_pools, pi);
733 }
734 1048
735 err = __decode_pool_names(p, end, map); 1049 /* pool_name */
736 if (err < 0) { 1050 err = decode_pool_names(p, end, map);
737 dout("fail to decode pool names"); 1051 if (err)
738 goto bad; 1052 goto bad;
739 }
740 1053
741 ceph_decode_32_safe(p, end, map->pool_max, bad); 1054 ceph_decode_32_safe(p, end, map->pool_max, e_inval);
742 1055
743 ceph_decode_32_safe(p, end, map->flags, bad); 1056 ceph_decode_32_safe(p, end, map->flags, e_inval);
744 1057
745 max = ceph_decode_32(p); 1058 /* max_osd */
1059 ceph_decode_32_safe(p, end, max, e_inval);
746 1060
747 /* (re)alloc osd arrays */ 1061 /* (re)alloc osd arrays */
748 err = osdmap_set_max_osd(map, max); 1062 err = osdmap_set_max_osd(map, max);
749 if (err < 0) 1063 if (err)
750 goto bad; 1064 goto bad;
751 dout("osdmap_decode max_osd = %d\n", map->max_osd);
752 1065
753 /* osds */ 1066 /* osd_state, osd_weight, osd_addrs->client_addr */
754 err = -EINVAL;
755 ceph_decode_need(p, end, 3*sizeof(u32) + 1067 ceph_decode_need(p, end, 3*sizeof(u32) +
756 map->max_osd*(1 + sizeof(*map->osd_weight) + 1068 map->max_osd*(1 + sizeof(*map->osd_weight) +
757 sizeof(*map->osd_addr)), bad); 1069 sizeof(*map->osd_addr)), e_inval);
758 *p += 4; /* skip length field (should match max) */ 1070
1071 if (ceph_decode_32(p) != map->max_osd)
1072 goto e_inval;
1073
759 ceph_decode_copy(p, map->osd_state, map->max_osd); 1074 ceph_decode_copy(p, map->osd_state, map->max_osd);
760 1075
761 *p += 4; /* skip length field (should match max) */ 1076 if (ceph_decode_32(p) != map->max_osd)
1077 goto e_inval;
1078
762 for (i = 0; i < map->max_osd; i++) 1079 for (i = 0; i < map->max_osd; i++)
763 map->osd_weight[i] = ceph_decode_32(p); 1080 map->osd_weight[i] = ceph_decode_32(p);
764 1081
765 *p += 4; /* skip length field (should match max) */ 1082 if (ceph_decode_32(p) != map->max_osd)
1083 goto e_inval;
1084
766 ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr)); 1085 ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
767 for (i = 0; i < map->max_osd; i++) 1086 for (i = 0; i < map->max_osd; i++)
768 ceph_decode_addr(&map->osd_addr[i]); 1087 ceph_decode_addr(&map->osd_addr[i]);
769 1088
770 /* pg_temp */ 1089 /* pg_temp */
771 ceph_decode_32_safe(p, end, len, bad); 1090 err = decode_pg_temp(p, end, map);
772 for (i = 0; i < len; i++) { 1091 if (err)
773 int n, j; 1092 goto bad;
774 struct ceph_pg pgid;
775 struct ceph_pg_mapping *pg;
776 1093
777 err = ceph_decode_pgid(p, end, &pgid); 1094 /* primary_temp */
1095 if (struct_v >= 1) {
1096 err = decode_primary_temp(p, end, map);
778 if (err) 1097 if (err)
779 goto bad; 1098 goto bad;
780 ceph_decode_need(p, end, sizeof(u32), bad); 1099 }
781 n = ceph_decode_32(p);
782 err = -EINVAL;
783 if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
784 goto bad;
785 ceph_decode_need(p, end, n * sizeof(u32), bad);
786 err = -ENOMEM;
787 pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
788 if (!pg)
789 goto bad;
790 pg->pgid = pgid;
791 pg->len = n;
792 for (j = 0; j < n; j++)
793 pg->osds[j] = ceph_decode_32(p);
794 1100
795 err = __insert_pg_mapping(pg, &map->pg_temp); 1101 /* primary_affinity */
1102 if (struct_v >= 2) {
1103 err = decode_primary_affinity(p, end, map);
796 if (err) 1104 if (err)
797 goto bad; 1105 goto bad;
798 dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed, 1106 } else {
799 len); 1107 /* XXX can this happen? */
1108 kfree(map->osd_primary_affinity);
1109 map->osd_primary_affinity = NULL;
800 } 1110 }
801 1111
802 /* crush */ 1112 /* crush */
803 ceph_decode_32_safe(p, end, len, bad); 1113 ceph_decode_32_safe(p, end, len, e_inval);
804 dout("osdmap_decode crush len %d from off 0x%x\n", len, 1114 map->crush = crush_decode(*p, min(*p + len, end));
805 (int)(*p - start));
806 ceph_decode_need(p, end, len, bad);
807 map->crush = crush_decode(*p, end);
808 *p += len;
809 if (IS_ERR(map->crush)) { 1115 if (IS_ERR(map->crush)) {
810 err = PTR_ERR(map->crush); 1116 err = PTR_ERR(map->crush);
811 map->crush = NULL; 1117 map->crush = NULL;
812 goto bad; 1118 goto bad;
813 } 1119 }
1120 *p += len;
814 1121
815 /* ignore the rest of the map */ 1122 /* ignore the rest */
816 *p = end; 1123 *p = end;
817 1124
818 dout("osdmap_decode done %p %p\n", *p, end); 1125 dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
819 return map; 1126 return 0;
820 1127
1128e_inval:
1129 err = -EINVAL;
821bad: 1130bad:
822 dout("osdmap_decode fail err %d\n", err); 1131 pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
823 ceph_osdmap_destroy(map); 1132 err, epoch, (int)(*p - start), *p, start, end);
824 return ERR_PTR(err); 1133 print_hex_dump(KERN_DEBUG, "osdmap: ",
1134 DUMP_PREFIX_OFFSET, 16, 1,
1135 start, end - start, true);
1136 return err;
1137}
1138
1139/*
1140 * Allocate and decode a full map.
1141 */
1142struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
1143{
1144 struct ceph_osdmap *map;
1145 int ret;
1146
1147 map = kzalloc(sizeof(*map), GFP_NOFS);
1148 if (!map)
1149 return ERR_PTR(-ENOMEM);
1150
1151 map->pg_temp = RB_ROOT;
1152 map->primary_temp = RB_ROOT;
1153 mutex_init(&map->crush_scratch_mutex);
1154
1155 ret = osdmap_decode(p, end, map);
1156 if (ret) {
1157 ceph_osdmap_destroy(map);
1158 return ERR_PTR(ret);
1159 }
1160
1161 return map;
825} 1162}
826 1163
827/* 1164/*
@@ -840,17 +1177,18 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
840 __s64 new_pool_max; 1177 __s64 new_pool_max;
841 __s32 new_flags, max; 1178 __s32 new_flags, max;
842 void *start = *p; 1179 void *start = *p;
843 int err = -EINVAL; 1180 int err;
844 u16 version; 1181 u8 struct_v;
1182
1183 dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
845 1184
846 ceph_decode_16_safe(p, end, version, bad); 1185 err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
847 if (version != 6) { 1186 if (err)
848 pr_warning("got unknown v %d != 6 of inc osdmap\n", version);
849 goto bad; 1187 goto bad;
850 }
851 1188
852 ceph_decode_need(p, end, sizeof(fsid)+sizeof(modified)+2*sizeof(u32), 1189 /* fsid, epoch, modified, new_pool_max, new_flags */
853 bad); 1190 ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
1191 sizeof(u64) + sizeof(u32), e_inval);
854 ceph_decode_copy(p, &fsid, sizeof(fsid)); 1192 ceph_decode_copy(p, &fsid, sizeof(fsid));
855 epoch = ceph_decode_32(p); 1193 epoch = ceph_decode_32(p);
856 BUG_ON(epoch != map->epoch+1); 1194 BUG_ON(epoch != map->epoch+1);
@@ -859,21 +1197,22 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
859 new_flags = ceph_decode_32(p); 1197 new_flags = ceph_decode_32(p);
860 1198
861 /* full map? */ 1199 /* full map? */
862 ceph_decode_32_safe(p, end, len, bad); 1200 ceph_decode_32_safe(p, end, len, e_inval);
863 if (len > 0) { 1201 if (len > 0) {
864 dout("apply_incremental full map len %d, %p to %p\n", 1202 dout("apply_incremental full map len %d, %p to %p\n",
865 len, *p, end); 1203 len, *p, end);
866 return osdmap_decode(p, min(*p+len, end)); 1204 return ceph_osdmap_decode(p, min(*p+len, end));
867 } 1205 }
868 1206
869 /* new crush? */ 1207 /* new crush? */
870 ceph_decode_32_safe(p, end, len, bad); 1208 ceph_decode_32_safe(p, end, len, e_inval);
871 if (len > 0) { 1209 if (len > 0) {
872 dout("apply_incremental new crush map len %d, %p to %p\n",
873 len, *p, end);
874 newcrush = crush_decode(*p, min(*p+len, end)); 1210 newcrush = crush_decode(*p, min(*p+len, end));
875 if (IS_ERR(newcrush)) 1211 if (IS_ERR(newcrush)) {
876 return ERR_CAST(newcrush); 1212 err = PTR_ERR(newcrush);
1213 newcrush = NULL;
1214 goto bad;
1215 }
877 *p += len; 1216 *p += len;
878 } 1217 }
879 1218
@@ -883,13 +1222,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
883 if (new_pool_max >= 0) 1222 if (new_pool_max >= 0)
884 map->pool_max = new_pool_max; 1223 map->pool_max = new_pool_max;
885 1224
886 ceph_decode_need(p, end, 5*sizeof(u32), bad);
887
888 /* new max? */ 1225 /* new max? */
889 max = ceph_decode_32(p); 1226 ceph_decode_32_safe(p, end, max, e_inval);
890 if (max >= 0) { 1227 if (max >= 0) {
891 err = osdmap_set_max_osd(map, max); 1228 err = osdmap_set_max_osd(map, max);
892 if (err < 0) 1229 if (err)
893 goto bad; 1230 goto bad;
894 } 1231 }
895 1232
@@ -902,51 +1239,34 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
902 newcrush = NULL; 1239 newcrush = NULL;
903 } 1240 }
904 1241
905 /* new_pool */ 1242 /* new_pools */
906 ceph_decode_32_safe(p, end, len, bad); 1243 err = decode_new_pools(p, end, map);
907 while (len--) { 1244 if (err)
908 struct ceph_pg_pool_info *pi; 1245 goto bad;
909 1246
910 ceph_decode_64_safe(p, end, pool, bad); 1247 /* new_pool_names */
911 pi = __lookup_pg_pool(&map->pg_pools, pool); 1248 err = decode_pool_names(p, end, map);
912 if (!pi) { 1249 if (err)
913 pi = kzalloc(sizeof(*pi), GFP_NOFS); 1250 goto bad;
914 if (!pi) {
915 err = -ENOMEM;
916 goto bad;
917 }
918 pi->id = pool;
919 __insert_pg_pool(&map->pg_pools, pi);
920 }
921 err = __decode_pool(p, end, pi);
922 if (err < 0)
923 goto bad;
924 }
925 if (version >= 5) {
926 err = __decode_pool_names(p, end, map);
927 if (err < 0)
928 goto bad;
929 }
930 1251
931 /* old_pool */ 1252 /* old_pool */
932 ceph_decode_32_safe(p, end, len, bad); 1253 ceph_decode_32_safe(p, end, len, e_inval);
933 while (len--) { 1254 while (len--) {
934 struct ceph_pg_pool_info *pi; 1255 struct ceph_pg_pool_info *pi;
935 1256
936 ceph_decode_64_safe(p, end, pool, bad); 1257 ceph_decode_64_safe(p, end, pool, e_inval);
937 pi = __lookup_pg_pool(&map->pg_pools, pool); 1258 pi = __lookup_pg_pool(&map->pg_pools, pool);
938 if (pi) 1259 if (pi)
939 __remove_pg_pool(&map->pg_pools, pi); 1260 __remove_pg_pool(&map->pg_pools, pi);
940 } 1261 }
941 1262
942 /* new_up */ 1263 /* new_up */
943 err = -EINVAL; 1264 ceph_decode_32_safe(p, end, len, e_inval);
944 ceph_decode_32_safe(p, end, len, bad);
945 while (len--) { 1265 while (len--) {
946 u32 osd; 1266 u32 osd;
947 struct ceph_entity_addr addr; 1267 struct ceph_entity_addr addr;
948 ceph_decode_32_safe(p, end, osd, bad); 1268 ceph_decode_32_safe(p, end, osd, e_inval);
949 ceph_decode_copy_safe(p, end, &addr, sizeof(addr), bad); 1269 ceph_decode_copy_safe(p, end, &addr, sizeof(addr), e_inval);
950 ceph_decode_addr(&addr); 1270 ceph_decode_addr(&addr);
951 pr_info("osd%d up\n", osd); 1271 pr_info("osd%d up\n", osd);
952 BUG_ON(osd >= map->max_osd); 1272 BUG_ON(osd >= map->max_osd);
@@ -955,11 +1275,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
955 } 1275 }
956 1276
957 /* new_state */ 1277 /* new_state */
958 ceph_decode_32_safe(p, end, len, bad); 1278 ceph_decode_32_safe(p, end, len, e_inval);
959 while (len--) { 1279 while (len--) {
960 u32 osd; 1280 u32 osd;
961 u8 xorstate; 1281 u8 xorstate;
962 ceph_decode_32_safe(p, end, osd, bad); 1282 ceph_decode_32_safe(p, end, osd, e_inval);
963 xorstate = **(u8 **)p; 1283 xorstate = **(u8 **)p;
964 (*p)++; /* clean flag */ 1284 (*p)++; /* clean flag */
965 if (xorstate == 0) 1285 if (xorstate == 0)
@@ -971,10 +1291,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
971 } 1291 }
972 1292
973 /* new_weight */ 1293 /* new_weight */
974 ceph_decode_32_safe(p, end, len, bad); 1294 ceph_decode_32_safe(p, end, len, e_inval);
975 while (len--) { 1295 while (len--) {
976 u32 osd, off; 1296 u32 osd, off;
977 ceph_decode_need(p, end, sizeof(u32)*2, bad); 1297 ceph_decode_need(p, end, sizeof(u32)*2, e_inval);
978 osd = ceph_decode_32(p); 1298 osd = ceph_decode_32(p);
979 off = ceph_decode_32(p); 1299 off = ceph_decode_32(p);
980 pr_info("osd%d weight 0x%x %s\n", osd, off, 1300 pr_info("osd%d weight 0x%x %s\n", osd, off,
@@ -985,56 +1305,35 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
985 } 1305 }
986 1306
987 /* new_pg_temp */ 1307 /* new_pg_temp */
988 ceph_decode_32_safe(p, end, len, bad); 1308 err = decode_new_pg_temp(p, end, map);
989 while (len--) { 1309 if (err)
990 struct ceph_pg_mapping *pg; 1310 goto bad;
991 int j;
992 struct ceph_pg pgid;
993 u32 pglen;
994 1311
995 err = ceph_decode_pgid(p, end, &pgid); 1312 /* new_primary_temp */
1313 if (struct_v >= 1) {
1314 err = decode_new_primary_temp(p, end, map);
996 if (err) 1315 if (err)
997 goto bad; 1316 goto bad;
998 ceph_decode_need(p, end, sizeof(u32), bad); 1317 }
999 pglen = ceph_decode_32(p);
1000 if (pglen) {
1001 ceph_decode_need(p, end, pglen*sizeof(u32), bad);
1002
1003 /* removing existing (if any) */
1004 (void) __remove_pg_mapping(&map->pg_temp, pgid);
1005 1318
1006 /* insert */ 1319 /* new_primary_affinity */
1007 err = -EINVAL; 1320 if (struct_v >= 2) {
1008 if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) 1321 err = decode_new_primary_affinity(p, end, map);
1009 goto bad; 1322 if (err)
1010 err = -ENOMEM; 1323 goto bad;
1011 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
1012 if (!pg)
1013 goto bad;
1014 pg->pgid = pgid;
1015 pg->len = pglen;
1016 for (j = 0; j < pglen; j++)
1017 pg->osds[j] = ceph_decode_32(p);
1018 err = __insert_pg_mapping(pg, &map->pg_temp);
1019 if (err) {
1020 kfree(pg);
1021 goto bad;
1022 }
1023 dout(" added pg_temp %lld.%x len %d\n", pgid.pool,
1024 pgid.seed, pglen);
1025 } else {
1026 /* remove */
1027 __remove_pg_mapping(&map->pg_temp, pgid);
1028 }
1029 } 1324 }
1030 1325
1031 /* ignore the rest */ 1326 /* ignore the rest */
1032 *p = end; 1327 *p = end;
1328
1329 dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1033 return map; 1330 return map;
1034 1331
1332e_inval:
1333 err = -EINVAL;
1035bad: 1334bad:
1036 pr_err("corrupt inc osdmap epoch %d off %d (%p of %p-%p)\n", 1335 pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1037 epoch, (int)(*p - start), *p, start, end); 1336 err, epoch, (int)(*p - start), *p, start, end);
1038 print_hex_dump(KERN_DEBUG, "osdmap: ", 1337 print_hex_dump(KERN_DEBUG, "osdmap: ",
1039 DUMP_PREFIX_OFFSET, 16, 1, 1338 DUMP_PREFIX_OFFSET, 16, 1,
1040 start, end - start, true); 1339 start, end - start, true);
@@ -1142,61 +1441,249 @@ int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
1142} 1441}
1143EXPORT_SYMBOL(ceph_oloc_oid_to_pg); 1442EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
1144 1443
1145static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x, 1444static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
1146 int *result, int result_max, 1445 int *result, int result_max,
1147 const __u32 *weight, int weight_max) 1446 const __u32 *weight, int weight_max)
1148{ 1447{
1149 int scratch[result_max * 3]; 1448 int r;
1150 1449
1151 return crush_do_rule(map, ruleno, x, result, result_max, 1450 BUG_ON(result_max > CEPH_PG_MAX_SIZE);
1152 weight, weight_max, scratch); 1451
1452 mutex_lock(&map->crush_scratch_mutex);
1453 r = crush_do_rule(map->crush, ruleno, x, result, result_max,
1454 weight, weight_max, map->crush_scratch_ary);
1455 mutex_unlock(&map->crush_scratch_mutex);
1456
1457 return r;
1153} 1458}
1154 1459
1155/* 1460/*
1156 * Calculate raw osd vector for the given pgid. Return pointer to osd 1461 * Calculate raw (crush) set for given pgid.
1157 * array, or NULL on failure. 1462 *
1463 * Return raw set length, or error.
1158 */ 1464 */
1159static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, 1465static int pg_to_raw_osds(struct ceph_osdmap *osdmap,
1160 int *osds, int *num) 1466 struct ceph_pg_pool_info *pool,
1467 struct ceph_pg pgid, u32 pps, int *osds)
1161{ 1468{
1162 struct ceph_pg_mapping *pg;
1163 struct ceph_pg_pool_info *pool;
1164 int ruleno; 1469 int ruleno;
1165 int r; 1470 int len;
1166 u32 pps;
1167 1471
1168 pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); 1472 /* crush */
1169 if (!pool) 1473 ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset,
1170 return NULL; 1474 pool->type, pool->size);
1475 if (ruleno < 0) {
1476 pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
1477 pgid.pool, pool->crush_ruleset, pool->type,
1478 pool->size);
1479 return -ENOENT;
1480 }
1171 1481
1172 /* pg_temp? */ 1482 len = do_crush(osdmap, ruleno, pps, osds,
1483 min_t(int, pool->size, CEPH_PG_MAX_SIZE),
1484 osdmap->osd_weight, osdmap->max_osd);
1485 if (len < 0) {
1486 pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
1487 len, ruleno, pgid.pool, pool->crush_ruleset,
1488 pool->type, pool->size);
1489 return len;
1490 }
1491
1492 return len;
1493}
1494
1495/*
1496 * Given raw set, calculate up set and up primary.
1497 *
1498 * Return up set length. *primary is set to up primary osd id, or -1
1499 * if up set is empty.
1500 */
1501static int raw_to_up_osds(struct ceph_osdmap *osdmap,
1502 struct ceph_pg_pool_info *pool,
1503 int *osds, int len, int *primary)
1504{
1505 int up_primary = -1;
1506 int i;
1507
1508 if (ceph_can_shift_osds(pool)) {
1509 int removed = 0;
1510
1511 for (i = 0; i < len; i++) {
1512 if (ceph_osd_is_down(osdmap, osds[i])) {
1513 removed++;
1514 continue;
1515 }
1516 if (removed)
1517 osds[i - removed] = osds[i];
1518 }
1519
1520 len -= removed;
1521 if (len > 0)
1522 up_primary = osds[0];
1523 } else {
1524 for (i = len - 1; i >= 0; i--) {
1525 if (ceph_osd_is_down(osdmap, osds[i]))
1526 osds[i] = CRUSH_ITEM_NONE;
1527 else
1528 up_primary = osds[i];
1529 }
1530 }
1531
1532 *primary = up_primary;
1533 return len;
1534}
1535
1536static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
1537 struct ceph_pg_pool_info *pool,
1538 int *osds, int len, int *primary)
1539{
1540 int i;
1541 int pos = -1;
1542
1543 /*
1544 * Do we have any non-default primary_affinity values for these
1545 * osds?
1546 */
1547 if (!osdmap->osd_primary_affinity)
1548 return;
1549
1550 for (i = 0; i < len; i++) {
1551 if (osds[i] != CRUSH_ITEM_NONE &&
1552 osdmap->osd_primary_affinity[i] !=
1553 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
1554 break;
1555 }
1556 }
1557 if (i == len)
1558 return;
1559
1560 /*
1561 * Pick the primary. Feed both the seed (for the pg) and the
1562 * osd into the hash/rng so that a proportional fraction of an
1563 * osd's pgs get rejected as primary.
1564 */
1565 for (i = 0; i < len; i++) {
1566 int osd;
1567 u32 aff;
1568
1569 osd = osds[i];
1570 if (osd == CRUSH_ITEM_NONE)
1571 continue;
1572
1573 aff = osdmap->osd_primary_affinity[osd];
1574 if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
1575 (crush_hash32_2(CRUSH_HASH_RJENKINS1,
1576 pps, osd) >> 16) >= aff) {
1577 /*
1578 * We chose not to use this primary. Note it
1579 * anyway as a fallback in case we don't pick
1580 * anyone else, but keep looking.
1581 */
1582 if (pos < 0)
1583 pos = i;
1584 } else {
1585 pos = i;
1586 break;
1587 }
1588 }
1589 if (pos < 0)
1590 return;
1591
1592 *primary = osds[pos];
1593
1594 if (ceph_can_shift_osds(pool) && pos > 0) {
1595 /* move the new primary to the front */
1596 for (i = pos; i > 0; i--)
1597 osds[i] = osds[i - 1];
1598 osds[0] = *primary;
1599 }
1600}
1601
1602/*
1603 * Given up set, apply pg_temp and primary_temp mappings.
1604 *
1605 * Return acting set length. *primary is set to acting primary osd id,
1606 * or -1 if acting set is empty.
1607 */
1608static int apply_temps(struct ceph_osdmap *osdmap,
1609 struct ceph_pg_pool_info *pool, struct ceph_pg pgid,
1610 int *osds, int len, int *primary)
1611{
1612 struct ceph_pg_mapping *pg;
1613 int temp_len;
1614 int temp_primary;
1615 int i;
1616
1617 /* raw_pg -> pg */
1173 pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, 1618 pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
1174 pool->pg_num_mask); 1619 pool->pg_num_mask);
1620
1621 /* pg_temp? */
1175 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); 1622 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
1176 if (pg) { 1623 if (pg) {
1177 *num = pg->len; 1624 temp_len = 0;
1178 return pg->osds; 1625 temp_primary = -1;
1626
1627 for (i = 0; i < pg->pg_temp.len; i++) {
1628 if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
1629 if (ceph_can_shift_osds(pool))
1630 continue;
1631 else
1632 osds[temp_len++] = CRUSH_ITEM_NONE;
1633 } else {
1634 osds[temp_len++] = pg->pg_temp.osds[i];
1635 }
1636 }
1637
1638 /* apply pg_temp's primary */
1639 for (i = 0; i < temp_len; i++) {
1640 if (osds[i] != CRUSH_ITEM_NONE) {
1641 temp_primary = osds[i];
1642 break;
1643 }
1644 }
1645 } else {
1646 temp_len = len;
1647 temp_primary = *primary;
1179 } 1648 }
1180 1649
1181 /* crush */ 1650 /* primary_temp? */
1182 ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, 1651 pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
1183 pool->type, pool->size); 1652 if (pg)
1184 if (ruleno < 0) { 1653 temp_primary = pg->primary_temp.osd;
1185 pr_err("no crush rule pool %lld ruleset %d type %d size %d\n", 1654
1186 pgid.pool, pool->crush_ruleset, pool->type, 1655 *primary = temp_primary;
1187 pool->size); 1656 return temp_len;
1188 return NULL; 1657}
1658
1659/*
1660 * Calculate acting set for given pgid.
1661 *
1662 * Return acting set length, or error. *primary is set to acting
1663 * primary osd id, or -1 if acting set is empty or on error.
1664 */
1665int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1666 int *osds, int *primary)
1667{
1668 struct ceph_pg_pool_info *pool;
1669 u32 pps;
1670 int len;
1671
1672 pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool);
1673 if (!pool) {
1674 *primary = -1;
1675 return -ENOENT;
1189 } 1676 }
1190 1677
1191 if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { 1678 if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
1192 /* hash pool id and seed sothat pool PGs do not overlap */ 1679 /* hash pool id and seed so that pool PGs do not overlap */
1193 pps = crush_hash32_2(CRUSH_HASH_RJENKINS1, 1680 pps = crush_hash32_2(CRUSH_HASH_RJENKINS1,
1194 ceph_stable_mod(pgid.seed, pool->pgp_num, 1681 ceph_stable_mod(pgid.seed, pool->pgp_num,
1195 pool->pgp_num_mask), 1682 pool->pgp_num_mask),
1196 pgid.pool); 1683 pgid.pool);
1197 } else { 1684 } else {
1198 /* 1685 /*
1199 * legacy ehavior: add ps and pool together. this is 1686 * legacy behavior: add ps and pool together. this is
1200 * not a great approach because the PGs from each pool 1687 * not a great approach because the PGs from each pool
1201 * will overlap on top of each other: 0.5 == 1.4 == 1688 * will overlap on top of each other: 0.5 == 1.4 ==
1202 * 2.3 == ... 1689 * 2.3 == ...
@@ -1205,38 +1692,20 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1205 pool->pgp_num_mask) + 1692 pool->pgp_num_mask) +
1206 (unsigned)pgid.pool; 1693 (unsigned)pgid.pool;
1207 } 1694 }
1208 r = crush_do_rule_ary(osdmap->crush, ruleno, pps, 1695
1209 osds, min_t(int, pool->size, *num), 1696 len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds);
1210 osdmap->osd_weight, osdmap->max_osd); 1697 if (len < 0) {
1211 if (r < 0) { 1698 *primary = -1;
1212 pr_err("error %d from crush rule: pool %lld ruleset %d type %d" 1699 return len;
1213 " size %d\n", r, pgid.pool, pool->crush_ruleset,
1214 pool->type, pool->size);
1215 return NULL;
1216 } 1700 }
1217 *num = r;
1218 return osds;
1219}
1220 1701
1221/* 1702 len = raw_to_up_osds(osdmap, pool, osds, len, primary);
1222 * Return acting set for given pgid.
1223 */
1224int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1225 int *acting)
1226{
1227 int rawosds[CEPH_PG_MAX_SIZE], *osds;
1228 int i, o, num = CEPH_PG_MAX_SIZE;
1229 1703
1230 osds = calc_pg_raw(osdmap, pgid, rawosds, &num); 1704 apply_primary_affinity(osdmap, pps, pool, osds, len, primary);
1231 if (!osds)
1232 return -1;
1233 1705
1234 /* primary is first up osd */ 1706 len = apply_temps(osdmap, pool, pgid, osds, len, primary);
1235 o = 0; 1707
1236 for (i = 0; i < num; i++) 1708 return len;
1237 if (ceph_osd_is_up(osdmap, osds[i]))
1238 acting[o++] = osds[i];
1239 return o;
1240} 1709}
1241 1710
1242/* 1711/*
@@ -1244,17 +1713,11 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1244 */ 1713 */
1245int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) 1714int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
1246{ 1715{
1247 int rawosds[CEPH_PG_MAX_SIZE], *osds; 1716 int osds[CEPH_PG_MAX_SIZE];
1248 int i, num = CEPH_PG_MAX_SIZE; 1717 int primary;
1249 1718
1250 osds = calc_pg_raw(osdmap, pgid, rawosds, &num); 1719 ceph_calc_pg_acting(osdmap, pgid, osds, &primary);
1251 if (!osds)
1252 return -1;
1253 1720
1254 /* primary is first up osd */ 1721 return primary;
1255 for (i = 0; i < num; i++)
1256 if (ceph_osd_is_up(osdmap, osds[i]))
1257 return osds[i];
1258 return -1;
1259} 1722}
1260EXPORT_SYMBOL(ceph_calc_pg_primary); 1723EXPORT_SYMBOL(ceph_calc_pg_primary);