aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2010-05-13 14:19:06 -0400
committerSage Weil <sage@newdream.net>2010-05-17 13:25:44 -0400
commite1518c7c0a67a75727f7285780dbef0ca7121cc9 (patch)
tree0f0895ab0291f7ddf2ccbcfbd77018eebce171d5 /fs/ceph/mds_client.c
parente40152ee1e1c7a63f4777791863215e3faa37a86 (diff)
ceph: clean up mds reply, error handling
We would occasionally BUG out in the reply handler because r_reply was nonzero, due to a race with ceph_mdsc_do_request temporarily setting r_reply to an ERR_PTR value. This is unnecessary, messy, and also wrong in the EIO case. Clean up by consistently using r_err for errors and r_reply for messages. Also fix the abort logic to trigger consistently for all errors that return to the caller early (e.g., EIO from timeout case). If an abort races with a reply, use the result from the reply. Also fix locking for r_err, r_reply update in the reply handler. Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c111
1 files changed, 54 insertions, 57 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 24561a557e01..b3b19f05b821 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1517,7 +1517,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
1517 } 1517 }
1518 msg = create_request_message(mdsc, req, mds); 1518 msg = create_request_message(mdsc, req, mds);
1519 if (IS_ERR(msg)) { 1519 if (IS_ERR(msg)) {
1520 req->r_reply = ERR_PTR(PTR_ERR(msg)); 1520 req->r_err = PTR_ERR(msg);
1521 complete_request(mdsc, req); 1521 complete_request(mdsc, req);
1522 return -PTR_ERR(msg); 1522 return -PTR_ERR(msg);
1523 } 1523 }
@@ -1552,7 +1552,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
1552 int mds = -1; 1552 int mds = -1;
1553 int err = -EAGAIN; 1553 int err = -EAGAIN;
1554 1554
1555 if (req->r_reply) 1555 if (req->r_err || req->r_got_result)
1556 goto out; 1556 goto out;
1557 1557
1558 if (req->r_timeout && 1558 if (req->r_timeout &&
@@ -1609,7 +1609,7 @@ out:
1609 return err; 1609 return err;
1610 1610
1611finish: 1611finish:
1612 req->r_reply = ERR_PTR(err); 1612 req->r_err = err;
1613 complete_request(mdsc, req); 1613 complete_request(mdsc, req);
1614 goto out; 1614 goto out;
1615} 1615}
@@ -1689,59 +1689,53 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1689 __register_request(mdsc, req, dir); 1689 __register_request(mdsc, req, dir);
1690 __do_request(mdsc, req); 1690 __do_request(mdsc, req);
1691 1691
1692 /* wait */ 1692 if (req->r_err) {
1693 if (!req->r_reply) { 1693 err = req->r_err;
1694 mutex_unlock(&mdsc->mutex); 1694 __unregister_request(mdsc, req);
1695 if (req->r_timeout) { 1695 dout("do_request early error %d\n", err);
1696 err = (long)wait_for_completion_interruptible_timeout( 1696 goto out;
1697 &req->r_completion, req->r_timeout);
1698 if (err == 0)
1699 req->r_reply = ERR_PTR(-EIO);
1700 else if (err < 0)
1701 req->r_reply = ERR_PTR(err);
1702 } else {
1703 err = wait_for_completion_interruptible(
1704 &req->r_completion);
1705 if (err)
1706 req->r_reply = ERR_PTR(err);
1707 }
1708 mutex_lock(&mdsc->mutex);
1709 } 1697 }
1710 1698
1711 if (IS_ERR(req->r_reply)) { 1699 /* wait */
1712 err = PTR_ERR(req->r_reply); 1700 mutex_unlock(&mdsc->mutex);
1713 req->r_reply = NULL; 1701 dout("do_request waiting\n");
1714 1702 if (req->r_timeout) {
1715 if (err == -ERESTARTSYS) { 1703 err = (long)wait_for_completion_interruptible_timeout(
1716 /* aborted */ 1704 &req->r_completion, req->r_timeout);
1717 req->r_aborted = true; 1705 if (err == 0)
1706 err = -EIO;
1707 } else {
1708 err = wait_for_completion_interruptible(&req->r_completion);
1709 }
1710 dout("do_request waited, got %d\n", err);
1711 mutex_lock(&mdsc->mutex);
1718 1712
1719 if (req->r_locked_dir && 1713 /* only abort if we didn't race with a real reply */
1720 (req->r_op & CEPH_MDS_OP_WRITE)) { 1714 if (req->r_got_result) {
1721 struct ceph_inode_info *ci = 1715 err = le32_to_cpu(req->r_reply_info.head->result);
1722 ceph_inode(req->r_locked_dir); 1716 } else if (err < 0) {
1717 dout("aborted request %lld with %d\n", req->r_tid, err);
1718 req->r_err = err;
1719 req->r_aborted = true;
1723 1720
1724 dout("aborted, clearing I_COMPLETE on %p\n", 1721 if (req->r_locked_dir &&
1725 req->r_locked_dir); 1722 (req->r_op & CEPH_MDS_OP_WRITE)) {
1726 spin_lock(&req->r_locked_dir->i_lock); 1723 struct ceph_inode_info *ci =
1727 ci->i_ceph_flags &= ~CEPH_I_COMPLETE; 1724 ceph_inode(req->r_locked_dir);
1728 ci->i_release_count++; 1725
1729 spin_unlock(&req->r_locked_dir->i_lock); 1726 dout("aborted, clearing I_COMPLETE on %p\n",
1730 } 1727 req->r_locked_dir);
1731 } else { 1728 spin_lock(&req->r_locked_dir->i_lock);
1732 /* clean up this request */ 1729 ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1733 __unregister_request(mdsc, req); 1730 ci->i_release_count++;
1734 if (!list_empty(&req->r_unsafe_item)) 1731 spin_unlock(&req->r_locked_dir->i_lock);
1735 list_del_init(&req->r_unsafe_item);
1736 complete(&req->r_safe_completion);
1737 } 1732 }
1738 } else if (req->r_err) {
1739 err = req->r_err;
1740 } else { 1733 } else {
1741 err = le32_to_cpu(req->r_reply_info.head->result); 1734 err = req->r_err;
1742 } 1735 }
1743 mutex_unlock(&mdsc->mutex);
1744 1736
1737out:
1738 mutex_unlock(&mdsc->mutex);
1745 dout("do_request %p done, result %d\n", req, err); 1739 dout("do_request %p done, result %d\n", req, err);
1746 return err; 1740 return err;
1747} 1741}
@@ -1838,11 +1832,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1838 mutex_unlock(&mdsc->mutex); 1832 mutex_unlock(&mdsc->mutex);
1839 goto out; 1833 goto out;
1840 } 1834 }
1841 } 1835 } else {
1842
1843 BUG_ON(req->r_reply);
1844
1845 if (!head->safe) {
1846 req->r_got_unsafe = true; 1836 req->r_got_unsafe = true;
1847 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 1837 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
1848 } 1838 }
@@ -1880,12 +1870,19 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1880 1870
1881 up_read(&mdsc->snap_rwsem); 1871 up_read(&mdsc->snap_rwsem);
1882out_err: 1872out_err:
1883 if (err) { 1873 mutex_lock(&mdsc->mutex);
1884 req->r_err = err; 1874 if (!req->r_aborted) {
1875 if (err) {
1876 req->r_err = err;
1877 } else {
1878 req->r_reply = msg;
1879 ceph_msg_get(msg);
1880 req->r_got_result = true;
1881 }
1885 } else { 1882 } else {
1886 req->r_reply = msg; 1883 dout("reply arrived after request %lld was aborted\n", tid);
1887 ceph_msg_get(msg);
1888 } 1884 }
1885 mutex_unlock(&mdsc->mutex);
1889 1886
1890 add_cap_releases(mdsc, req->r_session, -1); 1887 add_cap_releases(mdsc, req->r_session, -1);
1891 mutex_unlock(&session->s_mutex); 1888 mutex_unlock(&session->s_mutex);