aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2012-05-30 14:17:19 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2012-05-30 14:17:19 -0400
commitaf56e0aa35f3ae2a4c1a6d1000702df1dd78cb76 (patch)
tree304bd85e5db2d07efa2913aa7c6313b918cfbfdb
parent65a50c951a38e9827dd9655b6e686bde912e799b (diff)
parent6bd9adbdf9ca6a052b0b7455ac67b925eb38cfad (diff)
Merge git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull ceph updates from Sage Weil: "There are some updates and cleanups to the CRUSH placement code, a bug fix with incremental maps, several cleanups and fixes from Josh Durgin in the RBD block device code, a series of cleanups and bug fixes from Alex Elder in the messenger code, and some miscellaneous bounds checking and gfp cleanups/fixes." Fix up trivial conflicts in net/ceph/{messenger.c,osdmap.c} due to the networking people preferring "unsigned int" over just "unsigned". * git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (45 commits) libceph: fix pg_temp updates libceph: avoid unregistering osd request when not registered ceph: add auth buf in prepare_write_connect() ceph: rename prepare_connect_authorizer() ceph: return pointer from prepare_connect_authorizer() ceph: use info returned by get_authorizer ceph: have get_authorizer methods return pointers ceph: ensure auth ops are defined before use ceph: messenger: reduce args to create_authorizer ceph: define ceph_auth_handshake type ceph: messenger: check return from get_authorizer ceph: messenger: rework prepare_connect_authorizer() ceph: messenger: check prepare_write_connect() result ceph: don't set WRITE_PENDING too early ceph: drop msgr argument from prepare_write_connect() ceph: messenger: send banner in process_connect() ceph: messenger: reset connection kvec caller libceph: don't reset kvec in prepare_write_banner() ceph: ignore preferred_osd field ceph: fully initialize new layout ...
-rw-r--r--Documentation/ABI/testing/sysfs-bus-rbd4
-rw-r--r--drivers/block/rbd.c72
-rw-r--r--fs/ceph/file.c1
-rw-r--r--fs/ceph/ioctl.c102
-rw-r--r--fs/ceph/ioctl.h2
-rw-r--r--fs/ceph/mds_client.c54
-rw-r--r--fs/ceph/mds_client.h5
-rw-r--r--fs/ceph/xattr.c9
-rw-r--r--include/linux/ceph/auth.h12
-rw-r--r--include/linux/ceph/ceph_fs.h4
-rw-r--r--include/linux/ceph/decode.h9
-rw-r--r--include/linux/ceph/messenger.h6
-rw-r--r--include/linux/ceph/osd_client.h11
-rw-r--r--include/linux/ceph/osdmap.h2
-rw-r--r--include/linux/crush/crush.h18
-rw-r--r--include/linux/crush/mapper.h7
-rw-r--r--net/ceph/auth_none.c15
-rw-r--r--net/ceph/auth_x.c15
-rw-r--r--net/ceph/crush/crush.c39
-rw-r--r--net/ceph/crush/mapper.c124
-rw-r--r--net/ceph/messenger.c182
-rw-r--r--net/ceph/osd_client.c63
-rw-r--r--net/ceph/osdmap.c73
23 files changed, 376 insertions, 453 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index dbedafb095e2..bcd88eb7ebcd 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -65,11 +65,11 @@ snap_*
65Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name> 65Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name>
66------------------------------------------------------------- 66-------------------------------------------------------------
67 67
68id 68snap_id
69 69
70 The rados internal snapshot id assigned for this snapshot 70 The rados internal snapshot id assigned for this snapshot
71 71
72size 72snap_size
73 73
74 The size of the image when this snapshot was taken. 74 The size of the image when this snapshot was taken.
75 75
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 013c7a549fb6..65665c9c42c6 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -141,7 +141,7 @@ struct rbd_request {
141struct rbd_snap { 141struct rbd_snap {
142 struct device dev; 142 struct device dev;
143 const char *name; 143 const char *name;
144 size_t size; 144 u64 size;
145 struct list_head node; 145 struct list_head node;
146 u64 id; 146 u64 id;
147}; 147};
@@ -175,8 +175,7 @@ struct rbd_device {
175 /* protects updating the header */ 175 /* protects updating the header */
176 struct rw_semaphore header_rwsem; 176 struct rw_semaphore header_rwsem;
177 char snap_name[RBD_MAX_SNAP_NAME_LEN]; 177 char snap_name[RBD_MAX_SNAP_NAME_LEN];
178 u32 cur_snap; /* index+1 of current snapshot within snap context 178 u64 snap_id; /* current snapshot id */
179 0 - for the head */
180 int read_only; 179 int read_only;
181 180
182 struct list_head node; 181 struct list_head node;
@@ -241,7 +240,7 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
241 put_device(&rbd_dev->dev); 240 put_device(&rbd_dev->dev);
242} 241}
243 242
244static int __rbd_update_snaps(struct rbd_device *rbd_dev); 243static int __rbd_refresh_header(struct rbd_device *rbd_dev);
245 244
246static int rbd_open(struct block_device *bdev, fmode_t mode) 245static int rbd_open(struct block_device *bdev, fmode_t mode)
247{ 246{
@@ -450,7 +449,9 @@ static void rbd_client_release(struct kref *kref)
450 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 449 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
451 450
452 dout("rbd_release_client %p\n", rbdc); 451 dout("rbd_release_client %p\n", rbdc);
452 spin_lock(&rbd_client_list_lock);
453 list_del(&rbdc->node); 453 list_del(&rbdc->node);
454 spin_unlock(&rbd_client_list_lock);
454 455
455 ceph_destroy_client(rbdc->client); 456 ceph_destroy_client(rbdc->client);
456 kfree(rbdc->rbd_opts); 457 kfree(rbdc->rbd_opts);
@@ -463,9 +464,7 @@ static void rbd_client_release(struct kref *kref)
463 */ 464 */
464static void rbd_put_client(struct rbd_device *rbd_dev) 465static void rbd_put_client(struct rbd_device *rbd_dev)
465{ 466{
466 spin_lock(&rbd_client_list_lock);
467 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 467 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468 spin_unlock(&rbd_client_list_lock);
469 rbd_dev->rbd_client = NULL; 468 rbd_dev->rbd_client = NULL;
470} 469}
471 470
@@ -487,16 +486,18 @@ static void rbd_coll_release(struct kref *kref)
487 */ 486 */
488static int rbd_header_from_disk(struct rbd_image_header *header, 487static int rbd_header_from_disk(struct rbd_image_header *header,
489 struct rbd_image_header_ondisk *ondisk, 488 struct rbd_image_header_ondisk *ondisk,
490 int allocated_snaps, 489 u32 allocated_snaps,
491 gfp_t gfp_flags) 490 gfp_t gfp_flags)
492{ 491{
493 int i; 492 u32 i, snap_count;
494 u32 snap_count;
495 493
496 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) 494 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
497 return -ENXIO; 495 return -ENXIO;
498 496
499 snap_count = le32_to_cpu(ondisk->snap_count); 497 snap_count = le32_to_cpu(ondisk->snap_count);
498 if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
499 / sizeof (*ondisk))
500 return -EINVAL;
500 header->snapc = kmalloc(sizeof(struct ceph_snap_context) + 501 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
501 snap_count * sizeof (*ondisk), 502 snap_count * sizeof (*ondisk),
502 gfp_flags); 503 gfp_flags);
@@ -506,11 +507,11 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
506 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len); 507 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
507 if (snap_count) { 508 if (snap_count) {
508 header->snap_names = kmalloc(header->snap_names_len, 509 header->snap_names = kmalloc(header->snap_names_len,
509 GFP_KERNEL); 510 gfp_flags);
510 if (!header->snap_names) 511 if (!header->snap_names)
511 goto err_snapc; 512 goto err_snapc;
512 header->snap_sizes = kmalloc(snap_count * sizeof(u64), 513 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
513 GFP_KERNEL); 514 gfp_flags);
514 if (!header->snap_sizes) 515 if (!header->snap_sizes)
515 goto err_names; 516 goto err_names;
516 } else { 517 } else {
@@ -552,21 +553,6 @@ err_snapc:
552 return -ENOMEM; 553 return -ENOMEM;
553} 554}
554 555
555static int snap_index(struct rbd_image_header *header, int snap_num)
556{
557 return header->total_snaps - snap_num;
558}
559
560static u64 cur_snap_id(struct rbd_device *rbd_dev)
561{
562 struct rbd_image_header *header = &rbd_dev->header;
563
564 if (!rbd_dev->cur_snap)
565 return 0;
566
567 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
568}
569
570static int snap_by_name(struct rbd_image_header *header, const char *snap_name, 556static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
571 u64 *seq, u64 *size) 557 u64 *seq, u64 *size)
572{ 558{
@@ -605,7 +591,7 @@ static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
605 snapc->seq = header->snap_seq; 591 snapc->seq = header->snap_seq;
606 else 592 else
607 snapc->seq = 0; 593 snapc->seq = 0;
608 dev->cur_snap = 0; 594 dev->snap_id = CEPH_NOSNAP;
609 dev->read_only = 0; 595 dev->read_only = 0;
610 if (size) 596 if (size)
611 *size = header->image_size; 597 *size = header->image_size;
@@ -613,8 +599,7 @@ static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
613 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size); 599 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
614 if (ret < 0) 600 if (ret < 0)
615 goto done; 601 goto done;
616 602 dev->snap_id = snapc->seq;
617 dev->cur_snap = header->total_snaps - ret;
618 dev->read_only = 1; 603 dev->read_only = 1;
619 } 604 }
620 605
@@ -935,7 +920,6 @@ static int rbd_do_request(struct request *rq,
935 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 920 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
936 layout->fl_stripe_count = cpu_to_le32(1); 921 layout->fl_stripe_count = cpu_to_le32(1);
937 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 922 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
938 layout->fl_pg_preferred = cpu_to_le32(-1);
939 layout->fl_pg_pool = cpu_to_le32(dev->poolid); 923 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
940 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, 924 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
941 req, ops); 925 req, ops);
@@ -1168,7 +1152,7 @@ static int rbd_req_read(struct request *rq,
1168 int coll_index) 1152 int coll_index)
1169{ 1153{
1170 return rbd_do_op(rq, rbd_dev, NULL, 1154 return rbd_do_op(rq, rbd_dev, NULL,
1171 (snapid ? snapid : CEPH_NOSNAP), 1155 snapid,
1172 CEPH_OSD_OP_READ, 1156 CEPH_OSD_OP_READ,
1173 CEPH_OSD_FLAG_READ, 1157 CEPH_OSD_FLAG_READ,
1174 2, 1158 2,
@@ -1187,7 +1171,7 @@ static int rbd_req_sync_read(struct rbd_device *dev,
1187 u64 *ver) 1171 u64 *ver)
1188{ 1172{
1189 return rbd_req_sync_op(dev, NULL, 1173 return rbd_req_sync_op(dev, NULL,
1190 (snapid ? snapid : CEPH_NOSNAP), 1174 snapid,
1191 CEPH_OSD_OP_READ, 1175 CEPH_OSD_OP_READ,
1192 CEPH_OSD_FLAG_READ, 1176 CEPH_OSD_FLAG_READ,
1193 NULL, 1177 NULL,
@@ -1238,7 +1222,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1238 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, 1222 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1239 notify_id, (int)opcode); 1223 notify_id, (int)opcode);
1240 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1224 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1241 rc = __rbd_update_snaps(dev); 1225 rc = __rbd_refresh_header(dev);
1242 mutex_unlock(&ctl_mutex); 1226 mutex_unlock(&ctl_mutex);
1243 if (rc) 1227 if (rc)
1244 pr_warning(RBD_DRV_NAME "%d got notification but failed to " 1228 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
@@ -1521,7 +1505,7 @@ static void rbd_rq_fn(struct request_queue *q)
1521 coll, cur_seg); 1505 coll, cur_seg);
1522 else 1506 else
1523 rbd_req_read(rq, rbd_dev, 1507 rbd_req_read(rq, rbd_dev,
1524 cur_snap_id(rbd_dev), 1508 rbd_dev->snap_id,
1525 ofs, 1509 ofs,
1526 op_size, bio, 1510 op_size, bio,
1527 coll, cur_seg); 1511 coll, cur_seg);
@@ -1592,7 +1576,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1592{ 1576{
1593 ssize_t rc; 1577 ssize_t rc;
1594 struct rbd_image_header_ondisk *dh; 1578 struct rbd_image_header_ondisk *dh;
1595 int snap_count = 0; 1579 u32 snap_count = 0;
1596 u64 ver; 1580 u64 ver;
1597 size_t len; 1581 size_t len;
1598 1582
@@ -1656,7 +1640,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
1656 struct ceph_mon_client *monc; 1640 struct ceph_mon_client *monc;
1657 1641
1658 /* we should create a snapshot only if we're pointing at the head */ 1642 /* we should create a snapshot only if we're pointing at the head */
1659 if (dev->cur_snap) 1643 if (dev->snap_id != CEPH_NOSNAP)
1660 return -EINVAL; 1644 return -EINVAL;
1661 1645
1662 monc = &dev->rbd_client->client->monc; 1646 monc = &dev->rbd_client->client->monc;
@@ -1683,7 +1667,9 @@ static int rbd_header_add_snap(struct rbd_device *dev,
1683 if (ret < 0) 1667 if (ret < 0)
1684 return ret; 1668 return ret;
1685 1669
1686 dev->header.snapc->seq = new_snapid; 1670 down_write(&dev->header_rwsem);
1671 dev->header.snapc->seq = new_snapid;
1672 up_write(&dev->header_rwsem);
1687 1673
1688 return 0; 1674 return 0;
1689bad: 1675bad:
@@ -1703,7 +1689,7 @@ static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1703/* 1689/*
1704 * only read the first part of the ondisk header, without the snaps info 1690 * only read the first part of the ondisk header, without the snaps info
1705 */ 1691 */
1706static int __rbd_update_snaps(struct rbd_device *rbd_dev) 1692static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1707{ 1693{
1708 int ret; 1694 int ret;
1709 struct rbd_image_header h; 1695 struct rbd_image_header h;
@@ -1890,7 +1876,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
1890 1876
1891 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1877 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1892 1878
1893 rc = __rbd_update_snaps(rbd_dev); 1879 rc = __rbd_refresh_header(rbd_dev);
1894 if (rc < 0) 1880 if (rc < 0)
1895 ret = rc; 1881 ret = rc;
1896 1882
@@ -1949,7 +1935,7 @@ static ssize_t rbd_snap_size_show(struct device *dev,
1949{ 1935{
1950 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1936 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1951 1937
1952 return sprintf(buf, "%zd\n", snap->size); 1938 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1953} 1939}
1954 1940
1955static ssize_t rbd_snap_id_show(struct device *dev, 1941static ssize_t rbd_snap_id_show(struct device *dev,
@@ -1958,7 +1944,7 @@ static ssize_t rbd_snap_id_show(struct device *dev,
1958{ 1944{
1959 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1945 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1960 1946
1961 return sprintf(buf, "%llu\n", (unsigned long long) snap->id); 1947 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1962} 1948}
1963 1949
1964static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); 1950static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
@@ -2173,7 +2159,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2173 rbd_dev->header.obj_version); 2159 rbd_dev->header.obj_version);
2174 if (ret == -ERANGE) { 2160 if (ret == -ERANGE) {
2175 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2161 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2176 rc = __rbd_update_snaps(rbd_dev); 2162 rc = __rbd_refresh_header(rbd_dev);
2177 mutex_unlock(&ctl_mutex); 2163 mutex_unlock(&ctl_mutex);
2178 if (rc < 0) 2164 if (rc < 0)
2179 return rc; 2165 return rc;
@@ -2558,7 +2544,7 @@ static ssize_t rbd_snap_add(struct device *dev,
2558 if (ret < 0) 2544 if (ret < 0)
2559 goto err_unlock; 2545 goto err_unlock;
2560 2546
2561 ret = __rbd_update_snaps(rbd_dev); 2547 ret = __rbd_refresh_header(rbd_dev);
2562 if (ret < 0) 2548 if (ret < 0)
2563 goto err_unlock; 2549 goto err_unlock;
2564 2550
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ed72428d9c75..988d4f302e48 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -54,7 +54,6 @@ prepare_open_request(struct super_block *sb, int flags, int create_mode)
54 req->r_fmode = ceph_flags_to_mode(flags); 54 req->r_fmode = ceph_flags_to_mode(flags);
55 req->r_args.open.flags = cpu_to_le32(flags); 55 req->r_args.open.flags = cpu_to_le32(flags);
56 req->r_args.open.mode = cpu_to_le32(create_mode); 56 req->r_args.open.mode = cpu_to_le32(create_mode);
57 req->r_args.open.preferred = cpu_to_le32(-1);
58out: 57out:
59 return req; 58 return req;
60} 59}
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 790914a598dd..8e3fb69fbe62 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -26,8 +26,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
26 l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); 26 l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
27 l.object_size = ceph_file_layout_object_size(ci->i_layout); 27 l.object_size = ceph_file_layout_object_size(ci->i_layout);
28 l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); 28 l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
29 l.preferred_osd = 29 l.preferred_osd = (s32)-1;
30 (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred);
31 if (copy_to_user(arg, &l, sizeof(l))) 30 if (copy_to_user(arg, &l, sizeof(l)))
32 return -EFAULT; 31 return -EFAULT;
33 } 32 }
@@ -35,6 +34,32 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
35 return err; 34 return err;
36} 35}
37 36
37static long __validate_layout(struct ceph_mds_client *mdsc,
38 struct ceph_ioctl_layout *l)
39{
40 int i, err;
41
42 /* validate striping parameters */
43 if ((l->object_size & ~PAGE_MASK) ||
44 (l->stripe_unit & ~PAGE_MASK) ||
45 ((unsigned)l->object_size % (unsigned)l->stripe_unit))
46 return -EINVAL;
47
48 /* make sure it's a valid data pool */
49 mutex_lock(&mdsc->mutex);
50 err = -EINVAL;
51 for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++)
52 if (mdsc->mdsmap->m_data_pg_pools[i] == l->data_pool) {
53 err = 0;
54 break;
55 }
56 mutex_unlock(&mdsc->mutex);
57 if (err)
58 return err;
59
60 return 0;
61}
62
38static long ceph_ioctl_set_layout(struct file *file, void __user *arg) 63static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
39{ 64{
40 struct inode *inode = file->f_dentry->d_inode; 65 struct inode *inode = file->f_dentry->d_inode;
@@ -44,52 +69,40 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
44 struct ceph_ioctl_layout l; 69 struct ceph_ioctl_layout l;
45 struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode); 70 struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode);
46 struct ceph_ioctl_layout nl; 71 struct ceph_ioctl_layout nl;
47 int err, i; 72 int err;
48 73
49 if (copy_from_user(&l, arg, sizeof(l))) 74 if (copy_from_user(&l, arg, sizeof(l)))
50 return -EFAULT; 75 return -EFAULT;
51 76
52 /* validate changed params against current layout */ 77 /* validate changed params against current layout */
53 err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT); 78 err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT);
54 if (!err) { 79 if (err)
55 nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
56 nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
57 nl.object_size = ceph_file_layout_object_size(ci->i_layout);
58 nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
59 nl.preferred_osd =
60 (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred);
61 } else
62 return err; 80 return err;
63 81
82 memset(&nl, 0, sizeof(nl));
64 if (l.stripe_count) 83 if (l.stripe_count)
65 nl.stripe_count = l.stripe_count; 84 nl.stripe_count = l.stripe_count;
85 else
86 nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
66 if (l.stripe_unit) 87 if (l.stripe_unit)
67 nl.stripe_unit = l.stripe_unit; 88 nl.stripe_unit = l.stripe_unit;
89 else
90 nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
68 if (l.object_size) 91 if (l.object_size)
69 nl.object_size = l.object_size; 92 nl.object_size = l.object_size;
93 else
94 nl.object_size = ceph_file_layout_object_size(ci->i_layout);
70 if (l.data_pool) 95 if (l.data_pool)
71 nl.data_pool = l.data_pool; 96 nl.data_pool = l.data_pool;
72 if (l.preferred_osd) 97 else
73 nl.preferred_osd = l.preferred_osd; 98 nl.data_pool = ceph_file_layout_pg_pool(ci->i_layout);
74 99
75 if ((nl.object_size & ~PAGE_MASK) || 100 /* this is obsolete, and always -1 */
76 (nl.stripe_unit & ~PAGE_MASK) || 101 nl.preferred_osd = le64_to_cpu(-1);
77 ((unsigned)nl.object_size % (unsigned)nl.stripe_unit))
78 return -EINVAL;
79 102
80 /* make sure it's a valid data pool */ 103 err = __validate_layout(mdsc, &nl);
81 if (l.data_pool > 0) { 104 if (err)
82 mutex_lock(&mdsc->mutex); 105 return err;
83 err = -EINVAL;
84 for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++)
85 if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) {
86 err = 0;
87 break;
88 }
89 mutex_unlock(&mdsc->mutex);
90 if (err)
91 return err;
92 }
93 106
94 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETLAYOUT, 107 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETLAYOUT,
95 USE_AUTH_MDS); 108 USE_AUTH_MDS);
@@ -106,8 +119,6 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
106 req->r_args.setlayout.layout.fl_object_size = 119 req->r_args.setlayout.layout.fl_object_size =
107 cpu_to_le32(l.object_size); 120 cpu_to_le32(l.object_size);
108 req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool); 121 req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool);
109 req->r_args.setlayout.layout.fl_pg_preferred =
110 cpu_to_le32(l.preferred_osd);
111 122
112 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); 123 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
113 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 124 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
@@ -127,33 +138,16 @@ static long ceph_ioctl_set_layout_policy (struct file *file, void __user *arg)
127 struct inode *inode = file->f_dentry->d_inode; 138 struct inode *inode = file->f_dentry->d_inode;
128 struct ceph_mds_request *req; 139 struct ceph_mds_request *req;
129 struct ceph_ioctl_layout l; 140 struct ceph_ioctl_layout l;
130 int err, i; 141 int err;
131 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 142 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
132 143
133 /* copy and validate */ 144 /* copy and validate */
134 if (copy_from_user(&l, arg, sizeof(l))) 145 if (copy_from_user(&l, arg, sizeof(l)))
135 return -EFAULT; 146 return -EFAULT;
136 147
137 if ((l.object_size & ~PAGE_MASK) || 148 err = __validate_layout(mdsc, &l);
138 (l.stripe_unit & ~PAGE_MASK) || 149 if (err)
139 !l.stripe_unit || 150 return err;
140 (l.object_size &&
141 (unsigned)l.object_size % (unsigned)l.stripe_unit))
142 return -EINVAL;
143
144 /* make sure it's a valid data pool */
145 if (l.data_pool > 0) {
146 mutex_lock(&mdsc->mutex);
147 err = -EINVAL;
148 for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++)
149 if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) {
150 err = 0;
151 break;
152 }
153 mutex_unlock(&mdsc->mutex);
154 if (err)
155 return err;
156 }
157 151
158 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETDIRLAYOUT, 152 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETDIRLAYOUT,
159 USE_AUTH_MDS); 153 USE_AUTH_MDS);
@@ -171,8 +165,6 @@ static long ceph_ioctl_set_layout_policy (struct file *file, void __user *arg)
171 cpu_to_le32(l.object_size); 165 cpu_to_le32(l.object_size);
172 req->r_args.setlayout.layout.fl_pg_pool = 166 req->r_args.setlayout.layout.fl_pg_pool =
173 cpu_to_le32(l.data_pool); 167 cpu_to_le32(l.data_pool);
174 req->r_args.setlayout.layout.fl_pg_preferred =
175 cpu_to_le32(l.preferred_osd);
176 168
177 err = ceph_mdsc_do_request(mdsc, inode, req); 169 err = ceph_mdsc_do_request(mdsc, inode, req);
178 ceph_mdsc_put_request(req); 170 ceph_mdsc_put_request(req);
diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h
index be4a60487333..c77028afb1e1 100644
--- a/fs/ceph/ioctl.h
+++ b/fs/ceph/ioctl.h
@@ -34,6 +34,8 @@
34struct ceph_ioctl_layout { 34struct ceph_ioctl_layout {
35 __u64 stripe_unit, stripe_count, object_size; 35 __u64 stripe_unit, stripe_count, object_size;
36 __u64 data_pool; 36 __u64 data_pool;
37
38 /* obsolete. new values ignored, always return -1 */
37 __s64 preferred_osd; 39 __s64 preferred_osd;
38}; 40};
39 41
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 89971e137aab..200bc87eceb1 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -334,10 +334,10 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
334 dout("mdsc put_session %p %d -> %d\n", s, 334 dout("mdsc put_session %p %d -> %d\n", s,
335 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 335 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
336 if (atomic_dec_and_test(&s->s_ref)) { 336 if (atomic_dec_and_test(&s->s_ref)) {
337 if (s->s_authorizer) 337 if (s->s_auth.authorizer)
338 s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer( 338 s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
339 s->s_mdsc->fsc->client->monc.auth, 339 s->s_mdsc->fsc->client->monc.auth,
340 s->s_authorizer); 340 s->s_auth.authorizer);
341 kfree(s); 341 kfree(s);
342 } 342 }
343} 343}
@@ -3395,39 +3395,33 @@ out:
3395/* 3395/*
3396 * authentication 3396 * authentication
3397 */ 3397 */
3398static int get_authorizer(struct ceph_connection *con, 3398
3399 void **buf, int *len, int *proto, 3399/*
3400 void **reply_buf, int *reply_len, int force_new) 3400 * Note: returned pointer is the address of a structure that's
3401 * managed separately. Caller must *not* attempt to free it.
3402 */
3403static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3404 int *proto, int force_new)
3401{ 3405{
3402 struct ceph_mds_session *s = con->private; 3406 struct ceph_mds_session *s = con->private;
3403 struct ceph_mds_client *mdsc = s->s_mdsc; 3407 struct ceph_mds_client *mdsc = s->s_mdsc;
3404 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3408 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3405 int ret = 0; 3409 struct ceph_auth_handshake *auth = &s->s_auth;
3406
3407 if (force_new && s->s_authorizer) {
3408 ac->ops->destroy_authorizer(ac, s->s_authorizer);
3409 s->s_authorizer = NULL;
3410 }
3411 if (s->s_authorizer == NULL) {
3412 if (ac->ops->create_authorizer) {
3413 ret = ac->ops->create_authorizer(
3414 ac, CEPH_ENTITY_TYPE_MDS,
3415 &s->s_authorizer,
3416 &s->s_authorizer_buf,
3417 &s->s_authorizer_buf_len,
3418 &s->s_authorizer_reply_buf,
3419 &s->s_authorizer_reply_buf_len);
3420 if (ret)
3421 return ret;
3422 }
3423 }
3424 3410
3411 if (force_new && auth->authorizer) {
3412 if (ac->ops && ac->ops->destroy_authorizer)
3413 ac->ops->destroy_authorizer(ac, auth->authorizer);
3414 auth->authorizer = NULL;
3415 }
3416 if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
3417 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3418 auth);
3419 if (ret)
3420 return ERR_PTR(ret);
3421 }
3425 *proto = ac->protocol; 3422 *proto = ac->protocol;
3426 *buf = s->s_authorizer_buf; 3423
3427 *len = s->s_authorizer_buf_len; 3424 return auth;
3428 *reply_buf = s->s_authorizer_reply_buf;
3429 *reply_len = s->s_authorizer_reply_buf_len;
3430 return 0;
3431} 3425}
3432 3426
3433 3427
@@ -3437,7 +3431,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
3437 struct ceph_mds_client *mdsc = s->s_mdsc; 3431 struct ceph_mds_client *mdsc = s->s_mdsc;
3438 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3432 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3439 3433
3440 return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len); 3434 return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3441} 3435}
3442 3436
3443static int invalidate_authorizer(struct ceph_connection *con) 3437static int invalidate_authorizer(struct ceph_connection *con)
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 8c7c04ebb595..dd26846dd71d 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -11,6 +11,7 @@
11#include <linux/ceph/types.h> 11#include <linux/ceph/types.h>
12#include <linux/ceph/messenger.h> 12#include <linux/ceph/messenger.h>
13#include <linux/ceph/mdsmap.h> 13#include <linux/ceph/mdsmap.h>
14#include <linux/ceph/auth.h>
14 15
15/* 16/*
16 * Some lock dependencies: 17 * Some lock dependencies:
@@ -113,9 +114,7 @@ struct ceph_mds_session {
113 114
114 struct ceph_connection s_con; 115 struct ceph_connection s_con;
115 116
116 struct ceph_authorizer *s_authorizer; 117 struct ceph_auth_handshake s_auth;
117 void *s_authorizer_buf, *s_authorizer_reply_buf;
118 size_t s_authorizer_buf_len, s_authorizer_reply_buf_len;
119 118
120 /* protected by s_gen_ttl_lock */ 119 /* protected by s_gen_ttl_lock */
121 spinlock_t s_gen_ttl_lock; 120 spinlock_t s_gen_ttl_lock;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 35b86331d8a5..785cb3057c95 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -118,15 +118,6 @@ static size_t ceph_vxattrcb_file_layout(struct ceph_inode_info *ci, char *val,
118 (unsigned long long)ceph_file_layout_su(ci->i_layout), 118 (unsigned long long)ceph_file_layout_su(ci->i_layout),
119 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), 119 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
120 (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); 120 (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
121
122 if (ceph_file_layout_pg_preferred(ci->i_layout) >= 0) {
123 val += ret;
124 size -= ret;
125 ret += snprintf(val, size, "preferred_osd=%lld\n",
126 (unsigned long long)ceph_file_layout_pg_preferred(
127 ci->i_layout));
128 }
129
130 return ret; 121 return ret;
131} 122}
132 123
diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h
index aa13392a7efb..d4080f309b56 100644
--- a/include/linux/ceph/auth.h
+++ b/include/linux/ceph/auth.h
@@ -14,6 +14,14 @@
14struct ceph_auth_client; 14struct ceph_auth_client;
15struct ceph_authorizer; 15struct ceph_authorizer;
16 16
17struct ceph_auth_handshake {
18 struct ceph_authorizer *authorizer;
19 void *authorizer_buf;
20 size_t authorizer_buf_len;
21 void *authorizer_reply_buf;
22 size_t authorizer_reply_buf_len;
23};
24
17struct ceph_auth_client_ops { 25struct ceph_auth_client_ops {
18 const char *name; 26 const char *name;
19 27
@@ -43,9 +51,7 @@ struct ceph_auth_client_ops {
43 * the response to authenticate the service. 51 * the response to authenticate the service.
44 */ 52 */
45 int (*create_authorizer)(struct ceph_auth_client *ac, int peer_type, 53 int (*create_authorizer)(struct ceph_auth_client *ac, int peer_type,
46 struct ceph_authorizer **a, 54 struct ceph_auth_handshake *auth);
47 void **buf, size_t *len,
48 void **reply_buf, size_t *reply_len);
49 int (*verify_authorizer_reply)(struct ceph_auth_client *ac, 55 int (*verify_authorizer_reply)(struct ceph_auth_client *ac,
50 struct ceph_authorizer *a, size_t len); 56 struct ceph_authorizer *a, size_t len);
51 void (*destroy_authorizer)(struct ceph_auth_client *ac, 57 void (*destroy_authorizer)(struct ceph_auth_client *ac,
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index b8c60694b2b0..e81ab30d4896 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -65,7 +65,7 @@ struct ceph_file_layout {
65 __le32 fl_object_stripe_unit; /* UNUSED. for per-object parity, if any */ 65 __le32 fl_object_stripe_unit; /* UNUSED. for per-object parity, if any */
66 66
67 /* object -> pg layout */ 67 /* object -> pg layout */
68 __le32 fl_pg_preferred; /* preferred primary for pg (-1 for none) */ 68 __le32 fl_unused; /* unused; used to be preferred primary (-1) */
69 __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ 69 __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */
70} __attribute__ ((packed)); 70} __attribute__ ((packed));
71 71
@@ -384,7 +384,7 @@ union ceph_mds_request_args {
384 __le32 stripe_count; /* ... */ 384 __le32 stripe_count; /* ... */
385 __le32 object_size; 385 __le32 object_size;
386 __le32 file_replication; 386 __le32 file_replication;
387 __le32 preferred; 387 __le32 unused; /* used to be preferred osd */
388 } __attribute__ ((packed)) open; 388 } __attribute__ ((packed)) open;
389 struct { 389 struct {
390 __le32 flags; 390 __le32 flags;
diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index 220ae21e819b..d8615dee5808 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -46,9 +46,14 @@ static inline void ceph_decode_copy(void **p, void *pv, size_t n)
46/* 46/*
47 * bounds check input. 47 * bounds check input.
48 */ 48 */
49static inline int ceph_has_room(void **p, void *end, size_t n)
50{
51 return end >= *p && n <= end - *p;
52}
53
49#define ceph_decode_need(p, end, n, bad) \ 54#define ceph_decode_need(p, end, n, bad) \
50 do { \ 55 do { \
51 if (unlikely(*(p) + (n) > (end))) \ 56 if (!likely(ceph_has_room(p, end, n))) \
52 goto bad; \ 57 goto bad; \
53 } while (0) 58 } while (0)
54 59
@@ -167,7 +172,7 @@ static inline void ceph_encode_string(void **p, void *end,
167 172
168#define ceph_encode_need(p, end, n, bad) \ 173#define ceph_encode_need(p, end, n, bad) \
169 do { \ 174 do { \
170 if (unlikely(*(p) + (n) > (end))) \ 175 if (!likely(ceph_has_room(p, end, n))) \
171 goto bad; \ 176 goto bad; \
172 } while (0) 177 } while (0)
173 178
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 3bff047f6b0f..2521a95fa6d9 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -25,9 +25,9 @@ struct ceph_connection_operations {
25 void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m); 25 void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
26 26
27 /* authorize an outgoing connection */ 27 /* authorize an outgoing connection */
28 int (*get_authorizer) (struct ceph_connection *con, 28 struct ceph_auth_handshake *(*get_authorizer) (
29 void **buf, int *len, int *proto, 29 struct ceph_connection *con,
30 void **reply_buf, int *reply_len, int force_new); 30 int *proto, int force_new);
31 int (*verify_authorizer_reply) (struct ceph_connection *con, int len); 31 int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
32 int (*invalidate_authorizer)(struct ceph_connection *con); 32 int (*invalidate_authorizer)(struct ceph_connection *con);
33 33
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 7c05ac202d90..cedfb1a8434a 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -6,9 +6,10 @@
6#include <linux/mempool.h> 6#include <linux/mempool.h>
7#include <linux/rbtree.h> 7#include <linux/rbtree.h>
8 8
9#include "types.h" 9#include <linux/ceph/types.h>
10#include "osdmap.h" 10#include <linux/ceph/osdmap.h>
11#include "messenger.h" 11#include <linux/ceph/messenger.h>
12#include <linux/ceph/auth.h>
12 13
13/* 14/*
14 * Maximum object name size 15 * Maximum object name size
@@ -40,9 +41,7 @@ struct ceph_osd {
40 struct list_head o_requests; 41 struct list_head o_requests;
41 struct list_head o_linger_requests; 42 struct list_head o_linger_requests;
42 struct list_head o_osd_lru; 43 struct list_head o_osd_lru;
43 struct ceph_authorizer *o_authorizer; 44 struct ceph_auth_handshake o_auth;
44 void *o_authorizer_buf, *o_authorizer_reply_buf;
45 size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
46 unsigned long lru_ttl; 45 unsigned long lru_ttl;
47 int o_marked_for_keepalive; 46 int o_marked_for_keepalive;
48 struct list_head o_keepalive_item; 47 struct list_head o_keepalive_item;
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index ba4c205cbb01..311ef8d6aa9e 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -65,8 +65,6 @@ struct ceph_osdmap {
65#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash)) 65#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash))
66#define ceph_file_layout_object_su(l) \ 66#define ceph_file_layout_object_su(l) \
67 ((__s32)le32_to_cpu((l).fl_object_stripe_unit)) 67 ((__s32)le32_to_cpu((l).fl_object_stripe_unit))
68#define ceph_file_layout_pg_preferred(l) \
69 ((__s32)le32_to_cpu((l).fl_pg_preferred))
70#define ceph_file_layout_pg_pool(l) \ 68#define ceph_file_layout_pg_pool(l) \
71 ((__s32)le32_to_cpu((l).fl_pg_pool)) 69 ((__s32)le32_to_cpu((l).fl_pg_pool))
72 70
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index 97e435b191f4..7c4750811b96 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -151,16 +151,6 @@ struct crush_map {
151 struct crush_bucket **buckets; 151 struct crush_bucket **buckets;
152 struct crush_rule **rules; 152 struct crush_rule **rules;
153 153
154 /*
155 * Parent pointers to identify the parent bucket a device or
156 * bucket in the hierarchy. If an item appears more than
157 * once, this is the _last_ time it appeared (where buckets
158 * are processed in bucket id order, from -1 on down to
159 * -max_buckets.
160 */
161 __u32 *bucket_parents;
162 __u32 *device_parents;
163
164 __s32 max_buckets; 154 __s32 max_buckets;
165 __u32 max_rules; 155 __u32 max_rules;
166 __s32 max_devices; 156 __s32 max_devices;
@@ -168,8 +158,7 @@ struct crush_map {
168 158
169 159
170/* crush.c */ 160/* crush.c */
171extern int crush_get_bucket_item_weight(struct crush_bucket *b, int pos); 161extern int crush_get_bucket_item_weight(const struct crush_bucket *b, int pos);
172extern void crush_calc_parents(struct crush_map *map);
173extern void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b); 162extern void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b);
174extern void crush_destroy_bucket_list(struct crush_bucket_list *b); 163extern void crush_destroy_bucket_list(struct crush_bucket_list *b);
175extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b); 164extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b);
@@ -177,4 +166,9 @@ extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b);
177extern void crush_destroy_bucket(struct crush_bucket *b); 166extern void crush_destroy_bucket(struct crush_bucket *b);
178extern void crush_destroy(struct crush_map *map); 167extern void crush_destroy(struct crush_map *map);
179 168
169static inline int crush_calc_tree_node(int i)
170{
171 return ((i+1) << 1)-1;
172}
173
180#endif 174#endif
diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h
index c46b99c18bb0..71d79f44a7d0 100644
--- a/include/linux/crush/mapper.h
+++ b/include/linux/crush/mapper.h
@@ -10,11 +10,10 @@
10 10
11#include "crush.h" 11#include "crush.h"
12 12
13extern int crush_find_rule(struct crush_map *map, int pool, int type, int size); 13extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size);
14extern int crush_do_rule(struct crush_map *map, 14extern int crush_do_rule(const struct crush_map *map,
15 int ruleno, 15 int ruleno,
16 int x, int *result, int result_max, 16 int x, int *result, int result_max,
17 int forcefeed, /* -1 for none */ 17 const __u32 *weights);
18 __u32 *weights);
19 18
20#endif 19#endif
diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c
index 214c2bb43d62..925ca583c09c 100644
--- a/net/ceph/auth_none.c
+++ b/net/ceph/auth_none.c
@@ -59,9 +59,7 @@ static int handle_reply(struct ceph_auth_client *ac, int result,
59 */ 59 */
60static int ceph_auth_none_create_authorizer( 60static int ceph_auth_none_create_authorizer(
61 struct ceph_auth_client *ac, int peer_type, 61 struct ceph_auth_client *ac, int peer_type,
62 struct ceph_authorizer **a, 62 struct ceph_auth_handshake *auth)
63 void **buf, size_t *len,
64 void **reply_buf, size_t *reply_len)
65{ 63{
66 struct ceph_auth_none_info *ai = ac->private; 64 struct ceph_auth_none_info *ai = ac->private;
67 struct ceph_none_authorizer *au = &ai->au; 65 struct ceph_none_authorizer *au = &ai->au;
@@ -82,11 +80,12 @@ static int ceph_auth_none_create_authorizer(
82 dout("built authorizer len %d\n", au->buf_len); 80 dout("built authorizer len %d\n", au->buf_len);
83 } 81 }
84 82
85 *a = (struct ceph_authorizer *)au; 83 auth->authorizer = (struct ceph_authorizer *) au;
86 *buf = au->buf; 84 auth->authorizer_buf = au->buf;
87 *len = au->buf_len; 85 auth->authorizer_buf_len = au->buf_len;
88 *reply_buf = au->reply_buf; 86 auth->authorizer_reply_buf = au->reply_buf;
89 *reply_len = sizeof(au->reply_buf); 87 auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
88
90 return 0; 89 return 0;
91 90
92bad2: 91bad2:
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index 1587dc6010c6..a16bf14eb027 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -526,9 +526,7 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result,
526 526
527static int ceph_x_create_authorizer( 527static int ceph_x_create_authorizer(
528 struct ceph_auth_client *ac, int peer_type, 528 struct ceph_auth_client *ac, int peer_type,
529 struct ceph_authorizer **a, 529 struct ceph_auth_handshake *auth)
530 void **buf, size_t *len,
531 void **reply_buf, size_t *reply_len)
532{ 530{
533 struct ceph_x_authorizer *au; 531 struct ceph_x_authorizer *au;
534 struct ceph_x_ticket_handler *th; 532 struct ceph_x_ticket_handler *th;
@@ -548,11 +546,12 @@ static int ceph_x_create_authorizer(
548 return ret; 546 return ret;
549 } 547 }
550 548
551 *a = (struct ceph_authorizer *)au; 549 auth->authorizer = (struct ceph_authorizer *) au;
552 *buf = au->buf->vec.iov_base; 550 auth->authorizer_buf = au->buf->vec.iov_base;
553 *len = au->buf->vec.iov_len; 551 auth->authorizer_buf_len = au->buf->vec.iov_len;
554 *reply_buf = au->reply_buf; 552 auth->authorizer_reply_buf = au->reply_buf;
555 *reply_len = sizeof(au->reply_buf); 553 auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
554
556 return 0; 555 return 0;
557} 556}
558 557
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index d6ebb13a18a4..089613234f03 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -26,9 +26,9 @@ const char *crush_bucket_alg_name(int alg)
26 * @b: bucket pointer 26 * @b: bucket pointer
27 * @p: item index in bucket 27 * @p: item index in bucket
28 */ 28 */
29int crush_get_bucket_item_weight(struct crush_bucket *b, int p) 29int crush_get_bucket_item_weight(const struct crush_bucket *b, int p)
30{ 30{
31 if (p >= b->size) 31 if ((__u32)p >= b->size)
32 return 0; 32 return 0;
33 33
34 switch (b->alg) { 34 switch (b->alg) {
@@ -37,38 +37,13 @@ int crush_get_bucket_item_weight(struct crush_bucket *b, int p)
37 case CRUSH_BUCKET_LIST: 37 case CRUSH_BUCKET_LIST:
38 return ((struct crush_bucket_list *)b)->item_weights[p]; 38 return ((struct crush_bucket_list *)b)->item_weights[p];
39 case CRUSH_BUCKET_TREE: 39 case CRUSH_BUCKET_TREE:
40 if (p & 1) 40 return ((struct crush_bucket_tree *)b)->node_weights[crush_calc_tree_node(p)];
41 return ((struct crush_bucket_tree *)b)->node_weights[p];
42 return 0;
43 case CRUSH_BUCKET_STRAW: 41 case CRUSH_BUCKET_STRAW:
44 return ((struct crush_bucket_straw *)b)->item_weights[p]; 42 return ((struct crush_bucket_straw *)b)->item_weights[p];
45 } 43 }
46 return 0; 44 return 0;
47} 45}
48 46
49/**
50 * crush_calc_parents - Calculate parent vectors for the given crush map.
51 * @map: crush_map pointer
52 */
53void crush_calc_parents(struct crush_map *map)
54{
55 int i, b, c;
56
57 for (b = 0; b < map->max_buckets; b++) {
58 if (map->buckets[b] == NULL)
59 continue;
60 for (i = 0; i < map->buckets[b]->size; i++) {
61 c = map->buckets[b]->items[i];
62 BUG_ON(c >= map->max_devices ||
63 c < -map->max_buckets);
64 if (c >= 0)
65 map->device_parents[c] = map->buckets[b]->id;
66 else
67 map->bucket_parents[-1-c] = map->buckets[b]->id;
68 }
69 }
70}
71
72void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b) 47void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b)
73{ 48{
74 kfree(b->h.perm); 49 kfree(b->h.perm);
@@ -87,6 +62,8 @@ void crush_destroy_bucket_list(struct crush_bucket_list *b)
87 62
88void crush_destroy_bucket_tree(struct crush_bucket_tree *b) 63void crush_destroy_bucket_tree(struct crush_bucket_tree *b)
89{ 64{
65 kfree(b->h.perm);
66 kfree(b->h.items);
90 kfree(b->node_weights); 67 kfree(b->node_weights);
91 kfree(b); 68 kfree(b);
92} 69}
@@ -124,10 +101,9 @@ void crush_destroy_bucket(struct crush_bucket *b)
124 */ 101 */
125void crush_destroy(struct crush_map *map) 102void crush_destroy(struct crush_map *map)
126{ 103{
127 int b;
128
129 /* buckets */ 104 /* buckets */
130 if (map->buckets) { 105 if (map->buckets) {
106 __s32 b;
131 for (b = 0; b < map->max_buckets; b++) { 107 for (b = 0; b < map->max_buckets; b++) {
132 if (map->buckets[b] == NULL) 108 if (map->buckets[b] == NULL)
133 continue; 109 continue;
@@ -138,13 +114,12 @@ void crush_destroy(struct crush_map *map)
138 114
139 /* rules */ 115 /* rules */
140 if (map->rules) { 116 if (map->rules) {
117 __u32 b;
141 for (b = 0; b < map->max_rules; b++) 118 for (b = 0; b < map->max_rules; b++)
142 kfree(map->rules[b]); 119 kfree(map->rules[b]);
143 kfree(map->rules); 120 kfree(map->rules);
144 } 121 }
145 122
146 kfree(map->bucket_parents);
147 kfree(map->device_parents);
148 kfree(map); 123 kfree(map);
149} 124}
150 125
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 363f8f7e6c3c..d7edc24333b8 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -33,9 +33,9 @@
33 * @type: storage ruleset type (user defined) 33 * @type: storage ruleset type (user defined)
34 * @size: output set size 34 * @size: output set size
35 */ 35 */
36int crush_find_rule(struct crush_map *map, int ruleset, int type, int size) 36int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size)
37{ 37{
38 int i; 38 __u32 i;
39 39
40 for (i = 0; i < map->max_rules; i++) { 40 for (i = 0; i < map->max_rules; i++) {
41 if (map->rules[i] && 41 if (map->rules[i] &&
@@ -73,7 +73,7 @@ static int bucket_perm_choose(struct crush_bucket *bucket,
73 unsigned int i, s; 73 unsigned int i, s;
74 74
75 /* start a new permutation if @x has changed */ 75 /* start a new permutation if @x has changed */
76 if (bucket->perm_x != x || bucket->perm_n == 0) { 76 if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) {
77 dprintk("bucket %d new x=%d\n", bucket->id, x); 77 dprintk("bucket %d new x=%d\n", bucket->id, x);
78 bucket->perm_x = x; 78 bucket->perm_x = x;
79 79
@@ -153,8 +153,8 @@ static int bucket_list_choose(struct crush_bucket_list *bucket,
153 return bucket->h.items[i]; 153 return bucket->h.items[i];
154 } 154 }
155 155
156 BUG_ON(1); 156 dprintk("bad list sums for bucket %d\n", bucket->h.id);
157 return 0; 157 return bucket->h.items[0];
158} 158}
159 159
160 160
@@ -220,7 +220,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,
220static int bucket_straw_choose(struct crush_bucket_straw *bucket, 220static int bucket_straw_choose(struct crush_bucket_straw *bucket,
221 int x, int r) 221 int x, int r)
222{ 222{
223 int i; 223 __u32 i;
224 int high = 0; 224 int high = 0;
225 __u64 high_draw = 0; 225 __u64 high_draw = 0;
226 __u64 draw; 226 __u64 draw;
@@ -240,6 +240,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket,
240static int crush_bucket_choose(struct crush_bucket *in, int x, int r) 240static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
241{ 241{
242 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); 242 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
243 BUG_ON(in->size == 0);
243 switch (in->alg) { 244 switch (in->alg) {
244 case CRUSH_BUCKET_UNIFORM: 245 case CRUSH_BUCKET_UNIFORM:
245 return bucket_uniform_choose((struct crush_bucket_uniform *)in, 246 return bucket_uniform_choose((struct crush_bucket_uniform *)in,
@@ -254,7 +255,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
254 return bucket_straw_choose((struct crush_bucket_straw *)in, 255 return bucket_straw_choose((struct crush_bucket_straw *)in,
255 x, r); 256 x, r);
256 default: 257 default:
257 BUG_ON(1); 258 dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
258 return in->items[0]; 259 return in->items[0];
259 } 260 }
260} 261}
@@ -263,7 +264,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
263 * true if device is marked "out" (failed, fully offloaded) 264 * true if device is marked "out" (failed, fully offloaded)
264 * of the cluster 265 * of the cluster
265 */ 266 */
266static int is_out(struct crush_map *map, __u32 *weight, int item, int x) 267static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x)
267{ 268{
268 if (weight[item] >= 0x10000) 269 if (weight[item] >= 0x10000)
269 return 0; 270 return 0;
@@ -288,16 +289,16 @@ static int is_out(struct crush_map *map, __u32 *weight, int item, int x)
288 * @recurse_to_leaf: true if we want one device under each item of given type 289 * @recurse_to_leaf: true if we want one device under each item of given type
289 * @out2: second output vector for leaf items (if @recurse_to_leaf) 290 * @out2: second output vector for leaf items (if @recurse_to_leaf)
290 */ 291 */
291static int crush_choose(struct crush_map *map, 292static int crush_choose(const struct crush_map *map,
292 struct crush_bucket *bucket, 293 struct crush_bucket *bucket,
293 __u32 *weight, 294 const __u32 *weight,
294 int x, int numrep, int type, 295 int x, int numrep, int type,
295 int *out, int outpos, 296 int *out, int outpos,
296 int firstn, int recurse_to_leaf, 297 int firstn, int recurse_to_leaf,
297 int *out2) 298 int *out2)
298{ 299{
299 int rep; 300 int rep;
300 int ftotal, flocal; 301 unsigned int ftotal, flocal;
301 int retry_descent, retry_bucket, skip_rep; 302 int retry_descent, retry_bucket, skip_rep;
302 struct crush_bucket *in = bucket; 303 struct crush_bucket *in = bucket;
303 int r; 304 int r;
@@ -305,7 +306,7 @@ static int crush_choose(struct crush_map *map,
305 int item = 0; 306 int item = 0;
306 int itemtype; 307 int itemtype;
307 int collide, reject; 308 int collide, reject;
308 const int orig_tries = 5; /* attempts before we fall back to search */ 309 const unsigned int orig_tries = 5; /* attempts before we fall back to search */
309 310
310 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", 311 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
311 bucket->id, x, outpos, numrep); 312 bucket->id, x, outpos, numrep);
@@ -326,7 +327,7 @@ static int crush_choose(struct crush_map *map,
326 r = rep; 327 r = rep;
327 if (in->alg == CRUSH_BUCKET_UNIFORM) { 328 if (in->alg == CRUSH_BUCKET_UNIFORM) {
328 /* be careful */ 329 /* be careful */
329 if (firstn || numrep >= in->size) 330 if (firstn || (__u32)numrep >= in->size)
330 /* r' = r + f_total */ 331 /* r' = r + f_total */
331 r += ftotal; 332 r += ftotal;
332 else if (in->size % numrep == 0) 333 else if (in->size % numrep == 0)
@@ -355,7 +356,11 @@ static int crush_choose(struct crush_map *map,
355 item = bucket_perm_choose(in, x, r); 356 item = bucket_perm_choose(in, x, r);
356 else 357 else
357 item = crush_bucket_choose(in, x, r); 358 item = crush_bucket_choose(in, x, r);
358 BUG_ON(item >= map->max_devices); 359 if (item >= map->max_devices) {
360 dprintk(" bad item %d\n", item);
361 skip_rep = 1;
362 break;
363 }
359 364
360 /* desired type? */ 365 /* desired type? */
361 if (item < 0) 366 if (item < 0)
@@ -366,8 +371,12 @@ static int crush_choose(struct crush_map *map,
366 371
367 /* keep going? */ 372 /* keep going? */
368 if (itemtype != type) { 373 if (itemtype != type) {
369 BUG_ON(item >= 0 || 374 if (item >= 0 ||
370 (-1-item) >= map->max_buckets); 375 (-1-item) >= map->max_buckets) {
376 dprintk(" bad item type %d\n", type);
377 skip_rep = 1;
378 break;
379 }
371 in = map->buckets[-1-item]; 380 in = map->buckets[-1-item];
372 retry_bucket = 1; 381 retry_bucket = 1;
373 continue; 382 continue;
@@ -416,7 +425,7 @@ reject:
416 if (collide && flocal < 3) 425 if (collide && flocal < 3)
417 /* retry locally a few times */ 426 /* retry locally a few times */
418 retry_bucket = 1; 427 retry_bucket = 1;
419 else if (flocal < in->size + orig_tries) 428 else if (flocal <= in->size + orig_tries)
420 /* exhaustive bucket search */ 429 /* exhaustive bucket search */
421 retry_bucket = 1; 430 retry_bucket = 1;
422 else if (ftotal < 20) 431 else if (ftotal < 20)
@@ -426,7 +435,7 @@ reject:
426 /* else give up */ 435 /* else give up */
427 skip_rep = 1; 436 skip_rep = 1;
428 dprintk(" reject %d collide %d " 437 dprintk(" reject %d collide %d "
429 "ftotal %d flocal %d\n", 438 "ftotal %u flocal %u\n",
430 reject, collide, ftotal, 439 reject, collide, ftotal,
431 flocal); 440 flocal);
432 } 441 }
@@ -455,15 +464,12 @@ reject:
455 * @x: hash input 464 * @x: hash input
456 * @result: pointer to result vector 465 * @result: pointer to result vector
457 * @result_max: maximum result size 466 * @result_max: maximum result size
458 * @force: force initial replica choice; -1 for none
459 */ 467 */
460int crush_do_rule(struct crush_map *map, 468int crush_do_rule(const struct crush_map *map,
461 int ruleno, int x, int *result, int result_max, 469 int ruleno, int x, int *result, int result_max,
462 int force, __u32 *weight) 470 const __u32 *weight)
463{ 471{
464 int result_len; 472 int result_len;
465 int force_context[CRUSH_MAX_DEPTH];
466 int force_pos = -1;
467 int a[CRUSH_MAX_SET]; 473 int a[CRUSH_MAX_SET];
468 int b[CRUSH_MAX_SET]; 474 int b[CRUSH_MAX_SET];
469 int c[CRUSH_MAX_SET]; 475 int c[CRUSH_MAX_SET];
@@ -474,66 +480,44 @@ int crush_do_rule(struct crush_map *map,
474 int osize; 480 int osize;
475 int *tmp; 481 int *tmp;
476 struct crush_rule *rule; 482 struct crush_rule *rule;
477 int step; 483 __u32 step;
478 int i, j; 484 int i, j;
479 int numrep; 485 int numrep;
480 int firstn; 486 int firstn;
481 487
482 BUG_ON(ruleno >= map->max_rules); 488 if ((__u32)ruleno >= map->max_rules) {
489 dprintk(" bad ruleno %d\n", ruleno);
490 return 0;
491 }
483 492
484 rule = map->rules[ruleno]; 493 rule = map->rules[ruleno];
485 result_len = 0; 494 result_len = 0;
486 w = a; 495 w = a;
487 o = b; 496 o = b;
488 497
489 /*
490 * determine hierarchical context of force, if any. note
491 * that this may or may not correspond to the specific types
492 * referenced by the crush rule.
493 */
494 if (force >= 0 &&
495 force < map->max_devices &&
496 map->device_parents[force] != 0 &&
497 !is_out(map, weight, force, x)) {
498 while (1) {
499 force_context[++force_pos] = force;
500 if (force >= 0)
501 force = map->device_parents[force];
502 else
503 force = map->bucket_parents[-1-force];
504 if (force == 0)
505 break;
506 }
507 }
508
509 for (step = 0; step < rule->len; step++) { 498 for (step = 0; step < rule->len; step++) {
499 struct crush_rule_step *curstep = &rule->steps[step];
500
510 firstn = 0; 501 firstn = 0;
511 switch (rule->steps[step].op) { 502 switch (curstep->op) {
512 case CRUSH_RULE_TAKE: 503 case CRUSH_RULE_TAKE:
513 w[0] = rule->steps[step].arg1; 504 w[0] = curstep->arg1;
514
515 /* find position in force_context/hierarchy */
516 while (force_pos >= 0 &&
517 force_context[force_pos] != w[0])
518 force_pos--;
519 /* and move past it */
520 if (force_pos >= 0)
521 force_pos--;
522
523 wsize = 1; 505 wsize = 1;
524 break; 506 break;
525 507
526 case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: 508 case CRUSH_RULE_CHOOSE_LEAF_FIRSTN:
527 case CRUSH_RULE_CHOOSE_FIRSTN: 509 case CRUSH_RULE_CHOOSE_FIRSTN:
528 firstn = 1; 510 firstn = 1;
511 /* fall through */
529 case CRUSH_RULE_CHOOSE_LEAF_INDEP: 512 case CRUSH_RULE_CHOOSE_LEAF_INDEP:
530 case CRUSH_RULE_CHOOSE_INDEP: 513 case CRUSH_RULE_CHOOSE_INDEP:
531 BUG_ON(wsize == 0); 514 if (wsize == 0)
515 break;
532 516
533 recurse_to_leaf = 517 recurse_to_leaf =
534 rule->steps[step].op == 518 curstep->op ==
535 CRUSH_RULE_CHOOSE_LEAF_FIRSTN || 519 CRUSH_RULE_CHOOSE_LEAF_FIRSTN ||
536 rule->steps[step].op == 520 curstep->op ==
537 CRUSH_RULE_CHOOSE_LEAF_INDEP; 521 CRUSH_RULE_CHOOSE_LEAF_INDEP;
538 522
539 /* reset output */ 523 /* reset output */
@@ -545,32 +529,18 @@ int crush_do_rule(struct crush_map *map,
545 * basically, numrep <= 0 means relative to 529 * basically, numrep <= 0 means relative to
546 * the provided result_max 530 * the provided result_max
547 */ 531 */
548 numrep = rule->steps[step].arg1; 532 numrep = curstep->arg1;
549 if (numrep <= 0) { 533 if (numrep <= 0) {
550 numrep += result_max; 534 numrep += result_max;
551 if (numrep <= 0) 535 if (numrep <= 0)
552 continue; 536 continue;
553 } 537 }
554 j = 0; 538 j = 0;
555 if (osize == 0 && force_pos >= 0) {
556 /* skip any intermediate types */
557 while (force_pos &&
558 force_context[force_pos] < 0 &&
559 rule->steps[step].arg2 !=
560 map->buckets[-1 -
561 force_context[force_pos]]->type)
562 force_pos--;
563 o[osize] = force_context[force_pos];
564 if (recurse_to_leaf)
565 c[osize] = force_context[0];
566 j++;
567 force_pos--;
568 }
569 osize += crush_choose(map, 539 osize += crush_choose(map,
570 map->buckets[-1-w[i]], 540 map->buckets[-1-w[i]],
571 weight, 541 weight,
572 x, numrep, 542 x, numrep,
573 rule->steps[step].arg2, 543 curstep->arg2,
574 o+osize, j, 544 o+osize, j,
575 firstn, 545 firstn,
576 recurse_to_leaf, c+osize); 546 recurse_to_leaf, c+osize);
@@ -597,7 +567,9 @@ int crush_do_rule(struct crush_map *map,
597 break; 567 break;
598 568
599 default: 569 default:
600 BUG_ON(1); 570 dprintk(" unknown op %d at step %d\n",
571 curstep->op, step);
572 break;
601 } 573 }
602 } 574 }
603 return result_len; 575 return result_len;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 36fa6bf68498..524f4e4f598b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -653,54 +653,57 @@ static void prepare_write_keepalive(struct ceph_connection *con)
653 * Connection negotiation. 653 * Connection negotiation.
654 */ 654 */
655 655
656static int prepare_connect_authorizer(struct ceph_connection *con) 656static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con,
657 int *auth_proto)
657{ 658{
658 void *auth_buf; 659 struct ceph_auth_handshake *auth;
659 int auth_len = 0; 660
660 int auth_protocol = 0; 661 if (!con->ops->get_authorizer) {
662 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
663 con->out_connect.authorizer_len = 0;
664
665 return NULL;
666 }
667
668 /* Can't hold the mutex while getting authorizer */
661 669
662 mutex_unlock(&con->mutex); 670 mutex_unlock(&con->mutex);
663 if (con->ops->get_authorizer) 671
664 con->ops->get_authorizer(con, &auth_buf, &auth_len, 672 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
665 &auth_protocol, &con->auth_reply_buf, 673
666 &con->auth_reply_buf_len,
667 con->auth_retry);
668 mutex_lock(&con->mutex); 674 mutex_lock(&con->mutex);
669 675
670 if (test_bit(CLOSED, &con->state) || 676 if (IS_ERR(auth))
671 test_bit(OPENING, &con->state)) 677 return auth;
672 return -EAGAIN; 678 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
679 return ERR_PTR(-EAGAIN);
673 680
674 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 681 con->auth_reply_buf = auth->authorizer_reply_buf;
675 con->out_connect.authorizer_len = cpu_to_le32(auth_len); 682 con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
676 683
677 if (auth_len)
678 ceph_con_out_kvec_add(con, auth_len, auth_buf);
679 684
680 return 0; 685 return auth;
681} 686}
682 687
683/* 688/*
684 * We connected to a peer and are saying hello. 689 * We connected to a peer and are saying hello.
685 */ 690 */
686static void prepare_write_banner(struct ceph_messenger *msgr, 691static void prepare_write_banner(struct ceph_connection *con)
687 struct ceph_connection *con)
688{ 692{
689 ceph_con_out_kvec_reset(con);
690 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 693 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
691 ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr), 694 ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
692 &msgr->my_enc_addr); 695 &con->msgr->my_enc_addr);
693 696
694 con->out_more = 0; 697 con->out_more = 0;
695 set_bit(WRITE_PENDING, &con->state); 698 set_bit(WRITE_PENDING, &con->state);
696} 699}
697 700
698static int prepare_write_connect(struct ceph_messenger *msgr, 701static int prepare_write_connect(struct ceph_connection *con)
699 struct ceph_connection *con,
700 int include_banner)
701{ 702{
702 unsigned int global_seq = get_global_seq(con->msgr, 0); 703 unsigned int global_seq = get_global_seq(con->msgr, 0);
703 int proto; 704 int proto;
705 int auth_proto;
706 struct ceph_auth_handshake *auth;
704 707
705 switch (con->peer_name.type) { 708 switch (con->peer_name.type) {
706 case CEPH_ENTITY_TYPE_MON: 709 case CEPH_ENTITY_TYPE_MON:
@@ -719,23 +722,32 @@ static int prepare_write_connect(struct ceph_messenger *msgr,
719 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 722 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
720 con->connect_seq, global_seq, proto); 723 con->connect_seq, global_seq, proto);
721 724
722 con->out_connect.features = cpu_to_le64(msgr->supported_features); 725 con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
723 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 726 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
724 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 727 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
725 con->out_connect.global_seq = cpu_to_le32(global_seq); 728 con->out_connect.global_seq = cpu_to_le32(global_seq);
726 con->out_connect.protocol_version = cpu_to_le32(proto); 729 con->out_connect.protocol_version = cpu_to_le32(proto);
727 con->out_connect.flags = 0; 730 con->out_connect.flags = 0;
728 731
729 if (include_banner) 732 auth_proto = CEPH_AUTH_UNKNOWN;
730 prepare_write_banner(msgr, con); 733 auth = get_connect_authorizer(con, &auth_proto);
731 else 734 if (IS_ERR(auth))
732 ceph_con_out_kvec_reset(con); 735 return PTR_ERR(auth);
733 ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); 736
737 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
738 con->out_connect.authorizer_len = auth ?
739 cpu_to_le32(auth->authorizer_buf_len) : 0;
740
741 ceph_con_out_kvec_add(con, sizeof (con->out_connect),
742 &con->out_connect);
743 if (auth && auth->authorizer_buf_len)
744 ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
745 auth->authorizer_buf);
734 746
735 con->out_more = 0; 747 con->out_more = 0;
736 set_bit(WRITE_PENDING, &con->state); 748 set_bit(WRITE_PENDING, &con->state);
737 749
738 return prepare_connect_authorizer(con); 750 return 0;
739} 751}
740 752
741/* 753/*
@@ -992,11 +1004,10 @@ static int prepare_read_message(struct ceph_connection *con)
992 1004
993 1005
994static int read_partial(struct ceph_connection *con, 1006static int read_partial(struct ceph_connection *con,
995 int *to, int size, void *object) 1007 int end, int size, void *object)
996{ 1008{
997 *to += size; 1009 while (con->in_base_pos < end) {
998 while (con->in_base_pos < *to) { 1010 int left = end - con->in_base_pos;
999 int left = *to - con->in_base_pos;
1000 int have = size - left; 1011 int have = size - left;
1001 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 1012 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
1002 if (ret <= 0) 1013 if (ret <= 0)
@@ -1012,37 +1023,52 @@ static int read_partial(struct ceph_connection *con,
1012 */ 1023 */
1013static int read_partial_banner(struct ceph_connection *con) 1024static int read_partial_banner(struct ceph_connection *con)
1014{ 1025{
1015 int ret, to = 0; 1026 int size;
1027 int end;
1028 int ret;
1016 1029
1017 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 1030 dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
1018 1031
1019 /* peer's banner */ 1032 /* peer's banner */
1020 ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); 1033 size = strlen(CEPH_BANNER);
1034 end = size;
1035 ret = read_partial(con, end, size, con->in_banner);
1021 if (ret <= 0) 1036 if (ret <= 0)
1022 goto out; 1037 goto out;
1023 ret = read_partial(con, &to, sizeof(con->actual_peer_addr), 1038
1024 &con->actual_peer_addr); 1039 size = sizeof (con->actual_peer_addr);
1040 end += size;
1041 ret = read_partial(con, end, size, &con->actual_peer_addr);
1025 if (ret <= 0) 1042 if (ret <= 0)
1026 goto out; 1043 goto out;
1027 ret = read_partial(con, &to, sizeof(con->peer_addr_for_me), 1044
1028 &con->peer_addr_for_me); 1045 size = sizeof (con->peer_addr_for_me);
1046 end += size;
1047 ret = read_partial(con, end, size, &con->peer_addr_for_me);
1029 if (ret <= 0) 1048 if (ret <= 0)
1030 goto out; 1049 goto out;
1050
1031out: 1051out:
1032 return ret; 1052 return ret;
1033} 1053}
1034 1054
1035static int read_partial_connect(struct ceph_connection *con) 1055static int read_partial_connect(struct ceph_connection *con)
1036{ 1056{
1037 int ret, to = 0; 1057 int size;
1058 int end;
1059 int ret;
1038 1060
1039 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 1061 dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
1040 1062
1041 ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); 1063 size = sizeof (con->in_reply);
1064 end = size;
1065 ret = read_partial(con, end, size, &con->in_reply);
1042 if (ret <= 0) 1066 if (ret <= 0)
1043 goto out; 1067 goto out;
1044 ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len), 1068
1045 con->auth_reply_buf); 1069 size = le32_to_cpu(con->in_reply.authorizer_len);
1070 end += size;
1071 ret = read_partial(con, end, size, con->auth_reply_buf);
1046 if (ret <= 0) 1072 if (ret <= 0)
1047 goto out; 1073 goto out;
1048 1074
@@ -1377,7 +1403,8 @@ static int process_connect(struct ceph_connection *con)
1377 return -1; 1403 return -1;
1378 } 1404 }
1379 con->auth_retry = 1; 1405 con->auth_retry = 1;
1380 ret = prepare_write_connect(con->msgr, con, 0); 1406 ceph_con_out_kvec_reset(con);
1407 ret = prepare_write_connect(con);
1381 if (ret < 0) 1408 if (ret < 0)
1382 return ret; 1409 return ret;
1383 prepare_read_connect(con); 1410 prepare_read_connect(con);
@@ -1397,7 +1424,10 @@ static int process_connect(struct ceph_connection *con)
1397 ENTITY_NAME(con->peer_name), 1424 ENTITY_NAME(con->peer_name),
1398 ceph_pr_addr(&con->peer_addr.in_addr)); 1425 ceph_pr_addr(&con->peer_addr.in_addr));
1399 reset_connection(con); 1426 reset_connection(con);
1400 prepare_write_connect(con->msgr, con, 0); 1427 ceph_con_out_kvec_reset(con);
1428 ret = prepare_write_connect(con);
1429 if (ret < 0)
1430 return ret;
1401 prepare_read_connect(con); 1431 prepare_read_connect(con);
1402 1432
1403 /* Tell ceph about it. */ 1433 /* Tell ceph about it. */
@@ -1420,7 +1450,10 @@ static int process_connect(struct ceph_connection *con)
1420 le32_to_cpu(con->out_connect.connect_seq), 1450 le32_to_cpu(con->out_connect.connect_seq),
1421 le32_to_cpu(con->in_connect.connect_seq)); 1451 le32_to_cpu(con->in_connect.connect_seq));
1422 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); 1452 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
1423 prepare_write_connect(con->msgr, con, 0); 1453 ceph_con_out_kvec_reset(con);
1454 ret = prepare_write_connect(con);
1455 if (ret < 0)
1456 return ret;
1424 prepare_read_connect(con); 1457 prepare_read_connect(con);
1425 break; 1458 break;
1426 1459
@@ -1434,7 +1467,10 @@ static int process_connect(struct ceph_connection *con)
1434 le32_to_cpu(con->in_connect.global_seq)); 1467 le32_to_cpu(con->in_connect.global_seq));
1435 get_global_seq(con->msgr, 1468 get_global_seq(con->msgr,
1436 le32_to_cpu(con->in_connect.global_seq)); 1469 le32_to_cpu(con->in_connect.global_seq));
1437 prepare_write_connect(con->msgr, con, 0); 1470 ceph_con_out_kvec_reset(con);
1471 ret = prepare_write_connect(con);
1472 if (ret < 0)
1473 return ret;
1438 prepare_read_connect(con); 1474 prepare_read_connect(con);
1439 break; 1475 break;
1440 1476
@@ -1491,10 +1527,10 @@ static int process_connect(struct ceph_connection *con)
1491 */ 1527 */
1492static int read_partial_ack(struct ceph_connection *con) 1528static int read_partial_ack(struct ceph_connection *con)
1493{ 1529{
1494 int to = 0; 1530 int size = sizeof (con->in_temp_ack);
1531 int end = size;
1495 1532
1496 return read_partial(con, &to, sizeof(con->in_temp_ack), 1533 return read_partial(con, end, size, &con->in_temp_ack);
1497 &con->in_temp_ack);
1498} 1534}
1499 1535
1500 1536
@@ -1627,8 +1663,9 @@ static int read_partial_message_bio(struct ceph_connection *con,
1627static int read_partial_message(struct ceph_connection *con) 1663static int read_partial_message(struct ceph_connection *con)
1628{ 1664{
1629 struct ceph_msg *m = con->in_msg; 1665 struct ceph_msg *m = con->in_msg;
1666 int size;
1667 int end;
1630 int ret; 1668 int ret;
1631 int to, left;
1632 unsigned int front_len, middle_len, data_len; 1669 unsigned int front_len, middle_len, data_len;
1633 bool do_datacrc = !con->msgr->nocrc; 1670 bool do_datacrc = !con->msgr->nocrc;
1634 int skip; 1671 int skip;
@@ -1638,15 +1675,11 @@ static int read_partial_message(struct ceph_connection *con)
1638 dout("read_partial_message con %p msg %p\n", con, m); 1675 dout("read_partial_message con %p msg %p\n", con, m);
1639 1676
1640 /* header */ 1677 /* header */
1641 while (con->in_base_pos < sizeof(con->in_hdr)) { 1678 size = sizeof (con->in_hdr);
1642 left = sizeof(con->in_hdr) - con->in_base_pos; 1679 end = size;
1643 ret = ceph_tcp_recvmsg(con->sock, 1680 ret = read_partial(con, end, size, &con->in_hdr);
1644 (char *)&con->in_hdr + con->in_base_pos, 1681 if (ret <= 0)
1645 left); 1682 return ret;
1646 if (ret <= 0)
1647 return ret;
1648 con->in_base_pos += ret;
1649 }
1650 1683
1651 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); 1684 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
1652 if (cpu_to_le32(crc) != con->in_hdr.crc) { 1685 if (cpu_to_le32(crc) != con->in_hdr.crc) {
@@ -1759,16 +1792,12 @@ static int read_partial_message(struct ceph_connection *con)
1759 } 1792 }
1760 1793
1761 /* footer */ 1794 /* footer */
1762 to = sizeof(m->hdr) + sizeof(m->footer); 1795 size = sizeof (m->footer);
1763 while (con->in_base_pos < to) { 1796 end += size;
1764 left = to - con->in_base_pos; 1797 ret = read_partial(con, end, size, &m->footer);
1765 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + 1798 if (ret <= 0)
1766 (con->in_base_pos - sizeof(m->hdr)), 1799 return ret;
1767 left); 1800
1768 if (ret <= 0)
1769 return ret;
1770 con->in_base_pos += ret;
1771 }
1772 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1801 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1773 m, front_len, m->footer.front_crc, middle_len, 1802 m, front_len, m->footer.front_crc, middle_len,
1774 m->footer.middle_crc, data_len, m->footer.data_crc); 1803 m->footer.middle_crc, data_len, m->footer.data_crc);
@@ -1835,7 +1864,6 @@ static void process_message(struct ceph_connection *con)
1835 */ 1864 */
1836static int try_write(struct ceph_connection *con) 1865static int try_write(struct ceph_connection *con)
1837{ 1866{
1838 struct ceph_messenger *msgr = con->msgr;
1839 int ret = 1; 1867 int ret = 1;
1840 1868
1841 dout("try_write start %p state %lu nref %d\n", con, con->state, 1869 dout("try_write start %p state %lu nref %d\n", con, con->state,
@@ -1846,7 +1874,11 @@ more:
1846 1874
1847 /* open the socket first? */ 1875 /* open the socket first? */
1848 if (con->sock == NULL) { 1876 if (con->sock == NULL) {
1849 prepare_write_connect(msgr, con, 1); 1877 ceph_con_out_kvec_reset(con);
1878 prepare_write_banner(con);
1879 ret = prepare_write_connect(con);
1880 if (ret < 0)
1881 goto out;
1850 prepare_read_banner(con); 1882 prepare_read_banner(con);
1851 set_bit(CONNECTING, &con->state); 1883 set_bit(CONNECTING, &con->state);
1852 clear_bit(NEGOTIATING, &con->state); 1884 clear_bit(NEGOTIATING, &con->state);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 1b0ef3c4d393..1ffebed5ce0f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -278,7 +278,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
278{ 278{
279 dst->op = cpu_to_le16(src->op); 279 dst->op = cpu_to_le16(src->op);
280 280
281 switch (dst->op) { 281 switch (src->op) {
282 case CEPH_OSD_OP_READ: 282 case CEPH_OSD_OP_READ:
283 case CEPH_OSD_OP_WRITE: 283 case CEPH_OSD_OP_WRITE:
284 dst->extent.offset = 284 dst->extent.offset =
@@ -664,11 +664,11 @@ static void put_osd(struct ceph_osd *osd)
664{ 664{
665 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 665 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
666 atomic_read(&osd->o_ref) - 1); 666 atomic_read(&osd->o_ref) - 1);
667 if (atomic_dec_and_test(&osd->o_ref)) { 667 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
668 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 668 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
669 669
670 if (osd->o_authorizer) 670 if (ac->ops && ac->ops->destroy_authorizer)
671 ac->ops->destroy_authorizer(ac, osd->o_authorizer); 671 ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
672 kfree(osd); 672 kfree(osd);
673 } 673 }
674} 674}
@@ -841,6 +841,12 @@ static void register_request(struct ceph_osd_client *osdc,
841static void __unregister_request(struct ceph_osd_client *osdc, 841static void __unregister_request(struct ceph_osd_client *osdc,
842 struct ceph_osd_request *req) 842 struct ceph_osd_request *req)
843{ 843{
844 if (RB_EMPTY_NODE(&req->r_node)) {
845 dout("__unregister_request %p tid %lld not registered\n",
846 req, req->r_tid);
847 return;
848 }
849
844 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 850 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
845 rb_erase(&req->r_node, &osdc->requests); 851 rb_erase(&req->r_node, &osdc->requests);
846 osdc->num_requests--; 852 osdc->num_requests--;
@@ -2108,37 +2114,32 @@ static void put_osd_con(struct ceph_connection *con)
2108/* 2114/*
2109 * authentication 2115 * authentication
2110 */ 2116 */
2111static int get_authorizer(struct ceph_connection *con, 2117/*
2112 void **buf, int *len, int *proto, 2118 * Note: returned pointer is the address of a structure that's
2113 void **reply_buf, int *reply_len, int force_new) 2119 * managed separately. Caller must *not* attempt to free it.
2120 */
2121static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2122 int *proto, int force_new)
2114{ 2123{
2115 struct ceph_osd *o = con->private; 2124 struct ceph_osd *o = con->private;
2116 struct ceph_osd_client *osdc = o->o_osdc; 2125 struct ceph_osd_client *osdc = o->o_osdc;
2117 struct ceph_auth_client *ac = osdc->client->monc.auth; 2126 struct ceph_auth_client *ac = osdc->client->monc.auth;
2118 int ret = 0; 2127 struct ceph_auth_handshake *auth = &o->o_auth;
2119 2128
2120 if (force_new && o->o_authorizer) { 2129 if (force_new && auth->authorizer) {
2121 ac->ops->destroy_authorizer(ac, o->o_authorizer); 2130 if (ac->ops && ac->ops->destroy_authorizer)
2122 o->o_authorizer = NULL; 2131 ac->ops->destroy_authorizer(ac, auth->authorizer);
2123 } 2132 auth->authorizer = NULL;
2124 if (o->o_authorizer == NULL) { 2133 }
2125 ret = ac->ops->create_authorizer( 2134 if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
2126 ac, CEPH_ENTITY_TYPE_OSD, 2135 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2127 &o->o_authorizer, 2136 auth);
2128 &o->o_authorizer_buf,
2129 &o->o_authorizer_buf_len,
2130 &o->o_authorizer_reply_buf,
2131 &o->o_authorizer_reply_buf_len);
2132 if (ret) 2137 if (ret)
2133 return ret; 2138 return ERR_PTR(ret);
2134 } 2139 }
2135
2136 *proto = ac->protocol; 2140 *proto = ac->protocol;
2137 *buf = o->o_authorizer_buf; 2141
2138 *len = o->o_authorizer_buf_len; 2142 return auth;
2139 *reply_buf = o->o_authorizer_reply_buf;
2140 *reply_len = o->o_authorizer_reply_buf_len;
2141 return 0;
2142} 2143}
2143 2144
2144 2145
@@ -2148,7 +2149,11 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
2148 struct ceph_osd_client *osdc = o->o_osdc; 2149 struct ceph_osd_client *osdc = o->o_osdc;
2149 struct ceph_auth_client *ac = osdc->client->monc.auth; 2150 struct ceph_auth_client *ac = osdc->client->monc.auth;
2150 2151
2151 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len); 2152 /*
2153 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2154 * XXX which do we do: succeed or fail?
2155 */
2156 return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2152} 2157}
2153 2158
2154static int invalidate_authorizer(struct ceph_connection *con) 2159static int invalidate_authorizer(struct ceph_connection *con)
@@ -2157,7 +2162,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
2157 struct ceph_osd_client *osdc = o->o_osdc; 2162 struct ceph_osd_client *osdc = o->o_osdc;
2158 struct ceph_auth_client *ac = osdc->client->monc.auth; 2163 struct ceph_auth_client *ac = osdc->client->monc.auth;
2159 2164
2160 if (ac->ops->invalidate_authorizer) 2165 if (ac->ops && ac->ops->invalidate_authorizer)
2161 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2166 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2162 2167
2163 return ceph_monc_validate_auth(&osdc->client->monc); 2168 return ceph_monc_validate_auth(&osdc->client->monc);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 56e561a69004..81e3b84a77ef 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -161,13 +161,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
161 c->max_rules = ceph_decode_32(p); 161 c->max_rules = ceph_decode_32(p);
162 c->max_devices = ceph_decode_32(p); 162 c->max_devices = ceph_decode_32(p);
163 163
164 c->device_parents = kcalloc(c->max_devices, sizeof(u32), GFP_NOFS);
165 if (c->device_parents == NULL)
166 goto badmem;
167 c->bucket_parents = kcalloc(c->max_buckets, sizeof(u32), GFP_NOFS);
168 if (c->bucket_parents == NULL)
169 goto badmem;
170
171 c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS); 164 c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
172 if (c->buckets == NULL) 165 if (c->buckets == NULL)
173 goto badmem; 166 goto badmem;
@@ -890,8 +883,12 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
890 pglen = ceph_decode_32(p); 883 pglen = ceph_decode_32(p);
891 884
892 if (pglen) { 885 if (pglen) {
893 /* insert */
894 ceph_decode_need(p, end, pglen*sizeof(u32), bad); 886 ceph_decode_need(p, end, pglen*sizeof(u32), bad);
887
888 /* removing existing (if any) */
889 (void) __remove_pg_mapping(&map->pg_temp, pgid);
890
891 /* insert */
895 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); 892 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
896 if (!pg) { 893 if (!pg) {
897 err = -ENOMEM; 894 err = -ENOMEM;
@@ -1000,7 +997,6 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol,
1000{ 997{
1001 unsigned int num, num_mask; 998 unsigned int num, num_mask;
1002 struct ceph_pg pgid; 999 struct ceph_pg pgid;
1003 s32 preferred = (s32)le32_to_cpu(fl->fl_pg_preferred);
1004 int poolid = le32_to_cpu(fl->fl_pg_pool); 1000 int poolid = le32_to_cpu(fl->fl_pg_pool);
1005 struct ceph_pg_pool_info *pool; 1001 struct ceph_pg_pool_info *pool;
1006 unsigned int ps; 1002 unsigned int ps;
@@ -1011,23 +1007,13 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol,
1011 if (!pool) 1007 if (!pool)
1012 return -EIO; 1008 return -EIO;
1013 ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); 1009 ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid));
1014 if (preferred >= 0) { 1010 num = le32_to_cpu(pool->v.pg_num);
1015 ps += preferred; 1011 num_mask = pool->pg_num_mask;
1016 num = le32_to_cpu(pool->v.lpg_num);
1017 num_mask = pool->lpg_num_mask;
1018 } else {
1019 num = le32_to_cpu(pool->v.pg_num);
1020 num_mask = pool->pg_num_mask;
1021 }
1022 1012
1023 pgid.ps = cpu_to_le16(ps); 1013 pgid.ps = cpu_to_le16(ps);
1024 pgid.preferred = cpu_to_le16(preferred); 1014 pgid.preferred = cpu_to_le16(-1);
1025 pgid.pool = fl->fl_pg_pool; 1015 pgid.pool = fl->fl_pg_pool;
1026 if (preferred >= 0) 1016 dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
1027 dout("calc_object_layout '%s' pgid %d.%xp%d\n", oid, poolid, ps,
1028 (int)preferred);
1029 else
1030 dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
1031 1017
1032 ol->ol_pgid = pgid; 1018 ol->ol_pgid = pgid;
1033 ol->ol_stripe_unit = fl->fl_object_stripe_unit; 1019 ol->ol_stripe_unit = fl->fl_object_stripe_unit;
@@ -1045,24 +1031,18 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1045 struct ceph_pg_mapping *pg; 1031 struct ceph_pg_mapping *pg;
1046 struct ceph_pg_pool_info *pool; 1032 struct ceph_pg_pool_info *pool;
1047 int ruleno; 1033 int ruleno;
1048 unsigned int poolid, ps, pps, t; 1034 unsigned int poolid, ps, pps, t, r;
1049 int preferred;
1050 1035
1051 poolid = le32_to_cpu(pgid.pool); 1036 poolid = le32_to_cpu(pgid.pool);
1052 ps = le16_to_cpu(pgid.ps); 1037 ps = le16_to_cpu(pgid.ps);
1053 preferred = (s16)le16_to_cpu(pgid.preferred);
1054 1038
1055 pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); 1039 pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
1056 if (!pool) 1040 if (!pool)
1057 return NULL; 1041 return NULL;
1058 1042
1059 /* pg_temp? */ 1043 /* pg_temp? */
1060 if (preferred >= 0) 1044 t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num),
1061 t = ceph_stable_mod(ps, le32_to_cpu(pool->v.lpg_num), 1045 pool->pgp_num_mask);
1062 pool->lpgp_num_mask);
1063 else
1064 t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num),
1065 pool->pgp_num_mask);
1066 pgid.ps = cpu_to_le16(t); 1046 pgid.ps = cpu_to_le16(t);
1067 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); 1047 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
1068 if (pg) { 1048 if (pg) {
@@ -1080,23 +1060,20 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1080 return NULL; 1060 return NULL;
1081 } 1061 }
1082 1062
1083 /* don't forcefeed bad device ids to crush */ 1063 pps = ceph_stable_mod(ps,
1084 if (preferred >= osdmap->max_osd || 1064 le32_to_cpu(pool->v.pgp_num),
1085 preferred >= osdmap->crush->max_devices) 1065 pool->pgp_num_mask);
1086 preferred = -1;
1087
1088 if (preferred >= 0)
1089 pps = ceph_stable_mod(ps,
1090 le32_to_cpu(pool->v.lpgp_num),
1091 pool->lpgp_num_mask);
1092 else
1093 pps = ceph_stable_mod(ps,
1094 le32_to_cpu(pool->v.pgp_num),
1095 pool->pgp_num_mask);
1096 pps += poolid; 1066 pps += poolid;
1097 *num = crush_do_rule(osdmap->crush, ruleno, pps, osds, 1067 r = crush_do_rule(osdmap->crush, ruleno, pps, osds,
1098 min_t(int, pool->v.size, *num), 1068 min_t(int, pool->v.size, *num),
1099 preferred, osdmap->osd_weight); 1069 osdmap->osd_weight);
1070 if (r < 0) {
1071 pr_err("error %d from crush rule: pool %d ruleset %d type %d"
1072 " size %d\n", r, poolid, pool->v.crush_ruleset,
1073 pool->v.type, pool->v.size);
1074 return NULL;
1075 }
1076 *num = r;
1100 return osds; 1077 return osds;
1101} 1078}
1102 1079