aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c270
1 files changed, 190 insertions, 80 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index fad95f8f2608..0c1d91756528 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1,17 +1,20 @@
1#include "ceph_debug.h" 1#include <linux/ceph/ceph_debug.h>
2 2
3#include <linux/fs.h>
3#include <linux/wait.h> 4#include <linux/wait.h>
4#include <linux/slab.h> 5#include <linux/slab.h>
5#include <linux/sched.h> 6#include <linux/sched.h>
6#include <linux/smp_lock.h> 7#include <linux/debugfs.h>
8#include <linux/seq_file.h>
7 9
8#include "mds_client.h"
9#include "mon_client.h"
10#include "super.h" 10#include "super.h"
11#include "messenger.h" 11#include "mds_client.h"
12#include "decode.h" 12
13#include "auth.h" 13#include <linux/ceph/messenger.h>
14#include "pagelist.h" 14#include <linux/ceph/decode.h>
15#include <linux/ceph/pagelist.h>
16#include <linux/ceph/auth.h>
17#include <linux/ceph/debugfs.h>
15 18
16/* 19/*
17 * A cluster of MDS (metadata server) daemons is responsible for 20 * A cluster of MDS (metadata server) daemons is responsible for
@@ -57,7 +60,8 @@ static const struct ceph_connection_operations mds_con_ops;
57 * parse individual inode info 60 * parse individual inode info
58 */ 61 */
59static int parse_reply_info_in(void **p, void *end, 62static int parse_reply_info_in(void **p, void *end,
60 struct ceph_mds_reply_info_in *info) 63 struct ceph_mds_reply_info_in *info,
64 int features)
61{ 65{
62 int err = -EIO; 66 int err = -EIO;
63 67
@@ -71,6 +75,12 @@ static int parse_reply_info_in(void **p, void *end,
71 info->symlink = *p; 75 info->symlink = *p;
72 *p += info->symlink_len; 76 *p += info->symlink_len;
73 77
78 if (features & CEPH_FEATURE_DIRLAYOUTHASH)
79 ceph_decode_copy_safe(p, end, &info->dir_layout,
80 sizeof(info->dir_layout), bad);
81 else
82 memset(&info->dir_layout, 0, sizeof(info->dir_layout));
83
74 ceph_decode_32_safe(p, end, info->xattr_len, bad); 84 ceph_decode_32_safe(p, end, info->xattr_len, bad);
75 ceph_decode_need(p, end, info->xattr_len, bad); 85 ceph_decode_need(p, end, info->xattr_len, bad);
76 info->xattr_data = *p; 86 info->xattr_data = *p;
@@ -85,12 +95,13 @@ bad:
85 * target inode. 95 * target inode.
86 */ 96 */
87static int parse_reply_info_trace(void **p, void *end, 97static int parse_reply_info_trace(void **p, void *end,
88 struct ceph_mds_reply_info_parsed *info) 98 struct ceph_mds_reply_info_parsed *info,
99 int features)
89{ 100{
90 int err; 101 int err;
91 102
92 if (info->head->is_dentry) { 103 if (info->head->is_dentry) {
93 err = parse_reply_info_in(p, end, &info->diri); 104 err = parse_reply_info_in(p, end, &info->diri, features);
94 if (err < 0) 105 if (err < 0)
95 goto out_bad; 106 goto out_bad;
96 107
@@ -111,7 +122,7 @@ static int parse_reply_info_trace(void **p, void *end,
111 } 122 }
112 123
113 if (info->head->is_target) { 124 if (info->head->is_target) {
114 err = parse_reply_info_in(p, end, &info->targeti); 125 err = parse_reply_info_in(p, end, &info->targeti, features);
115 if (err < 0) 126 if (err < 0)
116 goto out_bad; 127 goto out_bad;
117 } 128 }
@@ -131,7 +142,8 @@ out_bad:
131 * parse readdir results 142 * parse readdir results
132 */ 143 */
133static int parse_reply_info_dir(void **p, void *end, 144static int parse_reply_info_dir(void **p, void *end,
134 struct ceph_mds_reply_info_parsed *info) 145 struct ceph_mds_reply_info_parsed *info,
146 int features)
135{ 147{
136 u32 num, i = 0; 148 u32 num, i = 0;
137 int err; 149 int err;
@@ -179,7 +191,7 @@ static int parse_reply_info_dir(void **p, void *end,
179 *p += sizeof(struct ceph_mds_reply_lease); 191 *p += sizeof(struct ceph_mds_reply_lease);
180 192
181 /* inode */ 193 /* inode */
182 err = parse_reply_info_in(p, end, &info->dir_in[i]); 194 err = parse_reply_info_in(p, end, &info->dir_in[i], features);
183 if (err < 0) 195 if (err < 0)
184 goto out_bad; 196 goto out_bad;
185 i++; 197 i++;
@@ -199,10 +211,45 @@ out_bad:
199} 211}
200 212
201/* 213/*
214 * parse fcntl F_GETLK results
215 */
216static int parse_reply_info_filelock(void **p, void *end,
217 struct ceph_mds_reply_info_parsed *info,
218 int features)
219{
220 if (*p + sizeof(*info->filelock_reply) > end)
221 goto bad;
222
223 info->filelock_reply = *p;
224 *p += sizeof(*info->filelock_reply);
225
226 if (unlikely(*p != end))
227 goto bad;
228 return 0;
229
230bad:
231 return -EIO;
232}
233
234/*
235 * parse extra results
236 */
237static int parse_reply_info_extra(void **p, void *end,
238 struct ceph_mds_reply_info_parsed *info,
239 int features)
240{
241 if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
242 return parse_reply_info_filelock(p, end, info, features);
243 else
244 return parse_reply_info_dir(p, end, info, features);
245}
246
247/*
202 * parse entire mds reply 248 * parse entire mds reply
203 */ 249 */
204static int parse_reply_info(struct ceph_msg *msg, 250static int parse_reply_info(struct ceph_msg *msg,
205 struct ceph_mds_reply_info_parsed *info) 251 struct ceph_mds_reply_info_parsed *info,
252 int features)
206{ 253{
207 void *p, *end; 254 void *p, *end;
208 u32 len; 255 u32 len;
@@ -215,15 +262,15 @@ static int parse_reply_info(struct ceph_msg *msg,
215 /* trace */ 262 /* trace */
216 ceph_decode_32_safe(&p, end, len, bad); 263 ceph_decode_32_safe(&p, end, len, bad);
217 if (len > 0) { 264 if (len > 0) {
218 err = parse_reply_info_trace(&p, p+len, info); 265 err = parse_reply_info_trace(&p, p+len, info, features);
219 if (err < 0) 266 if (err < 0)
220 goto out_bad; 267 goto out_bad;
221 } 268 }
222 269
223 /* dir content */ 270 /* extra */
224 ceph_decode_32_safe(&p, end, len, bad); 271 ceph_decode_32_safe(&p, end, len, bad);
225 if (len > 0) { 272 if (len > 0) {
226 err = parse_reply_info_dir(&p, p+len, info); 273 err = parse_reply_info_extra(&p, p+len, info, features);
227 if (err < 0) 274 if (err < 0)
228 goto out_bad; 275 goto out_bad;
229 } 276 }
@@ -286,8 +333,9 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
286 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 333 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
287 if (atomic_dec_and_test(&s->s_ref)) { 334 if (atomic_dec_and_test(&s->s_ref)) {
288 if (s->s_authorizer) 335 if (s->s_authorizer)
289 s->s_mdsc->client->monc.auth->ops->destroy_authorizer( 336 s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
290 s->s_mdsc->client->monc.auth, s->s_authorizer); 337 s->s_mdsc->fsc->client->monc.auth,
338 s->s_authorizer);
291 kfree(s); 339 kfree(s);
292 } 340 }
293} 341}
@@ -344,7 +392,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
344 s->s_seq = 0; 392 s->s_seq = 0;
345 mutex_init(&s->s_mutex); 393 mutex_init(&s->s_mutex);
346 394
347 ceph_con_init(mdsc->client->msgr, &s->s_con); 395 ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
348 s->s_con.private = s; 396 s->s_con.private = s;
349 s->s_con.ops = &mds_con_ops; 397 s->s_con.ops = &mds_con_ops;
350 s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS; 398 s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
@@ -524,9 +572,13 @@ static void __register_request(struct ceph_mds_client *mdsc,
524 ceph_mdsc_get_request(req); 572 ceph_mdsc_get_request(req);
525 __insert_request(mdsc, req); 573 __insert_request(mdsc, req);
526 574
575 req->r_uid = current_fsuid();
576 req->r_gid = current_fsgid();
577
527 if (dir) { 578 if (dir) {
528 struct ceph_inode_info *ci = ceph_inode(dir); 579 struct ceph_inode_info *ci = ceph_inode(dir);
529 580
581 ihold(dir);
530 spin_lock(&ci->i_unsafe_lock); 582 spin_lock(&ci->i_unsafe_lock);
531 req->r_unsafe_dir = dir; 583 req->r_unsafe_dir = dir;
532 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 584 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
@@ -547,6 +599,9 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
547 spin_lock(&ci->i_unsafe_lock); 599 spin_lock(&ci->i_unsafe_lock);
548 list_del_init(&req->r_unsafe_dir_item); 600 list_del_init(&req->r_unsafe_dir_item);
549 spin_unlock(&ci->i_unsafe_lock); 601 spin_unlock(&ci->i_unsafe_lock);
602
603 iput(req->r_unsafe_dir);
604 req->r_unsafe_dir = NULL;
550 } 605 }
551 606
552 ceph_mdsc_put_request(req); 607 ceph_mdsc_put_request(req);
@@ -599,7 +654,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
599 } else if (req->r_dentry) { 654 } else if (req->r_dentry) {
600 struct inode *dir = req->r_dentry->d_parent->d_inode; 655 struct inode *dir = req->r_dentry->d_parent->d_inode;
601 656
602 if (dir->i_sb != mdsc->client->sb) { 657 if (dir->i_sb != mdsc->fsc->sb) {
603 /* not this fs! */ 658 /* not this fs! */
604 inode = req->r_dentry->d_inode; 659 inode = req->r_dentry->d_inode;
605 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 660 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
@@ -615,7 +670,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
615 } else { 670 } else {
616 /* dir + name */ 671 /* dir + name */
617 inode = dir; 672 inode = dir;
618 hash = req->r_dentry->d_name.hash; 673 hash = ceph_dentry_hash(req->r_dentry);
619 is_hash = true; 674 is_hash = true;
620 } 675 }
621 } 676 }
@@ -642,9 +697,11 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
642 dout("choose_mds %p %llx.%llx " 697 dout("choose_mds %p %llx.%llx "
643 "frag %u mds%d (%d/%d)\n", 698 "frag %u mds%d (%d/%d)\n",
644 inode, ceph_vinop(inode), 699 inode, ceph_vinop(inode),
645 frag.frag, frag.mds, 700 frag.frag, mds,
646 (int)r, frag.ndist); 701 (int)r, frag.ndist);
647 return mds; 702 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
703 CEPH_MDS_STATE_ACTIVE)
704 return mds;
648 } 705 }
649 706
650 /* since this file/dir wasn't known to be 707 /* since this file/dir wasn't known to be
@@ -657,7 +714,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
657 dout("choose_mds %p %llx.%llx " 714 dout("choose_mds %p %llx.%llx "
658 "frag %u mds%d (auth)\n", 715 "frag %u mds%d (auth)\n",
659 inode, ceph_vinop(inode), frag.frag, mds); 716 inode, ceph_vinop(inode), frag.frag, mds);
660 return mds; 717 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
718 CEPH_MDS_STATE_ACTIVE)
719 return mds;
661 } 720 }
662 } 721 }
663 } 722 }
@@ -884,7 +943,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
884 __ceph_remove_cap(cap); 943 __ceph_remove_cap(cap);
885 if (!__ceph_is_any_real_caps(ci)) { 944 if (!__ceph_is_any_real_caps(ci)) {
886 struct ceph_mds_client *mdsc = 945 struct ceph_mds_client *mdsc =
887 &ceph_sb_to_client(inode->i_sb)->mdsc; 946 ceph_sb_to_client(inode->i_sb)->mdsc;
888 947
889 spin_lock(&mdsc->cap_dirty_lock); 948 spin_lock(&mdsc->cap_dirty_lock);
890 if (!list_empty(&ci->i_dirty_item)) { 949 if (!list_empty(&ci->i_dirty_item)) {
@@ -1146,7 +1205,7 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1146 struct ceph_msg *msg, *partial = NULL; 1205 struct ceph_msg *msg, *partial = NULL;
1147 struct ceph_mds_cap_release *head; 1206 struct ceph_mds_cap_release *head;
1148 int err = -ENOMEM; 1207 int err = -ENOMEM;
1149 int extra = mdsc->client->mount_args->cap_release_safety; 1208 int extra = mdsc->fsc->mount_options->cap_release_safety;
1150 int num; 1209 int num;
1151 1210
1152 dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, 1211 dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
@@ -1379,12 +1438,15 @@ char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1379 struct dentry *temp; 1438 struct dentry *temp;
1380 char *path; 1439 char *path;
1381 int len, pos; 1440 int len, pos;
1441 unsigned seq;
1382 1442
1383 if (dentry == NULL) 1443 if (dentry == NULL)
1384 return ERR_PTR(-EINVAL); 1444 return ERR_PTR(-EINVAL);
1385 1445
1386retry: 1446retry:
1387 len = 0; 1447 len = 0;
1448 seq = read_seqbegin(&rename_lock);
1449 rcu_read_lock();
1388 for (temp = dentry; !IS_ROOT(temp);) { 1450 for (temp = dentry; !IS_ROOT(temp);) {
1389 struct inode *inode = temp->d_inode; 1451 struct inode *inode = temp->d_inode;
1390 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) 1452 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
@@ -1396,10 +1458,12 @@ retry:
1396 len += 1 + temp->d_name.len; 1458 len += 1 + temp->d_name.len;
1397 temp = temp->d_parent; 1459 temp = temp->d_parent;
1398 if (temp == NULL) { 1460 if (temp == NULL) {
1461 rcu_read_unlock();
1399 pr_err("build_path corrupt dentry %p\n", dentry); 1462 pr_err("build_path corrupt dentry %p\n", dentry);
1400 return ERR_PTR(-EINVAL); 1463 return ERR_PTR(-EINVAL);
1401 } 1464 }
1402 } 1465 }
1466 rcu_read_unlock();
1403 if (len) 1467 if (len)
1404 len--; /* no leading '/' */ 1468 len--; /* no leading '/' */
1405 1469
@@ -1408,9 +1472,12 @@ retry:
1408 return ERR_PTR(-ENOMEM); 1472 return ERR_PTR(-ENOMEM);
1409 pos = len; 1473 pos = len;
1410 path[pos] = 0; /* trailing null */ 1474 path[pos] = 0; /* trailing null */
1475 rcu_read_lock();
1411 for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) { 1476 for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1412 struct inode *inode = temp->d_inode; 1477 struct inode *inode;
1413 1478
1479 spin_lock(&temp->d_lock);
1480 inode = temp->d_inode;
1414 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 1481 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1415 dout("build_path path+%d: %p SNAPDIR\n", 1482 dout("build_path path+%d: %p SNAPDIR\n",
1416 pos, temp); 1483 pos, temp);
@@ -1419,21 +1486,26 @@ retry:
1419 break; 1486 break;
1420 } else { 1487 } else {
1421 pos -= temp->d_name.len; 1488 pos -= temp->d_name.len;
1422 if (pos < 0) 1489 if (pos < 0) {
1490 spin_unlock(&temp->d_lock);
1423 break; 1491 break;
1492 }
1424 strncpy(path + pos, temp->d_name.name, 1493 strncpy(path + pos, temp->d_name.name,
1425 temp->d_name.len); 1494 temp->d_name.len);
1426 } 1495 }
1496 spin_unlock(&temp->d_lock);
1427 if (pos) 1497 if (pos)
1428 path[--pos] = '/'; 1498 path[--pos] = '/';
1429 temp = temp->d_parent; 1499 temp = temp->d_parent;
1430 if (temp == NULL) { 1500 if (temp == NULL) {
1501 rcu_read_unlock();
1431 pr_err("build_path corrupt dentry\n"); 1502 pr_err("build_path corrupt dentry\n");
1432 kfree(path); 1503 kfree(path);
1433 return ERR_PTR(-EINVAL); 1504 return ERR_PTR(-EINVAL);
1434 } 1505 }
1435 } 1506 }
1436 if (pos != 0) { 1507 rcu_read_unlock();
1508 if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1437 pr_err("build_path did not end path lookup where " 1509 pr_err("build_path did not end path lookup where "
1438 "expected, namelen is %d, pos is %d\n", len, pos); 1510 "expected, namelen is %d, pos is %d\n", len, pos);
1439 /* presumably this is only possible if racing with a 1511 /* presumably this is only possible if racing with a
@@ -1447,7 +1519,7 @@ retry:
1447 *base = ceph_ino(temp->d_inode); 1519 *base = ceph_ino(temp->d_inode);
1448 *plen = len; 1520 *plen = len;
1449 dout("build_path on %p %d built %llx '%.*s'\n", 1521 dout("build_path on %p %d built %llx '%.*s'\n",
1450 dentry, atomic_read(&dentry->d_count), *base, len, path); 1522 dentry, dentry->d_count, *base, len, path);
1451 return path; 1523 return path;
1452} 1524}
1453 1525
@@ -1583,8 +1655,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1583 1655
1584 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 1656 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1585 head->op = cpu_to_le32(req->r_op); 1657 head->op = cpu_to_le32(req->r_op);
1586 head->caller_uid = cpu_to_le32(current_fsuid()); 1658 head->caller_uid = cpu_to_le32(req->r_uid);
1587 head->caller_gid = cpu_to_le32(current_fsgid()); 1659 head->caller_gid = cpu_to_le32(req->r_gid);
1588 head->args = req->r_args; 1660 head->args = req->r_args;
1589 1661
1590 ceph_encode_filepath(&p, end, ino1, path1); 1662 ceph_encode_filepath(&p, end, ino1, path1);
@@ -1654,7 +1726,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
1654 struct ceph_msg *msg; 1726 struct ceph_msg *msg;
1655 int flags = 0; 1727 int flags = 0;
1656 1728
1657 req->r_mds = mds;
1658 req->r_attempts++; 1729 req->r_attempts++;
1659 if (req->r_inode) { 1730 if (req->r_inode) {
1660 struct ceph_cap *cap = 1731 struct ceph_cap *cap =
@@ -1741,6 +1812,8 @@ static int __do_request(struct ceph_mds_client *mdsc,
1741 goto finish; 1812 goto finish;
1742 } 1813 }
1743 1814
1815 put_request_session(req);
1816
1744 mds = __choose_mds(mdsc, req); 1817 mds = __choose_mds(mdsc, req);
1745 if (mds < 0 || 1818 if (mds < 0 ||
1746 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 1819 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
@@ -1758,6 +1831,8 @@ static int __do_request(struct ceph_mds_client *mdsc,
1758 goto finish; 1831 goto finish;
1759 } 1832 }
1760 } 1833 }
1834 req->r_session = get_session(session);
1835
1761 dout("do_request mds%d session %p state %s\n", mds, session, 1836 dout("do_request mds%d session %p state %s\n", mds, session,
1762 session_state_name(session->s_state)); 1837 session_state_name(session->s_state));
1763 if (session->s_state != CEPH_MDS_SESSION_OPEN && 1838 if (session->s_state != CEPH_MDS_SESSION_OPEN &&
@@ -1770,7 +1845,6 @@ static int __do_request(struct ceph_mds_client *mdsc,
1770 } 1845 }
1771 1846
1772 /* send request */ 1847 /* send request */
1773 req->r_session = get_session(session);
1774 req->r_resend_mds = -1; /* forget any previous mds hint */ 1848 req->r_resend_mds = -1; /* forget any previous mds hint */
1775 1849
1776 if (req->r_request_started == 0) /* note request start time */ 1850 if (req->r_request_started == 0) /* note request start time */
@@ -1824,7 +1898,6 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1824 if (req->r_session && 1898 if (req->r_session &&
1825 req->r_session->s_mds == mds) { 1899 req->r_session->s_mds == mds) {
1826 dout(" kicking tid %llu\n", req->r_tid); 1900 dout(" kicking tid %llu\n", req->r_tid);
1827 put_request_session(req);
1828 __do_request(mdsc, req); 1901 __do_request(mdsc, req);
1829 } 1902 }
1830 } 1903 }
@@ -2017,8 +2090,11 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2017 goto out; 2090 goto out;
2018 } else { 2091 } else {
2019 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 2092 struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2020 struct ceph_cap *cap = 2093 struct ceph_cap *cap = NULL;
2021 ceph_get_cap_for_mds(ci, req->r_mds);; 2094
2095 if (req->r_session)
2096 cap = ceph_get_cap_for_mds(ci,
2097 req->r_session->s_mds);
2022 2098
2023 dout("already using auth"); 2099 dout("already using auth");
2024 if ((!cap || cap != ci->i_auth_cap) || 2100 if ((!cap || cap != ci->i_auth_cap) ||
@@ -2062,12 +2138,12 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2062 2138
2063 dout("handle_reply tid %lld result %d\n", tid, result); 2139 dout("handle_reply tid %lld result %d\n", tid, result);
2064 rinfo = &req->r_reply_info; 2140 rinfo = &req->r_reply_info;
2065 err = parse_reply_info(msg, rinfo); 2141 err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2066 mutex_unlock(&mdsc->mutex); 2142 mutex_unlock(&mdsc->mutex);
2067 2143
2068 mutex_lock(&session->s_mutex); 2144 mutex_lock(&session->s_mutex);
2069 if (err < 0) { 2145 if (err < 0) {
2070 pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds); 2146 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2071 ceph_msg_dump(msg); 2147 ceph_msg_dump(msg);
2072 goto out_err; 2148 goto out_err;
2073 } 2149 }
@@ -2085,9 +2161,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2085 2161
2086 /* insert trace into our cache */ 2162 /* insert trace into our cache */
2087 mutex_lock(&req->r_fill_mutex); 2163 mutex_lock(&req->r_fill_mutex);
2088 err = ceph_fill_trace(mdsc->client->sb, req, req->r_session); 2164 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2089 if (err == 0) { 2165 if (err == 0) {
2090 if (result == 0 && rinfo->dir_nr) 2166 if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
2167 rinfo->dir_nr)
2091 ceph_readdir_prepopulate(req, req->r_session); 2168 ceph_readdir_prepopulate(req, req->r_session);
2092 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2169 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2093 } 2170 }
@@ -2361,19 +2438,35 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2361 2438
2362 if (recon_state->flock) { 2439 if (recon_state->flock) {
2363 int num_fcntl_locks, num_flock_locks; 2440 int num_fcntl_locks, num_flock_locks;
2364 2441 struct ceph_pagelist_cursor trunc_point;
2365 lock_kernel(); 2442
2366 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 2443 ceph_pagelist_set_cursor(pagelist, &trunc_point);
2367 rec.v2.flock_len = (2*sizeof(u32) + 2444 do {
2368 (num_fcntl_locks+num_flock_locks) * 2445 lock_flocks();
2369 sizeof(struct ceph_filelock)); 2446 ceph_count_locks(inode, &num_fcntl_locks,
2370 2447 &num_flock_locks);
2371 err = ceph_pagelist_append(pagelist, &rec, reclen); 2448 rec.v2.flock_len = (2*sizeof(u32) +
2372 if (!err) 2449 (num_fcntl_locks+num_flock_locks) *
2373 err = ceph_encode_locks(inode, pagelist, 2450 sizeof(struct ceph_filelock));
2374 num_fcntl_locks, 2451 unlock_flocks();
2375 num_flock_locks); 2452
2376 unlock_kernel(); 2453 /* pre-alloc pagelist */
2454 ceph_pagelist_truncate(pagelist, &trunc_point);
2455 err = ceph_pagelist_append(pagelist, &rec, reclen);
2456 if (!err)
2457 err = ceph_pagelist_reserve(pagelist,
2458 rec.v2.flock_len);
2459
2460 /* encode locks */
2461 if (!err) {
2462 lock_flocks();
2463 err = ceph_encode_locks(inode,
2464 pagelist,
2465 num_fcntl_locks,
2466 num_flock_locks);
2467 unlock_flocks();
2468 }
2469 } while (err == -ENOSPC);
2377 } else { 2470 } else {
2378 err = ceph_pagelist_append(pagelist, &rec, reclen); 2471 err = ceph_pagelist_append(pagelist, &rec, reclen);
2379 } 2472 }
@@ -2613,9 +2706,8 @@ static void handle_lease(struct ceph_mds_client *mdsc,
2613 struct ceph_mds_session *session, 2706 struct ceph_mds_session *session,
2614 struct ceph_msg *msg) 2707 struct ceph_msg *msg)
2615{ 2708{
2616 struct super_block *sb = mdsc->client->sb; 2709 struct super_block *sb = mdsc->fsc->sb;
2617 struct inode *inode; 2710 struct inode *inode;
2618 struct ceph_inode_info *ci;
2619 struct dentry *parent, *dentry; 2711 struct dentry *parent, *dentry;
2620 struct ceph_dentry_info *di; 2712 struct ceph_dentry_info *di;
2621 int mds = session->s_mds; 2713 int mds = session->s_mds;
@@ -2652,7 +2744,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
2652 dout("handle_lease no inode %llx\n", vino.ino); 2744 dout("handle_lease no inode %llx\n", vino.ino);
2653 goto release; 2745 goto release;
2654 } 2746 }
2655 ci = ceph_inode(inode);
2656 2747
2657 /* dentry */ 2748 /* dentry */
2658 parent = d_find_alias(inode); 2749 parent = d_find_alias(inode);
@@ -2891,10 +2982,16 @@ static void delayed_work(struct work_struct *work)
2891 schedule_delayed(mdsc); 2982 schedule_delayed(mdsc);
2892} 2983}
2893 2984
2985int ceph_mdsc_init(struct ceph_fs_client *fsc)
2894 2986
2895int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2896{ 2987{
2897 mdsc->client = client; 2988 struct ceph_mds_client *mdsc;
2989
2990 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
2991 if (!mdsc)
2992 return -ENOMEM;
2993 mdsc->fsc = fsc;
2994 fsc->mdsc = mdsc;
2898 mutex_init(&mdsc->mutex); 2995 mutex_init(&mdsc->mutex);
2899 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 2996 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2900 if (mdsc->mdsmap == NULL) 2997 if (mdsc->mdsmap == NULL)
@@ -2920,6 +3017,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2920 spin_lock_init(&mdsc->snap_flush_lock); 3017 spin_lock_init(&mdsc->snap_flush_lock);
2921 mdsc->cap_flush_seq = 0; 3018 mdsc->cap_flush_seq = 0;
2922 INIT_LIST_HEAD(&mdsc->cap_dirty); 3019 INIT_LIST_HEAD(&mdsc->cap_dirty);
3020 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
2923 mdsc->num_cap_flushing = 0; 3021 mdsc->num_cap_flushing = 0;
2924 spin_lock_init(&mdsc->cap_dirty_lock); 3022 spin_lock_init(&mdsc->cap_dirty_lock);
2925 init_waitqueue_head(&mdsc->cap_flushing_wq); 3023 init_waitqueue_head(&mdsc->cap_flushing_wq);
@@ -2927,7 +3025,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2927 INIT_LIST_HEAD(&mdsc->dentry_lru); 3025 INIT_LIST_HEAD(&mdsc->dentry_lru);
2928 3026
2929 ceph_caps_init(mdsc); 3027 ceph_caps_init(mdsc);
2930 ceph_adjust_min_caps(mdsc, client->min_caps); 3028 ceph_adjust_min_caps(mdsc, fsc->min_caps);
2931 3029
2932 return 0; 3030 return 0;
2933} 3031}
@@ -2939,7 +3037,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2939static void wait_requests(struct ceph_mds_client *mdsc) 3037static void wait_requests(struct ceph_mds_client *mdsc)
2940{ 3038{
2941 struct ceph_mds_request *req; 3039 struct ceph_mds_request *req;
2942 struct ceph_client *client = mdsc->client; 3040 struct ceph_fs_client *fsc = mdsc->fsc;
2943 3041
2944 mutex_lock(&mdsc->mutex); 3042 mutex_lock(&mdsc->mutex);
2945 if (__get_oldest_req(mdsc)) { 3043 if (__get_oldest_req(mdsc)) {
@@ -2947,7 +3045,7 @@ static void wait_requests(struct ceph_mds_client *mdsc)
2947 3045
2948 dout("wait_requests waiting for requests\n"); 3046 dout("wait_requests waiting for requests\n");
2949 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 3047 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
2950 client->mount_args->mount_timeout * HZ); 3048 fsc->client->options->mount_timeout * HZ);
2951 3049
2952 /* tear down remaining requests */ 3050 /* tear down remaining requests */
2953 mutex_lock(&mdsc->mutex); 3051 mutex_lock(&mdsc->mutex);
@@ -3030,7 +3128,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3030{ 3128{
3031 u64 want_tid, want_flush; 3129 u64 want_tid, want_flush;
3032 3130
3033 if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN) 3131 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3034 return; 3132 return;
3035 3133
3036 dout("sync\n"); 3134 dout("sync\n");
@@ -3053,7 +3151,7 @@ bool done_closing_sessions(struct ceph_mds_client *mdsc)
3053{ 3151{
3054 int i, n = 0; 3152 int i, n = 0;
3055 3153
3056 if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN) 3154 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3057 return true; 3155 return true;
3058 3156
3059 mutex_lock(&mdsc->mutex); 3157 mutex_lock(&mdsc->mutex);
@@ -3071,8 +3169,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3071{ 3169{
3072 struct ceph_mds_session *session; 3170 struct ceph_mds_session *session;
3073 int i; 3171 int i;
3074 struct ceph_client *client = mdsc->client; 3172 struct ceph_fs_client *fsc = mdsc->fsc;
3075 unsigned long timeout = client->mount_args->mount_timeout * HZ; 3173 unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3076 3174
3077 dout("close_sessions\n"); 3175 dout("close_sessions\n");
3078 3176
@@ -3119,7 +3217,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3119 dout("stopped\n"); 3217 dout("stopped\n");
3120} 3218}
3121 3219
3122void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 3220static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3123{ 3221{
3124 dout("stop\n"); 3222 dout("stop\n");
3125 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3223 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
@@ -3129,6 +3227,21 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3129 ceph_caps_finalize(mdsc); 3227 ceph_caps_finalize(mdsc);
3130} 3228}
3131 3229
3230void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3231{
3232 struct ceph_mds_client *mdsc = fsc->mdsc;
3233
3234 dout("mdsc_destroy %p\n", mdsc);
3235 ceph_mdsc_stop(mdsc);
3236
3237 /* flush out any connection work with references to us */
3238 ceph_msgr_flush();
3239
3240 fsc->mdsc = NULL;
3241 kfree(mdsc);
3242 dout("mdsc_destroy %p done\n", mdsc);
3243}
3244
3132 3245
3133/* 3246/*
3134 * handle mds map update. 3247 * handle mds map update.
@@ -3145,14 +3258,14 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3145 3258
3146 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 3259 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3147 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3260 ceph_decode_copy(&p, &fsid, sizeof(fsid));
3148 if (ceph_check_fsid(mdsc->client, &fsid) < 0) 3261 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3149 return; 3262 return;
3150 epoch = ceph_decode_32(&p); 3263 epoch = ceph_decode_32(&p);
3151 maplen = ceph_decode_32(&p); 3264 maplen = ceph_decode_32(&p);
3152 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 3265 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3153 3266
3154 /* do we need it? */ 3267 /* do we need it? */
3155 ceph_monc_got_mdsmap(&mdsc->client->monc, epoch); 3268 ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3156 mutex_lock(&mdsc->mutex); 3269 mutex_lock(&mdsc->mutex);
3157 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 3270 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3158 dout("handle_map epoch %u <= our %u\n", 3271 dout("handle_map epoch %u <= our %u\n",
@@ -3176,7 +3289,7 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3176 } else { 3289 } else {
3177 mdsc->mdsmap = newmap; /* first mds map */ 3290 mdsc->mdsmap = newmap; /* first mds map */
3178 } 3291 }
3179 mdsc->client->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; 3292 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3180 3293
3181 __wake_requests(mdsc, &mdsc->waiting_for_map); 3294 __wake_requests(mdsc, &mdsc->waiting_for_map);
3182 3295
@@ -3207,8 +3320,8 @@ static void con_put(struct ceph_connection *con)
3207{ 3320{
3208 struct ceph_mds_session *s = con->private; 3321 struct ceph_mds_session *s = con->private;
3209 3322
3323 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3210 ceph_put_mds_session(s); 3324 ceph_put_mds_session(s);
3211 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
3212} 3325}
3213 3326
3214/* 3327/*
@@ -3277,7 +3390,7 @@ static int get_authorizer(struct ceph_connection *con,
3277{ 3390{
3278 struct ceph_mds_session *s = con->private; 3391 struct ceph_mds_session *s = con->private;
3279 struct ceph_mds_client *mdsc = s->s_mdsc; 3392 struct ceph_mds_client *mdsc = s->s_mdsc;
3280 struct ceph_auth_client *ac = mdsc->client->monc.auth; 3393 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3281 int ret = 0; 3394 int ret = 0;
3282 3395
3283 if (force_new && s->s_authorizer) { 3396 if (force_new && s->s_authorizer) {
@@ -3311,7 +3424,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
3311{ 3424{
3312 struct ceph_mds_session *s = con->private; 3425 struct ceph_mds_session *s = con->private;
3313 struct ceph_mds_client *mdsc = s->s_mdsc; 3426 struct ceph_mds_client *mdsc = s->s_mdsc;
3314 struct ceph_auth_client *ac = mdsc->client->monc.auth; 3427 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3315 3428
3316 return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len); 3429 return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
3317} 3430}
@@ -3320,12 +3433,12 @@ static int invalidate_authorizer(struct ceph_connection *con)
3320{ 3433{
3321 struct ceph_mds_session *s = con->private; 3434 struct ceph_mds_session *s = con->private;
3322 struct ceph_mds_client *mdsc = s->s_mdsc; 3435 struct ceph_mds_client *mdsc = s->s_mdsc;
3323 struct ceph_auth_client *ac = mdsc->client->monc.auth; 3436 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3324 3437
3325 if (ac->ops->invalidate_authorizer) 3438 if (ac->ops->invalidate_authorizer)
3326 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 3439 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3327 3440
3328 return ceph_monc_validate_auth(&mdsc->client->monc); 3441 return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3329} 3442}
3330 3443
3331static const struct ceph_connection_operations mds_con_ops = { 3444static const struct ceph_connection_operations mds_con_ops = {
@@ -3338,7 +3451,4 @@ static const struct ceph_connection_operations mds_con_ops = {
3338 .peer_reset = peer_reset, 3451 .peer_reset = peer_reset,
3339}; 3452};
3340 3453
3341
3342
3343
3344/* eof */ 3454/* eof */