about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- fs/ceph/addr.c 21
-rw-r--r-- fs/ceph/cache.c 92
-rw-r--r-- fs/ceph/caps.c 40
-rw-r--r-- fs/ceph/file.c 2
-rw-r--r-- fs/ceph/inode.c 18
-rw-r--r-- fs/ceph/locks.c 25
-rw-r--r-- fs/ceph/mds_client.c 4
-rw-r--r-- fs/ceph/super.c 47
-rw-r--r-- fs/ceph/super.h 4
-rw-r--r-- fs/ceph/xattr.c 3
-rw-r--r-- include/linux/ceph/ceph_features.h 264
-rw-r--r-- include/linux/ceph/ceph_fs.h 1
-rw-r--r-- include/linux/ceph/decode.h 60
-rw-r--r-- include/linux/ceph/libceph.h 49
-rw-r--r-- include/linux/ceph/messenger.h 2
-rw-r--r-- include/linux/ceph/osd_client.h 70
-rw-r--r-- include/linux/ceph/osdmap.h 41
-rw-r--r-- include/linux/ceph/rados.h 6
-rw-r--r-- include/linux/crush/crush.h 66
-rw-r--r-- include/linux/crush/mapper.h 9
-rw-r--r-- net/ceph/ceph_common.c 1
-rw-r--r-- net/ceph/crush/crush.c 3
-rw-r--r-- net/ceph/crush/mapper.c 81
-rw-r--r-- net/ceph/debugfs.c 112
-rw-r--r-- net/ceph/messenger.c 10
-rw-r--r-- net/ceph/mon_client.c 8
-rw-r--r-- net/ceph/osd_client.c 905
-rw-r--r-- net/ceph/osdmap.c 840
28 files changed, 2308 insertions, 476 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 1e71e6ca5ddf..50836280a6f8 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -530,14 +530,10 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
530 long writeback_stat; 530 long writeback_stat;
531 u64 truncate_size; 531 u64 truncate_size;
532 u32 truncate_seq; 532 u32 truncate_seq;
533 int err = 0, len = PAGE_SIZE; 533 int err, len = PAGE_SIZE;
534 534
535 dout("writepage %p idx %lu\n", page, page->index); 535 dout("writepage %p idx %lu\n", page, page->index);
536 536
537 if (!page->mapping || !page->mapping->host) {
538 dout("writepage %p - no mapping\n", page);
539 return -EFAULT;
540 }
541 inode = page->mapping->host; 537 inode = page->mapping->host;
542 ci = ceph_inode(inode); 538 ci = ceph_inode(inode);
543 fsc = ceph_inode_to_client(inode); 539 fsc = ceph_inode_to_client(inode);
@@ -547,7 +543,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
547 snapc = page_snap_context(page); 543 snapc = page_snap_context(page);
548 if (snapc == NULL) { 544 if (snapc == NULL) {
549 dout("writepage %p page %p not dirty?\n", inode, page); 545 dout("writepage %p page %p not dirty?\n", inode, page);
550 goto out; 546 return 0;
551 } 547 }
552 oldest = get_oldest_context(inode, &snap_size, 548 oldest = get_oldest_context(inode, &snap_size,
553 &truncate_size, &truncate_seq); 549 &truncate_size, &truncate_seq);
@@ -555,9 +551,10 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
555 dout("writepage %p page %p snapc %p not writeable - noop\n", 551 dout("writepage %p page %p snapc %p not writeable - noop\n",
556 inode, page, snapc); 552 inode, page, snapc);
557 /* we should only noop if called by kswapd */ 553 /* we should only noop if called by kswapd */
558 WARN_ON((current->flags & PF_MEMALLOC) == 0); 554 WARN_ON(!(current->flags & PF_MEMALLOC));
559 ceph_put_snap_context(oldest); 555 ceph_put_snap_context(oldest);
560 goto out; 556 redirty_page_for_writepage(wbc, page);
557 return 0;
561 } 558 }
562 ceph_put_snap_context(oldest); 559 ceph_put_snap_context(oldest);
563 560
@@ -567,8 +564,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
567 /* is this a partial page at end of file? */ 564 /* is this a partial page at end of file? */
568 if (page_off >= snap_size) { 565 if (page_off >= snap_size) {
569 dout("%p page eof %llu\n", page, snap_size); 566 dout("%p page eof %llu\n", page, snap_size);
570 goto out; 567 return 0;
571 } 568 }
569
572 if (snap_size < page_off + len) 570 if (snap_size < page_off + len)
573 len = snap_size - page_off; 571 len = snap_size - page_off;
574 572
@@ -595,7 +593,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
595 dout("writepage interrupted page %p\n", page); 593 dout("writepage interrupted page %p\n", page);
596 redirty_page_for_writepage(wbc, page); 594 redirty_page_for_writepage(wbc, page);
597 end_page_writeback(page); 595 end_page_writeback(page);
598 goto out; 596 return err;
599 } 597 }
600 dout("writepage setting page/mapping error %d %p\n", 598 dout("writepage setting page/mapping error %d %p\n",
601 err, page); 599 err, page);
@@ -611,7 +609,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
611 end_page_writeback(page); 609 end_page_writeback(page);
612 ceph_put_wrbuffer_cap_refs(ci, 1, snapc); 610 ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
613 ceph_put_snap_context(snapc); /* page's reference */ 611 ceph_put_snap_context(snapc); /* page's reference */
614out:
615 return err; 612 return err;
616} 613}
617 614
@@ -1318,7 +1315,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
1318 struct page *page, void *fsdata) 1315 struct page *page, void *fsdata)
1319{ 1316{
1320 struct inode *inode = file_inode(file); 1317 struct inode *inode = file_inode(file);
1321 int check_cap = 0; 1318 bool check_cap = false;
1322 1319
1323 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file, 1320 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
1324 inode, page, (int)pos, (int)copied, (int)len); 1321 inode, page, (int)pos, (int)copied, (int)len);
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 4e7421caf380..fd1172823f86 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -35,18 +35,34 @@ struct fscache_netfs ceph_cache_netfs = {
35 .version = 0, 35 .version = 0,
36}; 36};
37 37
38static DEFINE_MUTEX(ceph_fscache_lock);
39static LIST_HEAD(ceph_fscache_list);
40
41struct ceph_fscache_entry {
42 struct list_head list;
43 struct fscache_cookie *fscache;
44 struct ceph_fsid fsid;
45 size_t uniq_len;
46 char uniquifier[0];
47};
48
38static uint16_t ceph_fscache_session_get_key(const void *cookie_netfs_data, 49static uint16_t ceph_fscache_session_get_key(const void *cookie_netfs_data,
39 void *buffer, uint16_t maxbuf) 50 void *buffer, uint16_t maxbuf)
40{ 51{
41 const struct ceph_fs_client* fsc = cookie_netfs_data; 52 const struct ceph_fs_client* fsc = cookie_netfs_data;
42 uint16_t klen; 53 const char *fscache_uniq = fsc->mount_options->fscache_uniq;
54 uint16_t fsid_len, uniq_len;
43 55
44 klen = sizeof(fsc->client->fsid); 56 fsid_len = sizeof(fsc->client->fsid);
45 if (klen > maxbuf) 57 uniq_len = fscache_uniq ? strlen(fscache_uniq) : 0;
58 if (fsid_len + uniq_len > maxbuf)
46 return 0; 59 return 0;
47 60
48 memcpy(buffer, &fsc->client->fsid, klen); 61 memcpy(buffer, &fsc->client->fsid, fsid_len);
49 return klen; 62 if (uniq_len)
63 memcpy(buffer + fsid_len, fscache_uniq, uniq_len);
64
65 return fsid_len + uniq_len;
50} 66}
51 67
52static const struct fscache_cookie_def ceph_fscache_fsid_object_def = { 68static const struct fscache_cookie_def ceph_fscache_fsid_object_def = {
@@ -67,13 +83,54 @@ void ceph_fscache_unregister(void)
67 83
68int ceph_fscache_register_fs(struct ceph_fs_client* fsc) 84int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
69{ 85{
86 const struct ceph_fsid *fsid = &fsc->client->fsid;
87 const char *fscache_uniq = fsc->mount_options->fscache_uniq;
88 size_t uniq_len = fscache_uniq ? strlen(fscache_uniq) : 0;
89 struct ceph_fscache_entry *ent;
90 int err = 0;
91
92 mutex_lock(&ceph_fscache_lock);
93 list_for_each_entry(ent, &ceph_fscache_list, list) {
94 if (memcmp(&ent->fsid, fsid, sizeof(*fsid)))
95 continue;
96 if (ent->uniq_len != uniq_len)
97 continue;
98 if (uniq_len && memcmp(ent->uniquifier, fscache_uniq, uniq_len))
99 continue;
100
101 pr_err("fscache cookie already registered for fsid %pU\n", fsid);
102 pr_err(" use fsc=%%s mount option to specify a uniquifier\n");
103 err = -EBUSY;
104 goto out_unlock;
105 }
106
107 ent = kzalloc(sizeof(*ent) + uniq_len, GFP_KERNEL);
108 if (!ent) {
109 err = -ENOMEM;
110 goto out_unlock;
111 }
112
70 fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index, 113 fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index,
71 &ceph_fscache_fsid_object_def, 114 &ceph_fscache_fsid_object_def,
72 fsc, true); 115 fsc, true);
73 if (!fsc->fscache)
74 pr_err("Unable to register fsid: %p fscache cookie\n", fsc);
75 116
76 return 0; 117 if (fsc->fscache) {
118 memcpy(&ent->fsid, fsid, sizeof(*fsid));
119 if (uniq_len > 0) {
120 memcpy(&ent->uniquifier, fscache_uniq, uniq_len);
121 ent->uniq_len = uniq_len;
122 }
123 ent->fscache = fsc->fscache;
124 list_add_tail(&ent->list, &ceph_fscache_list);
125 } else {
126 kfree(ent);
127 pr_err("unable to register fscache cookie for fsid %pU\n",
128 fsid);
129 /* all other fs ignore this error */
130 }
131out_unlock:
132 mutex_unlock(&ceph_fscache_lock);
133 return err;
77} 134}
78 135
79static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data, 136static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data,
@@ -349,7 +406,24 @@ void ceph_invalidate_fscache_page(struct inode* inode, struct page *page)
349 406
350void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc) 407void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
351{ 408{
352 fscache_relinquish_cookie(fsc->fscache, 0); 409 if (fscache_cookie_valid(fsc->fscache)) {
410 struct ceph_fscache_entry *ent;
411 bool found = false;
412
413 mutex_lock(&ceph_fscache_lock);
414 list_for_each_entry(ent, &ceph_fscache_list, list) {
415 if (ent->fscache == fsc->fscache) {
416 list_del(&ent->list);
417 kfree(ent);
418 found = true;
419 break;
420 }
421 }
422 WARN_ON_ONCE(!found);
423 mutex_unlock(&ceph_fscache_lock);
424
425 __fscache_relinquish_cookie(fsc->fscache, 0);
426 }
353 fsc->fscache = NULL; 427 fsc->fscache = NULL;
354} 428}
355 429
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index a3ebb632294e..7007ae2a5ad2 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1653,6 +1653,21 @@ static int try_nonblocking_invalidate(struct inode *inode)
1653 return -1; 1653 return -1;
1654} 1654}
1655 1655
1656bool __ceph_should_report_size(struct ceph_inode_info *ci)
1657{
1658 loff_t size = ci->vfs_inode.i_size;
1659 /* mds will adjust max size according to the reported size */
1660 if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
1661 return false;
1662 if (size >= ci->i_max_size)
1663 return true;
1664 /* half of previous max_size increment has been used */
1665 if (ci->i_max_size > ci->i_reported_size &&
1666 (size << 1) >= ci->i_max_size + ci->i_reported_size)
1667 return true;
1668 return false;
1669}
1670
1656/* 1671/*
1657 * Swiss army knife function to examine currently used and wanted 1672 * Swiss army knife function to examine currently used and wanted
1658 * versus held caps. Release, flush, ack revoked caps to mds as 1673 * versus held caps. Release, flush, ack revoked caps to mds as
@@ -1806,8 +1821,7 @@ retry_locked:
1806 } 1821 }
1807 1822
1808 /* approaching file_max? */ 1823 /* approaching file_max? */
1809 if ((inode->i_size << 1) >= ci->i_max_size && 1824 if (__ceph_should_report_size(ci)) {
1810 (ci->i_reported_size << 1) < ci->i_max_size) {
1811 dout("i_size approaching max_size\n"); 1825 dout("i_size approaching max_size\n");
1812 goto ack; 1826 goto ack;
1813 } 1827 }
@@ -3027,8 +3041,10 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
3027 le32_to_cpu(grant->truncate_seq), 3041 le32_to_cpu(grant->truncate_seq),
3028 le64_to_cpu(grant->truncate_size), 3042 le64_to_cpu(grant->truncate_size),
3029 size); 3043 size);
3030 /* max size increase? */ 3044 }
3031 if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { 3045
3046 if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) {
3047 if (max_size != ci->i_max_size) {
3032 dout("max_size %lld -> %llu\n", 3048 dout("max_size %lld -> %llu\n",
3033 ci->i_max_size, max_size); 3049 ci->i_max_size, max_size);
3034 ci->i_max_size = max_size; 3050 ci->i_max_size = max_size;
@@ -3037,6 +3053,10 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
3037 ci->i_requested_max_size = 0; 3053 ci->i_requested_max_size = 0;
3038 } 3054 }
3039 wake = true; 3055 wake = true;
3056 } else if (ci->i_wanted_max_size > ci->i_max_size &&
3057 ci->i_wanted_max_size > ci->i_requested_max_size) {
3058 /* CEPH_CAP_OP_IMPORT */
3059 wake = true;
3040 } 3060 }
3041 } 3061 }
3042 3062
@@ -3554,7 +3574,6 @@ retry:
3554 } 3574 }
3555 3575
3556 /* make sure we re-request max_size, if necessary */ 3576 /* make sure we re-request max_size, if necessary */
3557 ci->i_wanted_max_size = 0;
3558 ci->i_requested_max_size = 0; 3577 ci->i_requested_max_size = 0;
3559 3578
3560 *old_issued = issued; 3579 *old_issued = issued;
@@ -3790,6 +3809,7 @@ bad:
3790 */ 3809 */
3791void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) 3810void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
3792{ 3811{
3812 struct inode *inode;
3793 struct ceph_inode_info *ci; 3813 struct ceph_inode_info *ci;
3794 int flags = CHECK_CAPS_NODELAY; 3814 int flags = CHECK_CAPS_NODELAY;
3795 3815
@@ -3805,9 +3825,15 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
3805 time_before(jiffies, ci->i_hold_caps_max)) 3825 time_before(jiffies, ci->i_hold_caps_max))
3806 break; 3826 break;
3807 list_del_init(&ci->i_cap_delay_list); 3827 list_del_init(&ci->i_cap_delay_list);
3828
3829 inode = igrab(&ci->vfs_inode);
3808 spin_unlock(&mdsc->cap_delay_lock); 3830 spin_unlock(&mdsc->cap_delay_lock);
3809 dout("check_delayed_caps on %p\n", &ci->vfs_inode); 3831
3810 ceph_check_caps(ci, flags, NULL); 3832 if (inode) {
3833 dout("check_delayed_caps on %p\n", inode);
3834 ceph_check_caps(ci, flags, NULL);
3835 iput(inode);
3836 }
3811 } 3837 }
3812 spin_unlock(&mdsc->cap_delay_lock); 3838 spin_unlock(&mdsc->cap_delay_lock);
3813} 3839}
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 29308a80d66f..3d48c415f3cb 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1040,8 +1040,8 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
1040 int num_pages; 1040 int num_pages;
1041 int written = 0; 1041 int written = 0;
1042 int flags; 1042 int flags;
1043 int check_caps = 0;
1044 int ret; 1043 int ret;
1044 bool check_caps = false;
1045 struct timespec mtime = current_time(inode); 1045 struct timespec mtime = current_time(inode);
1046 size_t count = iov_iter_count(from); 1046 size_t count = iov_iter_count(from);
1047 1047
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 4de6cdddf059..220dfd87cbfa 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1016,6 +1016,7 @@ static void update_dentry_lease(struct dentry *dentry,
1016 long unsigned ttl = from_time + (duration * HZ) / 1000; 1016 long unsigned ttl = from_time + (duration * HZ) / 1000;
1017 long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000; 1017 long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
1018 struct inode *dir; 1018 struct inode *dir;
1019 struct ceph_mds_session *old_lease_session = NULL;
1019 1020
1020 /* 1021 /*
1021 * Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that 1022 * Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that
@@ -1051,8 +1052,10 @@ static void update_dentry_lease(struct dentry *dentry,
1051 time_before(ttl, di->time)) 1052 time_before(ttl, di->time))
1052 goto out_unlock; /* we already have a newer lease. */ 1053 goto out_unlock; /* we already have a newer lease. */
1053 1054
1054 if (di->lease_session && di->lease_session != session) 1055 if (di->lease_session && di->lease_session != session) {
1055 goto out_unlock; 1056 old_lease_session = di->lease_session;
1057 di->lease_session = NULL;
1058 }
1056 1059
1057 ceph_dentry_lru_touch(dentry); 1060 ceph_dentry_lru_touch(dentry);
1058 1061
@@ -1065,6 +1068,8 @@ static void update_dentry_lease(struct dentry *dentry,
1065 di->time = ttl; 1068 di->time = ttl;
1066out_unlock: 1069out_unlock:
1067 spin_unlock(&dentry->d_lock); 1070 spin_unlock(&dentry->d_lock);
1071 if (old_lease_session)
1072 ceph_put_mds_session(old_lease_session);
1068 return; 1073 return;
1069} 1074}
1070 1075
@@ -1653,20 +1658,17 @@ out:
1653 return err; 1658 return err;
1654} 1659}
1655 1660
1656int ceph_inode_set_size(struct inode *inode, loff_t size) 1661bool ceph_inode_set_size(struct inode *inode, loff_t size)
1657{ 1662{
1658 struct ceph_inode_info *ci = ceph_inode(inode); 1663 struct ceph_inode_info *ci = ceph_inode(inode);
1659 int ret = 0; 1664 bool ret;
1660 1665
1661 spin_lock(&ci->i_ceph_lock); 1666 spin_lock(&ci->i_ceph_lock);
1662 dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size); 1667 dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
1663 i_size_write(inode, size); 1668 i_size_write(inode, size);
1664 inode->i_blocks = calc_inode_blocks(size); 1669 inode->i_blocks = calc_inode_blocks(size);
1665 1670
1666 /* tell the MDS if we are approaching max_size */ 1671 ret = __ceph_should_report_size(ci);
1667 if ((size << 1) >= ci->i_max_size &&
1668 (ci->i_reported_size << 1) < ci->i_max_size)
1669 ret = 1;
1670 1672
1671 spin_unlock(&ci->i_ceph_lock); 1673 spin_unlock(&ci->i_ceph_lock);
1672 return ret; 1674 return ret;
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index 6806dbeaee19..64ae74472046 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -127,6 +127,29 @@ static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
127 dout("ceph_lock_wait_for_completion: request %llu was interrupted\n", 127 dout("ceph_lock_wait_for_completion: request %llu was interrupted\n",
128 req->r_tid); 128 req->r_tid);
129 129
130 mutex_lock(&mdsc->mutex);
131 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
132 err = 0;
133 } else {
134 /*
135 * ensure we aren't running concurrently with
136 * ceph_fill_trace or ceph_readdir_prepopulate, which
137 * rely on locks (dir mutex) held by our caller.
138 */
139 mutex_lock(&req->r_fill_mutex);
140 req->r_err = err;
141 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
142 mutex_unlock(&req->r_fill_mutex);
143
144 if (!req->r_session) {
145 // haven't sent the request
146 err = 0;
147 }
148 }
149 mutex_unlock(&mdsc->mutex);
150 if (!err)
151 return 0;
152
130 intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK, 153 intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK,
131 USE_AUTH_MDS); 154 USE_AUTH_MDS);
132 if (IS_ERR(intr_req)) 155 if (IS_ERR(intr_req))
@@ -146,7 +169,7 @@ static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
146 if (err && err != -ERESTARTSYS) 169 if (err && err != -ERESTARTSYS)
147 return err; 170 return err;
148 171
149 wait_for_completion(&req->r_completion); 172 wait_for_completion_killable(&req->r_safe_completion);
150 return 0; 173 return 0;
151} 174}
152 175
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 0c05df44cc6c..666a9f274832 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3769,13 +3769,13 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3769void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 3769void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3770{ 3770{
3771 struct ceph_mds_client *mdsc = fsc->mdsc; 3771 struct ceph_mds_client *mdsc = fsc->mdsc;
3772
3773 dout("mdsc_destroy %p\n", mdsc); 3772 dout("mdsc_destroy %p\n", mdsc);
3774 ceph_mdsc_stop(mdsc);
3775 3773
3776 /* flush out any connection work with references to us */ 3774 /* flush out any connection work with references to us */
3777 ceph_msgr_flush(); 3775 ceph_msgr_flush();
3778 3776
3777 ceph_mdsc_stop(mdsc);
3778
3779 fsc->mdsc = NULL; 3779 fsc->mdsc = NULL;
3780 kfree(mdsc); 3780 kfree(mdsc);
3781 dout("mdsc_destroy %p done\n", mdsc); 3781 dout("mdsc_destroy %p done\n", mdsc);
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 8d7918ce694a..aa06a8c24792 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -121,6 +121,7 @@ enum {
121 /* int args above */ 121 /* int args above */
122 Opt_snapdirname, 122 Opt_snapdirname,
123 Opt_mds_namespace, 123 Opt_mds_namespace,
124 Opt_fscache_uniq,
124 Opt_last_string, 125 Opt_last_string,
125 /* string args above */ 126 /* string args above */
126 Opt_dirstat, 127 Opt_dirstat,
@@ -158,6 +159,7 @@ static match_table_t fsopt_tokens = {
158 /* int args above */ 159 /* int args above */
159 {Opt_snapdirname, "snapdirname=%s"}, 160 {Opt_snapdirname, "snapdirname=%s"},
160 {Opt_mds_namespace, "mds_namespace=%s"}, 161 {Opt_mds_namespace, "mds_namespace=%s"},
162 {Opt_fscache_uniq, "fsc=%s"},
161 /* string args above */ 163 /* string args above */
162 {Opt_dirstat, "dirstat"}, 164 {Opt_dirstat, "dirstat"},
163 {Opt_nodirstat, "nodirstat"}, 165 {Opt_nodirstat, "nodirstat"},
@@ -223,6 +225,14 @@ static int parse_fsopt_token(char *c, void *private)
223 if (!fsopt->mds_namespace) 225 if (!fsopt->mds_namespace)
224 return -ENOMEM; 226 return -ENOMEM;
225 break; 227 break;
228 case Opt_fscache_uniq:
229 fsopt->fscache_uniq = kstrndup(argstr[0].from,
230 argstr[0].to-argstr[0].from,
231 GFP_KERNEL);
232 if (!fsopt->fscache_uniq)
233 return -ENOMEM;
234 fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE;
235 break;
226 /* misc */ 236 /* misc */
227 case Opt_wsize: 237 case Opt_wsize:
228 fsopt->wsize = intval; 238 fsopt->wsize = intval;
@@ -317,6 +327,7 @@ static void destroy_mount_options(struct ceph_mount_options *args)
317 kfree(args->snapdir_name); 327 kfree(args->snapdir_name);
318 kfree(args->mds_namespace); 328 kfree(args->mds_namespace);
319 kfree(args->server_path); 329 kfree(args->server_path);
330 kfree(args->fscache_uniq);
320 kfree(args); 331 kfree(args);
321} 332}
322 333
@@ -350,10 +361,12 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
350 ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace); 361 ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
351 if (ret) 362 if (ret)
352 return ret; 363 return ret;
353
354 ret = strcmp_null(fsopt1->server_path, fsopt2->server_path); 364 ret = strcmp_null(fsopt1->server_path, fsopt2->server_path);
355 if (ret) 365 if (ret)
356 return ret; 366 return ret;
367 ret = strcmp_null(fsopt1->fscache_uniq, fsopt2->fscache_uniq);
368 if (ret)
369 return ret;
357 370
358 return ceph_compare_options(new_opt, fsc->client); 371 return ceph_compare_options(new_opt, fsc->client);
359} 372}
@@ -475,8 +488,12 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
475 seq_puts(m, ",noasyncreaddir"); 488 seq_puts(m, ",noasyncreaddir");
476 if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0) 489 if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
477 seq_puts(m, ",nodcache"); 490 seq_puts(m, ",nodcache");
478 if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) 491 if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) {
479 seq_puts(m, ",fsc"); 492 if (fsopt->fscache_uniq)
493 seq_printf(m, ",fsc=%s", fsopt->fscache_uniq);
494 else
495 seq_puts(m, ",fsc");
496 }
480 if (fsopt->flags & CEPH_MOUNT_OPT_NOPOOLPERM) 497 if (fsopt->flags & CEPH_MOUNT_OPT_NOPOOLPERM)
481 seq_puts(m, ",nopoolperm"); 498 seq_puts(m, ",nopoolperm");
482 499
@@ -597,18 +614,11 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
597 if (!fsc->wb_pagevec_pool) 614 if (!fsc->wb_pagevec_pool)
598 goto fail_trunc_wq; 615 goto fail_trunc_wq;
599 616
600 /* setup fscache */
601 if ((fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) &&
602 (ceph_fscache_register_fs(fsc) != 0))
603 goto fail_fscache;
604
605 /* caps */ 617 /* caps */
606 fsc->min_caps = fsopt->max_readdir; 618 fsc->min_caps = fsopt->max_readdir;
607 619
608 return fsc; 620 return fsc;
609 621
610fail_fscache:
611 ceph_fscache_unregister_fs(fsc);
612fail_trunc_wq: 622fail_trunc_wq:
613 destroy_workqueue(fsc->trunc_wq); 623 destroy_workqueue(fsc->trunc_wq);
614fail_pg_inv_wq: 624fail_pg_inv_wq:
@@ -626,8 +636,6 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
626{ 636{
627 dout("destroy_fs_client %p\n", fsc); 637 dout("destroy_fs_client %p\n", fsc);
628 638
629 ceph_fscache_unregister_fs(fsc);
630
631 destroy_workqueue(fsc->wb_wq); 639 destroy_workqueue(fsc->wb_wq);
632 destroy_workqueue(fsc->pg_inv_wq); 640 destroy_workqueue(fsc->pg_inv_wq);
633 destroy_workqueue(fsc->trunc_wq); 641 destroy_workqueue(fsc->trunc_wq);
@@ -636,8 +644,6 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
636 644
637 destroy_mount_options(fsc->mount_options); 645 destroy_mount_options(fsc->mount_options);
638 646
639 ceph_fs_debugfs_cleanup(fsc);
640
641 ceph_destroy_client(fsc->client); 647 ceph_destroy_client(fsc->client);
642 648
643 kfree(fsc); 649 kfree(fsc);
@@ -822,6 +828,13 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
822 if (err < 0) 828 if (err < 0)
823 goto out; 829 goto out;
824 830
831 /* setup fscache */
832 if (fsc->mount_options->flags & CEPH_MOUNT_OPT_FSCACHE) {
833 err = ceph_fscache_register_fs(fsc);
834 if (err < 0)
835 goto out;
836 }
837
825 if (!fsc->mount_options->server_path) { 838 if (!fsc->mount_options->server_path) {
826 path = ""; 839 path = "";
827 dout("mount opening path \\t\n"); 840 dout("mount opening path \\t\n");
@@ -1040,6 +1053,12 @@ static void ceph_kill_sb(struct super_block *s)
1040 1053
1041 ceph_mdsc_pre_umount(fsc->mdsc); 1054 ceph_mdsc_pre_umount(fsc->mdsc);
1042 generic_shutdown_super(s); 1055 generic_shutdown_super(s);
1056
1057 fsc->client->extra_mon_dispatch = NULL;
1058 ceph_fs_debugfs_cleanup(fsc);
1059
1060 ceph_fscache_unregister_fs(fsc);
1061
1043 ceph_mdsc_destroy(fsc); 1062 ceph_mdsc_destroy(fsc);
1044 1063
1045 destroy_fs_client(fsc); 1064 destroy_fs_client(fsc);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index a973acd8beaf..f02a2225fe42 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -73,6 +73,7 @@ struct ceph_mount_options {
73 char *snapdir_name; /* default ".snap" */ 73 char *snapdir_name; /* default ".snap" */
74 char *mds_namespace; /* default NULL */ 74 char *mds_namespace; /* default NULL */
75 char *server_path; /* default "/" */ 75 char *server_path; /* default "/" */
76 char *fscache_uniq; /* default NULL */
76}; 77};
77 78
78struct ceph_fs_client { 79struct ceph_fs_client {
@@ -793,7 +794,7 @@ extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,
793 794
794extern int ceph_inode_holds_cap(struct inode *inode, int mask); 795extern int ceph_inode_holds_cap(struct inode *inode, int mask);
795 796
796extern int ceph_inode_set_size(struct inode *inode, loff_t size); 797extern bool ceph_inode_set_size(struct inode *inode, loff_t size);
797extern void __ceph_do_pending_vmtruncate(struct inode *inode); 798extern void __ceph_do_pending_vmtruncate(struct inode *inode);
798extern void ceph_queue_vmtruncate(struct inode *inode); 799extern void ceph_queue_vmtruncate(struct inode *inode);
799 800
@@ -918,6 +919,7 @@ extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
918 struct ceph_snap_context *snapc); 919 struct ceph_snap_context *snapc);
919extern void ceph_flush_snaps(struct ceph_inode_info *ci, 920extern void ceph_flush_snaps(struct ceph_inode_info *ci,
920 struct ceph_mds_session **psession); 921 struct ceph_mds_session **psession);
922extern bool __ceph_should_report_size(struct ceph_inode_info *ci);
921extern void ceph_check_caps(struct ceph_inode_info *ci, int flags, 923extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
922 struct ceph_mds_session *session); 924 struct ceph_mds_session *session);
923extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc); 925extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 75267cdd5dfd..11263f102e4c 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -756,6 +756,9 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
756 /* let's see if a virtual xattr was requested */ 756 /* let's see if a virtual xattr was requested */
757 vxattr = ceph_match_vxattr(inode, name); 757 vxattr = ceph_match_vxattr(inode, name);
758 if (vxattr) { 758 if (vxattr) {
759 err = ceph_do_getattr(inode, 0, true);
760 if (err)
761 return err;
759 err = -ENODATA; 762 err = -ENODATA;
760 if (!(vxattr->exists_cb && !vxattr->exists_cb(ci))) 763 if (!(vxattr->exists_cb && !vxattr->exists_cb(ci)))
761 err = vxattr->getxattr_cb(ci, value, size); 764 err = vxattr->getxattr_cb(ci, value, size);
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index fd8b2953c78f..f0f6c537b64c 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -2,103 +2,174 @@
2#define __CEPH_FEATURES 2#define __CEPH_FEATURES
3 3
4/* 4/*
5 * feature bits 5 * Each time we reclaim bits for reuse we need to specify another bit
6 * that, if present, indicates we have the new incarnation of that
7 * feature. Base case is 1 (first use).
6 */ 8 */
7#define CEPH_FEATURE_UID (1ULL<<0) 9#define CEPH_FEATURE_INCARNATION_1 (0ull)
8#define CEPH_FEATURE_NOSRCADDR (1ULL<<1) 10#define CEPH_FEATURE_INCARNATION_2 (1ull<<57) // CEPH_FEATURE_SERVER_JEWEL
9#define CEPH_FEATURE_MONCLOCKCHECK (1ULL<<2) 11
10#define CEPH_FEATURE_FLOCK (1ULL<<3) 12#define DEFINE_CEPH_FEATURE(bit, incarnation, name) \
11#define CEPH_FEATURE_SUBSCRIBE2 (1ULL<<4) 13 const static uint64_t CEPH_FEATURE_##name = (1ULL<<bit); \
12#define CEPH_FEATURE_MONNAMES (1ULL<<5) 14 const static uint64_t CEPH_FEATUREMASK_##name = \
13#define CEPH_FEATURE_RECONNECT_SEQ (1ULL<<6) 15 (1ULL<<bit | CEPH_FEATURE_INCARNATION_##incarnation);
14#define CEPH_FEATURE_DIRLAYOUTHASH (1ULL<<7) 16
15#define CEPH_FEATURE_OBJECTLOCATOR (1ULL<<8) 17/* this bit is ignored but still advertised by release *when* */
16#define CEPH_FEATURE_PGID64 (1ULL<<9) 18#define DEFINE_CEPH_FEATURE_DEPRECATED(bit, incarnation, name, when) \
17#define CEPH_FEATURE_INCSUBOSDMAP (1ULL<<10) 19 const static uint64_t DEPRECATED_CEPH_FEATURE_##name = (1ULL<<bit); \
18#define CEPH_FEATURE_PGPOOL3 (1ULL<<11) 20 const static uint64_t DEPRECATED_CEPH_FEATUREMASK_##name = \
19#define CEPH_FEATURE_OSDREPLYMUX (1ULL<<12) 21 (1ULL<<bit | CEPH_FEATURE_INCARNATION_##incarnation);
20#define CEPH_FEATURE_OSDENC (1ULL<<13)
21#define CEPH_FEATURE_OMAP (1ULL<<14)
22#define CEPH_FEATURE_MONENC (1ULL<<15)
23#define CEPH_FEATURE_QUERY_T (1ULL<<16)
24#define CEPH_FEATURE_INDEP_PG_MAP (1ULL<<17)
25#define CEPH_FEATURE_CRUSH_TUNABLES (1ULL<<18)
26#define CEPH_FEATURE_CHUNKY_SCRUB (1ULL<<19)
27#define CEPH_FEATURE_MON_NULLROUTE (1ULL<<20)
28#define CEPH_FEATURE_MON_GV (1ULL<<21)
29#define CEPH_FEATURE_BACKFILL_RESERVATION (1ULL<<22)
30#define CEPH_FEATURE_MSG_AUTH (1ULL<<23)
31#define CEPH_FEATURE_RECOVERY_RESERVATION (1ULL<<24)
32#define CEPH_FEATURE_CRUSH_TUNABLES2 (1ULL<<25)
33#define CEPH_FEATURE_CREATEPOOLID (1ULL<<26)
34#define CEPH_FEATURE_REPLY_CREATE_INODE (1ULL<<27)
35#define CEPH_FEATURE_OSD_HBMSGS (1ULL<<28)
36#define CEPH_FEATURE_MDSENC (1ULL<<29)
37#define CEPH_FEATURE_OSDHASHPSPOOL (1ULL<<30)
38#define CEPH_FEATURE_MON_SINGLE_PAXOS (1ULL<<31)
39#define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32)
40#define CEPH_FEATURE_MON_SCRUB (1ULL<<33)
41#define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34)
42#define CEPH_FEATURE_OSD_CACHEPOOL (1ULL<<35)
43#define CEPH_FEATURE_CRUSH_V2 (1ULL<<36) /* new indep; SET_* steps */
44#define CEPH_FEATURE_EXPORT_PEER (1ULL<<37)
45#define CEPH_FEATURE_OSD_ERASURE_CODES (1ULL<<38)
46#define CEPH_FEATURE_OSD_TMAP2OMAP (1ULL<<38) /* overlap with EC */
47/* The process supports new-style OSDMap encoding. Monitors also use
48 this bit to determine if peers support NAK messages. */
49#define CEPH_FEATURE_OSDMAP_ENC (1ULL<<39)
50#define CEPH_FEATURE_MDS_INLINE_DATA (1ULL<<40)
51#define CEPH_FEATURE_CRUSH_TUNABLES3 (1ULL<<41)
52#define CEPH_FEATURE_OSD_PRIMARY_AFFINITY (1ULL<<41) /* overlap w/ tunables3 */
53#define CEPH_FEATURE_MSGR_KEEPALIVE2 (1ULL<<42)
54#define CEPH_FEATURE_OSD_POOLRESEND (1ULL<<43)
55#define CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2 (1ULL<<44)
56#define CEPH_FEATURE_OSD_SET_ALLOC_HINT (1ULL<<45)
57#define CEPH_FEATURE_OSD_FADVISE_FLAGS (1ULL<<46)
58#define CEPH_FEATURE_OSD_REPOP (1ULL<<46) /* overlap with fadvise */
59#define CEPH_FEATURE_OSD_OBJECT_DIGEST (1ULL<<46) /* overlap with fadvise */
60#define CEPH_FEATURE_OSD_TRANSACTION_MAY_LAYOUT (1ULL<<46) /* overlap w/ fadvise */
61#define CEPH_FEATURE_MDS_QUOTA (1ULL<<47)
62#define CEPH_FEATURE_CRUSH_V4 (1ULL<<48) /* straw2 buckets */
63#define CEPH_FEATURE_OSD_MIN_SIZE_RECOVERY (1ULL<<49)
64// duplicated since it was introduced at the same time as MIN_SIZE_RECOVERY
65#define CEPH_FEATURE_OSD_PROXY_FEATURES (1ULL<<49) /* overlap w/ above */
66#define CEPH_FEATURE_MON_METADATA (1ULL<<50)
67#define CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT (1ULL<<51) /* can sort objs bitwise */
68#define CEPH_FEATURE_OSD_PROXY_WRITE_FEATURES (1ULL<<52)
69#define CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3 (1ULL<<53)
70#define CEPH_FEATURE_OSD_HITSET_GMT (1ULL<<54)
71#define CEPH_FEATURE_HAMMER_0_94_4 (1ULL<<55)
72#define CEPH_FEATURE_NEW_OSDOP_ENCODING (1ULL<<56) /* New, v7 encoding */
73#define CEPH_FEATURE_MON_STATEFUL_SUB (1ULL<<57) /* stateful mon subscription */
74#define CEPH_FEATURE_MON_ROUTE_OSDMAP (1ULL<<57) /* peon sends osdmaps */
75#define CEPH_FEATURE_CRUSH_TUNABLES5 (1ULL<<58) /* chooseleaf stable mode */
76// duplicated since it was introduced at the same time as CEPH_FEATURE_CRUSH_TUNABLES5
77#define CEPH_FEATURE_NEW_OSDOPREPLY_ENCODING (1ULL<<58) /* New, v7 encoding */
78#define CEPH_FEATURE_FS_FILE_LAYOUT_V2 (1ULL<<58) /* file_layout_t */
79 22
80/* 23/*
81 * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature 24 * this bit is ignored by release *unused* and not advertised by
82 * vector to evaluate to 64 bit ~0. To cope, we designate 1ULL << 63 25 * release *unadvertised*
83 * to mean 33 bit ~0, and introduce a helper below to do the 26 */
84 * translation. 27#define DEFINE_CEPH_FEATURE_RETIRED(bit, inc, name, unused, unadvertised)
28
29
30/*
31 * test for a feature. this test is safer than a typical mask against
32 * the bit because it ensures that we have the bit AND the marker for the
33 * bit's incarnation. this must be used in any case where the features
34 * bits may include an old meaning of the bit.
35 */
36#define CEPH_HAVE_FEATURE(x, name) \
37 (((x) & (CEPH_FEATUREMASK_##name)) == (CEPH_FEATUREMASK_##name))
38
39
40/*
41 * Notes on deprecation:
42 *
43 * A *major* release is a release through which all upgrades must pass
44 * (e.g., jewel). For example, no pre-jewel server will ever talk to
45 * a post-jewel server (mon, osd, etc).
46 *
47 * For feature bits used *only* on the server-side:
48 *
49 * - In the first phase we indicate that a feature is DEPRECATED as of
50 * a particular release. This is the first major release X (say,
51 * jewel) that does not depend on its peers advertising the feature.
52 * That is, it safely assumes its peers all have the feature. We
53 * indicate this with the DEPRECATED macro. For example,
54 *
55 * DEFINE_CEPH_FEATURE_DEPRECATED( 2, 1, MONCLOCKCHECK, JEWEL)
56 *
57 * because 10.2.z (jewel) did not care if its peers advertised this
58 * feature bit.
59 *
60 * - In the second phase we stop advertising the bit and call it
61 * RETIRED. This can normally be done in the *next* major release
62 * following the one in which we marked the feature DEPRECATED. In
63 * the above example, for 12.0.z (luminous) we can say:
64 *
65 * DEFINE_CEPH_FEATURE_RETIRED( 2, 1, MONCLOCKCHECK, JEWEL, LUMINOUS)
85 * 66 *
86 * This was introduced by ceph.git commit 67 * - The bit can be reused in the first post-luminous release, 13.0.z
87 * 9ea02b84104045c2ffd7e7f4e7af512953855ecd v0.58-657-g9ea02b8 68 * (m).
88 * and fixed by ceph.git commit 69 *
89 * 4255b5c2fb54ae40c53284b3ab700fdfc7e61748 v0.65-263-g4255b5c 70 * This ensures that no two versions who have different meanings for
71 * the bit ever speak to each other.
90 */ 72 */
91#define CEPH_FEATURE_RESERVED (1ULL<<63) 73
92 74DEFINE_CEPH_FEATURE( 0, 1, UID)
93static inline u64 ceph_sanitize_features(u64 features) 75DEFINE_CEPH_FEATURE( 1, 1, NOSRCADDR)
94{ 76DEFINE_CEPH_FEATURE_RETIRED( 2, 1, MONCLOCKCHECK, JEWEL, LUMINOUS)
95 if (features & CEPH_FEATURE_RESERVED) { 77
96 /* everything through OSD_SNAPMAPPER */ 78DEFINE_CEPH_FEATURE( 3, 1, FLOCK)
97 return 0x1ffffffffull; 79DEFINE_CEPH_FEATURE( 4, 1, SUBSCRIBE2)
98 } else { 80DEFINE_CEPH_FEATURE( 5, 1, MONNAMES)
99 return features; 81DEFINE_CEPH_FEATURE( 6, 1, RECONNECT_SEQ)
100 } 82DEFINE_CEPH_FEATURE( 7, 1, DIRLAYOUTHASH)
101} 83DEFINE_CEPH_FEATURE( 8, 1, OBJECTLOCATOR)
84DEFINE_CEPH_FEATURE( 9, 1, PGID64)
85DEFINE_CEPH_FEATURE(10, 1, INCSUBOSDMAP)
86DEFINE_CEPH_FEATURE(11, 1, PGPOOL3)
87DEFINE_CEPH_FEATURE(12, 1, OSDREPLYMUX)
88DEFINE_CEPH_FEATURE(13, 1, OSDENC)
89DEFINE_CEPH_FEATURE_RETIRED(14, 1, OMAP, HAMMER, JEWEL)
90DEFINE_CEPH_FEATURE(14, 2, SERVER_KRAKEN)
91DEFINE_CEPH_FEATURE(15, 1, MONENC)
92DEFINE_CEPH_FEATURE_RETIRED(16, 1, QUERY_T, JEWEL, LUMINOUS)
93
94DEFINE_CEPH_FEATURE_RETIRED(17, 1, INDEP_PG_MAP, JEWEL, LUMINOUS)
95
96DEFINE_CEPH_FEATURE(18, 1, CRUSH_TUNABLES)
97DEFINE_CEPH_FEATURE_RETIRED(19, 1, CHUNKY_SCRUB, JEWEL, LUMINOUS)
98
99DEFINE_CEPH_FEATURE_RETIRED(20, 1, MON_NULLROUTE, JEWEL, LUMINOUS)
100
101DEFINE_CEPH_FEATURE_RETIRED(21, 1, MON_GV, HAMMER, JEWEL)
102DEFINE_CEPH_FEATURE(21, 2, SERVER_LUMINOUS)
103DEFINE_CEPH_FEATURE(21, 2, RESEND_ON_SPLIT) // overlap
104DEFINE_CEPH_FEATURE(21, 2, RADOS_BACKOFF) // overlap
105DEFINE_CEPH_FEATURE(21, 2, OSDMAP_PG_UPMAP) // overlap
106DEFINE_CEPH_FEATURE(21, 2, CRUSH_CHOOSE_ARGS) // overlap
107DEFINE_CEPH_FEATURE_RETIRED(22, 1, BACKFILL_RESERVATION, JEWEL, LUMINOUS)
108
109DEFINE_CEPH_FEATURE(23, 1, MSG_AUTH)
110DEFINE_CEPH_FEATURE_RETIRED(24, 1, RECOVERY_RESERVATION, JEWEL, LUNINOUS)
111
112DEFINE_CEPH_FEATURE(25, 1, CRUSH_TUNABLES2)
113DEFINE_CEPH_FEATURE(26, 1, CREATEPOOLID)
114DEFINE_CEPH_FEATURE(27, 1, REPLY_CREATE_INODE)
115DEFINE_CEPH_FEATURE_RETIRED(28, 1, OSD_HBMSGS, HAMMER, JEWEL)
116DEFINE_CEPH_FEATURE(28, 2, SERVER_M)
117DEFINE_CEPH_FEATURE(29, 1, MDSENC)
118DEFINE_CEPH_FEATURE(30, 1, OSDHASHPSPOOL)
119DEFINE_CEPH_FEATURE(31, 1, MON_SINGLE_PAXOS) // deprecate me
120DEFINE_CEPH_FEATURE_RETIRED(32, 1, OSD_SNAPMAPPER, JEWEL, LUMINOUS)
121
122DEFINE_CEPH_FEATURE_RETIRED(33, 1, MON_SCRUB, JEWEL, LUMINOUS)
123
124DEFINE_CEPH_FEATURE_RETIRED(34, 1, OSD_PACKED_RECOVERY, JEWEL, LUMINOUS)
125
126DEFINE_CEPH_FEATURE(35, 1, OSD_CACHEPOOL)
127DEFINE_CEPH_FEATURE(36, 1, CRUSH_V2)
128DEFINE_CEPH_FEATURE(37, 1, EXPORT_PEER)
129DEFINE_CEPH_FEATURE(38, 1, OSD_ERASURE_CODES)
130DEFINE_CEPH_FEATURE(38, 1, OSD_OSD_TMAP2OMAP) // overlap
131DEFINE_CEPH_FEATURE(39, 1, OSDMAP_ENC)
132DEFINE_CEPH_FEATURE(40, 1, MDS_INLINE_DATA)
133DEFINE_CEPH_FEATURE(41, 1, CRUSH_TUNABLES3)
134DEFINE_CEPH_FEATURE(41, 1, OSD_PRIMARY_AFFINITY) // overlap
135DEFINE_CEPH_FEATURE(42, 1, MSGR_KEEPALIVE2)
136DEFINE_CEPH_FEATURE(43, 1, OSD_POOLRESEND)
137DEFINE_CEPH_FEATURE(44, 1, ERASURE_CODE_PLUGINS_V2)
138DEFINE_CEPH_FEATURE_RETIRED(45, 1, OSD_SET_ALLOC_HINT, JEWEL, LUMINOUS)
139
140DEFINE_CEPH_FEATURE(46, 1, OSD_FADVISE_FLAGS)
141DEFINE_CEPH_FEATURE_RETIRED(46, 1, OSD_REPOP, JEWEL, LUMINOUS) // overlap
142DEFINE_CEPH_FEATURE_RETIRED(46, 1, OSD_OBJECT_DIGEST, JEWEL, LUMINOUS) // overlap
143DEFINE_CEPH_FEATURE_RETIRED(46, 1, OSD_TRANSACTION_MAY_LAYOUT, JEWEL, LUMINOUS) // overlap
144
145DEFINE_CEPH_FEATURE(47, 1, MDS_QUOTA)
146DEFINE_CEPH_FEATURE(48, 1, CRUSH_V4)
147DEFINE_CEPH_FEATURE_RETIRED(49, 1, OSD_MIN_SIZE_RECOVERY, JEWEL, LUMINOUS)
148DEFINE_CEPH_FEATURE_RETIRED(49, 1, OSD_PROXY_FEATURES, JEWEL, LUMINOUS) // overlap
149
150DEFINE_CEPH_FEATURE(50, 1, MON_METADATA)
151DEFINE_CEPH_FEATURE(51, 1, OSD_BITWISE_HOBJ_SORT)
152DEFINE_CEPH_FEATURE(52, 1, OSD_PROXY_WRITE_FEATURES)
153DEFINE_CEPH_FEATURE(53, 1, ERASURE_CODE_PLUGINS_V3)
154DEFINE_CEPH_FEATURE(54, 1, OSD_HITSET_GMT)
155DEFINE_CEPH_FEATURE(55, 1, HAMMER_0_94_4)
156DEFINE_CEPH_FEATURE(56, 1, NEW_OSDOP_ENCODING)
157DEFINE_CEPH_FEATURE(57, 1, MON_STATEFUL_SUB)
158DEFINE_CEPH_FEATURE(57, 1, MON_ROUTE_OSDMAP) // overlap
159DEFINE_CEPH_FEATURE(57, 1, OSDSUBOP_NO_SNAPCONTEXT) // overlap
160DEFINE_CEPH_FEATURE(57, 1, SERVER_JEWEL) // overlap
161DEFINE_CEPH_FEATURE(58, 1, CRUSH_TUNABLES5)
162DEFINE_CEPH_FEATURE(58, 1, NEW_OSDOPREPLY_ENCODING) // overlap
163DEFINE_CEPH_FEATURE(58, 1, FS_FILE_LAYOUT_V2) // overlap
164DEFINE_CEPH_FEATURE(59, 1, FS_BTIME)
165DEFINE_CEPH_FEATURE(59, 1, FS_CHANGE_ATTR) // overlap
166DEFINE_CEPH_FEATURE(59, 1, MSG_ADDR2) // overlap
167DEFINE_CEPH_FEATURE(60, 1, BLKIN_TRACING) // *do not share this bit*
168
169DEFINE_CEPH_FEATURE(61, 1, RESERVED2) // unused, but slow down!
170DEFINE_CEPH_FEATURE(62, 1, RESERVED) // do not use; used as a sentinel
171DEFINE_CEPH_FEATURE_DEPRECATED(63, 1, RESERVED_BROKEN, LUMINOUS) // client-facing
172
102 173
103/* 174/*
104 * Features supported. 175 * Features supported.
@@ -113,6 +184,11 @@ static inline u64 ceph_sanitize_features(u64 features)
113 CEPH_FEATURE_PGPOOL3 | \ 184 CEPH_FEATURE_PGPOOL3 | \
114 CEPH_FEATURE_OSDENC | \ 185 CEPH_FEATURE_OSDENC | \
115 CEPH_FEATURE_CRUSH_TUNABLES | \ 186 CEPH_FEATURE_CRUSH_TUNABLES | \
187 CEPH_FEATURE_SERVER_LUMINOUS | \
188 CEPH_FEATURE_RESEND_ON_SPLIT | \
189 CEPH_FEATURE_RADOS_BACKOFF | \
190 CEPH_FEATURE_OSDMAP_PG_UPMAP | \
191 CEPH_FEATURE_CRUSH_CHOOSE_ARGS | \
116 CEPH_FEATURE_MSG_AUTH | \ 192 CEPH_FEATURE_MSG_AUTH | \
117 CEPH_FEATURE_CRUSH_TUNABLES2 | \ 193 CEPH_FEATURE_CRUSH_TUNABLES2 | \
118 CEPH_FEATURE_REPLY_CREATE_INODE | \ 194 CEPH_FEATURE_REPLY_CREATE_INODE | \
@@ -126,7 +202,11 @@ static inline u64 ceph_sanitize_features(u64 features)
126 CEPH_FEATURE_CRUSH_TUNABLES3 | \ 202 CEPH_FEATURE_CRUSH_TUNABLES3 | \
127 CEPH_FEATURE_OSD_PRIMARY_AFFINITY | \ 203 CEPH_FEATURE_OSD_PRIMARY_AFFINITY | \
128 CEPH_FEATURE_MSGR_KEEPALIVE2 | \ 204 CEPH_FEATURE_MSGR_KEEPALIVE2 | \
205 CEPH_FEATURE_OSD_POOLRESEND | \
129 CEPH_FEATURE_CRUSH_V4 | \ 206 CEPH_FEATURE_CRUSH_V4 | \
207 CEPH_FEATURE_NEW_OSDOP_ENCODING | \
208 CEPH_FEATURE_SERVER_JEWEL | \
209 CEPH_FEATURE_MON_STATEFUL_SUB | \
130 CEPH_FEATURE_CRUSH_TUNABLES5 | \ 210 CEPH_FEATURE_CRUSH_TUNABLES5 | \
131 CEPH_FEATURE_NEW_OSDOPREPLY_ENCODING) 211 CEPH_FEATURE_NEW_OSDOPREPLY_ENCODING)
132 212
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index ad078ebe25d6..edf5b04b918a 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -147,6 +147,7 @@ struct ceph_dir_layout {
147#define CEPH_MSG_OSD_OP 42 147#define CEPH_MSG_OSD_OP 42
148#define CEPH_MSG_OSD_OPREPLY 43 148#define CEPH_MSG_OSD_OPREPLY 43
149#define CEPH_MSG_WATCH_NOTIFY 44 149#define CEPH_MSG_WATCH_NOTIFY 44
150#define CEPH_MSG_OSD_BACKOFF 61
150 151
151 152
152/* watch-notify operations */ 153/* watch-notify operations */
diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index f990f2cc907a..14af9b70d301 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -133,6 +133,66 @@ bad:
133} 133}
134 134
135/* 135/*
136 * skip helpers
137 */
138#define ceph_decode_skip_n(p, end, n, bad) \
139 do { \
140 ceph_decode_need(p, end, n, bad); \
141 *p += n; \
142 } while (0)
143
144#define ceph_decode_skip_64(p, end, bad) \
145ceph_decode_skip_n(p, end, sizeof(u64), bad)
146
147#define ceph_decode_skip_32(p, end, bad) \
148ceph_decode_skip_n(p, end, sizeof(u32), bad)
149
150#define ceph_decode_skip_16(p, end, bad) \
151ceph_decode_skip_n(p, end, sizeof(u16), bad)
152
153#define ceph_decode_skip_8(p, end, bad) \
154ceph_decode_skip_n(p, end, sizeof(u8), bad)
155
156#define ceph_decode_skip_string(p, end, bad) \
157 do { \
158 u32 len; \
159 \
160 ceph_decode_32_safe(p, end, len, bad); \
161 ceph_decode_skip_n(p, end, len, bad); \
162 } while (0)
163
164#define ceph_decode_skip_set(p, end, type, bad) \
165 do { \
166 u32 len; \
167 \
168 ceph_decode_32_safe(p, end, len, bad); \
169 while (len--) \
170 ceph_decode_skip_##type(p, end, bad); \
171 } while (0)
172
173#define ceph_decode_skip_map(p, end, ktype, vtype, bad) \
174 do { \
175 u32 len; \
176 \
177 ceph_decode_32_safe(p, end, len, bad); \
178 while (len--) { \
179 ceph_decode_skip_##ktype(p, end, bad); \
180 ceph_decode_skip_##vtype(p, end, bad); \
181 } \
182 } while (0)
183
184#define ceph_decode_skip_map_of_map(p, end, ktype1, ktype2, vtype2, bad) \
185 do { \
186 u32 len; \
187 \
188 ceph_decode_32_safe(p, end, len, bad); \
189 while (len--) { \
190 ceph_decode_skip_##ktype1(p, end, bad); \
191 ceph_decode_skip_map(p, end, ktype2, vtype2, bad); \
192 } \
193 } while (0)
194
195/*
136 * struct ceph_timespec <-> struct timespec 196 * struct ceph_timespec <-> struct timespec
137 */ 197 */
138static inline void ceph_decode_timespec(struct timespec *ts, 198static inline void ceph_decode_timespec(struct timespec *ts,
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 3229ae6c7846..8a79587e1317 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -184,10 +184,11 @@ static inline int calc_pages_for(u64 off, u64 len)
184 (off >> PAGE_SHIFT); 184 (off >> PAGE_SHIFT);
185} 185}
186 186
187/* 187#define RB_BYVAL(a) (a)
188 * These are not meant to be generic - an integer key is assumed. 188#define RB_BYPTR(a) (&(a))
189 */ 189#define RB_CMP3WAY(a, b) ((a) < (b) ? -1 : (a) > (b))
190#define DEFINE_RB_INSDEL_FUNCS(name, type, keyfld, nodefld) \ 190
191#define DEFINE_RB_INSDEL_FUNCS2(name, type, keyfld, cmpexp, keyexp, nodefld) \
191static void insert_##name(struct rb_root *root, type *t) \ 192static void insert_##name(struct rb_root *root, type *t) \
192{ \ 193{ \
193 struct rb_node **n = &root->rb_node; \ 194 struct rb_node **n = &root->rb_node; \
@@ -197,11 +198,13 @@ static void insert_##name(struct rb_root *root, type *t) \
197 \ 198 \
198 while (*n) { \ 199 while (*n) { \
199 type *cur = rb_entry(*n, type, nodefld); \ 200 type *cur = rb_entry(*n, type, nodefld); \
201 int cmp; \
200 \ 202 \
201 parent = *n; \ 203 parent = *n; \
202 if (t->keyfld < cur->keyfld) \ 204 cmp = cmpexp(keyexp(t->keyfld), keyexp(cur->keyfld)); \
205 if (cmp < 0) \
203 n = &(*n)->rb_left; \ 206 n = &(*n)->rb_left; \
204 else if (t->keyfld > cur->keyfld) \ 207 else if (cmp > 0) \
205 n = &(*n)->rb_right; \ 208 n = &(*n)->rb_right; \
206 else \ 209 else \
207 BUG(); \ 210 BUG(); \
@@ -217,19 +220,24 @@ static void erase_##name(struct rb_root *root, type *t) \
217 RB_CLEAR_NODE(&t->nodefld); \ 220 RB_CLEAR_NODE(&t->nodefld); \
218} 221}
219 222
220#define DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld) \ 223/*
221extern type __lookup_##name##_key; \ 224 * @lookup_param_type is a parameter and not constructed from (@type,
222static type *lookup_##name(struct rb_root *root, \ 225 * @keyfld) with typeof() because adding const is too unwieldy.
223 typeof(__lookup_##name##_key.keyfld) key) \ 226 */
227#define DEFINE_RB_LOOKUP_FUNC2(name, type, keyfld, cmpexp, keyexp, \
228 lookup_param_type, nodefld) \
229static type *lookup_##name(struct rb_root *root, lookup_param_type key) \
224{ \ 230{ \
225 struct rb_node *n = root->rb_node; \ 231 struct rb_node *n = root->rb_node; \
226 \ 232 \
227 while (n) { \ 233 while (n) { \
228 type *cur = rb_entry(n, type, nodefld); \ 234 type *cur = rb_entry(n, type, nodefld); \
235 int cmp; \
229 \ 236 \
230 if (key < cur->keyfld) \ 237 cmp = cmpexp(key, keyexp(cur->keyfld)); \
238 if (cmp < 0) \
231 n = n->rb_left; \ 239 n = n->rb_left; \
232 else if (key > cur->keyfld) \ 240 else if (cmp > 0) \
233 n = n->rb_right; \ 241 n = n->rb_right; \
234 else \ 242 else \
235 return cur; \ 243 return cur; \
@@ -238,6 +246,23 @@ static type *lookup_##name(struct rb_root *root, \
238 return NULL; \ 246 return NULL; \
239} 247}
240 248
249#define DEFINE_RB_FUNCS2(name, type, keyfld, cmpexp, keyexp, \
250 lookup_param_type, nodefld) \
251DEFINE_RB_INSDEL_FUNCS2(name, type, keyfld, cmpexp, keyexp, nodefld) \
252DEFINE_RB_LOOKUP_FUNC2(name, type, keyfld, cmpexp, keyexp, \
253 lookup_param_type, nodefld)
254
255/*
256 * Shorthands for integer keys.
257 */
258#define DEFINE_RB_INSDEL_FUNCS(name, type, keyfld, nodefld) \
259DEFINE_RB_INSDEL_FUNCS2(name, type, keyfld, RB_CMP3WAY, RB_BYVAL, nodefld)
260
261#define DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld) \
262extern type __lookup_##name##_key; \
263DEFINE_RB_LOOKUP_FUNC2(name, type, keyfld, RB_CMP3WAY, RB_BYVAL, \
264 typeof(__lookup_##name##_key.keyfld), nodefld)
265
241#define DEFINE_RB_FUNCS(name, type, keyfld, nodefld) \ 266#define DEFINE_RB_FUNCS(name, type, keyfld, nodefld) \
242DEFINE_RB_INSDEL_FUNCS(name, type, keyfld, nodefld) \ 267DEFINE_RB_INSDEL_FUNCS(name, type, keyfld, nodefld) \
243DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld) 268DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index c5c4c713e00f..fbd94d9fa5dd 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -44,6 +44,8 @@ struct ceph_connection_operations {
44 struct ceph_msg_header *hdr, 44 struct ceph_msg_header *hdr,
45 int *skip); 45 int *skip);
46 46
47 void (*reencode_message) (struct ceph_msg *msg);
48
47 int (*sign_message) (struct ceph_msg *msg); 49 int (*sign_message) (struct ceph_msg *msg);
48 int (*check_message_signature) (struct ceph_msg *msg); 50 int (*check_message_signature) (struct ceph_msg *msg);
49}; 51};
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 85650b415e73..c6d96a5f46fd 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -1,6 +1,7 @@
1#ifndef _FS_CEPH_OSD_CLIENT_H 1#ifndef _FS_CEPH_OSD_CLIENT_H
2#define _FS_CEPH_OSD_CLIENT_H 2#define _FS_CEPH_OSD_CLIENT_H
3 3
4#include <linux/bitrev.h>
4#include <linux/completion.h> 5#include <linux/completion.h>
5#include <linux/kref.h> 6#include <linux/kref.h>
6#include <linux/mempool.h> 7#include <linux/mempool.h>
@@ -36,6 +37,8 @@ struct ceph_osd {
36 struct ceph_connection o_con; 37 struct ceph_connection o_con;
37 struct rb_root o_requests; 38 struct rb_root o_requests;
38 struct rb_root o_linger_requests; 39 struct rb_root o_linger_requests;
40 struct rb_root o_backoff_mappings;
41 struct rb_root o_backoffs_by_id;
39 struct list_head o_osd_lru; 42 struct list_head o_osd_lru;
40 struct ceph_auth_handshake o_auth; 43 struct ceph_auth_handshake o_auth;
41 unsigned long lru_ttl; 44 unsigned long lru_ttl;
@@ -136,7 +139,8 @@ struct ceph_osd_request_target {
136 struct ceph_object_id target_oid; 139 struct ceph_object_id target_oid;
137 struct ceph_object_locator target_oloc; 140 struct ceph_object_locator target_oloc;
138 141
139 struct ceph_pg pgid; 142 struct ceph_pg pgid; /* last raw pg we mapped to */
143 struct ceph_spg spgid; /* last actual spg we mapped to */
140 u32 pg_num; 144 u32 pg_num;
141 u32 pg_num_mask; 145 u32 pg_num_mask;
142 struct ceph_osds acting; 146 struct ceph_osds acting;
@@ -148,6 +152,9 @@ struct ceph_osd_request_target {
148 unsigned int flags; /* CEPH_OSD_FLAG_* */ 152 unsigned int flags; /* CEPH_OSD_FLAG_* */
149 bool paused; 153 bool paused;
150 154
155 u32 epoch;
156 u32 last_force_resend;
157
151 int osd; 158 int osd;
152}; 159};
153 160
@@ -193,7 +200,6 @@ struct ceph_osd_request {
193 unsigned long r_stamp; /* jiffies, send or check time */ 200 unsigned long r_stamp; /* jiffies, send or check time */
194 unsigned long r_start_stamp; /* jiffies */ 201 unsigned long r_start_stamp; /* jiffies */
195 int r_attempts; 202 int r_attempts;
196 u32 r_last_force_resend;
197 u32 r_map_dne_bound; 203 u32 r_map_dne_bound;
198 204
199 struct ceph_osd_req_op r_ops[]; 205 struct ceph_osd_req_op r_ops[];
@@ -203,6 +209,23 @@ struct ceph_request_redirect {
203 struct ceph_object_locator oloc; 209 struct ceph_object_locator oloc;
204}; 210};
205 211
212/*
213 * osd request identifier
214 *
215 * caller name + incarnation# + tid to unique identify this request
216 */
217struct ceph_osd_reqid {
218 struct ceph_entity_name name;
219 __le64 tid;
220 __le32 inc;
221} __packed;
222
223struct ceph_blkin_trace_info {
224 __le64 trace_id;
225 __le64 span_id;
226 __le64 parent_span_id;
227} __packed;
228
206typedef void (*rados_watchcb2_t)(void *arg, u64 notify_id, u64 cookie, 229typedef void (*rados_watchcb2_t)(void *arg, u64 notify_id, u64 cookie,
207 u64 notifier_id, void *data, size_t data_len); 230 u64 notifier_id, void *data, size_t data_len);
208typedef void (*rados_watcherrcb_t)(void *arg, u64 cookie, int err); 231typedef void (*rados_watcherrcb_t)(void *arg, u64 cookie, int err);
@@ -221,7 +244,6 @@ struct ceph_osd_linger_request {
221 struct list_head pending_lworks; 244 struct list_head pending_lworks;
222 245
223 struct ceph_osd_request_target t; 246 struct ceph_osd_request_target t;
224 u32 last_force_resend;
225 u32 map_dne_bound; 247 u32 map_dne_bound;
226 248
227 struct timespec mtime; 249 struct timespec mtime;
@@ -256,6 +278,48 @@ struct ceph_watch_item {
256 struct ceph_entity_addr addr; 278 struct ceph_entity_addr addr;
257}; 279};
258 280
281struct ceph_spg_mapping {
282 struct rb_node node;
283 struct ceph_spg spgid;
284
285 struct rb_root backoffs;
286};
287
288struct ceph_hobject_id {
289 void *key;
290 size_t key_len;
291 void *oid;
292 size_t oid_len;
293 u64 snapid;
294 u32 hash;
295 u8 is_max;
296 void *nspace;
297 size_t nspace_len;
298 s64 pool;
299
300 /* cache */
301 u32 hash_reverse_bits;
302};
303
304static inline void ceph_hoid_build_hash_cache(struct ceph_hobject_id *hoid)
305{
306 hoid->hash_reverse_bits = bitrev32(hoid->hash);
307}
308
309/*
310 * PG-wide backoff: [begin, end)
311 * per-object backoff: begin == end
312 */
313struct ceph_osd_backoff {
314 struct rb_node spg_node;
315 struct rb_node id_node;
316
317 struct ceph_spg spgid;
318 u64 id;
319 struct ceph_hobject_id *begin;
320 struct ceph_hobject_id *end;
321};
322
259#define CEPH_LINGER_ID_START 0xffff000000000000ULL 323#define CEPH_LINGER_ID_START 0xffff000000000000ULL
260 324
261struct ceph_osd_client { 325struct ceph_osd_client {
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 938656f70807..a0996cb9faed 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -24,7 +24,15 @@ struct ceph_pg {
24 uint32_t seed; 24 uint32_t seed;
25}; 25};
26 26
27#define CEPH_SPG_NOSHARD -1
28
29struct ceph_spg {
30 struct ceph_pg pgid;
31 s8 shard;
32};
33
27int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs); 34int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs);
35int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs);
28 36
29#define CEPH_POOL_FLAG_HASHPSPOOL (1ULL << 0) /* hash pg seed and pool id 37#define CEPH_POOL_FLAG_HASHPSPOOL (1ULL << 0) /* hash pg seed and pool id
30 together */ 38 together */
@@ -135,10 +143,14 @@ struct ceph_pg_mapping {
135 struct { 143 struct {
136 int len; 144 int len;
137 int osds[]; 145 int osds[];
138 } pg_temp; 146 } pg_temp, pg_upmap;
139 struct { 147 struct {
140 int osd; 148 int osd;
141 } primary_temp; 149 } primary_temp;
150 struct {
151 int len;
152 int from_to[][2];
153 } pg_upmap_items;
142 }; 154 };
143}; 155};
144 156
@@ -150,13 +162,17 @@ struct ceph_osdmap {
150 u32 flags; /* CEPH_OSDMAP_* */ 162 u32 flags; /* CEPH_OSDMAP_* */
151 163
152 u32 max_osd; /* size of osd_state, _offload, _addr arrays */ 164 u32 max_osd; /* size of osd_state, _offload, _addr arrays */
153 u8 *osd_state; /* CEPH_OSD_* */ 165 u32 *osd_state; /* CEPH_OSD_* */
154 u32 *osd_weight; /* 0 = failed, 0x10000 = 100% normal */ 166 u32 *osd_weight; /* 0 = failed, 0x10000 = 100% normal */
155 struct ceph_entity_addr *osd_addr; 167 struct ceph_entity_addr *osd_addr;
156 168
157 struct rb_root pg_temp; 169 struct rb_root pg_temp;
158 struct rb_root primary_temp; 170 struct rb_root primary_temp;
159 171
172 /* remap (post-CRUSH, pre-up) */
173 struct rb_root pg_upmap; /* PG := raw set */
174 struct rb_root pg_upmap_items; /* from -> to within raw set */
175
160 u32 *osd_primary_affinity; 176 u32 *osd_primary_affinity;
161 177
162 struct rb_root pg_pools; 178 struct rb_root pg_pools;
@@ -187,7 +203,7 @@ static inline bool ceph_osd_is_down(struct ceph_osdmap *map, int osd)
187 return !ceph_osd_is_up(map, osd); 203 return !ceph_osd_is_up(map, osd);
188} 204}
189 205
190extern char *ceph_osdmap_state_str(char *str, int len, int state); 206char *ceph_osdmap_state_str(char *str, int len, u32 state);
191extern u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd); 207extern u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd);
192 208
193static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map, 209static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
@@ -198,11 +214,13 @@ static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
198 return &map->osd_addr[osd]; 214 return &map->osd_addr[osd];
199} 215}
200 216
217#define CEPH_PGID_ENCODING_LEN (1 + 8 + 4 + 4)
218
201static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid) 219static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid)
202{ 220{
203 __u8 version; 221 __u8 version;
204 222
205 if (!ceph_has_room(p, end, 1 + 8 + 4 + 4)) { 223 if (!ceph_has_room(p, end, CEPH_PGID_ENCODING_LEN)) {
206 pr_warn("incomplete pg encoding\n"); 224 pr_warn("incomplete pg encoding\n");
207 return -EINVAL; 225 return -EINVAL;
208 } 226 }
@@ -240,6 +258,8 @@ static inline void ceph_osds_init(struct ceph_osds *set)
240 258
241void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src); 259void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src);
242 260
261bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
262 u32 new_pg_num);
243bool ceph_is_new_interval(const struct ceph_osds *old_acting, 263bool ceph_is_new_interval(const struct ceph_osds *old_acting,
244 const struct ceph_osds *new_acting, 264 const struct ceph_osds *new_acting,
245 const struct ceph_osds *old_up, 265 const struct ceph_osds *old_up,
@@ -262,15 +282,24 @@ extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
262 u64 off, u64 len, 282 u64 off, u64 len,
263 u64 *bno, u64 *oxoff, u64 *oxlen); 283 u64 *bno, u64 *oxoff, u64 *oxlen);
264 284
285int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
286 const struct ceph_object_id *oid,
287 const struct ceph_object_locator *oloc,
288 struct ceph_pg *raw_pgid);
265int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, 289int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
266 struct ceph_object_id *oid, 290 const struct ceph_object_id *oid,
267 struct ceph_object_locator *oloc, 291 const struct ceph_object_locator *oloc,
268 struct ceph_pg *raw_pgid); 292 struct ceph_pg *raw_pgid);
269 293
270void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap, 294void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
295 struct ceph_pg_pool_info *pi,
271 const struct ceph_pg *raw_pgid, 296 const struct ceph_pg *raw_pgid,
272 struct ceph_osds *up, 297 struct ceph_osds *up,
273 struct ceph_osds *acting); 298 struct ceph_osds *acting);
299bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
300 struct ceph_pg_pool_info *pi,
301 const struct ceph_pg *raw_pgid,
302 struct ceph_spg *spgid);
274int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap, 303int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
275 const struct ceph_pg *raw_pgid); 304 const struct ceph_pg *raw_pgid);
276 305
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 5d0018782d50..385db08bb8b2 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -439,6 +439,12 @@ enum {
439 439
440const char *ceph_osd_watch_op_name(int o); 440const char *ceph_osd_watch_op_name(int o);
441 441
442enum {
443 CEPH_OSD_BACKOFF_OP_BLOCK = 1,
444 CEPH_OSD_BACKOFF_OP_ACK_BLOCK = 2,
445 CEPH_OSD_BACKOFF_OP_UNBLOCK = 3,
446};
447
442/* 448/*
443 * an individual object operation. each may be accompanied by some data 449 * an individual object operation. each may be accompanied by some data
444 * payload 450 * payload
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index fbecbd089d75..92e165d417a6 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -2,6 +2,7 @@
2#define CEPH_CRUSH_CRUSH_H 2#define CEPH_CRUSH_CRUSH_H
3 3
4#ifdef __KERNEL__ 4#ifdef __KERNEL__
5# include <linux/rbtree.h>
5# include <linux/types.h> 6# include <linux/types.h>
6#else 7#else
7# include "crush_compat.h" 8# include "crush_compat.h"
@@ -137,6 +138,68 @@ struct crush_bucket {
137 138
138}; 139};
139 140
141/** @ingroup API
142 *
143 * Replacement weights for each item in a bucket. The size of the
144 * array must be exactly the size of the straw2 bucket, just as the
145 * item_weights array.
146 *
147 */
148struct crush_weight_set {
149 __u32 *weights; /*!< 16.16 fixed point weights
150 in the same order as items */
151 __u32 size; /*!< size of the __weights__ array */
152};
153
154/** @ingroup API
155 *
156 * Replacement weights and ids for a given straw2 bucket, for
157 * placement purposes.
158 *
159 * When crush_do_rule() chooses the Nth item from a straw2 bucket, the
160 * replacement weights found at __weight_set[N]__ are used instead of
161 * the weights from __item_weights__. If __N__ is greater than or
162 * equal to __weight_set_size__, the weights found at
163 * __weight_set_size-1__ are used instead. For instance if __weight_set__ is:
164 *
165 * [ [ 0x10000, 0x20000 ], // position 0
166 * [ 0x20000, 0x40000 ] ] // position 1
167 *
168 * choosing the 0th item will use position 0 weights [ 0x10000, 0x20000 ]
169 * choosing the 1st item will use position 1 weights [ 0x20000, 0x40000 ]
170 * choosing the 2nd item will use position 1 weights [ 0x20000, 0x40000 ]
171 * etc.
172 *
173 */
174struct crush_choose_arg {
175 __s32 *ids; /*!< values to use instead of items */
176 __u32 ids_size; /*!< size of the __ids__ array */
177 struct crush_weight_set *weight_set; /*!< weight replacements for
178 a given position */
179 __u32 weight_set_size; /*!< size of the __weight_set__ array */
180};
181
182/** @ingroup API
183 *
184 * Replacement weights and ids for each bucket in the crushmap. The
185 * __size__ of the __args__ array must be exactly the same as the
186 * __map->max_buckets__.
187 *
188 * The __crush_choose_arg__ at index N will be used when choosing
189 * an item from the __map->buckets[N]__ bucket, provided it
190 * is a straw2 bucket.
191 *
192 */
193struct crush_choose_arg_map {
194#ifdef __KERNEL__
195 struct rb_node node;
196 u64 choose_args_index;
197#endif
198 struct crush_choose_arg *args; /*!< replacement for each bucket
199 in the crushmap */
200 __u32 size; /*!< size of the __args__ array */
201};
202
140struct crush_bucket_uniform { 203struct crush_bucket_uniform {
141 struct crush_bucket h; 204 struct crush_bucket h;
142 __u32 item_weight; /* 16-bit fixed point; all items equally weighted */ 205 __u32 item_weight; /* 16-bit fixed point; all items equally weighted */
@@ -236,6 +299,9 @@ struct crush_map {
236 __u32 allowed_bucket_algs; 299 __u32 allowed_bucket_algs;
237 300
238 __u32 *choose_tries; 301 __u32 *choose_tries;
302#else
303 /* CrushWrapper::choose_args */
304 struct rb_root choose_args;
239#endif 305#endif
240}; 306};
241 307
diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h
index c95e19e1ff11..141edabb947e 100644
--- a/include/linux/crush/mapper.h
+++ b/include/linux/crush/mapper.h
@@ -11,11 +11,10 @@
11#include "crush.h" 11#include "crush.h"
12 12
13extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size); 13extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size);
14extern int crush_do_rule(const struct crush_map *map, 14int crush_do_rule(const struct crush_map *map,
15 int ruleno, 15 int ruleno, int x, int *result, int result_max,
16 int x, int *result, int result_max, 16 const __u32 *weight, int weight_max,
17 const __u32 *weights, int weight_max, 17 void *cwin, const struct crush_choose_arg *choose_args);
18 void *cwin);
19 18
20/* 19/*
21 * Returns the exact amount of workspace that will need to be used 20 * Returns the exact amount of workspace that will need to be used
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 47e94b560ba0..3d265c5cb6d0 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -85,6 +85,7 @@ const char *ceph_msg_type_name(int type)
85 case CEPH_MSG_OSD_OP: return "osd_op"; 85 case CEPH_MSG_OSD_OP: return "osd_op";
86 case CEPH_MSG_OSD_OPREPLY: return "osd_opreply"; 86 case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
87 case CEPH_MSG_WATCH_NOTIFY: return "watch_notify"; 87 case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
88 case CEPH_MSG_OSD_BACKOFF: return "osd_backoff";
88 default: return "unknown"; 89 default: return "unknown";
89 } 90 }
90} 91}
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 5bf94c04f645..4b428f46a8ca 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -1,6 +1,7 @@
1#ifdef __KERNEL__ 1#ifdef __KERNEL__
2# include <linux/slab.h> 2# include <linux/slab.h>
3# include <linux/crush/crush.h> 3# include <linux/crush/crush.h>
4void clear_choose_args(struct crush_map *c);
4#else 5#else
5# include "crush_compat.h" 6# include "crush_compat.h"
6# include "crush.h" 7# include "crush.h"
@@ -127,6 +128,8 @@ void crush_destroy(struct crush_map *map)
127 128
128#ifndef __KERNEL__ 129#ifndef __KERNEL__
129 kfree(map->choose_tries); 130 kfree(map->choose_tries);
131#else
132 clear_choose_args(map);
130#endif 133#endif
131 kfree(map); 134 kfree(map);
132} 135}
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index b5cd8c21bfdf..746b145bfd11 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -302,19 +302,42 @@ static __u64 crush_ln(unsigned int xin)
302 * 302 *
303 */ 303 */
304 304
305static __u32 *get_choose_arg_weights(const struct crush_bucket_straw2 *bucket,
306 const struct crush_choose_arg *arg,
307 int position)
308{
309 if (!arg || !arg->weight_set || arg->weight_set_size == 0)
310 return bucket->item_weights;
311
312 if (position >= arg->weight_set_size)
313 position = arg->weight_set_size - 1;
314 return arg->weight_set[position].weights;
315}
316
317static __s32 *get_choose_arg_ids(const struct crush_bucket_straw2 *bucket,
318 const struct crush_choose_arg *arg)
319{
320 if (!arg || !arg->ids)
321 return bucket->h.items;
322
323 return arg->ids;
324}
325
305static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket, 326static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
306 int x, int r) 327 int x, int r,
328 const struct crush_choose_arg *arg,
329 int position)
307{ 330{
308 unsigned int i, high = 0; 331 unsigned int i, high = 0;
309 unsigned int u; 332 unsigned int u;
310 unsigned int w;
311 __s64 ln, draw, high_draw = 0; 333 __s64 ln, draw, high_draw = 0;
334 __u32 *weights = get_choose_arg_weights(bucket, arg, position);
335 __s32 *ids = get_choose_arg_ids(bucket, arg);
312 336
313 for (i = 0; i < bucket->h.size; i++) { 337 for (i = 0; i < bucket->h.size; i++) {
314 w = bucket->item_weights[i]; 338 dprintk("weight 0x%x item %d\n", weights[i], ids[i]);
315 if (w) { 339 if (weights[i]) {
316 u = crush_hash32_3(bucket->h.hash, x, 340 u = crush_hash32_3(bucket->h.hash, x, ids[i], r);
317 bucket->h.items[i], r);
318 u &= 0xffff; 341 u &= 0xffff;
319 342
320 /* 343 /*
@@ -335,7 +358,7 @@ static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
335 * weight means a larger (less negative) value 358 * weight means a larger (less negative) value
336 * for draw. 359 * for draw.
337 */ 360 */
338 draw = div64_s64(ln, w); 361 draw = div64_s64(ln, weights[i]);
339 } else { 362 } else {
340 draw = S64_MIN; 363 draw = S64_MIN;
341 } 364 }
@@ -352,7 +375,9 @@ static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
352 375
353static int crush_bucket_choose(const struct crush_bucket *in, 376static int crush_bucket_choose(const struct crush_bucket *in,
354 struct crush_work_bucket *work, 377 struct crush_work_bucket *work,
355 int x, int r) 378 int x, int r,
379 const struct crush_choose_arg *arg,
380 int position)
356{ 381{
357 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); 382 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
358 BUG_ON(in->size == 0); 383 BUG_ON(in->size == 0);
@@ -374,7 +399,7 @@ static int crush_bucket_choose(const struct crush_bucket *in,
374 case CRUSH_BUCKET_STRAW2: 399 case CRUSH_BUCKET_STRAW2:
375 return bucket_straw2_choose( 400 return bucket_straw2_choose(
376 (const struct crush_bucket_straw2 *)in, 401 (const struct crush_bucket_straw2 *)in,
377 x, r); 402 x, r, arg, position);
378 default: 403 default:
379 dprintk("unknown bucket %d alg %d\n", in->id, in->alg); 404 dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
380 return in->items[0]; 405 return in->items[0];
@@ -436,7 +461,8 @@ static int crush_choose_firstn(const struct crush_map *map,
436 unsigned int vary_r, 461 unsigned int vary_r,
437 unsigned int stable, 462 unsigned int stable,
438 int *out2, 463 int *out2,
439 int parent_r) 464 int parent_r,
465 const struct crush_choose_arg *choose_args)
440{ 466{
441 int rep; 467 int rep;
442 unsigned int ftotal, flocal; 468 unsigned int ftotal, flocal;
@@ -486,7 +512,10 @@ static int crush_choose_firstn(const struct crush_map *map,
486 else 512 else
487 item = crush_bucket_choose( 513 item = crush_bucket_choose(
488 in, work->work[-1-in->id], 514 in, work->work[-1-in->id],
489 x, r); 515 x, r,
516 (choose_args ?
517 &choose_args[-1-in->id] : 0),
518 outpos);
490 if (item >= map->max_devices) { 519 if (item >= map->max_devices) {
491 dprintk(" bad item %d\n", item); 520 dprintk(" bad item %d\n", item);
492 skip_rep = 1; 521 skip_rep = 1;
@@ -543,7 +572,8 @@ static int crush_choose_firstn(const struct crush_map *map,
543 vary_r, 572 vary_r,
544 stable, 573 stable,
545 NULL, 574 NULL,
546 sub_r) <= outpos) 575 sub_r,
576 choose_args) <= outpos)
547 /* didn't get leaf */ 577 /* didn't get leaf */
548 reject = 1; 578 reject = 1;
549 } else { 579 } else {
@@ -620,7 +650,8 @@ static void crush_choose_indep(const struct crush_map *map,
620 unsigned int recurse_tries, 650 unsigned int recurse_tries,
621 int recurse_to_leaf, 651 int recurse_to_leaf,
622 int *out2, 652 int *out2,
623 int parent_r) 653 int parent_r,
654 const struct crush_choose_arg *choose_args)
624{ 655{
625 const struct crush_bucket *in = bucket; 656 const struct crush_bucket *in = bucket;
626 int endpos = outpos + left; 657 int endpos = outpos + left;
@@ -692,7 +723,10 @@ static void crush_choose_indep(const struct crush_map *map,
692 723
693 item = crush_bucket_choose( 724 item = crush_bucket_choose(
694 in, work->work[-1-in->id], 725 in, work->work[-1-in->id],
695 x, r); 726 x, r,
727 (choose_args ?
728 &choose_args[-1-in->id] : 0),
729 outpos);
696 if (item >= map->max_devices) { 730 if (item >= map->max_devices) {
697 dprintk(" bad item %d\n", item); 731 dprintk(" bad item %d\n", item);
698 out[rep] = CRUSH_ITEM_NONE; 732 out[rep] = CRUSH_ITEM_NONE;
@@ -746,7 +780,8 @@ static void crush_choose_indep(const struct crush_map *map,
746 x, 1, numrep, 0, 780 x, 1, numrep, 0,
747 out2, rep, 781 out2, rep,
748 recurse_tries, 0, 782 recurse_tries, 0,
749 0, NULL, r); 783 0, NULL, r,
784 choose_args);
750 if (out2[rep] == CRUSH_ITEM_NONE) { 785 if (out2[rep] == CRUSH_ITEM_NONE) {
751 /* placed nothing; no leaf */ 786 /* placed nothing; no leaf */
752 break; 787 break;
@@ -823,7 +858,7 @@ void crush_init_workspace(const struct crush_map *map, void *v)
823 * set the pointer first and then reserve the space for it to 858 * set the pointer first and then reserve the space for it to
824 * point to by incrementing the point. 859 * point to by incrementing the point.
825 */ 860 */
826 v += sizeof(struct crush_work *); 861 v += sizeof(struct crush_work);
827 w->work = v; 862 w->work = v;
828 v += map->max_buckets * sizeof(struct crush_work_bucket *); 863 v += map->max_buckets * sizeof(struct crush_work_bucket *);
829 for (b = 0; b < map->max_buckets; ++b) { 864 for (b = 0; b < map->max_buckets; ++b) {
@@ -854,11 +889,12 @@ void crush_init_workspace(const struct crush_map *map, void *v)
854 * @weight: weight vector (for map leaves) 889 * @weight: weight vector (for map leaves)
855 * @weight_max: size of weight vector 890 * @weight_max: size of weight vector
856 * @cwin: pointer to at least crush_work_size() bytes of memory 891 * @cwin: pointer to at least crush_work_size() bytes of memory
892 * @choose_args: weights and ids for each known bucket
857 */ 893 */
858int crush_do_rule(const struct crush_map *map, 894int crush_do_rule(const struct crush_map *map,
859 int ruleno, int x, int *result, int result_max, 895 int ruleno, int x, int *result, int result_max,
860 const __u32 *weight, int weight_max, 896 const __u32 *weight, int weight_max,
861 void *cwin) 897 void *cwin, const struct crush_choose_arg *choose_args)
862{ 898{
863 int result_len; 899 int result_len;
864 struct crush_work *cw = cwin; 900 struct crush_work *cw = cwin;
@@ -968,11 +1004,6 @@ int crush_do_rule(const struct crush_map *map,
968 1004
969 for (i = 0; i < wsize; i++) { 1005 for (i = 0; i < wsize; i++) {
970 int bno; 1006 int bno;
971 /*
972 * see CRUSH_N, CRUSH_N_MINUS macros.
973 * basically, numrep <= 0 means relative to
974 * the provided result_max
975 */
976 numrep = curstep->arg1; 1007 numrep = curstep->arg1;
977 if (numrep <= 0) { 1008 if (numrep <= 0) {
978 numrep += result_max; 1009 numrep += result_max;
@@ -1013,7 +1044,8 @@ int crush_do_rule(const struct crush_map *map,
1013 vary_r, 1044 vary_r,
1014 stable, 1045 stable,
1015 c+osize, 1046 c+osize,
1016 0); 1047 0,
1048 choose_args);
1017 } else { 1049 } else {
1018 out_size = ((numrep < (result_max-osize)) ? 1050 out_size = ((numrep < (result_max-osize)) ?
1019 numrep : (result_max-osize)); 1051 numrep : (result_max-osize));
@@ -1030,7 +1062,8 @@ int crush_do_rule(const struct crush_map *map,
1030 choose_leaf_tries : 1, 1062 choose_leaf_tries : 1,
1031 recurse_to_leaf, 1063 recurse_to_leaf,
1032 c+osize, 1064 c+osize,
1033 0); 1065 0,
1066 choose_args);
1034 osize += out_size; 1067 osize += out_size;
1035 } 1068 }
1036 } 1069 }
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 71ba13927b3d..fa5233e0d01c 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -77,7 +77,7 @@ static int osdmap_show(struct seq_file *s, void *p)
77 } 77 }
78 for (i = 0; i < map->max_osd; i++) { 78 for (i = 0; i < map->max_osd; i++) {
79 struct ceph_entity_addr *addr = &map->osd_addr[i]; 79 struct ceph_entity_addr *addr = &map->osd_addr[i];
80 int state = map->osd_state[i]; 80 u32 state = map->osd_state[i];
81 char sb[64]; 81 char sb[64];
82 82
83 seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n", 83 seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
@@ -104,6 +104,29 @@ static int osdmap_show(struct seq_file *s, void *p)
104 seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool, 104 seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool,
105 pg->pgid.seed, pg->primary_temp.osd); 105 pg->pgid.seed, pg->primary_temp.osd);
106 } 106 }
107 for (n = rb_first(&map->pg_upmap); n; n = rb_next(n)) {
108 struct ceph_pg_mapping *pg =
109 rb_entry(n, struct ceph_pg_mapping, node);
110
111 seq_printf(s, "pg_upmap %llu.%x [", pg->pgid.pool,
112 pg->pgid.seed);
113 for (i = 0; i < pg->pg_upmap.len; i++)
114 seq_printf(s, "%s%d", (i == 0 ? "" : ","),
115 pg->pg_upmap.osds[i]);
116 seq_printf(s, "]\n");
117 }
118 for (n = rb_first(&map->pg_upmap_items); n; n = rb_next(n)) {
119 struct ceph_pg_mapping *pg =
120 rb_entry(n, struct ceph_pg_mapping, node);
121
122 seq_printf(s, "pg_upmap_items %llu.%x [", pg->pgid.pool,
123 pg->pgid.seed);
124 for (i = 0; i < pg->pg_upmap_items.len; i++)
125 seq_printf(s, "%s%d->%d", (i == 0 ? "" : ","),
126 pg->pg_upmap_items.from_to[i][0],
127 pg->pg_upmap_items.from_to[i][1]);
128 seq_printf(s, "]\n");
129 }
107 130
108 up_read(&osdc->lock); 131 up_read(&osdc->lock);
109 return 0; 132 return 0;
@@ -147,17 +170,26 @@ static int monc_show(struct seq_file *s, void *p)
147 return 0; 170 return 0;
148} 171}
149 172
173static void dump_spgid(struct seq_file *s, const struct ceph_spg *spgid)
174{
175 seq_printf(s, "%llu.%x", spgid->pgid.pool, spgid->pgid.seed);
176 if (spgid->shard != CEPH_SPG_NOSHARD)
177 seq_printf(s, "s%d", spgid->shard);
178}
179
150static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t) 180static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
151{ 181{
152 int i; 182 int i;
153 183
154 seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed); 184 seq_printf(s, "osd%d\t%llu.%x\t", t->osd, t->pgid.pool, t->pgid.seed);
185 dump_spgid(s, &t->spgid);
186 seq_puts(s, "\t[");
155 for (i = 0; i < t->up.size; i++) 187 for (i = 0; i < t->up.size; i++)
156 seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]); 188 seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]);
157 seq_printf(s, "]/%d\t[", t->up.primary); 189 seq_printf(s, "]/%d\t[", t->up.primary);
158 for (i = 0; i < t->acting.size; i++) 190 for (i = 0; i < t->acting.size; i++)
159 seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]); 191 seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
160 seq_printf(s, "]/%d\t", t->acting.primary); 192 seq_printf(s, "]/%d\te%u\t", t->acting.primary, t->epoch);
161 if (t->target_oloc.pool_ns) { 193 if (t->target_oloc.pool_ns) {
162 seq_printf(s, "%*pE/%*pE\t0x%x", 194 seq_printf(s, "%*pE/%*pE\t0x%x",
163 (int)t->target_oloc.pool_ns->len, 195 (int)t->target_oloc.pool_ns->len,
@@ -234,6 +266,73 @@ static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
234 mutex_unlock(&osd->lock); 266 mutex_unlock(&osd->lock);
235} 267}
236 268
269static void dump_snapid(struct seq_file *s, u64 snapid)
270{
271 if (snapid == CEPH_NOSNAP)
272 seq_puts(s, "head");
273 else if (snapid == CEPH_SNAPDIR)
274 seq_puts(s, "snapdir");
275 else
276 seq_printf(s, "%llx", snapid);
277}
278
279static void dump_name_escaped(struct seq_file *s, unsigned char *name,
280 size_t len)
281{
282 size_t i;
283
284 for (i = 0; i < len; i++) {
285 if (name[i] == '%' || name[i] == ':' || name[i] == '/' ||
286 name[i] < 32 || name[i] >= 127) {
287 seq_printf(s, "%%%02x", name[i]);
288 } else {
289 seq_putc(s, name[i]);
290 }
291 }
292}
293
294static void dump_hoid(struct seq_file *s, const struct ceph_hobject_id *hoid)
295{
296 if (hoid->snapid == 0 && hoid->hash == 0 && !hoid->is_max &&
297 hoid->pool == S64_MIN) {
298 seq_puts(s, "MIN");
299 return;
300 }
301 if (hoid->is_max) {
302 seq_puts(s, "MAX");
303 return;
304 }
305 seq_printf(s, "%lld:%08x:", hoid->pool, hoid->hash_reverse_bits);
306 dump_name_escaped(s, hoid->nspace, hoid->nspace_len);
307 seq_putc(s, ':');
308 dump_name_escaped(s, hoid->key, hoid->key_len);
309 seq_putc(s, ':');
310 dump_name_escaped(s, hoid->oid, hoid->oid_len);
311 seq_putc(s, ':');
312 dump_snapid(s, hoid->snapid);
313}
314
315static void dump_backoffs(struct seq_file *s, struct ceph_osd *osd)
316{
317 struct rb_node *n;
318
319 mutex_lock(&osd->lock);
320 for (n = rb_first(&osd->o_backoffs_by_id); n; n = rb_next(n)) {
321 struct ceph_osd_backoff *backoff =
322 rb_entry(n, struct ceph_osd_backoff, id_node);
323
324 seq_printf(s, "osd%d\t", osd->o_osd);
325 dump_spgid(s, &backoff->spgid);
326 seq_printf(s, "\t%llu\t", backoff->id);
327 dump_hoid(s, backoff->begin);
328 seq_putc(s, '\t');
329 dump_hoid(s, backoff->end);
330 seq_putc(s, '\n');
331 }
332
333 mutex_unlock(&osd->lock);
334}
335
237static int osdc_show(struct seq_file *s, void *pp) 336static int osdc_show(struct seq_file *s, void *pp)
238{ 337{
239 struct ceph_client *client = s->private; 338 struct ceph_client *client = s->private;
@@ -259,6 +358,13 @@ static int osdc_show(struct seq_file *s, void *pp)
259 } 358 }
260 dump_linger_requests(s, &osdc->homeless_osd); 359 dump_linger_requests(s, &osdc->homeless_osd);
261 360
361 seq_puts(s, "BACKOFFS\n");
362 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
363 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
364
365 dump_backoffs(s, osd);
366 }
367
262 up_read(&osdc->lock); 368 up_read(&osdc->lock);
263 return 0; 369 return 0;
264} 370}
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 588a91930051..0c31035bbfee 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1288,13 +1288,16 @@ static void prepare_write_message(struct ceph_connection *con)
1288 m->hdr.seq = cpu_to_le64(++con->out_seq); 1288 m->hdr.seq = cpu_to_le64(++con->out_seq);
1289 m->needs_out_seq = false; 1289 m->needs_out_seq = false;
1290 } 1290 }
1291 WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len)); 1291
1292 if (con->ops->reencode_message)
1293 con->ops->reencode_message(m);
1292 1294
1293 dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n", 1295 dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
1294 m, con->out_seq, le16_to_cpu(m->hdr.type), 1296 m, con->out_seq, le16_to_cpu(m->hdr.type),
1295 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 1297 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
1296 m->data_length); 1298 m->data_length);
1297 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 1299 WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
1300 WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
1298 1301
1299 /* tag + hdr + front + middle */ 1302 /* tag + hdr + front + middle */
1300 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 1303 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
@@ -2033,8 +2036,7 @@ static int process_connect(struct ceph_connection *con)
2033{ 2036{
2034 u64 sup_feat = from_msgr(con->msgr)->supported_features; 2037 u64 sup_feat = from_msgr(con->msgr)->supported_features;
2035 u64 req_feat = from_msgr(con->msgr)->required_features; 2038 u64 req_feat = from_msgr(con->msgr)->required_features;
2036 u64 server_feat = ceph_sanitize_features( 2039 u64 server_feat = le64_to_cpu(con->in_reply.features);
2037 le64_to_cpu(con->in_reply.features));
2038 int ret; 2040 int ret;
2039 2041
2040 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 2042 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 250f11f78609..875675765531 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -6,6 +6,7 @@
6#include <linux/random.h> 6#include <linux/random.h>
7#include <linux/sched.h> 7#include <linux/sched.h>
8 8
9#include <linux/ceph/ceph_features.h>
9#include <linux/ceph/mon_client.h> 10#include <linux/ceph/mon_client.h>
10#include <linux/ceph/libceph.h> 11#include <linux/ceph/libceph.h>
11#include <linux/ceph/debugfs.h> 12#include <linux/ceph/debugfs.h>
@@ -297,6 +298,10 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
297 298
298 mutex_lock(&monc->mutex); 299 mutex_lock(&monc->mutex);
299 if (monc->sub_renew_sent) { 300 if (monc->sub_renew_sent) {
301 /*
302 * This is only needed for legacy (infernalis or older)
303 * MONs -- see delayed_work().
304 */
300 monc->sub_renew_after = monc->sub_renew_sent + 305 monc->sub_renew_after = monc->sub_renew_sent +
301 (seconds >> 1) * HZ - 1; 306 (seconds >> 1) * HZ - 1;
302 dout("%s sent %lu duration %d renew after %lu\n", __func__, 307 dout("%s sent %lu duration %d renew after %lu\n", __func__,
@@ -955,7 +960,8 @@ static void delayed_work(struct work_struct *work)
955 __validate_auth(monc); 960 __validate_auth(monc);
956 } 961 }
957 962
958 if (is_auth) { 963 if (is_auth &&
964 !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
959 unsigned long now = jiffies; 965 unsigned long now = jiffies;
960 966
961 dout("%s renew subs? now %lu renew after %lu\n", 967 dout("%s renew subs? now %lu renew after %lu\n",
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 924f07c36ddb..86a9737d8e3f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -12,6 +12,7 @@
12#include <linux/bio.h> 12#include <linux/bio.h>
13#endif 13#endif
14 14
15#include <linux/ceph/ceph_features.h>
15#include <linux/ceph/libceph.h> 16#include <linux/ceph/libceph.h>
16#include <linux/ceph/osd_client.h> 17#include <linux/ceph/osd_client.h>
17#include <linux/ceph/messenger.h> 18#include <linux/ceph/messenger.h>
@@ -49,6 +50,7 @@ static void link_linger(struct ceph_osd *osd,
49 struct ceph_osd_linger_request *lreq); 50 struct ceph_osd_linger_request *lreq);
50static void unlink_linger(struct ceph_osd *osd, 51static void unlink_linger(struct ceph_osd *osd,
51 struct ceph_osd_linger_request *lreq); 52 struct ceph_osd_linger_request *lreq);
53static void clear_backoffs(struct ceph_osd *osd);
52 54
53#if 1 55#if 1
54static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem) 56static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
@@ -373,6 +375,7 @@ static void target_copy(struct ceph_osd_request_target *dest,
373 ceph_oloc_copy(&dest->target_oloc, &src->target_oloc); 375 ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
374 376
375 dest->pgid = src->pgid; /* struct */ 377 dest->pgid = src->pgid; /* struct */
378 dest->spgid = src->spgid; /* struct */
376 dest->pg_num = src->pg_num; 379 dest->pg_num = src->pg_num;
377 dest->pg_num_mask = src->pg_num_mask; 380 dest->pg_num_mask = src->pg_num_mask;
378 ceph_osds_copy(&dest->acting, &src->acting); 381 ceph_osds_copy(&dest->acting, &src->acting);
@@ -384,6 +387,9 @@ static void target_copy(struct ceph_osd_request_target *dest,
384 dest->flags = src->flags; 387 dest->flags = src->flags;
385 dest->paused = src->paused; 388 dest->paused = src->paused;
386 389
390 dest->epoch = src->epoch;
391 dest->last_force_resend = src->last_force_resend;
392
387 dest->osd = src->osd; 393 dest->osd = src->osd;
388} 394}
389 395
@@ -537,7 +543,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
537} 543}
538EXPORT_SYMBOL(ceph_osdc_alloc_request); 544EXPORT_SYMBOL(ceph_osdc_alloc_request);
539 545
540static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc) 546static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
541{ 547{
542 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0); 548 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
543} 549}
@@ -552,17 +558,21 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
552 WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); 558 WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
553 559
554 /* create request message */ 560 /* create request message */
555 msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */ 561 msg_size = CEPH_ENCODING_START_BLK_LEN +
556 msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */ 562 CEPH_PGID_ENCODING_LEN + 1; /* spgid */
563 msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
564 msg_size += CEPH_ENCODING_START_BLK_LEN +
565 sizeof(struct ceph_osd_reqid); /* reqid */
566 msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
567 msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
557 msg_size += CEPH_ENCODING_START_BLK_LEN + 568 msg_size += CEPH_ENCODING_START_BLK_LEN +
558 ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */ 569 ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
559 msg_size += 1 + 8 + 4 + 4; /* pgid */
560 msg_size += 4 + req->r_base_oid.name_len; /* oid */ 570 msg_size += 4 + req->r_base_oid.name_len; /* oid */
561 msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op); 571 msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
562 msg_size += 8; /* snapid */ 572 msg_size += 8; /* snapid */
563 msg_size += 8; /* snap_seq */ 573 msg_size += 8; /* snap_seq */
564 msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0); 574 msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
565 msg_size += 4; /* retry_attempt */ 575 msg_size += 4 + 8; /* retry_attempt, features */
566 576
567 if (req->r_mempool) 577 if (req->r_mempool)
568 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 578 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -1010,6 +1020,8 @@ static void osd_init(struct ceph_osd *osd)
1010 RB_CLEAR_NODE(&osd->o_node); 1020 RB_CLEAR_NODE(&osd->o_node);
1011 osd->o_requests = RB_ROOT; 1021 osd->o_requests = RB_ROOT;
1012 osd->o_linger_requests = RB_ROOT; 1022 osd->o_linger_requests = RB_ROOT;
1023 osd->o_backoff_mappings = RB_ROOT;
1024 osd->o_backoffs_by_id = RB_ROOT;
1013 INIT_LIST_HEAD(&osd->o_osd_lru); 1025 INIT_LIST_HEAD(&osd->o_osd_lru);
1014 INIT_LIST_HEAD(&osd->o_keepalive_item); 1026 INIT_LIST_HEAD(&osd->o_keepalive_item);
1015 osd->o_incarnation = 1; 1027 osd->o_incarnation = 1;
@@ -1021,6 +1033,8 @@ static void osd_cleanup(struct ceph_osd *osd)
1021 WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); 1033 WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1022 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 1034 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1023 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); 1035 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1036 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1037 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1024 WARN_ON(!list_empty(&osd->o_osd_lru)); 1038 WARN_ON(!list_empty(&osd->o_osd_lru));
1025 WARN_ON(!list_empty(&osd->o_keepalive_item)); 1039 WARN_ON(!list_empty(&osd->o_keepalive_item));
1026 1040
@@ -1141,6 +1155,7 @@ static void close_osd(struct ceph_osd *osd)
1141 unlink_linger(osd, lreq); 1155 unlink_linger(osd, lreq);
1142 link_linger(&osdc->homeless_osd, lreq); 1156 link_linger(&osdc->homeless_osd, lreq);
1143 } 1157 }
1158 clear_backoffs(osd);
1144 1159
1145 __remove_osd_from_lru(osd); 1160 __remove_osd_from_lru(osd);
1146 erase_osd(&osdc->osds, osd); 1161 erase_osd(&osdc->osds, osd);
@@ -1297,7 +1312,7 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
1297 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 1312 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1298 __pool_full(pi); 1313 __pool_full(pi);
1299 1314
1300 WARN_ON(pi->id != t->base_oloc.pool); 1315 WARN_ON(pi->id != t->target_oloc.pool);
1301 return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) || 1316 return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1302 ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) || 1317 ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1303 (osdc->osdmap->epoch < osdc->epoch_barrier); 1318 (osdc->osdmap->epoch < osdc->epoch_barrier);
@@ -1311,19 +1326,21 @@ enum calc_target_result {
1311 1326
1312static enum calc_target_result calc_target(struct ceph_osd_client *osdc, 1327static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1313 struct ceph_osd_request_target *t, 1328 struct ceph_osd_request_target *t,
1314 u32 *last_force_resend, 1329 struct ceph_connection *con,
1315 bool any_change) 1330 bool any_change)
1316{ 1331{
1317 struct ceph_pg_pool_info *pi; 1332 struct ceph_pg_pool_info *pi;
1318 struct ceph_pg pgid, last_pgid; 1333 struct ceph_pg pgid, last_pgid;
1319 struct ceph_osds up, acting; 1334 struct ceph_osds up, acting;
1320 bool force_resend = false; 1335 bool force_resend = false;
1321 bool need_check_tiering = false; 1336 bool unpaused = false;
1322 bool need_resend = false; 1337 bool legacy_change;
1338 bool split = false;
1323 bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE); 1339 bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
1324 enum calc_target_result ct_res; 1340 enum calc_target_result ct_res;
1325 int ret; 1341 int ret;
1326 1342
1343 t->epoch = osdc->osdmap->epoch;
1327 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool); 1344 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
1328 if (!pi) { 1345 if (!pi) {
1329 t->osd = CEPH_HOMELESS_OSD; 1346 t->osd = CEPH_HOMELESS_OSD;
@@ -1332,33 +1349,33 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1332 } 1349 }
1333 1350
1334 if (osdc->osdmap->epoch == pi->last_force_request_resend) { 1351 if (osdc->osdmap->epoch == pi->last_force_request_resend) {
1335 if (last_force_resend && 1352 if (t->last_force_resend < pi->last_force_request_resend) {
1336 *last_force_resend < pi->last_force_request_resend) { 1353 t->last_force_resend = pi->last_force_request_resend;
1337 *last_force_resend = pi->last_force_request_resend;
1338 force_resend = true; 1354 force_resend = true;
1339 } else if (!last_force_resend) { 1355 } else if (t->last_force_resend == 0) {
1340 force_resend = true; 1356 force_resend = true;
1341 } 1357 }
1342 } 1358 }
1343 if (ceph_oid_empty(&t->target_oid) || force_resend) {
1344 ceph_oid_copy(&t->target_oid, &t->base_oid);
1345 need_check_tiering = true;
1346 }
1347 if (ceph_oloc_empty(&t->target_oloc) || force_resend) {
1348 ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1349 need_check_tiering = true;
1350 }
1351 1359
1352 if (need_check_tiering && 1360 /* apply tiering */
1353 (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { 1361 ceph_oid_copy(&t->target_oid, &t->base_oid);
1362 ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1363 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1354 if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0) 1364 if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
1355 t->target_oloc.pool = pi->read_tier; 1365 t->target_oloc.pool = pi->read_tier;
1356 if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0) 1366 if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
1357 t->target_oloc.pool = pi->write_tier; 1367 t->target_oloc.pool = pi->write_tier;
1368
1369 pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
1370 if (!pi) {
1371 t->osd = CEPH_HOMELESS_OSD;
1372 ct_res = CALC_TARGET_POOL_DNE;
1373 goto out;
1374 }
1358 } 1375 }
1359 1376
1360 ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid, 1377 ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
1361 &t->target_oloc, &pgid); 1378 &pgid);
1362 if (ret) { 1379 if (ret) {
1363 WARN_ON(ret != -ENOENT); 1380 WARN_ON(ret != -ENOENT);
1364 t->osd = CEPH_HOMELESS_OSD; 1381 t->osd = CEPH_HOMELESS_OSD;
@@ -1368,7 +1385,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1368 last_pgid.pool = pgid.pool; 1385 last_pgid.pool = pgid.pool;
1369 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask); 1386 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1370 1387
1371 ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting); 1388 ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
1372 if (any_change && 1389 if (any_change &&
1373 ceph_is_new_interval(&t->acting, 1390 ceph_is_new_interval(&t->acting,
1374 &acting, 1391 &acting,
@@ -1387,13 +1404,16 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1387 1404
1388 if (t->paused && !target_should_be_paused(osdc, t, pi)) { 1405 if (t->paused && !target_should_be_paused(osdc, t, pi)) {
1389 t->paused = false; 1406 t->paused = false;
1390 need_resend = true; 1407 unpaused = true;
1391 } 1408 }
1409 legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1410 ceph_osds_changed(&t->acting, &acting, any_change);
1411 if (t->pg_num)
1412 split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
1392 1413
1393 if (ceph_pg_compare(&t->pgid, &pgid) || 1414 if (legacy_change || force_resend || split) {
1394 ceph_osds_changed(&t->acting, &acting, any_change) ||
1395 force_resend) {
1396 t->pgid = pgid; /* struct */ 1415 t->pgid = pgid; /* struct */
1416 ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
1397 ceph_osds_copy(&t->acting, &acting); 1417 ceph_osds_copy(&t->acting, &acting);
1398 ceph_osds_copy(&t->up, &up); 1418 ceph_osds_copy(&t->up, &up);
1399 t->size = pi->size; 1419 t->size = pi->size;
@@ -1403,15 +1423,342 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1403 t->sort_bitwise = sort_bitwise; 1423 t->sort_bitwise = sort_bitwise;
1404 1424
1405 t->osd = acting.primary; 1425 t->osd = acting.primary;
1406 need_resend = true;
1407 } 1426 }
1408 1427
1409 ct_res = need_resend ? CALC_TARGET_NEED_RESEND : CALC_TARGET_NO_ACTION; 1428 if (unpaused || legacy_change || force_resend ||
1429 (split && con && CEPH_HAVE_FEATURE(con->peer_features,
1430 RESEND_ON_SPLIT)))
1431 ct_res = CALC_TARGET_NEED_RESEND;
1432 else
1433 ct_res = CALC_TARGET_NO_ACTION;
1434
1410out: 1435out:
1411 dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd); 1436 dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
1412 return ct_res; 1437 return ct_res;
1413} 1438}
1414 1439
1440static struct ceph_spg_mapping *alloc_spg_mapping(void)
1441{
1442 struct ceph_spg_mapping *spg;
1443
1444 spg = kmalloc(sizeof(*spg), GFP_NOIO);
1445 if (!spg)
1446 return NULL;
1447
1448 RB_CLEAR_NODE(&spg->node);
1449 spg->backoffs = RB_ROOT;
1450 return spg;
1451}
1452
1453static void free_spg_mapping(struct ceph_spg_mapping *spg)
1454{
1455 WARN_ON(!RB_EMPTY_NODE(&spg->node));
1456 WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
1457
1458 kfree(spg);
1459}
1460
1461/*
1462 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
1463 * ceph_pg_mapping. Used to track OSD backoffs -- a backoff [range] is
1464 * defined only within a specific spgid; it does not pass anything to
1465 * children on split, or to another primary.
1466 */
1467DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
1468 RB_BYPTR, const struct ceph_spg *, node)
1469
1470static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1471{
1472 return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1473}
1474
1475static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1476 void **pkey, size_t *pkey_len)
1477{
1478 if (hoid->key_len) {
1479 *pkey = hoid->key;
1480 *pkey_len = hoid->key_len;
1481 } else {
1482 *pkey = hoid->oid;
1483 *pkey_len = hoid->oid_len;
1484 }
1485}
1486
1487static int compare_names(const void *name1, size_t name1_len,
1488 const void *name2, size_t name2_len)
1489{
1490 int ret;
1491
1492 ret = memcmp(name1, name2, min(name1_len, name2_len));
1493 if (!ret) {
1494 if (name1_len < name2_len)
1495 ret = -1;
1496 else if (name1_len > name2_len)
1497 ret = 1;
1498 }
1499 return ret;
1500}
1501
1502static int hoid_compare(const struct ceph_hobject_id *lhs,
1503 const struct ceph_hobject_id *rhs)
1504{
1505 void *effective_key1, *effective_key2;
1506 size_t effective_key1_len, effective_key2_len;
1507 int ret;
1508
1509 if (lhs->is_max < rhs->is_max)
1510 return -1;
1511 if (lhs->is_max > rhs->is_max)
1512 return 1;
1513
1514 if (lhs->pool < rhs->pool)
1515 return -1;
1516 if (lhs->pool > rhs->pool)
1517 return 1;
1518
1519 if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1520 return -1;
1521 if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1522 return 1;
1523
1524 ret = compare_names(lhs->nspace, lhs->nspace_len,
1525 rhs->nspace, rhs->nspace_len);
1526 if (ret)
1527 return ret;
1528
1529 hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1530 hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1531 ret = compare_names(effective_key1, effective_key1_len,
1532 effective_key2, effective_key2_len);
1533 if (ret)
1534 return ret;
1535
1536 ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1537 if (ret)
1538 return ret;
1539
1540 if (lhs->snapid < rhs->snapid)
1541 return -1;
1542 if (lhs->snapid > rhs->snapid)
1543 return 1;
1544
1545 return 0;
1546}
1547
1548/*
1549 * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
1550 * compat stuff here.
1551 *
1552 * Assumes @hoid is zero-initialized.
1553 */
1554static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
1555{
1556 u8 struct_v;
1557 u32 struct_len;
1558 int ret;
1559
1560 ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
1561 &struct_len);
1562 if (ret)
1563 return ret;
1564
1565 if (struct_v < 4) {
1566 pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
1567 goto e_inval;
1568 }
1569
1570 hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
1571 GFP_NOIO);
1572 if (IS_ERR(hoid->key)) {
1573 ret = PTR_ERR(hoid->key);
1574 hoid->key = NULL;
1575 return ret;
1576 }
1577
1578 hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
1579 GFP_NOIO);
1580 if (IS_ERR(hoid->oid)) {
1581 ret = PTR_ERR(hoid->oid);
1582 hoid->oid = NULL;
1583 return ret;
1584 }
1585
1586 ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
1587 ceph_decode_32_safe(p, end, hoid->hash, e_inval);
1588 ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
1589
1590 hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
1591 GFP_NOIO);
1592 if (IS_ERR(hoid->nspace)) {
1593 ret = PTR_ERR(hoid->nspace);
1594 hoid->nspace = NULL;
1595 return ret;
1596 }
1597
1598 ceph_decode_64_safe(p, end, hoid->pool, e_inval);
1599
1600 ceph_hoid_build_hash_cache(hoid);
1601 return 0;
1602
1603e_inval:
1604 return -EINVAL;
1605}
1606
1607static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1608{
1609 return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1610 4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1611}
1612
1613static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1614{
1615 ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1616 ceph_encode_string(p, end, hoid->key, hoid->key_len);
1617 ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1618 ceph_encode_64(p, hoid->snapid);
1619 ceph_encode_32(p, hoid->hash);
1620 ceph_encode_8(p, hoid->is_max);
1621 ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1622 ceph_encode_64(p, hoid->pool);
1623}
1624
1625static void free_hoid(struct ceph_hobject_id *hoid)
1626{
1627 if (hoid) {
1628 kfree(hoid->key);
1629 kfree(hoid->oid);
1630 kfree(hoid->nspace);
1631 kfree(hoid);
1632 }
1633}
1634
1635static struct ceph_osd_backoff *alloc_backoff(void)
1636{
1637 struct ceph_osd_backoff *backoff;
1638
1639 backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1640 if (!backoff)
1641 return NULL;
1642
1643 RB_CLEAR_NODE(&backoff->spg_node);
1644 RB_CLEAR_NODE(&backoff->id_node);
1645 return backoff;
1646}
1647
1648static void free_backoff(struct ceph_osd_backoff *backoff)
1649{
1650 WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
1651 WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
1652
1653 free_hoid(backoff->begin);
1654 free_hoid(backoff->end);
1655 kfree(backoff);
1656}
1657
1658/*
1659 * Within a specific spgid, backoffs are managed by ->begin hoid.
1660 */
1661DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
1662 RB_BYVAL, spg_node);
1663
1664static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1665 const struct ceph_hobject_id *hoid)
1666{
1667 struct rb_node *n = root->rb_node;
1668
1669 while (n) {
1670 struct ceph_osd_backoff *cur =
1671 rb_entry(n, struct ceph_osd_backoff, spg_node);
1672 int cmp;
1673
1674 cmp = hoid_compare(hoid, cur->begin);
1675 if (cmp < 0) {
1676 n = n->rb_left;
1677 } else if (cmp > 0) {
1678 if (hoid_compare(hoid, cur->end) < 0)
1679 return cur;
1680
1681 n = n->rb_right;
1682 } else {
1683 return cur;
1684 }
1685 }
1686
1687 return NULL;
1688}
1689
1690/*
1691 * Each backoff has a unique id within its OSD session.
1692 */
1693DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1694
1695static void clear_backoffs(struct ceph_osd *osd)
1696{
1697 while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1698 struct ceph_spg_mapping *spg =
1699 rb_entry(rb_first(&osd->o_backoff_mappings),
1700 struct ceph_spg_mapping, node);
1701
1702 while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1703 struct ceph_osd_backoff *backoff =
1704 rb_entry(rb_first(&spg->backoffs),
1705 struct ceph_osd_backoff, spg_node);
1706
1707 erase_backoff(&spg->backoffs, backoff);
1708 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
1709 free_backoff(backoff);
1710 }
1711 erase_spg_mapping(&osd->o_backoff_mappings, spg);
1712 free_spg_mapping(spg);
1713 }
1714}
1715
1716/*
1717 * Set up a temporary, non-owning view into @t.
1718 */
1719static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
1720 const struct ceph_osd_request_target *t)
1721{
1722 hoid->key = NULL;
1723 hoid->key_len = 0;
1724 hoid->oid = t->target_oid.name;
1725 hoid->oid_len = t->target_oid.name_len;
1726 hoid->snapid = CEPH_NOSNAP;
1727 hoid->hash = t->pgid.seed;
1728 hoid->is_max = false;
1729 if (t->target_oloc.pool_ns) {
1730 hoid->nspace = t->target_oloc.pool_ns->str;
1731 hoid->nspace_len = t->target_oloc.pool_ns->len;
1732 } else {
1733 hoid->nspace = NULL;
1734 hoid->nspace_len = 0;
1735 }
1736 hoid->pool = t->target_oloc.pool;
1737 ceph_hoid_build_hash_cache(hoid);
1738}
1739
1740static bool should_plug_request(struct ceph_osd_request *req)
1741{
1742 struct ceph_osd *osd = req->r_osd;
1743 struct ceph_spg_mapping *spg;
1744 struct ceph_osd_backoff *backoff;
1745 struct ceph_hobject_id hoid;
1746
1747 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
1748 if (!spg)
1749 return false;
1750
1751 hoid_fill_from_target(&hoid, &req->r_t);
1752 backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
1753 if (!backoff)
1754 return false;
1755
1756 dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
1757 __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
1758 backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
1759 return true;
1760}
1761
1415static void setup_request_data(struct ceph_osd_request *req, 1762static void setup_request_data(struct ceph_osd_request *req,
1416 struct ceph_msg *msg) 1763 struct ceph_msg *msg)
1417{ 1764{
@@ -1483,7 +1830,37 @@ static void setup_request_data(struct ceph_osd_request *req,
1483 WARN_ON(data_len != msg->data_length); 1830 WARN_ON(data_len != msg->data_length);
1484} 1831}
1485 1832
1486static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg) 1833static void encode_pgid(void **p, const struct ceph_pg *pgid)
1834{
1835 ceph_encode_8(p, 1);
1836 ceph_encode_64(p, pgid->pool);
1837 ceph_encode_32(p, pgid->seed);
1838 ceph_encode_32(p, -1); /* preferred */
1839}
1840
1841static void encode_spgid(void **p, const struct ceph_spg *spgid)
1842{
1843 ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
1844 encode_pgid(p, &spgid->pgid);
1845 ceph_encode_8(p, spgid->shard);
1846}
1847
1848static void encode_oloc(void **p, void *end,
1849 const struct ceph_object_locator *oloc)
1850{
1851 ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
1852 ceph_encode_64(p, oloc->pool);
1853 ceph_encode_32(p, -1); /* preferred */
1854 ceph_encode_32(p, 0); /* key len */
1855 if (oloc->pool_ns)
1856 ceph_encode_string(p, end, oloc->pool_ns->str,
1857 oloc->pool_ns->len);
1858 else
1859 ceph_encode_32(p, 0);
1860}
1861
1862static void encode_request_partial(struct ceph_osd_request *req,
1863 struct ceph_msg *msg)
1487{ 1864{
1488 void *p = msg->front.iov_base; 1865 void *p = msg->front.iov_base;
1489 void *const end = p + msg->front_alloc_len; 1866 void *const end = p + msg->front_alloc_len;
@@ -1500,38 +1877,27 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
1500 1877
1501 setup_request_data(req, msg); 1878 setup_request_data(req, msg);
1502 1879
1503 ceph_encode_32(&p, 1); /* client_inc, always 1 */ 1880 encode_spgid(&p, &req->r_t.spgid); /* actual spg */
1881 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
1504 ceph_encode_32(&p, req->r_osdc->osdmap->epoch); 1882 ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
1505 ceph_encode_32(&p, req->r_flags); 1883 ceph_encode_32(&p, req->r_flags);
1506 ceph_encode_timespec(p, &req->r_mtime);
1507 p += sizeof(struct ceph_timespec);
1508 1884
1509 /* reassert_version */ 1885 /* reqid */
1510 memset(p, 0, sizeof(struct ceph_eversion)); 1886 ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
1511 p += sizeof(struct ceph_eversion); 1887 memset(p, 0, sizeof(struct ceph_osd_reqid));
1512 1888 p += sizeof(struct ceph_osd_reqid);
1513 /* oloc */
1514 ceph_start_encoding(&p, 5, 4,
1515 ceph_oloc_encoding_size(&req->r_t.target_oloc));
1516 ceph_encode_64(&p, req->r_t.target_oloc.pool);
1517 ceph_encode_32(&p, -1); /* preferred */
1518 ceph_encode_32(&p, 0); /* key len */
1519 if (req->r_t.target_oloc.pool_ns)
1520 ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
1521 req->r_t.target_oloc.pool_ns->len);
1522 else
1523 ceph_encode_32(&p, 0);
1524 1889
1525 /* pgid */ 1890 /* trace */
1526 ceph_encode_8(&p, 1); 1891 memset(p, 0, sizeof(struct ceph_blkin_trace_info));
1527 ceph_encode_64(&p, req->r_t.pgid.pool); 1892 p += sizeof(struct ceph_blkin_trace_info);
1528 ceph_encode_32(&p, req->r_t.pgid.seed); 1893
1529 ceph_encode_32(&p, -1); /* preferred */ 1894 ceph_encode_32(&p, 0); /* client_inc, always 0 */
1895 ceph_encode_timespec(p, &req->r_mtime);
1896 p += sizeof(struct ceph_timespec);
1530 1897
1531 /* oid */ 1898 encode_oloc(&p, end, &req->r_t.target_oloc);
1532 ceph_encode_32(&p, req->r_t.target_oid.name_len); 1899 ceph_encode_string(&p, end, req->r_t.target_oid.name,
1533 memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len); 1900 req->r_t.target_oid.name_len);
1534 p += req->r_t.target_oid.name_len;
1535 1901
1536 /* ops, can imply data */ 1902 /* ops, can imply data */
1537 ceph_encode_16(&p, req->r_num_ops); 1903 ceph_encode_16(&p, req->r_num_ops);
@@ -1552,11 +1918,10 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
1552 } 1918 }
1553 1919
1554 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */ 1920 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
1921 BUG_ON(p != end - 8); /* space for features */
1555 1922
1556 BUG_ON(p > end); 1923 msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
1557 msg->front.iov_len = p - msg->front.iov_base; 1924 /* front_len is finalized in encode_request_finish() */
1558 msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
1559 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1560 msg->hdr.data_len = cpu_to_le32(data_len); 1925 msg->hdr.data_len = cpu_to_le32(data_len);
1561 /* 1926 /*
1562 * The header "data_off" is a hint to the receiver allowing it 1927 * The header "data_off" is a hint to the receiver allowing it
@@ -1565,9 +1930,99 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
1565 */ 1930 */
1566 msg->hdr.data_off = cpu_to_le16(req->r_data_offset); 1931 msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
1567 1932
1568 dout("%s req %p oid %s oid_len %d front %zu data %u\n", __func__, 1933 dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
1569 req, req->r_t.target_oid.name, req->r_t.target_oid.name_len, 1934 req->r_t.target_oid.name, req->r_t.target_oid.name_len);
1570 msg->front.iov_len, data_len); 1935}
1936
1937static void encode_request_finish(struct ceph_msg *msg)
1938{
1939 void *p = msg->front.iov_base;
1940 void *const end = p + msg->front_alloc_len;
1941
1942 if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
1943 /* luminous OSD -- encode features and be done */
1944 p = end - 8;
1945 ceph_encode_64(&p, msg->con->peer_features);
1946 } else {
1947 struct {
1948 char spgid[CEPH_ENCODING_START_BLK_LEN +
1949 CEPH_PGID_ENCODING_LEN + 1];
1950 __le32 hash;
1951 __le32 epoch;
1952 __le32 flags;
1953 char reqid[CEPH_ENCODING_START_BLK_LEN +
1954 sizeof(struct ceph_osd_reqid)];
1955 char trace[sizeof(struct ceph_blkin_trace_info)];
1956 __le32 client_inc;
1957 struct ceph_timespec mtime;
1958 } __packed head;
1959 struct ceph_pg pgid;
1960 void *oloc, *oid, *tail;
1961 int oloc_len, oid_len, tail_len;
1962 int len;
1963
1964 /*
1965 * Pre-luminous OSD -- reencode v8 into v4 using @head
1966 * as a temporary buffer. Encode the raw PG; the rest
1967 * is just a matter of moving oloc, oid and tail blobs
1968 * around.
1969 */
1970 memcpy(&head, p, sizeof(head));
1971 p += sizeof(head);
1972
1973 oloc = p;
1974 p += CEPH_ENCODING_START_BLK_LEN;
1975 pgid.pool = ceph_decode_64(&p);
1976 p += 4 + 4; /* preferred, key len */
1977 len = ceph_decode_32(&p);
1978 p += len; /* nspace */
1979 oloc_len = p - oloc;
1980
1981 oid = p;
1982 len = ceph_decode_32(&p);
1983 p += len;
1984 oid_len = p - oid;
1985
1986 tail = p;
1987 tail_len = (end - p) - 8;
1988
1989 p = msg->front.iov_base;
1990 ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
1991 ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
1992 ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
1993 ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
1994
1995 /* reassert_version */
1996 memset(p, 0, sizeof(struct ceph_eversion));
1997 p += sizeof(struct ceph_eversion);
1998
1999 BUG_ON(p >= oloc);
2000 memmove(p, oloc, oloc_len);
2001 p += oloc_len;
2002
2003 pgid.seed = le32_to_cpu(head.hash);
2004 encode_pgid(&p, &pgid); /* raw pg */
2005
2006 BUG_ON(p >= oid);
2007 memmove(p, oid, oid_len);
2008 p += oid_len;
2009
2010 /* tail -- ops, snapid, snapc, retry_attempt */
2011 BUG_ON(p >= tail);
2012 memmove(p, tail, tail_len);
2013 p += tail_len;
2014
2015 msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
2016 }
2017
2018 BUG_ON(p > end);
2019 msg->front.iov_len = p - msg->front.iov_base;
2020 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2021
2022 dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
2023 le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
2024 le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
2025 le16_to_cpu(msg->hdr.version));
1571} 2026}
1572 2027
1573/* 2028/*
@@ -1580,6 +2035,10 @@ static void send_request(struct ceph_osd_request *req)
1580 verify_osd_locked(osd); 2035 verify_osd_locked(osd);
1581 WARN_ON(osd->o_osd != req->r_t.osd); 2036 WARN_ON(osd->o_osd != req->r_t.osd);
1582 2037
2038 /* backoff? */
2039 if (should_plug_request(req))
2040 return;
2041
1583 /* 2042 /*
1584 * We may have a previously queued request message hanging 2043 * We may have a previously queued request message hanging
1585 * around. Cancel it to avoid corrupting the msgr. 2044 * around. Cancel it to avoid corrupting the msgr.
@@ -1593,11 +2052,13 @@ static void send_request(struct ceph_osd_request *req)
1593 else 2052 else
1594 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY); 2053 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
1595 2054
1596 encode_request(req, req->r_request); 2055 encode_request_partial(req, req->r_request);
1597 2056
1598 dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n", 2057 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
1599 __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, 2058 __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
1600 req->r_t.osd, req->r_flags, req->r_attempts); 2059 req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2060 req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2061 req->r_attempts);
1601 2062
1602 req->r_t.paused = false; 2063 req->r_t.paused = false;
1603 req->r_stamp = jiffies; 2064 req->r_stamp = jiffies;
@@ -1645,7 +2106,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
1645 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); 2106 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
1646 2107
1647again: 2108again:
1648 ct_res = calc_target(osdc, &req->r_t, &req->r_last_force_resend, false); 2109 ct_res = calc_target(osdc, &req->r_t, NULL, false);
1649 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked) 2110 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
1650 goto promote; 2111 goto promote;
1651 2112
@@ -1737,13 +2198,12 @@ static void submit_request(struct ceph_osd_request *req, bool wrlocked)
1737static void finish_request(struct ceph_osd_request *req) 2198static void finish_request(struct ceph_osd_request *req)
1738{ 2199{
1739 struct ceph_osd_client *osdc = req->r_osdc; 2200 struct ceph_osd_client *osdc = req->r_osdc;
1740 struct ceph_osd *osd = req->r_osd;
1741 2201
1742 verify_osd_locked(osd); 2202 WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
1743 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 2203 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
1744 2204
1745 WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid)); 2205 if (req->r_osd)
1746 unlink_request(osd, req); 2206 unlink_request(req->r_osd, req);
1747 atomic_dec(&osdc->num_requests); 2207 atomic_dec(&osdc->num_requests);
1748 2208
1749 /* 2209 /*
@@ -2441,7 +2901,7 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
2441 struct ceph_osd_client *osdc = lreq->osdc; 2901 struct ceph_osd_client *osdc = lreq->osdc;
2442 struct ceph_osd *osd; 2902 struct ceph_osd *osd;
2443 2903
2444 calc_target(osdc, &lreq->t, &lreq->last_force_resend, false); 2904 calc_target(osdc, &lreq->t, NULL, false);
2445 osd = lookup_create_osd(osdc, lreq->t.osd, true); 2905 osd = lookup_create_osd(osdc, lreq->t.osd, true);
2446 link_linger(osd, lreq); 2906 link_linger(osd, lreq);
2447 2907
@@ -3059,7 +3519,7 @@ recalc_linger_target(struct ceph_osd_linger_request *lreq)
3059 struct ceph_osd_client *osdc = lreq->osdc; 3519 struct ceph_osd_client *osdc = lreq->osdc;
3060 enum calc_target_result ct_res; 3520 enum calc_target_result ct_res;
3061 3521
3062 ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true); 3522 ct_res = calc_target(osdc, &lreq->t, NULL, true);
3063 if (ct_res == CALC_TARGET_NEED_RESEND) { 3523 if (ct_res == CALC_TARGET_NEED_RESEND) {
3064 struct ceph_osd *osd; 3524 struct ceph_osd *osd;
3065 3525
@@ -3117,6 +3577,7 @@ static void scan_requests(struct ceph_osd *osd,
3117 list_add_tail(&lreq->scan_item, need_resend_linger); 3577 list_add_tail(&lreq->scan_item, need_resend_linger);
3118 break; 3578 break;
3119 case CALC_TARGET_POOL_DNE: 3579 case CALC_TARGET_POOL_DNE:
3580 list_del_init(&lreq->scan_item);
3120 check_linger_pool_dne(lreq); 3581 check_linger_pool_dne(lreq);
3121 break; 3582 break;
3122 } 3583 }
@@ -3130,8 +3591,8 @@ static void scan_requests(struct ceph_osd *osd,
3130 n = rb_next(n); /* unlink_request(), check_pool_dne() */ 3591 n = rb_next(n); /* unlink_request(), check_pool_dne() */
3131 3592
3132 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 3593 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3133 ct_res = calc_target(osdc, &req->r_t, 3594 ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
3134 &req->r_last_force_resend, false); 3595 false);
3135 switch (ct_res) { 3596 switch (ct_res) {
3136 case CALC_TARGET_NO_ACTION: 3597 case CALC_TARGET_NO_ACTION:
3137 force_resend_writes = cleared_full || 3598 force_resend_writes = cleared_full ||
@@ -3229,8 +3690,25 @@ static void kick_requests(struct ceph_osd_client *osdc,
3229 struct list_head *need_resend_linger) 3690 struct list_head *need_resend_linger)
3230{ 3691{
3231 struct ceph_osd_linger_request *lreq, *nlreq; 3692 struct ceph_osd_linger_request *lreq, *nlreq;
3693 enum calc_target_result ct_res;
3232 struct rb_node *n; 3694 struct rb_node *n;
3233 3695
3696 /* make sure need_resend targets reflect latest map */
3697 for (n = rb_first(need_resend); n; ) {
3698 struct ceph_osd_request *req =
3699 rb_entry(n, struct ceph_osd_request, r_node);
3700
3701 n = rb_next(n);
3702
3703 if (req->r_t.epoch < osdc->osdmap->epoch) {
3704 ct_res = calc_target(osdc, &req->r_t, NULL, false);
3705 if (ct_res == CALC_TARGET_POOL_DNE) {
3706 erase_request(need_resend, req);
3707 check_pool_dne(req);
3708 }
3709 }
3710 }
3711
3234 for (n = rb_first(need_resend); n; ) { 3712 for (n = rb_first(need_resend); n; ) {
3235 struct ceph_osd_request *req = 3713 struct ceph_osd_request *req =
3236 rb_entry(n, struct ceph_osd_request, r_node); 3714 rb_entry(n, struct ceph_osd_request, r_node);
@@ -3239,8 +3717,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
3239 n = rb_next(n); 3717 n = rb_next(n);
3240 erase_request(need_resend, req); /* before link_request() */ 3718 erase_request(need_resend, req); /* before link_request() */
3241 3719
3242 WARN_ON(req->r_osd);
3243 calc_target(osdc, &req->r_t, NULL, false);
3244 osd = lookup_create_osd(osdc, req->r_t.osd, true); 3720 osd = lookup_create_osd(osdc, req->r_t.osd, true);
3245 link_request(osd, req); 3721 link_request(osd, req);
3246 if (!req->r_linger) { 3722 if (!req->r_linger) {
@@ -3383,6 +3859,8 @@ static void kick_osd_requests(struct ceph_osd *osd)
3383{ 3859{
3384 struct rb_node *n; 3860 struct rb_node *n;
3385 3861
3862 clear_backoffs(osd);
3863
3386 for (n = rb_first(&osd->o_requests); n; ) { 3864 for (n = rb_first(&osd->o_requests); n; ) {
3387 struct ceph_osd_request *req = 3865 struct ceph_osd_request *req =
3388 rb_entry(n, struct ceph_osd_request, r_node); 3866 rb_entry(n, struct ceph_osd_request, r_node);
@@ -3428,6 +3906,261 @@ out_unlock:
3428 up_write(&osdc->lock); 3906 up_write(&osdc->lock);
3429} 3907}
3430 3908
3909struct MOSDBackoff {
3910 struct ceph_spg spgid;
3911 u32 map_epoch;
3912 u8 op;
3913 u64 id;
3914 struct ceph_hobject_id *begin;
3915 struct ceph_hobject_id *end;
3916};
3917
3918static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
3919{
3920 void *p = msg->front.iov_base;
3921 void *const end = p + msg->front.iov_len;
3922 u8 struct_v;
3923 u32 struct_len;
3924 int ret;
3925
3926 ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
3927 if (ret)
3928 return ret;
3929
3930 ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
3931 if (ret)
3932 return ret;
3933
3934 ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
3935 ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
3936 ceph_decode_8_safe(&p, end, m->op, e_inval);
3937 ceph_decode_64_safe(&p, end, m->id, e_inval);
3938
3939 m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
3940 if (!m->begin)
3941 return -ENOMEM;
3942
3943 ret = decode_hoid(&p, end, m->begin);
3944 if (ret) {
3945 free_hoid(m->begin);
3946 return ret;
3947 }
3948
3949 m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
3950 if (!m->end) {
3951 free_hoid(m->begin);
3952 return -ENOMEM;
3953 }
3954
3955 ret = decode_hoid(&p, end, m->end);
3956 if (ret) {
3957 free_hoid(m->begin);
3958 free_hoid(m->end);
3959 return ret;
3960 }
3961
3962 return 0;
3963
3964e_inval:
3965 return -EINVAL;
3966}
3967
3968static struct ceph_msg *create_backoff_message(
3969 const struct ceph_osd_backoff *backoff,
3970 u32 map_epoch)
3971{
3972 struct ceph_msg *msg;
3973 void *p, *end;
3974 int msg_size;
3975
3976 msg_size = CEPH_ENCODING_START_BLK_LEN +
3977 CEPH_PGID_ENCODING_LEN + 1; /* spgid */
3978 msg_size += 4 + 1 + 8; /* map_epoch, op, id */
3979 msg_size += CEPH_ENCODING_START_BLK_LEN +
3980 hoid_encoding_size(backoff->begin);
3981 msg_size += CEPH_ENCODING_START_BLK_LEN +
3982 hoid_encoding_size(backoff->end);
3983
3984 msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
3985 if (!msg)
3986 return NULL;
3987
3988 p = msg->front.iov_base;
3989 end = p + msg->front_alloc_len;
3990
3991 encode_spgid(&p, &backoff->spgid);
3992 ceph_encode_32(&p, map_epoch);
3993 ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
3994 ceph_encode_64(&p, backoff->id);
3995 encode_hoid(&p, end, backoff->begin);
3996 encode_hoid(&p, end, backoff->end);
3997 BUG_ON(p != end);
3998
3999 msg->front.iov_len = p - msg->front.iov_base;
4000 msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4001 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4002
4003 return msg;
4004}
4005
4006static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4007{
4008 struct ceph_spg_mapping *spg;
4009 struct ceph_osd_backoff *backoff;
4010 struct ceph_msg *msg;
4011
4012 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4013 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4014
4015 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4016 if (!spg) {
4017 spg = alloc_spg_mapping();
4018 if (!spg) {
4019 pr_err("%s failed to allocate spg\n", __func__);
4020 return;
4021 }
4022 spg->spgid = m->spgid; /* struct */
4023 insert_spg_mapping(&osd->o_backoff_mappings, spg);
4024 }
4025
4026 backoff = alloc_backoff();
4027 if (!backoff) {
4028 pr_err("%s failed to allocate backoff\n", __func__);
4029 return;
4030 }
4031 backoff->spgid = m->spgid; /* struct */
4032 backoff->id = m->id;
4033 backoff->begin = m->begin;
4034 m->begin = NULL; /* backoff now owns this */
4035 backoff->end = m->end;
4036 m->end = NULL; /* ditto */
4037
4038 insert_backoff(&spg->backoffs, backoff);
4039 insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4040
4041 /*
4042 * Ack with original backoff's epoch so that the OSD can
4043 * discard this if there was a PG split.
4044 */
4045 msg = create_backoff_message(backoff, m->map_epoch);
4046 if (!msg) {
4047 pr_err("%s failed to allocate msg\n", __func__);
4048 return;
4049 }
4050 ceph_con_send(&osd->o_con, msg);
4051}
4052
4053static bool target_contained_by(const struct ceph_osd_request_target *t,
4054 const struct ceph_hobject_id *begin,
4055 const struct ceph_hobject_id *end)
4056{
4057 struct ceph_hobject_id hoid;
4058 int cmp;
4059
4060 hoid_fill_from_target(&hoid, t);
4061 cmp = hoid_compare(&hoid, begin);
4062 return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4063}
4064
4065static void handle_backoff_unblock(struct ceph_osd *osd,
4066 const struct MOSDBackoff *m)
4067{
4068 struct ceph_spg_mapping *spg;
4069 struct ceph_osd_backoff *backoff;
4070 struct rb_node *n;
4071
4072 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4073 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4074
4075 backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4076 if (!backoff) {
4077 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4078 __func__, osd->o_osd, m->spgid.pgid.pool,
4079 m->spgid.pgid.seed, m->spgid.shard, m->id);
4080 return;
4081 }
4082
4083 if (hoid_compare(backoff->begin, m->begin) &&
4084 hoid_compare(backoff->end, m->end)) {
4085 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4086 __func__, osd->o_osd, m->spgid.pgid.pool,
4087 m->spgid.pgid.seed, m->spgid.shard, m->id);
4088 /* unblock it anyway... */
4089 }
4090
4091 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4092 BUG_ON(!spg);
4093
4094 erase_backoff(&spg->backoffs, backoff);
4095 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4096 free_backoff(backoff);
4097
4098 if (RB_EMPTY_ROOT(&spg->backoffs)) {
4099 erase_spg_mapping(&osd->o_backoff_mappings, spg);
4100 free_spg_mapping(spg);
4101 }
4102
4103 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4104 struct ceph_osd_request *req =
4105 rb_entry(n, struct ceph_osd_request, r_node);
4106
4107 if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4108 /*
4109 * Match against @m, not @backoff -- the PG may
4110 * have split on the OSD.
4111 */
4112 if (target_contained_by(&req->r_t, m->begin, m->end)) {
4113 /*
4114 * If no other installed backoff applies,
4115 * resend.
4116 */
4117 send_request(req);
4118 }
4119 }
4120 }
4121}
4122
4123static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4124{
4125 struct ceph_osd_client *osdc = osd->o_osdc;
4126 struct MOSDBackoff m;
4127 int ret;
4128
4129 down_read(&osdc->lock);
4130 if (!osd_registered(osd)) {
4131 dout("%s osd%d unknown\n", __func__, osd->o_osd);
4132 up_read(&osdc->lock);
4133 return;
4134 }
4135 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4136
4137 mutex_lock(&osd->lock);
4138 ret = decode_MOSDBackoff(msg, &m);
4139 if (ret) {
4140 pr_err("failed to decode MOSDBackoff: %d\n", ret);
4141 ceph_msg_dump(msg);
4142 goto out_unlock;
4143 }
4144
4145 switch (m.op) {
4146 case CEPH_OSD_BACKOFF_OP_BLOCK:
4147 handle_backoff_block(osd, &m);
4148 break;
4149 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4150 handle_backoff_unblock(osd, &m);
4151 break;
4152 default:
4153 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4154 }
4155
4156 free_hoid(m.begin);
4157 free_hoid(m.end);
4158
4159out_unlock:
4160 mutex_unlock(&osd->lock);
4161 up_read(&osdc->lock);
4162}
4163
3431/* 4164/*
3432 * Process osd watch notifications 4165 * Process osd watch notifications
3433 */ 4166 */
@@ -4365,6 +5098,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
4365 case CEPH_MSG_OSD_OPREPLY: 5098 case CEPH_MSG_OSD_OPREPLY:
4366 handle_reply(osd, msg); 5099 handle_reply(osd, msg);
4367 break; 5100 break;
5101 case CEPH_MSG_OSD_BACKOFF:
5102 handle_backoff(osd, msg);
5103 break;
4368 case CEPH_MSG_WATCH_NOTIFY: 5104 case CEPH_MSG_WATCH_NOTIFY:
4369 handle_watch_notify(osdc, msg); 5105 handle_watch_notify(osdc, msg);
4370 break; 5106 break;
@@ -4487,6 +5223,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
4487 *skip = 0; 5223 *skip = 0;
4488 switch (type) { 5224 switch (type) {
4489 case CEPH_MSG_OSD_MAP: 5225 case CEPH_MSG_OSD_MAP:
5226 case CEPH_MSG_OSD_BACKOFF:
4490 case CEPH_MSG_WATCH_NOTIFY: 5227 case CEPH_MSG_WATCH_NOTIFY:
4491 return alloc_msg_with_page_vector(hdr); 5228 return alloc_msg_with_page_vector(hdr);
4492 case CEPH_MSG_OSD_OPREPLY: 5229 case CEPH_MSG_OSD_OPREPLY:
@@ -4571,6 +5308,11 @@ static int invalidate_authorizer(struct ceph_connection *con)
4571 return ceph_monc_validate_auth(&osdc->client->monc); 5308 return ceph_monc_validate_auth(&osdc->client->monc);
4572} 5309}
4573 5310
/*
 * ->reencode_message() connection op for OSD connections: hands @msg
 * to encode_request_finish().
 */
static void osd_reencode_message(struct ceph_msg *msg)
{
	encode_request_finish(msg);
}
5315
4574static int osd_sign_message(struct ceph_msg *msg) 5316static int osd_sign_message(struct ceph_msg *msg)
4575{ 5317{
4576 struct ceph_osd *o = msg->con->private; 5318 struct ceph_osd *o = msg->con->private;
@@ -4595,6 +5337,7 @@ static const struct ceph_connection_operations osd_con_ops = {
4595 .verify_authorizer_reply = verify_authorizer_reply, 5337 .verify_authorizer_reply = verify_authorizer_reply,
4596 .invalidate_authorizer = invalidate_authorizer, 5338 .invalidate_authorizer = invalidate_authorizer,
4597 .alloc_msg = alloc_msg, 5339 .alloc_msg = alloc_msg,
5340 .reencode_message = osd_reencode_message,
4598 .sign_message = osd_sign_message, 5341 .sign_message = osd_sign_message,
4599 .check_message_signature = osd_check_message_signature, 5342 .check_message_signature = osd_check_message_signature,
4600 .fault = osd_fault, 5343 .fault = osd_fault,
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 55e3a477f92d..864789c5974e 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -11,7 +11,7 @@
11#include <linux/crush/hash.h> 11#include <linux/crush/hash.h>
12#include <linux/crush/mapper.h> 12#include <linux/crush/mapper.h>
13 13
14char *ceph_osdmap_state_str(char *str, int len, int state) 14char *ceph_osdmap_state_str(char *str, int len, u32 state)
15{ 15{
16 if (!len) 16 if (!len)
17 return str; 17 return str;
@@ -138,19 +138,175 @@ bad:
138 return -EINVAL; 138 return -EINVAL;
139} 139}
140 140
141static int skip_name_map(void **p, void *end) 141static struct crush_choose_arg_map *alloc_choose_arg_map(void)
142{ 142{
143 int len; 143 struct crush_choose_arg_map *arg_map;
144 ceph_decode_32_safe(p, end, len ,bad); 144
145 while (len--) { 145 arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO);
146 int strlen; 146 if (!arg_map)
147 *p += sizeof(u32); 147 return NULL;
148 ceph_decode_32_safe(p, end, strlen, bad); 148
149 *p += strlen; 149 RB_CLEAR_NODE(&arg_map->node);
150 return arg_map;
150} 151}
151 return 0; 152
152bad: 153static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
153 return -EINVAL; 154{
155 if (arg_map) {
156 int i, j;
157
158 WARN_ON(!RB_EMPTY_NODE(&arg_map->node));
159
160 for (i = 0; i < arg_map->size; i++) {
161 struct crush_choose_arg *arg = &arg_map->args[i];
162
163 for (j = 0; j < arg->weight_set_size; j++)
164 kfree(arg->weight_set[j].weights);
165 kfree(arg->weight_set);
166 kfree(arg->ids);
167 }
168 kfree(arg_map->args);
169 kfree(arg_map);
170 }
171}
172
173DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
174 node);
175
176void clear_choose_args(struct crush_map *c)
177{
178 while (!RB_EMPTY_ROOT(&c->choose_args)) {
179 struct crush_choose_arg_map *arg_map =
180 rb_entry(rb_first(&c->choose_args),
181 struct crush_choose_arg_map, node);
182
183 erase_choose_arg_map(&c->choose_args, arg_map);
184 free_choose_arg_map(arg_map);
185 }
186}
187
188static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
189{
190 u32 *a = NULL;
191 u32 len;
192 int ret;
193
194 ceph_decode_32_safe(p, end, len, e_inval);
195 if (len) {
196 u32 i;
197
198 a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
199 if (!a) {
200 ret = -ENOMEM;
201 goto fail;
202 }
203
204 ceph_decode_need(p, end, len * sizeof(u32), e_inval);
205 for (i = 0; i < len; i++)
206 a[i] = ceph_decode_32(p);
207 }
208
209 *plen = len;
210 return a;
211
212e_inval:
213 ret = -EINVAL;
214fail:
215 kfree(a);
216 return ERR_PTR(ret);
217}
218
219/*
220 * Assumes @arg is zero-initialized.
221 */
222static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
223{
224 int ret;
225
226 ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
227 if (arg->weight_set_size) {
228 u32 i;
229
230 arg->weight_set = kmalloc_array(arg->weight_set_size,
231 sizeof(*arg->weight_set),
232 GFP_NOIO);
233 if (!arg->weight_set)
234 return -ENOMEM;
235
236 for (i = 0; i < arg->weight_set_size; i++) {
237 struct crush_weight_set *w = &arg->weight_set[i];
238
239 w->weights = decode_array_32_alloc(p, end, &w->size);
240 if (IS_ERR(w->weights)) {
241 ret = PTR_ERR(w->weights);
242 w->weights = NULL;
243 return ret;
244 }
245 }
246 }
247
248 arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
249 if (IS_ERR(arg->ids)) {
250 ret = PTR_ERR(arg->ids);
251 arg->ids = NULL;
252 return ret;
253 }
254
255 return 0;
256
257e_inval:
258 return -EINVAL;
259}
260
261static int decode_choose_args(void **p, void *end, struct crush_map *c)
262{
263 struct crush_choose_arg_map *arg_map = NULL;
264 u32 num_choose_arg_maps, num_buckets;
265 int ret;
266
267 ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
268 while (num_choose_arg_maps--) {
269 arg_map = alloc_choose_arg_map();
270 if (!arg_map) {
271 ret = -ENOMEM;
272 goto fail;
273 }
274
275 ceph_decode_64_safe(p, end, arg_map->choose_args_index,
276 e_inval);
277 arg_map->size = c->max_buckets;
278 arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args),
279 GFP_NOIO);
280 if (!arg_map->args) {
281 ret = -ENOMEM;
282 goto fail;
283 }
284
285 ceph_decode_32_safe(p, end, num_buckets, e_inval);
286 while (num_buckets--) {
287 struct crush_choose_arg *arg;
288 u32 bucket_index;
289
290 ceph_decode_32_safe(p, end, bucket_index, e_inval);
291 if (bucket_index >= arg_map->size)
292 goto e_inval;
293
294 arg = &arg_map->args[bucket_index];
295 ret = decode_choose_arg(p, end, arg);
296 if (ret)
297 goto fail;
298 }
299
300 insert_choose_arg_map(&c->choose_args, arg_map);
301 }
302
303 return 0;
304
305e_inval:
306 ret = -EINVAL;
307fail:
308 free_choose_arg_map(arg_map);
309 return ret;
154} 310}
155 311
156static void crush_finalize(struct crush_map *c) 312static void crush_finalize(struct crush_map *c)
@@ -187,7 +343,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
187 void **p = &pbyval; 343 void **p = &pbyval;
188 void *start = pbyval; 344 void *start = pbyval;
189 u32 magic; 345 u32 magic;
190 u32 num_name_maps;
191 346
192 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); 347 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
193 348
@@ -195,6 +350,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
195 if (c == NULL) 350 if (c == NULL)
196 return ERR_PTR(-ENOMEM); 351 return ERR_PTR(-ENOMEM);
197 352
353 c->choose_args = RB_ROOT;
354
198 /* set tunables to default values */ 355 /* set tunables to default values */
199 c->choose_local_tries = 2; 356 c->choose_local_tries = 2;
200 c->choose_local_fallback_tries = 5; 357 c->choose_local_fallback_tries = 5;
@@ -353,12 +510,9 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
353 } 510 }
354 } 511 }
355 512
356 /* ignore trailing name maps. */ 513 ceph_decode_skip_map(p, end, 32, string, bad); /* type_map */
357 for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) { 514 ceph_decode_skip_map(p, end, 32, string, bad); /* name_map */
358 err = skip_name_map(p, end); 515 ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
359 if (err < 0)
360 goto done;
361 }
362 516
363 /* tunables */ 517 /* tunables */
364 ceph_decode_need(p, end, 3*sizeof(u32), done); 518 ceph_decode_need(p, end, 3*sizeof(u32), done);
@@ -391,6 +545,21 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
391 dout("crush decode tunable chooseleaf_stable = %d\n", 545 dout("crush decode tunable chooseleaf_stable = %d\n",
392 c->chooseleaf_stable); 546 c->chooseleaf_stable);
393 547
548 if (*p != end) {
549 /* class_map */
550 ceph_decode_skip_map(p, end, 32, 32, bad);
551 /* class_name */
552 ceph_decode_skip_map(p, end, 32, string, bad);
553 /* class_bucket */
554 ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
555 }
556
557 if (*p != end) {
558 err = decode_choose_args(p, end, c);
559 if (err)
560 goto bad;
561 }
562
394done: 563done:
395 crush_finalize(c); 564 crush_finalize(c);
396 dout("crush_decode success\n"); 565 dout("crush_decode success\n");
@@ -418,75 +587,49 @@ int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
418 return 0; 587 return 0;
419} 588}
420 589
421/* 590int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
422 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
423 * to a set of osds) and primary_temp (explicit primary setting)
424 */
425static int __insert_pg_mapping(struct ceph_pg_mapping *new,
426 struct rb_root *root)
427{ 591{
428 struct rb_node **p = &root->rb_node; 592 int ret;
429 struct rb_node *parent = NULL;
430 struct ceph_pg_mapping *pg = NULL;
431 int c;
432 593
433 dout("__insert_pg_mapping %llx %p\n", *(u64 *)&new->pgid, new); 594 ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
434 while (*p) { 595 if (ret)
435 parent = *p; 596 return ret;
436 pg = rb_entry(parent, struct ceph_pg_mapping, node); 597
437 c = ceph_pg_compare(&new->pgid, &pg->pgid); 598 if (lhs->shard < rhs->shard)
438 if (c < 0) 599 return -1;
439 p = &(*p)->rb_left; 600 if (lhs->shard > rhs->shard)
440 else if (c > 0) 601 return 1;
441 p = &(*p)->rb_right;
442 else
443 return -EEXIST;
444 }
445 602
446 rb_link_node(&new->node, parent, p);
447 rb_insert_color(&new->node, root);
448 return 0; 603 return 0;
449} 604}
450 605
451static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root, 606static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
452 struct ceph_pg pgid)
453{ 607{
454 struct rb_node *n = root->rb_node;
455 struct ceph_pg_mapping *pg; 608 struct ceph_pg_mapping *pg;
456 int c;
457 609
458 while (n) { 610 pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
459 pg = rb_entry(n, struct ceph_pg_mapping, node); 611 if (!pg)
460 c = ceph_pg_compare(&pgid, &pg->pgid); 612 return NULL;
461 if (c < 0) { 613
462 n = n->rb_left; 614 RB_CLEAR_NODE(&pg->node);
463 } else if (c > 0) { 615 return pg;
464 n = n->rb_right;
465 } else {
466 dout("__lookup_pg_mapping %lld.%x got %p\n",
467 pgid.pool, pgid.seed, pg);
468 return pg;
469 }
470 }
471 return NULL;
472} 616}
473 617
474static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid) 618static void free_pg_mapping(struct ceph_pg_mapping *pg)
475{ 619{
476 struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid); 620 WARN_ON(!RB_EMPTY_NODE(&pg->node));
477 621
478 if (pg) { 622 kfree(pg);
479 dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed,
480 pg);
481 rb_erase(&pg->node, root);
482 kfree(pg);
483 return 0;
484 }
485 dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed);
486 return -ENOENT;
487} 623}
488 624
489/* 625/*
626 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
627 * to a set of osds) and primary_temp (explicit primary setting)
628 */
629DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
630 RB_BYPTR, const struct ceph_pg *, node)
631
632/*
490 * rbtree of pg pool info 633 * rbtree of pg pool info
491 */ 634 */
492static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new) 635static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
@@ -682,11 +825,48 @@ static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
682 *p += len; 825 *p += len;
683 } 826 }
684 827
828 /*
829 * last_force_op_resend_preluminous, will be overridden if the
830 * map was encoded with RESEND_ON_SPLIT
831 */
685 if (ev >= 15) 832 if (ev >= 15)
686 pi->last_force_request_resend = ceph_decode_32(p); 833 pi->last_force_request_resend = ceph_decode_32(p);
687 else 834 else
688 pi->last_force_request_resend = 0; 835 pi->last_force_request_resend = 0;
689 836
837 if (ev >= 16)
838 *p += 4; /* skip min_read_recency_for_promote */
839
840 if (ev >= 17)
841 *p += 8; /* skip expected_num_objects */
842
843 if (ev >= 19)
844 *p += 4; /* skip cache_target_dirty_high_ratio_micro */
845
846 if (ev >= 20)
847 *p += 4; /* skip min_write_recency_for_promote */
848
849 if (ev >= 21)
850 *p += 1; /* skip use_gmt_hitset */
851
852 if (ev >= 22)
853 *p += 1; /* skip fast_read */
854
855 if (ev >= 23) {
856 *p += 4; /* skip hit_set_grade_decay_rate */
857 *p += 4; /* skip hit_set_search_last_n */
858 }
859
860 if (ev >= 24) {
861 /* skip opts */
862 *p += 1 + 1; /* versions */
863 len = ceph_decode_32(p);
864 *p += len;
865 }
866
867 if (ev >= 25)
868 pi->last_force_request_resend = ceph_decode_32(p);
869
690 /* ignore the rest */ 870 /* ignore the rest */
691 871
692 *p = pool_end; 872 *p = pool_end;
@@ -743,6 +923,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
743 map->pool_max = -1; 923 map->pool_max = -1;
744 map->pg_temp = RB_ROOT; 924 map->pg_temp = RB_ROOT;
745 map->primary_temp = RB_ROOT; 925 map->primary_temp = RB_ROOT;
926 map->pg_upmap = RB_ROOT;
927 map->pg_upmap_items = RB_ROOT;
746 mutex_init(&map->crush_workspace_mutex); 928 mutex_init(&map->crush_workspace_mutex);
747 929
748 return map; 930 return map;
@@ -757,14 +939,28 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
757 struct ceph_pg_mapping *pg = 939 struct ceph_pg_mapping *pg =
758 rb_entry(rb_first(&map->pg_temp), 940 rb_entry(rb_first(&map->pg_temp),
759 struct ceph_pg_mapping, node); 941 struct ceph_pg_mapping, node);
760 rb_erase(&pg->node, &map->pg_temp); 942 erase_pg_mapping(&map->pg_temp, pg);
761 kfree(pg); 943 free_pg_mapping(pg);
762 } 944 }
763 while (!RB_EMPTY_ROOT(&map->primary_temp)) { 945 while (!RB_EMPTY_ROOT(&map->primary_temp)) {
764 struct ceph_pg_mapping *pg = 946 struct ceph_pg_mapping *pg =
765 rb_entry(rb_first(&map->primary_temp), 947 rb_entry(rb_first(&map->primary_temp),
766 struct ceph_pg_mapping, node); 948 struct ceph_pg_mapping, node);
767 rb_erase(&pg->node, &map->primary_temp); 949 erase_pg_mapping(&map->primary_temp, pg);
950 free_pg_mapping(pg);
951 }
952 while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
953 struct ceph_pg_mapping *pg =
954 rb_entry(rb_first(&map->pg_upmap),
955 struct ceph_pg_mapping, node);
956 rb_erase(&pg->node, &map->pg_upmap);
957 kfree(pg);
958 }
959 while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
960 struct ceph_pg_mapping *pg =
961 rb_entry(rb_first(&map->pg_upmap_items),
962 struct ceph_pg_mapping, node);
963 rb_erase(&pg->node, &map->pg_upmap_items);
768 kfree(pg); 964 kfree(pg);
769 } 965 }
770 while (!RB_EMPTY_ROOT(&map->pg_pools)) { 966 while (!RB_EMPTY_ROOT(&map->pg_pools)) {
@@ -788,7 +984,7 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
788 */ 984 */
789static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) 985static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
790{ 986{
791 u8 *state; 987 u32 *state;
792 u32 *weight; 988 u32 *weight;
793 struct ceph_entity_addr *addr; 989 struct ceph_entity_addr *addr;
794 int i; 990 int i;
@@ -964,47 +1160,40 @@ static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
964 return __decode_pools(p, end, map, true); 1160 return __decode_pools(p, end, map, true);
965} 1161}
966 1162
967static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map, 1163typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);
968 bool incremental) 1164
1165static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
1166 decode_mapping_fn_t fn, bool incremental)
969{ 1167{
970 u32 n; 1168 u32 n;
971 1169
1170 WARN_ON(!incremental && !fn);
1171
972 ceph_decode_32_safe(p, end, n, e_inval); 1172 ceph_decode_32_safe(p, end, n, e_inval);
973 while (n--) { 1173 while (n--) {
1174 struct ceph_pg_mapping *pg;
974 struct ceph_pg pgid; 1175 struct ceph_pg pgid;
975 u32 len, i;
976 int ret; 1176 int ret;
977 1177
978 ret = ceph_decode_pgid(p, end, &pgid); 1178 ret = ceph_decode_pgid(p, end, &pgid);
979 if (ret) 1179 if (ret)
980 return ret; 1180 return ret;
981 1181
982 ceph_decode_32_safe(p, end, len, e_inval); 1182 pg = lookup_pg_mapping(mapping_root, &pgid);
983 1183 if (pg) {
984 ret = __remove_pg_mapping(&map->pg_temp, pgid); 1184 WARN_ON(!incremental);
985 BUG_ON(!incremental && ret != -ENOENT); 1185 erase_pg_mapping(mapping_root, pg);
986 1186 free_pg_mapping(pg);
987 if (!incremental || len > 0) { 1187 }
988 struct ceph_pg_mapping *pg;
989
990 ceph_decode_need(p, end, len*sizeof(u32), e_inval);
991
992 if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
993 return -EINVAL;
994
995 pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS);
996 if (!pg)
997 return -ENOMEM;
998 1188
999 pg->pgid = pgid; 1189 if (fn) {
1000 pg->pg_temp.len = len; 1190 pg = fn(p, end, incremental);
1001 for (i = 0; i < len; i++) 1191 if (IS_ERR(pg))
1002 pg->pg_temp.osds[i] = ceph_decode_32(p); 1192 return PTR_ERR(pg);
1003 1193
1004 ret = __insert_pg_mapping(pg, &map->pg_temp); 1194 if (pg) {
1005 if (ret) { 1195 pg->pgid = pgid; /* struct */
1006 kfree(pg); 1196 insert_pg_mapping(mapping_root, pg);
1007 return ret;
1008 } 1197 }
1009 } 1198 }
1010 } 1199 }
@@ -1015,69 +1204,77 @@ e_inval:
1015 return -EINVAL; 1204 return -EINVAL;
1016} 1205}
1017 1206
1207static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
1208 bool incremental)
1209{
1210 struct ceph_pg_mapping *pg;
1211 u32 len, i;
1212
1213 ceph_decode_32_safe(p, end, len, e_inval);
1214 if (len == 0 && incremental)
1215 return NULL; /* new_pg_temp: [] to remove */
1216 if (len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
1217 return ERR_PTR(-EINVAL);
1218
1219 ceph_decode_need(p, end, len * sizeof(u32), e_inval);
1220 pg = alloc_pg_mapping(len * sizeof(u32));
1221 if (!pg)
1222 return ERR_PTR(-ENOMEM);
1223
1224 pg->pg_temp.len = len;
1225 for (i = 0; i < len; i++)
1226 pg->pg_temp.osds[i] = ceph_decode_32(p);
1227
1228 return pg;
1229
1230e_inval:
1231 return ERR_PTR(-EINVAL);
1232}
1233
1018static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map) 1234static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
1019{ 1235{
1020 return __decode_pg_temp(p, end, map, false); 1236 return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
1237 false);
1021} 1238}
1022 1239
1023static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map) 1240static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
1024{ 1241{
1025 return __decode_pg_temp(p, end, map, true); 1242 return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
1243 true);
1026} 1244}
1027 1245
1028static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map, 1246static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
1029 bool incremental) 1247 bool incremental)
1030{ 1248{
1031 u32 n; 1249 struct ceph_pg_mapping *pg;
1032 1250 u32 osd;
1033 ceph_decode_32_safe(p, end, n, e_inval);
1034 while (n--) {
1035 struct ceph_pg pgid;
1036 u32 osd;
1037 int ret;
1038
1039 ret = ceph_decode_pgid(p, end, &pgid);
1040 if (ret)
1041 return ret;
1042
1043 ceph_decode_32_safe(p, end, osd, e_inval);
1044
1045 ret = __remove_pg_mapping(&map->primary_temp, pgid);
1046 BUG_ON(!incremental && ret != -ENOENT);
1047
1048 if (!incremental || osd != (u32)-1) {
1049 struct ceph_pg_mapping *pg;
1050
1051 pg = kzalloc(sizeof(*pg), GFP_NOFS);
1052 if (!pg)
1053 return -ENOMEM;
1054 1251
1055 pg->pgid = pgid; 1252 ceph_decode_32_safe(p, end, osd, e_inval);
1056 pg->primary_temp.osd = osd; 1253 if (osd == (u32)-1 && incremental)
1254 return NULL; /* new_primary_temp: -1 to remove */
1057 1255
1058 ret = __insert_pg_mapping(pg, &map->primary_temp); 1256 pg = alloc_pg_mapping(0);
1059 if (ret) { 1257 if (!pg)
1060 kfree(pg); 1258 return ERR_PTR(-ENOMEM);
1061 return ret;
1062 }
1063 }
1064 }
1065 1259
1066 return 0; 1260 pg->primary_temp.osd = osd;
1261 return pg;
1067 1262
1068e_inval: 1263e_inval:
1069 return -EINVAL; 1264 return ERR_PTR(-EINVAL);
1070} 1265}
1071 1266
1072static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map) 1267static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
1073{ 1268{
1074 return __decode_primary_temp(p, end, map, false); 1269 return decode_pg_mapping(p, end, &map->primary_temp,
1270 __decode_primary_temp, false);
1075} 1271}
1076 1272
1077static int decode_new_primary_temp(void **p, void *end, 1273static int decode_new_primary_temp(void **p, void *end,
1078 struct ceph_osdmap *map) 1274 struct ceph_osdmap *map)
1079{ 1275{
1080 return __decode_primary_temp(p, end, map, true); 1276 return decode_pg_mapping(p, end, &map->primary_temp,
1277 __decode_primary_temp, true);
1081} 1278}
1082 1279
1083u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd) 1280u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
@@ -1168,6 +1365,75 @@ e_inval:
1168 return -EINVAL; 1365 return -EINVAL;
1169} 1366}
1170 1367
1368static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
1369 bool __unused)
1370{
1371 return __decode_pg_temp(p, end, false);
1372}
1373
1374static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
1375{
1376 return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
1377 false);
1378}
1379
1380static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
1381{
1382 return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
1383 true);
1384}
1385
1386static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
1387{
1388 return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
1389}
1390
1391static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
1392 bool __unused)
1393{
1394 struct ceph_pg_mapping *pg;
1395 u32 len, i;
1396
1397 ceph_decode_32_safe(p, end, len, e_inval);
1398 if (len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
1399 return ERR_PTR(-EINVAL);
1400
1401 ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
1402 pg = kzalloc(sizeof(*pg) + 2 * len * sizeof(u32), GFP_NOIO);
1403 if (!pg)
1404 return ERR_PTR(-ENOMEM);
1405
1406 pg->pg_upmap_items.len = len;
1407 for (i = 0; i < len; i++) {
1408 pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
1409 pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
1410 }
1411
1412 return pg;
1413
1414e_inval:
1415 return ERR_PTR(-EINVAL);
1416}
1417
1418static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
1419{
1420 return decode_pg_mapping(p, end, &map->pg_upmap_items,
1421 __decode_pg_upmap_items, false);
1422}
1423
1424static int decode_new_pg_upmap_items(void **p, void *end,
1425 struct ceph_osdmap *map)
1426{
1427 return decode_pg_mapping(p, end, &map->pg_upmap_items,
1428 __decode_pg_upmap_items, true);
1429}
1430
1431static int decode_old_pg_upmap_items(void **p, void *end,
1432 struct ceph_osdmap *map)
1433{
1434 return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
1435}
1436
1171/* 1437/*
1172 * decode a full map. 1438 * decode a full map.
1173 */ 1439 */
@@ -1218,13 +1484,21 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
1218 1484
1219 /* osd_state, osd_weight, osd_addrs->client_addr */ 1485 /* osd_state, osd_weight, osd_addrs->client_addr */
1220 ceph_decode_need(p, end, 3*sizeof(u32) + 1486 ceph_decode_need(p, end, 3*sizeof(u32) +
1221 map->max_osd*(1 + sizeof(*map->osd_weight) + 1487 map->max_osd*((struct_v >= 5 ? sizeof(u32) :
1488 sizeof(u8)) +
1489 sizeof(*map->osd_weight) +
1222 sizeof(*map->osd_addr)), e_inval); 1490 sizeof(*map->osd_addr)), e_inval);
1223 1491
1224 if (ceph_decode_32(p) != map->max_osd) 1492 if (ceph_decode_32(p) != map->max_osd)
1225 goto e_inval; 1493 goto e_inval;
1226 1494
1227 ceph_decode_copy(p, map->osd_state, map->max_osd); 1495 if (struct_v >= 5) {
1496 for (i = 0; i < map->max_osd; i++)
1497 map->osd_state[i] = ceph_decode_32(p);
1498 } else {
1499 for (i = 0; i < map->max_osd; i++)
1500 map->osd_state[i] = ceph_decode_8(p);
1501 }
1228 1502
1229 if (ceph_decode_32(p) != map->max_osd) 1503 if (ceph_decode_32(p) != map->max_osd)
1230 goto e_inval; 1504 goto e_inval;
@@ -1257,9 +1531,7 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
1257 if (err) 1531 if (err)
1258 goto bad; 1532 goto bad;
1259 } else { 1533 } else {
1260 /* XXX can this happen? */ 1534 WARN_ON(map->osd_primary_affinity);
1261 kfree(map->osd_primary_affinity);
1262 map->osd_primary_affinity = NULL;
1263 } 1535 }
1264 1536
1265 /* crush */ 1537 /* crush */
@@ -1268,6 +1540,26 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
1268 if (err) 1540 if (err)
1269 goto bad; 1541 goto bad;
1270 1542
1543 *p += len;
1544 if (struct_v >= 3) {
1545 /* erasure_code_profiles */
1546 ceph_decode_skip_map_of_map(p, end, string, string, string,
1547 bad);
1548 }
1549
1550 if (struct_v >= 4) {
1551 err = decode_pg_upmap(p, end, map);
1552 if (err)
1553 goto bad;
1554
1555 err = decode_pg_upmap_items(p, end, map);
1556 if (err)
1557 goto bad;
1558 } else {
1559 WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
1560 WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
1561 }
1562
1271 /* ignore the rest */ 1563 /* ignore the rest */
1272 *p = end; 1564 *p = end;
1273 1565
@@ -1314,7 +1606,7 @@ struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
1314 * new_up_client: { osd=6, addr=... } # set osd_state and addr 1606 * new_up_client: { osd=6, addr=... } # set osd_state and addr
1315 * new_state: { osd=6, xorstate=EXISTS } # clear osd_state 1607 * new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1316 */ 1608 */
1317static int decode_new_up_state_weight(void **p, void *end, 1609static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
1318 struct ceph_osdmap *map) 1610 struct ceph_osdmap *map)
1319{ 1611{
1320 void *new_up_client; 1612 void *new_up_client;
@@ -1330,7 +1622,7 @@ static int decode_new_up_state_weight(void **p, void *end,
1330 1622
1331 new_state = *p; 1623 new_state = *p;
1332 ceph_decode_32_safe(p, end, len, e_inval); 1624 ceph_decode_32_safe(p, end, len, e_inval);
1333 len *= sizeof(u32) + sizeof(u8); 1625 len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
1334 ceph_decode_need(p, end, len, e_inval); 1626 ceph_decode_need(p, end, len, e_inval);
1335 *p += len; 1627 *p += len;
1336 1628
@@ -1366,11 +1658,14 @@ static int decode_new_up_state_weight(void **p, void *end,
1366 len = ceph_decode_32(p); 1658 len = ceph_decode_32(p);
1367 while (len--) { 1659 while (len--) {
1368 s32 osd; 1660 s32 osd;
1369 u8 xorstate; 1661 u32 xorstate;
1370 int ret; 1662 int ret;
1371 1663
1372 osd = ceph_decode_32(p); 1664 osd = ceph_decode_32(p);
1373 xorstate = ceph_decode_8(p); 1665 if (struct_v >= 5)
1666 xorstate = ceph_decode_32(p);
1667 else
1668 xorstate = ceph_decode_8(p);
1374 if (xorstate == 0) 1669 if (xorstate == 0)
1375 xorstate = CEPH_OSD_UP; 1670 xorstate = CEPH_OSD_UP;
1376 BUG_ON(osd >= map->max_osd); 1671 BUG_ON(osd >= map->max_osd);
@@ -1504,7 +1799,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
1504 } 1799 }
1505 1800
1506 /* new_up_client, new_state, new_weight */ 1801 /* new_up_client, new_state, new_weight */
1507 err = decode_new_up_state_weight(p, end, map); 1802 err = decode_new_up_state_weight(p, end, struct_v, map);
1508 if (err) 1803 if (err)
1509 goto bad; 1804 goto bad;
1510 1805
@@ -1527,6 +1822,32 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
1527 goto bad; 1822 goto bad;
1528 } 1823 }
1529 1824
1825 if (struct_v >= 3) {
1826 /* new_erasure_code_profiles */
1827 ceph_decode_skip_map_of_map(p, end, string, string, string,
1828 bad);
1829 /* old_erasure_code_profiles */
1830 ceph_decode_skip_set(p, end, string, bad);
1831 }
1832
1833 if (struct_v >= 4) {
1834 err = decode_new_pg_upmap(p, end, map);
1835 if (err)
1836 goto bad;
1837
1838 err = decode_old_pg_upmap(p, end, map);
1839 if (err)
1840 goto bad;
1841
1842 err = decode_new_pg_upmap_items(p, end, map);
1843 if (err)
1844 goto bad;
1845
1846 err = decode_old_pg_upmap_items(p, end, map);
1847 if (err)
1848 goto bad;
1849 }
1850
1530 /* ignore the rest */ 1851 /* ignore the rest */
1531 *p = end; 1852 *p = end;
1532 1853
@@ -1547,12 +1868,13 @@ bad:
1547void ceph_oloc_copy(struct ceph_object_locator *dest, 1868void ceph_oloc_copy(struct ceph_object_locator *dest,
1548 const struct ceph_object_locator *src) 1869 const struct ceph_object_locator *src)
1549{ 1870{
1550 WARN_ON(!ceph_oloc_empty(dest)); 1871 ceph_oloc_destroy(dest);
1551 WARN_ON(dest->pool_ns); /* empty() only covers ->pool */
1552 1872
1553 dest->pool = src->pool; 1873 dest->pool = src->pool;
1554 if (src->pool_ns) 1874 if (src->pool_ns)
1555 dest->pool_ns = ceph_get_string(src->pool_ns); 1875 dest->pool_ns = ceph_get_string(src->pool_ns);
1876 else
1877 dest->pool_ns = NULL;
1556} 1878}
1557EXPORT_SYMBOL(ceph_oloc_copy); 1879EXPORT_SYMBOL(ceph_oloc_copy);
1558 1880
@@ -1565,14 +1887,15 @@ EXPORT_SYMBOL(ceph_oloc_destroy);
1565void ceph_oid_copy(struct ceph_object_id *dest, 1887void ceph_oid_copy(struct ceph_object_id *dest,
1566 const struct ceph_object_id *src) 1888 const struct ceph_object_id *src)
1567{ 1889{
1568 WARN_ON(!ceph_oid_empty(dest)); 1890 ceph_oid_destroy(dest);
1569 1891
1570 if (src->name != src->inline_name) { 1892 if (src->name != src->inline_name) {
1571 /* very rare, see ceph_object_id definition */ 1893 /* very rare, see ceph_object_id definition */
1572 dest->name = kmalloc(src->name_len + 1, 1894 dest->name = kmalloc(src->name_len + 1,
1573 GFP_NOIO | __GFP_NOFAIL); 1895 GFP_NOIO | __GFP_NOFAIL);
1896 } else {
1897 dest->name = dest->inline_name;
1574 } 1898 }
1575
1576 memcpy(dest->name, src->name, src->name_len + 1); 1899 memcpy(dest->name, src->name, src->name_len + 1);
1577 dest->name_len = src->name_len; 1900 dest->name_len = src->name_len;
1578} 1901}
@@ -1714,9 +2037,8 @@ void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
1714 dest->primary = src->primary; 2037 dest->primary = src->primary;
1715} 2038}
1716 2039
1717static bool is_split(const struct ceph_pg *pgid, 2040bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
1718 u32 old_pg_num, 2041 u32 new_pg_num)
1719 u32 new_pg_num)
1720{ 2042{
1721 int old_bits = calc_bits_of(old_pg_num); 2043 int old_bits = calc_bits_of(old_pg_num);
1722 int old_mask = (1 << old_bits) - 1; 2044 int old_mask = (1 << old_bits) - 1;
@@ -1761,7 +2083,7 @@ bool ceph_is_new_interval(const struct ceph_osds *old_acting,
1761 !osds_equal(old_up, new_up) || 2083 !osds_equal(old_up, new_up) ||
1762 old_size != new_size || 2084 old_size != new_size ||
1763 old_min_size != new_min_size || 2085 old_min_size != new_min_size ||
1764 is_split(pgid, old_pg_num, new_pg_num) || 2086 ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
1765 old_sort_bitwise != new_sort_bitwise; 2087 old_sort_bitwise != new_sort_bitwise;
1766} 2088}
1767 2089
@@ -1885,16 +2207,12 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping);
1885 * Should only be called with target_oid and target_oloc (as opposed to 2207 * Should only be called with target_oid and target_oloc (as opposed to
1886 * base_oid and base_oloc), since tiering isn't taken into account. 2208 * base_oid and base_oloc), since tiering isn't taken into account.
1887 */ 2209 */
1888int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, 2210int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
1889 struct ceph_object_id *oid, 2211 const struct ceph_object_id *oid,
1890 struct ceph_object_locator *oloc, 2212 const struct ceph_object_locator *oloc,
1891 struct ceph_pg *raw_pgid) 2213 struct ceph_pg *raw_pgid)
1892{ 2214{
1893 struct ceph_pg_pool_info *pi; 2215 WARN_ON(pi->id != oloc->pool);
1894
1895 pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
1896 if (!pi)
1897 return -ENOENT;
1898 2216
1899 if (!oloc->pool_ns) { 2217 if (!oloc->pool_ns) {
1900 raw_pgid->pool = oloc->pool; 2218 raw_pgid->pool = oloc->pool;
@@ -1926,6 +2244,20 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
1926 } 2244 }
1927 return 0; 2245 return 0;
1928} 2246}
2247
2248int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
2249 const struct ceph_object_id *oid,
2250 const struct ceph_object_locator *oloc,
2251 struct ceph_pg *raw_pgid)
2252{
2253 struct ceph_pg_pool_info *pi;
2254
2255 pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
2256 if (!pi)
2257 return -ENOENT;
2258
2259 return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
2260}
1929EXPORT_SYMBOL(ceph_object_locator_to_pg); 2261EXPORT_SYMBOL(ceph_object_locator_to_pg);
1930 2262
1931/* 2263/*
@@ -1970,23 +2302,57 @@ static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
1970 2302
1971static int do_crush(struct ceph_osdmap *map, int ruleno, int x, 2303static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
1972 int *result, int result_max, 2304 int *result, int result_max,
1973 const __u32 *weight, int weight_max) 2305 const __u32 *weight, int weight_max,
2306 u64 choose_args_index)
1974{ 2307{
2308 struct crush_choose_arg_map *arg_map;
1975 int r; 2309 int r;
1976 2310
1977 BUG_ON(result_max > CEPH_PG_MAX_SIZE); 2311 BUG_ON(result_max > CEPH_PG_MAX_SIZE);
1978 2312
2313 arg_map = lookup_choose_arg_map(&map->crush->choose_args,
2314 choose_args_index);
2315
1979 mutex_lock(&map->crush_workspace_mutex); 2316 mutex_lock(&map->crush_workspace_mutex);
1980 r = crush_do_rule(map->crush, ruleno, x, result, result_max, 2317 r = crush_do_rule(map->crush, ruleno, x, result, result_max,
1981 weight, weight_max, map->crush_workspace); 2318 weight, weight_max, map->crush_workspace,
2319 arg_map ? arg_map->args : NULL);
1982 mutex_unlock(&map->crush_workspace_mutex); 2320 mutex_unlock(&map->crush_workspace_mutex);
1983 2321
1984 return r; 2322 return r;
1985} 2323}
1986 2324
2325static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
2326 struct ceph_pg_pool_info *pi,
2327 struct ceph_osds *set)
2328{
2329 int i;
2330
2331 if (ceph_can_shift_osds(pi)) {
2332 int removed = 0;
2333
2334 /* shift left */
2335 for (i = 0; i < set->size; i++) {
2336 if (!ceph_osd_exists(osdmap, set->osds[i])) {
2337 removed++;
2338 continue;
2339 }
2340 if (removed)
2341 set->osds[i - removed] = set->osds[i];
2342 }
2343 set->size -= removed;
2344 } else {
2345 /* set dne devices to NONE */
2346 for (i = 0; i < set->size; i++) {
2347 if (!ceph_osd_exists(osdmap, set->osds[i]))
2348 set->osds[i] = CRUSH_ITEM_NONE;
2349 }
2350 }
2351}
2352
1987/* 2353/*
1988 * Calculate raw set (CRUSH output) for given PG. The result may 2354 * Calculate raw set (CRUSH output) for given PG and filter out
1989 * contain nonexistent OSDs. ->primary is undefined for a raw set. 2355 * nonexistent OSDs. ->primary is undefined for a raw set.
1990 * 2356 *
1991 * Placement seed (CRUSH input) is returned through @ppps. 2357 * Placement seed (CRUSH input) is returned through @ppps.
1992 */ 2358 */
@@ -2020,7 +2386,7 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
2020 } 2386 }
2021 2387
2022 len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size, 2388 len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
2023 osdmap->osd_weight, osdmap->max_osd); 2389 osdmap->osd_weight, osdmap->max_osd, pi->id);
2024 if (len < 0) { 2390 if (len < 0) {
2025 pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n", 2391 pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
2026 len, ruleno, pi->id, pi->crush_ruleset, pi->type, 2392 len, ruleno, pi->id, pi->crush_ruleset, pi->type,
@@ -2029,6 +2395,70 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
2029 } 2395 }
2030 2396
2031 raw->size = len; 2397 raw->size = len;
2398 remove_nonexistent_osds(osdmap, pi, raw);
2399}
2400
2401/* apply pg_upmap[_items] mappings */
2402static void apply_upmap(struct ceph_osdmap *osdmap,
2403 const struct ceph_pg *pgid,
2404 struct ceph_osds *raw)
2405{
2406 struct ceph_pg_mapping *pg;
2407 int i, j;
2408
2409 pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
2410 if (pg) {
2411 /* make sure targets aren't marked out */
2412 for (i = 0; i < pg->pg_upmap.len; i++) {
2413 int osd = pg->pg_upmap.osds[i];
2414
2415 if (osd != CRUSH_ITEM_NONE &&
2416 osd < osdmap->max_osd &&
2417 osdmap->osd_weight[osd] == 0) {
2418 /* reject/ignore explicit mapping */
2419 return;
2420 }
2421 }
2422 for (i = 0; i < pg->pg_upmap.len; i++)
2423 raw->osds[i] = pg->pg_upmap.osds[i];
2424 raw->size = pg->pg_upmap.len;
2425 return;
2426 }
2427
2428 pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
2429 if (pg) {
2430 /*
2431 * Note: this approach does not allow a bidirectional swap,
2432 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
2433 */
2434 for (i = 0; i < pg->pg_upmap_items.len; i++) {
2435 int from = pg->pg_upmap_items.from_to[i][0];
2436 int to = pg->pg_upmap_items.from_to[i][1];
2437 int pos = -1;
2438 bool exists = false;
2439
2440 /* make sure replacement doesn't already appear */
2441 for (j = 0; j < raw->size; j++) {
2442 int osd = raw->osds[j];
2443
2444 if (osd == to) {
2445 exists = true;
2446 break;
2447 }
2448 /* ignore mapping if target is marked out */
2449 if (osd == from && pos < 0 &&
2450 !(to != CRUSH_ITEM_NONE &&
2451 to < osdmap->max_osd &&
2452 osdmap->osd_weight[to] == 0)) {
2453 pos = j;
2454 }
2455 }
2456 if (!exists && pos >= 0) {
2457 raw->osds[pos] = to;
2458 return;
2459 }
2460 }
2461 }
2032} 2462}
2033 2463
2034/* 2464/*
@@ -2151,18 +2581,16 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap,
2151 */ 2581 */
2152static void get_temp_osds(struct ceph_osdmap *osdmap, 2582static void get_temp_osds(struct ceph_osdmap *osdmap,
2153 struct ceph_pg_pool_info *pi, 2583 struct ceph_pg_pool_info *pi,
2154 const struct ceph_pg *raw_pgid, 2584 const struct ceph_pg *pgid,
2155 struct ceph_osds *temp) 2585 struct ceph_osds *temp)
2156{ 2586{
2157 struct ceph_pg pgid;
2158 struct ceph_pg_mapping *pg; 2587 struct ceph_pg_mapping *pg;
2159 int i; 2588 int i;
2160 2589
2161 raw_pg_to_pg(pi, raw_pgid, &pgid);
2162 ceph_osds_init(temp); 2590 ceph_osds_init(temp);
2163 2591
2164 /* pg_temp? */ 2592 /* pg_temp? */
2165 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); 2593 pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
2166 if (pg) { 2594 if (pg) {
2167 for (i = 0; i < pg->pg_temp.len; i++) { 2595 for (i = 0; i < pg->pg_temp.len; i++) {
2168 if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) { 2596 if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
@@ -2185,7 +2613,7 @@ static void get_temp_osds(struct ceph_osdmap *osdmap,
2185 } 2613 }
2186 2614
2187 /* primary_temp? */ 2615 /* primary_temp? */
2188 pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid); 2616 pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
2189 if (pg) 2617 if (pg)
2190 temp->primary = pg->primary_temp.osd; 2618 temp->primary = pg->primary_temp.osd;
2191} 2619}
@@ -2198,43 +2626,75 @@ static void get_temp_osds(struct ceph_osdmap *osdmap,
2198 * resend a request. 2626 * resend a request.
2199 */ 2627 */
2200void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap, 2628void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
2629 struct ceph_pg_pool_info *pi,
2201 const struct ceph_pg *raw_pgid, 2630 const struct ceph_pg *raw_pgid,
2202 struct ceph_osds *up, 2631 struct ceph_osds *up,
2203 struct ceph_osds *acting) 2632 struct ceph_osds *acting)
2204{ 2633{
2205 struct ceph_pg_pool_info *pi; 2634 struct ceph_pg pgid;
2206 u32 pps; 2635 u32 pps;
2207 2636
2208 pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool); 2637 WARN_ON(pi->id != raw_pgid->pool);
2209 if (!pi) { 2638 raw_pg_to_pg(pi, raw_pgid, &pgid);
2210 ceph_osds_init(up);
2211 ceph_osds_init(acting);
2212 goto out;
2213 }
2214 2639
2215 pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps); 2640 pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
2641 apply_upmap(osdmap, &pgid, up);
2216 raw_to_up_osds(osdmap, pi, up); 2642 raw_to_up_osds(osdmap, pi, up);
2217 apply_primary_affinity(osdmap, pi, pps, up); 2643 apply_primary_affinity(osdmap, pi, pps, up);
2218 get_temp_osds(osdmap, pi, raw_pgid, acting); 2644 get_temp_osds(osdmap, pi, &pgid, acting);
2219 if (!acting->size) { 2645 if (!acting->size) {
2220 memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0])); 2646 memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
2221 acting->size = up->size; 2647 acting->size = up->size;
2222 if (acting->primary == -1) 2648 if (acting->primary == -1)
2223 acting->primary = up->primary; 2649 acting->primary = up->primary;
2224 } 2650 }
2225out:
2226 WARN_ON(!osds_valid(up) || !osds_valid(acting)); 2651 WARN_ON(!osds_valid(up) || !osds_valid(acting));
2227} 2652}
2228 2653
2654bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
2655 struct ceph_pg_pool_info *pi,
2656 const struct ceph_pg *raw_pgid,
2657 struct ceph_spg *spgid)
2658{
2659 struct ceph_pg pgid;
2660 struct ceph_osds up, acting;
2661 int i;
2662
2663 WARN_ON(pi->id != raw_pgid->pool);
2664 raw_pg_to_pg(pi, raw_pgid, &pgid);
2665
2666 if (ceph_can_shift_osds(pi)) {
2667 spgid->pgid = pgid; /* struct */
2668 spgid->shard = CEPH_SPG_NOSHARD;
2669 return true;
2670 }
2671
2672 ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
2673 for (i = 0; i < acting.size; i++) {
2674 if (acting.osds[i] == acting.primary) {
2675 spgid->pgid = pgid; /* struct */
2676 spgid->shard = i;
2677 return true;
2678 }
2679 }
2680
2681 return false;
2682}
2683
2229/* 2684/*
2230 * Return acting primary for given PG, or -1 if none. 2685 * Return acting primary for given PG, or -1 if none.
2231 */ 2686 */
2232int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap, 2687int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
2233 const struct ceph_pg *raw_pgid) 2688 const struct ceph_pg *raw_pgid)
2234{ 2689{
2690 struct ceph_pg_pool_info *pi;
2235 struct ceph_osds up, acting; 2691 struct ceph_osds up, acting;
2236 2692
2237 ceph_pg_to_up_acting_osds(osdmap, raw_pgid, &up, &acting); 2693 pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
2694 if (!pi)
2695 return -1;
2696
2697 ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
2238 return acting.primary; 2698 return acting.primary;
2239} 2699}
2240EXPORT_SYMBOL(ceph_pg_to_acting_primary); 2700EXPORT_SYMBOL(ceph_pg_to_acting_primary);