-rw-r--r--  drivers/block/rbd.c                 |  87
-rw-r--r--  fs/ceph/cache.c                     |   1
-rw-r--r--  fs/ceph/cache.h                     |  10
-rw-r--r--  fs/ceph/caps.c                      |   9
-rw-r--r--  fs/ceph/debugfs.c                   |   5
-rw-r--r--  fs/ceph/dir.c                       |  53
-rw-r--r--  fs/ceph/export.c                    | 267
-rw-r--r--  fs/ceph/file.c                      |   8
-rw-r--r--  fs/ceph/inode.c                     |  76
-rw-r--r--  fs/ceph/ioctl.c                     |   5
-rw-r--r--  fs/ceph/locks.c                     |  98
-rw-r--r--  fs/ceph/mds_client.c                |  97
-rw-r--r--  fs/ceph/mds_client.h                |   4
-rw-r--r--  fs/ceph/strings.c                   |   1
-rw-r--r--  fs/ceph/super.c                     |   1
-rw-r--r--  fs/ceph/super.h                     |   3
-rw-r--r--  fs/ceph/xattr.c                     |  48
-rw-r--r--  include/linux/ceph/ceph_features.h  |  12
-rw-r--r--  include/linux/ceph/ceph_fs.h        |   5
-rw-r--r--  include/linux/ceph/osd_client.h     |  11
-rw-r--r--  include/linux/ceph/osdmap.h         |  50
-rw-r--r--  include/linux/ceph/rados.h          |  18
-rw-r--r--  include/linux/crush/crush.h         |   7
-rw-r--r--  net/ceph/crush/mapper.c             |  85
-rw-r--r--  net/ceph/debugfs.c                  |  55
-rw-r--r--  net/ceph/messenger.c                |   6
-rw-r--r--  net/ceph/osd_client.c               |  41
-rw-r--r--  net/ceph/osdmap.c                   | 993
28 files changed, 1421 insertions(+), 635 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 34898d53395b..4c95b503b09e 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1654,7 +1654,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1654 if (osd_req->r_result < 0) 1654 if (osd_req->r_result < 0)
1655 obj_request->result = osd_req->r_result; 1655 obj_request->result = osd_req->r_result;
1656 1656
1657 BUG_ON(osd_req->r_num_ops > 2); 1657 rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
1658 1658
1659 /* 1659 /*
1660 * We support a 64-bit length, but ultimately it has to be 1660 * We support a 64-bit length, but ultimately it has to be
@@ -1662,11 +1662,15 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1662 */ 1662 */
1663 obj_request->xferred = osd_req->r_reply_op_len[0]; 1663 obj_request->xferred = osd_req->r_reply_op_len[0];
1664 rbd_assert(obj_request->xferred < (u64)UINT_MAX); 1664 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1665
1665 opcode = osd_req->r_ops[0].op; 1666 opcode = osd_req->r_ops[0].op;
1666 switch (opcode) { 1667 switch (opcode) {
1667 case CEPH_OSD_OP_READ: 1668 case CEPH_OSD_OP_READ:
1668 rbd_osd_read_callback(obj_request); 1669 rbd_osd_read_callback(obj_request);
1669 break; 1670 break;
1671 case CEPH_OSD_OP_SETALLOCHINT:
1672 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
1673 /* fall through */
1670 case CEPH_OSD_OP_WRITE: 1674 case CEPH_OSD_OP_WRITE:
1671 rbd_osd_write_callback(obj_request); 1675 rbd_osd_write_callback(obj_request);
1672 break; 1676 break;
@@ -1715,9 +1719,16 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1715 snapc, CEPH_NOSNAP, &mtime); 1719 snapc, CEPH_NOSNAP, &mtime);
1716} 1720}
1717 1721
1722/*
1723 * Create an osd request. A read request has one osd op (read).
1724 * A write request has either one (watch) or two (hint+write) osd ops.
1725 * (All rbd data writes are prefixed with an allocation hint op, but
1726 * technically osd watch is a write request, hence this distinction.)
1727 */
1718static struct ceph_osd_request *rbd_osd_req_create( 1728static struct ceph_osd_request *rbd_osd_req_create(
1719 struct rbd_device *rbd_dev, 1729 struct rbd_device *rbd_dev,
1720 bool write_request, 1730 bool write_request,
1731 unsigned int num_ops,
1721 struct rbd_obj_request *obj_request) 1732 struct rbd_obj_request *obj_request)
1722{ 1733{
1723 struct ceph_snap_context *snapc = NULL; 1734 struct ceph_snap_context *snapc = NULL;
@@ -1733,10 +1744,13 @@ static struct ceph_osd_request *rbd_osd_req_create(
1733 snapc = img_request->snapc; 1744 snapc = img_request->snapc;
1734 } 1745 }
1735 1746
1736 /* Allocate and initialize the request, for the single op */ 1747 rbd_assert(num_ops == 1 || (write_request && num_ops == 2));
1748
1749 /* Allocate and initialize the request, for the num_ops ops */
1737 1750
1738 osdc = &rbd_dev->rbd_client->client->osdc; 1751 osdc = &rbd_dev->rbd_client->client->osdc;
1739 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC); 1752 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
1753 GFP_ATOMIC);
1740 if (!osd_req) 1754 if (!osd_req)
1741 return NULL; /* ENOMEM */ 1755 return NULL; /* ENOMEM */
1742 1756
@@ -1756,8 +1770,8 @@ static struct ceph_osd_request *rbd_osd_req_create(
1756 1770
1757/* 1771/*
1758 * Create a copyup osd request based on the information in the 1772 * Create a copyup osd request based on the information in the
1759 * object request supplied. A copyup request has two osd ops, 1773 * object request supplied. A copyup request has three osd ops,
1760 * a copyup method call, and a "normal" write request. 1774 * a copyup method call, a hint op, and a write op.
1761 */ 1775 */
1762static struct ceph_osd_request * 1776static struct ceph_osd_request *
1763rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) 1777rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
@@ -1773,12 +1787,12 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1773 rbd_assert(img_request); 1787 rbd_assert(img_request);
1774 rbd_assert(img_request_write_test(img_request)); 1788 rbd_assert(img_request_write_test(img_request));
1775 1789
1776 /* Allocate and initialize the request, for the two ops */ 1790 /* Allocate and initialize the request, for the three ops */
1777 1791
1778 snapc = img_request->snapc; 1792 snapc = img_request->snapc;
1779 rbd_dev = img_request->rbd_dev; 1793 rbd_dev = img_request->rbd_dev;
1780 osdc = &rbd_dev->rbd_client->client->osdc; 1794 osdc = &rbd_dev->rbd_client->client->osdc;
1781 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC); 1795 osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC);
1782 if (!osd_req) 1796 if (!osd_req)
1783 return NULL; /* ENOMEM */ 1797 return NULL; /* ENOMEM */
1784 1798
@@ -2178,6 +2192,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
2178 const char *object_name; 2192 const char *object_name;
2179 u64 offset; 2193 u64 offset;
2180 u64 length; 2194 u64 length;
2195 unsigned int which = 0;
2181 2196
2182 object_name = rbd_segment_name(rbd_dev, img_offset); 2197 object_name = rbd_segment_name(rbd_dev, img_offset);
2183 if (!object_name) 2198 if (!object_name)
@@ -2190,6 +2205,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
2190 rbd_segment_name_free(object_name); 2205 rbd_segment_name_free(object_name);
2191 if (!obj_request) 2206 if (!obj_request)
2192 goto out_unwind; 2207 goto out_unwind;
2208
2193 /* 2209 /*
2194 * set obj_request->img_request before creating the 2210 * set obj_request->img_request before creating the
2195 * osd_request so that it gets the right snapc 2211 * osd_request so that it gets the right snapc
@@ -2207,7 +2223,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
2207 clone_size, 2223 clone_size,
2208 GFP_ATOMIC); 2224 GFP_ATOMIC);
2209 if (!obj_request->bio_list) 2225 if (!obj_request->bio_list)
2210 goto out_partial; 2226 goto out_unwind;
2211 } else { 2227 } else {
2212 unsigned int page_count; 2228 unsigned int page_count;
2213 2229
@@ -2220,19 +2236,27 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
2220 } 2236 }
2221 2237
2222 osd_req = rbd_osd_req_create(rbd_dev, write_request, 2238 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2223 obj_request); 2239 (write_request ? 2 : 1),
2240 obj_request);
2224 if (!osd_req) 2241 if (!osd_req)
2225 goto out_partial; 2242 goto out_unwind;
2226 obj_request->osd_req = osd_req; 2243 obj_request->osd_req = osd_req;
2227 obj_request->callback = rbd_img_obj_callback; 2244 obj_request->callback = rbd_img_obj_callback;
2228 2245
2229 osd_req_op_extent_init(osd_req, 0, opcode, offset, length, 2246 if (write_request) {
2230 0, 0); 2247 osd_req_op_alloc_hint_init(osd_req, which,
2248 rbd_obj_bytes(&rbd_dev->header),
2249 rbd_obj_bytes(&rbd_dev->header));
2250 which++;
2251 }
2252
2253 osd_req_op_extent_init(osd_req, which, opcode, offset, length,
2254 0, 0);
2231 if (type == OBJ_REQUEST_BIO) 2255 if (type == OBJ_REQUEST_BIO)
2232 osd_req_op_extent_osd_data_bio(osd_req, 0, 2256 osd_req_op_extent_osd_data_bio(osd_req, which,
2233 obj_request->bio_list, length); 2257 obj_request->bio_list, length);
2234 else 2258 else
2235 osd_req_op_extent_osd_data_pages(osd_req, 0, 2259 osd_req_op_extent_osd_data_pages(osd_req, which,
2236 obj_request->pages, length, 2260 obj_request->pages, length,
2237 offset & ~PAGE_MASK, false, false); 2261 offset & ~PAGE_MASK, false, false);
2238 2262
@@ -2249,11 +2273,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
2249 2273
2250 return 0; 2274 return 0;
2251 2275
2252out_partial:
2253 rbd_obj_request_put(obj_request);
2254out_unwind: 2276out_unwind:
2255 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 2277 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2256 rbd_obj_request_put(obj_request); 2278 rbd_img_obj_request_del(img_request, obj_request);
2257 2279
2258 return -ENOMEM; 2280 return -ENOMEM;
2259} 2281}
@@ -2353,7 +2375,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2353 2375
2354 /* 2376 /*
2355 * The original osd request is of no use to use any more. 2377 * The original osd request is of no use to use any more.
2356 * We need a new one that can hold the two ops in a copyup 2378 * We need a new one that can hold the three ops in a copyup
2357 * request. Allocate the new copyup osd request for the 2379 * request. Allocate the new copyup osd request for the
2358 * original request, and release the old one. 2380 * original request, and release the old one.
2359 */ 2381 */
@@ -2372,17 +2394,22 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2372 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0, 2394 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2373 false, false); 2395 false, false);
2374 2396
2375 /* Then the original write request op */ 2397 /* Then the hint op */
2398
2399 osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header),
2400 rbd_obj_bytes(&rbd_dev->header));
2401
2402 /* And the original write request op */
2376 2403
2377 offset = orig_request->offset; 2404 offset = orig_request->offset;
2378 length = orig_request->length; 2405 length = orig_request->length;
2379 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE, 2406 osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE,
2380 offset, length, 0, 0); 2407 offset, length, 0, 0);
2381 if (orig_request->type == OBJ_REQUEST_BIO) 2408 if (orig_request->type == OBJ_REQUEST_BIO)
2382 osd_req_op_extent_osd_data_bio(osd_req, 1, 2409 osd_req_op_extent_osd_data_bio(osd_req, 2,
2383 orig_request->bio_list, length); 2410 orig_request->bio_list, length);
2384 else 2411 else
2385 osd_req_op_extent_osd_data_pages(osd_req, 1, 2412 osd_req_op_extent_osd_data_pages(osd_req, 2,
2386 orig_request->pages, length, 2413 orig_request->pages, length,
2387 offset & ~PAGE_MASK, false, false); 2414 offset & ~PAGE_MASK, false, false);
2388 2415
@@ -2603,8 +2630,8 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2603 2630
2604 rbd_assert(obj_request->img_request); 2631 rbd_assert(obj_request->img_request);
2605 rbd_dev = obj_request->img_request->rbd_dev; 2632 rbd_dev = obj_request->img_request->rbd_dev;
2606 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 2633 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
2607 stat_request); 2634 stat_request);
2608 if (!stat_request->osd_req) 2635 if (!stat_request->osd_req)
2609 goto out; 2636 goto out;
2610 stat_request->callback = rbd_img_obj_exists_callback; 2637 stat_request->callback = rbd_img_obj_exists_callback;
@@ -2807,7 +2834,8 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
2807 return -ENOMEM; 2834 return -ENOMEM;
2808 2835
2809 ret = -ENOMEM; 2836 ret = -ENOMEM;
2810 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); 2837 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
2838 obj_request);
2811 if (!obj_request->osd_req) 2839 if (!obj_request->osd_req)
2812 goto out; 2840 goto out;
2813 2841
@@ -2870,7 +2898,8 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2870 if (!obj_request) 2898 if (!obj_request)
2871 goto out_cancel; 2899 goto out_cancel;
2872 2900
2873 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request); 2901 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
2902 obj_request);
2874 if (!obj_request->osd_req) 2903 if (!obj_request->osd_req)
2875 goto out_cancel; 2904 goto out_cancel;
2876 2905
@@ -2978,7 +3007,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2978 obj_request->pages = pages; 3007 obj_request->pages = pages;
2979 obj_request->page_count = page_count; 3008 obj_request->page_count = page_count;
2980 3009
2981 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); 3010 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
3011 obj_request);
2982 if (!obj_request->osd_req) 3012 if (!obj_request->osd_req)
2983 goto out; 3013 goto out;
2984 3014
@@ -3211,7 +3241,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3211 obj_request->pages = pages; 3241 obj_request->pages = pages;
3212 obj_request->page_count = page_count; 3242 obj_request->page_count = page_count;
3213 3243
3214 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); 3244 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
3245 obj_request);
3215 if (!obj_request->osd_req) 3246 if (!obj_request->osd_req)
3216 goto out; 3247 goto out;
3217 3248
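
The op layout this patch establishes for rbd is easy to lose track of: reads stay a single op, every data write is now prefixed with a CEPH_OSD_OP_SETALLOCHINT op, and a copyup request puts a class-method call in front of both, for three ops total. The following standalone C sketch is not the kernel API; layout_ops() and the enum are illustrative stand-ins that only model how the "which" index added to rbd_img_request_fill() advances through those layouts.

/*
 * Userspace model of the rbd op ordering: read = 1 op,
 * write = hint + write, copyup = call + hint + write.
 */
#include <stdio.h>

enum op { OP_READ, OP_WRITE, OP_SETALLOCHINT, OP_CALL };

static const char *op_name(enum op op)
{
	switch (op) {
	case OP_READ:         return "read";
	case OP_WRITE:        return "write";
	case OP_SETALLOCHINT: return "set-alloc-hint";
	case OP_CALL:         return "call (copyup)";
	}
	return "?";
}

static int layout_ops(enum op *ops, int write_request, int copyup)
{
	int which = 0;

	if (copyup)
		ops[which++] = OP_CALL;		/* copyup method call  */
	if (write_request || copyup)
		ops[which++] = OP_SETALLOCHINT;	/* hint precedes write */
	ops[which++] = (write_request || copyup) ? OP_WRITE : OP_READ;
	return which;				/* == num_ops          */
}

int main(void)
{
	enum op ops[3];
	int n, i;

	n = layout_ops(ops, 1, 1);	/* copyup request: expect 3 ops */
	for (i = 0; i < n; i++)
		printf("op[%d] = %s\n", i, op_name(ops[i]));
	return 0;
}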
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 8c44fdd4e1c3..834f9f3723fb 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -205,6 +205,7 @@ void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
205 ci->fscache = fscache_acquire_cookie(fsc->fscache, 205 ci->fscache = fscache_acquire_cookie(fsc->fscache,
206 &ceph_fscache_inode_object_def, 206 &ceph_fscache_inode_object_def,
207 ci, true); 207 ci, true);
208 fscache_check_consistency(ci->fscache);
208done: 209done:
209 mutex_unlock(&inode->i_mutex); 210 mutex_unlock(&inode->i_mutex);
210 211
diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h
index da95f61b7a09..5ac591bd012b 100644
--- a/fs/ceph/cache.h
+++ b/fs/ceph/cache.h
@@ -48,6 +48,12 @@ void ceph_readpage_to_fscache(struct inode *inode, struct page *page);
48void ceph_invalidate_fscache_page(struct inode* inode, struct page *page); 48void ceph_invalidate_fscache_page(struct inode* inode, struct page *page);
49void ceph_queue_revalidate(struct inode *inode); 49void ceph_queue_revalidate(struct inode *inode);
50 50
51static inline void ceph_fscache_update_objectsize(struct inode *inode)
52{
53 struct ceph_inode_info *ci = ceph_inode(inode);
54 fscache_attr_changed(ci->fscache);
55}
56
51static inline void ceph_fscache_invalidate(struct inode *inode) 57static inline void ceph_fscache_invalidate(struct inode *inode)
52{ 58{
53 fscache_invalidate(ceph_inode(inode)->fscache); 59 fscache_invalidate(ceph_inode(inode)->fscache);
@@ -135,6 +141,10 @@ static inline void ceph_readpage_to_fscache(struct inode *inode,
135{ 141{
136} 142}
137 143
144static inline void ceph_fscache_update_objectsize(struct inode *inode)
145{
146}
147
138static inline void ceph_fscache_invalidate(struct inode *inode) 148static inline void ceph_fscache_invalidate(struct inode *inode)
139{ 149{
140} 150}
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 17543383545c..2e5e648eb5c3 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -622,8 +622,10 @@ retry:
622 622
623 if (flags & CEPH_CAP_FLAG_AUTH) { 623 if (flags & CEPH_CAP_FLAG_AUTH) {
624 if (ci->i_auth_cap == NULL || 624 if (ci->i_auth_cap == NULL ||
625 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) 625 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
626 ci->i_auth_cap = cap; 626 ci->i_auth_cap = cap;
627 cap->mds_wanted = wanted;
628 }
627 ci->i_cap_exporting_issued = 0; 629 ci->i_cap_exporting_issued = 0;
628 } else { 630 } else {
629 WARN_ON(ci->i_auth_cap == cap); 631 WARN_ON(ci->i_auth_cap == cap);
@@ -885,7 +887,10 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
885 cap = rb_entry(p, struct ceph_cap, ci_node); 887 cap = rb_entry(p, struct ceph_cap, ci_node);
886 if (!__cap_is_valid(cap)) 888 if (!__cap_is_valid(cap))
887 continue; 889 continue;
888 mds_wanted |= cap->mds_wanted; 890 if (cap == ci->i_auth_cap)
891 mds_wanted |= cap->mds_wanted;
892 else
893 mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
889 } 894 }
890 return mds_wanted; 895 return mds_wanted;
891} 896}
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 6d59006bfa27..16b54aa31f08 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -93,6 +93,8 @@ static int mdsc_show(struct seq_file *s, void *p)
93 } else if (req->r_path1) { 93 } else if (req->r_path1) {
94 seq_printf(s, " #%llx/%s", req->r_ino1.ino, 94 seq_printf(s, " #%llx/%s", req->r_ino1.ino,
95 req->r_path1); 95 req->r_path1);
96 } else {
97 seq_printf(s, " #%llx", req->r_ino1.ino);
96 } 98 }
97 99
98 if (req->r_old_dentry) { 100 if (req->r_old_dentry) {
@@ -102,7 +104,8 @@ static int mdsc_show(struct seq_file *s, void *p)
102 path = NULL; 104 path = NULL;
103 spin_lock(&req->r_old_dentry->d_lock); 105 spin_lock(&req->r_old_dentry->d_lock);
104 seq_printf(s, " #%llx/%.*s (%s)", 106 seq_printf(s, " #%llx/%.*s (%s)",
105 ceph_ino(req->r_old_dentry_dir), 107 req->r_old_dentry_dir ?
108 ceph_ino(req->r_old_dentry_dir) : 0,
106 req->r_old_dentry->d_name.len, 109 req->r_old_dentry->d_name.len,
107 req->r_old_dentry->d_name.name, 110 req->r_old_dentry->d_name.name,
108 path ? path : ""); 111 path ? path : "");
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 45eda6d7a40c..766410a12c2c 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -119,7 +119,8 @@ static int fpos_cmp(loff_t l, loff_t r)
119 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by 119 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
120 * the MDS if/when the directory is modified). 120 * the MDS if/when the directory is modified).
121 */ 121 */
122static int __dcache_readdir(struct file *file, struct dir_context *ctx) 122static int __dcache_readdir(struct file *file, struct dir_context *ctx,
123 u32 shared_gen)
123{ 124{
124 struct ceph_file_info *fi = file->private_data; 125 struct ceph_file_info *fi = file->private_data;
125 struct dentry *parent = file->f_dentry; 126 struct dentry *parent = file->f_dentry;
@@ -133,8 +134,8 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx)
133 last = fi->dentry; 134 last = fi->dentry;
134 fi->dentry = NULL; 135 fi->dentry = NULL;
135 136
136 dout("__dcache_readdir %p at %llu (last %p)\n", dir, ctx->pos, 137 dout("__dcache_readdir %p v%u at %llu (last %p)\n",
137 last); 138 dir, shared_gen, ctx->pos, last);
138 139
139 spin_lock(&parent->d_lock); 140 spin_lock(&parent->d_lock);
140 141
@@ -161,7 +162,8 @@ more:
161 goto out_unlock; 162 goto out_unlock;
162 } 163 }
163 spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); 164 spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
164 if (!d_unhashed(dentry) && dentry->d_inode && 165 if (di->lease_shared_gen == shared_gen &&
166 !d_unhashed(dentry) && dentry->d_inode &&
165 ceph_snap(dentry->d_inode) != CEPH_SNAPDIR && 167 ceph_snap(dentry->d_inode) != CEPH_SNAPDIR &&
166 ceph_ino(dentry->d_inode) != CEPH_INO_CEPH && 168 ceph_ino(dentry->d_inode) != CEPH_INO_CEPH &&
167 fpos_cmp(ctx->pos, di->offset) <= 0) 169 fpos_cmp(ctx->pos, di->offset) <= 0)
@@ -190,7 +192,7 @@ more:
190 if (last) { 192 if (last) {
191 /* remember our position */ 193 /* remember our position */
192 fi->dentry = last; 194 fi->dentry = last;
193 fi->next_offset = di->offset; 195 fi->next_offset = fpos_off(di->offset);
194 } 196 }
195 dput(dentry); 197 dput(dentry);
196 return 0; 198 return 0;
@@ -252,8 +254,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
252 int err; 254 int err;
253 u32 ftype; 255 u32 ftype;
254 struct ceph_mds_reply_info_parsed *rinfo; 256 struct ceph_mds_reply_info_parsed *rinfo;
255 const int max_entries = fsc->mount_options->max_readdir;
256 const int max_bytes = fsc->mount_options->max_readdir_bytes;
257 257
258 dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off); 258 dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
259 if (fi->flags & CEPH_F_ATEND) 259 if (fi->flags & CEPH_F_ATEND)
@@ -291,8 +291,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
291 ceph_snap(inode) != CEPH_SNAPDIR && 291 ceph_snap(inode) != CEPH_SNAPDIR &&
292 __ceph_dir_is_complete(ci) && 292 __ceph_dir_is_complete(ci) &&
293 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { 293 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
294 u32 shared_gen = ci->i_shared_gen;
294 spin_unlock(&ci->i_ceph_lock); 295 spin_unlock(&ci->i_ceph_lock);
295 err = __dcache_readdir(file, ctx); 296 err = __dcache_readdir(file, ctx, shared_gen);
296 if (err != -EAGAIN) 297 if (err != -EAGAIN)
297 return err; 298 return err;
298 } else { 299 } else {
@@ -322,14 +323,16 @@ more:
322 fi->last_readdir = NULL; 323 fi->last_readdir = NULL;
323 } 324 }
324 325
325 /* requery frag tree, as the frag topology may have changed */
326 frag = ceph_choose_frag(ceph_inode(inode), frag, NULL, NULL);
327
328 dout("readdir fetching %llx.%llx frag %x offset '%s'\n", 326 dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
329 ceph_vinop(inode), frag, fi->last_name); 327 ceph_vinop(inode), frag, fi->last_name);
330 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 328 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
331 if (IS_ERR(req)) 329 if (IS_ERR(req))
332 return PTR_ERR(req); 330 return PTR_ERR(req);
331 err = ceph_alloc_readdir_reply_buffer(req, inode);
332 if (err) {
333 ceph_mdsc_put_request(req);
334 return err;
335 }
333 req->r_inode = inode; 336 req->r_inode = inode;
334 ihold(inode); 337 ihold(inode);
335 req->r_dentry = dget(file->f_dentry); 338 req->r_dentry = dget(file->f_dentry);
@@ -340,9 +343,6 @@ more:
340 req->r_path2 = kstrdup(fi->last_name, GFP_NOFS); 343 req->r_path2 = kstrdup(fi->last_name, GFP_NOFS);
341 req->r_readdir_offset = fi->next_offset; 344 req->r_readdir_offset = fi->next_offset;
342 req->r_args.readdir.frag = cpu_to_le32(frag); 345 req->r_args.readdir.frag = cpu_to_le32(frag);
343 req->r_args.readdir.max_entries = cpu_to_le32(max_entries);
344 req->r_args.readdir.max_bytes = cpu_to_le32(max_bytes);
345 req->r_num_caps = max_entries + 1;
346 err = ceph_mdsc_do_request(mdsc, NULL, req); 346 err = ceph_mdsc_do_request(mdsc, NULL, req);
347 if (err < 0) { 347 if (err < 0) {
348 ceph_mdsc_put_request(req); 348 ceph_mdsc_put_request(req);
@@ -369,9 +369,9 @@ more:
369 fi->next_offset = 0; 369 fi->next_offset = 0;
370 off = fi->next_offset; 370 off = fi->next_offset;
371 } 371 }
372 fi->frag = frag;
372 fi->offset = fi->next_offset; 373 fi->offset = fi->next_offset;
373 fi->last_readdir = req; 374 fi->last_readdir = req;
374 fi->frag = frag;
375 375
376 if (req->r_reply_info.dir_end) { 376 if (req->r_reply_info.dir_end) {
377 kfree(fi->last_name); 377 kfree(fi->last_name);
@@ -454,7 +454,7 @@ more:
454 return 0; 454 return 0;
455} 455}
456 456
457static void reset_readdir(struct ceph_file_info *fi) 457static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
458{ 458{
459 if (fi->last_readdir) { 459 if (fi->last_readdir) {
460 ceph_mdsc_put_request(fi->last_readdir); 460 ceph_mdsc_put_request(fi->last_readdir);
@@ -462,7 +462,10 @@ static void reset_readdir(struct ceph_file_info *fi)
462 } 462 }
463 kfree(fi->last_name); 463 kfree(fi->last_name);
464 fi->last_name = NULL; 464 fi->last_name = NULL;
465 fi->next_offset = 2; /* compensate for . and .. */ 465 if (ceph_frag_is_leftmost(frag))
466 fi->next_offset = 2; /* compensate for . and .. */
467 else
468 fi->next_offset = 0;
466 if (fi->dentry) { 469 if (fi->dentry) {
467 dput(fi->dentry); 470 dput(fi->dentry);
468 fi->dentry = NULL; 471 fi->dentry = NULL;
@@ -474,7 +477,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
474{ 477{
475 struct ceph_file_info *fi = file->private_data; 478 struct ceph_file_info *fi = file->private_data;
476 struct inode *inode = file->f_mapping->host; 479 struct inode *inode = file->f_mapping->host;
477 loff_t old_offset = offset; 480 loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
478 loff_t retval; 481 loff_t retval;
479 482
480 mutex_lock(&inode->i_mutex); 483 mutex_lock(&inode->i_mutex);
@@ -491,7 +494,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
491 goto out; 494 goto out;
492 } 495 }
493 496
494 if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) { 497 if (offset >= 0) {
495 if (offset != file->f_pos) { 498 if (offset != file->f_pos) {
496 file->f_pos = offset; 499 file->f_pos = offset;
497 file->f_version = 0; 500 file->f_version = 0;
@@ -504,14 +507,14 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
504 * seek to new frag, or seek prior to current chunk. 507 * seek to new frag, or seek prior to current chunk.
505 */ 508 */
506 if (offset == 0 || 509 if (offset == 0 ||
507 fpos_frag(offset) != fpos_frag(old_offset) || 510 fpos_frag(offset) != fi->frag ||
508 fpos_off(offset) < fi->offset) { 511 fpos_off(offset) < fi->offset) {
509 dout("dir_llseek dropping %p content\n", file); 512 dout("dir_llseek dropping %p content\n", file);
510 reset_readdir(fi); 513 reset_readdir(fi, fpos_frag(offset));
511 } 514 }
512 515
513 /* bump dir_release_count if we did a forward seek */ 516 /* bump dir_release_count if we did a forward seek */
514 if (offset > old_offset) 517 if (fpos_cmp(offset, old_offset) > 0)
515 fi->dir_release_count--; 518 fi->dir_release_count--;
516 } 519 }
517out: 520out:
@@ -812,8 +815,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
812 } 815 }
813 req->r_dentry = dget(dentry); 816 req->r_dentry = dget(dentry);
814 req->r_num_caps = 2; 817 req->r_num_caps = 2;
815 req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */ 818 req->r_old_dentry = dget(old_dentry);
816 req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
817 req->r_locked_dir = dir; 819 req->r_locked_dir = dir;
818 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 820 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
819 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 821 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -911,10 +913,11 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
911 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME, USE_AUTH_MDS); 913 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME, USE_AUTH_MDS);
912 if (IS_ERR(req)) 914 if (IS_ERR(req))
913 return PTR_ERR(req); 915 return PTR_ERR(req);
916 ihold(old_dir);
914 req->r_dentry = dget(new_dentry); 917 req->r_dentry = dget(new_dentry);
915 req->r_num_caps = 2; 918 req->r_num_caps = 2;
916 req->r_old_dentry = dget(old_dentry); 919 req->r_old_dentry = dget(old_dentry);
917 req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry); 920 req->r_old_dentry_dir = old_dir;
918 req->r_locked_dir = new_dir; 921 req->r_locked_dir = new_dir;
919 req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; 922 req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
920 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; 923 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
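
The llseek change above compares positions with ceph_make_fpos(fi->frag, fi->next_offset) instead of the raw f_pos, and the readdir path now stores fpos_off(di->offset). A small userspace sketch of that position encoding, assuming the fpos helpers from fs/ceph/super.h keep the directory fragment in the high 32 bits of f_pos and the offset within the fragment in the low 32 bits:

#include <stdio.h>
#include <stdint.h>

/* assumed layout: frag in the high word, per-frag offset in the low word */
static int64_t make_fpos(uint32_t frag, uint32_t off)
{
	return ((int64_t)frag << 32) | off;
}

static uint32_t fpos_frag(int64_t p) { return (uint32_t)(p >> 32); }
static uint32_t fpos_off(int64_t p)  { return (uint32_t)(p & 0xffffffff); }

int main(void)
{
	int64_t pos = make_fpos(0x2aaa0000, 7);

	/*
	 * A seek drops the cached readdir content if the frag changes or
	 * the offset moves before the start of the cached chunk -- the
	 * same test ceph_dir_llseek() now performs against fi->frag and
	 * fi->offset.
	 */
	printf("frag %#x off %u\n", fpos_frag(pos), fpos_off(pos));
	return 0;
}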
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 16796be53ca5..00d6af6a32ec 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -8,23 +8,6 @@
8#include "mds_client.h" 8#include "mds_client.h"
9 9
10/* 10/*
11 * NFS export support
12 *
13 * NFS re-export of a ceph mount is, at present, only semireliable.
14 * The basic issue is that the Ceph architectures doesn't lend itself
15 * well to generating filehandles that will remain valid forever.
16 *
17 * So, we do our best. If you're lucky, your inode will be in the
18 * client's cache. If it's not, and you have a connectable fh, then
19 * the MDS server may be able to find it for you. Otherwise, you get
20 * ESTALE.
21 *
22 * There are ways to this more reliable, but in the non-connectable fh
23 * case, we won't every work perfectly, and in the connectable case,
24 * some changes are needed on the MDS side to work better.
25 */
26
27/*
28 * Basic fh 11 * Basic fh
29 */ 12 */
30struct ceph_nfs_fh { 13struct ceph_nfs_fh {
@@ -32,22 +15,12 @@ struct ceph_nfs_fh {
32} __attribute__ ((packed)); 15} __attribute__ ((packed));
33 16
34/* 17/*
35 * Larger 'connectable' fh that includes parent ino and name hash. 18 * Larger fh that includes parent ino.
36 * Use this whenever possible, as it works more reliably.
37 */ 19 */
38struct ceph_nfs_confh { 20struct ceph_nfs_confh {
39 u64 ino, parent_ino; 21 u64 ino, parent_ino;
40 u32 parent_name_hash;
41} __attribute__ ((packed)); 22} __attribute__ ((packed));
42 23
43/*
44 * The presence of @parent_inode here tells us whether NFS wants a
45 * connectable file handle. However, we want to make a connectionable
46 * file handle unconditionally so that the MDS gets as much of a hint
47 * as possible. That means we only use @parent_dentry to indicate
48 * whether nfsd wants a connectable fh, and whether we should indicate
49 * failure from a too-small @max_len.
50 */
51static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len, 24static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len,
52 struct inode *parent_inode) 25 struct inode *parent_inode)
53{ 26{
@@ -56,54 +29,36 @@ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len,
56 struct ceph_nfs_confh *cfh = (void *)rawfh; 29 struct ceph_nfs_confh *cfh = (void *)rawfh;
57 int connected_handle_length = sizeof(*cfh)/4; 30 int connected_handle_length = sizeof(*cfh)/4;
58 int handle_length = sizeof(*fh)/4; 31 int handle_length = sizeof(*fh)/4;
59 struct dentry *dentry;
60 struct dentry *parent;
61 32
62 /* don't re-export snaps */ 33 /* don't re-export snaps */
63 if (ceph_snap(inode) != CEPH_NOSNAP) 34 if (ceph_snap(inode) != CEPH_NOSNAP)
64 return -EINVAL; 35 return -EINVAL;
65 36
66 dentry = d_find_alias(inode); 37 if (parent_inode && (*max_len < connected_handle_length)) {
38 *max_len = connected_handle_length;
39 return FILEID_INVALID;
40 } else if (*max_len < handle_length) {
41 *max_len = handle_length;
42 return FILEID_INVALID;
43 }
67 44
68 /* if we found an alias, generate a connectable fh */ 45 if (parent_inode) {
69 if (*max_len >= connected_handle_length && dentry) { 46 dout("encode_fh %llx with parent %llx\n",
70 dout("encode_fh %p connectable\n", dentry); 47 ceph_ino(inode), ceph_ino(parent_inode));
71 spin_lock(&dentry->d_lock);
72 parent = dentry->d_parent;
73 cfh->ino = ceph_ino(inode); 48 cfh->ino = ceph_ino(inode);
74 cfh->parent_ino = ceph_ino(parent->d_inode); 49 cfh->parent_ino = ceph_ino(parent_inode);
75 cfh->parent_name_hash = ceph_dentry_hash(parent->d_inode,
76 dentry);
77 *max_len = connected_handle_length; 50 *max_len = connected_handle_length;
78 type = 2; 51 type = FILEID_INO32_GEN_PARENT;
79 spin_unlock(&dentry->d_lock);
80 } else if (*max_len >= handle_length) {
81 if (parent_inode) {
82 /* nfsd wants connectable */
83 *max_len = connected_handle_length;
84 type = FILEID_INVALID;
85 } else {
86 dout("encode_fh %p\n", dentry);
87 fh->ino = ceph_ino(inode);
88 *max_len = handle_length;
89 type = 1;
90 }
91 } else { 52 } else {
53 dout("encode_fh %llx\n", ceph_ino(inode));
54 fh->ino = ceph_ino(inode);
92 *max_len = handle_length; 55 *max_len = handle_length;
93 type = FILEID_INVALID; 56 type = FILEID_INO32_GEN;
94 } 57 }
95 if (dentry)
96 dput(dentry);
97 return type; 58 return type;
98} 59}
99 60
100/* 61static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
101 * convert regular fh to dentry
102 *
103 * FIXME: we should try harder by querying the mds for the ino.
104 */
105static struct dentry *__fh_to_dentry(struct super_block *sb,
106 struct ceph_nfs_fh *fh, int fh_len)
107{ 62{
108 struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc; 63 struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc;
109 struct inode *inode; 64 struct inode *inode;
@@ -111,11 +66,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb,
111 struct ceph_vino vino; 66 struct ceph_vino vino;
112 int err; 67 int err;
113 68
114 if (fh_len < sizeof(*fh) / 4) 69 vino.ino = ino;
115 return ERR_PTR(-ESTALE);
116
117 dout("__fh_to_dentry %llx\n", fh->ino);
118 vino.ino = fh->ino;
119 vino.snap = CEPH_NOSNAP; 70 vino.snap = CEPH_NOSNAP;
120 inode = ceph_find_inode(sb, vino); 71 inode = ceph_find_inode(sb, vino);
121 if (!inode) { 72 if (!inode) {
@@ -139,139 +90,161 @@ static struct dentry *__fh_to_dentry(struct super_block *sb,
139 90
140 dentry = d_obtain_alias(inode); 91 dentry = d_obtain_alias(inode);
141 if (IS_ERR(dentry)) { 92 if (IS_ERR(dentry)) {
142 pr_err("fh_to_dentry %llx -- inode %p but ENOMEM\n",
143 fh->ino, inode);
144 iput(inode); 93 iput(inode);
145 return dentry; 94 return dentry;
146 } 95 }
147 err = ceph_init_dentry(dentry); 96 err = ceph_init_dentry(dentry);
148 if (err < 0) { 97 if (err < 0) {
149 iput(inode); 98 dput(dentry);
150 return ERR_PTR(err); 99 return ERR_PTR(err);
151 } 100 }
152 dout("__fh_to_dentry %llx %p dentry %p\n", fh->ino, inode, dentry); 101 dout("__fh_to_dentry %llx %p dentry %p\n", ino, inode, dentry);
153 return dentry; 102 return dentry;
154} 103}
155 104
156/* 105/*
157 * convert connectable fh to dentry 106 * convert regular fh to dentry
158 */ 107 */
159static struct dentry *__cfh_to_dentry(struct super_block *sb, 108static struct dentry *ceph_fh_to_dentry(struct super_block *sb,
160 struct ceph_nfs_confh *cfh, int fh_len) 109 struct fid *fid,
110 int fh_len, int fh_type)
111{
112 struct ceph_nfs_fh *fh = (void *)fid->raw;
113
114 if (fh_type != FILEID_INO32_GEN &&
115 fh_type != FILEID_INO32_GEN_PARENT)
116 return NULL;
117 if (fh_len < sizeof(*fh) / 4)
118 return NULL;
119
120 dout("fh_to_dentry %llx\n", fh->ino);
121 return __fh_to_dentry(sb, fh->ino);
122}
123
124static struct dentry *__get_parent(struct super_block *sb,
125 struct dentry *child, u64 ino)
161{ 126{
162 struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc; 127 struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc;
128 struct ceph_mds_request *req;
163 struct inode *inode; 129 struct inode *inode;
164 struct dentry *dentry; 130 struct dentry *dentry;
165 struct ceph_vino vino;
166 int err; 131 int err;
167 132
168 if (fh_len < sizeof(*cfh) / 4) 133 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT,
169 return ERR_PTR(-ESTALE); 134 USE_ANY_MDS);
170 135 if (IS_ERR(req))
171 dout("__cfh_to_dentry %llx (%llx/%x)\n", 136 return ERR_CAST(req);
172 cfh->ino, cfh->parent_ino, cfh->parent_name_hash);
173
174 vino.ino = cfh->ino;
175 vino.snap = CEPH_NOSNAP;
176 inode = ceph_find_inode(sb, vino);
177 if (!inode) {
178 struct ceph_mds_request *req;
179
180 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPHASH,
181 USE_ANY_MDS);
182 if (IS_ERR(req))
183 return ERR_CAST(req);
184 137
185 req->r_ino1 = vino; 138 if (child) {
186 req->r_ino2.ino = cfh->parent_ino; 139 req->r_inode = child->d_inode;
187 req->r_ino2.snap = CEPH_NOSNAP; 140 ihold(child->d_inode);
188 req->r_path2 = kmalloc(16, GFP_NOFS); 141 } else {
189 snprintf(req->r_path2, 16, "%d", cfh->parent_name_hash); 142 req->r_ino1 = (struct ceph_vino) {
190 req->r_num_caps = 1; 143 .ino = ino,
191 err = ceph_mdsc_do_request(mdsc, NULL, req); 144 .snap = CEPH_NOSNAP,
192 inode = req->r_target_inode; 145 };
193 if (inode)
194 ihold(inode);
195 ceph_mdsc_put_request(req);
196 if (!inode)
197 return ERR_PTR(err ? err : -ESTALE);
198 } 146 }
147 req->r_num_caps = 1;
148 err = ceph_mdsc_do_request(mdsc, NULL, req);
149 inode = req->r_target_inode;
150 if (inode)
151 ihold(inode);
152 ceph_mdsc_put_request(req);
153 if (!inode)
154 return ERR_PTR(-ENOENT);
199 155
200 dentry = d_obtain_alias(inode); 156 dentry = d_obtain_alias(inode);
201 if (IS_ERR(dentry)) { 157 if (IS_ERR(dentry)) {
202 pr_err("cfh_to_dentry %llx -- inode %p but ENOMEM\n",
203 cfh->ino, inode);
204 iput(inode); 158 iput(inode);
205 return dentry; 159 return dentry;
206 } 160 }
207 err = ceph_init_dentry(dentry); 161 err = ceph_init_dentry(dentry);
208 if (err < 0) { 162 if (err < 0) {
209 iput(inode); 163 dput(dentry);
210 return ERR_PTR(err); 164 return ERR_PTR(err);
211 } 165 }
212 dout("__cfh_to_dentry %llx %p dentry %p\n", cfh->ino, inode, dentry); 166 dout("__get_parent ino %llx parent %p ino %llx.%llx\n",
167 child ? ceph_ino(child->d_inode) : ino,
168 dentry, ceph_vinop(inode));
213 return dentry; 169 return dentry;
214} 170}
215 171
216static struct dentry *ceph_fh_to_dentry(struct super_block *sb, struct fid *fid, 172struct dentry *ceph_get_parent(struct dentry *child)
217 int fh_len, int fh_type)
218{ 173{
219 if (fh_type == 1) 174 /* don't re-export snaps */
220 return __fh_to_dentry(sb, (struct ceph_nfs_fh *)fid->raw, 175 if (ceph_snap(child->d_inode) != CEPH_NOSNAP)
221 fh_len); 176 return ERR_PTR(-EINVAL);
222 else 177
223 return __cfh_to_dentry(sb, (struct ceph_nfs_confh *)fid->raw, 178 dout("get_parent %p ino %llx.%llx\n",
224 fh_len); 179 child, ceph_vinop(child->d_inode));
180 return __get_parent(child->d_sb, child, 0);
225} 181}
226 182
227/* 183/*
228 * get parent, if possible. 184 * convert regular fh to parent
229 *
230 * FIXME: we could do better by querying the mds to discover the
231 * parent.
232 */ 185 */
233static struct dentry *ceph_fh_to_parent(struct super_block *sb, 186static struct dentry *ceph_fh_to_parent(struct super_block *sb,
234 struct fid *fid, 187 struct fid *fid,
235 int fh_len, int fh_type) 188 int fh_len, int fh_type)
236{ 189{
237 struct ceph_nfs_confh *cfh = (void *)fid->raw; 190 struct ceph_nfs_confh *cfh = (void *)fid->raw;
238 struct ceph_vino vino;
239 struct inode *inode;
240 struct dentry *dentry; 191 struct dentry *dentry;
241 int err;
242 192
243 if (fh_type == 1) 193 if (fh_type != FILEID_INO32_GEN_PARENT)
244 return ERR_PTR(-ESTALE); 194 return NULL;
245 if (fh_len < sizeof(*cfh) / 4) 195 if (fh_len < sizeof(*cfh) / 4)
246 return ERR_PTR(-ESTALE); 196 return NULL;
247 197
248 pr_debug("fh_to_parent %llx/%d\n", cfh->parent_ino, 198 dout("fh_to_parent %llx\n", cfh->parent_ino);
249 cfh->parent_name_hash); 199 dentry = __get_parent(sb, NULL, cfh->ino);
200 if (IS_ERR(dentry) && PTR_ERR(dentry) == -ENOENT)
201 dentry = __fh_to_dentry(sb, cfh->parent_ino);
202 return dentry;
203}
250 204
251 vino.ino = cfh->ino; 205static int ceph_get_name(struct dentry *parent, char *name,
252 vino.snap = CEPH_NOSNAP; 206 struct dentry *child)
253 inode = ceph_find_inode(sb, vino); 207{
254 if (!inode) 208 struct ceph_mds_client *mdsc;
255 return ERR_PTR(-ESTALE); 209 struct ceph_mds_request *req;
210 int err;
256 211
257 dentry = d_obtain_alias(inode); 212 mdsc = ceph_inode_to_client(child->d_inode)->mdsc;
258 if (IS_ERR(dentry)) { 213 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPNAME,
259 pr_err("fh_to_parent %llx -- inode %p but ENOMEM\n", 214 USE_ANY_MDS);
260 cfh->ino, inode); 215 if (IS_ERR(req))
261 iput(inode); 216 return PTR_ERR(req);
262 return dentry; 217
263 } 218 mutex_lock(&parent->d_inode->i_mutex);
264 err = ceph_init_dentry(dentry); 219
265 if (err < 0) { 220 req->r_inode = child->d_inode;
266 iput(inode); 221 ihold(child->d_inode);
267 return ERR_PTR(err); 222 req->r_ino2 = ceph_vino(parent->d_inode);
223 req->r_locked_dir = parent->d_inode;
224 req->r_num_caps = 2;
225 err = ceph_mdsc_do_request(mdsc, NULL, req);
226
227 mutex_unlock(&parent->d_inode->i_mutex);
228
229 if (!err) {
230 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
231 memcpy(name, rinfo->dname, rinfo->dname_len);
232 name[rinfo->dname_len] = 0;
233 dout("get_name %p ino %llx.%llx name %s\n",
234 child, ceph_vinop(child->d_inode), name);
235 } else {
236 dout("get_name %p ino %llx.%llx err %d\n",
237 child, ceph_vinop(child->d_inode), err);
268 } 238 }
269 dout("fh_to_parent %llx %p dentry %p\n", cfh->ino, inode, dentry); 239
270 return dentry; 240 ceph_mdsc_put_request(req);
241 return err;
271} 242}
272 243
273const struct export_operations ceph_export_ops = { 244const struct export_operations ceph_export_ops = {
274 .encode_fh = ceph_encode_fh, 245 .encode_fh = ceph_encode_fh,
275 .fh_to_dentry = ceph_fh_to_dentry, 246 .fh_to_dentry = ceph_fh_to_dentry,
276 .fh_to_parent = ceph_fh_to_parent, 247 .fh_to_parent = ceph_fh_to_parent,
248 .get_parent = ceph_get_parent,
249 .get_name = ceph_get_name,
277}; 250};
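
With the rework above, ceph_encode_fh() emits one of two fixed layouts: a plain handle carrying only the inode number (FILEID_INO32_GEN) or a connectable handle that adds the parent inode number (FILEID_INO32_GEN_PARENT), with *max_len counted in 32-bit words. The userspace sketch below mirrors only the struct layouts, the word arithmetic and the FILEID_* return values from the patch; encode_fh() itself is an illustrative stand-in, not the kernel function.

#include <stdio.h>
#include <stdint.h>
#include <string.h>

struct ceph_nfs_fh    { uint64_t ino; } __attribute__((packed));
struct ceph_nfs_confh { uint64_t ino, parent_ino; } __attribute__((packed));

/* values as in include/linux/exportfs.h */
#define FILEID_INVALID          0xff
#define FILEID_INO32_GEN        1	/* plain fh        */
#define FILEID_INO32_GEN_PARENT 2	/* fh + parent ino */

static int encode_fh(uint32_t *rawfh, int *max_len, uint64_t ino,
		     const uint64_t *parent_ino)
{
	struct ceph_nfs_confh cfh = { .ino = ino };
	struct ceph_nfs_fh fh = { .ino = ino };
	int confh_len = sizeof(cfh) / 4;	/* 4 words */
	int fh_len = sizeof(fh) / 4;		/* 2 words */

	if (parent_ino && *max_len < confh_len) {
		*max_len = confh_len;
		return FILEID_INVALID;
	} else if (*max_len < fh_len) {
		*max_len = fh_len;
		return FILEID_INVALID;
	}

	if (parent_ino) {
		cfh.parent_ino = *parent_ino;
		memcpy(rawfh, &cfh, sizeof(cfh));
		*max_len = confh_len;
		return FILEID_INO32_GEN_PARENT;	/* connectable fh */
	}
	memcpy(rawfh, &fh, sizeof(fh));
	*max_len = fh_len;
	return FILEID_INO32_GEN;		/* plain fh */
}

int main(void)
{
	uint32_t raw[4];
	int len = 4;
	uint64_t parent = 0x10000000001ULL;

	printf("type %d, %d words\n",
	       encode_fh(raw, &len, 0x10000000002ULL, &parent), len);
	return 0;
}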
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 09c7afe32e49..66075a4ad979 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -210,7 +210,7 @@ int ceph_open(struct inode *inode, struct file *file)
210 ihold(inode); 210 ihold(inode);
211 211
212 req->r_num_caps = 1; 212 req->r_num_caps = 1;
213 if (flags & (O_CREAT|O_TRUNC)) 213 if (flags & O_CREAT)
214 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); 214 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
215 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 215 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
216 iput(parent_inode); 216 iput(parent_inode);
@@ -291,8 +291,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
291 } 291 }
292 err = finish_open(file, dentry, ceph_open, opened); 292 err = finish_open(file, dentry, ceph_open, opened);
293 } 293 }
294
295out_err: 294out_err:
295 if (!req->r_err && req->r_target_inode)
296 ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
296 ceph_mdsc_put_request(req); 297 ceph_mdsc_put_request(req);
297 dout("atomic_open result=%d\n", err); 298 dout("atomic_open result=%d\n", err);
298 return err; 299 return err;
@@ -970,6 +971,7 @@ retry_snap:
970 goto retry_snap; 971 goto retry_snap;
971 } 972 }
972 } else { 973 } else {
974 loff_t old_size = inode->i_size;
973 /* 975 /*
974 * No need to acquire the i_truncate_mutex. Because 976 * No need to acquire the i_truncate_mutex. Because
975 * the MDS revokes Fwb caps before sending truncate 977 * the MDS revokes Fwb caps before sending truncate
@@ -980,6 +982,8 @@ retry_snap:
980 written = generic_file_buffered_write(iocb, iov, nr_segs, 982 written = generic_file_buffered_write(iocb, iov, nr_segs,
981 pos, &iocb->ki_pos, 983 pos, &iocb->ki_pos,
982 count, 0); 984 count, 0);
985 if (inode->i_size > old_size)
986 ceph_fscache_update_objectsize(inode);
983 mutex_unlock(&inode->i_mutex); 987 mutex_unlock(&inode->i_mutex);
984 } 988 }
985 989
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 32d519d8a2e2..0b0728e5be2d 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -659,14 +659,6 @@ static int fill_inode(struct inode *inode,
659 le32_to_cpu(info->time_warp_seq), 659 le32_to_cpu(info->time_warp_seq),
660 &ctime, &mtime, &atime); 660 &ctime, &mtime, &atime);
661 661
662 /* only update max_size on auth cap */
663 if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
664 ci->i_max_size != le64_to_cpu(info->max_size)) {
665 dout("max_size %lld -> %llu\n", ci->i_max_size,
666 le64_to_cpu(info->max_size));
667 ci->i_max_size = le64_to_cpu(info->max_size);
668 }
669
670 ci->i_layout = info->layout; 662 ci->i_layout = info->layout;
671 inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1; 663 inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
672 664
@@ -755,6 +747,14 @@ static int fill_inode(struct inode *inode,
755 ci->i_max_offset = 2; 747 ci->i_max_offset = 2;
756 } 748 }
757no_change: 749no_change:
750 /* only update max_size on auth cap */
751 if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
752 ci->i_max_size != le64_to_cpu(info->max_size)) {
753 dout("max_size %lld -> %llu\n", ci->i_max_size,
754 le64_to_cpu(info->max_size));
755 ci->i_max_size = le64_to_cpu(info->max_size);
756 }
757
758 spin_unlock(&ci->i_ceph_lock); 758 spin_unlock(&ci->i_ceph_lock);
759 759
760 /* queue truncate if we saw i_size decrease */ 760 /* queue truncate if we saw i_size decrease */
@@ -1044,10 +1044,59 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1044 session, req->r_request_started, -1, 1044 session, req->r_request_started, -1,
1045 &req->r_caps_reservation); 1045 &req->r_caps_reservation);
1046 if (err < 0) 1046 if (err < 0)
1047 return err; 1047 goto done;
1048 } else { 1048 } else {
1049 WARN_ON_ONCE(1); 1049 WARN_ON_ONCE(1);
1050 } 1050 }
1051
1052 if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) {
1053 struct qstr dname;
1054 struct dentry *dn, *parent;
1055
1056 BUG_ON(!rinfo->head->is_target);
1057 BUG_ON(req->r_dentry);
1058
1059 parent = d_find_any_alias(dir);
1060 BUG_ON(!parent);
1061
1062 dname.name = rinfo->dname;
1063 dname.len = rinfo->dname_len;
1064 dname.hash = full_name_hash(dname.name, dname.len);
1065 vino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1066 vino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1067retry_lookup:
1068 dn = d_lookup(parent, &dname);
1069 dout("d_lookup on parent=%p name=%.*s got %p\n",
1070 parent, dname.len, dname.name, dn);
1071
1072 if (!dn) {
1073 dn = d_alloc(parent, &dname);
1074 dout("d_alloc %p '%.*s' = %p\n", parent,
1075 dname.len, dname.name, dn);
1076 if (dn == NULL) {
1077 dput(parent);
1078 err = -ENOMEM;
1079 goto done;
1080 }
1081 err = ceph_init_dentry(dn);
1082 if (err < 0) {
1083 dput(dn);
1084 dput(parent);
1085 goto done;
1086 }
1087 } else if (dn->d_inode &&
1088 (ceph_ino(dn->d_inode) != vino.ino ||
1089 ceph_snap(dn->d_inode) != vino.snap)) {
1090 dout(" dn %p points to wrong inode %p\n",
1091 dn, dn->d_inode);
1092 d_delete(dn);
1093 dput(dn);
1094 goto retry_lookup;
1095 }
1096
1097 req->r_dentry = dn;
1098 dput(parent);
1099 }
1051 } 1100 }
1052 1101
1053 if (rinfo->head->is_target) { 1102 if (rinfo->head->is_target) {
@@ -1063,7 +1112,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1063 1112
1064 err = fill_inode(in, &rinfo->targeti, NULL, 1113 err = fill_inode(in, &rinfo->targeti, NULL,
1065 session, req->r_request_started, 1114 session, req->r_request_started,
1066 (le32_to_cpu(rinfo->head->result) == 0) ? 1115 (!req->r_aborted && rinfo->head->result == 0) ?
1067 req->r_fmode : -1, 1116 req->r_fmode : -1,
1068 &req->r_caps_reservation); 1117 &req->r_caps_reservation);
1069 if (err < 0) { 1118 if (err < 0) {
@@ -1616,8 +1665,6 @@ static const struct inode_operations ceph_symlink_iops = {
1616 .getxattr = ceph_getxattr, 1665 .getxattr = ceph_getxattr,
1617 .listxattr = ceph_listxattr, 1666 .listxattr = ceph_listxattr,
1618 .removexattr = ceph_removexattr, 1667 .removexattr = ceph_removexattr,
1619 .get_acl = ceph_get_acl,
1620 .set_acl = ceph_set_acl,
1621}; 1668};
1622 1669
1623/* 1670/*
@@ -1627,7 +1674,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1627{ 1674{
1628 struct inode *inode = dentry->d_inode; 1675 struct inode *inode = dentry->d_inode;
1629 struct ceph_inode_info *ci = ceph_inode(inode); 1676 struct ceph_inode_info *ci = ceph_inode(inode);
1630 struct inode *parent_inode;
1631 const unsigned int ia_valid = attr->ia_valid; 1677 const unsigned int ia_valid = attr->ia_valid;
1632 struct ceph_mds_request *req; 1678 struct ceph_mds_request *req;
1633 struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; 1679 struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
@@ -1819,9 +1865,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1819 req->r_inode_drop = release; 1865 req->r_inode_drop = release;
1820 req->r_args.setattr.mask = cpu_to_le32(mask); 1866 req->r_args.setattr.mask = cpu_to_le32(mask);
1821 req->r_num_caps = 1; 1867 req->r_num_caps = 1;
1822 parent_inode = ceph_get_dentry_parent_inode(dentry); 1868 err = ceph_mdsc_do_request(mdsc, NULL, req);
1823 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
1824 iput(parent_inode);
1825 } 1869 }
1826 dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err, 1870 dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err,
1827 ceph_cap_string(dirtied), mask); 1871 ceph_cap_string(dirtied), mask);
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index dc66c9e023e4..efbe08289292 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -64,7 +64,6 @@ static long __validate_layout(struct ceph_mds_client *mdsc,
64static long ceph_ioctl_set_layout(struct file *file, void __user *arg) 64static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
65{ 65{
66 struct inode *inode = file_inode(file); 66 struct inode *inode = file_inode(file);
67 struct inode *parent_inode;
68 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 67 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
69 struct ceph_mds_request *req; 68 struct ceph_mds_request *req;
70 struct ceph_ioctl_layout l; 69 struct ceph_ioctl_layout l;
@@ -121,9 +120,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
121 cpu_to_le32(l.object_size); 120 cpu_to_le32(l.object_size);
122 req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool); 121 req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool);
123 122
124 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); 123 err = ceph_mdsc_do_request(mdsc, NULL, req);
125 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
126 iput(parent_inode);
127 ceph_mdsc_put_request(req); 124 ceph_mdsc_put_request(req);
128 return err; 125 return err;
129} 126}
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index ae6d14e82b0f..d94ba0df9f4d 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -2,11 +2,31 @@
2 2
3#include <linux/file.h> 3#include <linux/file.h>
4#include <linux/namei.h> 4#include <linux/namei.h>
5#include <linux/random.h>
5 6
6#include "super.h" 7#include "super.h"
7#include "mds_client.h" 8#include "mds_client.h"
8#include <linux/ceph/pagelist.h> 9#include <linux/ceph/pagelist.h>
9 10
11static u64 lock_secret;
12
13static inline u64 secure_addr(void *addr)
14{
15 u64 v = lock_secret ^ (u64)(unsigned long)addr;
16 /*
17 * Set the most significant bit, so that MDS knows the 'owner'
18 * is sufficient to identify the owner of lock. (old code uses
19 * both 'owner' and 'pid')
20 */
21 v |= (1ULL << 63);
22 return v;
23}
24
25void __init ceph_flock_init(void)
26{
27 get_random_bytes(&lock_secret, sizeof(lock_secret));
28}
29
10/** 30/**
11 * Implement fcntl and flock locking functions. 31 * Implement fcntl and flock locking functions.
12 */ 32 */
@@ -14,11 +34,11 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
14 int cmd, u8 wait, struct file_lock *fl) 34 int cmd, u8 wait, struct file_lock *fl)
15{ 35{
16 struct inode *inode = file_inode(file); 36 struct inode *inode = file_inode(file);
17 struct ceph_mds_client *mdsc = 37 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
18 ceph_sb_to_client(inode->i_sb)->mdsc;
19 struct ceph_mds_request *req; 38 struct ceph_mds_request *req;
20 int err; 39 int err;
21 u64 length = 0; 40 u64 length = 0;
41 u64 owner;
22 42
23 req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS); 43 req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
24 if (IS_ERR(req)) 44 if (IS_ERR(req))
@@ -32,25 +52,27 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
32 else 52 else
33 length = fl->fl_end - fl->fl_start + 1; 53 length = fl->fl_end - fl->fl_start + 1;
34 54
35 dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, " 55 if (lock_type == CEPH_LOCK_FCNTL)
36 "length: %llu, wait: %d, type: %d", (int)lock_type, 56 owner = secure_addr(fl->fl_owner);
37 (int)operation, (u64)fl->fl_pid, fl->fl_start, 57 else
38 length, wait, fl->fl_type); 58 owner = secure_addr(fl->fl_file);
59
60 dout("ceph_lock_message: rule: %d, op: %d, owner: %llx, pid: %llu, "
61 "start: %llu, length: %llu, wait: %d, type: %d", (int)lock_type,
62 (int)operation, owner, (u64)fl->fl_pid, fl->fl_start, length,
63 wait, fl->fl_type);
39 64
40 req->r_args.filelock_change.rule = lock_type; 65 req->r_args.filelock_change.rule = lock_type;
41 req->r_args.filelock_change.type = cmd; 66 req->r_args.filelock_change.type = cmd;
67 req->r_args.filelock_change.owner = cpu_to_le64(owner);
42 req->r_args.filelock_change.pid = cpu_to_le64((u64)fl->fl_pid); 68 req->r_args.filelock_change.pid = cpu_to_le64((u64)fl->fl_pid);
43 /* This should be adjusted, but I'm not sure if
44 namespaces actually get id numbers*/
45 req->r_args.filelock_change.pid_namespace =
46 cpu_to_le64((u64)(unsigned long)fl->fl_nspid);
47 req->r_args.filelock_change.start = cpu_to_le64(fl->fl_start); 69 req->r_args.filelock_change.start = cpu_to_le64(fl->fl_start);
48 req->r_args.filelock_change.length = cpu_to_le64(length); 70 req->r_args.filelock_change.length = cpu_to_le64(length);
49 req->r_args.filelock_change.wait = wait; 71 req->r_args.filelock_change.wait = wait;
50 72
51 err = ceph_mdsc_do_request(mdsc, inode, req); 73 err = ceph_mdsc_do_request(mdsc, inode, req);
52 74
53 if ( operation == CEPH_MDS_OP_GETFILELOCK){ 75 if (operation == CEPH_MDS_OP_GETFILELOCK) {
54 fl->fl_pid = le64_to_cpu(req->r_reply_info.filelock_reply->pid); 76 fl->fl_pid = le64_to_cpu(req->r_reply_info.filelock_reply->pid);
55 if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type) 77 if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type)
56 fl->fl_type = F_RDLCK; 78 fl->fl_type = F_RDLCK;
@@ -87,14 +109,19 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
87 u8 wait = 0; 109 u8 wait = 0;
88 u16 op = CEPH_MDS_OP_SETFILELOCK; 110 u16 op = CEPH_MDS_OP_SETFILELOCK;
89 111
90 fl->fl_nspid = get_pid(task_tgid(current)); 112 if (!(fl->fl_flags & FL_POSIX))
91 dout("ceph_lock, fl_pid:%d", fl->fl_pid); 113 return -ENOLCK;
114 /* No mandatory locks */
115 if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK)
116 return -ENOLCK;
117
118 dout("ceph_lock, fl_owner: %p", fl->fl_owner);
92 119
93 /* set wait bit as appropriate, then make command as Ceph expects it*/ 120 /* set wait bit as appropriate, then make command as Ceph expects it*/
94 if (F_SETLKW == cmd) 121 if (IS_GETLK(cmd))
95 wait = 1;
96 if (F_GETLK == cmd)
97 op = CEPH_MDS_OP_GETFILELOCK; 122 op = CEPH_MDS_OP_GETFILELOCK;
123 else if (IS_SETLKW(cmd))
124 wait = 1;
98 125
99 if (F_RDLCK == fl->fl_type) 126 if (F_RDLCK == fl->fl_type)
100 lock_cmd = CEPH_LOCK_SHARED; 127 lock_cmd = CEPH_LOCK_SHARED;
@@ -105,7 +132,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
105 132
106 err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file, lock_cmd, wait, fl); 133 err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file, lock_cmd, wait, fl);
107 if (!err) { 134 if (!err) {
108 if ( op != CEPH_MDS_OP_GETFILELOCK ){ 135 if (op != CEPH_MDS_OP_GETFILELOCK) {
109 dout("mds locked, locking locally"); 136 dout("mds locked, locking locally");
110 err = posix_lock_file(file, fl, NULL); 137 err = posix_lock_file(file, fl, NULL);
111 if (err && (CEPH_MDS_OP_SETFILELOCK == op)) { 138 if (err && (CEPH_MDS_OP_SETFILELOCK == op)) {
@@ -131,20 +158,22 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
131{ 158{
132 u8 lock_cmd; 159 u8 lock_cmd;
133 int err; 160 int err;
134 u8 wait = 1; 161 u8 wait = 0;
135 162
136 fl->fl_nspid = get_pid(task_tgid(current)); 163 if (!(fl->fl_flags & FL_FLOCK))
137 dout("ceph_flock, fl_pid:%d", fl->fl_pid); 164 return -ENOLCK;
138 165 /* No mandatory locks */
139 /* set wait bit, then clear it out of cmd*/ 166 if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK)
140 if (cmd & LOCK_NB) 167 return -ENOLCK;
141 wait = 0; 168
142 cmd = cmd & (LOCK_SH | LOCK_EX | LOCK_UN); 169 dout("ceph_flock, fl_file: %p", fl->fl_file);
143 /* set command sequence that Ceph wants to see: 170
144 shared lock, exclusive lock, or unlock */ 171 if (IS_SETLKW(cmd))
145 if (LOCK_SH == cmd) 172 wait = 1;
173
174 if (F_RDLCK == fl->fl_type)
146 lock_cmd = CEPH_LOCK_SHARED; 175 lock_cmd = CEPH_LOCK_SHARED;
147 else if (LOCK_EX == cmd) 176 else if (F_WRLCK == fl->fl_type)
148 lock_cmd = CEPH_LOCK_EXCL; 177 lock_cmd = CEPH_LOCK_EXCL;
149 else 178 else
150 lock_cmd = CEPH_LOCK_UNLOCK; 179 lock_cmd = CEPH_LOCK_UNLOCK;
@@ -280,13 +309,14 @@ int lock_to_ceph_filelock(struct file_lock *lock,
280 struct ceph_filelock *cephlock) 309 struct ceph_filelock *cephlock)
281{ 310{
282 int err = 0; 311 int err = 0;
283
284 cephlock->start = cpu_to_le64(lock->fl_start); 312 cephlock->start = cpu_to_le64(lock->fl_start);
285 cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1); 313 cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
286 cephlock->client = cpu_to_le64(0); 314 cephlock->client = cpu_to_le64(0);
287 cephlock->pid = cpu_to_le64(lock->fl_pid); 315 cephlock->pid = cpu_to_le64((u64)lock->fl_pid);
288 cephlock->pid_namespace = 316 if (lock->fl_flags & FL_POSIX)
289 cpu_to_le64((u64)(unsigned long)lock->fl_nspid); 317 cephlock->owner = cpu_to_le64(secure_addr(lock->fl_owner));
318 else
319 cephlock->owner = cpu_to_le64(secure_addr(lock->fl_file));
290 320
291 switch (lock->fl_type) { 321 switch (lock->fl_type) {
292 case F_RDLCK: 322 case F_RDLCK:
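As a rough illustration of the cmd/type translation in ceph_lock() above (not part of the patch): a plain fcntl() read lock from userspace arrives as F_SETLK/F_RDLCK, which becomes wait = 0 and CEPH_LOCK_SHARED before the SETFILELOCK request is sent to the MDS. The sketch below is a hypothetical userspace caller; the mount path is a placeholder.

	#include <fcntl.h>
	#include <stdio.h>
	#include <unistd.h>

	int main(void)
	{
		struct flock fl = {
			.l_type   = F_RDLCK,	/* ceph_lock() maps this to CEPH_LOCK_SHARED */
			.l_whence = SEEK_SET,
			.l_start  = 0,
			.l_len    = 0,		/* whole file */
		};
		int fd = open("/mnt/cephfs/somefile", O_RDONLY);	/* placeholder path */

		if (fd < 0)
			return 1;
		/* F_SETLK leaves wait = 0; F_SETLKW would set wait = 1 via IS_SETLKW(cmd) */
		if (fcntl(fd, F_SETLK, &fl) < 0)
			perror("fcntl");
		close(fd);
		return 0;
	}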
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index f4f050a69a48..2b4d093d0563 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3,6 +3,7 @@
3#include <linux/fs.h> 3#include <linux/fs.h>
4#include <linux/wait.h> 4#include <linux/wait.h>
5#include <linux/slab.h> 5#include <linux/slab.h>
6#include <linux/gfp.h>
6#include <linux/sched.h> 7#include <linux/sched.h>
7#include <linux/debugfs.h> 8#include <linux/debugfs.h>
8#include <linux/seq_file.h> 9#include <linux/seq_file.h>
@@ -165,21 +166,18 @@ static int parse_reply_info_dir(void **p, void *end,
165 if (num == 0) 166 if (num == 0)
166 goto done; 167 goto done;
167 168
168 /* alloc large array */ 169 BUG_ON(!info->dir_in);
169 info->dir_nr = num;
170 info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
171 sizeof(*info->dir_dname) +
172 sizeof(*info->dir_dname_len) +
173 sizeof(*info->dir_dlease),
174 GFP_NOFS);
175 if (info->dir_in == NULL) {
176 err = -ENOMEM;
177 goto out_bad;
178 }
179 info->dir_dname = (void *)(info->dir_in + num); 170 info->dir_dname = (void *)(info->dir_in + num);
180 info->dir_dname_len = (void *)(info->dir_dname + num); 171 info->dir_dname_len = (void *)(info->dir_dname + num);
181 info->dir_dlease = (void *)(info->dir_dname_len + num); 172 info->dir_dlease = (void *)(info->dir_dname_len + num);
173 if ((unsigned long)(info->dir_dlease + num) >
174 (unsigned long)info->dir_in + info->dir_buf_size) {
175 pr_err("dir contents are larger than expected\n");
176 WARN_ON(1);
177 goto bad;
178 }
182 179
180 info->dir_nr = num;
183 while (num) { 181 while (num) {
184 /* dentry */ 182 /* dentry */
185 ceph_decode_need(p, end, sizeof(u32)*2, bad); 183 ceph_decode_need(p, end, sizeof(u32)*2, bad);
@@ -327,7 +325,9 @@ out_bad:
327 325
328static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 326static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
329{ 327{
330 kfree(info->dir_in); 328 if (!info->dir_in)
329 return;
330 free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
331} 331}
332 332
333 333
@@ -512,12 +512,11 @@ void ceph_mdsc_release_request(struct kref *kref)
512 struct ceph_mds_request *req = container_of(kref, 512 struct ceph_mds_request *req = container_of(kref,
513 struct ceph_mds_request, 513 struct ceph_mds_request,
514 r_kref); 514 r_kref);
515 destroy_reply_info(&req->r_reply_info);
515 if (req->r_request) 516 if (req->r_request)
516 ceph_msg_put(req->r_request); 517 ceph_msg_put(req->r_request);
517 if (req->r_reply) { 518 if (req->r_reply)
518 ceph_msg_put(req->r_reply); 519 ceph_msg_put(req->r_reply);
519 destroy_reply_info(&req->r_reply_info);
520 }
521 if (req->r_inode) { 520 if (req->r_inode) {
522 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 521 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
523 iput(req->r_inode); 522 iput(req->r_inode);
@@ -528,7 +527,9 @@ void ceph_mdsc_release_request(struct kref *kref)
528 iput(req->r_target_inode); 527 iput(req->r_target_inode);
529 if (req->r_dentry) 528 if (req->r_dentry)
530 dput(req->r_dentry); 529 dput(req->r_dentry);
531 if (req->r_old_dentry) { 530 if (req->r_old_dentry)
531 dput(req->r_old_dentry);
532 if (req->r_old_dentry_dir) {
532 /* 533 /*
533 * track (and drop pins for) r_old_dentry_dir 534 * track (and drop pins for) r_old_dentry_dir
534 * separately, since r_old_dentry's d_parent may have 535 * separately, since r_old_dentry's d_parent may have
@@ -537,7 +538,6 @@ void ceph_mdsc_release_request(struct kref *kref)
537 */ 538 */
538 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 539 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
539 CEPH_CAP_PIN); 540 CEPH_CAP_PIN);
540 dput(req->r_old_dentry);
541 iput(req->r_old_dentry_dir); 541 iput(req->r_old_dentry_dir);
542 } 542 }
543 kfree(req->r_path1); 543 kfree(req->r_path1);
@@ -1311,6 +1311,9 @@ static int trim_caps(struct ceph_mds_client *mdsc,
1311 trim_caps - session->s_trim_caps); 1311 trim_caps - session->s_trim_caps);
1312 session->s_trim_caps = 0; 1312 session->s_trim_caps = 0;
1313 } 1313 }
1314
1315 ceph_add_cap_releases(mdsc, session);
1316 ceph_send_cap_releases(mdsc, session);
1314 return 0; 1317 return 0;
1315} 1318}
1316 1319
@@ -1461,15 +1464,18 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
1461 1464
1462 dout("discard_cap_releases mds%d\n", session->s_mds); 1465 dout("discard_cap_releases mds%d\n", session->s_mds);
1463 1466
1464 /* zero out the in-progress message */ 1467 if (!list_empty(&session->s_cap_releases)) {
1465 msg = list_first_entry(&session->s_cap_releases, 1468 /* zero out the in-progress message */
1466 struct ceph_msg, list_head); 1469 msg = list_first_entry(&session->s_cap_releases,
1467 head = msg->front.iov_base; 1470 struct ceph_msg, list_head);
1468 num = le32_to_cpu(head->num); 1471 head = msg->front.iov_base;
1469 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); 1472 num = le32_to_cpu(head->num);
1470 head->num = cpu_to_le32(0); 1473 dout("discard_cap_releases mds%d %p %u\n",
1471 msg->front.iov_len = sizeof(*head); 1474 session->s_mds, msg, num);
1472 session->s_num_cap_releases += num; 1475 head->num = cpu_to_le32(0);
1476 msg->front.iov_len = sizeof(*head);
1477 session->s_num_cap_releases += num;
1478 }
1473 1479
1474 /* requeue completed messages */ 1480 /* requeue completed messages */
1475 while (!list_empty(&session->s_cap_releases_done)) { 1481 while (!list_empty(&session->s_cap_releases_done)) {
@@ -1492,6 +1498,43 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
1492 * requests 1498 * requests
1493 */ 1499 */
1494 1500
1501int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1502 struct inode *dir)
1503{
1504 struct ceph_inode_info *ci = ceph_inode(dir);
1505 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1506 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1507 size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
1508 sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
1509 int order, num_entries;
1510
1511 spin_lock(&ci->i_ceph_lock);
1512 num_entries = ci->i_files + ci->i_subdirs;
1513 spin_unlock(&ci->i_ceph_lock);
1514 num_entries = max(num_entries, 1);
1515 num_entries = min(num_entries, opt->max_readdir);
1516
1517 order = get_order(size * num_entries);
1518 while (order >= 0) {
1519 rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN,
1520 order);
1521 if (rinfo->dir_in)
1522 break;
1523 order--;
1524 }
1525 if (!rinfo->dir_in)
1526 return -ENOMEM;
1527
1528 num_entries = (PAGE_SIZE << order) / size;
1529 num_entries = min(num_entries, opt->max_readdir);
1530
1531 rinfo->dir_buf_size = PAGE_SIZE << order;
1532 req->r_num_caps = num_entries + 1;
1533 req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1534 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1535 return 0;
1536}
1537
1495/* 1538/*
1496 * Create an mds request. 1539 * Create an mds request.
1497 */ 1540 */
@@ -2053,7 +2096,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2053 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2096 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2054 if (req->r_locked_dir) 2097 if (req->r_locked_dir)
2055 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 2098 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
2056 if (req->r_old_dentry) 2099 if (req->r_old_dentry_dir)
2057 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2100 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2058 CEPH_CAP_PIN); 2101 CEPH_CAP_PIN);
2059 2102
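The new ceph_alloc_readdir_reply_buffer() sizes the readdir reply buffer from i_files + i_subdirs, falls back to smaller page orders when a large allocation fails, and caps the entry count with the max_readdir mount option. Below is a hedged sketch of a caller in the fs/ceph context; the real readdir path lives in fs/ceph/dir.c (not shown in this section), and example_start_readdir() plus its op/mode arguments are illustrative only.

	static int example_start_readdir(struct ceph_mds_client *mdsc,
					 struct inode *dir, int op, int mode)
	{
		struct ceph_mds_request *req;
		int err;

		req = ceph_mdsc_create_request(mdsc, op, mode);
		if (IS_ERR(req))
			return PTR_ERR(req);

		/* size rinfo->dir_in and set r_num_caps/max_entries before sending */
		err = ceph_alloc_readdir_reply_buffer(req, dir);
		if (err) {
			ceph_mdsc_put_request(req);
			return err;
		}

		err = ceph_mdsc_do_request(mdsc, NULL, req);
		ceph_mdsc_put_request(req);
		return err;
	}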
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 68288917c737..e90cfccf93bd 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -67,6 +67,7 @@ struct ceph_mds_reply_info_parsed {
67 /* for readdir results */ 67 /* for readdir results */
68 struct { 68 struct {
69 struct ceph_mds_reply_dirfrag *dir_dir; 69 struct ceph_mds_reply_dirfrag *dir_dir;
70 size_t dir_buf_size;
70 int dir_nr; 71 int dir_nr;
71 char **dir_dname; 72 char **dir_dname;
72 u32 *dir_dname_len; 73 u32 *dir_dname_len;
@@ -346,7 +347,8 @@ extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
346 struct dentry *dn); 347 struct dentry *dn);
347 348
348extern void ceph_invalidate_dir_request(struct ceph_mds_request *req); 349extern void ceph_invalidate_dir_request(struct ceph_mds_request *req);
349 350extern int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
351 struct inode *dir);
350extern struct ceph_mds_request * 352extern struct ceph_mds_request *
351ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode); 353ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode);
352extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, 354extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c
index 4440f447fd3f..51cc23e48111 100644
--- a/fs/ceph/strings.c
+++ b/fs/ceph/strings.c
@@ -54,6 +54,7 @@ const char *ceph_mds_op_name(int op)
54 case CEPH_MDS_OP_LOOKUPHASH: return "lookuphash"; 54 case CEPH_MDS_OP_LOOKUPHASH: return "lookuphash";
55 case CEPH_MDS_OP_LOOKUPPARENT: return "lookupparent"; 55 case CEPH_MDS_OP_LOOKUPPARENT: return "lookupparent";
56 case CEPH_MDS_OP_LOOKUPINO: return "lookupino"; 56 case CEPH_MDS_OP_LOOKUPINO: return "lookupino";
57 case CEPH_MDS_OP_LOOKUPNAME: return "lookupname";
57 case CEPH_MDS_OP_GETATTR: return "getattr"; 58 case CEPH_MDS_OP_GETATTR: return "getattr";
58 case CEPH_MDS_OP_SETXATTR: return "setxattr"; 59 case CEPH_MDS_OP_SETXATTR: return "setxattr";
59 case CEPH_MDS_OP_SETATTR: return "setattr"; 60 case CEPH_MDS_OP_SETATTR: return "setattr";
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 10a4ccbf38da..06150fd745ac 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -1026,6 +1026,7 @@ static int __init init_ceph(void)
1026 if (ret) 1026 if (ret)
1027 goto out; 1027 goto out;
1028 1028
1029 ceph_flock_init();
1029 ceph_xattr_init(); 1030 ceph_xattr_init();
1030 ret = register_filesystem(&ceph_fs_type); 1031 ret = register_filesystem(&ceph_fs_type);
1031 if (ret) 1032 if (ret)
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index d8801a95b685..7866cd05a6bb 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -577,7 +577,7 @@ struct ceph_file_info {
577 577
578 /* readdir: position within a frag */ 578 /* readdir: position within a frag */
579 unsigned offset; /* offset of last chunk, adjusted for . and .. */ 579 unsigned offset; /* offset of last chunk, adjusted for . and .. */
580 u64 next_offset; /* offset of next chunk (last_name's + 1) */ 580 unsigned next_offset; /* offset of next chunk (last_name's + 1) */
581 char *last_name; /* last entry in previous chunk */ 581 char *last_name; /* last entry in previous chunk */
582 struct dentry *dentry; /* next dentry (for dcache readdir) */ 582 struct dentry *dentry; /* next dentry (for dcache readdir) */
583 int dir_release_count; 583 int dir_release_count;
@@ -871,6 +871,7 @@ extern long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg);
871extern const struct export_operations ceph_export_ops; 871extern const struct export_operations ceph_export_ops;
872 872
873/* locks.c */ 873/* locks.c */
874extern __init void ceph_flock_init(void);
874extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl); 875extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl);
875extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl); 876extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl);
876extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num); 877extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index a55ec37378c6..c9c2b887381e 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -64,32 +64,48 @@ static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
64} 64}
65 65
66static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, 66static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
67 size_t size) 67 size_t size)
68{ 68{
69 int ret; 69 int ret;
70 struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); 70 struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
71 struct ceph_osd_client *osdc = &fsc->client->osdc; 71 struct ceph_osd_client *osdc = &fsc->client->osdc;
72 s64 pool = ceph_file_layout_pg_pool(ci->i_layout); 72 s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
73 const char *pool_name; 73 const char *pool_name;
74 char buf[128];
74 75
75 dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode); 76 dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
76 down_read(&osdc->map_sem); 77 down_read(&osdc->map_sem);
77 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); 78 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
78 if (pool_name) 79 if (pool_name) {
79 ret = snprintf(val, size, 80 size_t len = strlen(pool_name);
80 "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%s", 81 ret = snprintf(buf, sizeof(buf),
82 "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=",
81 (unsigned long long)ceph_file_layout_su(ci->i_layout), 83 (unsigned long long)ceph_file_layout_su(ci->i_layout),
82 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), 84 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
83 (unsigned long long)ceph_file_layout_object_size(ci->i_layout), 85 (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
84 pool_name); 86 if (!size) {
85 else 87 ret += len;
86 ret = snprintf(val, size, 88 } else if (ret + len > size) {
89 ret = -ERANGE;
90 } else {
91 memcpy(val, buf, ret);
92 memcpy(val + ret, pool_name, len);
93 ret += len;
94 }
95 } else {
96 ret = snprintf(buf, sizeof(buf),
87 "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld", 97 "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld",
88 (unsigned long long)ceph_file_layout_su(ci->i_layout), 98 (unsigned long long)ceph_file_layout_su(ci->i_layout),
89 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), 99 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
90 (unsigned long long)ceph_file_layout_object_size(ci->i_layout), 100 (unsigned long long)ceph_file_layout_object_size(ci->i_layout),
91 (unsigned long long)pool); 101 (unsigned long long)pool);
92 102 if (size) {
103 if (ret <= size)
104 memcpy(val, buf, ret);
105 else
106 ret = -ERANGE;
107 }
108 }
93 up_read(&osdc->map_sem); 109 up_read(&osdc->map_sem);
94 return ret; 110 return ret;
95} 111}
@@ -215,7 +231,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
215 .name_size = sizeof("ceph.dir.layout"), 231 .name_size = sizeof("ceph.dir.layout"),
216 .getxattr_cb = ceph_vxattrcb_layout, 232 .getxattr_cb = ceph_vxattrcb_layout,
217 .readonly = false, 233 .readonly = false,
218 .hidden = false, 234 .hidden = true,
219 .exists_cb = ceph_vxattrcb_layout_exists, 235 .exists_cb = ceph_vxattrcb_layout_exists,
220 }, 236 },
221 XATTR_LAYOUT_FIELD(dir, layout, stripe_unit), 237 XATTR_LAYOUT_FIELD(dir, layout, stripe_unit),
@@ -242,7 +258,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
242 .name_size = sizeof("ceph.file.layout"), 258 .name_size = sizeof("ceph.file.layout"),
243 .getxattr_cb = ceph_vxattrcb_layout, 259 .getxattr_cb = ceph_vxattrcb_layout,
244 .readonly = false, 260 .readonly = false,
245 .hidden = false, 261 .hidden = true,
246 .exists_cb = ceph_vxattrcb_layout_exists, 262 .exists_cb = ceph_vxattrcb_layout_exists,
247 }, 263 },
248 XATTR_LAYOUT_FIELD(file, layout, stripe_unit), 264 XATTR_LAYOUT_FIELD(file, layout, stripe_unit),
@@ -842,7 +858,6 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
842 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 858 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
843 struct inode *inode = dentry->d_inode; 859 struct inode *inode = dentry->d_inode;
844 struct ceph_inode_info *ci = ceph_inode(inode); 860 struct ceph_inode_info *ci = ceph_inode(inode);
845 struct inode *parent_inode;
846 struct ceph_mds_request *req; 861 struct ceph_mds_request *req;
847 struct ceph_mds_client *mdsc = fsc->mdsc; 862 struct ceph_mds_client *mdsc = fsc->mdsc;
848 int err; 863 int err;
@@ -893,9 +908,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
893 req->r_data_len = size; 908 req->r_data_len = size;
894 909
895 dout("xattr.ver (before): %lld\n", ci->i_xattrs.version); 910 dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
896 parent_inode = ceph_get_dentry_parent_inode(dentry); 911 err = ceph_mdsc_do_request(mdsc, NULL, req);
897 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
898 iput(parent_inode);
899 ceph_mdsc_put_request(req); 912 ceph_mdsc_put_request(req);
900 dout("xattr.ver (after): %lld\n", ci->i_xattrs.version); 913 dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);
901 914
@@ -1019,7 +1032,6 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
1019 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 1032 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
1020 struct ceph_mds_client *mdsc = fsc->mdsc; 1033 struct ceph_mds_client *mdsc = fsc->mdsc;
1021 struct inode *inode = dentry->d_inode; 1034 struct inode *inode = dentry->d_inode;
1022 struct inode *parent_inode;
1023 struct ceph_mds_request *req; 1035 struct ceph_mds_request *req;
1024 int err; 1036 int err;
1025 1037
@@ -1033,9 +1045,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
1033 req->r_num_caps = 1; 1045 req->r_num_caps = 1;
1034 req->r_path2 = kstrdup(name, GFP_NOFS); 1046 req->r_path2 = kstrdup(name, GFP_NOFS);
1035 1047
1036 parent_inode = ceph_get_dentry_parent_inode(dentry); 1048 err = ceph_mdsc_do_request(mdsc, NULL, req);
1037 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
1038 iput(parent_inode);
1039 ceph_mdsc_put_request(req); 1049 ceph_mdsc_put_request(req);
1040 return err; 1050 return err;
1041} 1051}
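The rewritten ceph_vxattrcb_layout() now follows the standard getxattr size-probe convention: a zero size reports the length needed, and a buffer that is too small returns -ERANGE. A minimal userspace sketch of that two-call pattern follows; it assumes the layout vxattr stays readable by name (the hidden flag above only affects listing), and the path is a placeholder.

	#include <sys/xattr.h>
	#include <stdio.h>
	#include <stdlib.h>

	int main(void)
	{
		const char *path = "/mnt/cephfs/somefile";	/* placeholder */
		ssize_t len = getxattr(path, "ceph.file.layout", NULL, 0);
		char *buf;

		if (len < 0)
			return 1;
		buf = malloc(len + 1);
		if (!buf)
			return 1;
		len = getxattr(path, "ceph.file.layout", buf, len);	/* -ERANGE if it grew */
		if (len < 0) {
			free(buf);
			return 1;
		}
		buf[len] = '\0';
		printf("%s\n", buf);
		free(buf);
		return 0;
	}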
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index 138448f766b4..d12659ce550d 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -43,6 +43,13 @@
43#define CEPH_FEATURE_CRUSH_V2 (1ULL<<36) /* new indep; SET_* steps */ 43#define CEPH_FEATURE_CRUSH_V2 (1ULL<<36) /* new indep; SET_* steps */
44#define CEPH_FEATURE_EXPORT_PEER (1ULL<<37) 44#define CEPH_FEATURE_EXPORT_PEER (1ULL<<37)
45#define CEPH_FEATURE_OSD_ERASURE_CODES (1ULL<<38) 45#define CEPH_FEATURE_OSD_ERASURE_CODES (1ULL<<38)
46#define CEPH_FEATURE_OSD_TMAP2OMAP (1ULL<<38) /* overlap with EC */
47/* The process supports new-style OSDMap encoding. Monitors also use
48 this bit to determine if peers support NAK messages. */
49#define CEPH_FEATURE_OSDMAP_ENC (1ULL<<39)
50#define CEPH_FEATURE_MDS_INLINE_DATA (1ULL<<40)
51#define CEPH_FEATURE_CRUSH_TUNABLES3 (1ULL<<41)
52#define CEPH_FEATURE_OSD_PRIMARY_AFFINITY (1ULL<<41) /* overlap w/ tunables3 */
46 53
47/* 54/*
48 * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature 55 * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature
@@ -82,7 +89,10 @@ static inline u64 ceph_sanitize_features(u64 features)
82 CEPH_FEATURE_OSDHASHPSPOOL | \ 89 CEPH_FEATURE_OSDHASHPSPOOL | \
83 CEPH_FEATURE_OSD_CACHEPOOL | \ 90 CEPH_FEATURE_OSD_CACHEPOOL | \
84 CEPH_FEATURE_CRUSH_V2 | \ 91 CEPH_FEATURE_CRUSH_V2 | \
85 CEPH_FEATURE_EXPORT_PEER) 92 CEPH_FEATURE_EXPORT_PEER | \
93 CEPH_FEATURE_OSDMAP_ENC | \
94 CEPH_FEATURE_CRUSH_TUNABLES3 | \
95 CEPH_FEATURE_OSD_PRIMARY_AFFINITY)
86 96
87#define CEPH_FEATURES_REQUIRED_DEFAULT \ 97#define CEPH_FEATURES_REQUIRED_DEFAULT \
88 (CEPH_FEATURE_NOSRCADDR | \ 98 (CEPH_FEATURE_NOSRCADDR | \
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 25bfb0eff772..5f6db18d72e8 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -332,6 +332,7 @@ enum {
332 CEPH_MDS_OP_LOOKUPHASH = 0x00102, 332 CEPH_MDS_OP_LOOKUPHASH = 0x00102,
333 CEPH_MDS_OP_LOOKUPPARENT = 0x00103, 333 CEPH_MDS_OP_LOOKUPPARENT = 0x00103,
334 CEPH_MDS_OP_LOOKUPINO = 0x00104, 334 CEPH_MDS_OP_LOOKUPINO = 0x00104,
335 CEPH_MDS_OP_LOOKUPNAME = 0x00105,
335 336
336 CEPH_MDS_OP_SETXATTR = 0x01105, 337 CEPH_MDS_OP_SETXATTR = 0x01105,
337 CEPH_MDS_OP_RMXATTR = 0x01106, 338 CEPH_MDS_OP_RMXATTR = 0x01106,
@@ -420,8 +421,8 @@ union ceph_mds_request_args {
420 struct { 421 struct {
421 __u8 rule; /* currently fcntl or flock */ 422 __u8 rule; /* currently fcntl or flock */
422 __u8 type; /* shared, exclusive, remove*/ 423 __u8 type; /* shared, exclusive, remove*/
424 __le64 owner; /* owner of the lock */
423 __le64 pid; /* process id requesting the lock */ 425 __le64 pid; /* process id requesting the lock */
424 __le64 pid_namespace;
425 __le64 start; /* initial location to lock */ 426 __le64 start; /* initial location to lock */
426 __le64 length; /* num bytes to lock from start */ 427 __le64 length; /* num bytes to lock from start */
427 __u8 wait; /* will caller wait for lock to become available? */ 428 __u8 wait; /* will caller wait for lock to become available? */
@@ -532,8 +533,8 @@ struct ceph_filelock {
532 __le64 start;/* file offset to start lock at */ 533 __le64 start;/* file offset to start lock at */
533 __le64 length; /* num bytes to lock; 0 for all following start */ 534 __le64 length; /* num bytes to lock; 0 for all following start */
534 __le64 client; /* which client holds the lock */ 535 __le64 client; /* which client holds the lock */
 536 __le64 owner; /* owner of the lock */
535 __le64 pid; /* process id holding the lock on the client */ 537 __le64 pid; /* process id holding the lock on the client */
536 __le64 pid_namespace;
537 __u8 type; /* shared lock, exclusive lock, or unlock */ 538 __u8 type; /* shared lock, exclusive lock, or unlock */
538} __attribute__ ((packed)); 539} __attribute__ ((packed));
539 540
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index fd47e872ebcc..94ec69672164 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -43,7 +43,7 @@ struct ceph_osd {
43}; 43};
44 44
45 45
46#define CEPH_OSD_MAX_OP 2 46#define CEPH_OSD_MAX_OP 3
47 47
48enum ceph_osd_data_type { 48enum ceph_osd_data_type {
49 CEPH_OSD_DATA_TYPE_NONE = 0, 49 CEPH_OSD_DATA_TYPE_NONE = 0,
@@ -76,6 +76,7 @@ struct ceph_osd_data {
76 76
77struct ceph_osd_req_op { 77struct ceph_osd_req_op {
78 u16 op; /* CEPH_OSD_OP_* */ 78 u16 op; /* CEPH_OSD_OP_* */
79 u32 flags; /* CEPH_OSD_OP_FLAG_* */
79 u32 payload_len; 80 u32 payload_len;
80 union { 81 union {
81 struct ceph_osd_data raw_data_in; 82 struct ceph_osd_data raw_data_in;
@@ -102,6 +103,10 @@ struct ceph_osd_req_op {
102 u32 timeout; 103 u32 timeout;
103 __u8 flag; 104 __u8 flag;
104 } watch; 105 } watch;
106 struct {
107 u64 expected_object_size;
108 u64 expected_write_size;
109 } alloc_hint;
105 }; 110 };
106}; 111};
107 112
@@ -293,6 +298,10 @@ extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
293extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req, 298extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
294 unsigned int which, u16 opcode, 299 unsigned int which, u16 opcode,
295 u64 cookie, u64 version, int flag); 300 u64 cookie, u64 version, int flag);
301extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
302 unsigned int which,
303 u64 expected_object_size,
304 u64 expected_write_size);
296 305
297extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 306extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
298 struct ceph_snap_context *snapc, 307 struct ceph_snap_context *snapc,
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 49ff69f0746b..561ea896c657 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -41,6 +41,18 @@ struct ceph_pg_pool_info {
41 char *name; 41 char *name;
42}; 42};
43 43
44static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
45{
46 switch (pool->type) {
47 case CEPH_POOL_TYPE_REP:
48 return true;
49 case CEPH_POOL_TYPE_EC:
50 return false;
51 default:
52 BUG_ON(1);
53 }
54}
55
44struct ceph_object_locator { 56struct ceph_object_locator {
45 s64 pool; 57 s64 pool;
46}; 58};
@@ -60,8 +72,16 @@ struct ceph_object_id {
60struct ceph_pg_mapping { 72struct ceph_pg_mapping {
61 struct rb_node node; 73 struct rb_node node;
62 struct ceph_pg pgid; 74 struct ceph_pg pgid;
63 int len; 75
64 int osds[]; 76 union {
77 struct {
78 int len;
79 int osds[];
80 } pg_temp;
81 struct {
82 int osd;
83 } primary_temp;
84 };
65}; 85};
66 86
67struct ceph_osdmap { 87struct ceph_osdmap {
@@ -78,12 +98,19 @@ struct ceph_osdmap {
78 struct ceph_entity_addr *osd_addr; 98 struct ceph_entity_addr *osd_addr;
79 99
80 struct rb_root pg_temp; 100 struct rb_root pg_temp;
101 struct rb_root primary_temp;
102
103 u32 *osd_primary_affinity;
104
81 struct rb_root pg_pools; 105 struct rb_root pg_pools;
82 u32 pool_max; 106 u32 pool_max;
83 107
84 /* the CRUSH map specifies the mapping of placement groups to 108 /* the CRUSH map specifies the mapping of placement groups to
85 * the list of osds that store+replicate them. */ 109 * the list of osds that store+replicate them. */
86 struct crush_map *crush; 110 struct crush_map *crush;
111
112 struct mutex crush_scratch_mutex;
113 int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3];
87}; 114};
88 115
89static inline void ceph_oid_set_name(struct ceph_object_id *oid, 116static inline void ceph_oid_set_name(struct ceph_object_id *oid,
@@ -110,9 +137,21 @@ static inline void ceph_oid_copy(struct ceph_object_id *dest,
110 dest->name_len = src->name_len; 137 dest->name_len = src->name_len;
111} 138}
112 139
140static inline int ceph_osd_exists(struct ceph_osdmap *map, int osd)
141{
142 return osd >= 0 && osd < map->max_osd &&
143 (map->osd_state[osd] & CEPH_OSD_EXISTS);
144}
145
113static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd) 146static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd)
114{ 147{
115 return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP); 148 return ceph_osd_exists(map, osd) &&
149 (map->osd_state[osd] & CEPH_OSD_UP);
150}
151
152static inline int ceph_osd_is_down(struct ceph_osdmap *map, int osd)
153{
154 return !ceph_osd_is_up(map, osd);
116} 155}
117 156
118static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag) 157static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag)
@@ -121,6 +160,7 @@ static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag)
121} 160}
122 161
123extern char *ceph_osdmap_state_str(char *str, int len, int state); 162extern char *ceph_osdmap_state_str(char *str, int len, int state);
163extern u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd);
124 164
125static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map, 165static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
126 int osd) 166 int osd)
@@ -153,7 +193,7 @@ static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid)
153 return 0; 193 return 0;
154} 194}
155 195
156extern struct ceph_osdmap *osdmap_decode(void **p, void *end); 196extern struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end);
157extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, 197extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
158 struct ceph_osdmap *map, 198 struct ceph_osdmap *map,
159 struct ceph_messenger *msgr); 199 struct ceph_messenger *msgr);
@@ -172,7 +212,7 @@ extern int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
172 212
173extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, 213extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap,
174 struct ceph_pg pgid, 214 struct ceph_pg pgid,
175 int *acting); 215 int *osds, int *primary);
176extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, 216extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
177 struct ceph_pg pgid); 217 struct ceph_pg pgid);
178 218
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 96292df4041b..f20e0d8a2155 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -81,8 +81,9 @@ struct ceph_pg_v1 {
81 */ 81 */
82#define CEPH_NOPOOL ((__u64) (-1)) /* pool id not defined */ 82#define CEPH_NOPOOL ((__u64) (-1)) /* pool id not defined */
83 83
84#define CEPH_PG_TYPE_REP 1 84#define CEPH_POOL_TYPE_REP 1
85#define CEPH_PG_TYPE_RAID4 2 85#define CEPH_POOL_TYPE_RAID4 2 /* never implemented */
86#define CEPH_POOL_TYPE_EC 3
86 87
87/* 88/*
88 * stable_mod func is used to control number of placement groups. 89 * stable_mod func is used to control number of placement groups.
@@ -133,6 +134,10 @@ extern const char *ceph_osd_state_name(int s);
133#define CEPH_OSD_IN 0x10000 134#define CEPH_OSD_IN 0x10000
134#define CEPH_OSD_OUT 0 135#define CEPH_OSD_OUT 0
135 136
137/* osd primary-affinity. fixed point value: 0x10000 == baseline */
138#define CEPH_OSD_MAX_PRIMARY_AFFINITY 0x10000
139#define CEPH_OSD_DEFAULT_PRIMARY_AFFINITY 0x10000
140
136 141
137/* 142/*
138 * osd map flag bits 143 * osd map flag bits
@@ -227,6 +232,9 @@ enum {
227 CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24, 232 CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24,
228 CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25, 233 CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25,
229 234
235 /* hints */
236 CEPH_OSD_OP_SETALLOCHINT = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 35,
237
230 /** multi **/ 238 /** multi **/
231 CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1, 239 CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1,
232 CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2, 240 CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2,
@@ -382,7 +390,7 @@ enum {
382 */ 390 */
383struct ceph_osd_op { 391struct ceph_osd_op {
384 __le16 op; /* CEPH_OSD_OP_* */ 392 __le16 op; /* CEPH_OSD_OP_* */
385 __le32 flags; /* CEPH_OSD_FLAG_* */ 393 __le32 flags; /* CEPH_OSD_OP_FLAG_* */
386 union { 394 union {
387 struct { 395 struct {
388 __le64 offset, length; 396 __le64 offset, length;
@@ -416,6 +424,10 @@ struct ceph_osd_op {
416 __le64 offset, length; 424 __le64 offset, length;
417 __le64 src_offset; 425 __le64 src_offset;
418 } __attribute__ ((packed)) clonerange; 426 } __attribute__ ((packed)) clonerange;
427 struct {
428 __le64 expected_object_size;
429 __le64 expected_write_size;
430 } __attribute__ ((packed)) alloc_hint;
419 }; 431 };
420 __le32 payload_len; 432 __le32 payload_len;
421} __attribute__ ((packed)); 433} __attribute__ ((packed));
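The primary-affinity values above are fixed point, with 0x10000 as the 100% baseline. A minimal sketch of the conversion, mirroring the "(value * 100) >> 16" arithmetic used by the debugfs output elsewhere in this patch; affinity_to_percent() is an illustrative name, not something the patch adds.

	static inline u32 affinity_to_percent(u32 aff)
	{
		/* 0x10000 (CEPH_OSD_MAX_PRIMARY_AFFINITY) -> 100, 0x8000 -> 50 */
		return (aff * 100) >> 16;
	}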
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index acaa5615d634..4fad5f8ee01d 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -51,6 +51,7 @@ enum {
51 CRUSH_RULE_SET_CHOOSELEAF_TRIES = 9, /* override chooseleaf_descend_once */ 51 CRUSH_RULE_SET_CHOOSELEAF_TRIES = 9, /* override chooseleaf_descend_once */
52 CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES = 10, 52 CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES = 10,
53 CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES = 11, 53 CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES = 11,
54 CRUSH_RULE_SET_CHOOSELEAF_VARY_R = 12
54}; 55};
55 56
56/* 57/*
@@ -173,6 +174,12 @@ struct crush_map {
173 * apply to a collision: in that case we will retry as we used 174 * apply to a collision: in that case we will retry as we used
174 * to. */ 175 * to. */
175 __u32 chooseleaf_descend_once; 176 __u32 chooseleaf_descend_once;
177
178 /* if non-zero, feed r into chooseleaf, bit-shifted right by (r-1)
179 * bits. a value of 1 is best for new clusters. for legacy clusters
180 * that want to limit reshuffling, a value of 3 or 4 will make the
181 * mappings line up a bit better with previous mappings. */
182 __u8 chooseleaf_vary_r;
176}; 183};
177 184
178 185
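chooseleaf_vary_r controls how the parent's r value is fed into the recursive chooseleaf pass. A minimal sketch of the derivation, mirroring the sub_r computation in the mapper.c hunk below; chooseleaf_sub_r() is an illustrative name only.

	static int chooseleaf_sub_r(int r, __u8 vary_r)
	{
		if (!vary_r)
			return 0;		/* legacy behaviour: recursion starts at r' = 0 */
		return r >> (vary_r - 1);	/* vary_r == 1 passes r through; 3 or 4 give r/4, r/8 */
	}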
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index b703790b4e44..a1ef53c04415 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -292,10 +292,12 @@ static int is_out(const struct crush_map *map,
292 * @outpos: our position in that vector 292 * @outpos: our position in that vector
293 * @tries: number of attempts to make 293 * @tries: number of attempts to make
294 * @recurse_tries: number of attempts to have recursive chooseleaf make 294 * @recurse_tries: number of attempts to have recursive chooseleaf make
295 * @local_tries: localized retries 295 * @local_retries: localized retries
296 * @local_fallback_tries: localized fallback retries 296 * @local_fallback_retries: localized fallback retries
297 * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose) 297 * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose)
298 * @vary_r: pass r to recursive calls
298 * @out2: second output vector for leaf items (if @recurse_to_leaf) 299 * @out2: second output vector for leaf items (if @recurse_to_leaf)
300 * @parent_r: r value passed from the parent
299 */ 301 */
300static int crush_choose_firstn(const struct crush_map *map, 302static int crush_choose_firstn(const struct crush_map *map,
301 struct crush_bucket *bucket, 303 struct crush_bucket *bucket,
@@ -304,10 +306,12 @@ static int crush_choose_firstn(const struct crush_map *map,
304 int *out, int outpos, 306 int *out, int outpos,
305 unsigned int tries, 307 unsigned int tries,
306 unsigned int recurse_tries, 308 unsigned int recurse_tries,
307 unsigned int local_tries, 309 unsigned int local_retries,
308 unsigned int local_fallback_tries, 310 unsigned int local_fallback_retries,
309 int recurse_to_leaf, 311 int recurse_to_leaf,
310 int *out2) 312 unsigned int vary_r,
313 int *out2,
314 int parent_r)
311{ 315{
312 int rep; 316 int rep;
313 unsigned int ftotal, flocal; 317 unsigned int ftotal, flocal;
@@ -319,8 +323,11 @@ static int crush_choose_firstn(const struct crush_map *map,
319 int itemtype; 323 int itemtype;
320 int collide, reject; 324 int collide, reject;
321 325
322 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", 326 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d recurse_tries %d local_retries %d local_fallback_retries %d parent_r %d\n",
323 bucket->id, x, outpos, numrep); 327 recurse_to_leaf ? "_LEAF" : "",
328 bucket->id, x, outpos, numrep,
329 tries, recurse_tries, local_retries, local_fallback_retries,
330 parent_r);
324 331
325 for (rep = outpos; rep < numrep; rep++) { 332 for (rep = outpos; rep < numrep; rep++) {
326 /* keep trying until we get a non-out, non-colliding item */ 333 /* keep trying until we get a non-out, non-colliding item */
@@ -335,7 +342,7 @@ static int crush_choose_firstn(const struct crush_map *map,
335 do { 342 do {
336 collide = 0; 343 collide = 0;
337 retry_bucket = 0; 344 retry_bucket = 0;
338 r = rep; 345 r = rep + parent_r;
339 /* r' = r + f_total */ 346 /* r' = r + f_total */
340 r += ftotal; 347 r += ftotal;
341 348
@@ -344,9 +351,9 @@ static int crush_choose_firstn(const struct crush_map *map,
344 reject = 1; 351 reject = 1;
345 goto reject; 352 goto reject;
346 } 353 }
347 if (local_fallback_tries > 0 && 354 if (local_fallback_retries > 0 &&
348 flocal >= (in->size>>1) && 355 flocal >= (in->size>>1) &&
349 flocal > local_fallback_tries) 356 flocal > local_fallback_retries)
350 item = bucket_perm_choose(in, x, r); 357 item = bucket_perm_choose(in, x, r);
351 else 358 else
352 item = crush_bucket_choose(in, x, r); 359 item = crush_bucket_choose(in, x, r);
@@ -387,16 +394,23 @@ static int crush_choose_firstn(const struct crush_map *map,
387 reject = 0; 394 reject = 0;
388 if (!collide && recurse_to_leaf) { 395 if (!collide && recurse_to_leaf) {
389 if (item < 0) { 396 if (item < 0) {
397 int sub_r;
398 if (vary_r)
399 sub_r = r >> (vary_r-1);
400 else
401 sub_r = 0;
390 if (crush_choose_firstn(map, 402 if (crush_choose_firstn(map,
391 map->buckets[-1-item], 403 map->buckets[-1-item],
392 weight, weight_max, 404 weight, weight_max,
393 x, outpos+1, 0, 405 x, outpos+1, 0,
394 out2, outpos, 406 out2, outpos,
395 recurse_tries, 0, 407 recurse_tries, 0,
396 local_tries, 408 local_retries,
397 local_fallback_tries, 409 local_fallback_retries,
398 0, 410 0,
399 NULL) <= outpos) 411 vary_r,
412 NULL,
413 sub_r) <= outpos)
400 /* didn't get leaf */ 414 /* didn't get leaf */
401 reject = 1; 415 reject = 1;
402 } else { 416 } else {
@@ -420,14 +434,14 @@ reject:
420 ftotal++; 434 ftotal++;
421 flocal++; 435 flocal++;
422 436
423 if (collide && flocal <= local_tries) 437 if (collide && flocal <= local_retries)
424 /* retry locally a few times */ 438 /* retry locally a few times */
425 retry_bucket = 1; 439 retry_bucket = 1;
426 else if (local_fallback_tries > 0 && 440 else if (local_fallback_retries > 0 &&
427 flocal <= in->size + local_fallback_tries) 441 flocal <= in->size + local_fallback_retries)
428 /* exhaustive bucket search */ 442 /* exhaustive bucket search */
429 retry_bucket = 1; 443 retry_bucket = 1;
430 else if (ftotal <= tries) 444 else if (ftotal < tries)
431 /* then retry descent */ 445 /* then retry descent */
432 retry_descent = 1; 446 retry_descent = 1;
433 else 447 else
@@ -640,10 +654,20 @@ int crush_do_rule(const struct crush_map *map,
640 __u32 step; 654 __u32 step;
641 int i, j; 655 int i, j;
642 int numrep; 656 int numrep;
643 int choose_tries = map->choose_total_tries; 657 /*
644 int choose_local_tries = map->choose_local_tries; 658 * the original choose_total_tries value was off by one (it
645 int choose_local_fallback_tries = map->choose_local_fallback_tries; 659 * counted "retries" and not "tries"). add one.
660 */
661 int choose_tries = map->choose_total_tries + 1;
646 int choose_leaf_tries = 0; 662 int choose_leaf_tries = 0;
663 /*
664 * the local tries values were counted as "retries", though,
665 * and need no adjustment
666 */
667 int choose_local_retries = map->choose_local_tries;
668 int choose_local_fallback_retries = map->choose_local_fallback_tries;
669
670 int vary_r = map->chooseleaf_vary_r;
647 671
648 if ((__u32)ruleno >= map->max_rules) { 672 if ((__u32)ruleno >= map->max_rules) {
649 dprintk(" bad ruleno %d\n", ruleno); 673 dprintk(" bad ruleno %d\n", ruleno);
@@ -676,13 +700,18 @@ int crush_do_rule(const struct crush_map *map,
676 break; 700 break;
677 701
678 case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES: 702 case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES:
679 if (curstep->arg1 > 0) 703 if (curstep->arg1 >= 0)
680 choose_local_tries = curstep->arg1; 704 choose_local_retries = curstep->arg1;
681 break; 705 break;
682 706
683 case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES: 707 case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES:
684 if (curstep->arg1 > 0) 708 if (curstep->arg1 >= 0)
685 choose_local_fallback_tries = curstep->arg1; 709 choose_local_fallback_retries = curstep->arg1;
710 break;
711
712 case CRUSH_RULE_SET_CHOOSELEAF_VARY_R:
713 if (curstep->arg1 >= 0)
714 vary_r = curstep->arg1;
686 break; 715 break;
687 716
688 case CRUSH_RULE_CHOOSELEAF_FIRSTN: 717 case CRUSH_RULE_CHOOSELEAF_FIRSTN:
@@ -734,10 +763,12 @@ int crush_do_rule(const struct crush_map *map,
734 o+osize, j, 763 o+osize, j,
735 choose_tries, 764 choose_tries,
736 recurse_tries, 765 recurse_tries,
737 choose_local_tries, 766 choose_local_retries,
738 choose_local_fallback_tries, 767 choose_local_fallback_retries,
739 recurse_to_leaf, 768 recurse_to_leaf,
740 c+osize); 769 vary_r,
770 c+osize,
771 0);
741 } else { 772 } else {
742 crush_choose_indep( 773 crush_choose_indep(
743 map, 774 map,
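A quick check on the off-by-one adjustment in crush_do_rule() above, taking choose_total_tries = 50 as an example value: the old loop retried the descent while ftotal <= 50, allowing up to 51 attempts in total; the new code sets choose_tries = 50 + 1 = 51 and retries while ftotal < 51, which again allows exactly 51 attempts. The encoded map value is unchanged; only its interpretation shifts from counting retries to counting tries.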
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 258a382e75ed..10421a4b76f8 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -53,34 +53,55 @@ static int osdmap_show(struct seq_file *s, void *p)
53{ 53{
54 int i; 54 int i;
55 struct ceph_client *client = s->private; 55 struct ceph_client *client = s->private;
56 struct ceph_osdmap *map = client->osdc.osdmap;
56 struct rb_node *n; 57 struct rb_node *n;
57 58
58 if (client->osdc.osdmap == NULL) 59 if (map == NULL)
59 return 0; 60 return 0;
60 seq_printf(s, "epoch %d\n", client->osdc.osdmap->epoch); 61
62 seq_printf(s, "epoch %d\n", map->epoch);
61 seq_printf(s, "flags%s%s\n", 63 seq_printf(s, "flags%s%s\n",
62 (client->osdc.osdmap->flags & CEPH_OSDMAP_NEARFULL) ? 64 (map->flags & CEPH_OSDMAP_NEARFULL) ? " NEARFULL" : "",
63 " NEARFULL" : "", 65 (map->flags & CEPH_OSDMAP_FULL) ? " FULL" : "");
64 (client->osdc.osdmap->flags & CEPH_OSDMAP_FULL) ? 66
65 " FULL" : ""); 67 for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) {
66 for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) {
67 struct ceph_pg_pool_info *pool = 68 struct ceph_pg_pool_info *pool =
68 rb_entry(n, struct ceph_pg_pool_info, node); 69 rb_entry(n, struct ceph_pg_pool_info, node);
69 seq_printf(s, "pg_pool %llu pg_num %d / %d\n", 70
70 (unsigned long long)pool->id, pool->pg_num, 71 seq_printf(s, "pool %lld pg_num %u (%d) read_tier %lld write_tier %lld\n",
71 pool->pg_num_mask); 72 pool->id, pool->pg_num, pool->pg_num_mask,
73 pool->read_tier, pool->write_tier);
72 } 74 }
73 for (i = 0; i < client->osdc.osdmap->max_osd; i++) { 75 for (i = 0; i < map->max_osd; i++) {
74 struct ceph_entity_addr *addr = 76 struct ceph_entity_addr *addr = &map->osd_addr[i];
75 &client->osdc.osdmap->osd_addr[i]; 77 int state = map->osd_state[i];
76 int state = client->osdc.osdmap->osd_state[i];
77 char sb[64]; 78 char sb[64];
78 79
79 seq_printf(s, "\tosd%d\t%s\t%3d%%\t(%s)\n", 80 seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
80 i, ceph_pr_addr(&addr->in_addr), 81 i, ceph_pr_addr(&addr->in_addr),
81 ((client->osdc.osdmap->osd_weight[i]*100) >> 16), 82 ((map->osd_weight[i]*100) >> 16),
82 ceph_osdmap_state_str(sb, sizeof(sb), state)); 83 ceph_osdmap_state_str(sb, sizeof(sb), state),
84 ((ceph_get_primary_affinity(map, i)*100) >> 16));
85 }
86 for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) {
87 struct ceph_pg_mapping *pg =
88 rb_entry(n, struct ceph_pg_mapping, node);
89
90 seq_printf(s, "pg_temp %llu.%x [", pg->pgid.pool,
91 pg->pgid.seed);
92 for (i = 0; i < pg->pg_temp.len; i++)
93 seq_printf(s, "%s%d", (i == 0 ? "" : ","),
94 pg->pg_temp.osds[i]);
95 seq_printf(s, "]\n");
83 } 96 }
97 for (n = rb_first(&map->primary_temp); n; n = rb_next(n)) {
98 struct ceph_pg_mapping *pg =
99 rb_entry(n, struct ceph_pg_mapping, node);
100
101 seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool,
102 pg->pgid.seed, pg->primary_temp.osd);
103 }
104
84 return 0; 105 return 0;
85} 106}
86 107
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 30efc5c18622..4f55f9ce63fa 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -919,6 +919,9 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
919 if (!bytes || cursor->page_offset) 919 if (!bytes || cursor->page_offset)
920 return false; /* more bytes to process in the current page */ 920 return false; /* more bytes to process in the current page */
921 921
922 if (!cursor->resid)
923 return false; /* no more data */
924
922 /* Move on to the next page; offset is already at 0 */ 925 /* Move on to the next page; offset is already at 0 */
923 926
924 BUG_ON(cursor->page_index >= cursor->page_count); 927 BUG_ON(cursor->page_index >= cursor->page_count);
@@ -1004,6 +1007,9 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
1004 if (!bytes || cursor->offset & ~PAGE_MASK) 1007 if (!bytes || cursor->offset & ~PAGE_MASK)
1005 return false; /* more bytes to process in the current page */ 1008 return false; /* more bytes to process in the current page */
1006 1009
1010 if (!cursor->resid)
1011 return false; /* no more data */
1012
1007 /* Move on to the next page */ 1013 /* Move on to the next page */
1008 1014
1009 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); 1015 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 82750f915865..b0dfce77656a 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -436,6 +436,7 @@ static bool osd_req_opcode_valid(u16 opcode)
436 case CEPH_OSD_OP_OMAPCLEAR: 436 case CEPH_OSD_OP_OMAPCLEAR:
437 case CEPH_OSD_OP_OMAPRMKEYS: 437 case CEPH_OSD_OP_OMAPRMKEYS:
438 case CEPH_OSD_OP_OMAP_CMP: 438 case CEPH_OSD_OP_OMAP_CMP:
439 case CEPH_OSD_OP_SETALLOCHINT:
439 case CEPH_OSD_OP_CLONERANGE: 440 case CEPH_OSD_OP_CLONERANGE:
440 case CEPH_OSD_OP_ASSERT_SRC_VERSION: 441 case CEPH_OSD_OP_ASSERT_SRC_VERSION:
441 case CEPH_OSD_OP_SRC_CMPXATTR: 442 case CEPH_OSD_OP_SRC_CMPXATTR:
@@ -591,6 +592,26 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
591} 592}
592EXPORT_SYMBOL(osd_req_op_watch_init); 593EXPORT_SYMBOL(osd_req_op_watch_init);
593 594
595void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
596 unsigned int which,
597 u64 expected_object_size,
598 u64 expected_write_size)
599{
600 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
601 CEPH_OSD_OP_SETALLOCHINT);
602
603 op->alloc_hint.expected_object_size = expected_object_size;
604 op->alloc_hint.expected_write_size = expected_write_size;
605
606 /*
607 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
608 * not worth a feature bit. Set FAILOK per-op flag to make
609 * sure older osds don't trip over an unsupported opcode.
610 */
611 op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
612}
613EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
614
594static void ceph_osdc_msg_data_add(struct ceph_msg *msg, 615static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
595 struct ceph_osd_data *osd_data) 616 struct ceph_osd_data *osd_data)
596{ 617{
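osd_req_op_alloc_hint_init() fills in the new advisory op and sets CEPH_OSD_OP_FLAG_FAILOK itself, so older OSDs simply skip it. Below is a hedged sketch of how a caller might prefix an object write with the hint (op 0) before the data write (op 1); example_setup_hinted_write() is an illustrative name, and it assumes the request was allocated with room for two ops and that the existing osd_req_op_extent_init() helper sets up the write.

	static void example_setup_hinted_write(struct ceph_osd_request *osd_req,
					       u64 offset, u64 length,
					       u64 object_size)
	{
		/* op 0: advisory hint; FAILOK is set inside the init helper */
		osd_req_op_alloc_hint_init(osd_req, 0, object_size, object_size);

		/* op 1: the actual write */
		osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
				       offset, length, 0, 0);
	}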
@@ -681,6 +702,12 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
681 dst->watch.ver = cpu_to_le64(src->watch.ver); 702 dst->watch.ver = cpu_to_le64(src->watch.ver);
682 dst->watch.flag = src->watch.flag; 703 dst->watch.flag = src->watch.flag;
683 break; 704 break;
705 case CEPH_OSD_OP_SETALLOCHINT:
706 dst->alloc_hint.expected_object_size =
707 cpu_to_le64(src->alloc_hint.expected_object_size);
708 dst->alloc_hint.expected_write_size =
709 cpu_to_le64(src->alloc_hint.expected_write_size);
710 break;
684 default: 711 default:
685 pr_err("unsupported osd opcode %s\n", 712 pr_err("unsupported osd opcode %s\n",
686 ceph_osd_op_name(src->op)); 713 ceph_osd_op_name(src->op));
@@ -688,7 +715,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
688 715
689 return 0; 716 return 0;
690 } 717 }
718
691 dst->op = cpu_to_le16(src->op); 719 dst->op = cpu_to_le16(src->op);
720 dst->flags = cpu_to_le32(src->flags);
692 dst->payload_len = cpu_to_le32(src->payload_len); 721 dst->payload_len = cpu_to_le32(src->payload_len);
693 722
694 return request_data_len; 723 return request_data_len;
@@ -1304,7 +1333,7 @@ static int __map_request(struct ceph_osd_client *osdc,
1304{ 1333{
1305 struct ceph_pg pgid; 1334 struct ceph_pg pgid;
1306 int acting[CEPH_PG_MAX_SIZE]; 1335 int acting[CEPH_PG_MAX_SIZE];
1307 int o = -1, num = 0; 1336 int num, o;
1308 int err; 1337 int err;
1309 bool was_paused; 1338 bool was_paused;
1310 1339
@@ -1317,11 +1346,9 @@ static int __map_request(struct ceph_osd_client *osdc,
1317 } 1346 }
1318 req->r_pgid = pgid; 1347 req->r_pgid = pgid;
1319 1348
1320 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 1349 num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
1321 if (err > 0) { 1350 if (num < 0)
1322 o = acting[0]; 1351 num = 0;
1323 num = err;
1324 }
1325 1352
1326 was_paused = req->r_paused; 1353 was_paused = req->r_paused;
1327 req->r_paused = __req_should_be_paused(osdc, req); 1354 req->r_paused = __req_should_be_paused(osdc, req);
@@ -2033,7 +2060,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
2033 int skipped_map = 0; 2060 int skipped_map = 0;
2034 2061
2035 dout("taking full map %u len %d\n", epoch, maplen); 2062 dout("taking full map %u len %d\n", epoch, maplen);
2036 newmap = osdmap_decode(&p, p+maplen); 2063 newmap = ceph_osdmap_decode(&p, p+maplen);
2037 if (IS_ERR(newmap)) { 2064 if (IS_ERR(newmap)) {
2038 err = PTR_ERR(newmap); 2065 err = PTR_ERR(newmap);
2039 goto bad; 2066 goto bad;
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index aade4a5c1c07..e632b5a52f5b 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -343,7 +343,7 @@ bad:
343 343
344/* 344/*
345 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid 345 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
346 * to a set of osds) 346 * to a set of osds) and primary_temp (explicit primary setting)
347 */ 347 */
348static int pgid_cmp(struct ceph_pg l, struct ceph_pg r) 348static int pgid_cmp(struct ceph_pg l, struct ceph_pg r)
349{ 349{
@@ -506,7 +506,7 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
506 kfree(pi); 506 kfree(pi);
507} 507}
508 508
509static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) 509static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
510{ 510{
511 u8 ev, cv; 511 u8 ev, cv;
512 unsigned len, num; 512 unsigned len, num;
@@ -587,7 +587,7 @@ bad:
587 return -EINVAL; 587 return -EINVAL;
588} 588}
589 589
590static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) 590static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
591{ 591{
592 struct ceph_pg_pool_info *pi; 592 struct ceph_pg_pool_info *pi;
593 u32 num, len; 593 u32 num, len;
@@ -633,6 +633,13 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
633 rb_erase(&pg->node, &map->pg_temp); 633 rb_erase(&pg->node, &map->pg_temp);
634 kfree(pg); 634 kfree(pg);
635 } 635 }
636 while (!RB_EMPTY_ROOT(&map->primary_temp)) {
637 struct ceph_pg_mapping *pg =
638 rb_entry(rb_first(&map->primary_temp),
639 struct ceph_pg_mapping, node);
640 rb_erase(&pg->node, &map->primary_temp);
641 kfree(pg);
642 }
636 while (!RB_EMPTY_ROOT(&map->pg_pools)) { 643 while (!RB_EMPTY_ROOT(&map->pg_pools)) {
637 struct ceph_pg_pool_info *pi = 644 struct ceph_pg_pool_info *pi =
638 rb_entry(rb_first(&map->pg_pools), 645 rb_entry(rb_first(&map->pg_pools),
@@ -642,186 +649,516 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
642 kfree(map->osd_state); 649 kfree(map->osd_state);
643 kfree(map->osd_weight); 650 kfree(map->osd_weight);
644 kfree(map->osd_addr); 651 kfree(map->osd_addr);
652 kfree(map->osd_primary_affinity);
645 kfree(map); 653 kfree(map);
646} 654}
647 655
648/* 656/*
649 * adjust max osd value. reallocate arrays. 657 * Adjust max_osd value, (re)allocate arrays.
658 *
659 * The new elements are properly initialized.
650 */ 660 */
651static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) 661static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
652{ 662{
653 u8 *state; 663 u8 *state;
654 struct ceph_entity_addr *addr;
655 u32 *weight; 664 u32 *weight;
665 struct ceph_entity_addr *addr;
666 int i;
656 667
657 state = kcalloc(max, sizeof(*state), GFP_NOFS); 668 state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
658 addr = kcalloc(max, sizeof(*addr), GFP_NOFS); 669 weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
659 weight = kcalloc(max, sizeof(*weight), GFP_NOFS); 670 addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
660 if (state == NULL || addr == NULL || weight == NULL) { 671 if (!state || !weight || !addr) {
661 kfree(state); 672 kfree(state);
662 kfree(addr);
663 kfree(weight); 673 kfree(weight);
674 kfree(addr);
675
664 return -ENOMEM; 676 return -ENOMEM;
665 } 677 }
666 678
667 /* copy old? */ 679 for (i = map->max_osd; i < max; i++) {
668 if (map->osd_state) { 680 state[i] = 0;
669 memcpy(state, map->osd_state, map->max_osd*sizeof(*state)); 681 weight[i] = CEPH_OSD_OUT;
670 memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr)); 682 memset(addr + i, 0, sizeof(*addr));
671 memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight));
672 kfree(map->osd_state);
673 kfree(map->osd_addr);
674 kfree(map->osd_weight);
675 } 683 }
676 684
677 map->osd_state = state; 685 map->osd_state = state;
678 map->osd_weight = weight; 686 map->osd_weight = weight;
679 map->osd_addr = addr; 687 map->osd_addr = addr;
688
689 if (map->osd_primary_affinity) {
690 u32 *affinity;
691
692 affinity = krealloc(map->osd_primary_affinity,
693 max*sizeof(*affinity), GFP_NOFS);
694 if (!affinity)
695 return -ENOMEM;
696
697 for (i = map->max_osd; i < max; i++)
698 affinity[i] = CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
699
700 map->osd_primary_affinity = affinity;
701 }
702
680 map->max_osd = max; 703 map->max_osd = max;
704
681 return 0; 705 return 0;
682} 706}
683 707
708#define OSDMAP_WRAPPER_COMPAT_VER 7
709#define OSDMAP_CLIENT_DATA_COMPAT_VER 1
710
684/* 711/*
685 * decode a full map. 712 * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps,
713 * to struct_v of the client_data section for new (v7 and above)
714 * osdmaps.
686 */ 715 */
687struct ceph_osdmap *osdmap_decode(void **p, void *end) 716static int get_osdmap_client_data_v(void **p, void *end,
717 const char *prefix, u8 *v)
688{ 718{
689 struct ceph_osdmap *map; 719 u8 struct_v;
690 u16 version; 720
691 u32 len, max, i; 721 ceph_decode_8_safe(p, end, struct_v, e_inval);
692 int err = -EINVAL; 722 if (struct_v >= 7) {
693 void *start = *p; 723 u8 struct_compat;
694 struct ceph_pg_pool_info *pi; 724
725 ceph_decode_8_safe(p, end, struct_compat, e_inval);
726 if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
727 pr_warning("got v %d cv %d > %d of %s ceph_osdmap\n",
728 struct_v, struct_compat,
729 OSDMAP_WRAPPER_COMPAT_VER, prefix);
730 return -EINVAL;
731 }
732 *p += 4; /* ignore wrapper struct_len */
733
734 ceph_decode_8_safe(p, end, struct_v, e_inval);
735 ceph_decode_8_safe(p, end, struct_compat, e_inval);
736 if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
737 pr_warning("got v %d cv %d > %d of %s ceph_osdmap client data\n",
738 struct_v, struct_compat,
739 OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
740 return -EINVAL;
741 }
742 *p += 4; /* ignore client data struct_len */
743 } else {
744 u16 version;
745
746 *p -= 1;
747 ceph_decode_16_safe(p, end, version, e_inval);
748 if (version < 6) {
749 pr_warning("got v %d < 6 of %s ceph_osdmap\n", version,
750 prefix);
751 return -EINVAL;
752 }
695 753
 696 dout("osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p)); 754 /* old osdmap encoding */
755 struct_v = 0;
756 }
697 757
698 map = kzalloc(sizeof(*map), GFP_NOFS); 758 *v = struct_v;
699 if (map == NULL) 759 return 0;
700 return ERR_PTR(-ENOMEM);
701 map->pg_temp = RB_ROOT;
702 760
703 ceph_decode_16_safe(p, end, version, bad); 761e_inval:
704 if (version > 6) { 762 return -EINVAL;
705 pr_warning("got unknown v %d > 6 of osdmap\n", version); 763}
706 goto bad; 764
765static int __decode_pools(void **p, void *end, struct ceph_osdmap *map,
766 bool incremental)
767{
768 u32 n;
769
770 ceph_decode_32_safe(p, end, n, e_inval);
771 while (n--) {
772 struct ceph_pg_pool_info *pi;
773 u64 pool;
774 int ret;
775
776 ceph_decode_64_safe(p, end, pool, e_inval);
777
778 pi = __lookup_pg_pool(&map->pg_pools, pool);
779 if (!incremental || !pi) {
780 pi = kzalloc(sizeof(*pi), GFP_NOFS);
781 if (!pi)
782 return -ENOMEM;
783
784 pi->id = pool;
785
786 ret = __insert_pg_pool(&map->pg_pools, pi);
787 if (ret) {
788 kfree(pi);
789 return ret;
790 }
791 }
792
793 ret = decode_pool(p, end, pi);
794 if (ret)
795 return ret;
707 } 796 }
708 if (version < 6) { 797
709 pr_warning("got old v %d < 6 of osdmap\n", version); 798 return 0;
710 goto bad; 799
800e_inval:
801 return -EINVAL;
802}
803
804static int decode_pools(void **p, void *end, struct ceph_osdmap *map)
805{
806 return __decode_pools(p, end, map, false);
807}
808
809static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
810{
811 return __decode_pools(p, end, map, true);
812}
813
814static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map,
815 bool incremental)
816{
817 u32 n;
818
819 ceph_decode_32_safe(p, end, n, e_inval);
820 while (n--) {
821 struct ceph_pg pgid;
822 u32 len, i;
823 int ret;
824
825 ret = ceph_decode_pgid(p, end, &pgid);
826 if (ret)
827 return ret;
828
829 ceph_decode_32_safe(p, end, len, e_inval);
830
831 ret = __remove_pg_mapping(&map->pg_temp, pgid);
832 BUG_ON(!incremental && ret != -ENOENT);
833
834 if (!incremental || len > 0) {
835 struct ceph_pg_mapping *pg;
836
837 ceph_decode_need(p, end, len*sizeof(u32), e_inval);
838
839 if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
840 return -EINVAL;
841
842 pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS);
843 if (!pg)
844 return -ENOMEM;
845
846 pg->pgid = pgid;
847 pg->pg_temp.len = len;
848 for (i = 0; i < len; i++)
849 pg->pg_temp.osds[i] = ceph_decode_32(p);
850
851 ret = __insert_pg_mapping(pg, &map->pg_temp);
852 if (ret) {
853 kfree(pg);
854 return ret;
855 }
856 }
711 } 857 }
712 858
713 ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), bad); 859 return 0;
860
861e_inval:
862 return -EINVAL;
863}
864
865static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
866{
867 return __decode_pg_temp(p, end, map, false);
868}
869
870static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
871{
872 return __decode_pg_temp(p, end, map, true);
873}
874
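Inside __decode_pg_temp() the osd count from the wire is checked against (UINT_MAX - sizeof(*pg)) / sizeof(u32) before the mapping is allocated, so a hostile length cannot overflow the allocation size computation. A small userspace demonstration of what that guard prevents; the header struct here is only a stand-in for struct ceph_pg_mapping and the sizes are illustrative.

#include <stdint.h>
#include <stdio.h>
#include <limits.h>

/* stand-in header; the real struct ceph_pg_mapping has the osd array appended */
struct pg_mapping_hdr {
    uint64_t pool;
    uint32_t seed;
    uint32_t len;
};

int main(void)
{
    uint32_t len = UINT_MAX / sizeof(uint32_t);     /* hostile count from the wire */

    /* a naive 32-bit size computation silently wraps around ... */
    uint32_t naive = (uint32_t)(sizeof(struct pg_mapping_hdr) + len * sizeof(uint32_t));

    /* ... while the decoder's guard rejects the count before allocating */
    int ok = len <= (UINT_MAX - sizeof(struct pg_mapping_hdr)) / sizeof(uint32_t);

    printf("requested %u osds -> naive size %u bytes, guard accepts: %d\n",
           len, naive, ok);
    return 0;
}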
875static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map,
876 bool incremental)
877{
878 u32 n;
879
880 ceph_decode_32_safe(p, end, n, e_inval);
881 while (n--) {
882 struct ceph_pg pgid;
883 u32 osd;
884 int ret;
885
886 ret = ceph_decode_pgid(p, end, &pgid);
887 if (ret)
888 return ret;
889
890 ceph_decode_32_safe(p, end, osd, e_inval);
891
892 ret = __remove_pg_mapping(&map->primary_temp, pgid);
893 BUG_ON(!incremental && ret != -ENOENT);
894
895 if (!incremental || osd != (u32)-1) {
896 struct ceph_pg_mapping *pg;
897
898 pg = kzalloc(sizeof(*pg), GFP_NOFS);
899 if (!pg)
900 return -ENOMEM;
901
902 pg->pgid = pgid;
903 pg->primary_temp.osd = osd;
904
905 ret = __insert_pg_mapping(pg, &map->primary_temp);
906 if (ret) {
907 kfree(pg);
908 return ret;
909 }
910 }
911 }
912
913 return 0;
914
915e_inval:
916 return -EINVAL;
917}
918
919static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
920{
921 return __decode_primary_temp(p, end, map, false);
922}
923
924static int decode_new_primary_temp(void **p, void *end,
925 struct ceph_osdmap *map)
926{
927 return __decode_primary_temp(p, end, map, true);
928}
929
930u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
931{
932 BUG_ON(osd >= map->max_osd);
933
934 if (!map->osd_primary_affinity)
935 return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
936
937 return map->osd_primary_affinity[osd];
938}
939
940static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff)
941{
942 BUG_ON(osd >= map->max_osd);
943
944 if (!map->osd_primary_affinity) {
945 int i;
946
947 map->osd_primary_affinity = kmalloc(map->max_osd*sizeof(u32),
948 GFP_NOFS);
949 if (!map->osd_primary_affinity)
950 return -ENOMEM;
951
952 for (i = 0; i < map->max_osd; i++)
953 map->osd_primary_affinity[i] =
954 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
955 }
956
957 map->osd_primary_affinity[osd] = aff;
958
959 return 0;
960}
961
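ceph_get_primary_affinity() and set_primary_affinity() keep the per-osd affinity array lazily allocated: as long as every osd is at the default, no array exists and reads simply return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; the first non-default value allocates the array and fills it with the default. A userspace sketch of that scheme, assuming the default value is 0x10000 as in rados.h; names are invented.

#include <stdio.h>
#include <stdlib.h>

#define DEFAULT_AFFINITY 0x10000u   /* assumed CEPH_OSD_DEFAULT_PRIMARY_AFFINITY */

struct toy_map {
    int max_osd;
    unsigned int *osd_primary_affinity; /* stays NULL until a value is set */
};

static unsigned int get_affinity(const struct toy_map *map, int osd)
{
    if (!map->osd_primary_affinity)
        return DEFAULT_AFFINITY;
    return map->osd_primary_affinity[osd];
}

static int set_affinity(struct toy_map *map, int osd, unsigned int aff)
{
    if (!map->osd_primary_affinity) {
        map->osd_primary_affinity =
            malloc(map->max_osd * sizeof(*map->osd_primary_affinity));
        if (!map->osd_primary_affinity)
            return -1;
        for (int i = 0; i < map->max_osd; i++)
            map->osd_primary_affinity[i] = DEFAULT_AFFINITY;
    }
    map->osd_primary_affinity[osd] = aff;
    return 0;
}

int main(void)
{
    struct toy_map map = { .max_osd = 4 };

    printf("osd1 before: 0x%x\n", get_affinity(&map, 1));   /* default, no array */
    set_affinity(&map, 1, 0x8000);
    printf("osd1 after:  0x%x\n", get_affinity(&map, 1));
    printf("osd2:        0x%x\n", get_affinity(&map, 2));   /* still the default */
    free(map.osd_primary_affinity);
    return 0;
}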
962static int decode_primary_affinity(void **p, void *end,
963 struct ceph_osdmap *map)
964{
965 u32 len, i;
966
967 ceph_decode_32_safe(p, end, len, e_inval);
968 if (len == 0) {
969 kfree(map->osd_primary_affinity);
970 map->osd_primary_affinity = NULL;
971 return 0;
972 }
973 if (len != map->max_osd)
974 goto e_inval;
975
976 ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval);
977
978 for (i = 0; i < map->max_osd; i++) {
979 int ret;
980
981 ret = set_primary_affinity(map, i, ceph_decode_32(p));
982 if (ret)
983 return ret;
984 }
985
986 return 0;
987
988e_inval:
989 return -EINVAL;
990}
991
992static int decode_new_primary_affinity(void **p, void *end,
993 struct ceph_osdmap *map)
994{
995 u32 n;
996
997 ceph_decode_32_safe(p, end, n, e_inval);
998 while (n--) {
999 u32 osd, aff;
1000 int ret;
1001
1002 ceph_decode_32_safe(p, end, osd, e_inval);
1003 ceph_decode_32_safe(p, end, aff, e_inval);
1004
1005 ret = set_primary_affinity(map, osd, aff);
1006 if (ret)
1007 return ret;
1008
1009 pr_info("osd%d primary-affinity 0x%x\n", osd, aff);
1010 }
1011
1012 return 0;
1013
1014e_inval:
1015 return -EINVAL;
1016}
1017
1018/*
1019 * decode a full map.
1020 */
1021static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
1022{
1023 u8 struct_v;
1024 u32 epoch = 0;
1025 void *start = *p;
1026 u32 max;
1027 u32 len, i;
1028 int err;
1029
1030 dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
1031
1032 err = get_osdmap_client_data_v(p, end, "full", &struct_v);
1033 if (err)
1034 goto bad;
1035
1036 /* fsid, epoch, created, modified */
1037 ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) +
1038 sizeof(map->created) + sizeof(map->modified), e_inval);
714 ceph_decode_copy(p, &map->fsid, sizeof(map->fsid)); 1039 ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
715 map->epoch = ceph_decode_32(p); 1040 epoch = map->epoch = ceph_decode_32(p);
716 ceph_decode_copy(p, &map->created, sizeof(map->created)); 1041 ceph_decode_copy(p, &map->created, sizeof(map->created));
717 ceph_decode_copy(p, &map->modified, sizeof(map->modified)); 1042 ceph_decode_copy(p, &map->modified, sizeof(map->modified));
718 1043
719 ceph_decode_32_safe(p, end, max, bad); 1044 /* pools */
720 while (max--) { 1045 err = decode_pools(p, end, map);
721 ceph_decode_need(p, end, 8 + 2, bad); 1046 if (err)
722 err = -ENOMEM; 1047 goto bad;
723 pi = kzalloc(sizeof(*pi), GFP_NOFS);
724 if (!pi)
725 goto bad;
726 pi->id = ceph_decode_64(p);
727 err = __decode_pool(p, end, pi);
728 if (err < 0) {
729 kfree(pi);
730 goto bad;
731 }
732 __insert_pg_pool(&map->pg_pools, pi);
733 }
734 1048
735 err = __decode_pool_names(p, end, map); 1049 /* pool_name */
736 if (err < 0) { 1050 err = decode_pool_names(p, end, map);
737 dout("fail to decode pool names"); 1051 if (err)
738 goto bad; 1052 goto bad;
739 }
740 1053
741 ceph_decode_32_safe(p, end, map->pool_max, bad); 1054 ceph_decode_32_safe(p, end, map->pool_max, e_inval);
742 1055
743 ceph_decode_32_safe(p, end, map->flags, bad); 1056 ceph_decode_32_safe(p, end, map->flags, e_inval);
744 1057
745 max = ceph_decode_32(p); 1058 /* max_osd */
1059 ceph_decode_32_safe(p, end, max, e_inval);
746 1060
747 /* (re)alloc osd arrays */ 1061 /* (re)alloc osd arrays */
748 err = osdmap_set_max_osd(map, max); 1062 err = osdmap_set_max_osd(map, max);
749 if (err < 0) 1063 if (err)
750 goto bad; 1064 goto bad;
751 dout("osdmap_decode max_osd = %d\n", map->max_osd);
752 1065
753 /* osds */ 1066 /* osd_state, osd_weight, osd_addrs->client_addr */
754 err = -EINVAL;
755 ceph_decode_need(p, end, 3*sizeof(u32) + 1067 ceph_decode_need(p, end, 3*sizeof(u32) +
756 map->max_osd*(1 + sizeof(*map->osd_weight) + 1068 map->max_osd*(1 + sizeof(*map->osd_weight) +
757 sizeof(*map->osd_addr)), bad); 1069 sizeof(*map->osd_addr)), e_inval);
758 *p += 4; /* skip length field (should match max) */ 1070
1071 if (ceph_decode_32(p) != map->max_osd)
1072 goto e_inval;
1073
759 ceph_decode_copy(p, map->osd_state, map->max_osd); 1074 ceph_decode_copy(p, map->osd_state, map->max_osd);
760 1075
761 *p += 4; /* skip length field (should match max) */ 1076 if (ceph_decode_32(p) != map->max_osd)
1077 goto e_inval;
1078
762 for (i = 0; i < map->max_osd; i++) 1079 for (i = 0; i < map->max_osd; i++)
763 map->osd_weight[i] = ceph_decode_32(p); 1080 map->osd_weight[i] = ceph_decode_32(p);
764 1081
765 *p += 4; /* skip length field (should match max) */ 1082 if (ceph_decode_32(p) != map->max_osd)
1083 goto e_inval;
1084
766 ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr)); 1085 ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
767 for (i = 0; i < map->max_osd; i++) 1086 for (i = 0; i < map->max_osd; i++)
768 ceph_decode_addr(&map->osd_addr[i]); 1087 ceph_decode_addr(&map->osd_addr[i]);
769 1088
770 /* pg_temp */ 1089 /* pg_temp */
771 ceph_decode_32_safe(p, end, len, bad); 1090 err = decode_pg_temp(p, end, map);
772 for (i = 0; i < len; i++) { 1091 if (err)
773 int n, j; 1092 goto bad;
774 struct ceph_pg pgid;
775 struct ceph_pg_mapping *pg;
776 1093
777 err = ceph_decode_pgid(p, end, &pgid); 1094 /* primary_temp */
1095 if (struct_v >= 1) {
1096 err = decode_primary_temp(p, end, map);
778 if (err) 1097 if (err)
779 goto bad; 1098 goto bad;
780 ceph_decode_need(p, end, sizeof(u32), bad); 1099 }
781 n = ceph_decode_32(p);
782 err = -EINVAL;
783 if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
784 goto bad;
785 ceph_decode_need(p, end, n * sizeof(u32), bad);
786 err = -ENOMEM;
787 pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
788 if (!pg)
789 goto bad;
790 pg->pgid = pgid;
791 pg->len = n;
792 for (j = 0; j < n; j++)
793 pg->osds[j] = ceph_decode_32(p);
794 1100
795 err = __insert_pg_mapping(pg, &map->pg_temp); 1101 /* primary_affinity */
1102 if (struct_v >= 2) {
1103 err = decode_primary_affinity(p, end, map);
796 if (err) 1104 if (err)
797 goto bad; 1105 goto bad;
798 dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed, 1106 } else {
799 len); 1107 /* XXX can this happen? */
1108 kfree(map->osd_primary_affinity);
1109 map->osd_primary_affinity = NULL;
800 } 1110 }
801 1111
802 /* crush */ 1112 /* crush */
803 ceph_decode_32_safe(p, end, len, bad); 1113 ceph_decode_32_safe(p, end, len, e_inval);
804 dout("osdmap_decode crush len %d from off 0x%x\n", len, 1114 map->crush = crush_decode(*p, min(*p + len, end));
805 (int)(*p - start));
806 ceph_decode_need(p, end, len, bad);
807 map->crush = crush_decode(*p, end);
808 *p += len;
809 if (IS_ERR(map->crush)) { 1115 if (IS_ERR(map->crush)) {
810 err = PTR_ERR(map->crush); 1116 err = PTR_ERR(map->crush);
811 map->crush = NULL; 1117 map->crush = NULL;
812 goto bad; 1118 goto bad;
813 } 1119 }
1120 *p += len;
814 1121
815 /* ignore the rest of the map */ 1122 /* ignore the rest */
816 *p = end; 1123 *p = end;
817 1124
818 dout("osdmap_decode done %p %p\n", *p, end); 1125 dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
819 return map; 1126 return 0;
820 1127
1128e_inval:
1129 err = -EINVAL;
821bad: 1130bad:
822 dout("osdmap_decode fail err %d\n", err); 1131 pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
823 ceph_osdmap_destroy(map); 1132 err, epoch, (int)(*p - start), *p, start, end);
824 return ERR_PTR(err); 1133 print_hex_dump(KERN_DEBUG, "osdmap: ",
1134 DUMP_PREFIX_OFFSET, 16, 1,
1135 start, end - start, true);
1136 return err;
1137}
1138
1139/*
1140 * Allocate and decode a full map.
1141 */
1142struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
1143{
1144 struct ceph_osdmap *map;
1145 int ret;
1146
1147 map = kzalloc(sizeof(*map), GFP_NOFS);
1148 if (!map)
1149 return ERR_PTR(-ENOMEM);
1150
1151 map->pg_temp = RB_ROOT;
1152 map->primary_temp = RB_ROOT;
1153 mutex_init(&map->crush_scratch_mutex);
1154
1155 ret = osdmap_decode(p, end, map);
1156 if (ret) {
1157 ceph_osdmap_destroy(map);
1158 return ERR_PTR(ret);
1159 }
1160
1161 return map;
825} 1162}
826 1163
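The full-map decoder is now split in two: osdmap_decode() fills a caller-provided struct ceph_osdmap and returns 0 or -errno, while ceph_osdmap_decode() owns the allocation and turns any failure into an error pointer, so there is exactly one place that tears down a half-built map. A userspace sketch of that alloc-and-delegate split; ERR_PTR/IS_ERR/PTR_ERR here are simplified stand-ins for the kernel macros and the toy decoder is invented.

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* minimal stand-ins for the kernel's ERR_PTR machinery */
#define ERR_PTR(err)    ((void *)(intptr_t)(err))
#define PTR_ERR(ptr)    ((long)(intptr_t)(ptr))
#define IS_ERR(ptr)     ((uintptr_t)(ptr) >= (uintptr_t)-4095)

struct toy_map { int epoch; };

/* like osdmap_decode(): fill a caller-provided map, return 0 or -errno */
static int toy_map_decode(const char *buf, struct toy_map *map)
{
    if (!buf)
        return -EINVAL;                 /* "corrupt" input */
    map->epoch = atoi(buf);
    return 0;
}

/* like ceph_osdmap_decode(): allocate, delegate, clean up on any error */
static struct toy_map *toy_map_alloc_decode(const char *buf)
{
    struct toy_map *map = calloc(1, sizeof(*map));
    int ret;

    if (!map)
        return ERR_PTR(-ENOMEM);

    ret = toy_map_decode(buf, map);
    if (ret) {
        free(map);
        return ERR_PTR(ret);
    }
    return map;
}

int main(void)
{
    struct toy_map *map = toy_map_alloc_decode("42");

    if (IS_ERR(map)) {
        fprintf(stderr, "decode failed: %ld\n", PTR_ERR(map));
        return 1;
    }
    printf("epoch %d\n", map->epoch);
    free(map);
    return 0;
}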
827/* 1164/*
@@ -840,17 +1177,18 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
840 __s64 new_pool_max; 1177 __s64 new_pool_max;
841 __s32 new_flags, max; 1178 __s32 new_flags, max;
842 void *start = *p; 1179 void *start = *p;
843 int err = -EINVAL; 1180 int err;
844 u16 version; 1181 u8 struct_v;
1182
1183 dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p));
845 1184
846 ceph_decode_16_safe(p, end, version, bad); 1185 err = get_osdmap_client_data_v(p, end, "inc", &struct_v);
847 if (version != 6) { 1186 if (err)
848 pr_warning("got unknown v %d != 6 of inc osdmap\n", version);
849 goto bad; 1187 goto bad;
850 }
851 1188
852 ceph_decode_need(p, end, sizeof(fsid)+sizeof(modified)+2*sizeof(u32), 1189 /* fsid, epoch, modified, new_pool_max, new_flags */
853 bad); 1190 ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) +
1191 sizeof(u64) + sizeof(u32), e_inval);
854 ceph_decode_copy(p, &fsid, sizeof(fsid)); 1192 ceph_decode_copy(p, &fsid, sizeof(fsid));
855 epoch = ceph_decode_32(p); 1193 epoch = ceph_decode_32(p);
856 BUG_ON(epoch != map->epoch+1); 1194 BUG_ON(epoch != map->epoch+1);
@@ -859,21 +1197,22 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
859 new_flags = ceph_decode_32(p); 1197 new_flags = ceph_decode_32(p);
860 1198
861 /* full map? */ 1199 /* full map? */
862 ceph_decode_32_safe(p, end, len, bad); 1200 ceph_decode_32_safe(p, end, len, e_inval);
863 if (len > 0) { 1201 if (len > 0) {
864 dout("apply_incremental full map len %d, %p to %p\n", 1202 dout("apply_incremental full map len %d, %p to %p\n",
865 len, *p, end); 1203 len, *p, end);
866 return osdmap_decode(p, min(*p+len, end)); 1204 return ceph_osdmap_decode(p, min(*p+len, end));
867 } 1205 }
868 1206
869 /* new crush? */ 1207 /* new crush? */
870 ceph_decode_32_safe(p, end, len, bad); 1208 ceph_decode_32_safe(p, end, len, e_inval);
871 if (len > 0) { 1209 if (len > 0) {
872 dout("apply_incremental new crush map len %d, %p to %p\n",
873 len, *p, end);
874 newcrush = crush_decode(*p, min(*p+len, end)); 1210 newcrush = crush_decode(*p, min(*p+len, end));
875 if (IS_ERR(newcrush)) 1211 if (IS_ERR(newcrush)) {
876 return ERR_CAST(newcrush); 1212 err = PTR_ERR(newcrush);
1213 newcrush = NULL;
1214 goto bad;
1215 }
877 *p += len; 1216 *p += len;
878 } 1217 }
879 1218
@@ -883,13 +1222,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
883 if (new_pool_max >= 0) 1222 if (new_pool_max >= 0)
884 map->pool_max = new_pool_max; 1223 map->pool_max = new_pool_max;
885 1224
886 ceph_decode_need(p, end, 5*sizeof(u32), bad);
887
888 /* new max? */ 1225 /* new max? */
889 max = ceph_decode_32(p); 1226 ceph_decode_32_safe(p, end, max, e_inval);
890 if (max >= 0) { 1227 if (max >= 0) {
891 err = osdmap_set_max_osd(map, max); 1228 err = osdmap_set_max_osd(map, max);
892 if (err < 0) 1229 if (err)
893 goto bad; 1230 goto bad;
894 } 1231 }
895 1232
@@ -902,51 +1239,34 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
902 newcrush = NULL; 1239 newcrush = NULL;
903 } 1240 }
904 1241
905 /* new_pool */ 1242 /* new_pools */
906 ceph_decode_32_safe(p, end, len, bad); 1243 err = decode_new_pools(p, end, map);
907 while (len--) { 1244 if (err)
908 struct ceph_pg_pool_info *pi; 1245 goto bad;
909 1246
910 ceph_decode_64_safe(p, end, pool, bad); 1247 /* new_pool_names */
911 pi = __lookup_pg_pool(&map->pg_pools, pool); 1248 err = decode_pool_names(p, end, map);
912 if (!pi) { 1249 if (err)
913 pi = kzalloc(sizeof(*pi), GFP_NOFS); 1250 goto bad;
914 if (!pi) {
915 err = -ENOMEM;
916 goto bad;
917 }
918 pi->id = pool;
919 __insert_pg_pool(&map->pg_pools, pi);
920 }
921 err = __decode_pool(p, end, pi);
922 if (err < 0)
923 goto bad;
924 }
925 if (version >= 5) {
926 err = __decode_pool_names(p, end, map);
927 if (err < 0)
928 goto bad;
929 }
930 1251
931 /* old_pool */ 1252 /* old_pool */
932 ceph_decode_32_safe(p, end, len, bad); 1253 ceph_decode_32_safe(p, end, len, e_inval);
933 while (len--) { 1254 while (len--) {
934 struct ceph_pg_pool_info *pi; 1255 struct ceph_pg_pool_info *pi;
935 1256
936 ceph_decode_64_safe(p, end, pool, bad); 1257 ceph_decode_64_safe(p, end, pool, e_inval);
937 pi = __lookup_pg_pool(&map->pg_pools, pool); 1258 pi = __lookup_pg_pool(&map->pg_pools, pool);
938 if (pi) 1259 if (pi)
939 __remove_pg_pool(&map->pg_pools, pi); 1260 __remove_pg_pool(&map->pg_pools, pi);
940 } 1261 }
941 1262
942 /* new_up */ 1263 /* new_up */
943 err = -EINVAL; 1264 ceph_decode_32_safe(p, end, len, e_inval);
944 ceph_decode_32_safe(p, end, len, bad);
945 while (len--) { 1265 while (len--) {
946 u32 osd; 1266 u32 osd;
947 struct ceph_entity_addr addr; 1267 struct ceph_entity_addr addr;
948 ceph_decode_32_safe(p, end, osd, bad); 1268 ceph_decode_32_safe(p, end, osd, e_inval);
949 ceph_decode_copy_safe(p, end, &addr, sizeof(addr), bad); 1269 ceph_decode_copy_safe(p, end, &addr, sizeof(addr), e_inval);
950 ceph_decode_addr(&addr); 1270 ceph_decode_addr(&addr);
951 pr_info("osd%d up\n", osd); 1271 pr_info("osd%d up\n", osd);
952 BUG_ON(osd >= map->max_osd); 1272 BUG_ON(osd >= map->max_osd);
@@ -955,11 +1275,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
955 } 1275 }
956 1276
957 /* new_state */ 1277 /* new_state */
958 ceph_decode_32_safe(p, end, len, bad); 1278 ceph_decode_32_safe(p, end, len, e_inval);
959 while (len--) { 1279 while (len--) {
960 u32 osd; 1280 u32 osd;
961 u8 xorstate; 1281 u8 xorstate;
962 ceph_decode_32_safe(p, end, osd, bad); 1282 ceph_decode_32_safe(p, end, osd, e_inval);
963 xorstate = **(u8 **)p; 1283 xorstate = **(u8 **)p;
964 (*p)++; /* clean flag */ 1284 (*p)++; /* clean flag */
965 if (xorstate == 0) 1285 if (xorstate == 0)
@@ -971,10 +1291,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
971 } 1291 }
972 1292
973 /* new_weight */ 1293 /* new_weight */
974 ceph_decode_32_safe(p, end, len, bad); 1294 ceph_decode_32_safe(p, end, len, e_inval);
975 while (len--) { 1295 while (len--) {
976 u32 osd, off; 1296 u32 osd, off;
977 ceph_decode_need(p, end, sizeof(u32)*2, bad); 1297 ceph_decode_need(p, end, sizeof(u32)*2, e_inval);
978 osd = ceph_decode_32(p); 1298 osd = ceph_decode_32(p);
979 off = ceph_decode_32(p); 1299 off = ceph_decode_32(p);
980 pr_info("osd%d weight 0x%x %s\n", osd, off, 1300 pr_info("osd%d weight 0x%x %s\n", osd, off,
@@ -985,56 +1305,35 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
985 } 1305 }
986 1306
987 /* new_pg_temp */ 1307 /* new_pg_temp */
988 ceph_decode_32_safe(p, end, len, bad); 1308 err = decode_new_pg_temp(p, end, map);
989 while (len--) { 1309 if (err)
990 struct ceph_pg_mapping *pg; 1310 goto bad;
991 int j;
992 struct ceph_pg pgid;
993 u32 pglen;
994 1311
995 err = ceph_decode_pgid(p, end, &pgid); 1312 /* new_primary_temp */
1313 if (struct_v >= 1) {
1314 err = decode_new_primary_temp(p, end, map);
996 if (err) 1315 if (err)
997 goto bad; 1316 goto bad;
998 ceph_decode_need(p, end, sizeof(u32), bad); 1317 }
999 pglen = ceph_decode_32(p);
1000 if (pglen) {
1001 ceph_decode_need(p, end, pglen*sizeof(u32), bad);
1002
1003 /* removing existing (if any) */
1004 (void) __remove_pg_mapping(&map->pg_temp, pgid);
1005 1318
1006 /* insert */ 1319 /* new_primary_affinity */
1007 err = -EINVAL; 1320 if (struct_v >= 2) {
1008 if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) 1321 err = decode_new_primary_affinity(p, end, map);
1009 goto bad; 1322 if (err)
1010 err = -ENOMEM; 1323 goto bad;
1011 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
1012 if (!pg)
1013 goto bad;
1014 pg->pgid = pgid;
1015 pg->len = pglen;
1016 for (j = 0; j < pglen; j++)
1017 pg->osds[j] = ceph_decode_32(p);
1018 err = __insert_pg_mapping(pg, &map->pg_temp);
1019 if (err) {
1020 kfree(pg);
1021 goto bad;
1022 }
1023 dout(" added pg_temp %lld.%x len %d\n", pgid.pool,
1024 pgid.seed, pglen);
1025 } else {
1026 /* remove */
1027 __remove_pg_mapping(&map->pg_temp, pgid);
1028 }
1029 } 1324 }
1030 1325
1031 /* ignore the rest */ 1326 /* ignore the rest */
1032 *p = end; 1327 *p = end;
1328
1329 dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd);
1033 return map; 1330 return map;
1034 1331
1332e_inval:
1333 err = -EINVAL;
1035bad: 1334bad:
1036 pr_err("corrupt inc osdmap epoch %d off %d (%p of %p-%p)\n", 1335 pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1037 epoch, (int)(*p - start), *p, start, end); 1336 err, epoch, (int)(*p - start), *p, start, end);
1038 print_hex_dump(KERN_DEBUG, "osdmap: ", 1337 print_hex_dump(KERN_DEBUG, "osdmap: ",
1039 DUMP_PREFIX_OFFSET, 16, 1, 1338 DUMP_PREFIX_OFFSET, 16, 1,
1040 start, end - start, true); 1339 start, end - start, true);
@@ -1142,61 +1441,249 @@ int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
1142} 1441}
1143EXPORT_SYMBOL(ceph_oloc_oid_to_pg); 1442EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
1144 1443
1145static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x, 1444static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
1146 int *result, int result_max, 1445 int *result, int result_max,
1147 const __u32 *weight, int weight_max) 1446 const __u32 *weight, int weight_max)
1148{ 1447{
1149 int scratch[result_max * 3]; 1448 int r;
1150 1449
1151 return crush_do_rule(map, ruleno, x, result, result_max, 1450 BUG_ON(result_max > CEPH_PG_MAX_SIZE);
1152 weight, weight_max, scratch); 1451
1452 mutex_lock(&map->crush_scratch_mutex);
1453 r = crush_do_rule(map->crush, ruleno, x, result, result_max,
1454 weight, weight_max, map->crush_scratch_ary);
1455 mutex_unlock(&map->crush_scratch_mutex);
1456
1457 return r;
1153} 1458}
1154 1459
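The removed crush_do_rule_ary() put a result_max * 3 scratch array on the kernel stack for every mapping; do_crush() instead reuses a single crush_scratch_ary embedded in the osdmap and serializes callers with crush_scratch_mutex, bounding stack usage at the cost of a lock. A toy userspace version of that pattern using pthreads; fake_do_rule() merely stands in for crush_do_rule() and all sizes are illustrative.

#include <pthread.h>
#include <stdio.h>

#define MAX_RESULT 32   /* stand-in for CEPH_PG_MAX_SIZE */

struct mapper {
    int scratch[MAX_RESULT * 3];        /* shared work area, like crush_scratch_ary */
    pthread_mutex_t scratch_mutex;
};

/* stand-in for crush_do_rule(): it needs the scratch area while it runs */
static int fake_do_rule(struct mapper *m, int x, int *result, int result_max)
{
    for (int i = 0; i < result_max; i++) {
        m->scratch[i] = x + i;          /* would be trampled by a concurrent caller */
        result[i] = m->scratch[i];
    }
    return result_max;
}

static int do_rule_locked(struct mapper *m, int x, int *result, int result_max)
{
    int r;

    if (result_max > MAX_RESULT)
        return -1;

    pthread_mutex_lock(&m->scratch_mutex);
    r = fake_do_rule(m, x, result, result_max);
    pthread_mutex_unlock(&m->scratch_mutex);
    return r;
}

int main(void)
{
    struct mapper m = { .scratch_mutex = PTHREAD_MUTEX_INITIALIZER };
    int out[4];
    int n = do_rule_locked(&m, 100, out, 4);

    for (int i = 0; i < n; i++)
        printf("%d ", out[i]);
    printf("\n");
    return 0;
}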
1155/* 1460/*
1156 * Calculate raw osd vector for the given pgid. Return pointer to osd 1461 * Calculate raw (crush) set for given pgid.
1157 * array, or NULL on failure. 1462 *
1463 * Return raw set length, or error.
1158 */ 1464 */
1159static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, 1465static int pg_to_raw_osds(struct ceph_osdmap *osdmap,
1160 int *osds, int *num) 1466 struct ceph_pg_pool_info *pool,
1467 struct ceph_pg pgid, u32 pps, int *osds)
1161{ 1468{
1162 struct ceph_pg_mapping *pg;
1163 struct ceph_pg_pool_info *pool;
1164 int ruleno; 1469 int ruleno;
1165 int r; 1470 int len;
1166 u32 pps;
1167 1471
1168 pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); 1472 /* crush */
1169 if (!pool) 1473 ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset,
1170 return NULL; 1474 pool->type, pool->size);
1475 if (ruleno < 0) {
1476 pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
1477 pgid.pool, pool->crush_ruleset, pool->type,
1478 pool->size);
1479 return -ENOENT;
1480 }
1171 1481
1172 /* pg_temp? */ 1482 len = do_crush(osdmap, ruleno, pps, osds,
1483 min_t(int, pool->size, CEPH_PG_MAX_SIZE),
1484 osdmap->osd_weight, osdmap->max_osd);
1485 if (len < 0) {
1486 pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
1487 len, ruleno, pgid.pool, pool->crush_ruleset,
1488 pool->type, pool->size);
1489 return len;
1490 }
1491
1492 return len;
1493}
1494
1495/*
1496 * Given raw set, calculate up set and up primary.
1497 *
1498 * Return up set length. *primary is set to up primary osd id, or -1
1499 * if up set is empty.
1500 */
1501static int raw_to_up_osds(struct ceph_osdmap *osdmap,
1502 struct ceph_pg_pool_info *pool,
1503 int *osds, int len, int *primary)
1504{
1505 int up_primary = -1;
1506 int i;
1507
1508 if (ceph_can_shift_osds(pool)) {
1509 int removed = 0;
1510
1511 for (i = 0; i < len; i++) {
1512 if (ceph_osd_is_down(osdmap, osds[i])) {
1513 removed++;
1514 continue;
1515 }
1516 if (removed)
1517 osds[i - removed] = osds[i];
1518 }
1519
1520 len -= removed;
1521 if (len > 0)
1522 up_primary = osds[0];
1523 } else {
1524 for (i = len - 1; i >= 0; i--) {
1525 if (ceph_osd_is_down(osdmap, osds[i]))
1526 osds[i] = CRUSH_ITEM_NONE;
1527 else
1528 up_primary = osds[i];
1529 }
1530 }
1531
1532 *primary = up_primary;
1533 return len;
1534}
1535
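raw_to_up_osds() turns the raw CRUSH set into the up set by dropping down osds. For pools where osds can shift (replicated pools) the survivors are compacted to the front and the first one becomes the up primary; erasure-coded pools must preserve positions, so down members become CRUSH_ITEM_NONE holes instead. A userspace mirror of that logic; osd_is_down() here is a made-up predicate and CRUSH_ITEM_NONE is assumed to be 0x7fffffff as in crush.h.

#include <stdio.h>
#include <stdbool.h>

#define CRUSH_ITEM_NONE 0x7fffffff  /* assumed value from crush.h */

static bool osd_is_down(int osd) { return osd & 1; }    /* toy: odd osds are down */

/* mirror of raw_to_up_osds(): compact for replicated pools, keep holes for EC */
static int raw_to_up(int *osds, int len, bool can_shift, int *primary)
{
    *primary = -1;

    if (can_shift) {
        int removed = 0;

        for (int i = 0; i < len; i++) {
            if (osd_is_down(osds[i])) {
                removed++;
                continue;
            }
            if (removed)
                osds[i - removed] = osds[i];
        }
        len -= removed;
        if (len > 0)
            *primary = osds[0];
    } else {
        for (int i = len - 1; i >= 0; i--) {
            if (osd_is_down(osds[i]))
                osds[i] = CRUSH_ITEM_NONE;
            else
                *primary = osds[i];
        }
    }
    return len;
}

static void show(const char *tag, const int *osds, int len, int primary)
{
    printf("%-10s len=%d primary=%d [", tag, len, primary);
    for (int i = 0; i < len; i++)
        printf(" %d", osds[i]);
    printf(" ]\n");
}

int main(void)
{
    int a[] = { 2, 3, 4, 5 }, b[] = { 2, 3, 4, 5 };
    int primary, len;

    len = raw_to_up(a, 4, true, &primary);      /* replicated: [ 2 4 ], primary 2 */
    show("replicated", a, len, primary);

    len = raw_to_up(b, 4, false, &primary);     /* erasure: holes kept, primary 2 */
    show("erasure", b, len, primary);
    return 0;
}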
1536static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
1537 struct ceph_pg_pool_info *pool,
1538 int *osds, int len, int *primary)
1539{
1540 int i;
1541 int pos = -1;
1542
1543 /*
1544 * Do we have any non-default primary_affinity values for these
1545 * osds?
1546 */
1547 if (!osdmap->osd_primary_affinity)
1548 return;
1549
1550 for (i = 0; i < len; i++) {
1551 if (osds[i] != CRUSH_ITEM_NONE &&
1552 osdmap->osd_primary_affinity[osds[i]] !=
1553 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
1554 break;
1555 }
1556 }
1557 if (i == len)
1558 return;
1559
1560 /*
1561 * Pick the primary. Feed both the seed (for the pg) and the
1562 * osd into the hash/rng so that a proportional fraction of an
1563 * osd's pgs get rejected as primary.
1564 */
1565 for (i = 0; i < len; i++) {
1566 int osd;
1567 u32 aff;
1568
1569 osd = osds[i];
1570 if (osd == CRUSH_ITEM_NONE)
1571 continue;
1572
1573 aff = osdmap->osd_primary_affinity[osd];
1574 if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
1575 (crush_hash32_2(CRUSH_HASH_RJENKINS1,
1576 pps, osd) >> 16) >= aff) {
1577 /*
1578 * We chose not to use this primary. Note it
1579 * anyway as a fallback in case we don't pick
1580 * anyone else, but keep looking.
1581 */
1582 if (pos < 0)
1583 pos = i;
1584 } else {
1585 pos = i;
1586 break;
1587 }
1588 }
1589 if (pos < 0)
1590 return;
1591
1592 *primary = osds[pos];
1593
1594 if (ceph_can_shift_osds(pool) && pos > 0) {
1595 /* move the new primary to the front */
1596 for (i = pos; i > 0; i--)
1597 osds[i] = osds[i - 1];
1598 osds[0] = *primary;
1599 }
1600}
1601
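The rejection test in apply_primary_affinity() hashes the placement seed together with the osd id and compares the top 16 bits of the result against the osd's affinity, so an osd with affinity aff keeps primaryship for roughly aff / 0x10000 of its PGs. A quick userspace simulation of that arithmetic; mix32() is a generic mixer standing in for crush_hash32_2() and the 0x10000 constant is assumed from rados.h.

#include <stdint.h>
#include <stdio.h>

#define MAX_AFFINITY 0x10000u   /* assumed CEPH_OSD_MAX_PRIMARY_AFFINITY */

/* generic mixer standing in for crush_hash32_2(CRUSH_HASH_RJENKINS1, pps, osd) */
static uint32_t mix32(uint32_t a, uint32_t b)
{
    uint64_t x = ((uint64_t)a << 32) | b;

    x ^= x >> 33; x *= 0xff51afd7ed558ccdULL;
    x ^= x >> 33; x *= 0xc4ceb9fe1a85ec53ULL;
    x ^= x >> 33;
    return (uint32_t)x;
}

/* same accept/reject test as apply_primary_affinity() above */
static int keeps_primary(uint32_t pps, int osd, uint32_t aff)
{
    if (aff >= MAX_AFFINITY)
        return 1;                       /* full affinity is never rejected */
    return (mix32(pps, (uint32_t)osd) >> 16) < aff;
}

int main(void)
{
    uint32_t aff = 0x8000;              /* half affinity */
    int osd = 3, kept = 0, trials = 100000;

    for (int pps = 0; pps < trials; pps++)
        kept += keeps_primary((uint32_t)pps, osd, aff);

    /* expect roughly trials * aff / 0x10000, i.e. about half */
    printf("osd%d kept primaryship for %d of %d pgs\n", osd, kept, trials);
    return 0;
}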
1602/*
1603 * Given up set, apply pg_temp and primary_temp mappings.
1604 *
1605 * Return acting set length. *primary is set to acting primary osd id,
1606 * or -1 if acting set is empty.
1607 */
1608static int apply_temps(struct ceph_osdmap *osdmap,
1609 struct ceph_pg_pool_info *pool, struct ceph_pg pgid,
1610 int *osds, int len, int *primary)
1611{
1612 struct ceph_pg_mapping *pg;
1613 int temp_len;
1614 int temp_primary;
1615 int i;
1616
1617 /* raw_pg -> pg */
1173 pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, 1618 pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
1174 pool->pg_num_mask); 1619 pool->pg_num_mask);
1620
1621 /* pg_temp? */
1175 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); 1622 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
1176 if (pg) { 1623 if (pg) {
1177 *num = pg->len; 1624 temp_len = 0;
1178 return pg->osds; 1625 temp_primary = -1;
1626
1627 for (i = 0; i < pg->pg_temp.len; i++) {
1628 if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
1629 if (ceph_can_shift_osds(pool))
1630 continue;
1631 else
1632 osds[temp_len++] = CRUSH_ITEM_NONE;
1633 } else {
1634 osds[temp_len++] = pg->pg_temp.osds[i];
1635 }
1636 }
1637
1638 /* apply pg_temp's primary */
1639 for (i = 0; i < temp_len; i++) {
1640 if (osds[i] != CRUSH_ITEM_NONE) {
1641 temp_primary = osds[i];
1642 break;
1643 }
1644 }
1645 } else {
1646 temp_len = len;
1647 temp_primary = *primary;
1179 } 1648 }
1180 1649
1181 /* crush */ 1650 /* primary_temp? */
1182 ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, 1651 pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
1183 pool->type, pool->size); 1652 if (pg)
1184 if (ruleno < 0) { 1653 temp_primary = pg->primary_temp.osd;
1185 pr_err("no crush rule pool %lld ruleset %d type %d size %d\n", 1654
1186 pgid.pool, pool->crush_ruleset, pool->type, 1655 *primary = temp_primary;
1187 pool->size); 1656 return temp_len;
1188 return NULL; 1657}
1658
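apply_temps() lets explicit overrides win: a pg_temp entry (typically present while a PG backfills) replaces the up set wholesale, with down members skipped or turned into holes exactly as above, and its first usable member becomes the primary; an independent primary_temp entry then overrides the primary no matter how it was chosen. A toy sketch of that precedence; the pgid lookups are replaced by plain variables and osd_is_down() is invented.

#include <stdio.h>
#include <stdbool.h>

#define CRUSH_ITEM_NONE 0x7fffffff
#define NO_TEMP (-1)

static bool osd_is_down(int osd) { return osd == 4; }   /* toy: only osd 4 is down */

/* mirrors apply_temps(): pg_temp replaces the up set, primary_temp the primary */
static int apply_temps_toy(const int *pg_temp, int pg_temp_len, int primary_temp,
                           bool can_shift, int *osds, int len, int *primary)
{
    if (pg_temp_len) {
        int temp_len = 0, temp_primary = -1;

        for (int i = 0; i < pg_temp_len; i++) {
            if (osd_is_down(pg_temp[i])) {
                if (!can_shift)
                    osds[temp_len++] = CRUSH_ITEM_NONE;
            } else {
                osds[temp_len++] = pg_temp[i];
            }
        }
        for (int i = 0; i < temp_len; i++) {
            if (osds[i] != CRUSH_ITEM_NONE) {
                temp_primary = osds[i];
                break;
            }
        }
        len = temp_len;
        *primary = temp_primary;
    }

    if (primary_temp != NO_TEMP)
        *primary = primary_temp;        /* primary_temp wins over everything */
    return len;
}

int main(void)
{
    int osds[8] = { 0, 2, 6 };          /* up set computed by CRUSH */
    int pg_temp[] = { 4, 6, 2 };        /* temporary mapping, e.g. while backfilling */
    int primary = 0, len;

    len = apply_temps_toy(pg_temp, 3, /* primary_temp */ 6, true, osds, 3, &primary);
    printf("acting: len=%d primary=%d first=%d\n", len, primary, osds[0]);
    return 0;
}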
1659/*
1660 * Calculate acting set for given pgid.
1661 *
1662 * Return acting set length, or error. *primary is set to acting
1663 * primary osd id, or -1 if acting set is empty or on error.
1664 */
1665int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1666 int *osds, int *primary)
1667{
1668 struct ceph_pg_pool_info *pool;
1669 u32 pps;
1670 int len;
1671
1672 pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool);
1673 if (!pool) {
1674 *primary = -1;
1675 return -ENOENT;
1189 } 1676 }
1190 1677
1191 if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { 1678 if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
1192 /* hash pool id and seed sothat pool PGs do not overlap */ 1679 /* hash pool id and seed so that pool PGs do not overlap */
1193 pps = crush_hash32_2(CRUSH_HASH_RJENKINS1, 1680 pps = crush_hash32_2(CRUSH_HASH_RJENKINS1,
1194 ceph_stable_mod(pgid.seed, pool->pgp_num, 1681 ceph_stable_mod(pgid.seed, pool->pgp_num,
1195 pool->pgp_num_mask), 1682 pool->pgp_num_mask),
1196 pgid.pool); 1683 pgid.pool);
1197 } else { 1684 } else {
1198 /* 1685 /*
1199 * legacy ehavior: add ps and pool together. this is 1686 * legacy behavior: add ps and pool together. this is
1200 * not a great approach because the PGs from each pool 1687 * not a great approach because the PGs from each pool
1201 * will overlap on top of each other: 0.5 == 1.4 == 1688 * will overlap on top of each other: 0.5 == 1.4 ==
1202 * 2.3 == ... 1689 * 2.3 == ...
@@ -1205,38 +1692,20 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1205 pool->pgp_num_mask) + 1692 pool->pgp_num_mask) +
1206 (unsigned)pgid.pool; 1693 (unsigned)pgid.pool;
1207 } 1694 }
1208 r = crush_do_rule_ary(osdmap->crush, ruleno, pps, 1695
1209 osds, min_t(int, pool->size, *num), 1696 len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds);
1210 osdmap->osd_weight, osdmap->max_osd); 1697 if (len < 0) {
1211 if (r < 0) { 1698 *primary = -1;
1212 pr_err("error %d from crush rule: pool %lld ruleset %d type %d" 1699 return len;
1213 " size %d\n", r, pgid.pool, pool->crush_ruleset,
1214 pool->type, pool->size);
1215 return NULL;
1216 } 1700 }
1217 *num = r;
1218 return osds;
1219}
1220 1701
1221/* 1702 len = raw_to_up_osds(osdmap, pool, osds, len, primary);
1222 * Return acting set for given pgid.
1223 */
1224int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1225 int *acting)
1226{
1227 int rawosds[CEPH_PG_MAX_SIZE], *osds;
1228 int i, o, num = CEPH_PG_MAX_SIZE;
1229 1703
1230 osds = calc_pg_raw(osdmap, pgid, rawosds, &num); 1704 apply_primary_affinity(osdmap, pps, pool, osds, len, primary);
1231 if (!osds)
1232 return -1;
1233 1705
1234 /* primary is first up osd */ 1706 len = apply_temps(osdmap, pool, pgid, osds, len, primary);
1235 o = 0; 1707
1236 for (i = 0; i < num; i++) 1708 return len;
1237 if (ceph_osd_is_up(osdmap, osds[i]))
1238 acting[o++] = osds[i];
1239 return o;
1240} 1709}
1241 1710
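ceph_calc_pg_acting() now runs the whole pipeline (raw CRUSH set, then up set, then primary affinity, then temps), and the placement-seed computation at its top is worth spelling out: with HASHPSPOOL the pool id is mixed in through a hash so pools use disjoint seeds, whereas the legacy behaviour of adding the pool id makes 0.5, 1.4, 2.3, ... all land on the same seed. A userspace sketch of that difference; stable_mod() reproduces ceph_stable_mod() from ceph_fs.h and mix32() is a generic stand-in for crush_hash32_2().

#include <stdint.h>
#include <stdio.h>

/* ceph_stable_mod(x, b, bmask), with bmask = next_power_of_2(b) - 1 */
static uint32_t stable_mod(uint32_t x, uint32_t b, uint32_t bmask)
{
    if ((x & bmask) < b)
        return x & bmask;
    return x & (bmask >> 1);
}

/* generic mixer standing in for crush_hash32_2(); any decent hash makes the point */
static uint32_t mix32(uint32_t a, uint32_t b)
{
    uint64_t x = ((uint64_t)a << 32) | b;

    x ^= x >> 33; x *= 0xff51afd7ed558ccdULL;
    x ^= x >> 33;
    return (uint32_t)x;
}

int main(void)
{
    /* the pgs 0.5, 1.4 and 2.3 from the comment above; pgp_num = 8, mask = 7 */
    struct { uint32_t pool, seed; } pgs[] = { { 0, 5 }, { 1, 4 }, { 2, 3 } };

    for (int i = 0; i < 3; i++) {
        uint32_t ps = stable_mod(pgs[i].seed, 8, 7);
        uint32_t legacy = ps + pgs[i].pool;         /* all three collapse onto 5 */
        uint32_t hashed = mix32(ps, pgs[i].pool);   /* HASHPSPOOL-style: distinct */

        printf("%u.%x  legacy pps=%u  hashed pps=%u\n",
               pgs[i].pool, pgs[i].seed, legacy, hashed);
    }
    return 0;
}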
1242/* 1711/*
@@ -1244,17 +1713,11 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1244 */ 1713 */
1245int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) 1714int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
1246{ 1715{
1247 int rawosds[CEPH_PG_MAX_SIZE], *osds; 1716 int osds[CEPH_PG_MAX_SIZE];
1248 int i, num = CEPH_PG_MAX_SIZE; 1717 int primary;
1249 1718
1250 osds = calc_pg_raw(osdmap, pgid, rawosds, &num); 1719 ceph_calc_pg_acting(osdmap, pgid, osds, &primary);
1251 if (!osds)
1252 return -1;
1253 1720
1254 /* primary is first up osd */ 1721 return primary;
1255 for (i = 0; i < num; i++)
1256 if (ceph_osd_is_up(osdmap, osds[i]))
1257 return osds[i];
1258 return -1;
1259} 1722}
1260EXPORT_SYMBOL(ceph_calc_pg_primary); 1723EXPORT_SYMBOL(ceph_calc_pg_primary);