about | summary | refs | log | tree | commit | diff | stats
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r-- net/ceph/buffer.c 22
-rw-r--r-- net/ceph/ceph_common.c 24
-rw-r--r-- net/ceph/crush/crush.c 7
-rw-r--r-- net/ceph/crush/mapper.c 336
-rw-r--r-- net/ceph/debugfs.c 3
-rw-r--r-- net/ceph/messenger.c 32
-rw-r--r-- net/ceph/mon_client.c 8
-rw-r--r-- net/ceph/osd_client.c 283
-rw-r--r-- net/ceph/osdmap.c 78
9 files changed, 636 insertions, 157 deletions
diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c
index bf3e6a13c215..621b5f65407f 100644
--- a/net/ceph/buffer.c
+++ b/net/ceph/buffer.c
@@ -6,6 +6,7 @@
6 6
7#include <linux/ceph/buffer.h> 7#include <linux/ceph/buffer.h>
8#include <linux/ceph/decode.h> 8#include <linux/ceph/decode.h>
9#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */
9 10
10struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp) 11struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
11{ 12{
@@ -15,16 +16,10 @@ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
15 if (!b) 16 if (!b)
16 return NULL; 17 return NULL;
17 18
18 b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN); 19 b->vec.iov_base = ceph_kvmalloc(len, gfp);
19 if (b->vec.iov_base) { 20 if (!b->vec.iov_base) {
20 b->is_vmalloc = false; 21 kfree(b);
21 } else { 22 return NULL;
22 b->vec.iov_base = __vmalloc(len, gfp | __GFP_HIGHMEM, PAGE_KERNEL);
23 if (!b->vec.iov_base) {
24 kfree(b);
25 return NULL;
26 }
27 b->is_vmalloc = true;
28 } 23 }
29 24
30 kref_init(&b->kref); 25 kref_init(&b->kref);
@@ -40,12 +35,7 @@ void ceph_buffer_release(struct kref *kref)
40 struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref); 35 struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref);
41 36
42 dout("buffer_release %p\n", b); 37 dout("buffer_release %p\n", b);
43 if (b->vec.iov_base) { 38 ceph_kvfree(b->vec.iov_base);
44 if (b->is_vmalloc)
45 vfree(b->vec.iov_base);
46 else
47 kfree(b->vec.iov_base);
48 }
49 kfree(b); 39 kfree(b);
50} 40}
51EXPORT_SYMBOL(ceph_buffer_release); 41EXPORT_SYMBOL(ceph_buffer_release);
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 34b11ee8124e..67d7721d237e 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -15,6 +15,7 @@
15#include <linux/slab.h> 15#include <linux/slab.h>
16#include <linux/statfs.h> 16#include <linux/statfs.h>
17#include <linux/string.h> 17#include <linux/string.h>
18#include <linux/vmalloc.h>
18#include <linux/nsproxy.h> 19#include <linux/nsproxy.h>
19#include <net/net_namespace.h> 20#include <net/net_namespace.h>
20 21
@@ -170,6 +171,25 @@ int ceph_compare_options(struct ceph_options *new_opt,
170} 171}
171EXPORT_SYMBOL(ceph_compare_options); 172EXPORT_SYMBOL(ceph_compare_options);
172 173
174void *ceph_kvmalloc(size_t size, gfp_t flags)
175{
176 if (size <= (PAGE_SIZE << PAGE_ALLOC_COSTLY_ORDER)) {
177 void *ptr = kmalloc(size, flags | __GFP_NOWARN);
178 if (ptr)
179 return ptr;
180 }
181
182 return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
183}
184
185void ceph_kvfree(const void *ptr)
186{
187 if (is_vmalloc_addr(ptr))
188 vfree(ptr);
189 else
190 kfree(ptr);
191}
192
173 193
174static int parse_fsid(const char *str, struct ceph_fsid *fsid) 194static int parse_fsid(const char *str, struct ceph_fsid *fsid)
175{ 195{
@@ -461,8 +481,8 @@ EXPORT_SYMBOL(ceph_client_id);
461 * create a fresh client instance 481 * create a fresh client instance
462 */ 482 */
463struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, 483struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
464 unsigned int supported_features, 484 u64 supported_features,
465 unsigned int required_features) 485 u64 required_features)
466{ 486{
467 struct ceph_client *client; 487 struct ceph_client *client;
468 struct ceph_entity_addr *myaddr = NULL; 488 struct ceph_entity_addr *myaddr = NULL;
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 089613234f03..16bc199d9a62 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -116,11 +116,14 @@ void crush_destroy(struct crush_map *map)
116 if (map->rules) { 116 if (map->rules) {
117 __u32 b; 117 __u32 b;
118 for (b = 0; b < map->max_rules; b++) 118 for (b = 0; b < map->max_rules; b++)
119 kfree(map->rules[b]); 119 crush_destroy_rule(map->rules[b]);
120 kfree(map->rules); 120 kfree(map->rules);
121 } 121 }
122 122
123 kfree(map); 123 kfree(map);
124} 124}
125 125
126 126void crush_destroy_rule(struct crush_rule *rule)
127{
128 kfree(rule);
129}
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index cbd06a91941c..b703790b4e44 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -189,7 +189,7 @@ static int terminal(int x)
189static int bucket_tree_choose(struct crush_bucket_tree *bucket, 189static int bucket_tree_choose(struct crush_bucket_tree *bucket,
190 int x, int r) 190 int x, int r)
191{ 191{
192 int n, l; 192 int n;
193 __u32 w; 193 __u32 w;
194 __u64 t; 194 __u64 t;
195 195
@@ -197,6 +197,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,
197 n = bucket->num_nodes >> 1; 197 n = bucket->num_nodes >> 1;
198 198
199 while (!terminal(n)) { 199 while (!terminal(n)) {
200 int l;
200 /* pick point in [0, w) */ 201 /* pick point in [0, w) */
201 w = bucket->node_weights[n]; 202 w = bucket->node_weights[n];
202 t = (__u64)crush_hash32_4(bucket->h.hash, x, n, r, 203 t = (__u64)crush_hash32_4(bucket->h.hash, x, n, r,
@@ -264,8 +265,12 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
264 * true if device is marked "out" (failed, fully offloaded) 265 * true if device is marked "out" (failed, fully offloaded)
265 * of the cluster 266 * of the cluster
266 */ 267 */
267static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x) 268static int is_out(const struct crush_map *map,
269 const __u32 *weight, int weight_max,
270 int item, int x)
268{ 271{
272 if (item >= weight_max)
273 return 1;
269 if (weight[item] >= 0x10000) 274 if (weight[item] >= 0x10000)
270 return 0; 275 return 0;
271 if (weight[item] == 0) 276 if (weight[item] == 0)
@@ -277,7 +282,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
277} 282}
278 283
279/** 284/**
280 * crush_choose - choose numrep distinct items of given type 285 * crush_choose_firstn - choose numrep distinct items of given type
281 * @map: the crush_map 286 * @map: the crush_map
282 * @bucket: the bucket we are choosing an item from 287 * @bucket: the bucket we are choosing an item from
283 * @x: crush input value 288 * @x: crush input value
@@ -285,18 +290,24 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
285 * @type: the type of item to choose 290 * @type: the type of item to choose
286 * @out: pointer to output vector 291 * @out: pointer to output vector
287 * @outpos: our position in that vector 292 * @outpos: our position in that vector
288 * @firstn: true if choosing "first n" items, false if choosing "indep" 293 * @tries: number of attempts to make
289 * @recurse_to_leaf: true if we want one device under each item of given type 294 * @recurse_tries: number of attempts to have recursive chooseleaf make
290 * @descend_once: true if we should only try one descent before giving up 295 * @local_tries: localized retries
296 * @local_fallback_tries: localized fallback retries
297 * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose)
291 * @out2: second output vector for leaf items (if @recurse_to_leaf) 298 * @out2: second output vector for leaf items (if @recurse_to_leaf)
292 */ 299 */
293static int crush_choose(const struct crush_map *map, 300static int crush_choose_firstn(const struct crush_map *map,
294 struct crush_bucket *bucket, 301 struct crush_bucket *bucket,
295 const __u32 *weight, 302 const __u32 *weight, int weight_max,
296 int x, int numrep, int type, 303 int x, int numrep, int type,
297 int *out, int outpos, 304 int *out, int outpos,
298 int firstn, int recurse_to_leaf, 305 unsigned int tries,
299 int descend_once, int *out2) 306 unsigned int recurse_tries,
307 unsigned int local_tries,
308 unsigned int local_fallback_tries,
309 int recurse_to_leaf,
310 int *out2)
300{ 311{
301 int rep; 312 int rep;
302 unsigned int ftotal, flocal; 313 unsigned int ftotal, flocal;
@@ -325,35 +336,17 @@ static int crush_choose(const struct crush_map *map,
325 collide = 0; 336 collide = 0;
326 retry_bucket = 0; 337 retry_bucket = 0;
327 r = rep; 338 r = rep;
328 if (in->alg == CRUSH_BUCKET_UNIFORM) { 339 /* r' = r + f_total */
329 /* be careful */ 340 r += ftotal;
330 if (firstn || (__u32)numrep >= in->size)
331 /* r' = r + f_total */
332 r += ftotal;
333 else if (in->size % numrep == 0)
334 /* r'=r+(n+1)*f_local */
335 r += (numrep+1) *
336 (flocal+ftotal);
337 else
338 /* r' = r + n*f_local */
339 r += numrep * (flocal+ftotal);
340 } else {
341 if (firstn)
342 /* r' = r + f_total */
343 r += ftotal;
344 else
345 /* r' = r + n*f_local */
346 r += numrep * (flocal+ftotal);
347 }
348 341
349 /* bucket choose */ 342 /* bucket choose */
350 if (in->size == 0) { 343 if (in->size == 0) {
351 reject = 1; 344 reject = 1;
352 goto reject; 345 goto reject;
353 } 346 }
354 if (map->choose_local_fallback_tries > 0 && 347 if (local_fallback_tries > 0 &&
355 flocal >= (in->size>>1) && 348 flocal >= (in->size>>1) &&
356 flocal > map->choose_local_fallback_tries) 349 flocal > local_fallback_tries)
357 item = bucket_perm_choose(in, x, r); 350 item = bucket_perm_choose(in, x, r);
358 else 351 else
359 item = crush_bucket_choose(in, x, r); 352 item = crush_bucket_choose(in, x, r);
@@ -394,13 +387,15 @@ static int crush_choose(const struct crush_map *map,
394 reject = 0; 387 reject = 0;
395 if (!collide && recurse_to_leaf) { 388 if (!collide && recurse_to_leaf) {
396 if (item < 0) { 389 if (item < 0) {
397 if (crush_choose(map, 390 if (crush_choose_firstn(map,
398 map->buckets[-1-item], 391 map->buckets[-1-item],
399 weight, 392 weight, weight_max,
400 x, outpos+1, 0, 393 x, outpos+1, 0,
401 out2, outpos, 394 out2, outpos,
402 firstn, 0, 395 recurse_tries, 0,
403 map->chooseleaf_descend_once, 396 local_tries,
397 local_fallback_tries,
398 0,
404 NULL) <= outpos) 399 NULL) <= outpos)
405 /* didn't get leaf */ 400 /* didn't get leaf */
406 reject = 1; 401 reject = 1;
@@ -414,6 +409,7 @@ static int crush_choose(const struct crush_map *map,
414 /* out? */ 409 /* out? */
415 if (itemtype == 0) 410 if (itemtype == 0)
416 reject = is_out(map, weight, 411 reject = is_out(map, weight,
412 weight_max,
417 item, x); 413 item, x);
418 else 414 else
419 reject = 0; 415 reject = 0;
@@ -424,17 +420,14 @@ reject:
424 ftotal++; 420 ftotal++;
425 flocal++; 421 flocal++;
426 422
427 if (reject && descend_once) 423 if (collide && flocal <= local_tries)
428 /* let outer call try again */
429 skip_rep = 1;
430 else if (collide && flocal <= map->choose_local_tries)
431 /* retry locally a few times */ 424 /* retry locally a few times */
432 retry_bucket = 1; 425 retry_bucket = 1;
433 else if (map->choose_local_fallback_tries > 0 && 426 else if (local_fallback_tries > 0 &&
434 flocal <= in->size + map->choose_local_fallback_tries) 427 flocal <= in->size + local_fallback_tries)
435 /* exhaustive bucket search */ 428 /* exhaustive bucket search */
436 retry_bucket = 1; 429 retry_bucket = 1;
437 else if (ftotal <= map->choose_total_tries) 430 else if (ftotal <= tries)
438 /* then retry descent */ 431 /* then retry descent */
439 retry_descent = 1; 432 retry_descent = 1;
440 else 433 else
@@ -464,21 +457,179 @@ reject:
464 457
465 458
466/** 459/**
460 * crush_choose_indep: alternative breadth-first positionally stable mapping
461 *
462 */
463static void crush_choose_indep(const struct crush_map *map,
464 struct crush_bucket *bucket,
465 const __u32 *weight, int weight_max,
466 int x, int left, int numrep, int type,
467 int *out, int outpos,
468 unsigned int tries,
469 unsigned int recurse_tries,
470 int recurse_to_leaf,
471 int *out2,
472 int parent_r)
473{
474 struct crush_bucket *in = bucket;
475 int endpos = outpos + left;
476 int rep;
477 unsigned int ftotal;
478 int r;
479 int i;
480 int item = 0;
481 int itemtype;
482 int collide;
483
484 dprintk("CHOOSE%s INDEP bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
485 bucket->id, x, outpos, numrep);
486
487 /* initially my result is undefined */
488 for (rep = outpos; rep < endpos; rep++) {
489 out[rep] = CRUSH_ITEM_UNDEF;
490 if (out2)
491 out2[rep] = CRUSH_ITEM_UNDEF;
492 }
493
494 for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) {
495 for (rep = outpos; rep < endpos; rep++) {
496 if (out[rep] != CRUSH_ITEM_UNDEF)
497 continue;
498
499 in = bucket; /* initial bucket */
500
501 /* choose through intervening buckets */
502 for (;;) {
503 /* note: we base the choice on the position
504 * even in the nested call. that means that
505 * if the first layer chooses the same bucket
506 * in a different position, we will tend to
507 * choose a different item in that bucket.
508 * this will involve more devices in data
509 * movement and tend to distribute the load.
510 */
511 r = rep + parent_r;
512
513 /* be careful */
514 if (in->alg == CRUSH_BUCKET_UNIFORM &&
515 in->size % numrep == 0)
516 /* r'=r+(n+1)*f_total */
517 r += (numrep+1) * ftotal;
518 else
519 /* r' = r + n*f_total */
520 r += numrep * ftotal;
521
522 /* bucket choose */
523 if (in->size == 0) {
524 dprintk(" empty bucket\n");
525 break;
526 }
527
528 item = crush_bucket_choose(in, x, r);
529 if (item >= map->max_devices) {
530 dprintk(" bad item %d\n", item);
531 out[rep] = CRUSH_ITEM_NONE;
532 if (out2)
533 out2[rep] = CRUSH_ITEM_NONE;
534 left--;
535 break;
536 }
537
538 /* desired type? */
539 if (item < 0)
540 itemtype = map->buckets[-1-item]->type;
541 else
542 itemtype = 0;
543 dprintk(" item %d type %d\n", item, itemtype);
544
545 /* keep going? */
546 if (itemtype != type) {
547 if (item >= 0 ||
548 (-1-item) >= map->max_buckets) {
549 dprintk(" bad item type %d\n", type);
550 out[rep] = CRUSH_ITEM_NONE;
551 if (out2)
552 out2[rep] =
553 CRUSH_ITEM_NONE;
554 left--;
555 break;
556 }
557 in = map->buckets[-1-item];
558 continue;
559 }
560
561 /* collision? */
562 collide = 0;
563 for (i = outpos; i < endpos; i++) {
564 if (out[i] == item) {
565 collide = 1;
566 break;
567 }
568 }
569 if (collide)
570 break;
571
572 if (recurse_to_leaf) {
573 if (item < 0) {
574 crush_choose_indep(map,
575 map->buckets[-1-item],
576 weight, weight_max,
577 x, 1, numrep, 0,
578 out2, rep,
579 recurse_tries, 0,
580 0, NULL, r);
581 if (out2[rep] == CRUSH_ITEM_NONE) {
582 /* placed nothing; no leaf */
583 break;
584 }
585 } else {
586 /* we already have a leaf! */
587 out2[rep] = item;
588 }
589 }
590
591 /* out? */
592 if (itemtype == 0 &&
593 is_out(map, weight, weight_max, item, x))
594 break;
595
596 /* yay! */
597 out[rep] = item;
598 left--;
599 break;
600 }
601 }
602 }
603 for (rep = outpos; rep < endpos; rep++) {
604 if (out[rep] == CRUSH_ITEM_UNDEF) {
605 out[rep] = CRUSH_ITEM_NONE;
606 }
607 if (out2 && out2[rep] == CRUSH_ITEM_UNDEF) {
608 out2[rep] = CRUSH_ITEM_NONE;
609 }
610 }
611}
612
613/**
467 * crush_do_rule - calculate a mapping with the given input and rule 614 * crush_do_rule - calculate a mapping with the given input and rule
468 * @map: the crush_map 615 * @map: the crush_map
469 * @ruleno: the rule id 616 * @ruleno: the rule id
470 * @x: hash input 617 * @x: hash input
471 * @result: pointer to result vector 618 * @result: pointer to result vector
472 * @result_max: maximum result size 619 * @result_max: maximum result size
620 * @weight: weight vector (for map leaves)
621 * @weight_max: size of weight vector
622 * @scratch: scratch vector for private use; must be >= 3 * result_max
473 */ 623 */
474int crush_do_rule(const struct crush_map *map, 624int crush_do_rule(const struct crush_map *map,
475 int ruleno, int x, int *result, int result_max, 625 int ruleno, int x, int *result, int result_max,
476 const __u32 *weight) 626 const __u32 *weight, int weight_max,
627 int *scratch)
477{ 628{
478 int result_len; 629 int result_len;
479 int a[CRUSH_MAX_SET]; 630 int *a = scratch;
480 int b[CRUSH_MAX_SET]; 631 int *b = scratch + result_max;
481 int c[CRUSH_MAX_SET]; 632 int *c = scratch + result_max*2;
482 int recurse_to_leaf; 633 int recurse_to_leaf;
483 int *w; 634 int *w;
484 int wsize = 0; 635 int wsize = 0;
@@ -489,8 +640,10 @@ int crush_do_rule(const struct crush_map *map,
489 __u32 step; 640 __u32 step;
490 int i, j; 641 int i, j;
491 int numrep; 642 int numrep;
492 int firstn; 643 int choose_tries = map->choose_total_tries;
493 const int descend_once = 0; 644 int choose_local_tries = map->choose_local_tries;
645 int choose_local_fallback_tries = map->choose_local_fallback_tries;
646 int choose_leaf_tries = 0;
494 647
495 if ((__u32)ruleno >= map->max_rules) { 648 if ((__u32)ruleno >= map->max_rules) {
496 dprintk(" bad ruleno %d\n", ruleno); 649 dprintk(" bad ruleno %d\n", ruleno);
@@ -503,29 +656,49 @@ int crush_do_rule(const struct crush_map *map,
503 o = b; 656 o = b;
504 657
505 for (step = 0; step < rule->len; step++) { 658 for (step = 0; step < rule->len; step++) {
659 int firstn = 0;
506 struct crush_rule_step *curstep = &rule->steps[step]; 660 struct crush_rule_step *curstep = &rule->steps[step];
507 661
508 firstn = 0;
509 switch (curstep->op) { 662 switch (curstep->op) {
510 case CRUSH_RULE_TAKE: 663 case CRUSH_RULE_TAKE:
511 w[0] = curstep->arg1; 664 w[0] = curstep->arg1;
512 wsize = 1; 665 wsize = 1;
513 break; 666 break;
514 667
515 case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: 668 case CRUSH_RULE_SET_CHOOSE_TRIES:
669 if (curstep->arg1 > 0)
670 choose_tries = curstep->arg1;
671 break;
672
673 case CRUSH_RULE_SET_CHOOSELEAF_TRIES:
674 if (curstep->arg1 > 0)
675 choose_leaf_tries = curstep->arg1;
676 break;
677
678 case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES:
679 if (curstep->arg1 > 0)
680 choose_local_tries = curstep->arg1;
681 break;
682
683 case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES:
684 if (curstep->arg1 > 0)
685 choose_local_fallback_tries = curstep->arg1;
686 break;
687
688 case CRUSH_RULE_CHOOSELEAF_FIRSTN:
516 case CRUSH_RULE_CHOOSE_FIRSTN: 689 case CRUSH_RULE_CHOOSE_FIRSTN:
517 firstn = 1; 690 firstn = 1;
518 /* fall through */ 691 /* fall through */
519 case CRUSH_RULE_CHOOSE_LEAF_INDEP: 692 case CRUSH_RULE_CHOOSELEAF_INDEP:
520 case CRUSH_RULE_CHOOSE_INDEP: 693 case CRUSH_RULE_CHOOSE_INDEP:
521 if (wsize == 0) 694 if (wsize == 0)
522 break; 695 break;
523 696
524 recurse_to_leaf = 697 recurse_to_leaf =
525 curstep->op == 698 curstep->op ==
526 CRUSH_RULE_CHOOSE_LEAF_FIRSTN || 699 CRUSH_RULE_CHOOSELEAF_FIRSTN ||
527 curstep->op == 700 curstep->op ==
528 CRUSH_RULE_CHOOSE_LEAF_INDEP; 701 CRUSH_RULE_CHOOSELEAF_INDEP;
529 702
530 /* reset output */ 703 /* reset output */
531 osize = 0; 704 osize = 0;
@@ -543,22 +716,51 @@ int crush_do_rule(const struct crush_map *map,
543 continue; 716 continue;
544 } 717 }
545 j = 0; 718 j = 0;
546 osize += crush_choose(map, 719 if (firstn) {
547 map->buckets[-1-w[i]], 720 int recurse_tries;
548 weight, 721 if (choose_leaf_tries)
549 x, numrep, 722 recurse_tries =
550 curstep->arg2, 723 choose_leaf_tries;
551 o+osize, j, 724 else if (map->chooseleaf_descend_once)
552 firstn, 725 recurse_tries = 1;
553 recurse_to_leaf, 726 else
554 descend_once, c+osize); 727 recurse_tries = choose_tries;
728 osize += crush_choose_firstn(
729 map,
730 map->buckets[-1-w[i]],
731 weight, weight_max,
732 x, numrep,
733 curstep->arg2,
734 o+osize, j,
735 choose_tries,
736 recurse_tries,
737 choose_local_tries,
738 choose_local_fallback_tries,
739 recurse_to_leaf,
740 c+osize);
741 } else {
742 crush_choose_indep(
743 map,
744 map->buckets[-1-w[i]],
745 weight, weight_max,
746 x, numrep, numrep,
747 curstep->arg2,
748 o+osize, j,
749 choose_tries,
750 choose_leaf_tries ?
751 choose_leaf_tries : 1,
752 recurse_to_leaf,
753 c+osize,
754 0);
755 osize += numrep;
756 }
555 } 757 }
556 758
557 if (recurse_to_leaf) 759 if (recurse_to_leaf)
558 /* copy final _leaf_ values to output set */ 760 /* copy final _leaf_ values to output set */
559 memcpy(o, c, osize*sizeof(*o)); 761 memcpy(o, c, osize*sizeof(*o));
560 762
561 /* swap t and w arrays */ 763 /* swap o and w arrays */
562 tmp = o; 764 tmp = o;
563 o = w; 765 o = w;
564 w = tmp; 766 w = tmp;
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 83661cdc0766..258a382e75ed 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -132,7 +132,8 @@ static int osdc_show(struct seq_file *s, void *pp)
132 req->r_osd ? req->r_osd->o_osd : -1, 132 req->r_osd ? req->r_osd->o_osd : -1,
133 req->r_pgid.pool, req->r_pgid.seed); 133 req->r_pgid.pool, req->r_pgid.seed);
134 134
135 seq_printf(s, "%.*s", req->r_oid_len, req->r_oid); 135 seq_printf(s, "%.*s", req->r_base_oid.name_len,
136 req->r_base_oid.name);
136 137
137 if (req->r_reassert_version.epoch) 138 if (req->r_reassert_version.epoch)
138 seq_printf(s, "\t%u'%llu", 139 seq_printf(s, "\t%u'%llu",
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 4a5df7b1cc9f..2ed1304d22a7 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -15,6 +15,7 @@
15#include <linux/dns_resolver.h> 15#include <linux/dns_resolver.h>
16#include <net/tcp.h> 16#include <net/tcp.h>
17 17
18#include <linux/ceph/ceph_features.h>
18#include <linux/ceph/libceph.h> 19#include <linux/ceph/libceph.h>
19#include <linux/ceph/messenger.h> 20#include <linux/ceph/messenger.h>
20#include <linux/ceph/decode.h> 21#include <linux/ceph/decode.h>
@@ -1865,7 +1866,9 @@ int ceph_parse_ips(const char *c, const char *end,
1865 port = (port * 10) + (*p - '0'); 1866 port = (port * 10) + (*p - '0');
1866 p++; 1867 p++;
1867 } 1868 }
1868 if (port > 65535 || port == 0) 1869 if (port == 0)
1870 port = CEPH_MON_PORT;
1871 else if (port > 65535)
1869 goto bad; 1872 goto bad;
1870 } else { 1873 } else {
1871 port = CEPH_MON_PORT; 1874 port = CEPH_MON_PORT;
@@ -1945,7 +1948,8 @@ static int process_connect(struct ceph_connection *con)
1945{ 1948{
1946 u64 sup_feat = con->msgr->supported_features; 1949 u64 sup_feat = con->msgr->supported_features;
1947 u64 req_feat = con->msgr->required_features; 1950 u64 req_feat = con->msgr->required_features;
1948 u64 server_feat = le64_to_cpu(con->in_reply.features); 1951 u64 server_feat = ceph_sanitize_features(
1952 le64_to_cpu(con->in_reply.features));
1949 int ret; 1953 int ret;
1950 1954
1951 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1955 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -2853,8 +2857,8 @@ static void con_fault(struct ceph_connection *con)
2853 */ 2857 */
2854void ceph_messenger_init(struct ceph_messenger *msgr, 2858void ceph_messenger_init(struct ceph_messenger *msgr,
2855 struct ceph_entity_addr *myaddr, 2859 struct ceph_entity_addr *myaddr,
2856 u32 supported_features, 2860 u64 supported_features,
2857 u32 required_features, 2861 u64 required_features,
2858 bool nocrc) 2862 bool nocrc)
2859{ 2863{
2860 msgr->supported_features = supported_features; 2864 msgr->supported_features = supported_features;
@@ -3126,15 +3130,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3126 INIT_LIST_HEAD(&m->data); 3130 INIT_LIST_HEAD(&m->data);
3127 3131
3128 /* front */ 3132 /* front */
3129 m->front_max = front_len;
3130 if (front_len) { 3133 if (front_len) {
3131 if (front_len > PAGE_CACHE_SIZE) { 3134 m->front.iov_base = ceph_kvmalloc(front_len, flags);
3132 m->front.iov_base = __vmalloc(front_len, flags,
3133 PAGE_KERNEL);
3134 m->front_is_vmalloc = true;
3135 } else {
3136 m->front.iov_base = kmalloc(front_len, flags);
3137 }
3138 if (m->front.iov_base == NULL) { 3135 if (m->front.iov_base == NULL) {
3139 dout("ceph_msg_new can't allocate %d bytes\n", 3136 dout("ceph_msg_new can't allocate %d bytes\n",
3140 front_len); 3137 front_len);
@@ -3143,7 +3140,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3143 } else { 3140 } else {
3144 m->front.iov_base = NULL; 3141 m->front.iov_base = NULL;
3145 } 3142 }
3146 m->front.iov_len = front_len; 3143 m->front_alloc_len = m->front.iov_len = front_len;
3147 3144
3148 dout("ceph_msg_new %p front %d\n", m, front_len); 3145 dout("ceph_msg_new %p front %d\n", m, front_len);
3149 return m; 3146 return m;
@@ -3256,10 +3253,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
3256void ceph_msg_kfree(struct ceph_msg *m) 3253void ceph_msg_kfree(struct ceph_msg *m)
3257{ 3254{
3258 dout("msg_kfree %p\n", m); 3255 dout("msg_kfree %p\n", m);
3259 if (m->front_is_vmalloc) 3256 ceph_kvfree(m->front.iov_base);
3260 vfree(m->front.iov_base);
3261 else
3262 kfree(m->front.iov_base);
3263 kmem_cache_free(ceph_msg_cache, m); 3257 kmem_cache_free(ceph_msg_cache, m);
3264} 3258}
3265 3259
@@ -3301,8 +3295,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);
3301 3295
3302void ceph_msg_dump(struct ceph_msg *msg) 3296void ceph_msg_dump(struct ceph_msg *msg)
3303{ 3297{
3304 pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, 3298 pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
3305 msg->front_max, msg->data_length); 3299 msg->front_alloc_len, msg->data_length);
3306 print_hex_dump(KERN_DEBUG, "header: ", 3300 print_hex_dump(KERN_DEBUG, "header: ",
3307 DUMP_PREFIX_OFFSET, 16, 1, 3301 DUMP_PREFIX_OFFSET, 16, 1,
3308 &msg->hdr, sizeof(msg->hdr), true); 3302 &msg->hdr, sizeof(msg->hdr), true);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 1fe25cd29d0e..2ac9ef35110b 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -152,7 +152,7 @@ static int __open_session(struct ceph_mon_client *monc)
152 /* initiate authentication handshake */ 152 /* initiate authentication handshake */
153 ret = ceph_auth_build_hello(monc->auth, 153 ret = ceph_auth_build_hello(monc->auth,
154 monc->m_auth->front.iov_base, 154 monc->m_auth->front.iov_base,
155 monc->m_auth->front_max); 155 monc->m_auth->front_alloc_len);
156 __send_prepared_auth_request(monc, ret); 156 __send_prepared_auth_request(monc, ret);
157 } else { 157 } else {
158 dout("open_session mon%d already open\n", monc->cur_mon); 158 dout("open_session mon%d already open\n", monc->cur_mon);
@@ -196,7 +196,7 @@ static void __send_subscribe(struct ceph_mon_client *monc)
196 int num; 196 int num;
197 197
198 p = msg->front.iov_base; 198 p = msg->front.iov_base;
199 end = p + msg->front_max; 199 end = p + msg->front_alloc_len;
200 200
201 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; 201 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
202 ceph_encode_32(&p, num); 202 ceph_encode_32(&p, num);
@@ -897,7 +897,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
897 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 897 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
898 msg->front.iov_len, 898 msg->front.iov_len,
899 monc->m_auth->front.iov_base, 899 monc->m_auth->front.iov_base,
900 monc->m_auth->front_max); 900 monc->m_auth->front_alloc_len);
901 if (ret < 0) { 901 if (ret < 0) {
902 monc->client->auth_err = ret; 902 monc->client->auth_err = ret;
903 wake_up_all(&monc->client->auth_wq); 903 wake_up_all(&monc->client->auth_wq);
@@ -939,7 +939,7 @@ static int __validate_auth(struct ceph_mon_client *monc)
939 return 0; 939 return 0;
940 940
941 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 941 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
942 monc->m_auth->front_max); 942 monc->m_auth->front_alloc_len);
943 if (ret <= 0) 943 if (ret <= 0)
944 return ret; /* either an error, or no need to authenticate */ 944 return ret; /* either an error, or no need to authenticate */
945 __send_prepared_auth_request(monc, ret); 945 __send_prepared_auth_request(monc, ret);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2b4b32aaa893..010ff3bd58ad 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
338 msg_size = 4 + 4 + 8 + 8 + 4+8; 338 msg_size = 4 + 4 + 8 + 8 + 4+8;
339 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 339 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
340 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 340 msg_size += 1 + 8 + 4 + 4; /* pg_t */
341 msg_size += 4 + MAX_OBJ_NAME_SIZE; 341 msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
342 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 342 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
343 msg_size += 8; /* snapid */ 343 msg_size += 8; /* snapid */
344 msg_size += 8; /* snap_seq */ 344 msg_size += 8; /* snap_seq */
@@ -368,6 +368,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
368 INIT_LIST_HEAD(&req->r_req_lru_item); 368 INIT_LIST_HEAD(&req->r_req_lru_item);
369 INIT_LIST_HEAD(&req->r_osd_item); 369 INIT_LIST_HEAD(&req->r_osd_item);
370 370
371 req->r_base_oloc.pool = -1;
372 req->r_target_oloc.pool = -1;
373
371 /* create reply message */ 374 /* create reply message */
372 if (use_mempool) 375 if (use_mempool)
373 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 376 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -761,11 +764,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
761 if (num_ops > 1) 764 if (num_ops > 1)
762 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); 765 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
763 766
764 req->r_file_layout = *layout; /* keep a copy */ 767 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
765 768
766 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", 769 snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
767 vino.ino, objnum); 770 "%llx.%08llx", vino.ino, objnum);
768 req->r_oid_len = strlen(req->r_oid); 771 req->r_base_oid.name_len = strlen(req->r_base_oid.name);
769 772
770 return req; 773 return req;
771} 774}
@@ -1044,8 +1047,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1044 !ceph_con_opened(&osd->o_con)) { 1047 !ceph_con_opened(&osd->o_con)) {
1045 struct ceph_osd_request *req; 1048 struct ceph_osd_request *req;
1046 1049
1047 dout(" osd addr hasn't changed and connection never opened," 1050 dout("osd addr hasn't changed and connection never opened, "
1048 " letting msgr retry"); 1051 "letting msgr retry\n");
1049 /* touch each r_stamp for handle_timeout()'s benefit */ 1052 /* touch each r_stamp for handle_timeout()'s benefit */
1050 list_for_each_entry(req, &osd->o_requests, r_osd_item) 1053 list_for_each_entry(req, &osd->o_requests, r_osd_item)
1051 req->r_stamp = jiffies; 1054 req->r_stamp = jiffies;
@@ -1232,6 +1235,61 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
1232EXPORT_SYMBOL(ceph_osdc_set_request_linger); 1235EXPORT_SYMBOL(ceph_osdc_set_request_linger);
1233 1236
1234/* 1237/*
1238 * Returns whether a request should be blocked from being sent
1239 * based on the current osdmap and osd_client settings.
1240 *
1241 * Caller should hold map_sem for read.
1242 */
1243static bool __req_should_be_paused(struct ceph_osd_client *osdc,
1244 struct ceph_osd_request *req)
1245{
1246 bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
1247 bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
1248 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
1249 return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
1250 (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
1251}
1252
1253/*
1254 * Calculate mapping of a request to a PG. Takes tiering into account.
1255 */
1256static int __calc_request_pg(struct ceph_osdmap *osdmap,
1257 struct ceph_osd_request *req,
1258 struct ceph_pg *pg_out)
1259{
1260 bool need_check_tiering;
1261
1262 need_check_tiering = false;
1263 if (req->r_target_oloc.pool == -1) {
1264 req->r_target_oloc = req->r_base_oloc; /* struct */
1265 need_check_tiering = true;
1266 }
1267 if (req->r_target_oid.name_len == 0) {
1268 ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
1269 need_check_tiering = true;
1270 }
1271
1272 if (need_check_tiering &&
1273 (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1274 struct ceph_pg_pool_info *pi;
1275
1276 pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
1277 if (pi) {
1278 if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
1279 pi->read_tier >= 0)
1280 req->r_target_oloc.pool = pi->read_tier;
1281 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
1282 pi->write_tier >= 0)
1283 req->r_target_oloc.pool = pi->write_tier;
1284 }
1285 /* !pi is caught in ceph_oloc_oid_to_pg() */
1286 }
1287
1288 return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
1289 &req->r_target_oid, pg_out);
1290}
1291
1292/*
1235 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1293 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
1236 * (as needed), and set the request r_osd appropriately. If there is 1294 * (as needed), and set the request r_osd appropriately. If there is
1237 * no up osd, set r_osd to NULL. Move the request to the appropriate list 1295 * no up osd, set r_osd to NULL. Move the request to the appropriate list
@@ -1248,10 +1306,11 @@ static int __map_request(struct ceph_osd_client *osdc,
1248 int acting[CEPH_PG_MAX_SIZE]; 1306 int acting[CEPH_PG_MAX_SIZE];
1249 int o = -1, num = 0; 1307 int o = -1, num = 0;
1250 int err; 1308 int err;
1309 bool was_paused;
1251 1310
1252 dout("map_request %p tid %lld\n", req, req->r_tid); 1311 dout("map_request %p tid %lld\n", req, req->r_tid);
1253 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, 1312
1254 ceph_file_layout_pg_pool(req->r_file_layout)); 1313 err = __calc_request_pg(osdc->osdmap, req, &pgid);
1255 if (err) { 1314 if (err) {
1256 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1315 list_move(&req->r_req_lru_item, &osdc->req_notarget);
1257 return err; 1316 return err;
@@ -1264,12 +1323,18 @@ static int __map_request(struct ceph_osd_client *osdc,
1264 num = err; 1323 num = err;
1265 } 1324 }
1266 1325
1326 was_paused = req->r_paused;
1327 req->r_paused = __req_should_be_paused(osdc, req);
1328 if (was_paused && !req->r_paused)
1329 force_resend = 1;
1330
1267 if ((!force_resend && 1331 if ((!force_resend &&
1268 req->r_osd && req->r_osd->o_osd == o && 1332 req->r_osd && req->r_osd->o_osd == o &&
1269 req->r_sent >= req->r_osd->o_incarnation && 1333 req->r_sent >= req->r_osd->o_incarnation &&
1270 req->r_num_pg_osds == num && 1334 req->r_num_pg_osds == num &&
1271 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1335 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
1272 (req->r_osd == NULL && o == -1)) 1336 (req->r_osd == NULL && o == -1) ||
1337 req->r_paused)
1273 return 0; /* no change */ 1338 return 0; /* no change */
1274 1339
1275 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1340 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
@@ -1331,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
1331 /* fill in message content that changes each time we send it */ 1396 /* fill in message content that changes each time we send it */
1332 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1397 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1333 put_unaligned_le32(req->r_flags, req->r_request_flags); 1398 put_unaligned_le32(req->r_flags, req->r_request_flags);
1334 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); 1399 put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
1335 p = req->r_request_pgid; 1400 p = req->r_request_pgid;
1336 ceph_encode_64(&p, req->r_pgid.pool); 1401 ceph_encode_64(&p, req->r_pgid.pool);
1337 ceph_encode_32(&p, req->r_pgid.seed); 1402 ceph_encode_32(&p, req->r_pgid.seed);
@@ -1432,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work)
1432 round_jiffies_relative(delay)); 1497 round_jiffies_relative(delay));
1433} 1498}
1434 1499
1500static int ceph_oloc_decode(void **p, void *end,
1501 struct ceph_object_locator *oloc)
1502{
1503 u8 struct_v, struct_cv;
1504 u32 len;
1505 void *struct_end;
1506 int ret = 0;
1507
1508 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1509 struct_v = ceph_decode_8(p);
1510 struct_cv = ceph_decode_8(p);
1511 if (struct_v < 3) {
1512 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
1513 struct_v, struct_cv);
1514 goto e_inval;
1515 }
1516 if (struct_cv > 6) {
1517 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
1518 struct_v, struct_cv);
1519 goto e_inval;
1520 }
1521 len = ceph_decode_32(p);
1522 ceph_decode_need(p, end, len, e_inval);
1523 struct_end = *p + len;
1524
1525 oloc->pool = ceph_decode_64(p);
1526 *p += 4; /* skip preferred */
1527
1528 len = ceph_decode_32(p);
1529 if (len > 0) {
1530 pr_warn("ceph_object_locator::key is set\n");
1531 goto e_inval;
1532 }
1533
1534 if (struct_v >= 5) {
1535 len = ceph_decode_32(p);
1536 if (len > 0) {
1537 pr_warn("ceph_object_locator::nspace is set\n");
1538 goto e_inval;
1539 }
1540 }
1541
1542 if (struct_v >= 6) {
1543 s64 hash = ceph_decode_64(p);
1544 if (hash != -1) {
1545 pr_warn("ceph_object_locator::hash is set\n");
1546 goto e_inval;
1547 }
1548 }
1549
1550 /* skip the rest */
1551 *p = struct_end;
1552out:
1553 return ret;
1554
1555e_inval:
1556 ret = -EINVAL;
1557 goto out;
1558}
1559
1560static int ceph_redirect_decode(void **p, void *end,
1561 struct ceph_request_redirect *redir)
1562{
1563 u8 struct_v, struct_cv;
1564 u32 len;
1565 void *struct_end;
1566 int ret;
1567
1568 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1569 struct_v = ceph_decode_8(p);
1570 struct_cv = ceph_decode_8(p);
1571 if (struct_cv > 1) {
1572 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
1573 struct_v, struct_cv);
1574 goto e_inval;
1575 }
1576 len = ceph_decode_32(p);
1577 ceph_decode_need(p, end, len, e_inval);
1578 struct_end = *p + len;
1579
1580 ret = ceph_oloc_decode(p, end, &redir->oloc);
1581 if (ret)
1582 goto out;
1583
1584 len = ceph_decode_32(p);
1585 if (len > 0) {
1586 pr_warn("ceph_request_redirect::object_name is set\n");
1587 goto e_inval;
1588 }
1589
1590 len = ceph_decode_32(p);
1591 *p += len; /* skip osd_instructions */
1592
1593 /* skip the rest */
1594 *p = struct_end;
1595out:
1596 return ret;
1597
1598e_inval:
1599 ret = -EINVAL;
1600 goto out;
1601}
1602
1435static void complete_request(struct ceph_osd_request *req) 1603static void complete_request(struct ceph_osd_request *req)
1436{ 1604{
1437 complete_all(&req->r_safe_completion); /* fsync waiter */ 1605 complete_all(&req->r_safe_completion); /* fsync waiter */
@@ -1446,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1446{ 1614{
1447 void *p, *end; 1615 void *p, *end;
1448 struct ceph_osd_request *req; 1616 struct ceph_osd_request *req;
1617 struct ceph_request_redirect redir;
1449 u64 tid; 1618 u64 tid;
1450 int object_len; 1619 int object_len;
1451 unsigned int numops; 1620 unsigned int numops;
@@ -1525,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1525 for (i = 0; i < numops; i++) 1694 for (i = 0; i < numops; i++)
1526 req->r_reply_op_result[i] = ceph_decode_32(&p); 1695 req->r_reply_op_result[i] = ceph_decode_32(&p);
1527 1696
1528 already_completed = req->r_got_reply; 1697 if (le16_to_cpu(msg->hdr.version) >= 6) {
1698 p += 8 + 4; /* skip replay_version */
1699 p += 8; /* skip user_version */
1529 1700
1530 if (!req->r_got_reply) { 1701 err = ceph_redirect_decode(&p, end, &redir);
1702 if (err)
1703 goto bad_put;
1704 } else {
1705 redir.oloc.pool = -1;
1706 }
1707
1708 if (redir.oloc.pool != -1) {
1709 dout("redirect pool %lld\n", redir.oloc.pool);
1710
1711 __unregister_request(osdc, req);
1712 mutex_unlock(&osdc->request_mutex);
1713
1714 req->r_target_oloc = redir.oloc; /* struct */
1715
1716 /*
1717 * Start redirect requests with nofail=true. If
1718 * mapping fails, request will end up on the notarget
1719 * list, waiting for the new osdmap (which can take
1720 * a while), even though the original request mapped
1721 * successfully. In the future we might want to follow
1722 * original request's nofail setting here.
1723 */
1724 err = ceph_osdc_start_request(osdc, req, true);
1725 BUG_ON(err);
1531 1726
1727 goto done;
1728 }
1729
1730 already_completed = req->r_got_reply;
1731 if (!req->r_got_reply) {
1532 req->r_result = result; 1732 req->r_result = result;
1533 dout("handle_reply result %d bytes %d\n", req->r_result, 1733 dout("handle_reply result %d bytes %d\n", req->r_result,
1534 bytes); 1734 bytes);
@@ -1581,6 +1781,13 @@ done:
1581 return; 1781 return;
1582 1782
1583bad_put: 1783bad_put:
1784 req->r_result = -EIO;
1785 __unregister_request(osdc, req);
1786 if (req->r_callback)
1787 req->r_callback(req, msg);
1788 else
1789 complete_all(&req->r_completion);
1790 complete_request(req);
1584 ceph_osdc_put_request(req); 1791 ceph_osdc_put_request(req);
1585bad_mutex: 1792bad_mutex:
1586 mutex_unlock(&osdc->request_mutex); 1793 mutex_unlock(&osdc->request_mutex);
@@ -1613,14 +1820,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
1613 * 1820 *
1614 * Caller should hold map_sem for read. 1821 * Caller should hold map_sem for read.
1615 */ 1822 */
1616static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1823static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
1824 bool force_resend_writes)
1617{ 1825{
1618 struct ceph_osd_request *req, *nreq; 1826 struct ceph_osd_request *req, *nreq;
1619 struct rb_node *p; 1827 struct rb_node *p;
1620 int needmap = 0; 1828 int needmap = 0;
1621 int err; 1829 int err;
1830 bool force_resend_req;
1622 1831
1623 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1832 dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
1833 force_resend_writes ? " (force resend writes)" : "");
1624 mutex_lock(&osdc->request_mutex); 1834 mutex_lock(&osdc->request_mutex);
1625 for (p = rb_first(&osdc->requests); p; ) { 1835 for (p = rb_first(&osdc->requests); p; ) {
1626 req = rb_entry(p, struct ceph_osd_request, r_node); 1836 req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -1645,7 +1855,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1645 continue; 1855 continue;
1646 } 1856 }
1647 1857
1648 err = __map_request(osdc, req, force_resend); 1858 force_resend_req = force_resend ||
1859 (force_resend_writes &&
1860 req->r_flags & CEPH_OSD_FLAG_WRITE);
1861 err = __map_request(osdc, req, force_resend_req);
1649 if (err < 0) 1862 if (err < 0)
1650 continue; /* error */ 1863 continue; /* error */
1651 if (req->r_osd == NULL) { 1864 if (req->r_osd == NULL) {
@@ -1665,7 +1878,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1665 r_linger_item) { 1878 r_linger_item) {
1666 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1879 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1667 1880
1668 err = __map_request(osdc, req, force_resend); 1881 err = __map_request(osdc, req,
1882 force_resend || force_resend_writes);
1669 dout("__map_request returned %d\n", err); 1883 dout("__map_request returned %d\n", err);
1670 if (err == 0) 1884 if (err == 0)
1671 continue; /* no change and no osd was specified */ 1885 continue; /* no change and no osd was specified */
@@ -1707,6 +1921,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1707 struct ceph_osdmap *newmap = NULL, *oldmap; 1921 struct ceph_osdmap *newmap = NULL, *oldmap;
1708 int err; 1922 int err;
1709 struct ceph_fsid fsid; 1923 struct ceph_fsid fsid;
1924 bool was_full;
1710 1925
1711 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1926 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1712 p = msg->front.iov_base; 1927 p = msg->front.iov_base;
@@ -1720,6 +1935,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1720 1935
1721 down_write(&osdc->map_sem); 1936 down_write(&osdc->map_sem);
1722 1937
1938 was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
1939
1723 /* incremental maps */ 1940 /* incremental maps */
1724 ceph_decode_32_safe(&p, end, nr_maps, bad); 1941 ceph_decode_32_safe(&p, end, nr_maps, bad);
1725 dout(" %d inc maps\n", nr_maps); 1942 dout(" %d inc maps\n", nr_maps);
@@ -1744,7 +1961,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1744 ceph_osdmap_destroy(osdc->osdmap); 1961 ceph_osdmap_destroy(osdc->osdmap);
1745 osdc->osdmap = newmap; 1962 osdc->osdmap = newmap;
1746 } 1963 }
1747 kick_requests(osdc, 0); 1964 was_full = was_full ||
1965 ceph_osdmap_flag(osdc->osdmap,
1966 CEPH_OSDMAP_FULL);
1967 kick_requests(osdc, 0, was_full);
1748 } else { 1968 } else {
1749 dout("ignoring incremental map %u len %d\n", 1969 dout("ignoring incremental map %u len %d\n",
1750 epoch, maplen); 1970 epoch, maplen);
@@ -1787,7 +2007,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1787 skipped_map = 1; 2007 skipped_map = 1;
1788 ceph_osdmap_destroy(oldmap); 2008 ceph_osdmap_destroy(oldmap);
1789 } 2009 }
1790 kick_requests(osdc, skipped_map); 2010 was_full = was_full ||
2011 ceph_osdmap_flag(osdc->osdmap,
2012 CEPH_OSDMAP_FULL);
2013 kick_requests(osdc, skipped_map, was_full);
1791 } 2014 }
1792 p += maplen; 2015 p += maplen;
1793 nr_maps--; 2016 nr_maps--;
@@ -1804,7 +2027,9 @@ done:
1804 * we find out when we are no longer full and stop returning 2027 * we find out when we are no longer full and stop returning
1805 * ENOSPC. 2028 * ENOSPC.
1806 */ 2029 */
1807 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 2030 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
2031 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
2032 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
1808 ceph_monc_request_next_osdmap(&osdc->client->monc); 2033 ceph_monc_request_next_osdmap(&osdc->client->monc);
1809 2034
1810 mutex_lock(&osdc->request_mutex); 2035 mutex_lock(&osdc->request_mutex);
@@ -2068,10 +2293,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
2068 ceph_encode_32(&p, -1); /* preferred */ 2293 ceph_encode_32(&p, -1); /* preferred */
2069 2294
2070 /* oid */ 2295 /* oid */
2071 ceph_encode_32(&p, req->r_oid_len); 2296 ceph_encode_32(&p, req->r_base_oid.name_len);
2072 memcpy(p, req->r_oid, req->r_oid_len); 2297 memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
2073 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); 2298 dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
2074 p += req->r_oid_len; 2299 req->r_base_oid.name, req->r_base_oid.name_len);
2300 p += req->r_base_oid.name_len;
2075 2301
2076 /* ops--can imply data */ 2302 /* ops--can imply data */
2077 ceph_encode_16(&p, (u16)req->r_num_ops); 2303 ceph_encode_16(&p, (u16)req->r_num_ops);
@@ -2454,7 +2680,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2454 struct ceph_osd_client *osdc = osd->o_osdc; 2680 struct ceph_osd_client *osdc = osd->o_osdc;
2455 struct ceph_msg *m; 2681 struct ceph_msg *m;
2456 struct ceph_osd_request *req; 2682 struct ceph_osd_request *req;
2457 int front = le32_to_cpu(hdr->front_len); 2683 int front_len = le32_to_cpu(hdr->front_len);
2458 int data_len = le32_to_cpu(hdr->data_len); 2684 int data_len = le32_to_cpu(hdr->data_len);
2459 u64 tid; 2685 u64 tid;
2460 2686
@@ -2474,12 +2700,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2474 req->r_reply, req->r_reply->con); 2700 req->r_reply, req->r_reply->con);
2475 ceph_msg_revoke_incoming(req->r_reply); 2701 ceph_msg_revoke_incoming(req->r_reply);
2476 2702
2477 if (front > req->r_reply->front.iov_len) { 2703 if (front_len > req->r_reply->front_alloc_len) {
2478 pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", 2704 pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
2479 front, (int)req->r_reply->front.iov_len, 2705 front_len, req->r_reply->front_alloc_len,
2480 (unsigned int)con->peer_name.type, 2706 (unsigned int)con->peer_name.type,
2481 le64_to_cpu(con->peer_name.num)); 2707 le64_to_cpu(con->peer_name.num));
2482 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); 2708 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
2709 false);
2483 if (!m) 2710 if (!m)
2484 goto out; 2711 goto out;
2485 ceph_msg_put(req->r_reply); 2712 ceph_msg_put(req->r_reply);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index dbd9a4792427..aade4a5c1c07 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -464,6 +464,11 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
464 return NULL; 464 return NULL;
465} 465}
466 466
467struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
468{
469 return __lookup_pg_pool(&map->pg_pools, id);
470}
471
467const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id) 472const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
468{ 473{
469 struct ceph_pg_pool_info *pi; 474 struct ceph_pg_pool_info *pi;
@@ -514,8 +519,8 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
514 pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); 519 pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
515 return -EINVAL; 520 return -EINVAL;
516 } 521 }
517 if (cv > 7) { 522 if (cv > 9) {
518 pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv); 523 pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
519 return -EINVAL; 524 return -EINVAL;
520 } 525 }
521 len = ceph_decode_32(p); 526 len = ceph_decode_32(p);
@@ -543,12 +548,34 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
543 *p += len; 548 *p += len;
544 } 549 }
545 550
546 /* skip removed snaps */ 551 /* skip removed_snaps */
547 num = ceph_decode_32(p); 552 num = ceph_decode_32(p);
548 *p += num * (8 + 8); 553 *p += num * (8 + 8);
549 554
550 *p += 8; /* skip auid */ 555 *p += 8; /* skip auid */
551 pi->flags = ceph_decode_64(p); 556 pi->flags = ceph_decode_64(p);
557 *p += 4; /* skip crash_replay_interval */
558
559 if (ev >= 7)
560 *p += 1; /* skip min_size */
561
562 if (ev >= 8)
563 *p += 8 + 8; /* skip quota_max_* */
564
565 if (ev >= 9) {
566 /* skip tiers */
567 num = ceph_decode_32(p);
568 *p += num * 8;
569
570 *p += 8; /* skip tier_of */
571 *p += 1; /* skip cache_mode */
572
573 pi->read_tier = ceph_decode_64(p);
574 pi->write_tier = ceph_decode_64(p);
575 } else {
576 pi->read_tier = -1;
577 pi->write_tier = -1;
578 }
552 579
553 /* ignore the rest */ 580 /* ignore the rest */
554 581
@@ -1090,25 +1117,40 @@ invalid:
1090EXPORT_SYMBOL(ceph_calc_file_object_mapping); 1117EXPORT_SYMBOL(ceph_calc_file_object_mapping);
1091 1118
1092/* 1119/*
1093 * calculate an object layout (i.e. pgid) from an oid, 1120 * Calculate mapping of a (oloc, oid) pair to a PG. Should only be
1094 * file_layout, and osdmap 1121 * called with target's (oloc, oid), since tiering isn't taken into
1122 * account.
1095 */ 1123 */
/*
 * Returns 0 with *pg_out filled in, or -EIO when the pool id taken
 * from @oloc is not found in @osdmap's pg_pools.
 */
1096int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid, 1124int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
1097 struct ceph_osdmap *osdmap, uint64_t pool) 1125 struct ceph_object_locator *oloc,
1126 struct ceph_object_id *oid,
1127 struct ceph_pg *pg_out)
1098{ 1128{
1099 struct ceph_pg_pool_info *pool_info; 1129 struct ceph_pg_pool_info *pi;
1100 1130
/* the pool id is now read from oloc->pool; the earlier BUG_ON(!osdmap) is gone */
1101 BUG_ON(!osdmap); 1131 pi = __lookup_pg_pool(&osdmap->pg_pools, oloc->pool);
1102 pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool); 1132 if (!pi)
1103 if (!pool_info)
1104 return -EIO; 1133 return -EIO;
1105 pg->pool = pool;
1106 pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid));
1107 1134
/* the seed hashes exactly oid->name_len bytes of oid->name, using the pool's object_hash */
1108 dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed); 1135 pg_out->pool = oloc->pool;
1136 pg_out->seed = ceph_str_hash(pi->object_hash, oid->name,
1137 oid->name_len);
1138
1139 dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name,
1140 pg_out->pool, pg_out->seed);
1109 return 0; 1141 return 0;
1110} 1142}
1111EXPORT_SYMBOL(ceph_calc_ceph_pg); 1143EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
1144
1145static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x,
1146 int *result, int result_max,
1147 const __u32 *weight, int weight_max)
1148{
1149 int scratch[result_max * 3];
1150
1151 return crush_do_rule(map, ruleno, x, result, result_max,
1152 weight, weight_max, scratch);
1153}
1112 1154
1113/* 1155/*
1114 * Calculate raw osd vector for the given pgid. Return pointer to osd 1156 * Calculate raw osd vector for the given pgid. Return pointer to osd
@@ -1163,9 +1205,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1163 pool->pgp_num_mask) + 1205 pool->pgp_num_mask) +
1164 (unsigned)pgid.pool; 1206 (unsigned)pgid.pool;
1165 } 1207 }
1166 r = crush_do_rule(osdmap->crush, ruleno, pps, osds, 1208 r = crush_do_rule_ary(osdmap->crush, ruleno, pps,
1167 min_t(int, pool->size, *num), 1209 osds, min_t(int, pool->size, *num),
1168 osdmap->osd_weight); 1210 osdmap->osd_weight, osdmap->max_osd);
1169 if (r < 0) { 1211 if (r < 0) {
1170 pr_err("error %d from crush rule: pool %lld ruleset %d type %d" 1212 pr_err("error %d from crush rule: pool %lld ruleset %d type %d"
1171 " size %d\n", r, pgid.pool, pool->crush_ruleset, 1213 " size %d\n", r, pgid.pool, pool->crush_ruleset,