author    Linus Torvalds <torvalds@linux-foundation.org>  2014-01-28 14:02:23 -0500
committer Linus Torvalds <torvalds@linux-foundation.org>  2014-01-28 14:02:23 -0500
commit    d891ea23d5203e5c47439b2a174f86a00b356a6c (patch)
tree      3876cefcced9df5519f437cd8eb275cb979b93f6 /net
parent    08d21b5f93eb92a781daea71b6fcb3a340909141 (diff)
parent    125d725c923527a85876c031028c7f55c28b74b3 (diff)

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull ceph updates from Sage Weil:
 "This is a big batch.  From Ilya we have:

   - rbd support for more than ~250 mapped devices (now uses same scheme
     that SCSI does for device major/minor numbering)

   - crush updates for new mapping behaviors (will be needed for coming
     erasure coding support, among other things)

   - preliminary support for tiered storage pools

  There is also a big series fixing a pile of cephfs bugs with clustered
  MDSs from Yan Zheng, ACL support for cephfs from Guangliang Zhao, ceph
  fscache improvements from Li Wang, improved behavior when we get
  ENOSPC from Josh Durgin, some readv/writev improvements from
  Majianpeng, and the usual mix of small cleanups"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (76 commits)
  ceph: cast PAGE_SIZE to size_t in ceph_sync_write()
  ceph: fix dout() compile warnings in ceph_filemap_fault()
  libceph: support CEPH_FEATURE_OSD_CACHEPOOL feature
  libceph: follow redirect replies from osds
  libceph: rename ceph_osd_request::r_{oloc,oid} to r_base_{oloc,oid}
  libceph: follow {read,write}_tier fields on osd request submission
  libceph: add ceph_pg_pool_by_id()
  libceph: CEPH_OSD_FLAG_* enum update
  libceph: replace ceph_calc_ceph_pg() with ceph_oloc_oid_to_pg()
  libceph: introduce and start using oid abstraction
  libceph: rename MAX_OBJ_NAME_SIZE to CEPH_MAX_OID_NAME_LEN
  libceph: move ceph_file_layout helpers to ceph_fs.h
  libceph: start using oloc abstraction
  libceph: dout() is missing a newline
  libceph: add ceph_kv{malloc,free}() and switch to them
  libceph: support CEPH_FEATURE_EXPORT_PEER
  ceph: add imported caps when handling cap export message
  ceph: add open export target session helper
  ceph: remove exported caps when handling cap import message
  ceph: handle session flush message
  ...
Diffstat (limited to 'net')
-rw-r--r--  net/ceph/buffer.c        22
-rw-r--r--  net/ceph/ceph_common.c   24
-rw-r--r--  net/ceph/crush/crush.c    7
-rw-r--r--  net/ceph/crush/mapper.c 336
-rw-r--r--  net/ceph/debugfs.c        3
-rw-r--r--  net/ceph/messenger.c     32
-rw-r--r--  net/ceph/mon_client.c     8
-rw-r--r--  net/ceph/osd_client.c   283
-rw-r--r--  net/ceph/osdmap.c        78
9 files changed, 636 insertions(+), 157 deletions(-)
diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c
index bf3e6a13c215..621b5f65407f 100644
--- a/net/ceph/buffer.c
+++ b/net/ceph/buffer.c
@@ -6,6 +6,7 @@
 
 #include <linux/ceph/buffer.h>
 #include <linux/ceph/decode.h>
+#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */
 
 struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
 {
@@ -15,16 +16,10 @@ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
 	if (!b)
 		return NULL;
 
-	b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN);
-	if (b->vec.iov_base) {
-		b->is_vmalloc = false;
-	} else {
-		b->vec.iov_base = __vmalloc(len, gfp | __GFP_HIGHMEM, PAGE_KERNEL);
-		if (!b->vec.iov_base) {
-			kfree(b);
-			return NULL;
-		}
-		b->is_vmalloc = true;
+	b->vec.iov_base = ceph_kvmalloc(len, gfp);
+	if (!b->vec.iov_base) {
+		kfree(b);
+		return NULL;
 	}
 
 	kref_init(&b->kref);
@@ -40,12 +35,7 @@ void ceph_buffer_release(struct kref *kref)
 	struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref);
 
 	dout("buffer_release %p\n", b);
-	if (b->vec.iov_base) {
-		if (b->is_vmalloc)
-			vfree(b->vec.iov_base);
-		else
-			kfree(b->vec.iov_base);
-	}
+	ceph_kvfree(b->vec.iov_base);
 	kfree(b);
 }
 EXPORT_SYMBOL(ceph_buffer_release);
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 34b11ee8124e..67d7721d237e 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -15,6 +15,7 @@
 #include <linux/slab.h>
 #include <linux/statfs.h>
 #include <linux/string.h>
+#include <linux/vmalloc.h>
 #include <linux/nsproxy.h>
 #include <net/net_namespace.h>
 
@@ -170,6 +171,25 @@ int ceph_compare_options(struct ceph_options *new_opt,
 }
 EXPORT_SYMBOL(ceph_compare_options);
 
+void *ceph_kvmalloc(size_t size, gfp_t flags)
+{
+	if (size <= (PAGE_SIZE << PAGE_ALLOC_COSTLY_ORDER)) {
+		void *ptr = kmalloc(size, flags | __GFP_NOWARN);
+		if (ptr)
+			return ptr;
+	}
+
+	return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
+}
+
+void ceph_kvfree(const void *ptr)
+{
+	if (is_vmalloc_addr(ptr))
+		vfree(ptr);
+	else
+		kfree(ptr);
+}
+
 
 static int parse_fsid(const char *str, struct ceph_fsid *fsid)
 {
@@ -461,8 +481,8 @@ EXPORT_SYMBOL(ceph_client_id);
  * create a fresh client instance
  */
 struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
-				       unsigned int supported_features,
-				       unsigned int required_features)
+				       u64 supported_features,
+				       u64 required_features)
 {
 	struct ceph_client *client;
 	struct ceph_entity_addr *myaddr = NULL;
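A note on the helpers added above: ceph_kvmalloc() tries kmalloc() for requests up to PAGE_ALLOC_COSTLY_ORDER pages and silently falls back to __vmalloc(), while ceph_kvfree() dispatches on is_vmalloc_addr(), so callers no longer need the per-buffer is_vmalloc bookkeeping that buffer.c and messenger.c carried. A minimal sketch of a hypothetical caller (alloc_reply_buf/free_reply_buf are illustrative names, not part of this series):

	/* Hypothetical caller: allocate a possibly-large buffer without
	 * tracking which allocator satisfied the request. */
	static void *alloc_reply_buf(size_t len)
	{
		void *buf = ceph_kvmalloc(len, GFP_NOFS);

		if (buf)
			memset(buf, 0, len);	/* kmalloc or vmalloc memory */
		return buf;
	}

	static void free_reply_buf(void *buf)
	{
		ceph_kvfree(buf);	/* vfree() or kfree(), as appropriate */
	}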
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 089613234f03..16bc199d9a62 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -116,11 +116,14 @@ void crush_destroy(struct crush_map *map)
 	if (map->rules) {
 		__u32 b;
 		for (b = 0; b < map->max_rules; b++)
-			kfree(map->rules[b]);
+			crush_destroy_rule(map->rules[b]);
 		kfree(map->rules);
 	}
 
 	kfree(map);
 }
 
-
+void crush_destroy_rule(struct crush_rule *rule)
+{
+	kfree(rule);
+}
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index cbd06a91941c..b703790b4e44 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -189,7 +189,7 @@ static int terminal(int x)
 static int bucket_tree_choose(struct crush_bucket_tree *bucket,
 			      int x, int r)
 {
-	int n, l;
+	int n;
 	__u32 w;
 	__u64 t;
 
@@ -197,6 +197,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,
 	n = bucket->num_nodes >> 1;
 
 	while (!terminal(n)) {
+		int l;
 		/* pick point in [0, w) */
 		w = bucket->node_weights[n];
 		t = (__u64)crush_hash32_4(bucket->h.hash, x, n, r,
@@ -264,8 +265,12 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
  * true if device is marked "out" (failed, fully offloaded)
  * of the cluster
  */
-static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x)
+static int is_out(const struct crush_map *map,
+		  const __u32 *weight, int weight_max,
+		  int item, int x)
 {
+	if (item >= weight_max)
+		return 1;
 	if (weight[item] >= 0x10000)
 		return 0;
 	if (weight[item] == 0)
@@ -277,7 +282,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
 }
 
 /**
- * crush_choose - choose numrep distinct items of given type
+ * crush_choose_firstn - choose numrep distinct items of given type
  * @map: the crush_map
  * @bucket: the bucket we are choose an item from
  * @x: crush input value
@@ -285,18 +290,24 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
  * @type: the type of item to choose
  * @out: pointer to output vector
  * @outpos: our position in that vector
- * @firstn: true if choosing "first n" items, false if choosing "indep"
- * @recurse_to_leaf: true if we want one device under each item of given type
- * @descend_once: true if we should only try one descent before giving up
+ * @tries: number of attempts to make
+ * @recurse_tries: number of attempts to have recursive chooseleaf make
+ * @local_tries: localized retries
+ * @local_fallback_tries: localized fallback retries
+ * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose)
  * @out2: second output vector for leaf items (if @recurse_to_leaf)
  */
-static int crush_choose(const struct crush_map *map,
-			struct crush_bucket *bucket,
-			const __u32 *weight,
-			int x, int numrep, int type,
-			int *out, int outpos,
-			int firstn, int recurse_to_leaf,
-			int descend_once, int *out2)
+static int crush_choose_firstn(const struct crush_map *map,
+			       struct crush_bucket *bucket,
+			       const __u32 *weight, int weight_max,
+			       int x, int numrep, int type,
+			       int *out, int outpos,
+			       unsigned int tries,
+			       unsigned int recurse_tries,
+			       unsigned int local_tries,
+			       unsigned int local_fallback_tries,
+			       int recurse_to_leaf,
+			       int *out2)
 {
 	int rep;
 	unsigned int ftotal, flocal;
@@ -325,35 +336,17 @@ static int crush_choose(const struct crush_map *map,
 			collide = 0;
 			retry_bucket = 0;
 			r = rep;
-			if (in->alg == CRUSH_BUCKET_UNIFORM) {
-				/* be careful */
-				if (firstn || (__u32)numrep >= in->size)
-					/* r' = r + f_total */
-					r += ftotal;
-				else if (in->size % numrep == 0)
-					/* r'=r+(n+1)*f_local */
-					r += (numrep+1) *
-						(flocal+ftotal);
-				else
-					/* r' = r + n*f_local */
-					r += numrep * (flocal+ftotal);
-			} else {
-				if (firstn)
-					/* r' = r + f_total */
-					r += ftotal;
-				else
-					/* r' = r + n*f_local */
-					r += numrep * (flocal+ftotal);
-			}
+			/* r' = r + f_total */
+			r += ftotal;
 
 			/* bucket choose */
 			if (in->size == 0) {
 				reject = 1;
 				goto reject;
 			}
-			if (map->choose_local_fallback_tries > 0 &&
+			if (local_fallback_tries > 0 &&
 			    flocal >= (in->size>>1) &&
-			    flocal > map->choose_local_fallback_tries)
+			    flocal > local_fallback_tries)
 				item = bucket_perm_choose(in, x, r);
 			else
 				item = crush_bucket_choose(in, x, r);
@@ -394,13 +387,15 @@ static int crush_choose(const struct crush_map *map,
 			reject = 0;
 			if (!collide && recurse_to_leaf) {
 				if (item < 0) {
-					if (crush_choose(map,
+					if (crush_choose_firstn(map,
 						 map->buckets[-1-item],
-						 weight,
+						 weight, weight_max,
 						 x, outpos+1, 0,
 						 out2, outpos,
-						 firstn, 0,
-						 map->chooseleaf_descend_once,
+						 recurse_tries, 0,
+						 local_tries,
+						 local_fallback_tries,
+						 0,
 						 NULL) <= outpos)
 						/* didn't get leaf */
 						reject = 1;
@@ -414,6 +409,7 @@ static int crush_choose(const struct crush_map *map,
 				/* out? */
 				if (itemtype == 0)
 					reject = is_out(map, weight,
+							weight_max,
 							item, x);
 				else
 					reject = 0;
@@ -424,17 +420,14 @@ reject:
 			ftotal++;
 			flocal++;
 
-			if (reject && descend_once)
-				/* let outer call try again */
-				skip_rep = 1;
-			else if (collide && flocal <= map->choose_local_tries)
+			if (collide && flocal <= local_tries)
 				/* retry locally a few times */
 				retry_bucket = 1;
-			else if (map->choose_local_fallback_tries > 0 &&
-				 flocal <= in->size + map->choose_local_fallback_tries)
+			else if (local_fallback_tries > 0 &&
+				 flocal <= in->size + local_fallback_tries)
 				/* exhaustive bucket search */
 				retry_bucket = 1;
-			else if (ftotal <= map->choose_total_tries)
+			else if (ftotal <= tries)
 				/* then retry descent */
 				retry_descent = 1;
 			else
@@ -464,21 +457,179 @@ reject:
 
 
 /**
+ * crush_choose_indep: alternative breadth-first positionally stable mapping
+ *
+ */
+static void crush_choose_indep(const struct crush_map *map,
+			       struct crush_bucket *bucket,
+			       const __u32 *weight, int weight_max,
+			       int x, int left, int numrep, int type,
+			       int *out, int outpos,
+			       unsigned int tries,
+			       unsigned int recurse_tries,
+			       int recurse_to_leaf,
+			       int *out2,
+			       int parent_r)
+{
+	struct crush_bucket *in = bucket;
+	int endpos = outpos + left;
+	int rep;
+	unsigned int ftotal;
+	int r;
+	int i;
+	int item = 0;
+	int itemtype;
+	int collide;
+
+	dprintk("CHOOSE%s INDEP bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
+		bucket->id, x, outpos, numrep);
+
+	/* initially my result is undefined */
+	for (rep = outpos; rep < endpos; rep++) {
+		out[rep] = CRUSH_ITEM_UNDEF;
+		if (out2)
+			out2[rep] = CRUSH_ITEM_UNDEF;
+	}
+
+	for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) {
+		for (rep = outpos; rep < endpos; rep++) {
+			if (out[rep] != CRUSH_ITEM_UNDEF)
+				continue;
+
+			in = bucket;  /* initial bucket */
+
+			/* choose through intervening buckets */
+			for (;;) {
+				/* note: we base the choice on the position
+				 * even in the nested call.  that means that
+				 * if the first layer chooses the same bucket
+				 * in a different position, we will tend to
+				 * choose a different item in that bucket.
+				 * this will involve more devices in data
+				 * movement and tend to distribute the load.
+				 */
+				r = rep + parent_r;
+
+				/* be careful */
+				if (in->alg == CRUSH_BUCKET_UNIFORM &&
+				    in->size % numrep == 0)
+					/* r'=r+(n+1)*f_total */
+					r += (numrep+1) * ftotal;
+				else
+					/* r' = r + n*f_total */
+					r += numrep * ftotal;
+
+				/* bucket choose */
+				if (in->size == 0) {
+					dprintk(" empty bucket\n");
+					break;
+				}
+
+				item = crush_bucket_choose(in, x, r);
+				if (item >= map->max_devices) {
+					dprintk(" bad item %d\n", item);
+					out[rep] = CRUSH_ITEM_NONE;
+					if (out2)
+						out2[rep] = CRUSH_ITEM_NONE;
+					left--;
+					break;
+				}
+
+				/* desired type? */
+				if (item < 0)
+					itemtype = map->buckets[-1-item]->type;
+				else
+					itemtype = 0;
+				dprintk("  item %d type %d\n", item, itemtype);
+
+				/* keep going? */
+				if (itemtype != type) {
+					if (item >= 0 ||
+					    (-1-item) >= map->max_buckets) {
+						dprintk(" bad item type %d\n", type);
+						out[rep] = CRUSH_ITEM_NONE;
+						if (out2)
+							out2[rep] =
+								CRUSH_ITEM_NONE;
+						left--;
+						break;
+					}
+					in = map->buckets[-1-item];
+					continue;
+				}
+
+				/* collision? */
+				collide = 0;
+				for (i = outpos; i < endpos; i++) {
+					if (out[i] == item) {
+						collide = 1;
+						break;
+					}
+				}
+				if (collide)
+					break;
+
+				if (recurse_to_leaf) {
+					if (item < 0) {
+						crush_choose_indep(map,
+						   map->buckets[-1-item],
+						   weight, weight_max,
+						   x, 1, numrep, 0,
+						   out2, rep,
+						   recurse_tries, 0,
+						   0, NULL, r);
+						if (out2[rep] == CRUSH_ITEM_NONE) {
+							/* placed nothing; no leaf */
+							break;
+						}
+					} else {
+						/* we already have a leaf! */
+						out2[rep] = item;
+					}
+				}
+
+				/* out? */
+				if (itemtype == 0 &&
+				    is_out(map, weight, weight_max, item, x))
+					break;
+
+				/* yay! */
+				out[rep] = item;
+				left--;
+				break;
+			}
+		}
+	}
+	for (rep = outpos; rep < endpos; rep++) {
+		if (out[rep] == CRUSH_ITEM_UNDEF) {
+			out[rep] = CRUSH_ITEM_NONE;
+		}
+		if (out2 && out2[rep] == CRUSH_ITEM_UNDEF) {
+			out2[rep] = CRUSH_ITEM_NONE;
+		}
+	}
+}
+
+/**
  * crush_do_rule - calculate a mapping with the given input and rule
  * @map: the crush_map
  * @ruleno: the rule id
  * @x: hash input
  * @result: pointer to result vector
  * @result_max: maximum result size
+ * @weight: weight vector (for map leaves)
+ * @weight_max: size of weight vector
+ * @scratch: scratch vector for private use; must be >= 3 * result_max
  */
 int crush_do_rule(const struct crush_map *map,
 		  int ruleno, int x, int *result, int result_max,
-		  const __u32 *weight)
+		  const __u32 *weight, int weight_max,
+		  int *scratch)
 {
 	int result_len;
-	int a[CRUSH_MAX_SET];
-	int b[CRUSH_MAX_SET];
-	int c[CRUSH_MAX_SET];
+	int *a = scratch;
+	int *b = scratch + result_max;
+	int *c = scratch + result_max*2;
 	int recurse_to_leaf;
 	int *w;
 	int wsize = 0;
@@ -489,8 +640,10 @@ int crush_do_rule(const struct crush_map *map,
 	__u32 step;
 	int i, j;
 	int numrep;
-	int firstn;
-	const int descend_once = 0;
+	int choose_tries = map->choose_total_tries;
+	int choose_local_tries = map->choose_local_tries;
+	int choose_local_fallback_tries = map->choose_local_fallback_tries;
+	int choose_leaf_tries = 0;
 
 	if ((__u32)ruleno >= map->max_rules) {
 		dprintk(" bad ruleno %d\n", ruleno);
@@ -503,29 +656,49 @@ int crush_do_rule(const struct crush_map *map,
 	o = b;
 
 	for (step = 0; step < rule->len; step++) {
+		int firstn = 0;
 		struct crush_rule_step *curstep = &rule->steps[step];
 
-		firstn = 0;
 		switch (curstep->op) {
 		case CRUSH_RULE_TAKE:
 			w[0] = curstep->arg1;
 			wsize = 1;
 			break;
 
-		case CRUSH_RULE_CHOOSE_LEAF_FIRSTN:
+		case CRUSH_RULE_SET_CHOOSE_TRIES:
+			if (curstep->arg1 > 0)
+				choose_tries = curstep->arg1;
+			break;
+
+		case CRUSH_RULE_SET_CHOOSELEAF_TRIES:
+			if (curstep->arg1 > 0)
+				choose_leaf_tries = curstep->arg1;
+			break;
+
+		case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES:
+			if (curstep->arg1 > 0)
+				choose_local_tries = curstep->arg1;
+			break;
+
+		case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES:
+			if (curstep->arg1 > 0)
+				choose_local_fallback_tries = curstep->arg1;
+			break;
+
+		case CRUSH_RULE_CHOOSELEAF_FIRSTN:
 		case CRUSH_RULE_CHOOSE_FIRSTN:
 			firstn = 1;
 			/* fall through */
-		case CRUSH_RULE_CHOOSE_LEAF_INDEP:
+		case CRUSH_RULE_CHOOSELEAF_INDEP:
 		case CRUSH_RULE_CHOOSE_INDEP:
 			if (wsize == 0)
 				break;
 
 			recurse_to_leaf =
 				curstep->op ==
-				CRUSH_RULE_CHOOSE_LEAF_FIRSTN ||
+				CRUSH_RULE_CHOOSELEAF_FIRSTN ||
 				curstep->op ==
-				CRUSH_RULE_CHOOSE_LEAF_INDEP;
+				CRUSH_RULE_CHOOSELEAF_INDEP;
 
 			/* reset output */
 			osize = 0;
@@ -543,22 +716,51 @@ int crush_do_rule(const struct crush_map *map,
 					continue;
 				}
 				j = 0;
-				osize += crush_choose(map,
-						      map->buckets[-1-w[i]],
-						      weight,
-						      x, numrep,
-						      curstep->arg2,
-						      o+osize, j,
-						      firstn,
-						      recurse_to_leaf,
-						      descend_once, c+osize);
+				if (firstn) {
+					int recurse_tries;
+					if (choose_leaf_tries)
+						recurse_tries =
+							choose_leaf_tries;
+					else if (map->chooseleaf_descend_once)
+						recurse_tries = 1;
+					else
+						recurse_tries = choose_tries;
+					osize += crush_choose_firstn(
+						map,
+						map->buckets[-1-w[i]],
+						weight, weight_max,
+						x, numrep,
+						curstep->arg2,
+						o+osize, j,
+						choose_tries,
+						recurse_tries,
+						choose_local_tries,
+						choose_local_fallback_tries,
+						recurse_to_leaf,
+						c+osize);
+				} else {
+					crush_choose_indep(
+						map,
+						map->buckets[-1-w[i]],
+						weight, weight_max,
+						x, numrep, numrep,
+						curstep->arg2,
+						o+osize, j,
+						choose_tries,
+						choose_leaf_tries ?
+						   choose_leaf_tries : 1,
+						recurse_to_leaf,
+						c+osize,
+						0);
+					osize += numrep;
+				}
 			}
 
 			if (recurse_to_leaf)
 				/* copy final _leaf_ values to output set */
 				memcpy(o, c, osize*sizeof(*o));
 
-			/* swap t and w arrays */
+			/* swap o and w arrays */
 			tmp = o;
 			o = w;
 			w = tmp;
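A note on the crush_do_rule() interface change above: the fixed CRUSH_MAX_SET-sized a/b/c stack arrays are gone, so every caller must now pass the weight vector's size and a scratch vector of at least 3 * result_max ints. A sketch of a hypothetical caller under those assumptions (the local names are illustrative; the in-tree wrapper is crush_do_rule_ary() in osdmap.c below):

	int osds[8];			/* result vector, result_max = 8 */
	int scratch[3 * 8];		/* must hold >= 3 * result_max ints */
	int n;

	/* x is the placement seed; weight/weight_max come from the osdmap */
	n = crush_do_rule(map, ruleno, x, osds, 8,
			  osdmap->osd_weight, osdmap->max_osd, scratch);
	if (n < 0)
		pr_err("crush_do_rule failed: %d\n", n);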
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 83661cdc0766..258a382e75ed 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -132,7 +132,8 @@ static int osdc_show(struct seq_file *s, void *pp)
 			   req->r_osd ? req->r_osd->o_osd : -1,
 			   req->r_pgid.pool, req->r_pgid.seed);
 
-		seq_printf(s, "%.*s", req->r_oid_len, req->r_oid);
+		seq_printf(s, "%.*s", req->r_base_oid.name_len,
+			   req->r_base_oid.name);
 
 		if (req->r_reassert_version.epoch)
 			seq_printf(s, "\t%u'%llu",
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 4a5df7b1cc9f..2ed1304d22a7 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -15,6 +15,7 @@
 #include <linux/dns_resolver.h>
 #include <net/tcp.h>
 
+#include <linux/ceph/ceph_features.h>
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/decode.h>
@@ -1865,7 +1866,9 @@ int ceph_parse_ips(const char *c, const char *end,
 				port = (port * 10) + (*p - '0');
 				p++;
 			}
-			if (port > 65535 || port == 0)
+			if (port == 0)
+				port = CEPH_MON_PORT;
+			else if (port > 65535)
 				goto bad;
 		} else {
 			port = CEPH_MON_PORT;
@@ -1945,7 +1948,8 @@ static int process_connect(struct ceph_connection *con)
 {
 	u64 sup_feat = con->msgr->supported_features;
 	u64 req_feat = con->msgr->required_features;
-	u64 server_feat = le64_to_cpu(con->in_reply.features);
+	u64 server_feat = ceph_sanitize_features(
+				le64_to_cpu(con->in_reply.features));
 	int ret;
 
 	dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -2853,8 +2857,8 @@ static void con_fault(struct ceph_connection *con)
  */
 void ceph_messenger_init(struct ceph_messenger *msgr,
 			 struct ceph_entity_addr *myaddr,
-			 u32 supported_features,
-			 u32 required_features,
+			 u64 supported_features,
+			 u64 required_features,
 			 bool nocrc)
 {
 	msgr->supported_features = supported_features;
@@ -3126,15 +3130,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 	INIT_LIST_HEAD(&m->data);
 
 	/* front */
-	m->front_max = front_len;
 	if (front_len) {
-		if (front_len > PAGE_CACHE_SIZE) {
-			m->front.iov_base = __vmalloc(front_len, flags,
-						      PAGE_KERNEL);
-			m->front_is_vmalloc = true;
-		} else {
-			m->front.iov_base = kmalloc(front_len, flags);
-		}
+		m->front.iov_base = ceph_kvmalloc(front_len, flags);
 		if (m->front.iov_base == NULL) {
 			dout("ceph_msg_new can't allocate %d bytes\n",
 			     front_len);
@@ -3143,7 +3140,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 	} else {
 		m->front.iov_base = NULL;
 	}
-	m->front.iov_len = front_len;
+	m->front_alloc_len = m->front.iov_len = front_len;
 
 	dout("ceph_msg_new %p front %d\n", m, front_len);
 	return m;
@@ -3256,10 +3253,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
 void ceph_msg_kfree(struct ceph_msg *m)
 {
 	dout("msg_kfree %p\n", m);
-	if (m->front_is_vmalloc)
-		vfree(m->front.iov_base);
-	else
-		kfree(m->front.iov_base);
+	ceph_kvfree(m->front.iov_base);
 	kmem_cache_free(ceph_msg_cache, m);
 }
 
@@ -3301,8 +3295,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);
 
 void ceph_msg_dump(struct ceph_msg *msg)
 {
-	pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
-		 msg->front_max, msg->data_length);
+	pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
+		 msg->front_alloc_len, msg->data_length);
 	print_hex_dump(KERN_DEBUG, "header: ",
 		       DUMP_PREFIX_OFFSET, 16, 1,
 		       &msg->hdr, sizeof(msg->hdr), true);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 1fe25cd29d0e..2ac9ef35110b 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -152,7 +152,7 @@ static int __open_session(struct ceph_mon_client *monc)
 		/* initiatiate authentication handshake */
 		ret = ceph_auth_build_hello(monc->auth,
 					    monc->m_auth->front.iov_base,
-					    monc->m_auth->front_max);
+					    monc->m_auth->front_alloc_len);
 		__send_prepared_auth_request(monc, ret);
 	} else {
 		dout("open_session mon%d already open\n", monc->cur_mon);
@@ -196,7 +196,7 @@ static void __send_subscribe(struct ceph_mon_client *monc)
 	int num;
 
 	p = msg->front.iov_base;
-	end = p + msg->front_max;
+	end = p + msg->front_alloc_len;
 
 	num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
 	ceph_encode_32(&p, num);
@@ -897,7 +897,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
 	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
 				     msg->front.iov_len,
 				     monc->m_auth->front.iov_base,
-				     monc->m_auth->front_max);
+				     monc->m_auth->front_alloc_len);
 	if (ret < 0) {
 		monc->client->auth_err = ret;
 		wake_up_all(&monc->client->auth_wq);
@@ -939,7 +939,7 @@ static int __validate_auth(struct ceph_mon_client *monc)
 		return 0;
 
 	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
-			      monc->m_auth->front_max);
+			      monc->m_auth->front_alloc_len);
 	if (ret <= 0)
 		return ret; /* either an error, or no need to authenticate */
 	__send_prepared_auth_request(monc, ret);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2b4b32aaa893..010ff3bd58ad 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	msg_size = 4 + 4 + 8 + 8 + 4+8;
 	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
 	msg_size += 1 + 8 + 4 + 4;     /* pg_t */
-	msg_size += 4 + MAX_OBJ_NAME_SIZE;
+	msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
 	msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
 	msg_size += 8;  /* snapid */
 	msg_size += 8;  /* snap_seq */
@@ -368,6 +368,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	INIT_LIST_HEAD(&req->r_req_lru_item);
 	INIT_LIST_HEAD(&req->r_osd_item);
 
+	req->r_base_oloc.pool = -1;
+	req->r_target_oloc.pool = -1;
+
 	/* create reply message */
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -761,11 +764,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	if (num_ops > 1)
 		osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
 
-	req->r_file_layout = *layout;  /* keep a copy */
+	req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
 
-	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
-		vino.ino, objnum);
-	req->r_oid_len = strlen(req->r_oid);
+	snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
+		 "%llx.%08llx", vino.ino, objnum);
+	req->r_base_oid.name_len = strlen(req->r_base_oid.name);
 
 	return req;
 }
@@ -1044,8 +1047,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 	    !ceph_con_opened(&osd->o_con)) {
 		struct ceph_osd_request *req;
 
-		dout(" osd addr hasn't changed and connection never opened,"
-		     " letting msgr retry");
+		dout("osd addr hasn't changed and connection never opened, "
+		     "letting msgr retry\n");
 		/* touch each r_stamp for handle_timeout()'s benfit */
 		list_for_each_entry(req, &osd->o_requests, r_osd_item)
 			req->r_stamp = jiffies;
@@ -1232,6 +1235,61 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
 
 /*
+ * Returns whether a request should be blocked from being sent
+ * based on the current osdmap and osd_client settings.
+ *
+ * Caller should hold map_sem for read.
+ */
+static bool __req_should_be_paused(struct ceph_osd_client *osdc,
+				   struct ceph_osd_request *req)
+{
+	bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+	bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+	return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
+	       (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
+}
+
+/*
+ * Calculate mapping of a request to a PG.  Takes tiering into account.
+ */
+static int __calc_request_pg(struct ceph_osdmap *osdmap,
+			     struct ceph_osd_request *req,
+			     struct ceph_pg *pg_out)
+{
+	bool need_check_tiering;
+
+	need_check_tiering = false;
+	if (req->r_target_oloc.pool == -1) {
+		req->r_target_oloc = req->r_base_oloc; /* struct */
+		need_check_tiering = true;
+	}
+	if (req->r_target_oid.name_len == 0) {
+		ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
+		need_check_tiering = true;
+	}
+
+	if (need_check_tiering &&
+	    (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+		struct ceph_pg_pool_info *pi;
+
+		pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
+		if (pi) {
+			if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
+			    pi->read_tier >= 0)
+				req->r_target_oloc.pool = pi->read_tier;
+			if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+			    pi->write_tier >= 0)
+				req->r_target_oloc.pool = pi->write_tier;
+		}
+		/* !pi is caught in ceph_oloc_oid_to_pg() */
+	}
+
+	return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
+				   &req->r_target_oid, pg_out);
+}
+
+/*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
  * no up osd, set r_osd to NULL.  Move the request to the appropriate list
@@ -1248,10 +1306,11 @@ static int __map_request(struct ceph_osd_client *osdc,
 	int acting[CEPH_PG_MAX_SIZE];
 	int o = -1, num = 0;
 	int err;
+	bool was_paused;
 
 	dout("map_request %p tid %lld\n", req, req->r_tid);
-	err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
-				ceph_file_layout_pg_pool(req->r_file_layout));
+
+	err = __calc_request_pg(osdc->osdmap, req, &pgid);
 	if (err) {
 		list_move(&req->r_req_lru_item, &osdc->req_notarget);
 		return err;
@@ -1264,12 +1323,18 @@ static int __map_request(struct ceph_osd_client *osdc,
 		num = err;
 	}
 
+	was_paused = req->r_paused;
+	req->r_paused = __req_should_be_paused(osdc, req);
+	if (was_paused && !req->r_paused)
+		force_resend = 1;
+
 	if ((!force_resend &&
 	     req->r_osd && req->r_osd->o_osd == o &&
 	     req->r_sent >= req->r_osd->o_incarnation &&
 	     req->r_num_pg_osds == num &&
 	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
-	    (req->r_osd == NULL && o == -1))
+	    (req->r_osd == NULL && o == -1) ||
+	    req->r_paused)
 		return 0;  /* no change */
 
 	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
@@ -1331,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
 	/* fill in message content that changes each time we send it */
 	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
 	put_unaligned_le32(req->r_flags, req->r_request_flags);
-	put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
+	put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
 	p = req->r_request_pgid;
 	ceph_encode_64(&p, req->r_pgid.pool);
 	ceph_encode_32(&p, req->r_pgid.seed);
@@ -1432,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work)
 			      round_jiffies_relative(delay));
 }
 
+static int ceph_oloc_decode(void **p, void *end,
+			    struct ceph_object_locator *oloc)
+{
+	u8 struct_v, struct_cv;
+	u32 len;
+	void *struct_end;
+	int ret = 0;
+
+	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+	struct_v = ceph_decode_8(p);
+	struct_cv = ceph_decode_8(p);
+	if (struct_v < 3) {
+		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
+			struct_v, struct_cv);
+		goto e_inval;
+	}
+	if (struct_cv > 6) {
+		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
+			struct_v, struct_cv);
+		goto e_inval;
+	}
+	len = ceph_decode_32(p);
+	ceph_decode_need(p, end, len, e_inval);
+	struct_end = *p + len;
+
+	oloc->pool = ceph_decode_64(p);
+	*p += 4; /* skip preferred */
+
+	len = ceph_decode_32(p);
+	if (len > 0) {
+		pr_warn("ceph_object_locator::key is set\n");
+		goto e_inval;
+	}
+
+	if (struct_v >= 5) {
+		len = ceph_decode_32(p);
+		if (len > 0) {
+			pr_warn("ceph_object_locator::nspace is set\n");
+			goto e_inval;
+		}
+	}
+
+	if (struct_v >= 6) {
+		s64 hash = ceph_decode_64(p);
+		if (hash != -1) {
+			pr_warn("ceph_object_locator::hash is set\n");
+			goto e_inval;
+		}
+	}
+
+	/* skip the rest */
+	*p = struct_end;
+out:
+	return ret;
+
+e_inval:
+	ret = -EINVAL;
+	goto out;
+}
+
+static int ceph_redirect_decode(void **p, void *end,
+				struct ceph_request_redirect *redir)
+{
+	u8 struct_v, struct_cv;
+	u32 len;
+	void *struct_end;
+	int ret;
+
+	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+	struct_v = ceph_decode_8(p);
+	struct_cv = ceph_decode_8(p);
+	if (struct_cv > 1) {
+		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
+			struct_v, struct_cv);
+		goto e_inval;
+	}
+	len = ceph_decode_32(p);
+	ceph_decode_need(p, end, len, e_inval);
+	struct_end = *p + len;
+
+	ret = ceph_oloc_decode(p, end, &redir->oloc);
+	if (ret)
+		goto out;
+
+	len = ceph_decode_32(p);
+	if (len > 0) {
+		pr_warn("ceph_request_redirect::object_name is set\n");
+		goto e_inval;
+	}
+
+	len = ceph_decode_32(p);
+	*p += len; /* skip osd_instructions */
+
+	/* skip the rest */
+	*p = struct_end;
+out:
+	return ret;
+
+e_inval:
+	ret = -EINVAL;
+	goto out;
+}
+
 static void complete_request(struct ceph_osd_request *req)
 {
 	complete_all(&req->r_safe_completion);  /* fsync waiter */
@@ -1446,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 {
 	void *p, *end;
 	struct ceph_osd_request *req;
+	struct ceph_request_redirect redir;
 	u64 tid;
 	int object_len;
 	unsigned int numops;
@@ -1525,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 	for (i = 0; i < numops; i++)
 		req->r_reply_op_result[i] = ceph_decode_32(&p);
 
-	already_completed = req->r_got_reply;
+	if (le16_to_cpu(msg->hdr.version) >= 6) {
+		p += 8 + 4; /* skip replay_version */
+		p += 8; /* skip user_version */
 
-	if (!req->r_got_reply) {
+		err = ceph_redirect_decode(&p, end, &redir);
+		if (err)
+			goto bad_put;
+	} else {
+		redir.oloc.pool = -1;
+	}
+
+	if (redir.oloc.pool != -1) {
+		dout("redirect pool %lld\n", redir.oloc.pool);
+
+		__unregister_request(osdc, req);
+		mutex_unlock(&osdc->request_mutex);
+
+		req->r_target_oloc = redir.oloc; /* struct */
+
+		/*
+		 * Start redirect requests with nofail=true.  If
+		 * mapping fails, request will end up on the notarget
+		 * list, waiting for the new osdmap (which can take
+		 * a while), even though the original request mapped
+		 * successfully.  In the future we might want to follow
+		 * original request's nofail setting here.
+		 */
+		err = ceph_osdc_start_request(osdc, req, true);
+		BUG_ON(err);
 
+		goto done;
+	}
+
+	already_completed = req->r_got_reply;
+	if (!req->r_got_reply) {
 		req->r_result = result;
 		dout("handle_reply result %d bytes %d\n", req->r_result,
 		     bytes);
@@ -1581,6 +1781,13 @@ done:
 	return;
 
 bad_put:
+	req->r_result = -EIO;
+	__unregister_request(osdc, req);
+	if (req->r_callback)
+		req->r_callback(req, msg);
+	else
+		complete_all(&req->r_completion);
+	complete_request(req);
 	ceph_osdc_put_request(req);
 bad_mutex:
 	mutex_unlock(&osdc->request_mutex);
@@ -1613,14 +1820,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  *
  * Caller should hold map_sem for read.
  */
-static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
+static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
+			  bool force_resend_writes)
 {
 	struct ceph_osd_request *req, *nreq;
 	struct rb_node *p;
 	int needmap = 0;
 	int err;
+	bool force_resend_req;
 
-	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
+	dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
+	     force_resend_writes ? " (force resend writes)" : "");
 	mutex_lock(&osdc->request_mutex);
 	for (p = rb_first(&osdc->requests); p; ) {
 		req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -1645,7 +1855,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 			continue;
 		}
 
-		err = __map_request(osdc, req, force_resend);
+		force_resend_req = force_resend ||
+			(force_resend_writes &&
+			 req->r_flags & CEPH_OSD_FLAG_WRITE);
+		err = __map_request(osdc, req, force_resend_req);
 		if (err < 0)
 			continue;  /* error */
 		if (req->r_osd == NULL) {
@@ -1665,7 +1878,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 			 r_linger_item) {
 		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
 
-		err = __map_request(osdc, req, force_resend);
+		err = __map_request(osdc, req,
+				    force_resend || force_resend_writes);
 		dout("__map_request returned %d\n", err);
 		if (err == 0)
 			continue;  /* no change and no osd was specified */
@@ -1707,6 +1921,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 	struct ceph_osdmap *newmap = NULL, *oldmap;
 	int err;
 	struct ceph_fsid fsid;
+	bool was_full;
 
 	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
 	p = msg->front.iov_base;
@@ -1720,6 +1935,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 
 	down_write(&osdc->map_sem);
 
+	was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+
 	/* incremental maps */
 	ceph_decode_32_safe(&p, end, nr_maps, bad);
 	dout(" %d inc maps\n", nr_maps);
@@ -1744,7 +1961,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 				ceph_osdmap_destroy(osdc->osdmap);
 				osdc->osdmap = newmap;
 			}
-			kick_requests(osdc, 0);
+			was_full = was_full ||
+				ceph_osdmap_flag(osdc->osdmap,
+						 CEPH_OSDMAP_FULL);
+			kick_requests(osdc, 0, was_full);
 		} else {
 			dout("ignoring incremental map %u len %d\n",
 			     epoch, maplen);
@@ -1787,7 +2007,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 				skipped_map = 1;
 				ceph_osdmap_destroy(oldmap);
 			}
-			kick_requests(osdc, skipped_map);
+			was_full = was_full ||
+				ceph_osdmap_flag(osdc->osdmap,
+						 CEPH_OSDMAP_FULL);
+			kick_requests(osdc, skipped_map, was_full);
 		}
 		p += maplen;
 		nr_maps--;
@@ -1804,7 +2027,9 @@ done:
 	 * we find out when we are no longer full and stop returning
 	 * ENOSPC.
 	 */
-	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
+	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+	    ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
+	    ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
 		ceph_monc_request_next_osdmap(&osdc->client->monc);
 
 	mutex_lock(&osdc->request_mutex);
@@ -2068,10 +2293,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
 	ceph_encode_32(&p, -1);  /* preferred */
 
 	/* oid */
-	ceph_encode_32(&p, req->r_oid_len);
-	memcpy(p, req->r_oid, req->r_oid_len);
-	dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
-	p += req->r_oid_len;
+	ceph_encode_32(&p, req->r_base_oid.name_len);
+	memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
+	dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
+	     req->r_base_oid.name, req->r_base_oid.name_len);
+	p += req->r_base_oid.name_len;
 
 	/* ops--can imply data */
 	ceph_encode_16(&p, (u16)req->r_num_ops);
@@ -2454,7 +2680,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	struct ceph_osd_client *osdc = osd->o_osdc;
 	struct ceph_msg *m;
 	struct ceph_osd_request *req;
-	int front = le32_to_cpu(hdr->front_len);
+	int front_len = le32_to_cpu(hdr->front_len);
 	int data_len = le32_to_cpu(hdr->data_len);
 	u64 tid;
 
@@ -2474,12 +2700,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	     req->r_reply, req->r_reply->con);
 	ceph_msg_revoke_incoming(req->r_reply);
 
-	if (front > req->r_reply->front.iov_len) {
+	if (front_len > req->r_reply->front_alloc_len) {
 		pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
-			   front, (int)req->r_reply->front.iov_len,
+			   front_len, req->r_reply->front_alloc_len,
 			   (unsigned int)con->peer_name.type,
 			   le64_to_cpu(con->peer_name.num));
-		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
+		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
+				 false);
 		if (!m)
 			goto out;
 		ceph_msg_put(req->r_reply);
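The two decoders added above (ceph_oloc_decode() and ceph_redirect_decode()) both follow libceph's bounds-checked decode idiom: ceph_decode_need() jumps to the given label if fewer bytes remain between *p and end than requested, and the fixed-size reads advance *p. A compact sketch of the same idiom for a hypothetical two-field structure (example_decode() is illustrative only, not part of this series):

	static int example_decode(void **p, void *end, u32 *a, u64 *b)
	{
		/* fail cleanly if the buffer is shorter than what we read */
		ceph_decode_need(p, end, 4 + 8, e_inval);
		*a = ceph_decode_32(p);		/* advances *p by 4 */
		*b = ceph_decode_64(p);		/* advances *p by 8 */
		return 0;

	e_inval:
		return -EINVAL;
	}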
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index dbd9a4792427..aade4a5c1c07 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -464,6 +464,11 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
 	return NULL;
 }
 
+struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
+{
+	return __lookup_pg_pool(&map->pg_pools, id);
+}
+
 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
 {
 	struct ceph_pg_pool_info *pi;
@@ -514,8 +519,8 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
 		pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
 		return -EINVAL;
 	}
-	if (cv > 7) {
-		pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv);
+	if (cv > 9) {
+		pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
 		return -EINVAL;
 	}
 	len = ceph_decode_32(p);
@@ -543,12 +548,34 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
 		*p += len;
 	}
 
-	/* skip removed snaps */
+	/* skip removed_snaps */
 	num = ceph_decode_32(p);
 	*p += num * (8 + 8);
 
 	*p += 8;  /* skip auid */
 	pi->flags = ceph_decode_64(p);
+	*p += 4;  /* skip crash_replay_interval */
+
+	if (ev >= 7)
+		*p += 1;  /* skip min_size */
+
+	if (ev >= 8)
+		*p += 8 + 8;  /* skip quota_max_* */
+
+	if (ev >= 9) {
+		/* skip tiers */
+		num = ceph_decode_32(p);
+		*p += num * 8;
+
+		*p += 8;  /* skip tier_of */
+		*p += 1;  /* skip cache_mode */
+
+		pi->read_tier = ceph_decode_64(p);
+		pi->write_tier = ceph_decode_64(p);
+	} else {
+		pi->read_tier = -1;
+		pi->write_tier = -1;
+	}
 
 	/* ignore the rest */
 
@@ -1090,25 +1117,40 @@ invalid:
 EXPORT_SYMBOL(ceph_calc_file_object_mapping);
 
 /*
- * calculate an object layout (i.e. pgid) from an oid,
- * file_layout, and osdmap
+ * Calculate mapping of a (oloc, oid) pair to a PG.  Should only be
+ * called with target's (oloc, oid), since tiering isn't taken into
+ * account.
  */
-int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid,
-		      struct ceph_osdmap *osdmap, uint64_t pool)
+int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
+			struct ceph_object_locator *oloc,
+			struct ceph_object_id *oid,
+			struct ceph_pg *pg_out)
 {
-	struct ceph_pg_pool_info *pool_info;
+	struct ceph_pg_pool_info *pi;
 
-	BUG_ON(!osdmap);
-	pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool);
-	if (!pool_info)
+	pi = __lookup_pg_pool(&osdmap->pg_pools, oloc->pool);
+	if (!pi)
 		return -EIO;
-	pg->pool = pool;
-	pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid));
 
-	dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed);
+	pg_out->pool = oloc->pool;
+	pg_out->seed = ceph_str_hash(pi->object_hash, oid->name,
+				     oid->name_len);
+
+	dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name,
+	     pg_out->pool, pg_out->seed);
 	return 0;
 }
-EXPORT_SYMBOL(ceph_calc_ceph_pg);
+EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
+
+static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x,
+			     int *result, int result_max,
+			     const __u32 *weight, int weight_max)
+{
+	int scratch[result_max * 3];
+
+	return crush_do_rule(map, ruleno, x, result, result_max,
+			     weight, weight_max, scratch);
+}
 
 /*
  * Calculate raw osd vector for the given pgid.  Return pointer to osd
@@ -1163,9 +1205,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 			      pool->pgp_num_mask) +
 			(unsigned)pgid.pool;
 	}
-	r = crush_do_rule(osdmap->crush, ruleno, pps, osds,
-			  min_t(int, pool->size, *num),
-			  osdmap->osd_weight);
+	r = crush_do_rule_ary(osdmap->crush, ruleno, pps,
+			      osds, min_t(int, pool->size, *num),
+			      osdmap->osd_weight, osdmap->max_osd);
 	if (r < 0) {
 		pr_err("error %d from crush rule: pool %lld ruleset %d type %d"
 		       " size %d\n", r, pgid.pool, pool->crush_ruleset,