aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/auth_none.c15
-rw-r--r--net/ceph/auth_x.c15
-rw-r--r--net/ceph/crush/crush.c39
-rw-r--r--net/ceph/crush/mapper.c124
-rw-r--r--net/ceph/messenger.c182
-rw-r--r--net/ceph/osd_client.c63
-rw-r--r--net/ceph/osdmap.c73
7 files changed, 235 insertions, 276 deletions
diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c
index 214c2bb43d62..925ca583c09c 100644
--- a/net/ceph/auth_none.c
+++ b/net/ceph/auth_none.c
@@ -59,9 +59,7 @@ static int handle_reply(struct ceph_auth_client *ac, int result,
59 */ 59 */
60static int ceph_auth_none_create_authorizer( 60static int ceph_auth_none_create_authorizer(
61 struct ceph_auth_client *ac, int peer_type, 61 struct ceph_auth_client *ac, int peer_type,
62 struct ceph_authorizer **a, 62 struct ceph_auth_handshake *auth)
63 void **buf, size_t *len,
64 void **reply_buf, size_t *reply_len)
65{ 63{
66 struct ceph_auth_none_info *ai = ac->private; 64 struct ceph_auth_none_info *ai = ac->private;
67 struct ceph_none_authorizer *au = &ai->au; 65 struct ceph_none_authorizer *au = &ai->au;
@@ -82,11 +80,12 @@ static int ceph_auth_none_create_authorizer(
82 dout("built authorizer len %d\n", au->buf_len); 80 dout("built authorizer len %d\n", au->buf_len);
83 } 81 }
84 82
85 *a = (struct ceph_authorizer *)au; 83 auth->authorizer = (struct ceph_authorizer *) au;
86 *buf = au->buf; 84 auth->authorizer_buf = au->buf;
87 *len = au->buf_len; 85 auth->authorizer_buf_len = au->buf_len;
88 *reply_buf = au->reply_buf; 86 auth->authorizer_reply_buf = au->reply_buf;
89 *reply_len = sizeof(au->reply_buf); 87 auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
88
90 return 0; 89 return 0;
91 90
92bad2: 91bad2:
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index 1587dc6010c6..a16bf14eb027 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -526,9 +526,7 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result,
526 526
527static int ceph_x_create_authorizer( 527static int ceph_x_create_authorizer(
528 struct ceph_auth_client *ac, int peer_type, 528 struct ceph_auth_client *ac, int peer_type,
529 struct ceph_authorizer **a, 529 struct ceph_auth_handshake *auth)
530 void **buf, size_t *len,
531 void **reply_buf, size_t *reply_len)
532{ 530{
533 struct ceph_x_authorizer *au; 531 struct ceph_x_authorizer *au;
534 struct ceph_x_ticket_handler *th; 532 struct ceph_x_ticket_handler *th;
@@ -548,11 +546,12 @@ static int ceph_x_create_authorizer(
548 return ret; 546 return ret;
549 } 547 }
550 548
551 *a = (struct ceph_authorizer *)au; 549 auth->authorizer = (struct ceph_authorizer *) au;
552 *buf = au->buf->vec.iov_base; 550 auth->authorizer_buf = au->buf->vec.iov_base;
553 *len = au->buf->vec.iov_len; 551 auth->authorizer_buf_len = au->buf->vec.iov_len;
554 *reply_buf = au->reply_buf; 552 auth->authorizer_reply_buf = au->reply_buf;
555 *reply_len = sizeof(au->reply_buf); 553 auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
554
556 return 0; 555 return 0;
557} 556}
558 557
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index d6ebb13a18a4..089613234f03 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -26,9 +26,9 @@ const char *crush_bucket_alg_name(int alg)
26 * @b: bucket pointer 26 * @b: bucket pointer
27 * @p: item index in bucket 27 * @p: item index in bucket
28 */ 28 */
29int crush_get_bucket_item_weight(struct crush_bucket *b, int p) 29int crush_get_bucket_item_weight(const struct crush_bucket *b, int p)
30{ 30{
31 if (p >= b->size) 31 if ((__u32)p >= b->size)
32 return 0; 32 return 0;
33 33
34 switch (b->alg) { 34 switch (b->alg) {
@@ -37,38 +37,13 @@ int crush_get_bucket_item_weight(struct crush_bucket *b, int p)
37 case CRUSH_BUCKET_LIST: 37 case CRUSH_BUCKET_LIST:
38 return ((struct crush_bucket_list *)b)->item_weights[p]; 38 return ((struct crush_bucket_list *)b)->item_weights[p];
39 case CRUSH_BUCKET_TREE: 39 case CRUSH_BUCKET_TREE:
40 if (p & 1) 40 return ((struct crush_bucket_tree *)b)->node_weights[crush_calc_tree_node(p)];
41 return ((struct crush_bucket_tree *)b)->node_weights[p];
42 return 0;
43 case CRUSH_BUCKET_STRAW: 41 case CRUSH_BUCKET_STRAW:
44 return ((struct crush_bucket_straw *)b)->item_weights[p]; 42 return ((struct crush_bucket_straw *)b)->item_weights[p];
45 } 43 }
46 return 0; 44 return 0;
47} 45}
48 46
49/**
50 * crush_calc_parents - Calculate parent vectors for the given crush map.
51 * @map: crush_map pointer
52 */
53void crush_calc_parents(struct crush_map *map)
54{
55 int i, b, c;
56
57 for (b = 0; b < map->max_buckets; b++) {
58 if (map->buckets[b] == NULL)
59 continue;
60 for (i = 0; i < map->buckets[b]->size; i++) {
61 c = map->buckets[b]->items[i];
62 BUG_ON(c >= map->max_devices ||
63 c < -map->max_buckets);
64 if (c >= 0)
65 map->device_parents[c] = map->buckets[b]->id;
66 else
67 map->bucket_parents[-1-c] = map->buckets[b]->id;
68 }
69 }
70}
71
72void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b) 47void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b)
73{ 48{
74 kfree(b->h.perm); 49 kfree(b->h.perm);
@@ -87,6 +62,8 @@ void crush_destroy_bucket_list(struct crush_bucket_list *b)
87 62
88void crush_destroy_bucket_tree(struct crush_bucket_tree *b) 63void crush_destroy_bucket_tree(struct crush_bucket_tree *b)
89{ 64{
65 kfree(b->h.perm);
66 kfree(b->h.items);
90 kfree(b->node_weights); 67 kfree(b->node_weights);
91 kfree(b); 68 kfree(b);
92} 69}
@@ -124,10 +101,9 @@ void crush_destroy_bucket(struct crush_bucket *b)
124 */ 101 */
125void crush_destroy(struct crush_map *map) 102void crush_destroy(struct crush_map *map)
126{ 103{
127 int b;
128
129 /* buckets */ 104 /* buckets */
130 if (map->buckets) { 105 if (map->buckets) {
106 __s32 b;
131 for (b = 0; b < map->max_buckets; b++) { 107 for (b = 0; b < map->max_buckets; b++) {
132 if (map->buckets[b] == NULL) 108 if (map->buckets[b] == NULL)
133 continue; 109 continue;
@@ -138,13 +114,12 @@ void crush_destroy(struct crush_map *map)
138 114
139 /* rules */ 115 /* rules */
140 if (map->rules) { 116 if (map->rules) {
117 __u32 b;
141 for (b = 0; b < map->max_rules; b++) 118 for (b = 0; b < map->max_rules; b++)
142 kfree(map->rules[b]); 119 kfree(map->rules[b]);
143 kfree(map->rules); 120 kfree(map->rules);
144 } 121 }
145 122
146 kfree(map->bucket_parents);
147 kfree(map->device_parents);
148 kfree(map); 123 kfree(map);
149} 124}
150 125
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 363f8f7e6c3c..d7edc24333b8 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -33,9 +33,9 @@
33 * @type: storage ruleset type (user defined) 33 * @type: storage ruleset type (user defined)
34 * @size: output set size 34 * @size: output set size
35 */ 35 */
36int crush_find_rule(struct crush_map *map, int ruleset, int type, int size) 36int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size)
37{ 37{
38 int i; 38 __u32 i;
39 39
40 for (i = 0; i < map->max_rules; i++) { 40 for (i = 0; i < map->max_rules; i++) {
41 if (map->rules[i] && 41 if (map->rules[i] &&
@@ -73,7 +73,7 @@ static int bucket_perm_choose(struct crush_bucket *bucket,
73 unsigned int i, s; 73 unsigned int i, s;
74 74
75 /* start a new permutation if @x has changed */ 75 /* start a new permutation if @x has changed */
76 if (bucket->perm_x != x || bucket->perm_n == 0) { 76 if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) {
77 dprintk("bucket %d new x=%d\n", bucket->id, x); 77 dprintk("bucket %d new x=%d\n", bucket->id, x);
78 bucket->perm_x = x; 78 bucket->perm_x = x;
79 79
@@ -153,8 +153,8 @@ static int bucket_list_choose(struct crush_bucket_list *bucket,
153 return bucket->h.items[i]; 153 return bucket->h.items[i];
154 } 154 }
155 155
156 BUG_ON(1); 156 dprintk("bad list sums for bucket %d\n", bucket->h.id);
157 return 0; 157 return bucket->h.items[0];
158} 158}
159 159
160 160
@@ -220,7 +220,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,
220static int bucket_straw_choose(struct crush_bucket_straw *bucket, 220static int bucket_straw_choose(struct crush_bucket_straw *bucket,
221 int x, int r) 221 int x, int r)
222{ 222{
223 int i; 223 __u32 i;
224 int high = 0; 224 int high = 0;
225 __u64 high_draw = 0; 225 __u64 high_draw = 0;
226 __u64 draw; 226 __u64 draw;
@@ -240,6 +240,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket,
240static int crush_bucket_choose(struct crush_bucket *in, int x, int r) 240static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
241{ 241{
242 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); 242 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
243 BUG_ON(in->size == 0);
243 switch (in->alg) { 244 switch (in->alg) {
244 case CRUSH_BUCKET_UNIFORM: 245 case CRUSH_BUCKET_UNIFORM:
245 return bucket_uniform_choose((struct crush_bucket_uniform *)in, 246 return bucket_uniform_choose((struct crush_bucket_uniform *)in,
@@ -254,7 +255,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
254 return bucket_straw_choose((struct crush_bucket_straw *)in, 255 return bucket_straw_choose((struct crush_bucket_straw *)in,
255 x, r); 256 x, r);
256 default: 257 default:
257 BUG_ON(1); 258 dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
258 return in->items[0]; 259 return in->items[0];
259 } 260 }
260} 261}
@@ -263,7 +264,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
263 * true if device is marked "out" (failed, fully offloaded) 264 * true if device is marked "out" (failed, fully offloaded)
264 * of the cluster 265 * of the cluster
265 */ 266 */
266static int is_out(struct crush_map *map, __u32 *weight, int item, int x) 267static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x)
267{ 268{
268 if (weight[item] >= 0x10000) 269 if (weight[item] >= 0x10000)
269 return 0; 270 return 0;
@@ -288,16 +289,16 @@ static int is_out(struct crush_map *map, __u32 *weight, int item, int x)
288 * @recurse_to_leaf: true if we want one device under each item of given type 289 * @recurse_to_leaf: true if we want one device under each item of given type
289 * @out2: second output vector for leaf items (if @recurse_to_leaf) 290 * @out2: second output vector for leaf items (if @recurse_to_leaf)
290 */ 291 */
291static int crush_choose(struct crush_map *map, 292static int crush_choose(const struct crush_map *map,
292 struct crush_bucket *bucket, 293 struct crush_bucket *bucket,
293 __u32 *weight, 294 const __u32 *weight,
294 int x, int numrep, int type, 295 int x, int numrep, int type,
295 int *out, int outpos, 296 int *out, int outpos,
296 int firstn, int recurse_to_leaf, 297 int firstn, int recurse_to_leaf,
297 int *out2) 298 int *out2)
298{ 299{
299 int rep; 300 int rep;
300 int ftotal, flocal; 301 unsigned int ftotal, flocal;
301 int retry_descent, retry_bucket, skip_rep; 302 int retry_descent, retry_bucket, skip_rep;
302 struct crush_bucket *in = bucket; 303 struct crush_bucket *in = bucket;
303 int r; 304 int r;
@@ -305,7 +306,7 @@ static int crush_choose(struct crush_map *map,
305 int item = 0; 306 int item = 0;
306 int itemtype; 307 int itemtype;
307 int collide, reject; 308 int collide, reject;
308 const int orig_tries = 5; /* attempts before we fall back to search */ 309 const unsigned int orig_tries = 5; /* attempts before we fall back to search */
309 310
310 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", 311 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
311 bucket->id, x, outpos, numrep); 312 bucket->id, x, outpos, numrep);
@@ -326,7 +327,7 @@ static int crush_choose(struct crush_map *map,
326 r = rep; 327 r = rep;
327 if (in->alg == CRUSH_BUCKET_UNIFORM) { 328 if (in->alg == CRUSH_BUCKET_UNIFORM) {
328 /* be careful */ 329 /* be careful */
329 if (firstn || numrep >= in->size) 330 if (firstn || (__u32)numrep >= in->size)
330 /* r' = r + f_total */ 331 /* r' = r + f_total */
331 r += ftotal; 332 r += ftotal;
332 else if (in->size % numrep == 0) 333 else if (in->size % numrep == 0)
@@ -355,7 +356,11 @@ static int crush_choose(struct crush_map *map,
355 item = bucket_perm_choose(in, x, r); 356 item = bucket_perm_choose(in, x, r);
356 else 357 else
357 item = crush_bucket_choose(in, x, r); 358 item = crush_bucket_choose(in, x, r);
358 BUG_ON(item >= map->max_devices); 359 if (item >= map->max_devices) {
360 dprintk(" bad item %d\n", item);
361 skip_rep = 1;
362 break;
363 }
359 364
360 /* desired type? */ 365 /* desired type? */
361 if (item < 0) 366 if (item < 0)
@@ -366,8 +371,12 @@ static int crush_choose(struct crush_map *map,
366 371
367 /* keep going? */ 372 /* keep going? */
368 if (itemtype != type) { 373 if (itemtype != type) {
369 BUG_ON(item >= 0 || 374 if (item >= 0 ||
370 (-1-item) >= map->max_buckets); 375 (-1-item) >= map->max_buckets) {
376 dprintk(" bad item type %d\n", type);
377 skip_rep = 1;
378 break;
379 }
371 in = map->buckets[-1-item]; 380 in = map->buckets[-1-item];
372 retry_bucket = 1; 381 retry_bucket = 1;
373 continue; 382 continue;
@@ -416,7 +425,7 @@ reject:
416 if (collide && flocal < 3) 425 if (collide && flocal < 3)
417 /* retry locally a few times */ 426 /* retry locally a few times */
418 retry_bucket = 1; 427 retry_bucket = 1;
419 else if (flocal < in->size + orig_tries) 428 else if (flocal <= in->size + orig_tries)
420 /* exhaustive bucket search */ 429 /* exhaustive bucket search */
421 retry_bucket = 1; 430 retry_bucket = 1;
422 else if (ftotal < 20) 431 else if (ftotal < 20)
@@ -426,7 +435,7 @@ reject:
426 /* else give up */ 435 /* else give up */
427 skip_rep = 1; 436 skip_rep = 1;
428 dprintk(" reject %d collide %d " 437 dprintk(" reject %d collide %d "
429 "ftotal %d flocal %d\n", 438 "ftotal %u flocal %u\n",
430 reject, collide, ftotal, 439 reject, collide, ftotal,
431 flocal); 440 flocal);
432 } 441 }
@@ -455,15 +464,12 @@ reject:
455 * @x: hash input 464 * @x: hash input
456 * @result: pointer to result vector 465 * @result: pointer to result vector
457 * @result_max: maximum result size 466 * @result_max: maximum result size
458 * @force: force initial replica choice; -1 for none
459 */ 467 */
460int crush_do_rule(struct crush_map *map, 468int crush_do_rule(const struct crush_map *map,
461 int ruleno, int x, int *result, int result_max, 469 int ruleno, int x, int *result, int result_max,
462 int force, __u32 *weight) 470 const __u32 *weight)
463{ 471{
464 int result_len; 472 int result_len;
465 int force_context[CRUSH_MAX_DEPTH];
466 int force_pos = -1;
467 int a[CRUSH_MAX_SET]; 473 int a[CRUSH_MAX_SET];
468 int b[CRUSH_MAX_SET]; 474 int b[CRUSH_MAX_SET];
469 int c[CRUSH_MAX_SET]; 475 int c[CRUSH_MAX_SET];
@@ -474,66 +480,44 @@ int crush_do_rule(struct crush_map *map,
474 int osize; 480 int osize;
475 int *tmp; 481 int *tmp;
476 struct crush_rule *rule; 482 struct crush_rule *rule;
477 int step; 483 __u32 step;
478 int i, j; 484 int i, j;
479 int numrep; 485 int numrep;
480 int firstn; 486 int firstn;
481 487
482 BUG_ON(ruleno >= map->max_rules); 488 if ((__u32)ruleno >= map->max_rules) {
489 dprintk(" bad ruleno %d\n", ruleno);
490 return 0;
491 }
483 492
484 rule = map->rules[ruleno]; 493 rule = map->rules[ruleno];
485 result_len = 0; 494 result_len = 0;
486 w = a; 495 w = a;
487 o = b; 496 o = b;
488 497
489 /*
490 * determine hierarchical context of force, if any. note
491 * that this may or may not correspond to the specific types
492 * referenced by the crush rule.
493 */
494 if (force >= 0 &&
495 force < map->max_devices &&
496 map->device_parents[force] != 0 &&
497 !is_out(map, weight, force, x)) {
498 while (1) {
499 force_context[++force_pos] = force;
500 if (force >= 0)
501 force = map->device_parents[force];
502 else
503 force = map->bucket_parents[-1-force];
504 if (force == 0)
505 break;
506 }
507 }
508
509 for (step = 0; step < rule->len; step++) { 498 for (step = 0; step < rule->len; step++) {
499 struct crush_rule_step *curstep = &rule->steps[step];
500
510 firstn = 0; 501 firstn = 0;
511 switch (rule->steps[step].op) { 502 switch (curstep->op) {
512 case CRUSH_RULE_TAKE: 503 case CRUSH_RULE_TAKE:
513 w[0] = rule->steps[step].arg1; 504 w[0] = curstep->arg1;
514
515 /* find position in force_context/hierarchy */
516 while (force_pos >= 0 &&
517 force_context[force_pos] != w[0])
518 force_pos--;
519 /* and move past it */
520 if (force_pos >= 0)
521 force_pos--;
522
523 wsize = 1; 505 wsize = 1;
524 break; 506 break;
525 507
526 case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: 508 case CRUSH_RULE_CHOOSE_LEAF_FIRSTN:
527 case CRUSH_RULE_CHOOSE_FIRSTN: 509 case CRUSH_RULE_CHOOSE_FIRSTN:
528 firstn = 1; 510 firstn = 1;
511 /* fall through */
529 case CRUSH_RULE_CHOOSE_LEAF_INDEP: 512 case CRUSH_RULE_CHOOSE_LEAF_INDEP:
530 case CRUSH_RULE_CHOOSE_INDEP: 513 case CRUSH_RULE_CHOOSE_INDEP:
531 BUG_ON(wsize == 0); 514 if (wsize == 0)
515 break;
532 516
533 recurse_to_leaf = 517 recurse_to_leaf =
534 rule->steps[step].op == 518 curstep->op ==
535 CRUSH_RULE_CHOOSE_LEAF_FIRSTN || 519 CRUSH_RULE_CHOOSE_LEAF_FIRSTN ||
536 rule->steps[step].op == 520 curstep->op ==
537 CRUSH_RULE_CHOOSE_LEAF_INDEP; 521 CRUSH_RULE_CHOOSE_LEAF_INDEP;
538 522
539 /* reset output */ 523 /* reset output */
@@ -545,32 +529,18 @@ int crush_do_rule(struct crush_map *map,
545 * basically, numrep <= 0 means relative to 529 * basically, numrep <= 0 means relative to
546 * the provided result_max 530 * the provided result_max
547 */ 531 */
548 numrep = rule->steps[step].arg1; 532 numrep = curstep->arg1;
549 if (numrep <= 0) { 533 if (numrep <= 0) {
550 numrep += result_max; 534 numrep += result_max;
551 if (numrep <= 0) 535 if (numrep <= 0)
552 continue; 536 continue;
553 } 537 }
554 j = 0; 538 j = 0;
555 if (osize == 0 && force_pos >= 0) {
556 /* skip any intermediate types */
557 while (force_pos &&
558 force_context[force_pos] < 0 &&
559 rule->steps[step].arg2 !=
560 map->buckets[-1 -
561 force_context[force_pos]]->type)
562 force_pos--;
563 o[osize] = force_context[force_pos];
564 if (recurse_to_leaf)
565 c[osize] = force_context[0];
566 j++;
567 force_pos--;
568 }
569 osize += crush_choose(map, 539 osize += crush_choose(map,
570 map->buckets[-1-w[i]], 540 map->buckets[-1-w[i]],
571 weight, 541 weight,
572 x, numrep, 542 x, numrep,
573 rule->steps[step].arg2, 543 curstep->arg2,
574 o+osize, j, 544 o+osize, j,
575 firstn, 545 firstn,
576 recurse_to_leaf, c+osize); 546 recurse_to_leaf, c+osize);
@@ -597,7 +567,9 @@ int crush_do_rule(struct crush_map *map,
597 break; 567 break;
598 568
599 default: 569 default:
600 BUG_ON(1); 570 dprintk(" unknown op %d at step %d\n",
571 curstep->op, step);
572 break;
601 } 573 }
602 } 574 }
603 return result_len; 575 return result_len;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 36fa6bf68498..524f4e4f598b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -653,54 +653,57 @@ static void prepare_write_keepalive(struct ceph_connection *con)
653 * Connection negotiation. 653 * Connection negotiation.
654 */ 654 */
655 655
656static int prepare_connect_authorizer(struct ceph_connection *con) 656static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con,
657 int *auth_proto)
657{ 658{
658 void *auth_buf; 659 struct ceph_auth_handshake *auth;
659 int auth_len = 0; 660
660 int auth_protocol = 0; 661 if (!con->ops->get_authorizer) {
662 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
663 con->out_connect.authorizer_len = 0;
664
665 return NULL;
666 }
667
668 /* Can't hold the mutex while getting authorizer */
661 669
662 mutex_unlock(&con->mutex); 670 mutex_unlock(&con->mutex);
663 if (con->ops->get_authorizer) 671
664 con->ops->get_authorizer(con, &auth_buf, &auth_len, 672 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
665 &auth_protocol, &con->auth_reply_buf, 673
666 &con->auth_reply_buf_len,
667 con->auth_retry);
668 mutex_lock(&con->mutex); 674 mutex_lock(&con->mutex);
669 675
670 if (test_bit(CLOSED, &con->state) || 676 if (IS_ERR(auth))
671 test_bit(OPENING, &con->state)) 677 return auth;
672 return -EAGAIN; 678 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
679 return ERR_PTR(-EAGAIN);
673 680
674 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 681 con->auth_reply_buf = auth->authorizer_reply_buf;
675 con->out_connect.authorizer_len = cpu_to_le32(auth_len); 682 con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
676 683
677 if (auth_len)
678 ceph_con_out_kvec_add(con, auth_len, auth_buf);
679 684
680 return 0; 685 return auth;
681} 686}
682 687
683/* 688/*
684 * We connected to a peer and are saying hello. 689 * We connected to a peer and are saying hello.
685 */ 690 */
686static void prepare_write_banner(struct ceph_messenger *msgr, 691static void prepare_write_banner(struct ceph_connection *con)
687 struct ceph_connection *con)
688{ 692{
689 ceph_con_out_kvec_reset(con);
690 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 693 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
691 ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr), 694 ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
692 &msgr->my_enc_addr); 695 &con->msgr->my_enc_addr);
693 696
694 con->out_more = 0; 697 con->out_more = 0;
695 set_bit(WRITE_PENDING, &con->state); 698 set_bit(WRITE_PENDING, &con->state);
696} 699}
697 700
698static int prepare_write_connect(struct ceph_messenger *msgr, 701static int prepare_write_connect(struct ceph_connection *con)
699 struct ceph_connection *con,
700 int include_banner)
701{ 702{
702 unsigned int global_seq = get_global_seq(con->msgr, 0); 703 unsigned int global_seq = get_global_seq(con->msgr, 0);
703 int proto; 704 int proto;
705 int auth_proto;
706 struct ceph_auth_handshake *auth;
704 707
705 switch (con->peer_name.type) { 708 switch (con->peer_name.type) {
706 case CEPH_ENTITY_TYPE_MON: 709 case CEPH_ENTITY_TYPE_MON:
@@ -719,23 +722,32 @@ static int prepare_write_connect(struct ceph_messenger *msgr,
719 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 722 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
720 con->connect_seq, global_seq, proto); 723 con->connect_seq, global_seq, proto);
721 724
722 con->out_connect.features = cpu_to_le64(msgr->supported_features); 725 con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
723 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 726 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
724 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 727 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
725 con->out_connect.global_seq = cpu_to_le32(global_seq); 728 con->out_connect.global_seq = cpu_to_le32(global_seq);
726 con->out_connect.protocol_version = cpu_to_le32(proto); 729 con->out_connect.protocol_version = cpu_to_le32(proto);
727 con->out_connect.flags = 0; 730 con->out_connect.flags = 0;
728 731
729 if (include_banner) 732 auth_proto = CEPH_AUTH_UNKNOWN;
730 prepare_write_banner(msgr, con); 733 auth = get_connect_authorizer(con, &auth_proto);
731 else 734 if (IS_ERR(auth))
732 ceph_con_out_kvec_reset(con); 735 return PTR_ERR(auth);
733 ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); 736
737 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
738 con->out_connect.authorizer_len = auth ?
739 cpu_to_le32(auth->authorizer_buf_len) : 0;
740
741 ceph_con_out_kvec_add(con, sizeof (con->out_connect),
742 &con->out_connect);
743 if (auth && auth->authorizer_buf_len)
744 ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
745 auth->authorizer_buf);
734 746
735 con->out_more = 0; 747 con->out_more = 0;
736 set_bit(WRITE_PENDING, &con->state); 748 set_bit(WRITE_PENDING, &con->state);
737 749
738 return prepare_connect_authorizer(con); 750 return 0;
739} 751}
740 752
741/* 753/*
@@ -992,11 +1004,10 @@ static int prepare_read_message(struct ceph_connection *con)
992 1004
993 1005
994static int read_partial(struct ceph_connection *con, 1006static int read_partial(struct ceph_connection *con,
995 int *to, int size, void *object) 1007 int end, int size, void *object)
996{ 1008{
997 *to += size; 1009 while (con->in_base_pos < end) {
998 while (con->in_base_pos < *to) { 1010 int left = end - con->in_base_pos;
999 int left = *to - con->in_base_pos;
1000 int have = size - left; 1011 int have = size - left;
1001 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 1012 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
1002 if (ret <= 0) 1013 if (ret <= 0)
@@ -1012,37 +1023,52 @@ static int read_partial(struct ceph_connection *con,
1012 */ 1023 */
1013static int read_partial_banner(struct ceph_connection *con) 1024static int read_partial_banner(struct ceph_connection *con)
1014{ 1025{
1015 int ret, to = 0; 1026 int size;
1027 int end;
1028 int ret;
1016 1029
1017 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 1030 dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
1018 1031
1019 /* peer's banner */ 1032 /* peer's banner */
1020 ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); 1033 size = strlen(CEPH_BANNER);
1034 end = size;
1035 ret = read_partial(con, end, size, con->in_banner);
1021 if (ret <= 0) 1036 if (ret <= 0)
1022 goto out; 1037 goto out;
1023 ret = read_partial(con, &to, sizeof(con->actual_peer_addr), 1038
1024 &con->actual_peer_addr); 1039 size = sizeof (con->actual_peer_addr);
1040 end += size;
1041 ret = read_partial(con, end, size, &con->actual_peer_addr);
1025 if (ret <= 0) 1042 if (ret <= 0)
1026 goto out; 1043 goto out;
1027 ret = read_partial(con, &to, sizeof(con->peer_addr_for_me), 1044
1028 &con->peer_addr_for_me); 1045 size = sizeof (con->peer_addr_for_me);
1046 end += size;
1047 ret = read_partial(con, end, size, &con->peer_addr_for_me);
1029 if (ret <= 0) 1048 if (ret <= 0)
1030 goto out; 1049 goto out;
1050
1031out: 1051out:
1032 return ret; 1052 return ret;
1033} 1053}
1034 1054
1035static int read_partial_connect(struct ceph_connection *con) 1055static int read_partial_connect(struct ceph_connection *con)
1036{ 1056{
1037 int ret, to = 0; 1057 int size;
1058 int end;
1059 int ret;
1038 1060
1039 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 1061 dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
1040 1062
1041 ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); 1063 size = sizeof (con->in_reply);
1064 end = size;
1065 ret = read_partial(con, end, size, &con->in_reply);
1042 if (ret <= 0) 1066 if (ret <= 0)
1043 goto out; 1067 goto out;
1044 ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len), 1068
1045 con->auth_reply_buf); 1069 size = le32_to_cpu(con->in_reply.authorizer_len);
1070 end += size;
1071 ret = read_partial(con, end, size, con->auth_reply_buf);
1046 if (ret <= 0) 1072 if (ret <= 0)
1047 goto out; 1073 goto out;
1048 1074
@@ -1377,7 +1403,8 @@ static int process_connect(struct ceph_connection *con)
1377 return -1; 1403 return -1;
1378 } 1404 }
1379 con->auth_retry = 1; 1405 con->auth_retry = 1;
1380 ret = prepare_write_connect(con->msgr, con, 0); 1406 ceph_con_out_kvec_reset(con);
1407 ret = prepare_write_connect(con);
1381 if (ret < 0) 1408 if (ret < 0)
1382 return ret; 1409 return ret;
1383 prepare_read_connect(con); 1410 prepare_read_connect(con);
@@ -1397,7 +1424,10 @@ static int process_connect(struct ceph_connection *con)
1397 ENTITY_NAME(con->peer_name), 1424 ENTITY_NAME(con->peer_name),
1398 ceph_pr_addr(&con->peer_addr.in_addr)); 1425 ceph_pr_addr(&con->peer_addr.in_addr));
1399 reset_connection(con); 1426 reset_connection(con);
1400 prepare_write_connect(con->msgr, con, 0); 1427 ceph_con_out_kvec_reset(con);
1428 ret = prepare_write_connect(con);
1429 if (ret < 0)
1430 return ret;
1401 prepare_read_connect(con); 1431 prepare_read_connect(con);
1402 1432
1403 /* Tell ceph about it. */ 1433 /* Tell ceph about it. */
@@ -1420,7 +1450,10 @@ static int process_connect(struct ceph_connection *con)
1420 le32_to_cpu(con->out_connect.connect_seq), 1450 le32_to_cpu(con->out_connect.connect_seq),
1421 le32_to_cpu(con->in_connect.connect_seq)); 1451 le32_to_cpu(con->in_connect.connect_seq));
1422 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); 1452 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
1423 prepare_write_connect(con->msgr, con, 0); 1453 ceph_con_out_kvec_reset(con);
1454 ret = prepare_write_connect(con);
1455 if (ret < 0)
1456 return ret;
1424 prepare_read_connect(con); 1457 prepare_read_connect(con);
1425 break; 1458 break;
1426 1459
@@ -1434,7 +1467,10 @@ static int process_connect(struct ceph_connection *con)
1434 le32_to_cpu(con->in_connect.global_seq)); 1467 le32_to_cpu(con->in_connect.global_seq));
1435 get_global_seq(con->msgr, 1468 get_global_seq(con->msgr,
1436 le32_to_cpu(con->in_connect.global_seq)); 1469 le32_to_cpu(con->in_connect.global_seq));
1437 prepare_write_connect(con->msgr, con, 0); 1470 ceph_con_out_kvec_reset(con);
1471 ret = prepare_write_connect(con);
1472 if (ret < 0)
1473 return ret;
1438 prepare_read_connect(con); 1474 prepare_read_connect(con);
1439 break; 1475 break;
1440 1476
@@ -1491,10 +1527,10 @@ static int process_connect(struct ceph_connection *con)
1491 */ 1527 */
1492static int read_partial_ack(struct ceph_connection *con) 1528static int read_partial_ack(struct ceph_connection *con)
1493{ 1529{
1494 int to = 0; 1530 int size = sizeof (con->in_temp_ack);
1531 int end = size;
1495 1532
1496 return read_partial(con, &to, sizeof(con->in_temp_ack), 1533 return read_partial(con, end, size, &con->in_temp_ack);
1497 &con->in_temp_ack);
1498} 1534}
1499 1535
1500 1536
@@ -1627,8 +1663,9 @@ static int read_partial_message_bio(struct ceph_connection *con,
1627static int read_partial_message(struct ceph_connection *con) 1663static int read_partial_message(struct ceph_connection *con)
1628{ 1664{
1629 struct ceph_msg *m = con->in_msg; 1665 struct ceph_msg *m = con->in_msg;
1666 int size;
1667 int end;
1630 int ret; 1668 int ret;
1631 int to, left;
1632 unsigned int front_len, middle_len, data_len; 1669 unsigned int front_len, middle_len, data_len;
1633 bool do_datacrc = !con->msgr->nocrc; 1670 bool do_datacrc = !con->msgr->nocrc;
1634 int skip; 1671 int skip;
@@ -1638,15 +1675,11 @@ static int read_partial_message(struct ceph_connection *con)
1638 dout("read_partial_message con %p msg %p\n", con, m); 1675 dout("read_partial_message con %p msg %p\n", con, m);
1639 1676
1640 /* header */ 1677 /* header */
1641 while (con->in_base_pos < sizeof(con->in_hdr)) { 1678 size = sizeof (con->in_hdr);
1642 left = sizeof(con->in_hdr) - con->in_base_pos; 1679 end = size;
1643 ret = ceph_tcp_recvmsg(con->sock, 1680 ret = read_partial(con, end, size, &con->in_hdr);
1644 (char *)&con->in_hdr + con->in_base_pos, 1681 if (ret <= 0)
1645 left); 1682 return ret;
1646 if (ret <= 0)
1647 return ret;
1648 con->in_base_pos += ret;
1649 }
1650 1683
1651 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); 1684 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
1652 if (cpu_to_le32(crc) != con->in_hdr.crc) { 1685 if (cpu_to_le32(crc) != con->in_hdr.crc) {
@@ -1759,16 +1792,12 @@ static int read_partial_message(struct ceph_connection *con)
1759 } 1792 }
1760 1793
1761 /* footer */ 1794 /* footer */
1762 to = sizeof(m->hdr) + sizeof(m->footer); 1795 size = sizeof (m->footer);
1763 while (con->in_base_pos < to) { 1796 end += size;
1764 left = to - con->in_base_pos; 1797 ret = read_partial(con, end, size, &m->footer);
1765 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + 1798 if (ret <= 0)
1766 (con->in_base_pos - sizeof(m->hdr)), 1799 return ret;
1767 left); 1800
1768 if (ret <= 0)
1769 return ret;
1770 con->in_base_pos += ret;
1771 }
1772 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1801 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1773 m, front_len, m->footer.front_crc, middle_len, 1802 m, front_len, m->footer.front_crc, middle_len,
1774 m->footer.middle_crc, data_len, m->footer.data_crc); 1803 m->footer.middle_crc, data_len, m->footer.data_crc);
@@ -1835,7 +1864,6 @@ static void process_message(struct ceph_connection *con)
1835 */ 1864 */
1836static int try_write(struct ceph_connection *con) 1865static int try_write(struct ceph_connection *con)
1837{ 1866{
1838 struct ceph_messenger *msgr = con->msgr;
1839 int ret = 1; 1867 int ret = 1;
1840 1868
1841 dout("try_write start %p state %lu nref %d\n", con, con->state, 1869 dout("try_write start %p state %lu nref %d\n", con, con->state,
@@ -1846,7 +1874,11 @@ more:
1846 1874
1847 /* open the socket first? */ 1875 /* open the socket first? */
1848 if (con->sock == NULL) { 1876 if (con->sock == NULL) {
1849 prepare_write_connect(msgr, con, 1); 1877 ceph_con_out_kvec_reset(con);
1878 prepare_write_banner(con);
1879 ret = prepare_write_connect(con);
1880 if (ret < 0)
1881 goto out;
1850 prepare_read_banner(con); 1882 prepare_read_banner(con);
1851 set_bit(CONNECTING, &con->state); 1883 set_bit(CONNECTING, &con->state);
1852 clear_bit(NEGOTIATING, &con->state); 1884 clear_bit(NEGOTIATING, &con->state);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 1b0ef3c4d393..1ffebed5ce0f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -278,7 +278,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
278{ 278{
279 dst->op = cpu_to_le16(src->op); 279 dst->op = cpu_to_le16(src->op);
280 280
281 switch (dst->op) { 281 switch (src->op) {
282 case CEPH_OSD_OP_READ: 282 case CEPH_OSD_OP_READ:
283 case CEPH_OSD_OP_WRITE: 283 case CEPH_OSD_OP_WRITE:
284 dst->extent.offset = 284 dst->extent.offset =
@@ -664,11 +664,11 @@ static void put_osd(struct ceph_osd *osd)
664{ 664{
665 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 665 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
666 atomic_read(&osd->o_ref) - 1); 666 atomic_read(&osd->o_ref) - 1);
667 if (atomic_dec_and_test(&osd->o_ref)) { 667 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
668 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 668 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
669 669
670 if (osd->o_authorizer) 670 if (ac->ops && ac->ops->destroy_authorizer)
671 ac->ops->destroy_authorizer(ac, osd->o_authorizer); 671 ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
672 kfree(osd); 672 kfree(osd);
673 } 673 }
674} 674}
@@ -841,6 +841,12 @@ static void register_request(struct ceph_osd_client *osdc,
841static void __unregister_request(struct ceph_osd_client *osdc, 841static void __unregister_request(struct ceph_osd_client *osdc,
842 struct ceph_osd_request *req) 842 struct ceph_osd_request *req)
843{ 843{
844 if (RB_EMPTY_NODE(&req->r_node)) {
845 dout("__unregister_request %p tid %lld not registered\n",
846 req, req->r_tid);
847 return;
848 }
849
844 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 850 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
845 rb_erase(&req->r_node, &osdc->requests); 851 rb_erase(&req->r_node, &osdc->requests);
846 osdc->num_requests--; 852 osdc->num_requests--;
@@ -2108,37 +2114,32 @@ static void put_osd_con(struct ceph_connection *con)
2108/* 2114/*
2109 * authentication 2115 * authentication
2110 */ 2116 */
2111static int get_authorizer(struct ceph_connection *con, 2117/*
2112 void **buf, int *len, int *proto, 2118 * Note: returned pointer is the address of a structure that's
2113 void **reply_buf, int *reply_len, int force_new) 2119 * managed separately. Caller must *not* attempt to free it.
2120 */
2121static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2122 int *proto, int force_new)
2114{ 2123{
2115 struct ceph_osd *o = con->private; 2124 struct ceph_osd *o = con->private;
2116 struct ceph_osd_client *osdc = o->o_osdc; 2125 struct ceph_osd_client *osdc = o->o_osdc;
2117 struct ceph_auth_client *ac = osdc->client->monc.auth; 2126 struct ceph_auth_client *ac = osdc->client->monc.auth;
2118 int ret = 0; 2127 struct ceph_auth_handshake *auth = &o->o_auth;
2119 2128
2120 if (force_new && o->o_authorizer) { 2129 if (force_new && auth->authorizer) {
2121 ac->ops->destroy_authorizer(ac, o->o_authorizer); 2130 if (ac->ops && ac->ops->destroy_authorizer)
2122 o->o_authorizer = NULL; 2131 ac->ops->destroy_authorizer(ac, auth->authorizer);
2123 } 2132 auth->authorizer = NULL;
2124 if (o->o_authorizer == NULL) { 2133 }
2125 ret = ac->ops->create_authorizer( 2134 if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
2126 ac, CEPH_ENTITY_TYPE_OSD, 2135 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2127 &o->o_authorizer, 2136 auth);
2128 &o->o_authorizer_buf,
2129 &o->o_authorizer_buf_len,
2130 &o->o_authorizer_reply_buf,
2131 &o->o_authorizer_reply_buf_len);
2132 if (ret) 2137 if (ret)
2133 return ret; 2138 return ERR_PTR(ret);
2134 } 2139 }
2135
2136 *proto = ac->protocol; 2140 *proto = ac->protocol;
2137 *buf = o->o_authorizer_buf; 2141
2138 *len = o->o_authorizer_buf_len; 2142 return auth;
2139 *reply_buf = o->o_authorizer_reply_buf;
2140 *reply_len = o->o_authorizer_reply_buf_len;
2141 return 0;
2142} 2143}
2143 2144
2144 2145
@@ -2148,7 +2149,11 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
2148 struct ceph_osd_client *osdc = o->o_osdc; 2149 struct ceph_osd_client *osdc = o->o_osdc;
2149 struct ceph_auth_client *ac = osdc->client->monc.auth; 2150 struct ceph_auth_client *ac = osdc->client->monc.auth;
2150 2151
2151 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len); 2152 /*
2153 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2154 * XXX which do we do: succeed or fail?
2155 */
2156 return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2152} 2157}
2153 2158
2154static int invalidate_authorizer(struct ceph_connection *con) 2159static int invalidate_authorizer(struct ceph_connection *con)
@@ -2157,7 +2162,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
2157 struct ceph_osd_client *osdc = o->o_osdc; 2162 struct ceph_osd_client *osdc = o->o_osdc;
2158 struct ceph_auth_client *ac = osdc->client->monc.auth; 2163 struct ceph_auth_client *ac = osdc->client->monc.auth;
2159 2164
2160 if (ac->ops->invalidate_authorizer) 2165 if (ac->ops && ac->ops->invalidate_authorizer)
2161 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2166 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2162 2167
2163 return ceph_monc_validate_auth(&osdc->client->monc); 2168 return ceph_monc_validate_auth(&osdc->client->monc);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 56e561a69004..81e3b84a77ef 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -161,13 +161,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
161 c->max_rules = ceph_decode_32(p); 161 c->max_rules = ceph_decode_32(p);
162 c->max_devices = ceph_decode_32(p); 162 c->max_devices = ceph_decode_32(p);
163 163
164 c->device_parents = kcalloc(c->max_devices, sizeof(u32), GFP_NOFS);
165 if (c->device_parents == NULL)
166 goto badmem;
167 c->bucket_parents = kcalloc(c->max_buckets, sizeof(u32), GFP_NOFS);
168 if (c->bucket_parents == NULL)
169 goto badmem;
170
171 c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS); 164 c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
172 if (c->buckets == NULL) 165 if (c->buckets == NULL)
173 goto badmem; 166 goto badmem;
@@ -890,8 +883,12 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
890 pglen = ceph_decode_32(p); 883 pglen = ceph_decode_32(p);
891 884
892 if (pglen) { 885 if (pglen) {
893 /* insert */
894 ceph_decode_need(p, end, pglen*sizeof(u32), bad); 886 ceph_decode_need(p, end, pglen*sizeof(u32), bad);
887
888 /* removing existing (if any) */
889 (void) __remove_pg_mapping(&map->pg_temp, pgid);
890
891 /* insert */
895 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); 892 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
896 if (!pg) { 893 if (!pg) {
897 err = -ENOMEM; 894 err = -ENOMEM;
@@ -1000,7 +997,6 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol,
1000{ 997{
1001 unsigned int num, num_mask; 998 unsigned int num, num_mask;
1002 struct ceph_pg pgid; 999 struct ceph_pg pgid;
1003 s32 preferred = (s32)le32_to_cpu(fl->fl_pg_preferred);
1004 int poolid = le32_to_cpu(fl->fl_pg_pool); 1000 int poolid = le32_to_cpu(fl->fl_pg_pool);
1005 struct ceph_pg_pool_info *pool; 1001 struct ceph_pg_pool_info *pool;
1006 unsigned int ps; 1002 unsigned int ps;
@@ -1011,23 +1007,13 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol,
1011 if (!pool) 1007 if (!pool)
1012 return -EIO; 1008 return -EIO;
1013 ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); 1009 ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid));
1014 if (preferred >= 0) { 1010 num = le32_to_cpu(pool->v.pg_num);
1015 ps += preferred; 1011 num_mask = pool->pg_num_mask;
1016 num = le32_to_cpu(pool->v.lpg_num);
1017 num_mask = pool->lpg_num_mask;
1018 } else {
1019 num = le32_to_cpu(pool->v.pg_num);
1020 num_mask = pool->pg_num_mask;
1021 }
1022 1012
1023 pgid.ps = cpu_to_le16(ps); 1013 pgid.ps = cpu_to_le16(ps);
1024 pgid.preferred = cpu_to_le16(preferred); 1014 pgid.preferred = cpu_to_le16(-1);
1025 pgid.pool = fl->fl_pg_pool; 1015 pgid.pool = fl->fl_pg_pool;
1026 if (preferred >= 0) 1016 dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
1027 dout("calc_object_layout '%s' pgid %d.%xp%d\n", oid, poolid, ps,
1028 (int)preferred);
1029 else
1030 dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
1031 1017
1032 ol->ol_pgid = pgid; 1018 ol->ol_pgid = pgid;
1033 ol->ol_stripe_unit = fl->fl_object_stripe_unit; 1019 ol->ol_stripe_unit = fl->fl_object_stripe_unit;
@@ -1045,24 +1031,18 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1045 struct ceph_pg_mapping *pg; 1031 struct ceph_pg_mapping *pg;
1046 struct ceph_pg_pool_info *pool; 1032 struct ceph_pg_pool_info *pool;
1047 int ruleno; 1033 int ruleno;
1048 unsigned int poolid, ps, pps, t; 1034 unsigned int poolid, ps, pps, t, r;
1049 int preferred;
1050 1035
1051 poolid = le32_to_cpu(pgid.pool); 1036 poolid = le32_to_cpu(pgid.pool);
1052 ps = le16_to_cpu(pgid.ps); 1037 ps = le16_to_cpu(pgid.ps);
1053 preferred = (s16)le16_to_cpu(pgid.preferred);
1054 1038
1055 pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); 1039 pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
1056 if (!pool) 1040 if (!pool)
1057 return NULL; 1041 return NULL;
1058 1042
1059 /* pg_temp? */ 1043 /* pg_temp? */
1060 if (preferred >= 0) 1044 t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num),
1061 t = ceph_stable_mod(ps, le32_to_cpu(pool->v.lpg_num), 1045 pool->pgp_num_mask);
1062 pool->lpgp_num_mask);
1063 else
1064 t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num),
1065 pool->pgp_num_mask);
1066 pgid.ps = cpu_to_le16(t); 1046 pgid.ps = cpu_to_le16(t);
1067 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); 1047 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
1068 if (pg) { 1048 if (pg) {
@@ -1080,23 +1060,20 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1080 return NULL; 1060 return NULL;
1081 } 1061 }
1082 1062
1083 /* don't forcefeed bad device ids to crush */ 1063 pps = ceph_stable_mod(ps,
1084 if (preferred >= osdmap->max_osd || 1064 le32_to_cpu(pool->v.pgp_num),
1085 preferred >= osdmap->crush->max_devices) 1065 pool->pgp_num_mask);
1086 preferred = -1;
1087
1088 if (preferred >= 0)
1089 pps = ceph_stable_mod(ps,
1090 le32_to_cpu(pool->v.lpgp_num),
1091 pool->lpgp_num_mask);
1092 else
1093 pps = ceph_stable_mod(ps,
1094 le32_to_cpu(pool->v.pgp_num),
1095 pool->pgp_num_mask);
1096 pps += poolid; 1066 pps += poolid;
1097 *num = crush_do_rule(osdmap->crush, ruleno, pps, osds, 1067 r = crush_do_rule(osdmap->crush, ruleno, pps, osds,
1098 min_t(int, pool->v.size, *num), 1068 min_t(int, pool->v.size, *num),
1099 preferred, osdmap->osd_weight); 1069 osdmap->osd_weight);
1070 if (r < 0) {
1071 pr_err("error %d from crush rule: pool %d ruleset %d type %d"
1072 " size %d\n", r, poolid, pool->v.crush_ruleset,
1073 pool->v.type, pool->v.size);
1074 return NULL;
1075 }
1076 *num = r;
1100 return osds; 1077 return osds;
1101} 1078}
1102 1079