Diffstat (limited to 'fs/ceph')
-rw-r--r--  fs/ceph/addr.c           11
-rw-r--r--  fs/ceph/auth.c            9
-rw-r--r--  fs/ceph/auth.h            2
-rw-r--r--  fs/ceph/auth_none.c       1
-rw-r--r--  fs/ceph/auth_x.c         19
-rw-r--r--  fs/ceph/caps.c           24
-rw-r--r--  fs/ceph/ceph_fs.h        62
-rw-r--r--  fs/ceph/ceph_strings.c   16
-rw-r--r--  fs/ceph/debugfs.c        13
-rw-r--r--  fs/ceph/dir.c            45
-rw-r--r--  fs/ceph/export.c         14
-rw-r--r--  fs/ceph/file.c           16
-rw-r--r--  fs/ceph/inode.c          97
-rw-r--r--  fs/ceph/ioctl.c           2
-rw-r--r--  fs/ceph/mds_client.c    385
-rw-r--r--  fs/ceph/mds_client.h      6
-rw-r--r--  fs/ceph/messenger.c      91
-rw-r--r--  fs/ceph/messenger.h      10
-rw-r--r--  fs/ceph/mon_client.c    257
-rw-r--r--  fs/ceph/mon_client.h     27
-rw-r--r--  fs/ceph/msgpool.c       180
-rw-r--r--  fs/ceph/msgpool.h        12
-rw-r--r--  fs/ceph/msgr.h           21
-rw-r--r--  fs/ceph/osd_client.c     98
-rw-r--r--  fs/ceph/pagelist.c        2
-rw-r--r--  fs/ceph/rados.h          23
-rw-r--r--  fs/ceph/snap.c            2
-rw-r--r--  fs/ceph/super.c         125
-rw-r--r--  fs/ceph/super.h          30
-rw-r--r--  fs/ceph/xattr.c          35
30 files changed, 876 insertions, 759 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index a9005d862ed4..d9c60b84949a 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -274,7 +274,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 	struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
 	int rc = 0;
 	struct page **pages;
-	struct pagevec pvec;
 	loff_t offset;
 	u64 len;
 
@@ -297,8 +296,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 	if (rc < 0)
 		goto out;
 
-	/* set uptodate and add to lru in pagevec-sized chunks */
-	pagevec_init(&pvec, 0);
 	for (; !list_empty(page_list) && len > 0;
 	     rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
 		struct page *page =
@@ -312,7 +309,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 			zero_user_segment(page, s, PAGE_CACHE_SIZE);
 		}
 
-		if (add_to_page_cache(page, mapping, page->index, GFP_NOFS)) {
+		if (add_to_page_cache_lru(page, mapping, page->index, GFP_NOFS)) {
 			page_cache_release(page);
 			dout("readpages %p add_to_page_cache failed %p\n",
 			     inode, page);
@@ -323,10 +320,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 		flush_dcache_page(page);
 		SetPageUptodate(page);
 		unlock_page(page);
-		if (pagevec_add(&pvec, page) == 0)
-			pagevec_lru_add_file(&pvec); /* add to lru */
+		page_cache_release(page);
 	}
-	pagevec_lru_add_file(&pvec);
 	rc = 0;
 
 out:
@@ -568,7 +563,7 @@ static void writepages_finish(struct ceph_osd_request *req,
 	ceph_release_pages(req->r_pages, req->r_num_pages);
 	if (req->r_pages_from_pool)
 		mempool_free(req->r_pages,
-			     ceph_client(inode->i_sb)->wb_pagevec_pool);
+			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
 	else
 		kfree(req->r_pages);
 	ceph_osdc_put_request(req);
diff --git a/fs/ceph/auth.c b/fs/ceph/auth.c
index 818afe72e6c7..9f46de2ba7a7 100644
--- a/fs/ceph/auth.c
+++ b/fs/ceph/auth.c
@@ -150,7 +150,8 @@ int ceph_build_auth_request(struct ceph_auth_client *ac,
 
 	ret = ac->ops->build_request(ac, p + sizeof(u32), end);
 	if (ret < 0) {
-		pr_err("error %d building request\n", ret);
+		pr_err("error %d building auth method %s request\n", ret,
+		       ac->ops->name);
 		return ret;
 	}
 	dout(" built request %d bytes\n", ret);
@@ -216,8 +217,8 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac,
 	if (ac->protocol != protocol) {
 		ret = ceph_auth_init_protocol(ac, protocol);
 		if (ret) {
-			pr_err("error %d on auth protocol %d init\n",
-			       ret, protocol);
+			pr_err("error %d on auth method %s init\n",
+			       ret, ac->ops->name);
 			goto out;
 		}
 	}
@@ -229,7 +230,7 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac,
 	if (ret == -EAGAIN) {
 		return ceph_build_auth_request(ac, reply_buf, reply_len);
 	} else if (ret) {
-		pr_err("authentication error %d\n", ret);
+		pr_err("auth method '%s' error %d\n", ac->ops->name, ret);
 		return ret;
 	}
 	return 0;
diff --git a/fs/ceph/auth.h b/fs/ceph/auth.h
index ca4f57cfb267..4429a707c021 100644
--- a/fs/ceph/auth.h
+++ b/fs/ceph/auth.h
@@ -15,6 +15,8 @@ struct ceph_auth_client;
 struct ceph_authorizer;
 
 struct ceph_auth_client_ops {
+	const char *name;
+
 	/*
 	 * true if we are authenticated and can connect to
 	 * services.
diff --git a/fs/ceph/auth_none.c b/fs/ceph/auth_none.c
index 8cd9e3af07f7..24407c119291 100644
--- a/fs/ceph/auth_none.c
+++ b/fs/ceph/auth_none.c
@@ -94,6 +94,7 @@ static void ceph_auth_none_destroy_authorizer(struct ceph_auth_client *ac,
 }
 
 static const struct ceph_auth_client_ops ceph_auth_none_ops = {
+	.name = "none",
 	.reset = reset,
 	.destroy = destroy,
 	.is_authenticated = is_authenticated,
diff --git a/fs/ceph/auth_x.c b/fs/ceph/auth_x.c
index fee5a08da881..7b206231566d 100644
--- a/fs/ceph/auth_x.c
+++ b/fs/ceph/auth_x.c
@@ -127,7 +127,7 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac,
 	int ret;
 	char *dbuf;
 	char *ticket_buf;
-	u8 struct_v;
+	u8 reply_struct_v;
 
 	dbuf = kmalloc(TEMP_TICKET_BUF_LEN, GFP_NOFS);
 	if (!dbuf)
@@ -139,14 +139,14 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac,
 		goto out_dbuf;
 
 	ceph_decode_need(&p, end, 1 + sizeof(u32), bad);
-	struct_v = ceph_decode_8(&p);
-	if (struct_v != 1)
+	reply_struct_v = ceph_decode_8(&p);
+	if (reply_struct_v != 1)
 		goto bad;
 	num = ceph_decode_32(&p);
 	dout("%d tickets\n", num);
 	while (num--) {
 		int type;
-		u8 struct_v;
+		u8 tkt_struct_v, blob_struct_v;
 		struct ceph_x_ticket_handler *th;
 		void *dp, *dend;
 		int dlen;
@@ -165,8 +165,8 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac,
 		type = ceph_decode_32(&p);
 		dout(" ticket type %d %s\n", type, ceph_entity_type_name(type));
 
-		struct_v = ceph_decode_8(&p);
-		if (struct_v != 1)
+		tkt_struct_v = ceph_decode_8(&p);
+		if (tkt_struct_v != 1)
 			goto bad;
 
 		th = get_ticket_handler(ac, type);
@@ -186,8 +186,8 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac,
 		dend = dbuf + dlen;
 		dp = dbuf;
 
-		struct_v = ceph_decode_8(&dp);
-		if (struct_v != 1)
+		tkt_struct_v = ceph_decode_8(&dp);
+		if (tkt_struct_v != 1)
 			goto bad;
 
 		memcpy(&old_key, &th->session_key, sizeof(old_key));
@@ -224,7 +224,7 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac,
 		tpend = tp + dlen;
 		dout(" ticket blob is %d bytes\n", dlen);
 		ceph_decode_need(&tp, tpend, 1 + sizeof(u64), bad);
-		struct_v = ceph_decode_8(&tp);
+		blob_struct_v = ceph_decode_8(&tp);
 		new_secret_id = ceph_decode_64(&tp);
 		ret = ceph_decode_buffer(&new_ticket_blob, &tp, tpend);
 		if (ret)
@@ -618,6 +618,7 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
 
 
 static const struct ceph_auth_client_ops ceph_x_ops = {
+	.name = "x",
 	.is_authenticated = ceph_x_is_authenticated,
 	.build_request = ceph_x_build_request,
 	.handle_reply = ceph_x_handle_reply,
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index d9400534b279..0dd0b81e64f7 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -867,7 +867,8 @@ void __ceph_remove_cap(struct ceph_cap *cap)
 {
 	struct ceph_mds_session *session = cap->session;
 	struct ceph_inode_info *ci = cap->ci;
-	struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+	struct ceph_mds_client *mdsc =
+		&ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
 	int removed = 0;
 
 	dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
@@ -937,9 +938,9 @@ static int send_cap_msg(struct ceph_mds_session *session,
 	     seq, issue_seq, mseq, follows, size, max_size,
 	     xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, NULL);
-	if (IS_ERR(msg))
-		return PTR_ERR(msg);
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS);
+	if (!msg)
+		return -ENOMEM;
 
 	msg->hdr.tid = cpu_to_le64(flush_tid);
 
@@ -1298,7 +1299,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
  */
 void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
 {
-	struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+	struct ceph_mds_client *mdsc =
+		&ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
 	struct inode *inode = &ci->vfs_inode;
 	int was = ci->i_dirty_caps;
 	int dirty = 0;
@@ -1336,7 +1338,7 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
 static int __mark_caps_flushing(struct inode *inode,
 				struct ceph_mds_session *session)
 {
-	struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+	struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int flushing;
 
@@ -1663,7 +1665,7 @@ ack:
 static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session,
 			  unsigned *flush_tid)
 {
-	struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+	struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int unlock_session = session ? 0 : 1;
 	int flushing = 0;
@@ -1716,10 +1718,9 @@ out_unlocked:
 static int caps_are_flushed(struct inode *inode, unsigned tid)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int dirty, i, ret = 1;
+	int i, ret = 1;
 
 	spin_lock(&inode->i_lock);
-	dirty = __ceph_caps_dirty(ci);
 	for (i = 0; i < CEPH_CAP_BITS; i++)
 		if ((ci->i_flushing_caps & (1 << i)) &&
 		    ci->i_cap_flush_tid[i] <= tid) {
@@ -1829,7 +1830,8 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 		err = wait_event_interruptible(ci->i_cap_wq,
 				       caps_are_flushed(inode, flush_tid));
 	} else {
-		struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+		struct ceph_mds_client *mdsc =
+			&ceph_sb_to_client(inode->i_sb)->mdsc;
 
 		spin_lock(&inode->i_lock);
 		if (__ceph_caps_dirty(ci))
@@ -2411,7 +2413,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 	__releases(inode->i_lock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+	struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
 	unsigned seq = le32_to_cpu(m->seq);
 	int dirty = le32_to_cpu(m->dirty);
 	int cleaned = 0;
diff --git a/fs/ceph/ceph_fs.h b/fs/ceph/ceph_fs.h
index 0c2241ef3653..3b9eeed097b3 100644
--- a/fs/ceph/ceph_fs.h
+++ b/fs/ceph/ceph_fs.h
@@ -19,7 +19,7 @@
  * Ceph release version
  */
 #define CEPH_VERSION_MAJOR 0
-#define CEPH_VERSION_MINOR 19
+#define CEPH_VERSION_MINOR 20
 #define CEPH_VERSION_PATCH 0
 
 #define _CEPH_STRINGIFY(x) #x
@@ -36,7 +36,7 @@
  * client-facing protocol.
  */
 #define CEPH_OSD_PROTOCOL 8 /* cluster internal */
-#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
+#define CEPH_MDS_PROTOCOL 12 /* cluster internal */
 #define CEPH_MON_PROTOCOL 5 /* cluster internal */
 #define CEPH_OSDC_PROTOCOL 24 /* server/client */
 #define CEPH_MDSC_PROTOCOL 32 /* server/client */
@@ -53,8 +53,18 @@
 /*
  * feature bits
  */
-#define CEPH_FEATURE_SUPPORTED 0
-#define CEPH_FEATURE_REQUIRED 0
+#define CEPH_FEATURE_UID 1
+#define CEPH_FEATURE_NOSRCADDR 2
+#define CEPH_FEATURE_FLOCK 4
+
+#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
+#define CEPH_FEATURE_REQUIRED_MON CEPH_FEATURE_UID
+#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_FLOCK
+#define CEPH_FEATURE_REQUIRED_MDS CEPH_FEATURE_UID
+#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
+#define CEPH_FEATURE_REQUIRED_OSD CEPH_FEATURE_UID
+#define CEPH_FEATURE_SUPPORTED_CLIENT CEPH_FEATURE_NOSRCADDR
+#define CEPH_FEATURE_REQUIRED_CLIENT CEPH_FEATURE_NOSRCADDR
 
 
 /*
@@ -91,6 +101,8 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
 #define CEPH_AUTH_NONE 0x1
 #define CEPH_AUTH_CEPHX 0x2
 
+#define CEPH_AUTH_UID_DEFAULT ((__u64) -1)
+
 
 /*********************************************
  * message layer
@@ -128,11 +140,27 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
 #define CEPH_MSG_CLIENT_SNAP 0x312
 #define CEPH_MSG_CLIENT_CAPRELEASE 0x313
 
+/* pool ops */
+#define CEPH_MSG_POOLOP_REPLY 48
+#define CEPH_MSG_POOLOP 49
+
+
 /* osd */
 #define CEPH_MSG_OSD_MAP 41
 #define CEPH_MSG_OSD_OP 42
 #define CEPH_MSG_OSD_OPREPLY 43
 
+/* pool operations */
+enum {
+	POOL_OP_CREATE = 0x01,
+	POOL_OP_DELETE = 0x02,
+	POOL_OP_AUID_CHANGE = 0x03,
+	POOL_OP_CREATE_SNAP = 0x11,
+	POOL_OP_DELETE_SNAP = 0x12,
+	POOL_OP_CREATE_UNMANAGED_SNAP = 0x21,
+	POOL_OP_DELETE_UNMANAGED_SNAP = 0x22,
+};
+
 struct ceph_mon_request_header {
 	__le64 have_version;
 	__le16 session_mon;
@@ -155,6 +183,31 @@ struct ceph_mon_statfs_reply {
 	struct ceph_statfs st;
 } __attribute__ ((packed));
 
+const char *ceph_pool_op_name(int op);
+
+struct ceph_mon_poolop {
+	struct ceph_mon_request_header monhdr;
+	struct ceph_fsid fsid;
+	__le32 pool;
+	__le32 op;
+	__le64 auid;
+	__le64 snapid;
+	__le32 name_len;
+} __attribute__ ((packed));
+
+struct ceph_mon_poolop_reply {
+	struct ceph_mon_request_header monhdr;
+	struct ceph_fsid fsid;
+	__le32 reply_code;
+	__le32 epoch;
+	char has_data;
+	char data[0];
+} __attribute__ ((packed));
+
+struct ceph_mon_unmanaged_snap {
+	__le64 snapid;
+} __attribute__ ((packed));
+
 struct ceph_osd_getmap {
 	struct ceph_mon_request_header monhdr;
 	struct ceph_fsid fsid;
@@ -308,6 +361,7 @@ union ceph_mds_request_args {
 	struct {
 		__le32 frag; /* which dir fragment */
 		__le32 max_entries; /* how many dentries to grab */
+		__le32 max_bytes;
 	} __attribute__ ((packed)) readdir;
 	struct {
 		__le32 mode;
diff --git a/fs/ceph/ceph_strings.c b/fs/ceph/ceph_strings.c
index 8e4be6a80c62..7503aee828ce 100644
--- a/fs/ceph/ceph_strings.c
+++ b/fs/ceph/ceph_strings.c
@@ -10,7 +10,6 @@ const char *ceph_entity_type_name(int type)
 	case CEPH_ENTITY_TYPE_OSD: return "osd";
 	case CEPH_ENTITY_TYPE_MON: return "mon";
 	case CEPH_ENTITY_TYPE_CLIENT: return "client";
-	case CEPH_ENTITY_TYPE_ADMIN: return "admin";
 	case CEPH_ENTITY_TYPE_AUTH: return "auth";
 	default: return "unknown";
 	}
@@ -45,6 +44,7 @@ const char *ceph_osd_op_name(int op)
 	case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
 	case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
 	case CEPH_OSD_OP_RMXATTR: return "rmxattr";
+	case CEPH_OSD_OP_CMPXATTR: return "cmpxattr";
 
 	case CEPH_OSD_OP_PULL: return "pull";
 	case CEPH_OSD_OP_PUSH: return "push";
@@ -174,3 +174,17 @@ const char *ceph_snap_op_name(int o)
 	}
 	return "???";
 }
+
+const char *ceph_pool_op_name(int op)
+{
+	switch (op) {
+	case POOL_OP_CREATE: return "create";
+	case POOL_OP_DELETE: return "delete";
+	case POOL_OP_AUID_CHANGE: return "auid change";
+	case POOL_OP_CREATE_SNAP: return "create snap";
+	case POOL_OP_DELETE_SNAP: return "delete snap";
+	case POOL_OP_CREATE_UNMANAGED_SNAP: return "create unmanaged snap";
+	case POOL_OP_DELETE_UNMANAGED_SNAP: return "delete unmanaged snap";
+	}
+	return "???";
+}
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index f7048da92acc..3be33fb066cc 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -113,7 +113,7 @@ static int osdmap_show(struct seq_file *s, void *p)
 static int monc_show(struct seq_file *s, void *p)
 {
 	struct ceph_client *client = s->private;
-	struct ceph_mon_statfs_request *req;
+	struct ceph_mon_generic_request *req;
 	struct ceph_mon_client *monc = &client->monc;
 	struct rb_node *rp;
 
@@ -126,9 +126,14 @@ static int monc_show(struct seq_file *s, void *p)
 	if (monc->want_next_osdmap)
 		seq_printf(s, "want next osdmap\n");
 
-	for (rp = rb_first(&monc->statfs_request_tree); rp; rp = rb_next(rp)) {
-		req = rb_entry(rp, struct ceph_mon_statfs_request, node);
-		seq_printf(s, "%lld statfs\n", req->tid);
+	for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) {
+		__u16 op;
+		req = rb_entry(rp, struct ceph_mon_generic_request, node);
+		op = le16_to_cpu(req->request->hdr.type);
+		if (op == CEPH_MSG_STATFS)
+			seq_printf(s, "%lld statfs\n", req->tid);
+		else
+			seq_printf(s, "%lld unknown\n", req->tid);
 	}
 
 	mutex_unlock(&monc->mutex);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 650d2db5ed26..4fd30900eff7 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -51,8 +51,11 @@ int ceph_init_dentry(struct dentry *dentry)
 		return -ENOMEM;          /* oh well */
 
 	spin_lock(&dentry->d_lock);
-	if (dentry->d_fsdata) /* lost a race */
+	if (dentry->d_fsdata) {
+		/* lost a race */
+		kmem_cache_free(ceph_dentry_cachep, di);
 		goto out_unlock;
+	}
 	di->dentry = dentry;
 	di->lease_session = NULL;
 	dentry->d_fsdata = di;
@@ -125,7 +128,8 @@ more:
 	dentry = list_entry(p, struct dentry, d_u.d_child);
 	di = ceph_dentry(dentry);
 	while (1) {
-		dout(" p %p/%p d_subdirs %p/%p\n", p->prev, p->next,
+		dout(" p %p/%p %s d_subdirs %p/%p\n", p->prev, p->next,
+		     d_unhashed(dentry) ? "!hashed" : "hashed",
 		     parent->d_subdirs.prev, parent->d_subdirs.next);
 		if (p == &parent->d_subdirs) {
 			fi->at_end = 1;
@@ -229,6 +233,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
 	u32 ftype;
 	struct ceph_mds_reply_info_parsed *rinfo;
 	const int max_entries = client->mount_args->max_readdir;
+	const int max_bytes = client->mount_args->max_readdir_bytes;
 
 	dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off);
 	if (fi->at_end)
@@ -312,6 +317,7 @@ more:
 		req->r_readdir_offset = fi->next_offset;
 		req->r_args.readdir.frag = cpu_to_le32(frag);
 		req->r_args.readdir.max_entries = cpu_to_le32(max_entries);
+		req->r_args.readdir.max_bytes = cpu_to_le32(max_bytes);
 		req->r_num_caps = max_entries + 1;
 		err = ceph_mdsc_do_request(mdsc, NULL, req);
 		if (err < 0) {
@@ -335,7 +341,7 @@ more:
 		if (req->r_reply_info.dir_end) {
 			kfree(fi->last_name);
 			fi->last_name = NULL;
-			fi->next_offset = 0;
+			fi->next_offset = 2;
 		} else {
 			rinfo = &req->r_reply_info;
 			err = note_last_dentry(fi,
@@ -478,7 +484,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
 struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
 				  struct dentry *dentry, int err)
 {
-	struct ceph_client *client = ceph_client(dentry->d_sb);
+	struct ceph_client *client = ceph_sb_to_client(dentry->d_sb);
 	struct inode *parent = dentry->d_parent->d_inode;
 
 	/* .snap dir? */
@@ -568,7 +574,6 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 	    !is_root_ceph_dentry(dir, dentry) &&
 	    (ci->i_ceph_flags & CEPH_I_COMPLETE) &&
 	    (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
-		di->offset = ci->i_max_offset++;
 		spin_unlock(&dir->i_lock);
 		dout(" dir %p complete, -ENOENT\n", dir);
 		d_add(dentry, NULL);
@@ -888,13 +893,22 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 
 		/* ensure target dentry is invalidated, despite
 		   rehashing bug in vfs_rename_dir */
-		new_dentry->d_time = jiffies;
-		ceph_dentry(new_dentry)->lease_shared_gen = 0;
+		ceph_invalidate_dentry_lease(new_dentry);
 	}
 	ceph_mdsc_put_request(req);
 	return err;
 }
 
+/*
+ * Ensure a dentry lease will no longer revalidate.
+ */
+void ceph_invalidate_dentry_lease(struct dentry *dentry)
+{
+	spin_lock(&dentry->d_lock);
+	dentry->d_time = jiffies;
+	ceph_dentry(dentry)->lease_shared_gen = 0;
+	spin_unlock(&dentry->d_lock);
+}
 
 /*
  * Check if dentry lease is valid. If not, delete the lease. Try to
@@ -972,8 +986,9 @@ static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd)
 {
 	struct inode *dir = dentry->d_parent->d_inode;
 
-	dout("d_revalidate %p '%.*s' inode %p\n", dentry,
-	     dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
+	dout("d_revalidate %p '%.*s' inode %p offset %lld\n", dentry,
+	     dentry->d_name.len, dentry->d_name.name, dentry->d_inode,
+	     ceph_dentry(dentry)->offset);
 
 	/* always trust cached snapped dentries, snapdir dentry */
 	if (ceph_snap(dir) != CEPH_NOSNAP) {
@@ -1050,7 +1065,7 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int left;
 
-	if (!ceph_test_opt(ceph_client(inode->i_sb), DIRSTAT))
+	if (!ceph_test_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT))
 		return -EISDIR;
 
 	if (!cf->dir_info) {
@@ -1152,7 +1167,7 @@ void ceph_dentry_lru_add(struct dentry *dn)
 	dout("dentry_lru_add %p %p '%.*s'\n", di, dn,
 	     dn->d_name.len, dn->d_name.name);
 	if (di) {
-		mdsc = &ceph_client(dn->d_sb)->mdsc;
+		mdsc = &ceph_sb_to_client(dn->d_sb)->mdsc;
 		spin_lock(&mdsc->dentry_lru_lock);
 		list_add_tail(&di->lru, &mdsc->dentry_lru);
 		mdsc->num_dentry++;
@@ -1165,10 +1180,10 @@ void ceph_dentry_lru_touch(struct dentry *dn)
 	struct ceph_dentry_info *di = ceph_dentry(dn);
 	struct ceph_mds_client *mdsc;
 
-	dout("dentry_lru_touch %p %p '%.*s'\n", di, dn,
-	     dn->d_name.len, dn->d_name.name);
+	dout("dentry_lru_touch %p %p '%.*s' (offset %lld)\n", di, dn,
+	     dn->d_name.len, dn->d_name.name, di->offset);
 	if (di) {
-		mdsc = &ceph_client(dn->d_sb)->mdsc;
+		mdsc = &ceph_sb_to_client(dn->d_sb)->mdsc;
 		spin_lock(&mdsc->dentry_lru_lock);
 		list_move_tail(&di->lru, &mdsc->dentry_lru);
 		spin_unlock(&mdsc->dentry_lru_lock);
@@ -1183,7 +1198,7 @@ void ceph_dentry_lru_del(struct dentry *dn)
 	dout("dentry_lru_del %p %p '%.*s'\n", di, dn,
 	     dn->d_name.len, dn->d_name.name);
 	if (di) {
-		mdsc = &ceph_client(dn->d_sb)->mdsc;
+		mdsc = &ceph_sb_to_client(dn->d_sb)->mdsc;
 		spin_lock(&mdsc->dentry_lru_lock);
 		list_del_init(&di->lru);
 		mdsc->num_dentry--;
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 9d67572fb328..17447644d675 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -93,11 +93,11 @@ static struct dentry *__fh_to_dentry(struct super_block *sb,
 		return ERR_PTR(-ESTALE);
 
 	dentry = d_obtain_alias(inode);
-	if (!dentry) {
+	if (IS_ERR(dentry)) {
 		pr_err("fh_to_dentry %llx -- inode %p but ENOMEM\n",
 		       fh->ino, inode);
 		iput(inode);
-		return ERR_PTR(-ENOMEM);
+		return dentry;
 	}
 	err = ceph_init_dentry(dentry);
 
@@ -115,7 +115,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb,
 static struct dentry *__cfh_to_dentry(struct super_block *sb,
 				      struct ceph_nfs_confh *cfh)
 {
-	struct ceph_mds_client *mdsc = &ceph_client(sb)->mdsc;
+	struct ceph_mds_client *mdsc = &ceph_sb_to_client(sb)->mdsc;
 	struct inode *inode;
 	struct dentry *dentry;
 	struct ceph_vino vino;
@@ -149,11 +149,11 @@ static struct dentry *__cfh_to_dentry(struct super_block *sb,
 	}
 
 	dentry = d_obtain_alias(inode);
-	if (!dentry) {
+	if (IS_ERR(dentry)) {
 		pr_err("cfh_to_dentry %llx -- inode %p but ENOMEM\n",
 		       cfh->ino, inode);
 		iput(inode);
-		return ERR_PTR(-ENOMEM);
+		return dentry;
 	}
 	err = ceph_init_dentry(dentry);
 	if (err < 0) {
@@ -202,11 +202,11 @@ static struct dentry *ceph_fh_to_parent(struct super_block *sb,
 		return ERR_PTR(-ESTALE);
 
 	dentry = d_obtain_alias(inode);
-	if (!dentry) {
+	if (IS_ERR(dentry)) {
 		pr_err("fh_to_parent %llx -- inode %p but ENOMEM\n",
 		       cfh->ino, inode);
 		iput(inode);
-		return ERR_PTR(-ENOMEM);
+		return dentry;
 	}
 	err = ceph_init_dentry(dentry);
 	if (err < 0) {
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 7d634938edc9..6512b6701b9e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -317,16 +317,16 @@ void ceph_release_page_vector(struct page **pages, int num_pages)
 /*
  * allocate a vector new pages
  */
-static struct page **alloc_page_vector(int num_pages)
+struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
 {
 	struct page **pages;
 	int i;
 
-	pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS);
+	pages = kmalloc(sizeof(*pages) * num_pages, flags);
 	if (!pages)
 		return ERR_PTR(-ENOMEM);
 	for (i = 0; i < num_pages; i++) {
-		pages[i] = alloc_page(GFP_NOFS);
+		pages[i] = __page_cache_alloc(flags);
 		if (pages[i] == NULL) {
 			ceph_release_page_vector(pages, i);
 			return ERR_PTR(-ENOMEM);
@@ -540,7 +540,7 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data,
 		 * in sequence.
 		 */
 	} else {
-		pages = alloc_page_vector(num_pages);
+		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 	}
 	if (IS_ERR(pages))
 		return PTR_ERR(pages);
@@ -649,8 +649,8 @@ more:
 				    do_sync,
 				    ci->i_truncate_seq, ci->i_truncate_size,
 				    &mtime, false, 2);
-	if (IS_ERR(req))
-		return PTR_ERR(req);
+	if (!req)
+		return -ENOMEM;
 
 	num_pages = calc_pages_for(pos, len);
 
@@ -668,7 +668,7 @@ more:
 		truncate_inode_pages_range(inode->i_mapping, pos,
 					   (pos+len) | (PAGE_CACHE_SIZE-1));
 	} else {
-		pages = alloc_page_vector(num_pages);
+		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 		if (IS_ERR(pages)) {
 			ret = PTR_ERR(pages);
 			goto out;
@@ -809,7 +809,7 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_dentry->d_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_osd_client *osdc = &ceph_client(inode->i_sb)->osdc;
+	struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->osdc;
 	loff_t endoff = pos + iov->iov_len;
 	int got = 0;
 	int ret, err;
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 85b4d2ffdeba..a81b8b662c7b 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -384,7 +384,7 @@ void ceph_destroy_inode(struct inode *inode)
 	 */
 	if (ci->i_snap_realm) {
 		struct ceph_mds_client *mdsc =
-			&ceph_client(ci->vfs_inode.i_sb)->mdsc;
+			&ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
 		struct ceph_snap_realm *realm = ci->i_snap_realm;
 
 		dout(" dropping residual ref to snap realm %p\n", realm);
@@ -619,11 +619,12 @@ static int fill_inode(struct inode *inode,
 		memcpy(ci->i_xattrs.blob->vec.iov_base,
 		       iinfo->xattr_data, iinfo->xattr_len);
 		ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
+		xattr_blob = NULL;
 	}
 
 	inode->i_mapping->a_ops = &ceph_aops;
 	inode->i_mapping->backing_dev_info =
-		&ceph_client(inode->i_sb)->backing_dev_info;
+		&ceph_sb_to_client(inode->i_sb)->backing_dev_info;
 
 	switch (inode->i_mode & S_IFMT) {
 	case S_IFIFO:
@@ -674,14 +675,15 @@ static int fill_inode(struct inode *inode,
 		/* set dir completion flag? */
 		if (ci->i_files == 0 && ci->i_subdirs == 0 &&
 		    ceph_snap(inode) == CEPH_NOSNAP &&
-		    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED)) {
+		    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
+		    (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
 			dout(" marking %p complete (empty)\n", inode);
 			ci->i_ceph_flags |= CEPH_I_COMPLETE;
 			ci->i_max_offset = 2;
 		}
 
 		/* it may be better to set st_size in getattr instead? */
-		if (ceph_test_opt(ceph_client(inode->i_sb), RBYTES))
+		if (ceph_test_opt(ceph_sb_to_client(inode->i_sb), RBYTES))
 			inode->i_size = ci->i_rbytes;
 		break;
 	default:
@@ -802,6 +804,37 @@ out_unlock:
 }
 
 /*
+ * Set dentry's directory position based on the current dir's max, and
+ * order it in d_subdirs, so that dcache_readdir behaves.
+ */
+static void ceph_set_dentry_offset(struct dentry *dn)
+{
+	struct dentry *dir = dn->d_parent;
+	struct inode *inode = dn->d_parent->d_inode;
+	struct ceph_dentry_info *di;
+
+	BUG_ON(!inode);
+
+	di = ceph_dentry(dn);
+
+	spin_lock(&inode->i_lock);
+	if ((ceph_inode(inode)->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
+		spin_unlock(&inode->i_lock);
+		return;
+	}
+	di->offset = ceph_inode(inode)->i_max_offset++;
+	spin_unlock(&inode->i_lock);
+
+	spin_lock(&dcache_lock);
+	spin_lock(&dn->d_lock);
+	list_move_tail(&dir->d_subdirs, &dn->d_u.d_child);
+	dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset,
+	     dn->d_u.d_child.prev, dn->d_u.d_child.next);
+	spin_unlock(&dn->d_lock);
+	spin_unlock(&dcache_lock);
+}
+
+/*
  * splice a dentry to an inode.
  * caller must hold directory i_mutex for this to be safe.
  *
@@ -814,6 +847,8 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
 {
 	struct dentry *realdn;
 
+	BUG_ON(dn->d_inode);
+
 	/* dn must be unhashed */
 	if (!d_unhashed(dn))
 		d_drop(dn);
@@ -835,44 +870,17 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
 		dn = realdn;
 	} else {
 		BUG_ON(!ceph_dentry(dn));
-
 		dout("dn %p attached to %p ino %llx.%llx\n",
 		     dn, dn->d_inode, ceph_vinop(dn->d_inode));
 	}
 	if ((!prehash || *prehash) && d_unhashed(dn))
 		d_rehash(dn);
+	ceph_set_dentry_offset(dn);
 out:
 	return dn;
 }
 
-/*
- * Set dentry's directory position based on the current dir's max, and
- * order it in d_subdirs, so that dcache_readdir behaves.
- */
-static void ceph_set_dentry_offset(struct dentry *dn)
-{
-	struct dentry *dir = dn->d_parent;
-	struct inode *inode = dn->d_parent->d_inode;
-	struct ceph_dentry_info *di;
-
-	BUG_ON(!inode);
-
-	di = ceph_dentry(dn);
-
-	spin_lock(&inode->i_lock);
-	di->offset = ceph_inode(inode)->i_max_offset++;
-	spin_unlock(&inode->i_lock);
-
-	spin_lock(&dcache_lock);
-	spin_lock(&dn->d_lock);
-	list_move_tail(&dir->d_subdirs, &dn->d_u.d_child);
-	dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset,
-	     dn->d_u.d_child.prev, dn->d_u.d_child.next);
-	spin_unlock(&dn->d_lock);
-	spin_unlock(&dcache_lock);
-}
-
 /*
  * Incorporate results into the local cache. This is either just
  * one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
  * after a lookup).
@@ -933,14 +941,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 
 	if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
 		dout("fill_trace reply is empty!\n");
-		if (rinfo->head->result == 0 && req->r_locked_dir) {
-			struct ceph_inode_info *ci =
-				ceph_inode(req->r_locked_dir);
-			dout(" clearing %p complete (empty trace)\n",
-			     req->r_locked_dir);
-			ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
-			ci->i_release_count++;
-		}
+		if (rinfo->head->result == 0 && req->r_locked_dir)
+			ceph_invalidate_dir_request(req);
 		return 0;
 	}
 
@@ -1011,13 +1013,18 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 			     req->r_old_dentry->d_name.len,
 			     req->r_old_dentry->d_name.name,
 			     dn, dn->d_name.len, dn->d_name.name);
+
 			/* ensure target dentry is invalidated, despite
 			   rehashing bug in vfs_rename_dir */
-			dn->d_time = jiffies;
-			ceph_dentry(dn)->lease_shared_gen = 0;
+			ceph_invalidate_dentry_lease(dn);
+
 			/* take overwritten dentry's readdir offset */
+			dout("dn %p gets %p offset %lld (old offset %lld)\n",
+			     req->r_old_dentry, dn, ceph_dentry(dn)->offset,
+			     ceph_dentry(req->r_old_dentry)->offset);
 			ceph_dentry(req->r_old_dentry)->offset =
 				ceph_dentry(dn)->offset;
+
 			dn = req->r_old_dentry; /* use old_dentry */
 			in = dn->d_inode;
 		}
@@ -1059,7 +1066,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 				goto done;
 			}
 			req->r_dentry = dn; /* may have spliced */
-			ceph_set_dentry_offset(dn);
 			igrab(in);
 		} else if (ceph_ino(in) == vino.ino &&
 			   ceph_snap(in) == vino.snap) {
@@ -1102,7 +1108,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 			err = PTR_ERR(dn);
 			goto done;
 		}
-		ceph_set_dentry_offset(dn);
 		req->r_dentry = dn; /* may have spliced */
 		igrab(in);
 		rinfo->head->is_dentry = 1; /* fool notrace handlers */
@@ -1429,7 +1434,7 @@ void ceph_queue_vmtruncate(struct inode *inode)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 
-	if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
+	if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
 		       &ci->i_vmtruncate_work)) {
 		dout("ceph_queue_vmtruncate %p\n", inode);
 		igrab(inode);
@@ -1518,7 +1523,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	struct inode *parent_inode = dentry->d_parent->d_inode;
 	const unsigned int ia_valid = attr->ia_valid;
 	struct ceph_mds_request *req;
-	struct ceph_mds_client *mdsc = &ceph_client(dentry->d_sb)->mdsc;
+	struct ceph_mds_client *mdsc = &ceph_sb_to_client(dentry->d_sb)->mdsc;
 	int issued;
 	int release = 0, dirtied = 0;
 	int mask = 0;
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 8a5bcae62846..d085f07756b4 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -98,7 +98,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
 	struct ceph_ioctl_dataloc dl;
 	struct inode *inode = file->f_dentry->d_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_osd_client *osdc = &ceph_client(inode->i_sb)->osdc;
+	struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->osdc;
 	u64 len = 1, olen;
 	u64 tmp;
 	struct ceph_object_layout ol;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 24561a557e01..885aa5710cfd 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -40,7 +40,7 @@
 static void __wake_requests(struct ceph_mds_client *mdsc,
 			    struct list_head *head);
 
-const static struct ceph_connection_operations mds_con_ops;
+static const struct ceph_connection_operations mds_con_ops;
 
 
 /*
@@ -665,10 +665,10 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
 	struct ceph_msg *msg;
 	struct ceph_mds_session_head *h;
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
-	if (IS_ERR(msg)) {
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
+	if (!msg) {
 		pr_err("create_session_msg ENOMEM creating msg\n");
-		return ERR_PTR(PTR_ERR(msg));
+		return NULL;
 	}
 	h = msg->front.iov_base;
 	h->op = cpu_to_le32(op);
@@ -687,7 +687,6 @@ static int __open_session(struct ceph_mds_client *mdsc,
 	struct ceph_msg *msg;
 	int mstate;
 	int mds = session->s_mds;
-	int err = 0;
 
 	/* wait for mds to go active? */
 	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
@@ -698,13 +697,9 @@ static int __open_session(struct ceph_mds_client *mdsc,
 
 	/* send connect message */
 	msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
-	if (IS_ERR(msg)) {
-		err = PTR_ERR(msg);
-		goto out;
-	}
+	if (!msg)
+		return -ENOMEM;
 	ceph_con_send(&session->s_con, msg);
-
-out:
 	return 0;
 }
 
@@ -804,12 +799,49 @@ out:
 }
 
 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 				  void *arg)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	int drop = 0;
+
 	dout("removing cap %p, ci is %p, inode is %p\n",
 	     cap, ci, &ci->vfs_inode);
-	ceph_remove_cap(cap);
+	spin_lock(&inode->i_lock);
+	__ceph_remove_cap(cap);
+	if (!__ceph_is_any_real_caps(ci)) {
+		struct ceph_mds_client *mdsc =
+			&ceph_sb_to_client(inode->i_sb)->mdsc;
+
+		spin_lock(&mdsc->cap_dirty_lock);
+		if (!list_empty(&ci->i_dirty_item)) {
+			pr_info(" dropping dirty %s state for %p %lld\n",
+				ceph_cap_string(ci->i_dirty_caps),
+				inode, ceph_ino(inode));
+			ci->i_dirty_caps = 0;
+			list_del_init(&ci->i_dirty_item);
+			drop = 1;
+		}
+		if (!list_empty(&ci->i_flushing_item)) {
+			pr_info(" dropping dirty+flushing %s state for %p %lld\n",
+				ceph_cap_string(ci->i_flushing_caps),
+				inode, ceph_ino(inode));
+			ci->i_flushing_caps = 0;
+			list_del_init(&ci->i_flushing_item);
+			mdsc->num_cap_flushing--;
+			drop = 1;
+		}
+		if (drop && ci->i_wrbuffer_ref) {
+			pr_info(" dropping dirty data for %p %lld\n",
+				inode, ceph_ino(inode));
+			ci->i_wrbuffer_ref = 0;
+			ci->i_wrbuffer_ref_head = 0;
+			drop++;
+		}
+		spin_unlock(&mdsc->cap_dirty_lock);
+	}
+	spin_unlock(&inode->i_lock);
+	while (drop--)
+		iput(inode);
 	return 0;
 }
 
@@ -821,6 +853,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
 	dout("remove_session_caps on %p\n", session);
 	iterate_session_caps(session, remove_session_caps_cb, NULL);
 	BUG_ON(session->s_nr_caps > 0);
+	BUG_ON(!list_empty(&session->s_cap_flushing));
 	cleanup_cap_releases(session);
 }
 
@@ -883,8 +916,8 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
 	     ceph_mds_state_name(state));
 	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
 				 ++session->s_renew_seq);
-	if (IS_ERR(msg))
-		return PTR_ERR(msg);
+	if (!msg)
+		return -ENOMEM;
 	ceph_con_send(&session->s_con, msg);
 	return 0;
 }
@@ -931,17 +964,15 @@ static int request_close_session(struct ceph_mds_client *mdsc,
 				 struct ceph_mds_session *session)
 {
 	struct ceph_msg *msg;
-	int err = 0;
 
 	dout("request_close_session mds%d state %s seq %lld\n",
 	     session->s_mds, session_state_name(session->s_state),
 	     session->s_seq);
 	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
-	if (IS_ERR(msg))
-		err = PTR_ERR(msg);
-	else
-		ceph_con_send(&session->s_con, msg);
-	return err;
+	if (!msg)
+		return -ENOMEM;
+	ceph_con_send(&session->s_con, msg);
+	return 0;
 }
 
@@ -1059,7 +1090,7 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,
 	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
 		spin_unlock(&session->s_cap_lock);
 		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
-				   0, 0, NULL);
+				   GFP_NOFS);
 		if (!msg)
 			goto out_unlocked;
 		dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1151,10 +1182,8 @@ static void send_cap_releases(struct ceph_mds_client *mdsc,
 	struct ceph_msg *msg;
 
 	dout("send_cap_releases mds%d\n", session->s_mds);
-	while (1) {
-		spin_lock(&session->s_cap_lock);
-		if (list_empty(&session->s_cap_releases_done))
-			break;
+	spin_lock(&session->s_cap_lock);
+	while (!list_empty(&session->s_cap_releases_done)) {
 		msg = list_first_entry(&session->s_cap_releases_done,
 				       struct ceph_msg, list_head);
 		list_del_init(&msg->list_head);
@@ -1162,10 +1191,49 @@ static void send_cap_releases(struct ceph_mds_client *mdsc,
 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
 		ceph_con_send(&session->s_con, msg);
+		spin_lock(&session->s_cap_lock);
 	}
 	spin_unlock(&session->s_cap_lock);
 }
 
+static void discard_cap_releases(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_session *session)
+{
+	struct ceph_msg *msg;
+	struct ceph_mds_cap_release *head;
+	unsigned num;
+
+	dout("discard_cap_releases mds%d\n", session->s_mds);
+	spin_lock(&session->s_cap_lock);
+
+	/* zero out the in-progress message */
+	msg = list_first_entry(&session->s_cap_releases,
+			       struct ceph_msg, list_head);
+	head = msg->front.iov_base;
+	num = le32_to_cpu(head->num);
+	dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
+	head->num = cpu_to_le32(0);
+	session->s_num_cap_releases += num;
+
+	/* requeue completed messages */
+	while (!list_empty(&session->s_cap_releases_done)) {
+		msg = list_first_entry(&session->s_cap_releases_done,
+				       struct ceph_msg, list_head);
+		list_del_init(&msg->list_head);
+
+		head = msg->front.iov_base;
+		num = le32_to_cpu(head->num);
+		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
+		     num);
+		session->s_num_cap_releases += num;
+		head->num = cpu_to_le32(0);
+		msg->front.iov_len = sizeof(*head);
+		list_add(&msg->list_head, &session->s_cap_releases);
+	}
+
+	spin_unlock(&session->s_cap_lock);
+}
+
 /*
  * requests
  */
@@ -1181,6 +1249,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
 	if (!req)
 		return ERR_PTR(-ENOMEM);
 
+	mutex_init(&req->r_fill_mutex);
 	req->r_started = jiffies;
 	req->r_resend_mds = -1;
 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1251,7 +1320,7 @@ retry:
1251 len += 1 + temp->d_name.len; 1320 len += 1 + temp->d_name.len;
1252 temp = temp->d_parent; 1321 temp = temp->d_parent;
1253 if (temp == NULL) { 1322 if (temp == NULL) {
1254 pr_err("build_path_dentry corrupt dentry %p\n", dentry); 1323 pr_err("build_path corrupt dentry %p\n", dentry);
1255 return ERR_PTR(-EINVAL); 1324 return ERR_PTR(-EINVAL);
1256 } 1325 }
1257 } 1326 }
@@ -1267,7 +1336,7 @@ retry:
1267 struct inode *inode = temp->d_inode; 1336 struct inode *inode = temp->d_inode;
1268 1337
1269 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 1338 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1270 dout("build_path_dentry path+%d: %p SNAPDIR\n", 1339 dout("build_path path+%d: %p SNAPDIR\n",
1271 pos, temp); 1340 pos, temp);
1272 } else if (stop_on_nosnap && inode && 1341 } else if (stop_on_nosnap && inode &&
1273 ceph_snap(inode) == CEPH_NOSNAP) { 1342 ceph_snap(inode) == CEPH_NOSNAP) {
@@ -1278,20 +1347,18 @@ retry:
1278 break; 1347 break;
1279 strncpy(path + pos, temp->d_name.name, 1348 strncpy(path + pos, temp->d_name.name,
1280 temp->d_name.len); 1349 temp->d_name.len);
1281 dout("build_path_dentry path+%d: %p '%.*s'\n",
1282 pos, temp, temp->d_name.len, path + pos);
1283 } 1350 }
1284 if (pos) 1351 if (pos)
1285 path[--pos] = '/'; 1352 path[--pos] = '/';
1286 temp = temp->d_parent; 1353 temp = temp->d_parent;
1287 if (temp == NULL) { 1354 if (temp == NULL) {
1288 pr_err("build_path_dentry corrupt dentry\n"); 1355 pr_err("build_path corrupt dentry\n");
1289 kfree(path); 1356 kfree(path);
1290 return ERR_PTR(-EINVAL); 1357 return ERR_PTR(-EINVAL);
1291 } 1358 }
1292 } 1359 }
1293 if (pos != 0) { 1360 if (pos != 0) {
1294 pr_err("build_path_dentry did not end path lookup where " 1361 pr_err("build_path did not end path lookup where "
1295 "expected, namelen is %d, pos is %d\n", len, pos); 1362 "expected, namelen is %d, pos is %d\n", len, pos);
1296 /* presumably this is only possible if racing with a 1363 /* presumably this is only possible if racing with a
1297 rename of one of the parent directories (we can not 1364 rename of one of the parent directories (we can not
@@ -1303,7 +1370,7 @@ retry:
1303 1370
1304 *base = ceph_ino(temp->d_inode); 1371 *base = ceph_ino(temp->d_inode);
1305 *plen = len; 1372 *plen = len;
1306 dout("build_path_dentry on %p %d built %llx '%.*s'\n", 1373 dout("build_path on %p %d built %llx '%.*s'\n",
1307 dentry, atomic_read(&dentry->d_count), *base, len, path); 1374 dentry, atomic_read(&dentry->d_count), *base, len, path);
1308 return path; 1375 return path;
1309} 1376}
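
build_path (renamed in this patch from build_path_dentry) computes the needed length first, then fills the buffer from the end while walking d_parent, prefixing each component with '/'. A simplified stand-alone sketch of that back-to-front strategy, with a parent-linked node standing in for a dentry; unlike the kernel helper, which also handles snapshots, retries on racing renames, and returns a path relative to a base inode, this version just builds an absolute-style string:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct node {
    const char *name;
    struct node *parent;       /* NULL at the root */
};

static char *build_path(struct node *n, int *plen)
{
    struct node *temp;
    char *path;
    int len = 0, pos;

    /* first pass: one byte for each '/' plus each component name */
    for (temp = n; temp->parent; temp = temp->parent)
        len += 1 + strlen(temp->name);

    path = malloc(len + 1);
    if (!path)
        return NULL;
    path[len] = '\0';
    pos = len;

    /* second pass: copy components back-to-front toward position 0 */
    for (temp = n; temp->parent; temp = temp->parent) {
        pos -= strlen(temp->name);
        memcpy(path + pos, temp->name, strlen(temp->name));
        path[--pos] = '/';
    }

    *plen = len;
    return path;               /* pos ends at 0 here */
}

int main(void)
{
    struct node root = { "", NULL };
    struct node dir  = { "dir", &root };
    struct node file = { "file", &dir };
    int len;
    char *p = build_path(&file, &len);

    printf("%s (%d)\n", p ? p : "(oom)", len);  /* "/dir/file" (9) */
    free(p);
    return 0;
}
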
@@ -1426,9 +1493,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1426 if (req->r_old_dentry_drop) 1493 if (req->r_old_dentry_drop)
1427 len += req->r_old_dentry->d_name.len; 1494 len += req->r_old_dentry->d_name.len;
1428 1495
1429 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL); 1496 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
1430 if (IS_ERR(msg)) 1497 if (!msg) {
1498 msg = ERR_PTR(-ENOMEM);
1431 goto out_free2; 1499 goto out_free2;
1500 }
1432 1501
1433 msg->hdr.tid = cpu_to_le64(req->r_tid); 1502 msg->hdr.tid = cpu_to_le64(req->r_tid);
1434 1503
@@ -1517,9 +1586,9 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
1517 } 1586 }
1518 msg = create_request_message(mdsc, req, mds); 1587 msg = create_request_message(mdsc, req, mds);
1519 if (IS_ERR(msg)) { 1588 if (IS_ERR(msg)) {
1520 req->r_reply = ERR_PTR(PTR_ERR(msg)); 1589 req->r_err = PTR_ERR(msg);
1521 complete_request(mdsc, req); 1590 complete_request(mdsc, req);
1522 return -PTR_ERR(msg); 1591 return PTR_ERR(msg);
1523 } 1592 }
1524 req->r_request = msg; 1593 req->r_request = msg;
1525 1594
@@ -1552,7 +1621,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
1552 int mds = -1; 1621 int mds = -1;
1553 int err = -EAGAIN; 1622 int err = -EAGAIN;
1554 1623
1555 if (req->r_reply) 1624 if (req->r_err || req->r_got_result)
1556 goto out; 1625 goto out;
1557 1626
1558 if (req->r_timeout && 1627 if (req->r_timeout &&
@@ -1609,7 +1678,7 @@ out:
1609 return err; 1678 return err;
1610 1679
1611finish: 1680finish:
1612 req->r_reply = ERR_PTR(err); 1681 req->r_err = err;
1613 complete_request(mdsc, req); 1682 complete_request(mdsc, req);
1614 goto out; 1683 goto out;
1615} 1684}
@@ -1630,10 +1699,9 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
1630 1699
1631/* 1700/*
1632 * Wake up threads with requests pending for @mds, so that they can 1701 * Wake up threads with requests pending for @mds, so that they can
1633 * resubmit their requests to a possibly different mds. If @all is set, 1702 * resubmit their requests to a possibly different mds.
1634 * wake up if their requests has been forwarded to @mds, too.
1635 */ 1703 */
1636static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all) 1704static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1637{ 1705{
1638 struct ceph_mds_request *req; 1706 struct ceph_mds_request *req;
1639 struct rb_node *p; 1707 struct rb_node *p;
@@ -1689,64 +1757,78 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1689 __register_request(mdsc, req, dir); 1757 __register_request(mdsc, req, dir);
1690 __do_request(mdsc, req); 1758 __do_request(mdsc, req);
1691 1759
1692 /* wait */ 1760 if (req->r_err) {
1693 if (!req->r_reply) { 1761 err = req->r_err;
1694 mutex_unlock(&mdsc->mutex); 1762 __unregister_request(mdsc, req);
1695 if (req->r_timeout) { 1763 dout("do_request early error %d\n", err);
1696 err = (long)wait_for_completion_interruptible_timeout( 1764 goto out;
1697 &req->r_completion, req->r_timeout);
1698 if (err == 0)
1699 req->r_reply = ERR_PTR(-EIO);
1700 else if (err < 0)
1701 req->r_reply = ERR_PTR(err);
1702 } else {
1703 err = wait_for_completion_interruptible(
1704 &req->r_completion);
1705 if (err)
1706 req->r_reply = ERR_PTR(err);
1707 }
1708 mutex_lock(&mdsc->mutex);
1709 } 1765 }
1710 1766
1711 if (IS_ERR(req->r_reply)) { 1767 /* wait */
1712 err = PTR_ERR(req->r_reply); 1768 mutex_unlock(&mdsc->mutex);
1713 req->r_reply = NULL; 1769 dout("do_request waiting\n");
1770 if (req->r_timeout) {
1771 err = (long)wait_for_completion_interruptible_timeout(
1772 &req->r_completion, req->r_timeout);
1773 if (err == 0)
1774 err = -EIO;
1775 } else {
1776 err = wait_for_completion_interruptible(&req->r_completion);
1777 }
1778 dout("do_request waited, got %d\n", err);
1779 mutex_lock(&mdsc->mutex);
1714 1780
1715 if (err == -ERESTARTSYS) { 1781 /* only abort if we didn't race with a real reply */
1716 /* aborted */ 1782 if (req->r_got_result) {
1717 req->r_aborted = true; 1783 err = le32_to_cpu(req->r_reply_info.head->result);
1784 } else if (err < 0) {
1785 dout("aborted request %lld with %d\n", req->r_tid, err);
1718 1786
1719 if (req->r_locked_dir && 1787 /*
1720 (req->r_op & CEPH_MDS_OP_WRITE)) { 1788 * ensure we aren't running concurrently with
1721 struct ceph_inode_info *ci = 1789 * ceph_fill_trace or ceph_readdir_prepopulate, which
1722 ceph_inode(req->r_locked_dir); 1790 * rely on locks (dir mutex) held by our caller.
1791 */
1792 mutex_lock(&req->r_fill_mutex);
1793 req->r_err = err;
1794 req->r_aborted = true;
1795 mutex_unlock(&req->r_fill_mutex);
1723 1796
1724 dout("aborted, clearing I_COMPLETE on %p\n", 1797 if (req->r_locked_dir &&
1725 req->r_locked_dir); 1798 (req->r_op & CEPH_MDS_OP_WRITE))
1726 spin_lock(&req->r_locked_dir->i_lock); 1799 ceph_invalidate_dir_request(req);
1727 ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1728 ci->i_release_count++;
1729 spin_unlock(&req->r_locked_dir->i_lock);
1730 }
1731 } else {
1732 /* clean up this request */
1733 __unregister_request(mdsc, req);
1734 if (!list_empty(&req->r_unsafe_item))
1735 list_del_init(&req->r_unsafe_item);
1736 complete(&req->r_safe_completion);
1737 }
1738 } else if (req->r_err) {
1739 err = req->r_err;
1740 } else { 1800 } else {
1741 err = le32_to_cpu(req->r_reply_info.head->result); 1801 err = req->r_err;
1742 } 1802 }
1743 mutex_unlock(&mdsc->mutex);
1744 1803
1804out:
1805 mutex_unlock(&mdsc->mutex);
1745 dout("do_request %p done, result %d\n", req, err); 1806 dout("do_request %p done, result %d\n", req, err);
1746 return err; 1807 return err;
1747} 1808}
1748 1809
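
With r_reply no longer doubling as an error carrier, the rewritten wait path above maps a timed-out wait to -EIO and treats the request as aborted only when no real reply raced in. A compact sketch of that decision logic with plain ints standing in for the request fields (wait_err follows the wait_for_completion_interruptible_timeout convention: 0 means timeout, negative means interrupted):

#include <errno.h>
#include <stdio.h>

struct req_state {
    int got_result;    /* handle_reply recorded a real result (r_got_result) */
    int reply_result;  /* result code from the MDS reply */
    int err;           /* sticky error set earlier (r_err) */
    int aborted;       /* set when we give up (r_aborted) */
};

static int resolve_outcome(struct req_state *req, long wait_err, int timed)
{
    if (timed && wait_err == 0)
        wait_err = -EIO;        /* timeout: give up with an I/O error */

    if (req->got_result)        /* a reply won the race: use its result */
        return req->reply_result;

    if (wait_err < 0) {         /* interrupted or timed out: abort */
        req->aborted = 1;
        return (int)wait_err;
    }
    return req->err;            /* early error recorded by __do_request, or 0 */
}

int main(void)
{
    struct req_state a = { .got_result = 1, .reply_result = 0 };
    struct req_state b = { 0 };

    printf("raced reply: %d\n", resolve_outcome(&a, 0, 1));  /* 0 */
    printf("timed out:   %d\n", resolve_outcome(&b, 0, 1));  /* -EIO, aborted */
    return 0;
}
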
1749/* 1810/*
1811 * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
1812 * namespace request.
1813 */
1814void ceph_invalidate_dir_request(struct ceph_mds_request *req)
1815{
1816 struct inode *inode = req->r_locked_dir;
1817 struct ceph_inode_info *ci = ceph_inode(inode);
1818
1819 dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
1820 spin_lock(&inode->i_lock);
1821 ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1822 ci->i_release_count++;
1823 spin_unlock(&inode->i_lock);
1824
1825 if (req->r_dentry)
1826 ceph_invalidate_dentry_lease(req->r_dentry);
1827 if (req->r_old_dentry)
1828 ceph_invalidate_dentry_lease(req->r_old_dentry);
1829}
1830
1831/*
1750 * Handle mds reply. 1832 * Handle mds reply.
1751 * 1833 *
1752 * We take the session mutex and parse and process the reply immediately. 1834 * We take the session mutex and parse and process the reply immediately.
@@ -1797,6 +1879,12 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1797 mutex_unlock(&mdsc->mutex); 1879 mutex_unlock(&mdsc->mutex);
1798 goto out; 1880 goto out;
1799 } 1881 }
1882 if (req->r_got_safe && !head->safe) {
1883 pr_warning("got unsafe after safe on %llu from mds%d\n",
1884 tid, mds);
1885 mutex_unlock(&mdsc->mutex);
1886 goto out;
1887 }
1800 1888
1801 result = le32_to_cpu(head->result); 1889 result = le32_to_cpu(head->result);
1802 1890
@@ -1838,11 +1926,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1838 mutex_unlock(&mdsc->mutex); 1926 mutex_unlock(&mdsc->mutex);
1839 goto out; 1927 goto out;
1840 } 1928 }
1841 } 1929 } else {
1842
1843 BUG_ON(req->r_reply);
1844
1845 if (!head->safe) {
1846 req->r_got_unsafe = true; 1930 req->r_got_unsafe = true;
1847 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 1931 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
1848 } 1932 }
@@ -1871,21 +1955,30 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1871 } 1955 }
1872 1956
1873 /* insert trace into our cache */ 1957 /* insert trace into our cache */
1958 mutex_lock(&req->r_fill_mutex);
1874 err = ceph_fill_trace(mdsc->client->sb, req, req->r_session); 1959 err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
1875 if (err == 0) { 1960 if (err == 0) {
1876 if (result == 0 && rinfo->dir_nr) 1961 if (result == 0 && rinfo->dir_nr)
1877 ceph_readdir_prepopulate(req, req->r_session); 1962 ceph_readdir_prepopulate(req, req->r_session);
1878 ceph_unreserve_caps(&req->r_caps_reservation); 1963 ceph_unreserve_caps(&req->r_caps_reservation);
1879 } 1964 }
1965 mutex_unlock(&req->r_fill_mutex);
1880 1966
1881 up_read(&mdsc->snap_rwsem); 1967 up_read(&mdsc->snap_rwsem);
1882out_err: 1968out_err:
1883 if (err) { 1969 mutex_lock(&mdsc->mutex);
1884 req->r_err = err; 1970 if (!req->r_aborted) {
1971 if (err) {
1972 req->r_err = err;
1973 } else {
1974 req->r_reply = msg;
1975 ceph_msg_get(msg);
1976 req->r_got_result = true;
1977 }
1885 } else { 1978 } else {
1886 req->r_reply = msg; 1979 dout("reply arrived after request %lld was aborted\n", tid);
1887 ceph_msg_get(msg);
1888 } 1980 }
1981 mutex_unlock(&mdsc->mutex);
1889 1982
1890 add_cap_releases(mdsc, req->r_session, -1); 1983 add_cap_releases(mdsc, req->r_session, -1);
1891 mutex_unlock(&session->s_mutex); 1984 mutex_unlock(&session->s_mutex);
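
The out_err changes above are the reply-side half of the abort handling: under mdsc->mutex, handle_reply records the reply (or the fill error) only when the waiter has not already marked the request aborted. A minimal sketch of that guard, with simplified fields and the locking and message refcounting omitted:

#include <stdio.h>

struct req_state {
    int aborted;       /* waiter gave up (r_aborted) */
    int err;           /* sticky error (r_err) */
    int got_result;    /* a real reply was kept (r_got_result) */
};

static void record_reply(struct req_state *req, int fill_err)
{
    if (req->aborted) {
        printf("reply arrived after request was aborted, dropping\n");
        return;
    }
    if (fill_err)
        req->err = fill_err;   /* e.g. ceph_fill_trace failed */
    else
        req->got_result = 1;   /* keep the reply for the waiter */
}

int main(void)
{
    struct req_state live = { 0 }, dead = { .aborted = 1 };

    record_reply(&live, 0);
    record_reply(&dead, 0);
    printf("live got_result=%d dead got_result=%d\n",
           live.got_result, dead.got_result);
    return 0;
}
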
@@ -1984,6 +2077,8 @@ static void handle_session(struct ceph_mds_session *session,
1984 2077
1985 switch (op) { 2078 switch (op) {
1986 case CEPH_SESSION_OPEN: 2079 case CEPH_SESSION_OPEN:
2080 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2081 pr_info("mds%d reconnect success\n", session->s_mds);
1987 session->s_state = CEPH_MDS_SESSION_OPEN; 2082 session->s_state = CEPH_MDS_SESSION_OPEN;
1988 renewed_caps(mdsc, session, 0); 2083 renewed_caps(mdsc, session, 0);
1989 wake = 1; 2084 wake = 1;
@@ -1997,10 +2092,12 @@ static void handle_session(struct ceph_mds_session *session,
1997 break; 2092 break;
1998 2093
1999 case CEPH_SESSION_CLOSE: 2094 case CEPH_SESSION_CLOSE:
2095 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2096 pr_info("mds%d reconnect denied\n", session->s_mds);
2000 remove_session_caps(session); 2097 remove_session_caps(session);
2001 wake = 1; /* for good measure */ 2098 wake = 1; /* for good measure */
2002 complete(&mdsc->session_close_waiters); 2099 complete(&mdsc->session_close_waiters);
2003 kick_requests(mdsc, mds, 0); /* cur only */ 2100 kick_requests(mdsc, mds);
2004 break; 2101 break;
2005 2102
2006 case CEPH_SESSION_STALE: 2103 case CEPH_SESSION_STALE:
@@ -2132,54 +2229,44 @@ out:
2132 * 2229 *
2133 * called with mdsc->mutex held. 2230 * called with mdsc->mutex held.
2134 */ 2231 */
2135static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) 2232static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2233 struct ceph_mds_session *session)
2136{ 2234{
2137 struct ceph_mds_session *session = NULL;
2138 struct ceph_msg *reply; 2235 struct ceph_msg *reply;
2139 struct rb_node *p; 2236 struct rb_node *p;
2237 int mds = session->s_mds;
2140 int err = -ENOMEM; 2238 int err = -ENOMEM;
2141 struct ceph_pagelist *pagelist; 2239 struct ceph_pagelist *pagelist;
2142 2240
2143 pr_info("reconnect to recovering mds%d\n", mds); 2241 pr_info("mds%d reconnect start\n", mds);
2144 2242
2145 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 2243 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2146 if (!pagelist) 2244 if (!pagelist)
2147 goto fail_nopagelist; 2245 goto fail_nopagelist;
2148 ceph_pagelist_init(pagelist); 2246 ceph_pagelist_init(pagelist);
2149 2247
2150 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL); 2248 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
2151 if (IS_ERR(reply)) { 2249 if (!reply)
2152 err = PTR_ERR(reply);
2153 goto fail_nomsg; 2250 goto fail_nomsg;
2154 }
2155
2156 /* find session */
2157 session = __ceph_lookup_mds_session(mdsc, mds);
2158 mutex_unlock(&mdsc->mutex); /* drop lock for duration */
2159 2251
2160 if (session) { 2252 mutex_lock(&session->s_mutex);
2161 mutex_lock(&session->s_mutex); 2253 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2254 session->s_seq = 0;
2162 2255
2163 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 2256 ceph_con_open(&session->s_con,
2164 session->s_seq = 0; 2257 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2165 2258
2166 ceph_con_open(&session->s_con, 2259 /* replay unsafe requests */
2167 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 2260 replay_unsafe_requests(mdsc, session);
2168
2169 /* replay unsafe requests */
2170 replay_unsafe_requests(mdsc, session);
2171 } else {
2172 dout("no session for mds%d, will send short reconnect\n",
2173 mds);
2174 }
2175 2261
2176 down_read(&mdsc->snap_rwsem); 2262 down_read(&mdsc->snap_rwsem);
2177 2263
2178 if (!session)
2179 goto send;
2180 dout("session %p state %s\n", session, 2264 dout("session %p state %s\n", session,
2181 session_state_name(session->s_state)); 2265 session_state_name(session->s_state));
2182 2266
2267 /* drop old cap expires; we're about to reestablish that state */
2268 discard_cap_releases(mdsc, session);
2269
2183 /* traverse this session's caps */ 2270 /* traverse this session's caps */
2184 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); 2271 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2185 if (err) 2272 if (err)
@@ -2208,36 +2295,29 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2208 goto fail; 2295 goto fail;
2209 } 2296 }
2210 2297
2211send:
2212 reply->pagelist = pagelist; 2298 reply->pagelist = pagelist;
2213 reply->hdr.data_len = cpu_to_le32(pagelist->length); 2299 reply->hdr.data_len = cpu_to_le32(pagelist->length);
2214 reply->nr_pages = calc_pages_for(0, pagelist->length); 2300 reply->nr_pages = calc_pages_for(0, pagelist->length);
2215 ceph_con_send(&session->s_con, reply); 2301 ceph_con_send(&session->s_con, reply);
2216 2302
2217 session->s_state = CEPH_MDS_SESSION_OPEN;
2218 mutex_unlock(&session->s_mutex); 2303 mutex_unlock(&session->s_mutex);
2219 2304
2220 mutex_lock(&mdsc->mutex); 2305 mutex_lock(&mdsc->mutex);
2221 __wake_requests(mdsc, &session->s_waiting); 2306 __wake_requests(mdsc, &session->s_waiting);
2222 mutex_unlock(&mdsc->mutex); 2307 mutex_unlock(&mdsc->mutex);
2223 2308
2224 ceph_put_mds_session(session);
2225
2226 up_read(&mdsc->snap_rwsem); 2309 up_read(&mdsc->snap_rwsem);
2227 mutex_lock(&mdsc->mutex);
2228 return; 2310 return;
2229 2311
2230fail: 2312fail:
2231 ceph_msg_put(reply); 2313 ceph_msg_put(reply);
2232 up_read(&mdsc->snap_rwsem); 2314 up_read(&mdsc->snap_rwsem);
2233 mutex_unlock(&session->s_mutex); 2315 mutex_unlock(&session->s_mutex);
2234 ceph_put_mds_session(session);
2235fail_nomsg: 2316fail_nomsg:
2236 ceph_pagelist_release(pagelist); 2317 ceph_pagelist_release(pagelist);
2237 kfree(pagelist); 2318 kfree(pagelist);
2238fail_nopagelist: 2319fail_nopagelist:
2239 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 2320 pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2240 mutex_lock(&mdsc->mutex);
2241 return; 2321 return;
2242} 2322}
2243 2323
@@ -2290,7 +2370,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
2290 } 2370 }
2291 2371
2292 /* kick any requests waiting on the recovering mds */ 2372 /* kick any requests waiting on the recovering mds */
2293 kick_requests(mdsc, i, 1); 2373 kick_requests(mdsc, i);
2294 } else if (oldstate == newstate) { 2374 } else if (oldstate == newstate) {
2295 continue; /* nothing new with this mds */ 2375 continue; /* nothing new with this mds */
2296 } 2376 }
@@ -2299,22 +2379,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
2299 * send reconnect? 2379 * send reconnect?
2300 */ 2380 */
2301 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 2381 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2302 newstate >= CEPH_MDS_STATE_RECONNECT) 2382 newstate >= CEPH_MDS_STATE_RECONNECT) {
2303 send_mds_reconnect(mdsc, i); 2383 mutex_unlock(&mdsc->mutex);
2384 send_mds_reconnect(mdsc, s);
2385 mutex_lock(&mdsc->mutex);
2386 }
2304 2387
2305 /* 2388 /*
2306 * kick requests on any mds that has gone active. 2389 * kick requests on any mds that has gone active.
2307 *
2308 * kick requests on cur or forwarder: we may have sent
2309 * the request to mds1, mds1 told us it forwarded it
2310 * to mds2, but then we learn mds1 failed and can't be
2311 * sure it successfully forwarded our request before
2312 * it died.
2313 */ 2390 */
2314 if (oldstate < CEPH_MDS_STATE_ACTIVE && 2391 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2315 newstate >= CEPH_MDS_STATE_ACTIVE) { 2392 newstate >= CEPH_MDS_STATE_ACTIVE) {
2316 pr_info("mds%d reconnect completed\n", s->s_mds); 2393 if (oldstate != CEPH_MDS_STATE_CREATING &&
2317 kick_requests(mdsc, i, 1); 2394 oldstate != CEPH_MDS_STATE_STARTING)
2395 pr_info("mds%d recovery completed\n", s->s_mds);
2396 kick_requests(mdsc, i);
2318 ceph_kick_flushing_caps(mdsc, s); 2397 ceph_kick_flushing_caps(mdsc, s);
2319 wake_up_session_caps(s, 1); 2398 wake_up_session_caps(s, 1);
2320 } 2399 }
@@ -2457,8 +2536,8 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2457 dnamelen = dentry->d_name.len; 2536 dnamelen = dentry->d_name.len;
2458 len += dnamelen; 2537 len += dnamelen;
2459 2538
2460 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL); 2539 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
2461 if (IS_ERR(msg)) 2540 if (!msg)
2462 return; 2541 return;
2463 lease = msg->front.iov_base; 2542 lease = msg->front.iov_base;
2464 lease->action = action; 2543 lease->action = action;
@@ -2603,7 +2682,9 @@ static void delayed_work(struct work_struct *work)
2603 else 2682 else
2604 ceph_con_keepalive(&s->s_con); 2683 ceph_con_keepalive(&s->s_con);
2605 add_cap_releases(mdsc, s, -1); 2684 add_cap_releases(mdsc, s, -1);
2606 send_cap_releases(mdsc, s); 2685 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2686 s->s_state == CEPH_MDS_SESSION_HUNG)
2687 send_cap_releases(mdsc, s);
2607 mutex_unlock(&s->s_mutex); 2688 mutex_unlock(&s->s_mutex);
2608 ceph_put_mds_session(s); 2689 ceph_put_mds_session(s);
2609 2690
@@ -2620,6 +2701,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2620 mdsc->client = client; 2701 mdsc->client = client;
2621 mutex_init(&mdsc->mutex); 2702 mutex_init(&mdsc->mutex);
2622 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 2703 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2704 if (mdsc->mdsmap == NULL)
2705 return -ENOMEM;
2706
2623 init_completion(&mdsc->safe_umount_waiters); 2707 init_completion(&mdsc->safe_umount_waiters);
2624 init_completion(&mdsc->session_close_waiters); 2708 init_completion(&mdsc->session_close_waiters);
2625 INIT_LIST_HEAD(&mdsc->waiting_for_map); 2709 INIT_LIST_HEAD(&mdsc->waiting_for_map);
@@ -2645,6 +2729,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2645 init_waitqueue_head(&mdsc->cap_flushing_wq); 2729 init_waitqueue_head(&mdsc->cap_flushing_wq);
2646 spin_lock_init(&mdsc->dentry_lru_lock); 2730 spin_lock_init(&mdsc->dentry_lru_lock);
2647 INIT_LIST_HEAD(&mdsc->dentry_lru); 2731 INIT_LIST_HEAD(&mdsc->dentry_lru);
2732
2648 return 0; 2733 return 0;
2649} 2734}
2650 2735
@@ -2740,6 +2825,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
2740{ 2825{
2741 u64 want_tid, want_flush; 2826 u64 want_tid, want_flush;
2742 2827
2828 if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
2829 return;
2830
2743 dout("sync\n"); 2831 dout("sync\n");
2744 mutex_lock(&mdsc->mutex); 2832 mutex_lock(&mdsc->mutex);
2745 want_tid = mdsc->last_tid; 2833 want_tid = mdsc->last_tid;
@@ -2922,9 +3010,10 @@ static void con_put(struct ceph_connection *con)
2922static void peer_reset(struct ceph_connection *con) 3010static void peer_reset(struct ceph_connection *con)
2923{ 3011{
2924 struct ceph_mds_session *s = con->private; 3012 struct ceph_mds_session *s = con->private;
3013 struct ceph_mds_client *mdsc = s->s_mdsc;
2925 3014
2926 pr_err("mds%d gave us the boot. IMPLEMENT RECONNECT.\n", 3015 pr_warning("mds%d closed our session\n", s->s_mds);
2927 s->s_mds); 3016 send_mds_reconnect(mdsc, s);
2928} 3017}
2929 3018
2930static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 3019static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
@@ -3031,7 +3120,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
3031 return ceph_monc_validate_auth(&mdsc->client->monc); 3120 return ceph_monc_validate_auth(&mdsc->client->monc);
3032} 3121}
3033 3122
3034const static struct ceph_connection_operations mds_con_ops = { 3123static const struct ceph_connection_operations mds_con_ops = {
3035 .get = con_get, 3124 .get = con_get,
3036 .put = con_put, 3125 .put = con_put,
3037 .dispatch = dispatch, 3126 .dispatch = dispatch,
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 961cc6f65878..d9936c4f1212 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -165,6 +165,8 @@ struct ceph_mds_request {
165 struct inode *r_locked_dir; /* dir (if any) i_mutex locked by vfs */ 165 struct inode *r_locked_dir; /* dir (if any) i_mutex locked by vfs */
166 struct inode *r_target_inode; /* resulting inode */ 166 struct inode *r_target_inode; /* resulting inode */
167 167
168 struct mutex r_fill_mutex;
169
168 union ceph_mds_request_args r_args; 170 union ceph_mds_request_args r_args;
169 int r_fmode; /* file mode, if expecting cap */ 171 int r_fmode; /* file mode, if expecting cap */
170 172
@@ -213,7 +215,7 @@ struct ceph_mds_request {
213 struct completion r_safe_completion; 215 struct completion r_safe_completion;
214 ceph_mds_request_callback_t r_callback; 216 ceph_mds_request_callback_t r_callback;
215 struct list_head r_unsafe_item; /* per-session unsafe list item */ 217 struct list_head r_unsafe_item; /* per-session unsafe list item */
216 bool r_got_unsafe, r_got_safe; 218 bool r_got_unsafe, r_got_safe, r_got_result;
217 219
218 bool r_did_prepopulate; 220 bool r_did_prepopulate;
219 u32 r_readdir_offset; 221 u32 r_readdir_offset;
@@ -301,6 +303,8 @@ extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
301 struct inode *inode, 303 struct inode *inode,
302 struct dentry *dn, int mask); 304 struct dentry *dn, int mask);
303 305
306extern void ceph_invalidate_dir_request(struct ceph_mds_request *req);
307
304extern struct ceph_mds_request * 308extern struct ceph_mds_request *
305ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode); 309ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode);
306extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, 310extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index cd4fadb6491a..60b74839ebec 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -39,18 +39,6 @@ static void queue_con(struct ceph_connection *con);
39static void con_work(struct work_struct *); 39static void con_work(struct work_struct *);
40static void ceph_fault(struct ceph_connection *con); 40static void ceph_fault(struct ceph_connection *con);
41 41
42const char *ceph_name_type_str(int t)
43{
44 switch (t) {
45 case CEPH_ENTITY_TYPE_MON: return "mon";
46 case CEPH_ENTITY_TYPE_MDS: return "mds";
47 case CEPH_ENTITY_TYPE_OSD: return "osd";
48 case CEPH_ENTITY_TYPE_CLIENT: return "client";
49 case CEPH_ENTITY_TYPE_ADMIN: return "admin";
50 default: return "???";
51 }
52}
53
54/* 42/*
55 * nicely render a sockaddr as a string. 43 * nicely render a sockaddr as a string.
56 */ 44 */
@@ -340,6 +328,7 @@ static void reset_connection(struct ceph_connection *con)
340 ceph_msg_put(con->out_msg); 328 ceph_msg_put(con->out_msg);
341 con->out_msg = NULL; 329 con->out_msg = NULL;
342 } 330 }
331 con->out_keepalive_pending = false;
343 con->in_seq = 0; 332 con->in_seq = 0;
344 con->in_seq_acked = 0; 333 con->in_seq_acked = 0;
345} 334}
@@ -357,6 +346,7 @@ void ceph_con_close(struct ceph_connection *con)
357 clear_bit(WRITE_PENDING, &con->state); 346 clear_bit(WRITE_PENDING, &con->state);
358 mutex_lock(&con->mutex); 347 mutex_lock(&con->mutex);
359 reset_connection(con); 348 reset_connection(con);
349 con->peer_global_seq = 0;
360 cancel_delayed_work(&con->work); 350 cancel_delayed_work(&con->work);
361 mutex_unlock(&con->mutex); 351 mutex_unlock(&con->mutex);
362 queue_con(con); 352 queue_con(con);
@@ -661,7 +651,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
661 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 651 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
662 con->connect_seq, global_seq, proto); 652 con->connect_seq, global_seq, proto);
663 653
664 con->out_connect.features = CEPH_FEATURE_SUPPORTED; 654 con->out_connect.features = CEPH_FEATURE_SUPPORTED_CLIENT;
665 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 655 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
666 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 656 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
667 con->out_connect.global_seq = cpu_to_le32(global_seq); 657 con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1124,8 +1114,8 @@ static void fail_protocol(struct ceph_connection *con)
1124 1114
1125static int process_connect(struct ceph_connection *con) 1115static int process_connect(struct ceph_connection *con)
1126{ 1116{
1127 u64 sup_feat = CEPH_FEATURE_SUPPORTED; 1117 u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT;
1128 u64 req_feat = CEPH_FEATURE_REQUIRED; 1118 u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT;
1129 u64 server_feat = le64_to_cpu(con->in_reply.features); 1119 u64 server_feat = le64_to_cpu(con->in_reply.features);
1130 1120
1131 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1121 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1233,6 +1223,7 @@ static int process_connect(struct ceph_connection *con)
1233 clear_bit(CONNECTING, &con->state); 1223 clear_bit(CONNECTING, &con->state);
1234 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1224 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1235 con->connect_seq++; 1225 con->connect_seq++;
1226 con->peer_features = server_feat;
1236 dout("process_connect got READY gseq %d cseq %d (%d)\n", 1227 dout("process_connect got READY gseq %d cseq %d (%d)\n",
1237 con->peer_global_seq, 1228 con->peer_global_seq,
1238 le32_to_cpu(con->in_reply.connect_seq), 1229 le32_to_cpu(con->in_reply.connect_seq),
@@ -1402,19 +1393,17 @@ static int read_partial_message(struct ceph_connection *con)
1402 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1393 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
1403 if (skip) { 1394 if (skip) {
1404 /* skip this message */ 1395 /* skip this message */
1405 dout("alloc_msg returned NULL, skipping message\n"); 1396 dout("alloc_msg said skip message\n");
1406 con->in_base_pos = -front_len - middle_len - data_len - 1397 con->in_base_pos = -front_len - middle_len - data_len -
1407 sizeof(m->footer); 1398 sizeof(m->footer);
1408 con->in_tag = CEPH_MSGR_TAG_READY; 1399 con->in_tag = CEPH_MSGR_TAG_READY;
1409 con->in_seq++; 1400 con->in_seq++;
1410 return 0; 1401 return 0;
1411 } 1402 }
1412 if (IS_ERR(con->in_msg)) { 1403 if (!con->in_msg) {
1413 ret = PTR_ERR(con->in_msg);
1414 con->in_msg = NULL;
1415 con->error_msg = 1404 con->error_msg =
1416 "error allocating memory for incoming message"; 1405 "error allocating memory for incoming message";
1417 return ret; 1406 return -ENOMEM;
1418 } 1407 }
1419 m = con->in_msg; 1408 m = con->in_msg;
1420 m->front.iov_len = 0; /* haven't read it yet */ 1409 m->front.iov_len = 0; /* haven't read it yet */
@@ -1514,14 +1503,14 @@ static void process_message(struct ceph_connection *con)
1514 1503
1515 /* if first message, set peer_name */ 1504 /* if first message, set peer_name */
1516 if (con->peer_name.type == 0) 1505 if (con->peer_name.type == 0)
1517 con->peer_name = msg->hdr.src.name; 1506 con->peer_name = msg->hdr.src;
1518 1507
1519 con->in_seq++; 1508 con->in_seq++;
1520 mutex_unlock(&con->mutex); 1509 mutex_unlock(&con->mutex);
1521 1510
1522 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", 1511 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
1523 msg, le64_to_cpu(msg->hdr.seq), 1512 msg, le64_to_cpu(msg->hdr.seq),
1524 ENTITY_NAME(msg->hdr.src.name), 1513 ENTITY_NAME(msg->hdr.src),
1525 le16_to_cpu(msg->hdr.type), 1514 le16_to_cpu(msg->hdr.type),
1526 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 1515 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1527 le32_to_cpu(msg->hdr.front_len), 1516 le32_to_cpu(msg->hdr.front_len),
@@ -1546,7 +1535,6 @@ static int try_write(struct ceph_connection *con)
1546 dout("try_write start %p state %lu nref %d\n", con, con->state, 1535 dout("try_write start %p state %lu nref %d\n", con, con->state,
1547 atomic_read(&con->nref)); 1536 atomic_read(&con->nref));
1548 1537
1549 mutex_lock(&con->mutex);
1550more: 1538more:
1551 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1539 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1552 1540
@@ -1639,7 +1627,6 @@ do_next:
1639done: 1627done:
1640 ret = 0; 1628 ret = 0;
1641out: 1629out:
1642 mutex_unlock(&con->mutex);
1643 dout("try_write done on %p\n", con); 1630 dout("try_write done on %p\n", con);
1644 return ret; 1631 return ret;
1645} 1632}
@@ -1651,7 +1638,6 @@ out:
1651 */ 1638 */
1652static int try_read(struct ceph_connection *con) 1639static int try_read(struct ceph_connection *con)
1653{ 1640{
1654 struct ceph_messenger *msgr;
1655 int ret = -1; 1641 int ret = -1;
1656 1642
1657 if (!con->sock) 1643 if (!con->sock)
@@ -1661,9 +1647,6 @@ static int try_read(struct ceph_connection *con)
1661 return 0; 1647 return 0;
1662 1648
1663 dout("try_read start on %p\n", con); 1649 dout("try_read start on %p\n", con);
1664 msgr = con->msgr;
1665
1666 mutex_lock(&con->mutex);
1667 1650
1668more: 1651more:
1669 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1652 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
@@ -1758,7 +1741,6 @@ more:
1758done: 1741done:
1759 ret = 0; 1742 ret = 0;
1760out: 1743out:
1761 mutex_unlock(&con->mutex);
1762 dout("try_read done on %p\n", con); 1744 dout("try_read done on %p\n", con);
1763 return ret; 1745 return ret;
1764 1746
@@ -1830,6 +1812,8 @@ more:
1830 dout("con_work %p start, clearing QUEUED\n", con); 1812 dout("con_work %p start, clearing QUEUED\n", con);
1831 clear_bit(QUEUED, &con->state); 1813 clear_bit(QUEUED, &con->state);
1832 1814
1815 mutex_lock(&con->mutex);
1816
1833 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 1817 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
1834 dout("con_work CLOSED\n"); 1818 dout("con_work CLOSED\n");
1835 con_close_socket(con); 1819 con_close_socket(con);
@@ -1844,11 +1828,16 @@ more:
1844 if (test_and_clear_bit(SOCK_CLOSED, &con->state) || 1828 if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
1845 try_read(con) < 0 || 1829 try_read(con) < 0 ||
1846 try_write(con) < 0) { 1830 try_write(con) < 0) {
1831 mutex_unlock(&con->mutex);
1847 backoff = 1; 1832 backoff = 1;
1848 ceph_fault(con); /* error/fault path */ 1833 ceph_fault(con); /* error/fault path */
1834 goto done_unlocked;
1849 } 1835 }
1850 1836
1851done: 1837done:
1838 mutex_unlock(&con->mutex);
1839
1840done_unlocked:
1852 clear_bit(BUSY, &con->state); 1841 clear_bit(BUSY, &con->state);
1853 dout("con->state=%lu\n", con->state); 1842 dout("con->state=%lu\n", con->state);
1854 if (test_bit(QUEUED, &con->state)) { 1843 if (test_bit(QUEUED, &con->state)) {
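
This hunk moves the connection mutex up into con_work(): try_read() and try_write() no longer take con->mutex themselves, and the fault path drops it before calling ceph_fault() via the new done_unlocked label. A small pthread-based sketch of the same shape (the names and the fault handling are simplified stand-ins, not the kernel code):

#include <pthread.h>
#include <stdio.h>

struct connection {
    pthread_mutex_t mutex;
    int broken;
};

/* both helpers now assume the caller already holds con->mutex */
static int try_read(struct connection *con)  { return con->broken ? -1 : 0; }
static int try_write(struct connection *con) { return con->broken ? -1 : 0; }

static void fault(struct connection *con)
{
    (void)con;
    /* error/fault path runs without the mutex held */
    printf("connection fault, scheduling reconnect\n");
}

static void con_work(struct connection *con)
{
    pthread_mutex_lock(&con->mutex);
    if (try_read(con) < 0 || try_write(con) < 0) {
        pthread_mutex_unlock(&con->mutex);  /* mirror of done_unlocked */
        fault(con);
        return;
    }
    pthread_mutex_unlock(&con->mutex);      /* normal "done" path */
}

int main(void)
{
    struct connection con = { PTHREAD_MUTEX_INITIALIZER, 0 };

    con_work(&con);   /* clean pass */
    con.broken = 1;
    con_work(&con);   /* takes the fault path */
    return 0;
}
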
@@ -1947,7 +1936,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
1947 1936
1948 /* the zero page is needed if a request is "canceled" while the message 1937 /* the zero page is needed if a request is "canceled" while the message
1949 * is being written over the socket */ 1938 * is being written over the socket */
1950 msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO); 1939 msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
1951 if (!msgr->zero_page) { 1940 if (!msgr->zero_page) {
1952 kfree(msgr); 1941 kfree(msgr);
1953 return ERR_PTR(-ENOMEM); 1942 return ERR_PTR(-ENOMEM);
@@ -1987,9 +1976,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1987 } 1976 }
1988 1977
1989 /* set src+dst */ 1978 /* set src+dst */
1990 msg->hdr.src.name = con->msgr->inst.name; 1979 msg->hdr.src = con->msgr->inst.name;
1991 msg->hdr.src.addr = con->msgr->my_enc_addr;
1992 msg->hdr.orig_src = msg->hdr.src;
1993 1980
1994 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 1981 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
1995 1982
@@ -2083,12 +2070,11 @@ void ceph_con_keepalive(struct ceph_connection *con)
2083 * construct a new message with given type, size 2070 * construct a new message with given type, size
2084 * the new msg has a ref count of 1. 2071 * the new msg has a ref count of 1.
2085 */ 2072 */
2086struct ceph_msg *ceph_msg_new(int type, int front_len, 2073struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2087 int page_len, int page_off, struct page **pages)
2088{ 2074{
2089 struct ceph_msg *m; 2075 struct ceph_msg *m;
2090 2076
2091 m = kmalloc(sizeof(*m), GFP_NOFS); 2077 m = kmalloc(sizeof(*m), flags);
2092 if (m == NULL) 2078 if (m == NULL)
2093 goto out; 2079 goto out;
2094 kref_init(&m->kref); 2080 kref_init(&m->kref);
@@ -2100,8 +2086,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
2100 m->hdr.version = 0; 2086 m->hdr.version = 0;
2101 m->hdr.front_len = cpu_to_le32(front_len); 2087 m->hdr.front_len = cpu_to_le32(front_len);
2102 m->hdr.middle_len = 0; 2088 m->hdr.middle_len = 0;
2103 m->hdr.data_len = cpu_to_le32(page_len); 2089 m->hdr.data_len = 0;
2104 m->hdr.data_off = cpu_to_le16(page_off); 2090 m->hdr.data_off = 0;
2105 m->hdr.reserved = 0; 2091 m->hdr.reserved = 0;
2106 m->footer.front_crc = 0; 2092 m->footer.front_crc = 0;
2107 m->footer.middle_crc = 0; 2093 m->footer.middle_crc = 0;
@@ -2115,11 +2101,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
2115 /* front */ 2101 /* front */
2116 if (front_len) { 2102 if (front_len) {
2117 if (front_len > PAGE_CACHE_SIZE) { 2103 if (front_len > PAGE_CACHE_SIZE) {
2118 m->front.iov_base = __vmalloc(front_len, GFP_NOFS, 2104 m->front.iov_base = __vmalloc(front_len, flags,
2119 PAGE_KERNEL); 2105 PAGE_KERNEL);
2120 m->front_is_vmalloc = true; 2106 m->front_is_vmalloc = true;
2121 } else { 2107 } else {
2122 m->front.iov_base = kmalloc(front_len, GFP_NOFS); 2108 m->front.iov_base = kmalloc(front_len, flags);
2123 } 2109 }
2124 if (m->front.iov_base == NULL) { 2110 if (m->front.iov_base == NULL) {
2125 pr_err("msg_new can't allocate %d bytes\n", 2111 pr_err("msg_new can't allocate %d bytes\n",
@@ -2135,19 +2121,18 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
2135 m->middle = NULL; 2121 m->middle = NULL;
2136 2122
2137 /* data */ 2123 /* data */
2138 m->nr_pages = calc_pages_for(page_off, page_len); 2124 m->nr_pages = 0;
2139 m->pages = pages; 2125 m->pages = NULL;
2140 m->pagelist = NULL; 2126 m->pagelist = NULL;
2141 2127
2142 dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, 2128 dout("ceph_msg_new %p front %d\n", m, front_len);
2143 m->nr_pages);
2144 return m; 2129 return m;
2145 2130
2146out2: 2131out2:
2147 ceph_msg_put(m); 2132 ceph_msg_put(m);
2148out: 2133out:
2149 pr_err("msg_new can't create type %d len %d\n", type, front_len); 2134 pr_err("msg_new can't create type %d front %d\n", type, front_len);
2150 return ERR_PTR(-ENOMEM); 2135 return NULL;
2151} 2136}
2152 2137
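
From here on, ceph_msg_new() takes a gfp_t and reports allocation failure as NULL rather than ERR_PTR(-ENOMEM), which is why callers throughout this patch switch from IS_ERR(msg) to !msg. A user-space sketch contrasting the two error conventions; ERR_PTR/IS_ERR/PTR_ERR below are minimal stand-ins for the kernel macros, and the two allocators are hypothetical:

#include <errno.h>
#include <stdlib.h>

#define MAX_ERRNO 4095
#define ERR_PTR(err) ((void *)(long)(err))
#define PTR_ERR(ptr) ((long)(ptr))
#define IS_ERR(ptr)  ((unsigned long)(ptr) >= (unsigned long)-MAX_ERRNO)

/* old-style contract: encode the error code in the returned pointer */
static void *alloc_old(size_t len)
{
    void *p = malloc(len);
    return p ? p : ERR_PTR(-ENOMEM);
}

/* new-style contract: the only failure mode is NULL */
static void *alloc_new(size_t len)
{
    return malloc(len);
}

int main(void)
{
    void *m = alloc_old(64);

    if (IS_ERR(m))
        return (int)-PTR_ERR(m);  /* caller has to decode the error */
    free(m);

    m = alloc_new(64);
    if (!m)
        return ENOMEM;            /* single, simpler check */
    free(m);
    return 0;
}
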
2153/* 2138/*
@@ -2190,29 +2175,25 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2190 mutex_unlock(&con->mutex); 2175 mutex_unlock(&con->mutex);
2191 msg = con->ops->alloc_msg(con, hdr, skip); 2176 msg = con->ops->alloc_msg(con, hdr, skip);
2192 mutex_lock(&con->mutex); 2177 mutex_lock(&con->mutex);
2193 if (IS_ERR(msg)) 2178 if (!msg || *skip)
2194 return msg;
2195
2196 if (*skip)
2197 return NULL; 2179 return NULL;
2198 } 2180 }
2199 if (!msg) { 2181 if (!msg) {
2200 *skip = 0; 2182 *skip = 0;
2201 msg = ceph_msg_new(type, front_len, 0, 0, NULL); 2183 msg = ceph_msg_new(type, front_len, GFP_NOFS);
2202 if (!msg) { 2184 if (!msg) {
2203 pr_err("unable to allocate msg type %d len %d\n", 2185 pr_err("unable to allocate msg type %d len %d\n",
2204 type, front_len); 2186 type, front_len);
2205 return ERR_PTR(-ENOMEM); 2187 return NULL;
2206 } 2188 }
2207 } 2189 }
2208 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2190 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2209 2191
2210 if (middle_len) { 2192 if (middle_len && !msg->middle) {
2211 ret = ceph_alloc_middle(con, msg); 2193 ret = ceph_alloc_middle(con, msg);
2212
2213 if (ret < 0) { 2194 if (ret < 0) {
2214 ceph_msg_put(msg); 2195 ceph_msg_put(msg);
2215 return msg; 2196 return NULL;
2216 } 2197 }
2217 } 2198 }
2218 2199
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index a5caf91cc971..00a9430b1ffc 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -49,10 +49,8 @@ struct ceph_connection_operations {
49 int *skip); 49 int *skip);
50}; 50};
51 51
52extern const char *ceph_name_type_str(int t);
53
54/* use format string %s%d */ 52/* use format string %s%d */
55#define ENTITY_NAME(n) ceph_name_type_str((n).type), le64_to_cpu((n).num) 53#define ENTITY_NAME(n) ceph_entity_type_name((n).type), le64_to_cpu((n).num)
56 54
57struct ceph_messenger { 55struct ceph_messenger {
58 struct ceph_entity_inst inst; /* my name+address */ 56 struct ceph_entity_inst inst; /* my name+address */
@@ -144,6 +142,7 @@ struct ceph_connection {
144 struct ceph_entity_addr peer_addr; /* peer address */ 142 struct ceph_entity_addr peer_addr; /* peer address */
145 struct ceph_entity_name peer_name; /* peer name */ 143 struct ceph_entity_name peer_name; /* peer name */
146 struct ceph_entity_addr peer_addr_for_me; 144 struct ceph_entity_addr peer_addr_for_me;
145 unsigned peer_features;
147 u32 connect_seq; /* identify the most recent connection 146 u32 connect_seq; /* identify the most recent connection
148 attempt for this connection, client */ 147 attempt for this connection, client */
149 u32 peer_global_seq; /* peer's global seq for this connection */ 148 u32 peer_global_seq; /* peer's global seq for this connection */
@@ -158,7 +157,6 @@ struct ceph_connection {
158 struct list_head out_queue; 157 struct list_head out_queue;
159 struct list_head out_sent; /* sending or sent but unacked */ 158 struct list_head out_sent; /* sending or sent but unacked */
160 u64 out_seq; /* last message queued for send */ 159 u64 out_seq; /* last message queued for send */
161 u64 out_seq_sent; /* last message sent */
162 bool out_keepalive_pending; 160 bool out_keepalive_pending;
163 161
164 u64 in_seq, in_seq_acked; /* last message received, acked */ 162 u64 in_seq, in_seq_acked; /* last message received, acked */
@@ -234,9 +232,7 @@ extern void ceph_con_keepalive(struct ceph_connection *con);
234extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); 232extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
235extern void ceph_con_put(struct ceph_connection *con); 233extern void ceph_con_put(struct ceph_connection *con);
236 234
237extern struct ceph_msg *ceph_msg_new(int type, int front_len, 235extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags);
238 int page_len, int page_off,
239 struct page **pages);
240extern void ceph_msg_kfree(struct ceph_msg *m); 236extern void ceph_msg_kfree(struct ceph_msg *m);
241 237
242 238
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 8fdc011ca956..f6510a476e7e 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -28,7 +28,7 @@
28 * resend any outstanding requests. 28 * resend any outstanding requests.
29 */ 29 */
30 30
31const static struct ceph_connection_operations mon_con_ops; 31static const struct ceph_connection_operations mon_con_ops;
32 32
33static int __validate_auth(struct ceph_mon_client *monc); 33static int __validate_auth(struct ceph_mon_client *monc);
34 34
@@ -104,6 +104,7 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
104 monc->pending_auth = 1; 104 monc->pending_auth = 1;
105 monc->m_auth->front.iov_len = len; 105 monc->m_auth->front.iov_len = len;
106 monc->m_auth->hdr.front_len = cpu_to_le32(len); 106 monc->m_auth->hdr.front_len = cpu_to_le32(len);
107 ceph_con_revoke(monc->con, monc->m_auth);
107 ceph_msg_get(monc->m_auth); /* keep our ref */ 108 ceph_msg_get(monc->m_auth); /* keep our ref */
108 ceph_con_send(monc->con, monc->m_auth); 109 ceph_con_send(monc->con, monc->m_auth);
109} 110}
@@ -187,16 +188,12 @@ static void __send_subscribe(struct ceph_mon_client *monc)
187 monc->want_next_osdmap); 188 monc->want_next_osdmap);
188 if ((__sub_expired(monc) && !monc->sub_sent) || 189 if ((__sub_expired(monc) && !monc->sub_sent) ||
189 monc->want_next_osdmap == 1) { 190 monc->want_next_osdmap == 1) {
190 struct ceph_msg *msg; 191 struct ceph_msg *msg = monc->m_subscribe;
191 struct ceph_mon_subscribe_item *i; 192 struct ceph_mon_subscribe_item *i;
192 void *p, *end; 193 void *p, *end;
193 194
194 msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, 0, 0, NULL);
195 if (!msg)
196 return;
197
198 p = msg->front.iov_base; 195 p = msg->front.iov_base;
199 end = p + msg->front.iov_len; 196 end = p + msg->front_max;
200 197
201 dout("__send_subscribe to 'mdsmap' %u+\n", 198 dout("__send_subscribe to 'mdsmap' %u+\n",
202 (unsigned)monc->have_mdsmap); 199 (unsigned)monc->have_mdsmap);
@@ -226,7 +223,8 @@ static void __send_subscribe(struct ceph_mon_client *monc)
226 223
227 msg->front.iov_len = p - msg->front.iov_base; 224 msg->front.iov_len = p - msg->front.iov_base;
228 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 225 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
229 ceph_con_send(monc->con, msg); 226 ceph_con_revoke(monc->con, msg);
227 ceph_con_send(monc->con, ceph_msg_get(msg));
230 228
231 monc->sub_sent = jiffies | 1; /* never 0 */ 229 monc->sub_sent = jiffies | 1; /* never 0 */
232 } 230 }
@@ -353,14 +351,14 @@ out:
353/* 351/*
354 * statfs 352 * statfs
355 */ 353 */
356static struct ceph_mon_statfs_request *__lookup_statfs( 354static struct ceph_mon_generic_request *__lookup_generic_req(
357 struct ceph_mon_client *monc, u64 tid) 355 struct ceph_mon_client *monc, u64 tid)
358{ 356{
359 struct ceph_mon_statfs_request *req; 357 struct ceph_mon_generic_request *req;
360 struct rb_node *n = monc->statfs_request_tree.rb_node; 358 struct rb_node *n = monc->generic_request_tree.rb_node;
361 359
362 while (n) { 360 while (n) {
363 req = rb_entry(n, struct ceph_mon_statfs_request, node); 361 req = rb_entry(n, struct ceph_mon_generic_request, node);
364 if (tid < req->tid) 362 if (tid < req->tid)
365 n = n->rb_left; 363 n = n->rb_left;
366 else if (tid > req->tid) 364 else if (tid > req->tid)
@@ -371,16 +369,16 @@ static struct ceph_mon_statfs_request *__lookup_statfs(
371 return NULL; 369 return NULL;
372} 370}
373 371
374static void __insert_statfs(struct ceph_mon_client *monc, 372static void __insert_generic_request(struct ceph_mon_client *monc,
375 struct ceph_mon_statfs_request *new) 373 struct ceph_mon_generic_request *new)
376{ 374{
377 struct rb_node **p = &monc->statfs_request_tree.rb_node; 375 struct rb_node **p = &monc->generic_request_tree.rb_node;
378 struct rb_node *parent = NULL; 376 struct rb_node *parent = NULL;
379 struct ceph_mon_statfs_request *req = NULL; 377 struct ceph_mon_generic_request *req = NULL;
380 378
381 while (*p) { 379 while (*p) {
382 parent = *p; 380 parent = *p;
383 req = rb_entry(parent, struct ceph_mon_statfs_request, node); 381 req = rb_entry(parent, struct ceph_mon_generic_request, node);
384 if (new->tid < req->tid) 382 if (new->tid < req->tid)
385 p = &(*p)->rb_left; 383 p = &(*p)->rb_left;
386 else if (new->tid > req->tid) 384 else if (new->tid > req->tid)
@@ -390,113 +388,157 @@ static void __insert_statfs(struct ceph_mon_client *monc,
390 } 388 }
391 389
392 rb_link_node(&new->node, parent, p); 390 rb_link_node(&new->node, parent, p);
393 rb_insert_color(&new->node, &monc->statfs_request_tree); 391 rb_insert_color(&new->node, &monc->generic_request_tree);
392}
393
394static void release_generic_request(struct kref *kref)
395{
396 struct ceph_mon_generic_request *req =
397 container_of(kref, struct ceph_mon_generic_request, kref);
398
399 if (req->reply)
400 ceph_msg_put(req->reply);
401 if (req->request)
402 ceph_msg_put(req->request);
403}
404
405static void put_generic_request(struct ceph_mon_generic_request *req)
406{
407 kref_put(&req->kref, release_generic_request);
408}
409
410static void get_generic_request(struct ceph_mon_generic_request *req)
411{
412 kref_get(&req->kref);
413}
414
415static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
416 struct ceph_msg_header *hdr,
417 int *skip)
418{
419 struct ceph_mon_client *monc = con->private;
420 struct ceph_mon_generic_request *req;
421 u64 tid = le64_to_cpu(hdr->tid);
422 struct ceph_msg *m;
423
424 mutex_lock(&monc->mutex);
425 req = __lookup_generic_req(monc, tid);
426 if (!req) {
427 dout("get_generic_reply %lld dne\n", tid);
428 *skip = 1;
429 m = NULL;
430 } else {
431 dout("get_generic_reply %lld got %p\n", tid, req->reply);
432 m = ceph_msg_get(req->reply);
433 /*
434 * we don't need to track the connection reading into
435 * this reply because we only have one open connection
436 * at a time, ever.
437 */
438 }
439 mutex_unlock(&monc->mutex);
440 return m;
394} 441}
395 442
396static void handle_statfs_reply(struct ceph_mon_client *monc, 443static void handle_statfs_reply(struct ceph_mon_client *monc,
397 struct ceph_msg *msg) 444 struct ceph_msg *msg)
398{ 445{
399 struct ceph_mon_statfs_request *req; 446 struct ceph_mon_generic_request *req;
400 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 447 struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
401 u64 tid; 448 u64 tid = le64_to_cpu(msg->hdr.tid);
402 449
403 if (msg->front.iov_len != sizeof(*reply)) 450 if (msg->front.iov_len != sizeof(*reply))
404 goto bad; 451 goto bad;
405 tid = le64_to_cpu(msg->hdr.tid);
406 dout("handle_statfs_reply %p tid %llu\n", msg, tid); 452 dout("handle_statfs_reply %p tid %llu\n", msg, tid);
407 453
408 mutex_lock(&monc->mutex); 454 mutex_lock(&monc->mutex);
409 req = __lookup_statfs(monc, tid); 455 req = __lookup_generic_req(monc, tid);
410 if (req) { 456 if (req) {
411 *req->buf = reply->st; 457 *(struct ceph_statfs *)req->buf = reply->st;
412 req->result = 0; 458 req->result = 0;
459 get_generic_request(req);
413 } 460 }
414 mutex_unlock(&monc->mutex); 461 mutex_unlock(&monc->mutex);
415 if (req) 462 if (req) {
416 complete(&req->completion); 463 complete(&req->completion);
464 put_generic_request(req);
465 }
417 return; 466 return;
418 467
419bad: 468bad:
420 pr_err("corrupt statfs reply, no tid\n"); 469 pr_err("corrupt generic reply, no tid\n");
421 ceph_msg_dump(msg); 470 ceph_msg_dump(msg);
422} 471}
423 472
424/* 473/*
425 * (re)send a statfs request 474 * Do a synchronous statfs().
426 */ 475 */
427static int send_statfs(struct ceph_mon_client *monc, 476int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
428 struct ceph_mon_statfs_request *req)
429{ 477{
430 struct ceph_msg *msg; 478 struct ceph_mon_generic_request *req;
431 struct ceph_mon_statfs *h; 479 struct ceph_mon_statfs *h;
480 int err;
432 481
433 dout("send_statfs tid %llu\n", req->tid); 482 req = kzalloc(sizeof(*req), GFP_NOFS);
434 msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL); 483 if (!req)
435 if (IS_ERR(msg)) 484 return -ENOMEM;
436 return PTR_ERR(msg); 485
437 req->request = msg; 486 kref_init(&req->kref);
438 msg->hdr.tid = cpu_to_le64(req->tid); 487 req->buf = buf;
439 h = msg->front.iov_base; 488 init_completion(&req->completion);
489
490 err = -ENOMEM;
491 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS);
492 if (!req->request)
493 goto out;
494 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS);
495 if (!req->reply)
496 goto out;
497
498 /* fill out request */
499 h = req->request->front.iov_base;
440 h->monhdr.have_version = 0; 500 h->monhdr.have_version = 0;
441 h->monhdr.session_mon = cpu_to_le16(-1); 501 h->monhdr.session_mon = cpu_to_le16(-1);
442 h->monhdr.session_mon_tid = 0; 502 h->monhdr.session_mon_tid = 0;
443 h->fsid = monc->monmap->fsid; 503 h->fsid = monc->monmap->fsid;
444 ceph_con_send(monc->con, msg);
445 return 0;
446}
447
448/*
449 * Do a synchronous statfs().
450 */
451int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
452{
453 struct ceph_mon_statfs_request req;
454 int err;
455
456 req.buf = buf;
457 init_completion(&req.completion);
458
459 /* allocate memory for reply */
460 err = ceph_msgpool_resv(&monc->msgpool_statfs_reply, 1);
461 if (err)
462 return err;
463 504
464 /* register request */ 505 /* register request */
465 mutex_lock(&monc->mutex); 506 mutex_lock(&monc->mutex);
466 req.tid = ++monc->last_tid; 507 req->tid = ++monc->last_tid;
467 req.last_attempt = jiffies; 508 req->request->hdr.tid = cpu_to_le64(req->tid);
468 req.delay = BASE_DELAY_INTERVAL; 509 __insert_generic_request(monc, req);
469 __insert_statfs(monc, &req); 510 monc->num_generic_requests++;
470 monc->num_statfs_requests++;
471 mutex_unlock(&monc->mutex); 511 mutex_unlock(&monc->mutex);
472 512
473 /* send request and wait */ 513 /* send request and wait */
474 err = send_statfs(monc, &req); 514 ceph_con_send(monc->con, ceph_msg_get(req->request));
475 if (!err) 515 err = wait_for_completion_interruptible(&req->completion);
476 err = wait_for_completion_interruptible(&req.completion);
477 516
478 mutex_lock(&monc->mutex); 517 mutex_lock(&monc->mutex);
479 rb_erase(&req.node, &monc->statfs_request_tree); 518 rb_erase(&req->node, &monc->generic_request_tree);
480 monc->num_statfs_requests--; 519 monc->num_generic_requests--;
481 ceph_msgpool_resv(&monc->msgpool_statfs_reply, -1);
482 mutex_unlock(&monc->mutex); 520 mutex_unlock(&monc->mutex);
483 521
484 if (!err) 522 if (!err)
485 err = req.result; 523 err = req->result;
524
525out:
526 kref_put(&req->kref, release_generic_request);
486 return err; 527 return err;
487} 528}
488 529
489/* 530/*
490 * Resend pending statfs requests. 531 * Resend pending statfs requests.
491 */ 532 */
492static void __resend_statfs(struct ceph_mon_client *monc) 533static void __resend_generic_request(struct ceph_mon_client *monc)
493{ 534{
494 struct ceph_mon_statfs_request *req; 535 struct ceph_mon_generic_request *req;
495 struct rb_node *p; 536 struct rb_node *p;
496 537
497 for (p = rb_first(&monc->statfs_request_tree); p; p = rb_next(p)) { 538 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
498 req = rb_entry(p, struct ceph_mon_statfs_request, node); 539 req = rb_entry(p, struct ceph_mon_generic_request, node);
499 send_statfs(monc, req); 540 ceph_con_revoke(monc->con, req->request);
541 ceph_con_send(monc->con, ceph_msg_get(req->request));
500 } 542 }
501} 543}
502 544
@@ -586,26 +628,26 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
586 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 628 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
587 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 629 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
588 630
589 /* msg pools */ 631 /* msgs */
590 err = ceph_msgpool_init(&monc->msgpool_subscribe_ack, 632 err = -ENOMEM;
591 sizeof(struct ceph_mon_subscribe_ack), 1, false); 633 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
592 if (err < 0) 634 sizeof(struct ceph_mon_subscribe_ack),
635 GFP_NOFS);
636 if (!monc->m_subscribe_ack)
593 goto out_monmap; 637 goto out_monmap;
594 err = ceph_msgpool_init(&monc->msgpool_statfs_reply, 638
595 sizeof(struct ceph_mon_statfs_reply), 0, false); 639 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS);
596 if (err < 0) 640 if (!monc->m_subscribe)
597 goto out_pool1; 641 goto out_subscribe_ack;
598 err = ceph_msgpool_init(&monc->msgpool_auth_reply, 4096, 1, false); 642
599 if (err < 0) 643 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS);
600 goto out_pool2; 644 if (!monc->m_auth_reply)
601 645 goto out_subscribe;
602 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, 0, 0, NULL); 646
647 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS);
603 monc->pending_auth = 0; 648 monc->pending_auth = 0;
604 if (IS_ERR(monc->m_auth)) { 649 if (!monc->m_auth)
605 err = PTR_ERR(monc->m_auth); 650 goto out_auth_reply;
606 monc->m_auth = NULL;
607 goto out_pool3;
608 }
609 651
610 monc->cur_mon = -1; 652 monc->cur_mon = -1;
611 monc->hunting = true; 653 monc->hunting = true;
@@ -613,8 +655,8 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
613 monc->sub_sent = 0; 655 monc->sub_sent = 0;
614 656
615 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 657 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
616 monc->statfs_request_tree = RB_ROOT; 658 monc->generic_request_tree = RB_ROOT;
617 monc->num_statfs_requests = 0; 659 monc->num_generic_requests = 0;
618 monc->last_tid = 0; 660 monc->last_tid = 0;
619 661
620 monc->have_mdsmap = 0; 662 monc->have_mdsmap = 0;
@@ -622,12 +664,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
622 monc->want_next_osdmap = 1; 664 monc->want_next_osdmap = 1;
623 return 0; 665 return 0;
624 666
625out_pool3: 667out_auth_reply:
626 ceph_msgpool_destroy(&monc->msgpool_auth_reply); 668 ceph_msg_put(monc->m_auth_reply);
627out_pool2: 669out_subscribe:
628 ceph_msgpool_destroy(&monc->msgpool_subscribe_ack); 670 ceph_msg_put(monc->m_subscribe);
629out_pool1: 671out_subscribe_ack:
630 ceph_msgpool_destroy(&monc->msgpool_statfs_reply); 672 ceph_msg_put(monc->m_subscribe_ack);
631out_monmap: 673out_monmap:
632 kfree(monc->monmap); 674 kfree(monc->monmap);
633out: 675out:
@@ -651,9 +693,9 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
651 ceph_auth_destroy(monc->auth); 693 ceph_auth_destroy(monc->auth);
652 694
653 ceph_msg_put(monc->m_auth); 695 ceph_msg_put(monc->m_auth);
654 ceph_msgpool_destroy(&monc->msgpool_subscribe_ack); 696 ceph_msg_put(monc->m_auth_reply);
655 ceph_msgpool_destroy(&monc->msgpool_statfs_reply); 697 ceph_msg_put(monc->m_subscribe);
656 ceph_msgpool_destroy(&monc->msgpool_auth_reply); 698 ceph_msg_put(monc->m_subscribe_ack);
657 699
658 kfree(monc->monmap); 700 kfree(monc->monmap);
659} 701}
@@ -681,7 +723,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
681 monc->client->msgr->inst.name.num = monc->auth->global_id; 723 monc->client->msgr->inst.name.num = monc->auth->global_id;
682 724
683 __send_subscribe(monc); 725 __send_subscribe(monc);
684 __resend_statfs(monc); 726 __resend_generic_request(monc);
685 } 727 }
686 mutex_unlock(&monc->mutex); 728 mutex_unlock(&monc->mutex);
687} 729}
@@ -770,18 +812,17 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
770 812
771 switch (type) { 813 switch (type) {
772 case CEPH_MSG_MON_SUBSCRIBE_ACK: 814 case CEPH_MSG_MON_SUBSCRIBE_ACK:
773 m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len); 815 m = ceph_msg_get(monc->m_subscribe_ack);
774 break; 816 break;
775 case CEPH_MSG_STATFS_REPLY: 817 case CEPH_MSG_STATFS_REPLY:
776 m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len); 818 return get_generic_reply(con, hdr, skip);
777 break;
778 case CEPH_MSG_AUTH_REPLY: 819 case CEPH_MSG_AUTH_REPLY:
779 m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len); 820 m = ceph_msg_get(monc->m_auth_reply);
780 break; 821 break;
781 case CEPH_MSG_MON_MAP: 822 case CEPH_MSG_MON_MAP:
782 case CEPH_MSG_MDS_MAP: 823 case CEPH_MSG_MDS_MAP:
783 case CEPH_MSG_OSD_MAP: 824 case CEPH_MSG_OSD_MAP:
784 m = ceph_msg_new(type, front_len, 0, 0, NULL); 825 m = ceph_msg_new(type, front_len, GFP_NOFS);
785 break; 826 break;
786 } 827 }
787 828
@@ -826,7 +867,7 @@ out:
826 mutex_unlock(&monc->mutex); 867 mutex_unlock(&monc->mutex);
827} 868}
828 869
829const static struct ceph_connection_operations mon_con_ops = { 870static const struct ceph_connection_operations mon_con_ops = {
830 .get = ceph_con_get, 871 .get = ceph_con_get,
831 .put = ceph_con_put, 872 .put = ceph_con_put,
832 .dispatch = dispatch, 873 .dispatch = dispatch,
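
[Editor's note] The mon_client hunks above drop the per-type msgpools in favor of single messages preallocated at init time (m_subscribe_ack, m_subscribe, m_auth_reply, m_auth) that are handed back to the messenger from the alloc-message hook with an extra reference. A minimal sketch of that pattern follows; the helper name is made up, and the size check is an illustration that the real mon_alloc_msg() does not need for its fixed-size replies.

/* Sketch only: mirrors the preallocated-reply pattern used by mon_alloc_msg(). */
static struct ceph_msg *example_alloc_reply(struct ceph_mon_client *monc,
					    int front_len, int *skip)
{
	struct ceph_msg *m = monc->m_auth_reply;   /* preallocated in ceph_monc_init() */

	if (front_len > m->front.iov_len) {
		*skip = 1;             /* reply larger than the static buffer */
		return NULL;
	}
	*skip = 0;
	return ceph_msg_get(m);        /* messenger drops this ref when done reading */
}
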
diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h
index b958ad5afa06..174d794321d0 100644
--- a/fs/ceph/mon_client.h
+++ b/fs/ceph/mon_client.h
@@ -2,10 +2,10 @@
2#define _FS_CEPH_MON_CLIENT_H 2#define _FS_CEPH_MON_CLIENT_H
3 3
4#include <linux/completion.h> 4#include <linux/completion.h>
5#include <linux/kref.h>
5#include <linux/rbtree.h> 6#include <linux/rbtree.h>
6 7
7#include "messenger.h" 8#include "messenger.h"
8#include "msgpool.h"
9 9
10struct ceph_client; 10struct ceph_client;
11struct ceph_mount_args; 11struct ceph_mount_args;
@@ -22,7 +22,7 @@ struct ceph_monmap {
22}; 22};
23 23
24struct ceph_mon_client; 24struct ceph_mon_client;
25struct ceph_mon_statfs_request; 25struct ceph_mon_generic_request;
26 26
27 27
28/* 28/*
@@ -40,17 +40,19 @@ struct ceph_mon_request {
40}; 40};
41 41
42/* 42/*
43 * statfs() is done a bit differently because we need to get data back 43 * ceph_mon_generic_request is being used for the statfs and poolop requests
 44 * which are being done a bit differently because we need to get data back
44 * to the caller 45 * to the caller
45 */ 46 */
46struct ceph_mon_statfs_request { 47struct ceph_mon_generic_request {
48 struct kref kref;
47 u64 tid; 49 u64 tid;
48 struct rb_node node; 50 struct rb_node node;
49 int result; 51 int result;
50 struct ceph_statfs *buf; 52 void *buf;
51 struct completion completion; 53 struct completion completion;
52 unsigned long last_attempt, delay; /* jiffies */
53 struct ceph_msg *request; /* original request */ 54 struct ceph_msg *request; /* original request */
55 struct ceph_msg *reply; /* and reply */
54}; 56};
55 57
56struct ceph_mon_client { 58struct ceph_mon_client {
@@ -61,7 +63,7 @@ struct ceph_mon_client {
61 struct delayed_work delayed_work; 63 struct delayed_work delayed_work;
62 64
63 struct ceph_auth_client *auth; 65 struct ceph_auth_client *auth;
64 struct ceph_msg *m_auth; 66 struct ceph_msg *m_auth, *m_auth_reply, *m_subscribe, *m_subscribe_ack;
65 int pending_auth; 67 int pending_auth;
66 68
67 bool hunting; 69 bool hunting;
@@ -70,14 +72,9 @@ struct ceph_mon_client {
70 struct ceph_connection *con; 72 struct ceph_connection *con;
71 bool have_fsid; 73 bool have_fsid;
72 74
73 /* msg pools */ 75 /* pending generic requests */
74 struct ceph_msgpool msgpool_subscribe_ack; 76 struct rb_root generic_request_tree;
75 struct ceph_msgpool msgpool_statfs_reply; 77 int num_generic_requests;
76 struct ceph_msgpool msgpool_auth_reply;
77
78 /* pending statfs requests */
79 struct rb_root statfs_request_tree;
80 int num_statfs_requests;
81 u64 last_tid; 78 u64 last_tid;
82 79
83 /* mds/osd map */ 80 /* mds/osd map */
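
[Editor's note] The new ceph_mon_generic_request pairs a kref with a tid-keyed rbtree node and a completion, so a caller can block until the matching reply fills req->buf. A rough sketch of the lifecycle this implies; register_generic_request(), unregister_generic_request() and release_generic_request() are illustrative names, not the exact helpers in mon_client.c.

/* Sketch only: lifecycle implied by struct ceph_mon_generic_request. */
static int example_do_generic_request(struct ceph_mon_client *monc,
				      struct ceph_mon_generic_request *req)
{
	int err;

	/* req->request was built by the caller (e.g. the statfs encoder) */
	kref_init(&req->kref);
	init_completion(&req->completion);

	mutex_lock(&monc->mutex);
	req->tid = ++monc->last_tid;
	req->request->hdr.tid = cpu_to_le64(req->tid);
	register_generic_request(monc, req);    /* rb_insert into generic_request_tree */
	ceph_con_send(monc->con, ceph_msg_get(req->request));
	mutex_unlock(&monc->mutex);

	err = wait_for_completion_interruptible(&req->completion);

	mutex_lock(&monc->mutex);
	unregister_generic_request(monc, req);  /* rb_erase + drop the tree's ref */
	mutex_unlock(&monc->mutex);

	if (!err)
		err = req->result;
	kref_put(&req->kref, release_generic_request);
	return err;
}
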
diff --git a/fs/ceph/msgpool.c b/fs/ceph/msgpool.c
index ca3b44a89f2d..dd65a6438131 100644
--- a/fs/ceph/msgpool.c
+++ b/fs/ceph/msgpool.c
@@ -7,180 +7,58 @@
7 7
8#include "msgpool.h" 8#include "msgpool.h"
9 9
10/* 10static void *alloc_fn(gfp_t gfp_mask, void *arg)
11 * We use msg pools to preallocate memory for messages we expect to 11{
12 * receive over the wire, to avoid getting ourselves into OOM 12 struct ceph_msgpool *pool = arg;
13 * conditions at unexpected times. We take use a few different 13 void *p;
14 * strategies:
15 *
16 * - for request/response type interactions, we preallocate the
17 * memory needed for the response when we generate the request.
18 *
19 * - for messages we can receive at any time from the MDS, we preallocate
20 * a pool of messages we can re-use.
21 *
22 * - for writeback, we preallocate some number of messages to use for
23 * requests and their replies, so that we always make forward
24 * progress.
25 *
26 * The msgpool behaves like a mempool_t, but keeps preallocated
27 * ceph_msgs strung together on a list_head instead of using a pointer
28 * vector. This avoids vector reallocation when we adjust the number
29 * of preallocated items (which happens frequently).
30 */
31 14
15 p = ceph_msg_new(0, pool->front_len, gfp_mask);
16 if (!p)
17 pr_err("msgpool %s alloc failed\n", pool->name);
18 return p;
19}
32 20
33/* 21static void free_fn(void *element, void *arg)
34 * Allocate or release as necessary to meet our target pool size.
35 */
36static int __fill_msgpool(struct ceph_msgpool *pool)
37{ 22{
38 struct ceph_msg *msg; 23 ceph_msg_put(element);
39
40 while (pool->num < pool->min) {
41 dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num,
42 pool->min);
43 spin_unlock(&pool->lock);
44 msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
45 spin_lock(&pool->lock);
46 if (IS_ERR(msg))
47 return PTR_ERR(msg);
48 msg->pool = pool;
49 list_add(&msg->list_head, &pool->msgs);
50 pool->num++;
51 }
52 while (pool->num > pool->min) {
53 msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head);
54 dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num,
55 pool->min, msg);
56 list_del_init(&msg->list_head);
57 pool->num--;
58 ceph_msg_kfree(msg);
59 }
60 return 0;
61} 24}
62 25
63int ceph_msgpool_init(struct ceph_msgpool *pool, 26int ceph_msgpool_init(struct ceph_msgpool *pool,
64 int front_len, int min, bool blocking) 27 int front_len, int size, bool blocking, const char *name)
65{ 28{
66 int ret;
67
68 dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min);
69 spin_lock_init(&pool->lock);
70 pool->front_len = front_len; 29 pool->front_len = front_len;
71 INIT_LIST_HEAD(&pool->msgs); 30 pool->pool = mempool_create(size, alloc_fn, free_fn, pool);
72 pool->num = 0; 31 if (!pool->pool)
73 pool->min = min; 32 return -ENOMEM;
74 pool->blocking = blocking; 33 pool->name = name;
75 init_waitqueue_head(&pool->wait); 34 return 0;
76
77 spin_lock(&pool->lock);
78 ret = __fill_msgpool(pool);
79 spin_unlock(&pool->lock);
80 return ret;
81} 35}
82 36
83void ceph_msgpool_destroy(struct ceph_msgpool *pool) 37void ceph_msgpool_destroy(struct ceph_msgpool *pool)
84{ 38{
85 dout("msgpool_destroy %p\n", pool); 39 mempool_destroy(pool->pool);
86 spin_lock(&pool->lock);
87 pool->min = 0;
88 __fill_msgpool(pool);
89 spin_unlock(&pool->lock);
90} 40}
91 41
92int ceph_msgpool_resv(struct ceph_msgpool *pool, int delta) 42struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
43 int front_len)
93{ 44{
94 int ret; 45 if (front_len > pool->front_len) {
95 46 pr_err("msgpool_get pool %s need front %d, pool size is %d\n",
96 spin_lock(&pool->lock); 47 pool->name, front_len, pool->front_len);
97 dout("msgpool_resv %p delta %d\n", pool, delta);
98 pool->min += delta;
99 ret = __fill_msgpool(pool);
100 spin_unlock(&pool->lock);
101 return ret;
102}
103
104struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len)
105{
106 wait_queue_t wait;
107 struct ceph_msg *msg;
108
109 if (front_len && front_len > pool->front_len) {
110 pr_err("msgpool_get pool %p need front %d, pool size is %d\n",
111 pool, front_len, pool->front_len);
112 WARN_ON(1); 48 WARN_ON(1);
113 49
114 /* try to alloc a fresh message */ 50 /* try to alloc a fresh message */
115 msg = ceph_msg_new(0, front_len, 0, 0, NULL); 51 return ceph_msg_new(0, front_len, GFP_NOFS);
116 if (!IS_ERR(msg))
117 return msg;
118 }
119
120 if (!front_len)
121 front_len = pool->front_len;
122
123 if (pool->blocking) {
124 /* mempool_t behavior; first try to alloc */
125 msg = ceph_msg_new(0, front_len, 0, 0, NULL);
126 if (!IS_ERR(msg))
127 return msg;
128 } 52 }
129 53
130 while (1) { 54 return mempool_alloc(pool->pool, GFP_NOFS);
131 spin_lock(&pool->lock);
132 if (likely(pool->num)) {
133 msg = list_entry(pool->msgs.next, struct ceph_msg,
134 list_head);
135 list_del_init(&msg->list_head);
136 pool->num--;
137 dout("msgpool_get %p got %p, now %d/%d\n", pool, msg,
138 pool->num, pool->min);
139 spin_unlock(&pool->lock);
140 return msg;
141 }
142 pr_err("msgpool_get %p now %d/%d, %s\n", pool, pool->num,
143 pool->min, pool->blocking ? "waiting" : "may fail");
144 spin_unlock(&pool->lock);
145
146 if (!pool->blocking) {
147 WARN_ON(1);
148
149 /* maybe we can allocate it now? */
150 msg = ceph_msg_new(0, front_len, 0, 0, NULL);
151 if (!IS_ERR(msg))
152 return msg;
153
154 pr_err("msgpool_get %p empty + alloc failed\n", pool);
155 return ERR_PTR(-ENOMEM);
156 }
157
158 init_wait(&wait);
159 prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE);
160 schedule();
161 finish_wait(&pool->wait, &wait);
162 }
163} 55}
164 56
165void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg) 57void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
166{ 58{
167 spin_lock(&pool->lock); 59 /* reset msg front_len; user may have changed it */
168 if (pool->num < pool->min) { 60 msg->front.iov_len = pool->front_len;
169 /* reset msg front_len; user may have changed it */ 61 msg->hdr.front_len = cpu_to_le32(pool->front_len);
170 msg->front.iov_len = pool->front_len;
171 msg->hdr.front_len = cpu_to_le32(pool->front_len);
172 62
173 kref_set(&msg->kref, 1); /* retake a single ref */ 63 kref_init(&msg->kref); /* retake single ref */
174 list_add(&msg->list_head, &pool->msgs);
175 pool->num++;
176 dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg,
177 pool->num, pool->min);
178 spin_unlock(&pool->lock);
179 wake_up(&pool->wait);
180 } else {
181 dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg,
182 pool->num, pool->min);
183 spin_unlock(&pool->lock);
184 ceph_msg_kfree(msg);
185 }
186} 64}
diff --git a/fs/ceph/msgpool.h b/fs/ceph/msgpool.h
index bc834bfcd720..a362605f9368 100644
--- a/fs/ceph/msgpool.h
+++ b/fs/ceph/msgpool.h
@@ -1,6 +1,7 @@
1#ifndef _FS_CEPH_MSGPOOL 1#ifndef _FS_CEPH_MSGPOOL
2#define _FS_CEPH_MSGPOOL 2#define _FS_CEPH_MSGPOOL
3 3
4#include <linux/mempool.h>
4#include "messenger.h" 5#include "messenger.h"
5 6
6/* 7/*
@@ -8,18 +9,15 @@
8 * avoid unexpected OOM conditions. 9 * avoid unexpected OOM conditions.
9 */ 10 */
10struct ceph_msgpool { 11struct ceph_msgpool {
11 spinlock_t lock; 12 const char *name;
13 mempool_t *pool;
12 int front_len; /* preallocated payload size */ 14 int front_len; /* preallocated payload size */
13 struct list_head msgs; /* msgs in the pool; each has 1 ref */
14 int num, min; /* cur, min # msgs in the pool */
15 bool blocking;
16 wait_queue_head_t wait;
17}; 15};
18 16
19extern int ceph_msgpool_init(struct ceph_msgpool *pool, 17extern int ceph_msgpool_init(struct ceph_msgpool *pool,
20 int front_len, int size, bool blocking); 18 int front_len, int size, bool blocking,
19 const char *name);
21extern void ceph_msgpool_destroy(struct ceph_msgpool *pool); 20extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
22extern int ceph_msgpool_resv(struct ceph_msgpool *, int delta);
23extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *, 21extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *,
24 int front_len); 22 int front_len);
25extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *); 23extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *);
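
[Editor's note] After the rewrite a msgpool is a thin wrapper around mempool_t: the reserve holds `size` messages of `front_len` bytes, and ceph_msgpool_get() falls through to mempool_alloc(GFP_NOFS), which sleeps for a returned element rather than failing, so the old hand-rolled blocking logic can go away. An illustrative caller of the new API (the function name is invented; setup mirrors what ceph_osdc_init() does further down with the "osd_op" and "osd_op_reply" pools):

/* Sketch only: caller side of the reworked msgpool API. */
static struct ceph_msg *example_get_op_msg(struct ceph_msgpool *pool)
{
	struct ceph_msg *msg;

	msg = ceph_msgpool_get(pool, 0);   /* 0: use the pool's own front_len */
	if (!msg)
		return NULL;
	memset(msg->front.iov_base, 0, msg->front.iov_len);
	return msg;                        /* caller drops it with ceph_msg_put() later */
}
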
diff --git a/fs/ceph/msgr.h b/fs/ceph/msgr.h
index 8aaab414f3f8..892a0298dfdf 100644
--- a/fs/ceph/msgr.h
+++ b/fs/ceph/msgr.h
@@ -50,7 +50,6 @@ struct ceph_entity_name {
50#define CEPH_ENTITY_TYPE_MDS 0x02 50#define CEPH_ENTITY_TYPE_MDS 0x02
51#define CEPH_ENTITY_TYPE_OSD 0x04 51#define CEPH_ENTITY_TYPE_OSD 0x04
52#define CEPH_ENTITY_TYPE_CLIENT 0x08 52#define CEPH_ENTITY_TYPE_CLIENT 0x08
53#define CEPH_ENTITY_TYPE_ADMIN 0x10
54#define CEPH_ENTITY_TYPE_AUTH 0x20 53#define CEPH_ENTITY_TYPE_AUTH 0x20
55 54
56#define CEPH_ENTITY_TYPE_ANY 0xFF 55#define CEPH_ENTITY_TYPE_ANY 0xFF
@@ -120,7 +119,7 @@ struct ceph_msg_connect_reply {
120/* 119/*
121 * message header 120 * message header
122 */ 121 */
123struct ceph_msg_header { 122struct ceph_msg_header_old {
124 __le64 seq; /* message seq# for this session */ 123 __le64 seq; /* message seq# for this session */
125 __le64 tid; /* transaction id */ 124 __le64 tid; /* transaction id */
126 __le16 type; /* message type */ 125 __le16 type; /* message type */
@@ -138,6 +137,24 @@ struct ceph_msg_header {
138 __le32 crc; /* header crc32c */ 137 __le32 crc; /* header crc32c */
139} __attribute__ ((packed)); 138} __attribute__ ((packed));
140 139
140struct ceph_msg_header {
141 __le64 seq; /* message seq# for this session */
142 __le64 tid; /* transaction id */
143 __le16 type; /* message type */
144 __le16 priority; /* priority. higher value == higher priority */
145 __le16 version; /* version of message encoding */
146
147 __le32 front_len; /* bytes in main payload */
148 __le32 middle_len;/* bytes in middle payload */
149 __le32 data_len; /* bytes of data payload */
150 __le16 data_off; /* sender: include full offset;
151 receiver: mask against ~PAGE_MASK */
152
153 struct ceph_entity_name src;
154 __le32 reserved;
155 __le32 crc; /* header crc32c */
156} __attribute__ ((packed));
157
141#define CEPH_MSG_PRIO_LOW 64 158#define CEPH_MSG_PRIO_LOW 64
142#define CEPH_MSG_PRIO_DEFAULT 127 159#define CEPH_MSG_PRIO_DEFAULT 127
143#define CEPH_MSG_PRIO_HIGH 196 160#define CEPH_MSG_PRIO_HIGH 196
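
[Editor's note] The wire-format change above keeps the old layout around as ceph_msg_header_old and shrinks the sender identification in the new header to a bare ceph_entity_name plus a reserved word; the peer already knows the address from the connection. A hedged sketch of filling the new header on the send side (the helper name is made up, and the crc is left for the messenger):

/* Sketch only: populate the new ceph_msg_header before handing off to the messenger. */
static void example_fill_header(struct ceph_msg_header *hdr,
				const struct ceph_entity_name *me,
				u64 seq, u64 tid, u16 type, u32 front_len)
{
	memset(hdr, 0, sizeof(*hdr));          /* middle_len/data_len stay zero here */
	hdr->seq = cpu_to_le64(seq);
	hdr->tid = cpu_to_le64(tid);
	hdr->type = cpu_to_le16(type);
	hdr->priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
	hdr->front_len = cpu_to_le32(front_len);
	hdr->data_off = cpu_to_le16(0);
	hdr->src = *me;                        /* entity name only; no address */
	/* hdr->crc is computed by the messenger just before the header hits the wire */
}
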
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 3514f71ff85f..afa7bb3895c4 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -16,7 +16,7 @@
16#define OSD_OP_FRONT_LEN 4096 16#define OSD_OP_FRONT_LEN 4096
17#define OSD_OPREPLY_FRONT_LEN 512 17#define OSD_OPREPLY_FRONT_LEN 512
18 18
19const static struct ceph_connection_operations osd_con_ops; 19static const struct ceph_connection_operations osd_con_ops;
20static int __kick_requests(struct ceph_osd_client *osdc, 20static int __kick_requests(struct ceph_osd_client *osdc,
21 struct ceph_osd *kickosd); 21 struct ceph_osd *kickosd);
22 22
@@ -147,7 +147,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
147 req = kzalloc(sizeof(*req), GFP_NOFS); 147 req = kzalloc(sizeof(*req), GFP_NOFS);
148 } 148 }
149 if (req == NULL) 149 if (req == NULL)
150 return ERR_PTR(-ENOMEM); 150 return NULL;
151 151
152 req->r_osdc = osdc; 152 req->r_osdc = osdc;
153 req->r_mempool = use_mempool; 153 req->r_mempool = use_mempool;
@@ -164,10 +164,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
164 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 164 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
165 else 165 else
166 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 166 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
167 OSD_OPREPLY_FRONT_LEN, 0, 0, NULL); 167 OSD_OPREPLY_FRONT_LEN, GFP_NOFS);
168 if (IS_ERR(msg)) { 168 if (!msg) {
169 ceph_osdc_put_request(req); 169 ceph_osdc_put_request(req);
170 return ERR_PTR(PTR_ERR(msg)); 170 return NULL;
171 } 171 }
172 req->r_reply = msg; 172 req->r_reply = msg;
173 173
@@ -178,10 +178,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
178 if (use_mempool) 178 if (use_mempool)
179 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 179 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
180 else 180 else
181 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); 181 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS);
182 if (IS_ERR(msg)) { 182 if (!msg) {
183 ceph_osdc_put_request(req); 183 ceph_osdc_put_request(req);
184 return ERR_PTR(PTR_ERR(msg)); 184 return NULL;
185 } 185 }
186 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); 186 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
187 memset(msg->front.iov_base, 0, msg->front.iov_len); 187 memset(msg->front.iov_base, 0, msg->front.iov_len);
@@ -715,7 +715,7 @@ static void handle_timeout(struct work_struct *work)
715 * should mark the osd as failed and we should find out about 715 * should mark the osd as failed and we should find out about
716 * it from an updated osd map. 716 * it from an updated osd map.
717 */ 717 */
718 while (!list_empty(&osdc->req_lru)) { 718 while (timeout && !list_empty(&osdc->req_lru)) {
719 req = list_entry(osdc->req_lru.next, struct ceph_osd_request, 719 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
720 r_req_lru_item); 720 r_req_lru_item);
721 721
@@ -1078,6 +1078,7 @@ done:
1078 if (newmap) 1078 if (newmap)
1079 kick_requests(osdc, NULL); 1079 kick_requests(osdc, NULL);
1080 up_read(&osdc->map_sem); 1080 up_read(&osdc->map_sem);
1081 wake_up(&osdc->client->auth_wq);
1081 return; 1082 return;
1082 1083
1083bad: 1084bad:
@@ -1087,45 +1088,6 @@ bad:
1087 return; 1088 return;
1088} 1089}
1089 1090
1090
1091/*
1092 * A read request prepares specific pages that data is to be read into.
1093 * When a message is being read off the wire, we call prepare_pages to
1094 * find those pages.
1095 * 0 = success, -1 failure.
1096 */
1097static int __prepare_pages(struct ceph_connection *con,
1098 struct ceph_msg_header *hdr,
1099 struct ceph_osd_request *req,
1100 u64 tid,
1101 struct ceph_msg *m)
1102{
1103 struct ceph_osd *osd = con->private;
1104 struct ceph_osd_client *osdc;
1105 int ret = -1;
1106 int data_len = le32_to_cpu(hdr->data_len);
1107 unsigned data_off = le16_to_cpu(hdr->data_off);
1108
1109 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1110
1111 if (!osd)
1112 return -1;
1113
1114 osdc = osd->o_osdc;
1115
1116 dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
1117 tid, req->r_num_pages, want);
1118 if (unlikely(req->r_num_pages < want))
1119 goto out;
1120 m->pages = req->r_pages;
1121 m->nr_pages = req->r_num_pages;
1122 ret = 0; /* success */
1123out:
1124 BUG_ON(ret < 0 || m->nr_pages < want);
1125
1126 return ret;
1127}
1128
1129/* 1091/*
1130 * Register request, send initial attempt. 1092 * Register request, send initial attempt.
1131 */ 1093 */
@@ -1252,11 +1214,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1252 if (!osdc->req_mempool) 1214 if (!osdc->req_mempool)
1253 goto out; 1215 goto out;
1254 1216
1255 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true); 1217 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1218 "osd_op");
1256 if (err < 0) 1219 if (err < 0)
1257 goto out_mempool; 1220 goto out_mempool;
1258 err = ceph_msgpool_init(&osdc->msgpool_op_reply, 1221 err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1259 OSD_OPREPLY_FRONT_LEN, 10, true); 1222 OSD_OPREPLY_FRONT_LEN, 10, true,
1223 "osd_op_reply");
1260 if (err < 0) 1224 if (err < 0)
1261 goto out_msgpool; 1225 goto out_msgpool;
1262 return 0; 1226 return 0;
@@ -1302,8 +1266,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1302 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1266 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1303 NULL, 0, truncate_seq, truncate_size, NULL, 1267 NULL, 0, truncate_seq, truncate_size, NULL,
1304 false, 1); 1268 false, 1);
1305 if (IS_ERR(req)) 1269 if (!req)
1306 return PTR_ERR(req); 1270 return -ENOMEM;
1307 1271
1308 /* it may be a short read due to an object boundary */ 1272 /* it may be a short read due to an object boundary */
1309 req->r_pages = pages; 1273 req->r_pages = pages;
@@ -1345,8 +1309,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1345 snapc, do_sync, 1309 snapc, do_sync,
1346 truncate_seq, truncate_size, mtime, 1310 truncate_seq, truncate_size, mtime,
1347 nofail, 1); 1311 nofail, 1);
1348 if (IS_ERR(req)) 1312 if (!req)
1349 return PTR_ERR(req); 1313 return -ENOMEM;
1350 1314
1351 /* it may be a short write due to an object boundary */ 1315 /* it may be a short write due to an object boundary */
1352 req->r_pages = pages; 1316 req->r_pages = pages;
@@ -1394,7 +1358,8 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1394} 1358}
1395 1359
1396/* 1360/*
1397 * lookup and return message for incoming reply 1361 * lookup and return message for incoming reply. set up reply message
1362 * pages.
1398 */ 1363 */
1399static struct ceph_msg *get_reply(struct ceph_connection *con, 1364static struct ceph_msg *get_reply(struct ceph_connection *con,
1400 struct ceph_msg_header *hdr, 1365 struct ceph_msg_header *hdr,
@@ -1407,7 +1372,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1407 int front = le32_to_cpu(hdr->front_len); 1372 int front = le32_to_cpu(hdr->front_len);
1408 int data_len = le32_to_cpu(hdr->data_len); 1373 int data_len = le32_to_cpu(hdr->data_len);
1409 u64 tid; 1374 u64 tid;
1410 int err;
1411 1375
1412 tid = le64_to_cpu(hdr->tid); 1376 tid = le64_to_cpu(hdr->tid);
1413 mutex_lock(&osdc->request_mutex); 1377 mutex_lock(&osdc->request_mutex);
@@ -1425,13 +1389,14 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1425 req->r_reply, req->r_con_filling_msg); 1389 req->r_reply, req->r_con_filling_msg);
1426 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); 1390 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1427 ceph_con_put(req->r_con_filling_msg); 1391 ceph_con_put(req->r_con_filling_msg);
1392 req->r_con_filling_msg = NULL;
1428 } 1393 }
1429 1394
1430 if (front > req->r_reply->front.iov_len) { 1395 if (front > req->r_reply->front.iov_len) {
1431 pr_warning("get_reply front %d > preallocated %d\n", 1396 pr_warning("get_reply front %d > preallocated %d\n",
1432 front, (int)req->r_reply->front.iov_len); 1397 front, (int)req->r_reply->front.iov_len);
1433 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL); 1398 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
1434 if (IS_ERR(m)) 1399 if (!m)
1435 goto out; 1400 goto out;
1436 ceph_msg_put(req->r_reply); 1401 ceph_msg_put(req->r_reply);
1437 req->r_reply = m; 1402 req->r_reply = m;
@@ -1439,12 +1404,19 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1439 m = ceph_msg_get(req->r_reply); 1404 m = ceph_msg_get(req->r_reply);
1440 1405
1441 if (data_len > 0) { 1406 if (data_len > 0) {
1442 err = __prepare_pages(con, hdr, req, tid, m); 1407 unsigned data_off = le16_to_cpu(hdr->data_off);
1443 if (err < 0) { 1408 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1409
1410 if (unlikely(req->r_num_pages < want)) {
1411 pr_warning("tid %lld reply %d > expected %d pages\n",
1412 tid, want, m->nr_pages);
1444 *skip = 1; 1413 *skip = 1;
1445 ceph_msg_put(m); 1414 ceph_msg_put(m);
1446 m = ERR_PTR(err); 1415 m = NULL;
1416 goto out;
1447 } 1417 }
1418 m->pages = req->r_pages;
1419 m->nr_pages = req->r_num_pages;
1448 } 1420 }
1449 *skip = 0; 1421 *skip = 0;
1450 req->r_con_filling_msg = ceph_con_get(con); 1422 req->r_con_filling_msg = ceph_con_get(con);
@@ -1466,7 +1438,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1466 1438
1467 switch (type) { 1439 switch (type) {
1468 case CEPH_MSG_OSD_MAP: 1440 case CEPH_MSG_OSD_MAP:
1469 return ceph_msg_new(type, front, 0, 0, NULL); 1441 return ceph_msg_new(type, front, GFP_NOFS);
1470 case CEPH_MSG_OSD_OPREPLY: 1442 case CEPH_MSG_OSD_OPREPLY:
1471 return get_reply(con, hdr, skip); 1443 return get_reply(con, hdr, skip);
1472 default: 1444 default:
@@ -1552,7 +1524,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
1552 return ceph_monc_validate_auth(&osdc->client->monc); 1524 return ceph_monc_validate_auth(&osdc->client->monc);
1553} 1525}
1554 1526
1555const static struct ceph_connection_operations osd_con_ops = { 1527static const struct ceph_connection_operations osd_con_ops = {
1556 .get = get_osd_con, 1528 .get = get_osd_con,
1557 .put = put_osd_con, 1529 .put = put_osd_con,
1558 .dispatch = dispatch, 1530 .dispatch = dispatch,
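
[Editor's note] With __prepare_pages() folded into get_reply(), the reply path sizes the page vector directly from the header. A worked example of that computation, assuming 4 KB pages and the calc_pages_for() page-span helper from super.h:

/*
 * Worked example of the sizing done in get_reply() above:
 *
 *   data_off = 0x10300  ->  data_off & ~PAGE_MASK = 0x300 (768)
 *   data_len = 12288    (a 12 KB read)
 *
 *   want = calc_pages_for(768, 12288)
 *        = ((768 + 12288 + 4095) >> 12) - (768 >> 12)
 *        = 4 - 0 = 4 pages
 *
 * so the reply is skipped (and the message dropped) unless the request
 * already has req->r_num_pages >= 4.
 */
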
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
index 5f8dbf7c745a..b6859f47d364 100644
--- a/fs/ceph/pagelist.c
+++ b/fs/ceph/pagelist.c
@@ -20,7 +20,7 @@ int ceph_pagelist_release(struct ceph_pagelist *pl)
20 20
21static int ceph_pagelist_addpage(struct ceph_pagelist *pl) 21static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
22{ 22{
23 struct page *page = alloc_page(GFP_NOFS); 23 struct page *page = __page_cache_alloc(GFP_NOFS);
24 if (!page) 24 if (!page)
25 return -ENOMEM; 25 return -ENOMEM;
26 pl->room += PAGE_SIZE; 26 pl->room += PAGE_SIZE;
diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h
index fd56451a871f..8fcc023056c7 100644
--- a/fs/ceph/rados.h
+++ b/fs/ceph/rados.h
@@ -101,8 +101,8 @@ struct ceph_pg_pool {
101 __le64 snap_seq; /* seq for per-pool snapshot */ 101 __le64 snap_seq; /* seq for per-pool snapshot */
102 __le32 snap_epoch; /* epoch of last snap */ 102 __le32 snap_epoch; /* epoch of last snap */
103 __le32 num_snaps; 103 __le32 num_snaps;
104 __le32 num_removed_snap_intervals; 104 __le32 num_removed_snap_intervals; /* if non-empty, NO per-pool snaps */
105 __le64 uid; 105 __le64 auid; /* who owns the pg */
106} __attribute__ ((packed)); 106} __attribute__ ((packed));
107 107
108/* 108/*
@@ -208,6 +208,7 @@ enum {
208 /* read */ 208 /* read */
209 CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1, 209 CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
210 CEPH_OSD_OP_GETXATTRS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2, 210 CEPH_OSD_OP_GETXATTRS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2,
211 CEPH_OSD_OP_CMPXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 3,
211 212
212 /* write */ 213 /* write */
213 CEPH_OSD_OP_SETXATTR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 1, 214 CEPH_OSD_OP_SETXATTR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 1,
@@ -305,6 +306,22 @@ enum {
305#define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/ 306#define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/
306#define EBLACKLISTED ESHUTDOWN /* blacklisted */ 307#define EBLACKLISTED ESHUTDOWN /* blacklisted */
307 308
309/* xattr comparison */
310enum {
311 CEPH_OSD_CMPXATTR_OP_NOP = 0,
312 CEPH_OSD_CMPXATTR_OP_EQ = 1,
313 CEPH_OSD_CMPXATTR_OP_NE = 2,
314 CEPH_OSD_CMPXATTR_OP_GT = 3,
315 CEPH_OSD_CMPXATTR_OP_GTE = 4,
316 CEPH_OSD_CMPXATTR_OP_LT = 5,
317 CEPH_OSD_CMPXATTR_OP_LTE = 6
318};
319
320enum {
321 CEPH_OSD_CMPXATTR_MODE_STRING = 1,
322 CEPH_OSD_CMPXATTR_MODE_U64 = 2
323};
324
308/* 325/*
309 * an individual object operation. each may be accompanied by some data 326 * an individual object operation. each may be accompanied by some data
310 * payload 327 * payload
@@ -321,6 +338,8 @@ struct ceph_osd_op {
321 struct { 338 struct {
322 __le32 name_len; 339 __le32 name_len;
323 __le32 value_len; 340 __le32 value_len;
341 __u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */
342 __u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */
324 } __attribute__ ((packed)) xattr; 343 } __attribute__ ((packed)) xattr;
325 struct { 344 struct {
326 __u8 class_len; 345 __u8 class_len;
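
[Editor's note] The new CMPXATTR read op and the cmp_op/cmp_mode fields let a client ask the OSD to test an xattr before acting on it. An illustrative encoding of a string-equality check (the helper name is invented; the xattr name and value bytes travel in the op payload, as for getxattr/setxattr):

/* Sketch only: encode a CMPXATTR string-equality test into a ceph_osd_op. */
static void example_encode_cmpxattr(struct ceph_osd_op *op,
				    u32 name_len, u32 value_len)
{
	memset(op, 0, sizeof(*op));
	op->op = cpu_to_le16(CEPH_OSD_OP_CMPXATTR);
	op->xattr.name_len = cpu_to_le32(name_len);
	op->xattr.value_len = cpu_to_le32(value_len);
	op->xattr.cmp_op = CEPH_OSD_CMPXATTR_OP_EQ;
	op->xattr.cmp_mode = CEPH_OSD_CMPXATTR_MODE_STRING;
}
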
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index d5114db70453..c0b26b6badba 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -512,7 +512,7 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
512 struct ceph_cap_snap *capsnap) 512 struct ceph_cap_snap *capsnap)
513{ 513{
514 struct inode *inode = &ci->vfs_inode; 514 struct inode *inode = &ci->vfs_inode;
515 struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; 515 struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
516 516
517 BUG_ON(capsnap->writing); 517 BUG_ON(capsnap->writing);
518 capsnap->size = inode->i_size; 518 capsnap->size = inode->i_size;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 9307bbee6fbe..7c663d9b9f81 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -8,14 +8,11 @@
8#include <linux/module.h> 8#include <linux/module.h>
9#include <linux/mount.h> 9#include <linux/mount.h>
10#include <linux/parser.h> 10#include <linux/parser.h>
11#include <linux/rwsem.h>
12#include <linux/sched.h> 11#include <linux/sched.h>
13#include <linux/seq_file.h> 12#include <linux/seq_file.h>
14#include <linux/slab.h> 13#include <linux/slab.h>
15#include <linux/statfs.h> 14#include <linux/statfs.h>
16#include <linux/string.h> 15#include <linux/string.h>
17#include <linux/version.h>
18#include <linux/vmalloc.h>
19 16
20#include "decode.h" 17#include "decode.h"
21#include "super.h" 18#include "super.h"
@@ -107,12 +104,40 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
107static int ceph_syncfs(struct super_block *sb, int wait) 104static int ceph_syncfs(struct super_block *sb, int wait)
108{ 105{
109 dout("sync_fs %d\n", wait); 106 dout("sync_fs %d\n", wait);
110 ceph_osdc_sync(&ceph_client(sb)->osdc); 107 ceph_osdc_sync(&ceph_sb_to_client(sb)->osdc);
111 ceph_mdsc_sync(&ceph_client(sb)->mdsc); 108 ceph_mdsc_sync(&ceph_sb_to_client(sb)->mdsc);
112 dout("sync_fs %d done\n", wait); 109 dout("sync_fs %d done\n", wait);
113 return 0; 110 return 0;
114} 111}
115 112
113static int default_congestion_kb(void)
114{
115 int congestion_kb;
116
117 /*
118 * Copied from NFS
119 *
120 * congestion size, scale with available memory.
121 *
122 * 64MB: 8192k
123 * 128MB: 11585k
124 * 256MB: 16384k
125 * 512MB: 23170k
126 * 1GB: 32768k
127 * 2GB: 46340k
128 * 4GB: 65536k
129 * 8GB: 92681k
130 * 16GB: 131072k
131 *
132 * This allows larger machines to have larger/more transfers.
133 * Limit the default to 256M
134 */
135 congestion_kb = (16*int_sqrt(totalram_pages)) << (PAGE_SHIFT-10);
136 if (congestion_kb > 256*1024)
137 congestion_kb = 256*1024;
138
139 return congestion_kb;
140}
116 141
117/** 142/**
118 * ceph_show_options - Show mount options in /proc/mounts 143 * ceph_show_options - Show mount options in /proc/mounts
@@ -138,6 +163,35 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
138 seq_puts(m, ",nocrc"); 163 seq_puts(m, ",nocrc");
139 if (args->flags & CEPH_OPT_NOASYNCREADDIR) 164 if (args->flags & CEPH_OPT_NOASYNCREADDIR)
140 seq_puts(m, ",noasyncreaddir"); 165 seq_puts(m, ",noasyncreaddir");
166
167 if (args->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT)
168 seq_printf(m, ",mount_timeout=%d", args->mount_timeout);
169 if (args->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
170 seq_printf(m, ",osd_idle_ttl=%d", args->osd_idle_ttl);
171 if (args->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
172 seq_printf(m, ",osdtimeout=%d", args->osd_timeout);
173 if (args->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
174 seq_printf(m, ",osdkeepalivetimeout=%d",
175 args->osd_keepalive_timeout);
176 if (args->wsize)
177 seq_printf(m, ",wsize=%d", args->wsize);
178 if (args->rsize != CEPH_MOUNT_RSIZE_DEFAULT)
179 seq_printf(m, ",rsize=%d", args->rsize);
180 if (args->congestion_kb != default_congestion_kb())
181 seq_printf(m, ",write_congestion_kb=%d", args->congestion_kb);
182 if (args->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
183 seq_printf(m, ",caps_wanted_delay_min=%d",
184 args->caps_wanted_delay_min);
185 if (args->caps_wanted_delay_max != CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT)
186 seq_printf(m, ",caps_wanted_delay_max=%d",
187 args->caps_wanted_delay_max);
188 if (args->cap_release_safety != CEPH_CAP_RELEASE_SAFETY_DEFAULT)
189 seq_printf(m, ",cap_release_safety=%d",
190 args->cap_release_safety);
191 if (args->max_readdir != CEPH_MAX_READDIR_DEFAULT)
192 seq_printf(m, ",readdir_max_entries=%d", args->max_readdir);
193 if (args->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT)
194 seq_printf(m, ",readdir_max_bytes=%d", args->max_readdir_bytes);
141 if (strcmp(args->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT)) 195 if (strcmp(args->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT))
142 seq_printf(m, ",snapdirname=%s", args->snapdir_name); 196 seq_printf(m, ",snapdirname=%s", args->snapdir_name);
143 if (args->name) 197 if (args->name)
@@ -161,35 +215,6 @@ static void ceph_inode_init_once(void *foo)
161 inode_init_once(&ci->vfs_inode); 215 inode_init_once(&ci->vfs_inode);
162} 216}
163 217
164static int default_congestion_kb(void)
165{
166 int congestion_kb;
167
168 /*
169 * Copied from NFS
170 *
171 * congestion size, scale with available memory.
172 *
173 * 64MB: 8192k
174 * 128MB: 11585k
175 * 256MB: 16384k
176 * 512MB: 23170k
177 * 1GB: 32768k
178 * 2GB: 46340k
179 * 4GB: 65536k
180 * 8GB: 92681k
181 * 16GB: 131072k
182 *
183 * This allows larger machines to have larger/more transfers.
184 * Limit the default to 256M
185 */
186 congestion_kb = (16*int_sqrt(totalram_pages)) << (PAGE_SHIFT-10);
187 if (congestion_kb > 256*1024)
188 congestion_kb = 256*1024;
189
190 return congestion_kb;
191}
192
193static int __init init_caches(void) 218static int __init init_caches(void)
194{ 219{
195 ceph_inode_cachep = kmem_cache_create("ceph_inode_info", 220 ceph_inode_cachep = kmem_cache_create("ceph_inode_info",
@@ -308,7 +333,9 @@ enum {
308 Opt_osd_idle_ttl, 333 Opt_osd_idle_ttl,
309 Opt_caps_wanted_delay_min, 334 Opt_caps_wanted_delay_min,
310 Opt_caps_wanted_delay_max, 335 Opt_caps_wanted_delay_max,
336 Opt_cap_release_safety,
311 Opt_readdir_max_entries, 337 Opt_readdir_max_entries,
338 Opt_readdir_max_bytes,
312 Opt_congestion_kb, 339 Opt_congestion_kb,
313 Opt_last_int, 340 Opt_last_int,
314 /* int args above */ 341 /* int args above */
@@ -339,7 +366,9 @@ static match_table_t arg_tokens = {
339 {Opt_osd_idle_ttl, "osd_idle_ttl=%d"}, 366 {Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
340 {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, 367 {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
341 {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"}, 368 {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
369 {Opt_cap_release_safety, "cap_release_safety=%d"},
342 {Opt_readdir_max_entries, "readdir_max_entries=%d"}, 370 {Opt_readdir_max_entries, "readdir_max_entries=%d"},
371 {Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
343 {Opt_congestion_kb, "write_congestion_kb=%d"}, 372 {Opt_congestion_kb, "write_congestion_kb=%d"},
344 /* int args above */ 373 /* int args above */
345 {Opt_snapdirname, "snapdirname=%s"}, 374 {Opt_snapdirname, "snapdirname=%s"},
@@ -388,8 +417,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
388 args->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT; 417 args->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
389 args->rsize = CEPH_MOUNT_RSIZE_DEFAULT; 418 args->rsize = CEPH_MOUNT_RSIZE_DEFAULT;
390 args->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL); 419 args->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
391 args->cap_release_safety = CEPH_CAPS_PER_RELEASE * 4; 420 args->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
392 args->max_readdir = 1024; 421 args->max_readdir = CEPH_MAX_READDIR_DEFAULT;
422 args->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
393 args->congestion_kb = default_congestion_kb(); 423 args->congestion_kb = default_congestion_kb();
394 424
395 /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */ 425 /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
@@ -497,6 +527,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
497 case Opt_readdir_max_entries: 527 case Opt_readdir_max_entries:
498 args->max_readdir = intval; 528 args->max_readdir = intval;
499 break; 529 break;
530 case Opt_readdir_max_bytes:
531 args->max_readdir_bytes = intval;
532 break;
500 case Opt_congestion_kb: 533 case Opt_congestion_kb:
501 args->congestion_kb = intval; 534 args->congestion_kb = intval;
502 break; 535 break;
@@ -682,9 +715,10 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid)
682/* 715/*
683 * true if we have the mon map (and have thus joined the cluster) 716 * true if we have the mon map (and have thus joined the cluster)
684 */ 717 */
685static int have_mon_map(struct ceph_client *client) 718static int have_mon_and_osd_map(struct ceph_client *client)
686{ 719{
687 return client->monc.monmap && client->monc.monmap->epoch; 720 return client->monc.monmap && client->monc.monmap->epoch &&
721 client->osdc.osdmap && client->osdc.osdmap->epoch;
688} 722}
689 723
690/* 724/*
@@ -762,7 +796,7 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
762 if (err < 0) 796 if (err < 0)
763 goto out; 797 goto out;
764 798
765 while (!have_mon_map(client)) { 799 while (!have_mon_and_osd_map(client)) {
766 err = -EIO; 800 err = -EIO;
767 if (timeout && time_after_eq(jiffies, started + timeout)) 801 if (timeout && time_after_eq(jiffies, started + timeout))
768 goto out; 802 goto out;
@@ -770,8 +804,8 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
770 /* wait */ 804 /* wait */
771 dout("mount waiting for mon_map\n"); 805 dout("mount waiting for mon_map\n");
772 err = wait_event_interruptible_timeout(client->auth_wq, 806 err = wait_event_interruptible_timeout(client->auth_wq,
773 have_mon_map(client) || (client->auth_err < 0), 807 have_mon_and_osd_map(client) || (client->auth_err < 0),
774 timeout); 808 timeout);
775 if (err == -EINTR || err == -ERESTARTSYS) 809 if (err == -EINTR || err == -ERESTARTSYS)
776 goto out; 810 goto out;
777 if (client->auth_err < 0) { 811 if (client->auth_err < 0) {
@@ -884,6 +918,8 @@ static int ceph_compare_super(struct super_block *sb, void *data)
884/* 918/*
885 * construct our own bdi so we can control readahead, etc. 919 * construct our own bdi so we can control readahead, etc.
886 */ 920 */
921static atomic_long_t bdi_seq = ATOMIC_INIT(0);
922
887static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client) 923static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client)
888{ 924{
889 int err; 925 int err;
@@ -893,7 +929,8 @@ static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client)
893 client->backing_dev_info.ra_pages = 929 client->backing_dev_info.ra_pages =
894 (client->mount_args->rsize + PAGE_CACHE_SIZE - 1) 930 (client->mount_args->rsize + PAGE_CACHE_SIZE - 1)
895 >> PAGE_SHIFT; 931 >> PAGE_SHIFT;
896 err = bdi_register_dev(&client->backing_dev_info, sb->s_dev); 932 err = bdi_register(&client->backing_dev_info, NULL, "ceph-%d",
933 atomic_long_inc_return(&bdi_seq));
897 if (!err) 934 if (!err)
898 sb->s_bdi = &client->backing_dev_info; 935 sb->s_bdi = &client->backing_dev_info;
899 return err; 936 return err;
@@ -932,9 +969,9 @@ static int ceph_get_sb(struct file_system_type *fs_type,
932 goto out; 969 goto out;
933 } 970 }
934 971
935 if (ceph_client(sb) != client) { 972 if (ceph_sb_to_client(sb) != client) {
936 ceph_destroy_client(client); 973 ceph_destroy_client(client);
937 client = ceph_client(sb); 974 client = ceph_sb_to_client(sb);
938 dout("get_sb got existing client %p\n", client); 975 dout("get_sb got existing client %p\n", client);
939 } else { 976 } else {
940 dout("get_sb using new client %p\n", client); 977 dout("get_sb using new client %p\n", client);
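
[Editor's note] default_congestion_kb() now sits ahead of ceph_show_options() so the latter can suppress the default value in /proc/mounts. A worked instance of the formula, assuming 4 KB pages (PAGE_SHIFT == 12) on a 1 GB machine:

/*
 *   totalram_pages     = 262144
 *   int_sqrt(262144)   = 512
 *   16 * 512           = 8192
 *   8192 << (12 - 10)  = 32768   ->  32768 KB of in-flight writeback
 *
 * which matches the "1GB: 32768k" row of the comment table, and is the
 * value ceph_show_options() compares against before emitting
 * ",write_congestion_kb=".
 */
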
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 13513b80d87f..3725c9ee9d08 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -52,24 +52,25 @@
52 52
53struct ceph_mount_args { 53struct ceph_mount_args {
54 int sb_flags; 54 int sb_flags;
55 int flags;
56 struct ceph_fsid fsid;
57 struct ceph_entity_addr my_addr;
55 int num_mon; 58 int num_mon;
56 struct ceph_entity_addr *mon_addr; 59 struct ceph_entity_addr *mon_addr;
57 int flags;
58 int mount_timeout; 60 int mount_timeout;
59 int osd_idle_ttl; 61 int osd_idle_ttl;
60 int caps_wanted_delay_min, caps_wanted_delay_max;
61 struct ceph_fsid fsid;
62 struct ceph_entity_addr my_addr;
63 int wsize;
64 int rsize; /* max readahead */
65 int max_readdir; /* max readdir size */
66 int congestion_kb; /* max readdir size */
67 int osd_timeout; 62 int osd_timeout;
68 int osd_keepalive_timeout; 63 int osd_keepalive_timeout;
64 int wsize;
65 int rsize; /* max readahead */
66 int congestion_kb; /* max writeback in flight */
67 int caps_wanted_delay_min, caps_wanted_delay_max;
68 int cap_release_safety;
 69 int max_readdir; /* max readdir result (entries) */
70 int max_readdir_bytes; /* max readdir result (bytes) */
69 char *snapdir_name; /* default ".snap" */ 71 char *snapdir_name; /* default ".snap" */
70 char *name; 72 char *name;
71 char *secret; 73 char *secret;
72 int cap_release_safety;
73}; 74};
74 75
75/* 76/*
@@ -80,13 +81,14 @@ struct ceph_mount_args {
80#define CEPH_OSD_KEEPALIVE_DEFAULT 5 81#define CEPH_OSD_KEEPALIVE_DEFAULT 5
81#define CEPH_OSD_IDLE_TTL_DEFAULT 60 82#define CEPH_OSD_IDLE_TTL_DEFAULT 60
82#define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */ 83#define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */
84#define CEPH_MAX_READDIR_DEFAULT 1024
85#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024)
83 86
84#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) 87#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
85#define CEPH_MSG_MAX_DATA_LEN (16*1024*1024) 88#define CEPH_MSG_MAX_DATA_LEN (16*1024*1024)
86 89
87#define CEPH_SNAPDIRNAME_DEFAULT ".snap" 90#define CEPH_SNAPDIRNAME_DEFAULT ".snap"
88#define CEPH_AUTH_NAME_DEFAULT "guest" 91#define CEPH_AUTH_NAME_DEFAULT "guest"
89
90/* 92/*
91 * Delay telling the MDS we no longer want caps, in case we reopen 93 * Delay telling the MDS we no longer want caps, in case we reopen
92 * the file. Delay a minimum amount of time, even if we send a cap 94 * the file. Delay a minimum amount of time, even if we send a cap
@@ -96,6 +98,7 @@ struct ceph_mount_args {
96#define CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT 5 /* cap release delay */ 98#define CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT 5 /* cap release delay */
97#define CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT 60 /* cap release delay */ 99#define CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT 60 /* cap release delay */
98 100
101#define CEPH_CAP_RELEASE_SAFETY_DEFAULT (CEPH_CAPS_PER_RELEASE * 4)
99 102
100/* mount state */ 103/* mount state */
101enum { 104enum {
@@ -160,12 +163,6 @@ struct ceph_client {
160#endif 163#endif
161}; 164};
162 165
163static inline struct ceph_client *ceph_client(struct super_block *sb)
164{
165 return sb->s_fs_info;
166}
167
168
169/* 166/*
170 * File i/o capability. This tracks shared state with the metadata 167 * File i/o capability. This tracks shared state with the metadata
171 * server that allows us to cache or writeback attributes or to read 168 * server that allows us to cache or writeback attributes or to read
@@ -871,6 +868,7 @@ extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
871extern void ceph_dentry_lru_add(struct dentry *dn); 868extern void ceph_dentry_lru_add(struct dentry *dn);
872extern void ceph_dentry_lru_touch(struct dentry *dn); 869extern void ceph_dentry_lru_touch(struct dentry *dn);
873extern void ceph_dentry_lru_del(struct dentry *dn); 870extern void ceph_dentry_lru_del(struct dentry *dn);
871extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
874 872
875/* 873/*
876 * our d_ops vary depending on whether the inode is live, 874 * our d_ops vary depending on whether the inode is live,
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 2845422907fc..68aeebc69681 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -7,7 +7,8 @@
7 7
8static bool ceph_is_valid_xattr(const char *name) 8static bool ceph_is_valid_xattr(const char *name)
9{ 9{
10 return !strncmp(name, XATTR_SECURITY_PREFIX, 10 return !strncmp(name, "ceph.", 5) ||
11 !strncmp(name, XATTR_SECURITY_PREFIX,
11 XATTR_SECURITY_PREFIX_LEN) || 12 XATTR_SECURITY_PREFIX_LEN) ||
12 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || 13 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
13 !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN); 14 !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN);
@@ -76,14 +77,14 @@ static size_t ceph_vxattrcb_rctime(struct ceph_inode_info *ci, char *val,
76} 77}
77 78
78static struct ceph_vxattr_cb ceph_dir_vxattrs[] = { 79static struct ceph_vxattr_cb ceph_dir_vxattrs[] = {
79 { true, "user.ceph.dir.entries", ceph_vxattrcb_entries}, 80 { true, "ceph.dir.entries", ceph_vxattrcb_entries},
80 { true, "user.ceph.dir.files", ceph_vxattrcb_files}, 81 { true, "ceph.dir.files", ceph_vxattrcb_files},
81 { true, "user.ceph.dir.subdirs", ceph_vxattrcb_subdirs}, 82 { true, "ceph.dir.subdirs", ceph_vxattrcb_subdirs},
82 { true, "user.ceph.dir.rentries", ceph_vxattrcb_rentries}, 83 { true, "ceph.dir.rentries", ceph_vxattrcb_rentries},
83 { true, "user.ceph.dir.rfiles", ceph_vxattrcb_rfiles}, 84 { true, "ceph.dir.rfiles", ceph_vxattrcb_rfiles},
84 { true, "user.ceph.dir.rsubdirs", ceph_vxattrcb_rsubdirs}, 85 { true, "ceph.dir.rsubdirs", ceph_vxattrcb_rsubdirs},
85 { true, "user.ceph.dir.rbytes", ceph_vxattrcb_rbytes}, 86 { true, "ceph.dir.rbytes", ceph_vxattrcb_rbytes},
86 { true, "user.ceph.dir.rctime", ceph_vxattrcb_rctime}, 87 { true, "ceph.dir.rctime", ceph_vxattrcb_rctime},
87 { true, NULL, NULL } 88 { true, NULL, NULL }
88}; 89};
89 90
@@ -107,7 +108,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
107} 108}
108 109
109static struct ceph_vxattr_cb ceph_file_vxattrs[] = { 110static struct ceph_vxattr_cb ceph_file_vxattrs[] = {
110 { true, "user.ceph.layout", ceph_vxattrcb_layout}, 111 { true, "ceph.layout", ceph_vxattrcb_layout},
111 { NULL, NULL } 112 { NULL, NULL }
112}; 113};
113 114
@@ -186,12 +187,6 @@ static int __set_xattr(struct ceph_inode_info *ci,
186 ci->i_xattrs.names_size -= xattr->name_len; 187 ci->i_xattrs.names_size -= xattr->name_len;
187 ci->i_xattrs.vals_size -= xattr->val_len; 188 ci->i_xattrs.vals_size -= xattr->val_len;
188 } 189 }
189 if (!xattr) {
190 pr_err("__set_xattr ENOMEM on %p %llx.%llx xattr %s=%s\n",
191 &ci->vfs_inode, ceph_vinop(&ci->vfs_inode), name,
192 xattr->val);
193 return -ENOMEM;
194 }
195 ci->i_xattrs.names_size += name_len; 190 ci->i_xattrs.names_size += name_len;
196 ci->i_xattrs.vals_size += val_len; 191 ci->i_xattrs.vals_size += val_len;
197 if (val) 192 if (val)
@@ -574,7 +569,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
574 ci->i_xattrs.version, ci->i_xattrs.index_version); 569 ci->i_xattrs.version, ci->i_xattrs.index_version);
575 570
576 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) && 571 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
577 (ci->i_xattrs.index_version > ci->i_xattrs.version)) { 572 (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
578 goto list_xattr; 573 goto list_xattr;
579 } else { 574 } else {
580 spin_unlock(&inode->i_lock); 575 spin_unlock(&inode->i_lock);
@@ -622,7 +617,7 @@ out:
622static int ceph_sync_setxattr(struct dentry *dentry, const char *name, 617static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
623 const char *value, size_t size, int flags) 618 const char *value, size_t size, int flags)
624{ 619{
625 struct ceph_client *client = ceph_client(dentry->d_sb); 620 struct ceph_client *client = ceph_sb_to_client(dentry->d_sb);
626 struct inode *inode = dentry->d_inode; 621 struct inode *inode = dentry->d_inode;
627 struct ceph_inode_info *ci = ceph_inode(inode); 622 struct ceph_inode_info *ci = ceph_inode(inode);
628 struct inode *parent_inode = dentry->d_parent->d_inode; 623 struct inode *parent_inode = dentry->d_parent->d_inode;
@@ -641,7 +636,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
641 return -ENOMEM; 636 return -ENOMEM;
642 err = -ENOMEM; 637 err = -ENOMEM;
643 for (i = 0; i < nr_pages; i++) { 638 for (i = 0; i < nr_pages; i++) {
644 pages[i] = alloc_page(GFP_NOFS); 639 pages[i] = __page_cache_alloc(GFP_NOFS);
645 if (!pages[i]) { 640 if (!pages[i]) {
646 nr_pages = i; 641 nr_pages = i;
647 goto out; 642 goto out;
@@ -779,7 +774,7 @@ out:
779 774
780static int ceph_send_removexattr(struct dentry *dentry, const char *name) 775static int ceph_send_removexattr(struct dentry *dentry, const char *name)
781{ 776{
782 struct ceph_client *client = ceph_client(dentry->d_sb); 777 struct ceph_client *client = ceph_sb_to_client(dentry->d_sb);
783 struct ceph_mds_client *mdsc = &client->mdsc; 778 struct ceph_mds_client *mdsc = &client->mdsc;
784 struct inode *inode = dentry->d_inode; 779 struct inode *inode = dentry->d_inode;
785 struct inode *parent_inode = dentry->d_parent->d_inode; 780 struct inode *parent_inode = dentry->d_parent->d_inode;
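
[Editor's note] The xattr hunk moves the virtual attributes from the user.ceph.* names to a dedicated ceph.* namespace and teaches ceph_is_valid_xattr() to accept that prefix. A small userspace sketch reading one of them; the mount-point path is an assumption, and the value comes back as formatted text:

/* Sketch only: read a ceph.* virtual xattr from a directory on a ceph mount. */
#include <stdio.h>
#include <sys/types.h>
#include <sys/xattr.h>

int main(void)
{
	char buf[64];
	ssize_t len;

	len = getxattr("/mnt/ceph/some/dir",    /* hypothetical ceph mount path */
		       "ceph.dir.rbytes", buf, sizeof(buf) - 1);
	if (len < 0) {
		perror("getxattr");
		return 1;
	}
	buf[len] = '\0';
	printf("recursive bytes: %s\n", buf);   /* decimal text from the vxattr callback */
	return 0;
}
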