aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c409
1 files changed, 251 insertions, 158 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 60a9a4ae47be..885aa5710cfd 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -40,7 +40,7 @@
40static void __wake_requests(struct ceph_mds_client *mdsc, 40static void __wake_requests(struct ceph_mds_client *mdsc,
41 struct list_head *head); 41 struct list_head *head);
42 42
43const static struct ceph_connection_operations mds_con_ops; 43static const struct ceph_connection_operations mds_con_ops;
44 44
45 45
46/* 46/*
@@ -665,10 +665,10 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
665 struct ceph_msg *msg; 665 struct ceph_msg *msg;
666 struct ceph_mds_session_head *h; 666 struct ceph_mds_session_head *h;
667 667
668 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL); 668 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
669 if (IS_ERR(msg)) { 669 if (!msg) {
670 pr_err("create_session_msg ENOMEM creating msg\n"); 670 pr_err("create_session_msg ENOMEM creating msg\n");
671 return ERR_PTR(PTR_ERR(msg)); 671 return NULL;
672 } 672 }
673 h = msg->front.iov_base; 673 h = msg->front.iov_base;
674 h->op = cpu_to_le32(op); 674 h->op = cpu_to_le32(op);
@@ -687,7 +687,6 @@ static int __open_session(struct ceph_mds_client *mdsc,
687 struct ceph_msg *msg; 687 struct ceph_msg *msg;
688 int mstate; 688 int mstate;
689 int mds = session->s_mds; 689 int mds = session->s_mds;
690 int err = 0;
691 690
692 /* wait for mds to go active? */ 691 /* wait for mds to go active? */
693 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 692 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
@@ -698,13 +697,9 @@ static int __open_session(struct ceph_mds_client *mdsc,
698 697
699 /* send connect message */ 698 /* send connect message */
700 msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); 699 msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
701 if (IS_ERR(msg)) { 700 if (!msg)
702 err = PTR_ERR(msg); 701 return -ENOMEM;
703 goto out;
704 }
705 ceph_con_send(&session->s_con, msg); 702 ceph_con_send(&session->s_con, msg);
706
707out:
708 return 0; 703 return 0;
709} 704}
710 705
@@ -736,9 +731,10 @@ static void cleanup_cap_releases(struct ceph_mds_session *session)
736} 731}
737 732
738/* 733/*
739 * Helper to safely iterate over all caps associated with a session. 734 * Helper to safely iterate over all caps associated with a session, with
735 * special care taken to handle a racing __ceph_remove_cap().
740 * 736 *
741 * caller must hold session s_mutex 737 * Caller must hold session s_mutex.
742 */ 738 */
743static int iterate_session_caps(struct ceph_mds_session *session, 739static int iterate_session_caps(struct ceph_mds_session *session,
744 int (*cb)(struct inode *, struct ceph_cap *, 740 int (*cb)(struct inode *, struct ceph_cap *,
@@ -803,12 +799,49 @@ out:
803} 799}
804 800
805static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 801static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
806 void *arg) 802 void *arg)
807{ 803{
808 struct ceph_inode_info *ci = ceph_inode(inode); 804 struct ceph_inode_info *ci = ceph_inode(inode);
805 int drop = 0;
806
809 dout("removing cap %p, ci is %p, inode is %p\n", 807 dout("removing cap %p, ci is %p, inode is %p\n",
810 cap, ci, &ci->vfs_inode); 808 cap, ci, &ci->vfs_inode);
811 ceph_remove_cap(cap); 809 spin_lock(&inode->i_lock);
810 __ceph_remove_cap(cap);
811 if (!__ceph_is_any_real_caps(ci)) {
812 struct ceph_mds_client *mdsc =
813 &ceph_sb_to_client(inode->i_sb)->mdsc;
814
815 spin_lock(&mdsc->cap_dirty_lock);
816 if (!list_empty(&ci->i_dirty_item)) {
817 pr_info(" dropping dirty %s state for %p %lld\n",
818 ceph_cap_string(ci->i_dirty_caps),
819 inode, ceph_ino(inode));
820 ci->i_dirty_caps = 0;
821 list_del_init(&ci->i_dirty_item);
822 drop = 1;
823 }
824 if (!list_empty(&ci->i_flushing_item)) {
825 pr_info(" dropping dirty+flushing %s state for %p %lld\n",
826 ceph_cap_string(ci->i_flushing_caps),
827 inode, ceph_ino(inode));
828 ci->i_flushing_caps = 0;
829 list_del_init(&ci->i_flushing_item);
830 mdsc->num_cap_flushing--;
831 drop = 1;
832 }
833 if (drop && ci->i_wrbuffer_ref) {
834 pr_info(" dropping dirty data for %p %lld\n",
835 inode, ceph_ino(inode));
836 ci->i_wrbuffer_ref = 0;
837 ci->i_wrbuffer_ref_head = 0;
838 drop++;
839 }
840 spin_unlock(&mdsc->cap_dirty_lock);
841 }
842 spin_unlock(&inode->i_lock);
843 while (drop--)
844 iput(inode);
812 return 0; 845 return 0;
813} 846}
814 847
@@ -820,6 +853,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
820 dout("remove_session_caps on %p\n", session); 853 dout("remove_session_caps on %p\n", session);
821 iterate_session_caps(session, remove_session_caps_cb, NULL); 854 iterate_session_caps(session, remove_session_caps_cb, NULL);
822 BUG_ON(session->s_nr_caps > 0); 855 BUG_ON(session->s_nr_caps > 0);
856 BUG_ON(!list_empty(&session->s_cap_flushing));
823 cleanup_cap_releases(session); 857 cleanup_cap_releases(session);
824} 858}
825 859
@@ -882,8 +916,8 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
882 ceph_mds_state_name(state)); 916 ceph_mds_state_name(state));
883 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 917 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
884 ++session->s_renew_seq); 918 ++session->s_renew_seq);
885 if (IS_ERR(msg)) 919 if (!msg)
886 return PTR_ERR(msg); 920 return -ENOMEM;
887 ceph_con_send(&session->s_con, msg); 921 ceph_con_send(&session->s_con, msg);
888 return 0; 922 return 0;
889} 923}
@@ -930,17 +964,15 @@ static int request_close_session(struct ceph_mds_client *mdsc,
930 struct ceph_mds_session *session) 964 struct ceph_mds_session *session)
931{ 965{
932 struct ceph_msg *msg; 966 struct ceph_msg *msg;
933 int err = 0;
934 967
935 dout("request_close_session mds%d state %s seq %lld\n", 968 dout("request_close_session mds%d state %s seq %lld\n",
936 session->s_mds, session_state_name(session->s_state), 969 session->s_mds, session_state_name(session->s_state),
937 session->s_seq); 970 session->s_seq);
938 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 971 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
939 if (IS_ERR(msg)) 972 if (!msg)
940 err = PTR_ERR(msg); 973 return -ENOMEM;
941 else 974 ceph_con_send(&session->s_con, msg);
942 ceph_con_send(&session->s_con, msg); 975 return 0;
943 return err;
944} 976}
945 977
946/* 978/*
@@ -1058,7 +1090,7 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,
1058 while (session->s_num_cap_releases < session->s_nr_caps + extra) { 1090 while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1059 spin_unlock(&session->s_cap_lock); 1091 spin_unlock(&session->s_cap_lock);
1060 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, 1092 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1061 0, 0, NULL); 1093 GFP_NOFS);
1062 if (!msg) 1094 if (!msg)
1063 goto out_unlocked; 1095 goto out_unlocked;
1064 dout("add_cap_releases %p msg %p now %d\n", session, msg, 1096 dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1150,10 +1182,8 @@ static void send_cap_releases(struct ceph_mds_client *mdsc,
1150 struct ceph_msg *msg; 1182 struct ceph_msg *msg;
1151 1183
1152 dout("send_cap_releases mds%d\n", session->s_mds); 1184 dout("send_cap_releases mds%d\n", session->s_mds);
1153 while (1) { 1185 spin_lock(&session->s_cap_lock);
1154 spin_lock(&session->s_cap_lock); 1186 while (!list_empty(&session->s_cap_releases_done)) {
1155 if (list_empty(&session->s_cap_releases_done))
1156 break;
1157 msg = list_first_entry(&session->s_cap_releases_done, 1187 msg = list_first_entry(&session->s_cap_releases_done,
1158 struct ceph_msg, list_head); 1188 struct ceph_msg, list_head);
1159 list_del_init(&msg->list_head); 1189 list_del_init(&msg->list_head);
@@ -1161,10 +1191,49 @@ static void send_cap_releases(struct ceph_mds_client *mdsc,
1161 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1191 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1162 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1192 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1163 ceph_con_send(&session->s_con, msg); 1193 ceph_con_send(&session->s_con, msg);
1194 spin_lock(&session->s_cap_lock);
1164 } 1195 }
1165 spin_unlock(&session->s_cap_lock); 1196 spin_unlock(&session->s_cap_lock);
1166} 1197}
1167 1198
1199static void discard_cap_releases(struct ceph_mds_client *mdsc,
1200 struct ceph_mds_session *session)
1201{
1202 struct ceph_msg *msg;
1203 struct ceph_mds_cap_release *head;
1204 unsigned num;
1205
1206 dout("discard_cap_releases mds%d\n", session->s_mds);
1207 spin_lock(&session->s_cap_lock);
1208
1209 /* zero out the in-progress message */
1210 msg = list_first_entry(&session->s_cap_releases,
1211 struct ceph_msg, list_head);
1212 head = msg->front.iov_base;
1213 num = le32_to_cpu(head->num);
1214 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1215 head->num = cpu_to_le32(0);
1216 session->s_num_cap_releases += num;
1217
1218 /* requeue completed messages */
1219 while (!list_empty(&session->s_cap_releases_done)) {
1220 msg = list_first_entry(&session->s_cap_releases_done,
1221 struct ceph_msg, list_head);
1222 list_del_init(&msg->list_head);
1223
1224 head = msg->front.iov_base;
1225 num = le32_to_cpu(head->num);
1226 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1227 num);
1228 session->s_num_cap_releases += num;
1229 head->num = cpu_to_le32(0);
1230 msg->front.iov_len = sizeof(*head);
1231 list_add(&msg->list_head, &session->s_cap_releases);
1232 }
1233
1234 spin_unlock(&session->s_cap_lock);
1235}
1236
1168/* 1237/*
1169 * requests 1238 * requests
1170 */ 1239 */
@@ -1180,6 +1249,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1180 if (!req) 1249 if (!req)
1181 return ERR_PTR(-ENOMEM); 1250 return ERR_PTR(-ENOMEM);
1182 1251
1252 mutex_init(&req->r_fill_mutex);
1183 req->r_started = jiffies; 1253 req->r_started = jiffies;
1184 req->r_resend_mds = -1; 1254 req->r_resend_mds = -1;
1185 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1255 INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1250,7 +1320,7 @@ retry:
1250 len += 1 + temp->d_name.len; 1320 len += 1 + temp->d_name.len;
1251 temp = temp->d_parent; 1321 temp = temp->d_parent;
1252 if (temp == NULL) { 1322 if (temp == NULL) {
1253 pr_err("build_path_dentry corrupt dentry %p\n", dentry); 1323 pr_err("build_path corrupt dentry %p\n", dentry);
1254 return ERR_PTR(-EINVAL); 1324 return ERR_PTR(-EINVAL);
1255 } 1325 }
1256 } 1326 }
@@ -1266,7 +1336,7 @@ retry:
1266 struct inode *inode = temp->d_inode; 1336 struct inode *inode = temp->d_inode;
1267 1337
1268 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 1338 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1269 dout("build_path_dentry path+%d: %p SNAPDIR\n", 1339 dout("build_path path+%d: %p SNAPDIR\n",
1270 pos, temp); 1340 pos, temp);
1271 } else if (stop_on_nosnap && inode && 1341 } else if (stop_on_nosnap && inode &&
1272 ceph_snap(inode) == CEPH_NOSNAP) { 1342 ceph_snap(inode) == CEPH_NOSNAP) {
@@ -1277,20 +1347,18 @@ retry:
1277 break; 1347 break;
1278 strncpy(path + pos, temp->d_name.name, 1348 strncpy(path + pos, temp->d_name.name,
1279 temp->d_name.len); 1349 temp->d_name.len);
1280 dout("build_path_dentry path+%d: %p '%.*s'\n",
1281 pos, temp, temp->d_name.len, path + pos);
1282 } 1350 }
1283 if (pos) 1351 if (pos)
1284 path[--pos] = '/'; 1352 path[--pos] = '/';
1285 temp = temp->d_parent; 1353 temp = temp->d_parent;
1286 if (temp == NULL) { 1354 if (temp == NULL) {
1287 pr_err("build_path_dentry corrupt dentry\n"); 1355 pr_err("build_path corrupt dentry\n");
1288 kfree(path); 1356 kfree(path);
1289 return ERR_PTR(-EINVAL); 1357 return ERR_PTR(-EINVAL);
1290 } 1358 }
1291 } 1359 }
1292 if (pos != 0) { 1360 if (pos != 0) {
1293 pr_err("build_path_dentry did not end path lookup where " 1361 pr_err("build_path did not end path lookup where "
1294 "expected, namelen is %d, pos is %d\n", len, pos); 1362 "expected, namelen is %d, pos is %d\n", len, pos);
1295 /* presumably this is only possible if racing with a 1363 /* presumably this is only possible if racing with a
1296 rename of one of the parent directories (we can not 1364 rename of one of the parent directories (we can not
@@ -1302,7 +1370,7 @@ retry:
1302 1370
1303 *base = ceph_ino(temp->d_inode); 1371 *base = ceph_ino(temp->d_inode);
1304 *plen = len; 1372 *plen = len;
1305 dout("build_path_dentry on %p %d built %llx '%.*s'\n", 1373 dout("build_path on %p %d built %llx '%.*s'\n",
1306 dentry, atomic_read(&dentry->d_count), *base, len, path); 1374 dentry, atomic_read(&dentry->d_count), *base, len, path);
1307 return path; 1375 return path;
1308} 1376}
@@ -1425,9 +1493,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1425 if (req->r_old_dentry_drop) 1493 if (req->r_old_dentry_drop)
1426 len += req->r_old_dentry->d_name.len; 1494 len += req->r_old_dentry->d_name.len;
1427 1495
1428 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL); 1496 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
1429 if (IS_ERR(msg)) 1497 if (!msg) {
1498 msg = ERR_PTR(-ENOMEM);
1430 goto out_free2; 1499 goto out_free2;
1500 }
1431 1501
1432 msg->hdr.tid = cpu_to_le64(req->r_tid); 1502 msg->hdr.tid = cpu_to_le64(req->r_tid);
1433 1503
@@ -1516,9 +1586,9 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
1516 } 1586 }
1517 msg = create_request_message(mdsc, req, mds); 1587 msg = create_request_message(mdsc, req, mds);
1518 if (IS_ERR(msg)) { 1588 if (IS_ERR(msg)) {
1519 req->r_reply = ERR_PTR(PTR_ERR(msg)); 1589 req->r_err = PTR_ERR(msg);
1520 complete_request(mdsc, req); 1590 complete_request(mdsc, req);
1521 return -PTR_ERR(msg); 1591 return PTR_ERR(msg);
1522 } 1592 }
1523 req->r_request = msg; 1593 req->r_request = msg;
1524 1594
@@ -1551,7 +1621,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
1551 int mds = -1; 1621 int mds = -1;
1552 int err = -EAGAIN; 1622 int err = -EAGAIN;
1553 1623
1554 if (req->r_reply) 1624 if (req->r_err || req->r_got_result)
1555 goto out; 1625 goto out;
1556 1626
1557 if (req->r_timeout && 1627 if (req->r_timeout &&
@@ -1608,7 +1678,7 @@ out:
1608 return err; 1678 return err;
1609 1679
1610finish: 1680finish:
1611 req->r_reply = ERR_PTR(err); 1681 req->r_err = err;
1612 complete_request(mdsc, req); 1682 complete_request(mdsc, req);
1613 goto out; 1683 goto out;
1614} 1684}
@@ -1629,10 +1699,9 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
1629 1699
1630/* 1700/*
1631 * Wake up threads with requests pending for @mds, so that they can 1701 * Wake up threads with requests pending for @mds, so that they can
1632 * resubmit their requests to a possibly different mds. If @all is set, 1702 * resubmit their requests to a possibly different mds.
1633 * wake up if their requests has been forwarded to @mds, too.
1634 */ 1703 */
1635static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all) 1704static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1636{ 1705{
1637 struct ceph_mds_request *req; 1706 struct ceph_mds_request *req;
1638 struct rb_node *p; 1707 struct rb_node *p;
@@ -1688,64 +1757,78 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1688 __register_request(mdsc, req, dir); 1757 __register_request(mdsc, req, dir);
1689 __do_request(mdsc, req); 1758 __do_request(mdsc, req);
1690 1759
1691 /* wait */ 1760 if (req->r_err) {
1692 if (!req->r_reply) { 1761 err = req->r_err;
1693 mutex_unlock(&mdsc->mutex); 1762 __unregister_request(mdsc, req);
1694 if (req->r_timeout) { 1763 dout("do_request early error %d\n", err);
1695 err = (long)wait_for_completion_interruptible_timeout( 1764 goto out;
1696 &req->r_completion, req->r_timeout);
1697 if (err == 0)
1698 req->r_reply = ERR_PTR(-EIO);
1699 else if (err < 0)
1700 req->r_reply = ERR_PTR(err);
1701 } else {
1702 err = wait_for_completion_interruptible(
1703 &req->r_completion);
1704 if (err)
1705 req->r_reply = ERR_PTR(err);
1706 }
1707 mutex_lock(&mdsc->mutex);
1708 } 1765 }
1709 1766
1710 if (IS_ERR(req->r_reply)) { 1767 /* wait */
1711 err = PTR_ERR(req->r_reply); 1768 mutex_unlock(&mdsc->mutex);
1712 req->r_reply = NULL; 1769 dout("do_request waiting\n");
1770 if (req->r_timeout) {
1771 err = (long)wait_for_completion_interruptible_timeout(
1772 &req->r_completion, req->r_timeout);
1773 if (err == 0)
1774 err = -EIO;
1775 } else {
1776 err = wait_for_completion_interruptible(&req->r_completion);
1777 }
1778 dout("do_request waited, got %d\n", err);
1779 mutex_lock(&mdsc->mutex);
1713 1780
1714 if (err == -ERESTARTSYS) { 1781 /* only abort if we didn't race with a real reply */
1715 /* aborted */ 1782 if (req->r_got_result) {
1716 req->r_aborted = true; 1783 err = le32_to_cpu(req->r_reply_info.head->result);
1784 } else if (err < 0) {
1785 dout("aborted request %lld with %d\n", req->r_tid, err);
1717 1786
1718 if (req->r_locked_dir && 1787 /*
1719 (req->r_op & CEPH_MDS_OP_WRITE)) { 1788 * ensure we aren't running concurrently with
1720 struct ceph_inode_info *ci = 1789 * ceph_fill_trace or ceph_readdir_prepopulate, which
1721 ceph_inode(req->r_locked_dir); 1790 * rely on locks (dir mutex) held by our caller.
1791 */
1792 mutex_lock(&req->r_fill_mutex);
1793 req->r_err = err;
1794 req->r_aborted = true;
1795 mutex_unlock(&req->r_fill_mutex);
1722 1796
1723 dout("aborted, clearing I_COMPLETE on %p\n", 1797 if (req->r_locked_dir &&
1724 req->r_locked_dir); 1798 (req->r_op & CEPH_MDS_OP_WRITE))
1725 spin_lock(&req->r_locked_dir->i_lock); 1799 ceph_invalidate_dir_request(req);
1726 ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1727 ci->i_release_count++;
1728 spin_unlock(&req->r_locked_dir->i_lock);
1729 }
1730 } else {
1731 /* clean up this request */
1732 __unregister_request(mdsc, req);
1733 if (!list_empty(&req->r_unsafe_item))
1734 list_del_init(&req->r_unsafe_item);
1735 complete(&req->r_safe_completion);
1736 }
1737 } else if (req->r_err) {
1738 err = req->r_err;
1739 } else { 1800 } else {
1740 err = le32_to_cpu(req->r_reply_info.head->result); 1801 err = req->r_err;
1741 } 1802 }
1742 mutex_unlock(&mdsc->mutex);
1743 1803
1804out:
1805 mutex_unlock(&mdsc->mutex);
1744 dout("do_request %p done, result %d\n", req, err); 1806 dout("do_request %p done, result %d\n", req, err);
1745 return err; 1807 return err;
1746} 1808}
1747 1809
1748/* 1810/*
1811 * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
1812 * namespace request.
1813 */
1814void ceph_invalidate_dir_request(struct ceph_mds_request *req)
1815{
1816 struct inode *inode = req->r_locked_dir;
1817 struct ceph_inode_info *ci = ceph_inode(inode);
1818
1819 dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
1820 spin_lock(&inode->i_lock);
1821 ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1822 ci->i_release_count++;
1823 spin_unlock(&inode->i_lock);
1824
1825 if (req->r_dentry)
1826 ceph_invalidate_dentry_lease(req->r_dentry);
1827 if (req->r_old_dentry)
1828 ceph_invalidate_dentry_lease(req->r_old_dentry);
1829}
1830
1831/*
1749 * Handle mds reply. 1832 * Handle mds reply.
1750 * 1833 *
1751 * We take the session mutex and parse and process the reply immediately. 1834 * We take the session mutex and parse and process the reply immediately.
@@ -1796,6 +1879,12 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1796 mutex_unlock(&mdsc->mutex); 1879 mutex_unlock(&mdsc->mutex);
1797 goto out; 1880 goto out;
1798 } 1881 }
1882 if (req->r_got_safe && !head->safe) {
1883 pr_warning("got unsafe after safe on %llu from mds%d\n",
1884 tid, mds);
1885 mutex_unlock(&mdsc->mutex);
1886 goto out;
1887 }
1799 1888
1800 result = le32_to_cpu(head->result); 1889 result = le32_to_cpu(head->result);
1801 1890
@@ -1837,11 +1926,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1837 mutex_unlock(&mdsc->mutex); 1926 mutex_unlock(&mdsc->mutex);
1838 goto out; 1927 goto out;
1839 } 1928 }
1840 } 1929 } else {
1841
1842 BUG_ON(req->r_reply);
1843
1844 if (!head->safe) {
1845 req->r_got_unsafe = true; 1930 req->r_got_unsafe = true;
1846 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 1931 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
1847 } 1932 }
@@ -1870,21 +1955,30 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1870 } 1955 }
1871 1956
1872 /* insert trace into our cache */ 1957 /* insert trace into our cache */
1958 mutex_lock(&req->r_fill_mutex);
1873 err = ceph_fill_trace(mdsc->client->sb, req, req->r_session); 1959 err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
1874 if (err == 0) { 1960 if (err == 0) {
1875 if (result == 0 && rinfo->dir_nr) 1961 if (result == 0 && rinfo->dir_nr)
1876 ceph_readdir_prepopulate(req, req->r_session); 1962 ceph_readdir_prepopulate(req, req->r_session);
1877 ceph_unreserve_caps(&req->r_caps_reservation); 1963 ceph_unreserve_caps(&req->r_caps_reservation);
1878 } 1964 }
1965 mutex_unlock(&req->r_fill_mutex);
1879 1966
1880 up_read(&mdsc->snap_rwsem); 1967 up_read(&mdsc->snap_rwsem);
1881out_err: 1968out_err:
1882 if (err) { 1969 mutex_lock(&mdsc->mutex);
1883 req->r_err = err; 1970 if (!req->r_aborted) {
1971 if (err) {
1972 req->r_err = err;
1973 } else {
1974 req->r_reply = msg;
1975 ceph_msg_get(msg);
1976 req->r_got_result = true;
1977 }
1884 } else { 1978 } else {
1885 req->r_reply = msg; 1979 dout("reply arrived after request %lld was aborted\n", tid);
1886 ceph_msg_get(msg);
1887 } 1980 }
1981 mutex_unlock(&mdsc->mutex);
1888 1982
1889 add_cap_releases(mdsc, req->r_session, -1); 1983 add_cap_releases(mdsc, req->r_session, -1);
1890 mutex_unlock(&session->s_mutex); 1984 mutex_unlock(&session->s_mutex);
@@ -1983,6 +2077,8 @@ static void handle_session(struct ceph_mds_session *session,
1983 2077
1984 switch (op) { 2078 switch (op) {
1985 case CEPH_SESSION_OPEN: 2079 case CEPH_SESSION_OPEN:
2080 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2081 pr_info("mds%d reconnect success\n", session->s_mds);
1986 session->s_state = CEPH_MDS_SESSION_OPEN; 2082 session->s_state = CEPH_MDS_SESSION_OPEN;
1987 renewed_caps(mdsc, session, 0); 2083 renewed_caps(mdsc, session, 0);
1988 wake = 1; 2084 wake = 1;
@@ -1996,10 +2092,12 @@ static void handle_session(struct ceph_mds_session *session,
1996 break; 2092 break;
1997 2093
1998 case CEPH_SESSION_CLOSE: 2094 case CEPH_SESSION_CLOSE:
2095 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2096 pr_info("mds%d reconnect denied\n", session->s_mds);
1999 remove_session_caps(session); 2097 remove_session_caps(session);
2000 wake = 1; /* for good measure */ 2098 wake = 1; /* for good measure */
2001 complete(&mdsc->session_close_waiters); 2099 complete(&mdsc->session_close_waiters);
2002 kick_requests(mdsc, mds, 0); /* cur only */ 2100 kick_requests(mdsc, mds);
2003 break; 2101 break;
2004 2102
2005 case CEPH_SESSION_STALE: 2103 case CEPH_SESSION_STALE:
@@ -2131,61 +2229,51 @@ out:
2131 * 2229 *
2132 * called with mdsc->mutex held. 2230 * called with mdsc->mutex held.
2133 */ 2231 */
2134static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) 2232static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2233 struct ceph_mds_session *session)
2135{ 2234{
2136 struct ceph_mds_session *session = NULL;
2137 struct ceph_msg *reply; 2235 struct ceph_msg *reply;
2138 struct rb_node *p; 2236 struct rb_node *p;
2139 int err; 2237 int mds = session->s_mds;
2238 int err = -ENOMEM;
2140 struct ceph_pagelist *pagelist; 2239 struct ceph_pagelist *pagelist;
2141 2240
2142 pr_info("reconnect to recovering mds%d\n", mds); 2241 pr_info("mds%d reconnect start\n", mds);
2143 2242
2144 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 2243 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2145 if (!pagelist) 2244 if (!pagelist)
2146 goto fail_nopagelist; 2245 goto fail_nopagelist;
2147 ceph_pagelist_init(pagelist); 2246 ceph_pagelist_init(pagelist);
2148 2247
2149 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL); 2248 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
2150 if (IS_ERR(reply)) { 2249 if (!reply)
2151 err = PTR_ERR(reply);
2152 goto fail_nomsg; 2250 goto fail_nomsg;
2153 }
2154
2155 /* find session */
2156 session = __ceph_lookup_mds_session(mdsc, mds);
2157 mutex_unlock(&mdsc->mutex); /* drop lock for duration */
2158
2159 if (session) {
2160 mutex_lock(&session->s_mutex);
2161 2251
2162 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 2252 mutex_lock(&session->s_mutex);
2163 session->s_seq = 0; 2253 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2254 session->s_seq = 0;
2164 2255
2165 ceph_con_open(&session->s_con, 2256 ceph_con_open(&session->s_con,
2166 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 2257 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2167 2258
2168 /* replay unsafe requests */ 2259 /* replay unsafe requests */
2169 replay_unsafe_requests(mdsc, session); 2260 replay_unsafe_requests(mdsc, session);
2170 } else {
2171 dout("no session for mds%d, will send short reconnect\n",
2172 mds);
2173 }
2174 2261
2175 down_read(&mdsc->snap_rwsem); 2262 down_read(&mdsc->snap_rwsem);
2176 2263
2177 if (!session)
2178 goto send;
2179 dout("session %p state %s\n", session, 2264 dout("session %p state %s\n", session,
2180 session_state_name(session->s_state)); 2265 session_state_name(session->s_state));
2181 2266
2267 /* drop old cap expires; we're about to reestablish that state */
2268 discard_cap_releases(mdsc, session);
2269
2182 /* traverse this session's caps */ 2270 /* traverse this session's caps */
2183 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); 2271 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2184 if (err) 2272 if (err)
2185 goto fail; 2273 goto fail;
2186 err = iterate_session_caps(session, encode_caps_cb, pagelist); 2274 err = iterate_session_caps(session, encode_caps_cb, pagelist);
2187 if (err < 0) 2275 if (err < 0)
2188 goto out; 2276 goto fail;
2189 2277
2190 /* 2278 /*
2191 * snaprealms. we provide mds with the ino, seq (version), and 2279 * snaprealms. we provide mds with the ino, seq (version), and
@@ -2207,34 +2295,30 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2207 goto fail; 2295 goto fail;
2208 } 2296 }
2209 2297
2210send:
2211 reply->pagelist = pagelist; 2298 reply->pagelist = pagelist;
2212 reply->hdr.data_len = cpu_to_le32(pagelist->length); 2299 reply->hdr.data_len = cpu_to_le32(pagelist->length);
2213 reply->nr_pages = calc_pages_for(0, pagelist->length); 2300 reply->nr_pages = calc_pages_for(0, pagelist->length);
2214 ceph_con_send(&session->s_con, reply); 2301 ceph_con_send(&session->s_con, reply);
2215 2302
2216 if (session) { 2303 mutex_unlock(&session->s_mutex);
2217 session->s_state = CEPH_MDS_SESSION_OPEN;
2218 __wake_requests(mdsc, &session->s_waiting);
2219 }
2220 2304
2221out:
2222 up_read(&mdsc->snap_rwsem);
2223 if (session) {
2224 mutex_unlock(&session->s_mutex);
2225 ceph_put_mds_session(session);
2226 }
2227 mutex_lock(&mdsc->mutex); 2305 mutex_lock(&mdsc->mutex);
2306 __wake_requests(mdsc, &session->s_waiting);
2307 mutex_unlock(&mdsc->mutex);
2308
2309 up_read(&mdsc->snap_rwsem);
2228 return; 2310 return;
2229 2311
2230fail: 2312fail:
2231 ceph_msg_put(reply); 2313 ceph_msg_put(reply);
2314 up_read(&mdsc->snap_rwsem);
2315 mutex_unlock(&session->s_mutex);
2232fail_nomsg: 2316fail_nomsg:
2233 ceph_pagelist_release(pagelist); 2317 ceph_pagelist_release(pagelist);
2234 kfree(pagelist); 2318 kfree(pagelist);
2235fail_nopagelist: 2319fail_nopagelist:
2236 pr_err("ENOMEM preparing reconnect for mds%d\n", mds); 2320 pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2237 goto out; 2321 return;
2238} 2322}
2239 2323
2240 2324
@@ -2286,7 +2370,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
2286 } 2370 }
2287 2371
2288 /* kick any requests waiting on the recovering mds */ 2372 /* kick any requests waiting on the recovering mds */
2289 kick_requests(mdsc, i, 1); 2373 kick_requests(mdsc, i);
2290 } else if (oldstate == newstate) { 2374 } else if (oldstate == newstate) {
2291 continue; /* nothing new with this mds */ 2375 continue; /* nothing new with this mds */
2292 } 2376 }
@@ -2295,22 +2379,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
2295 * send reconnect? 2379 * send reconnect?
2296 */ 2380 */
2297 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 2381 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2298 newstate >= CEPH_MDS_STATE_RECONNECT) 2382 newstate >= CEPH_MDS_STATE_RECONNECT) {
2299 send_mds_reconnect(mdsc, i); 2383 mutex_unlock(&mdsc->mutex);
2384 send_mds_reconnect(mdsc, s);
2385 mutex_lock(&mdsc->mutex);
2386 }
2300 2387
2301 /* 2388 /*
2302 * kick requests on any mds that has gone active. 2389 * kick request on any mds that has gone active.
2303 *
2304 * kick requests on cur or forwarder: we may have sent
2305 * the request to mds1, mds1 told us it forwarded it
2306 * to mds2, but then we learn mds1 failed and can't be
2307 * sure it successfully forwarded our request before
2308 * it died.
2309 */ 2390 */
2310 if (oldstate < CEPH_MDS_STATE_ACTIVE && 2391 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2311 newstate >= CEPH_MDS_STATE_ACTIVE) { 2392 newstate >= CEPH_MDS_STATE_ACTIVE) {
2312 pr_info("mds%d reconnect completed\n", s->s_mds); 2393 if (oldstate != CEPH_MDS_STATE_CREATING &&
2313 kick_requests(mdsc, i, 1); 2394 oldstate != CEPH_MDS_STATE_STARTING)
2395 pr_info("mds%d recovery completed\n", s->s_mds);
2396 kick_requests(mdsc, i);
2314 ceph_kick_flushing_caps(mdsc, s); 2397 ceph_kick_flushing_caps(mdsc, s);
2315 wake_up_session_caps(s, 1); 2398 wake_up_session_caps(s, 1);
2316 } 2399 }
@@ -2453,8 +2536,8 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2453 dnamelen = dentry->d_name.len; 2536 dnamelen = dentry->d_name.len;
2454 len += dnamelen; 2537 len += dnamelen;
2455 2538
2456 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL); 2539 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
2457 if (IS_ERR(msg)) 2540 if (!msg)
2458 return; 2541 return;
2459 lease = msg->front.iov_base; 2542 lease = msg->front.iov_base;
2460 lease->action = action; 2543 lease->action = action;
@@ -2599,7 +2682,9 @@ static void delayed_work(struct work_struct *work)
2599 else 2682 else
2600 ceph_con_keepalive(&s->s_con); 2683 ceph_con_keepalive(&s->s_con);
2601 add_cap_releases(mdsc, s, -1); 2684 add_cap_releases(mdsc, s, -1);
2602 send_cap_releases(mdsc, s); 2685 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2686 s->s_state == CEPH_MDS_SESSION_HUNG)
2687 send_cap_releases(mdsc, s);
2603 mutex_unlock(&s->s_mutex); 2688 mutex_unlock(&s->s_mutex);
2604 ceph_put_mds_session(s); 2689 ceph_put_mds_session(s);
2605 2690
@@ -2616,6 +2701,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2616 mdsc->client = client; 2701 mdsc->client = client;
2617 mutex_init(&mdsc->mutex); 2702 mutex_init(&mdsc->mutex);
2618 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 2703 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2704 if (mdsc->mdsmap == NULL)
2705 return -ENOMEM;
2706
2619 init_completion(&mdsc->safe_umount_waiters); 2707 init_completion(&mdsc->safe_umount_waiters);
2620 init_completion(&mdsc->session_close_waiters); 2708 init_completion(&mdsc->session_close_waiters);
2621 INIT_LIST_HEAD(&mdsc->waiting_for_map); 2709 INIT_LIST_HEAD(&mdsc->waiting_for_map);
@@ -2641,6 +2729,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2641 init_waitqueue_head(&mdsc->cap_flushing_wq); 2729 init_waitqueue_head(&mdsc->cap_flushing_wq);
2642 spin_lock_init(&mdsc->dentry_lru_lock); 2730 spin_lock_init(&mdsc->dentry_lru_lock);
2643 INIT_LIST_HEAD(&mdsc->dentry_lru); 2731 INIT_LIST_HEAD(&mdsc->dentry_lru);
2732
2644 return 0; 2733 return 0;
2645} 2734}
2646 2735
@@ -2736,6 +2825,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
2736{ 2825{
2737 u64 want_tid, want_flush; 2826 u64 want_tid, want_flush;
2738 2827
2828 if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
2829 return;
2830
2739 dout("sync\n"); 2831 dout("sync\n");
2740 mutex_lock(&mdsc->mutex); 2832 mutex_lock(&mdsc->mutex);
2741 want_tid = mdsc->last_tid; 2833 want_tid = mdsc->last_tid;
@@ -2918,9 +3010,10 @@ static void con_put(struct ceph_connection *con)
2918static void peer_reset(struct ceph_connection *con) 3010static void peer_reset(struct ceph_connection *con)
2919{ 3011{
2920 struct ceph_mds_session *s = con->private; 3012 struct ceph_mds_session *s = con->private;
3013 struct ceph_mds_client *mdsc = s->s_mdsc;
2921 3014
2922 pr_err("mds%d gave us the boot. IMPLEMENT RECONNECT.\n", 3015 pr_warning("mds%d closed our session\n", s->s_mds);
2923 s->s_mds); 3016 send_mds_reconnect(mdsc, s);
2924} 3017}
2925 3018
2926static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 3019static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
@@ -3027,7 +3120,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
3027 return ceph_monc_validate_auth(&mdsc->client->monc); 3120 return ceph_monc_validate_auth(&mdsc->client->monc);
3028} 3121}
3029 3122
3030const static struct ceph_connection_operations mds_con_ops = { 3123static const struct ceph_connection_operations mds_con_ops = {
3031 .get = con_get, 3124 .get = con_get,
3032 .put = con_put, 3125 .put = con_put,
3033 .dispatch = dispatch, 3126 .dispatch = dispatch,