summaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2017-01-31 09:55:06 -0500
committerIlya Dryomov <idryomov@gmail.com>2017-02-20 06:16:11 -0500
commit66a0e2d579dbec5c676cfe446234ffebb267c564 (patch)
tree6a2d4307be3e39e4955697a49ed5daf6ea97d927 /net/ceph
parent1b6a78b5b91cdc07cc0b940b458e90c86835cf73 (diff)
crush: remove mutable part of CRUSH map
Then add it to the working state. It would be very nice if we didn't have to take a lock to calculate a crush placement. By moving the permutation array into the working data, we can treat the CRUSH map as immutable. Reflects ceph.git commit cbcd039651c0569551cb90d26ce27e1432671f2a. Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/crush/crush.c5
-rw-r--r--net/ceph/crush/mapper.c206
-rw-r--r--net/ceph/osdmap.c47
3 files changed, 180 insertions, 78 deletions
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 80d7c3a97cb8..5bf94c04f645 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -45,7 +45,6 @@ int crush_get_bucket_item_weight(const struct crush_bucket *b, int p)
45 45
46void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b) 46void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b)
47{ 47{
48 kfree(b->h.perm);
49 kfree(b->h.items); 48 kfree(b->h.items);
50 kfree(b); 49 kfree(b);
51} 50}
@@ -54,14 +53,12 @@ void crush_destroy_bucket_list(struct crush_bucket_list *b)
54{ 53{
55 kfree(b->item_weights); 54 kfree(b->item_weights);
56 kfree(b->sum_weights); 55 kfree(b->sum_weights);
57 kfree(b->h.perm);
58 kfree(b->h.items); 56 kfree(b->h.items);
59 kfree(b); 57 kfree(b);
60} 58}
61 59
62void crush_destroy_bucket_tree(struct crush_bucket_tree *b) 60void crush_destroy_bucket_tree(struct crush_bucket_tree *b)
63{ 61{
64 kfree(b->h.perm);
65 kfree(b->h.items); 62 kfree(b->h.items);
66 kfree(b->node_weights); 63 kfree(b->node_weights);
67 kfree(b); 64 kfree(b);
@@ -71,7 +68,6 @@ void crush_destroy_bucket_straw(struct crush_bucket_straw *b)
71{ 68{
72 kfree(b->straws); 69 kfree(b->straws);
73 kfree(b->item_weights); 70 kfree(b->item_weights);
74 kfree(b->h.perm);
75 kfree(b->h.items); 71 kfree(b->h.items);
76 kfree(b); 72 kfree(b);
77} 73}
@@ -79,7 +75,6 @@ void crush_destroy_bucket_straw(struct crush_bucket_straw *b)
79void crush_destroy_bucket_straw2(struct crush_bucket_straw2 *b) 75void crush_destroy_bucket_straw2(struct crush_bucket_straw2 *b)
80{ 76{
81 kfree(b->item_weights); 77 kfree(b->item_weights);
82 kfree(b->h.perm);
83 kfree(b->h.items); 78 kfree(b->h.items);
84 kfree(b); 79 kfree(b);
85} 80}
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 130ab407c5ec..9e75be5ec716 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -54,7 +54,6 @@ int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size
54 return -1; 54 return -1;
55} 55}
56 56
57
58/* 57/*
59 * bucket choose methods 58 * bucket choose methods
60 * 59 *
@@ -72,59 +71,60 @@ int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size
72 * Since this is expensive, we optimize for the r=0 case, which 71 * Since this is expensive, we optimize for the r=0 case, which
73 * captures the vast majority of calls. 72 * captures the vast majority of calls.
74 */ 73 */
75static int bucket_perm_choose(struct crush_bucket *bucket, 74static int bucket_perm_choose(const struct crush_bucket *bucket,
75 struct crush_work_bucket *work,
76 int x, int r) 76 int x, int r)
77{ 77{
78 unsigned int pr = r % bucket->size; 78 unsigned int pr = r % bucket->size;
79 unsigned int i, s; 79 unsigned int i, s;
80 80
81 /* start a new permutation if @x has changed */ 81 /* start a new permutation if @x has changed */
82 if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) { 82 if (work->perm_x != (__u32)x || work->perm_n == 0) {
83 dprintk("bucket %d new x=%d\n", bucket->id, x); 83 dprintk("bucket %d new x=%d\n", bucket->id, x);
84 bucket->perm_x = x; 84 work->perm_x = x;
85 85
86 /* optimize common r=0 case */ 86 /* optimize common r=0 case */
87 if (pr == 0) { 87 if (pr == 0) {
88 s = crush_hash32_3(bucket->hash, x, bucket->id, 0) % 88 s = crush_hash32_3(bucket->hash, x, bucket->id, 0) %
89 bucket->size; 89 bucket->size;
90 bucket->perm[0] = s; 90 work->perm[0] = s;
91 bucket->perm_n = 0xffff; /* magic value, see below */ 91 work->perm_n = 0xffff; /* magic value, see below */
92 goto out; 92 goto out;
93 } 93 }
94 94
95 for (i = 0; i < bucket->size; i++) 95 for (i = 0; i < bucket->size; i++)
96 bucket->perm[i] = i; 96 work->perm[i] = i;
97 bucket->perm_n = 0; 97 work->perm_n = 0;
98 } else if (bucket->perm_n == 0xffff) { 98 } else if (work->perm_n == 0xffff) {
99 /* clean up after the r=0 case above */ 99 /* clean up after the r=0 case above */
100 for (i = 1; i < bucket->size; i++) 100 for (i = 1; i < bucket->size; i++)
101 bucket->perm[i] = i; 101 work->perm[i] = i;
102 bucket->perm[bucket->perm[0]] = 0; 102 work->perm[work->perm[0]] = 0;
103 bucket->perm_n = 1; 103 work->perm_n = 1;
104 } 104 }
105 105
106 /* calculate permutation up to pr */ 106 /* calculate permutation up to pr */
107 for (i = 0; i < bucket->perm_n; i++) 107 for (i = 0; i < work->perm_n; i++)
108 dprintk(" perm_choose have %d: %d\n", i, bucket->perm[i]); 108 dprintk(" perm_choose have %d: %d\n", i, bucket->perm[i]);
109 while (bucket->perm_n <= pr) { 109 while (work->perm_n <= pr) {
110 unsigned int p = bucket->perm_n; 110 unsigned int p = work->perm_n;
111 /* no point in swapping the final entry */ 111 /* no point in swapping the final entry */
112 if (p < bucket->size - 1) { 112 if (p < bucket->size - 1) {
113 i = crush_hash32_3(bucket->hash, x, bucket->id, p) % 113 i = crush_hash32_3(bucket->hash, x, bucket->id, p) %
114 (bucket->size - p); 114 (bucket->size - p);
115 if (i) { 115 if (i) {
116 unsigned int t = bucket->perm[p + i]; 116 unsigned int t = work->perm[p + i];
117 bucket->perm[p + i] = bucket->perm[p]; 117 work->perm[p + i] = work->perm[p];
118 bucket->perm[p] = t; 118 work->perm[p] = t;
119 } 119 }
120 dprintk(" perm_choose swap %d with %d\n", p, p+i); 120 dprintk(" perm_choose swap %d with %d\n", p, p+i);
121 } 121 }
122 bucket->perm_n++; 122 work->perm_n++;
123 } 123 }
124 for (i = 0; i < bucket->size; i++) 124 for (i = 0; i < bucket->size; i++)
125 dprintk(" perm_choose %d: %d\n", i, bucket->perm[i]); 125 dprintk(" perm_choose %d: %d\n", i, bucket->perm[i]);
126 126
127 s = bucket->perm[pr]; 127 s = work->perm[pr];
128out: 128out:
129 dprintk(" perm_choose %d sz=%d x=%d r=%d (%d) s=%d\n", bucket->id, 129 dprintk(" perm_choose %d sz=%d x=%d r=%d (%d) s=%d\n", bucket->id,
130 bucket->size, x, r, pr, s); 130 bucket->size, x, r, pr, s);
@@ -132,14 +132,14 @@ out:
132} 132}
133 133
134/* uniform */ 134/* uniform */
135static int bucket_uniform_choose(struct crush_bucket_uniform *bucket, 135static int bucket_uniform_choose(const struct crush_bucket_uniform *bucket,
136 int x, int r) 136 struct crush_work_bucket *work, int x, int r)
137{ 137{
138 return bucket_perm_choose(&bucket->h, x, r); 138 return bucket_perm_choose(&bucket->h, work, x, r);
139} 139}
140 140
141/* list */ 141/* list */
142static int bucket_list_choose(struct crush_bucket_list *bucket, 142static int bucket_list_choose(const struct crush_bucket_list *bucket,
143 int x, int r) 143 int x, int r)
144{ 144{
145 int i; 145 int i;
@@ -155,8 +155,9 @@ static int bucket_list_choose(struct crush_bucket_list *bucket,
155 w *= bucket->sum_weights[i]; 155 w *= bucket->sum_weights[i];
156 w = w >> 16; 156 w = w >> 16;
157 /*dprintk(" scaled %llx\n", w);*/ 157 /*dprintk(" scaled %llx\n", w);*/
158 if (w < bucket->item_weights[i]) 158 if (w < bucket->item_weights[i]) {
159 return bucket->h.items[i]; 159 return bucket->h.items[i];
160 }
160 } 161 }
161 162
162 dprintk("bad list sums for bucket %d\n", bucket->h.id); 163 dprintk("bad list sums for bucket %d\n", bucket->h.id);
@@ -192,7 +193,7 @@ static int terminal(int x)
192 return x & 1; 193 return x & 1;
193} 194}
194 195
195static int bucket_tree_choose(struct crush_bucket_tree *bucket, 196static int bucket_tree_choose(const struct crush_bucket_tree *bucket,
196 int x, int r) 197 int x, int r)
197{ 198{
198 int n; 199 int n;
@@ -224,7 +225,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,
224 225
225/* straw */ 226/* straw */
226 227
227static int bucket_straw_choose(struct crush_bucket_straw *bucket, 228static int bucket_straw_choose(const struct crush_bucket_straw *bucket,
228 int x, int r) 229 int x, int r)
229{ 230{
230 __u32 i; 231 __u32 i;
@@ -301,7 +302,7 @@ static __u64 crush_ln(unsigned int xin)
301 * 302 *
302 */ 303 */
303 304
304static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket, 305static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
305 int x, int r) 306 int x, int r)
306{ 307{
307 unsigned int i, high = 0; 308 unsigned int i, high = 0;
@@ -344,37 +345,42 @@ static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket,
344 high_draw = draw; 345 high_draw = draw;
345 } 346 }
346 } 347 }
348
347 return bucket->h.items[high]; 349 return bucket->h.items[high];
348} 350}
349 351
350 352
351static int crush_bucket_choose(struct crush_bucket *in, int x, int r) 353static int crush_bucket_choose(const struct crush_bucket *in,
354 struct crush_work_bucket *work,
355 int x, int r)
352{ 356{
353 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); 357 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
354 BUG_ON(in->size == 0); 358 BUG_ON(in->size == 0);
355 switch (in->alg) { 359 switch (in->alg) {
356 case CRUSH_BUCKET_UNIFORM: 360 case CRUSH_BUCKET_UNIFORM:
357 return bucket_uniform_choose((struct crush_bucket_uniform *)in, 361 return bucket_uniform_choose(
358 x, r); 362 (const struct crush_bucket_uniform *)in,
363 work, x, r);
359 case CRUSH_BUCKET_LIST: 364 case CRUSH_BUCKET_LIST:
360 return bucket_list_choose((struct crush_bucket_list *)in, 365 return bucket_list_choose((const struct crush_bucket_list *)in,
361 x, r); 366 x, r);
362 case CRUSH_BUCKET_TREE: 367 case CRUSH_BUCKET_TREE:
363 return bucket_tree_choose((struct crush_bucket_tree *)in, 368 return bucket_tree_choose((const struct crush_bucket_tree *)in,
364 x, r); 369 x, r);
365 case CRUSH_BUCKET_STRAW: 370 case CRUSH_BUCKET_STRAW:
366 return bucket_straw_choose((struct crush_bucket_straw *)in, 371 return bucket_straw_choose(
367 x, r); 372 (const struct crush_bucket_straw *)in,
373 x, r);
368 case CRUSH_BUCKET_STRAW2: 374 case CRUSH_BUCKET_STRAW2:
369 return bucket_straw2_choose((struct crush_bucket_straw2 *)in, 375 return bucket_straw2_choose(
370 x, r); 376 (const struct crush_bucket_straw2 *)in,
377 x, r);
371 default: 378 default:
372 dprintk("unknown bucket %d alg %d\n", in->id, in->alg); 379 dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
373 return in->items[0]; 380 return in->items[0];
374 } 381 }
375} 382}
376 383
377
378/* 384/*
379 * true if device is marked "out" (failed, fully offloaded) 385 * true if device is marked "out" (failed, fully offloaded)
380 * of the cluster 386 * of the cluster
@@ -416,7 +422,8 @@ static int is_out(const struct crush_map *map,
416 * @parent_r: r value passed from the parent 422 * @parent_r: r value passed from the parent
417 */ 423 */
418static int crush_choose_firstn(const struct crush_map *map, 424static int crush_choose_firstn(const struct crush_map *map,
419 struct crush_bucket *bucket, 425 struct crush_work *work,
426 const struct crush_bucket *bucket,
420 const __u32 *weight, int weight_max, 427 const __u32 *weight, int weight_max,
421 int x, int numrep, int type, 428 int x, int numrep, int type,
422 int *out, int outpos, 429 int *out, int outpos,
@@ -434,7 +441,7 @@ static int crush_choose_firstn(const struct crush_map *map,
434 int rep; 441 int rep;
435 unsigned int ftotal, flocal; 442 unsigned int ftotal, flocal;
436 int retry_descent, retry_bucket, skip_rep; 443 int retry_descent, retry_bucket, skip_rep;
437 struct crush_bucket *in = bucket; 444 const struct crush_bucket *in = bucket;
438 int r; 445 int r;
439 int i; 446 int i;
440 int item = 0; 447 int item = 0;
@@ -473,9 +480,13 @@ static int crush_choose_firstn(const struct crush_map *map,
473 if (local_fallback_retries > 0 && 480 if (local_fallback_retries > 0 &&
474 flocal >= (in->size>>1) && 481 flocal >= (in->size>>1) &&
475 flocal > local_fallback_retries) 482 flocal > local_fallback_retries)
476 item = bucket_perm_choose(in, x, r); 483 item = bucket_perm_choose(
484 in, work->work[-1-in->id],
485 x, r);
477 else 486 else
478 item = crush_bucket_choose(in, x, r); 487 item = crush_bucket_choose(
488 in, work->work[-1-in->id],
489 x, r);
479 if (item >= map->max_devices) { 490 if (item >= map->max_devices) {
480 dprintk(" bad item %d\n", item); 491 dprintk(" bad item %d\n", item);
481 skip_rep = 1; 492 skip_rep = 1;
@@ -518,19 +529,21 @@ static int crush_choose_firstn(const struct crush_map *map,
518 sub_r = r >> (vary_r-1); 529 sub_r = r >> (vary_r-1);
519 else 530 else
520 sub_r = 0; 531 sub_r = 0;
521 if (crush_choose_firstn(map, 532 if (crush_choose_firstn(
522 map->buckets[-1-item], 533 map,
523 weight, weight_max, 534 work,
524 x, stable ? 1 : outpos+1, 0, 535 map->buckets[-1-item],
525 out2, outpos, count, 536 weight, weight_max,
526 recurse_tries, 0, 537 x, stable ? 1 : outpos+1, 0,
527 local_retries, 538 out2, outpos, count,
528 local_fallback_retries, 539 recurse_tries, 0,
529 0, 540 local_retries,
530 vary_r, 541 local_fallback_retries,
531 stable, 542 0,
532 NULL, 543 vary_r,
533 sub_r) <= outpos) 544 stable,
545 NULL,
546 sub_r) <= outpos)
534 /* didn't get leaf */ 547 /* didn't get leaf */
535 reject = 1; 548 reject = 1;
536 } else { 549 } else {
@@ -600,7 +613,8 @@ reject:
600 * 613 *
601 */ 614 */
602static void crush_choose_indep(const struct crush_map *map, 615static void crush_choose_indep(const struct crush_map *map,
603 struct crush_bucket *bucket, 616 struct crush_work *work,
617 const struct crush_bucket *bucket,
604 const __u32 *weight, int weight_max, 618 const __u32 *weight, int weight_max,
605 int x, int left, int numrep, int type, 619 int x, int left, int numrep, int type,
606 int *out, int outpos, 620 int *out, int outpos,
@@ -610,7 +624,7 @@ static void crush_choose_indep(const struct crush_map *map,
610 int *out2, 624 int *out2,
611 int parent_r) 625 int parent_r)
612{ 626{
613 struct crush_bucket *in = bucket; 627 const struct crush_bucket *in = bucket;
614 int endpos = outpos + left; 628 int endpos = outpos + left;
615 int rep; 629 int rep;
616 unsigned int ftotal; 630 unsigned int ftotal;
@@ -678,7 +692,9 @@ static void crush_choose_indep(const struct crush_map *map,
678 break; 692 break;
679 } 693 }
680 694
681 item = crush_bucket_choose(in, x, r); 695 item = crush_bucket_choose(
696 in, work->work[-1-in->id],
697 x, r);
682 if (item >= map->max_devices) { 698 if (item >= map->max_devices) {
683 dprintk(" bad item %d\n", item); 699 dprintk(" bad item %d\n", item);
684 out[rep] = CRUSH_ITEM_NONE; 700 out[rep] = CRUSH_ITEM_NONE;
@@ -724,13 +740,15 @@ static void crush_choose_indep(const struct crush_map *map,
724 740
725 if (recurse_to_leaf) { 741 if (recurse_to_leaf) {
726 if (item < 0) { 742 if (item < 0) {
727 crush_choose_indep(map, 743 crush_choose_indep(
728 map->buckets[-1-item], 744 map,
729 weight, weight_max, 745 work,
730 x, 1, numrep, 0, 746 map->buckets[-1-item],
731 out2, rep, 747 weight, weight_max,
732 recurse_tries, 0, 748 x, 1, numrep, 0,
733 0, NULL, r); 749 out2, rep,
750 recurse_tries, 0,
751 0, NULL, r);
734 if (out2[rep] == CRUSH_ITEM_NONE) { 752 if (out2[rep] == CRUSH_ITEM_NONE) {
735 /* placed nothing; no leaf */ 753 /* placed nothing; no leaf */
736 break; 754 break;
@@ -781,6 +799,53 @@ static void crush_choose_indep(const struct crush_map *map,
781#endif 799#endif
782} 800}
783 801
802
803/*
804 * This takes a chunk of memory and sets it up to be a shiny new
805 * working area for a CRUSH placement computation. It must be called
806 * on any newly allocated memory before passing it in to
807 * crush_do_rule. It may be used repeatedly after that, so long as the
808 * map has not changed. If the map /has/ changed, you must make sure
809 * the working size is no smaller than what was allocated and re-run
810 * crush_init_workspace.
811 *
812 * If you do retain the working space between calls to crush, make it
813 * thread-local.
814 */
815void crush_init_workspace(const struct crush_map *map, void *v)
816{
817 struct crush_work *w = v;
818 __s32 b;
819
820 /*
821 * We work by moving through the available space and setting
822 * values and pointers as we go.
823 *
824 * It's a bit like Forth's use of the 'allot' word since we
825 * set the pointer first and then reserve the space for it to
826 * point to by incrementing the point.
827 */
828 v += sizeof(struct crush_work *);
829 w->work = v;
830 v += map->max_buckets * sizeof(struct crush_work_bucket *);
831 for (b = 0; b < map->max_buckets; ++b) {
832 if (!map->buckets[b])
833 continue;
834
835 w->work[b] = v;
836 switch (map->buckets[b]->alg) {
837 default:
838 v += sizeof(struct crush_work_bucket);
839 break;
840 }
841 w->work[b]->perm_x = 0;
842 w->work[b]->perm_n = 0;
843 w->work[b]->perm = v;
844 v += map->buckets[b]->size * sizeof(__u32);
845 }
846 BUG_ON(v - (void *)w != map->working_size);
847}
848
784/** 849/**
785 * crush_do_rule - calculate a mapping with the given input and rule 850 * crush_do_rule - calculate a mapping with the given input and rule
786 * @map: the crush_map 851 * @map: the crush_map
@@ -790,14 +855,16 @@ static void crush_choose_indep(const struct crush_map *map,
790 * @result_max: maximum result size 855 * @result_max: maximum result size
791 * @weight: weight vector (for map leaves) 856 * @weight: weight vector (for map leaves)
792 * @weight_max: size of weight vector 857 * @weight_max: size of weight vector
858 * @cwin: pointer to at least map->working_size bytes of memory
793 * @scratch: scratch vector for private use; must be >= 3 * result_max 859 * @scratch: scratch vector for private use; must be >= 3 * result_max
794 */ 860 */
795int crush_do_rule(const struct crush_map *map, 861int crush_do_rule(const struct crush_map *map,
796 int ruleno, int x, int *result, int result_max, 862 int ruleno, int x, int *result, int result_max,
797 const __u32 *weight, int weight_max, 863 const __u32 *weight, int weight_max,
798 int *scratch) 864 void *cwin, int *scratch)
799{ 865{
800 int result_len; 866 int result_len;
867 struct crush_work *cw = cwin;
801 int *a = scratch; 868 int *a = scratch;
802 int *b = scratch + result_max; 869 int *b = scratch + result_max;
803 int *c = scratch + result_max*2; 870 int *c = scratch + result_max*2;
@@ -807,7 +874,7 @@ int crush_do_rule(const struct crush_map *map,
807 int *o; 874 int *o;
808 int osize; 875 int osize;
809 int *tmp; 876 int *tmp;
810 struct crush_rule *rule; 877 const struct crush_rule *rule;
811 __u32 step; 878 __u32 step;
812 int i, j; 879 int i, j;
813 int numrep; 880 int numrep;
@@ -840,7 +907,7 @@ int crush_do_rule(const struct crush_map *map,
840 907
841 for (step = 0; step < rule->len; step++) { 908 for (step = 0; step < rule->len; step++) {
842 int firstn = 0; 909 int firstn = 0;
843 struct crush_rule_step *curstep = &rule->steps[step]; 910 const struct crush_rule_step *curstep = &rule->steps[step];
844 911
845 switch (curstep->op) { 912 switch (curstep->op) {
846 case CRUSH_RULE_TAKE: 913 case CRUSH_RULE_TAKE:
@@ -936,6 +1003,7 @@ int crush_do_rule(const struct crush_map *map,
936 recurse_tries = choose_tries; 1003 recurse_tries = choose_tries;
937 osize += crush_choose_firstn( 1004 osize += crush_choose_firstn(
938 map, 1005 map,
1006 cw,
939 map->buckets[bno], 1007 map->buckets[bno],
940 weight, weight_max, 1008 weight, weight_max,
941 x, numrep, 1009 x, numrep,
@@ -956,6 +1024,7 @@ int crush_do_rule(const struct crush_map *map,
956 numrep : (result_max-osize)); 1024 numrep : (result_max-osize));
957 crush_choose_indep( 1025 crush_choose_indep(
958 map, 1026 map,
1027 cw,
959 map->buckets[bno], 1028 map->buckets[bno],
960 weight, weight_max, 1029 weight, weight_max,
961 x, out_size, numrep, 1030 x, out_size, numrep,
@@ -997,5 +1066,6 @@ int crush_do_rule(const struct crush_map *map,
997 break; 1066 break;
998 } 1067 }
999 } 1068 }
1069
1000 return result_len; 1070 return result_len;
1001} 1071}
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 47df075b31e5..3892e7fa7747 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -153,6 +153,32 @@ bad:
153 return -EINVAL; 153 return -EINVAL;
154} 154}
155 155
156static void crush_finalize(struct crush_map *c)
157{
158 __s32 b;
159
160 /* Space for the array of pointers to per-bucket workspace */
161 c->working_size = sizeof(struct crush_work) +
162 c->max_buckets * sizeof(struct crush_work_bucket *);
163
164 for (b = 0; b < c->max_buckets; b++) {
165 if (!c->buckets[b])
166 continue;
167
168 switch (c->buckets[b]->alg) {
169 default:
170 /*
171 * The base case, permutation variables and
172 * the pointer to the permutation array.
173 */
174 c->working_size += sizeof(struct crush_work_bucket);
175 break;
176 }
177 /* Every bucket has a permutation array. */
178 c->working_size += c->buckets[b]->size * sizeof(__u32);
179 }
180}
181
156static struct crush_map *crush_decode(void *pbyval, void *end) 182static struct crush_map *crush_decode(void *pbyval, void *end)
157{ 183{
158 struct crush_map *c; 184 struct crush_map *c;
@@ -246,10 +272,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
246 b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS); 272 b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
247 if (b->items == NULL) 273 if (b->items == NULL)
248 goto badmem; 274 goto badmem;
249 b->perm = kcalloc(b->size, sizeof(u32), GFP_NOFS);
250 if (b->perm == NULL)
251 goto badmem;
252 b->perm_n = 0;
253 275
254 ceph_decode_need(p, end, b->size*sizeof(u32), bad); 276 ceph_decode_need(p, end, b->size*sizeof(u32), bad);
255 for (j = 0; j < b->size; j++) 277 for (j = 0; j < b->size; j++)
@@ -368,6 +390,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
368 dout("crush decode tunable chooseleaf_stable = %d\n", 390 dout("crush decode tunable chooseleaf_stable = %d\n",
369 c->chooseleaf_stable); 391 c->chooseleaf_stable);
370 392
393 crush_finalize(c);
394
371done: 395done:
372 dout("crush_decode success\n"); 396 dout("crush_decode success\n");
373 return c; 397 return c;
@@ -753,6 +777,7 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
753 kfree(map->osd_weight); 777 kfree(map->osd_weight);
754 kfree(map->osd_addr); 778 kfree(map->osd_addr);
755 kfree(map->osd_primary_affinity); 779 kfree(map->osd_primary_affinity);
780 kfree(map->crush_workspace);
756 kfree(map); 781 kfree(map);
757} 782}
758 783
@@ -810,12 +835,23 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
810 835
811static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush) 836static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
812{ 837{
838 void *workspace;
839
813 if (IS_ERR(crush)) 840 if (IS_ERR(crush))
814 return PTR_ERR(crush); 841 return PTR_ERR(crush);
815 842
843 workspace = kmalloc(crush->working_size, GFP_NOIO);
844 if (!workspace) {
845 crush_destroy(crush);
846 return -ENOMEM;
847 }
848 crush_init_workspace(crush, workspace);
849
816 if (map->crush) 850 if (map->crush)
817 crush_destroy(map->crush); 851 crush_destroy(map->crush);
852 kfree(map->crush_workspace);
818 map->crush = crush; 853 map->crush = crush;
854 map->crush_workspace = workspace;
819 return 0; 855 return 0;
820} 856}
821 857
@@ -1940,7 +1976,8 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
1940 1976
1941 mutex_lock(&map->crush_scratch_mutex); 1977 mutex_lock(&map->crush_scratch_mutex);
1942 r = crush_do_rule(map->crush, ruleno, x, result, result_max, 1978 r = crush_do_rule(map->crush, ruleno, x, result, result_max,
1943 weight, weight_max, map->crush_scratch_ary); 1979 weight, weight_max, map->crush_workspace,
1980 map->crush_scratch_ary);
1944 mutex_unlock(&map->crush_scratch_mutex); 1981 mutex_unlock(&map->crush_scratch_mutex);
1945 1982
1946 return r; 1983 return r;