author		Ilya Dryomov <idryomov@gmail.com>	2016-04-28 10:07:24 -0400
committer	Ilya Dryomov <idryomov@gmail.com>	2016-05-25 18:36:28 -0400
commit		fe5da05e979830b43b115d8a18ead521d507c783
tree		6afb8cacfd090d9d3b46a0a9976930957bb2e6cf /net/ceph
parent		85e084feb47349d62989efe1713a8723af95f4ea
libceph: redo callbacks and factor out MOSDOpReply decoding
If you specify ACK | ONDISK and set ->r_unsafe_callback, both
->r_callback and ->r_unsafe_callback(true) are called on ack.  This is
very confusing.  Redo this so that only one of them is called:

    ->r_unsafe_callback(true), on ack
    ->r_unsafe_callback(false), on commit

or

    ->r_callback, on ack|commit

Decode everything in decode_MOSDOpReply() to reduce clutter.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
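To make the new contract concrete, here is a minimal userspace model of the
dispatch rules this patch introduces. It is a sketch, not the kernel code:
struct req, the callbacks, and main() below are stand-ins; only the r_*
field names, the done_request() logic, and the CEPH_OSD_FLAG_ONDISK value
(taken from ceph's rados.h) mirror the patch, and error replies
(result < 0) are left out for brevity.

/* Userspace model of the new callback dispatch -- illustrative only. */
#include <stdbool.h>
#include <stdio.h>

#define CEPH_OSD_FLAG_ONDISK 0x4	/* value as in ceph's rados.h */

struct req {
	unsigned int r_flags;		/* what the submitter asked for */
	bool r_got_reply;
	void (*r_callback)(struct req *);
	void (*r_unsafe_callback)(struct req *, bool unsafe);
};

/* done if the reply is safe, or we never wanted a safe one */
static bool done_request(const struct req *req, unsigned int reply_flags)
{
	return (reply_flags & CEPH_OSD_FLAG_ONDISK) ||
	       !(req->r_flags & CEPH_OSD_FLAG_ONDISK);
}

/* handle one reply; exactly one callback fires per reply */
static void handle_reply(struct req *req, unsigned int reply_flags)
{
	bool already_acked = req->r_got_reply;

	req->r_got_reply = true;
	if (done_request(req, reply_flags)) {
		if (already_acked && req->r_unsafe_callback)
			req->r_unsafe_callback(req, false);	/* commit */
		else if (req->r_callback)
			req->r_callback(req);		/* ack|commit */
	} else if (req->r_unsafe_callback) {
		req->r_unsafe_callback(req, true);		/* ack */
	}
}

static void on_done(struct req *req)
{
	(void)req;
	puts("r_callback");
}

static void on_unsafe(struct req *req, bool unsafe)
{
	(void)req;
	printf("r_unsafe_callback(%s)\n", unsafe ? "true" : "false");
}

int main(void)
{
	struct req a = { .r_flags = CEPH_OSD_FLAG_ONDISK,
			 .r_unsafe_callback = on_unsafe };
	struct req b = { .r_flags = CEPH_OSD_FLAG_ONDISK,
			 .r_callback = on_done };

	handle_reply(&a, 0);			/* first reply is an ack */
	handle_reply(&a, CEPH_OSD_FLAG_ONDISK);	/* then the commit */
	handle_reply(&b, CEPH_OSD_FLAG_ONDISK);	/* commit straight away */
	return 0;
}

The three calls print r_unsafe_callback(true), r_unsafe_callback(false),
and r_callback, matching the table added in the new handle_reply() comment
below.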
Diffstat (limited to 'net/ceph')
-rw-r--r--	net/ceph/osd_client.c	362
1 file changed, 209 insertions(+), 153 deletions(-)
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2a30c0bb3045..baf2844b00d6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1693,6 +1693,14 @@ static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
 	return 0;
 }
 
+static void __complete_request(struct ceph_osd_request *req)
+{
+	if (req->r_callback)
+		req->r_callback(req);
+	else
+		complete_all(&req->r_completion);
+}
+
 /*
  * Timeout callback, called every N seconds when 1 or more osd
  * requests has been active for more than N seconds. When this
@@ -1875,107 +1883,76 @@ e_inval:
 	goto out;
 }
 
-static void complete_request(struct ceph_osd_request *req)
-{
-	complete_all(&req->r_safe_completion);  /* fsync waiter */
-}
+struct MOSDOpReply {
+	struct ceph_pg pgid;
+	u64 flags;
+	int result;
+	u32 epoch;
+	int num_ops;
+	u32 outdata_len[CEPH_OSD_MAX_OPS];
+	s32 rval[CEPH_OSD_MAX_OPS];
+	int retry_attempt;
+	struct ceph_eversion replay_version;
+	u64 user_version;
+	struct ceph_request_redirect redirect;
+};
 
-/*
- * handle osd op reply.  either call the callback if it is specified,
- * or do the completion to wake up the waiting thread.
- */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
 {
-	void *p, *end;
-	struct ceph_osd_request *req;
-	struct ceph_request_redirect redir;
-	u64 tid;
-	int object_len;
-	unsigned int numops;
-	int payload_len, flags;
-	s32 result;
-	s32 retry_attempt;
-	struct ceph_pg pg;
-	int err;
-	u32 reassert_epoch;
-	u64 reassert_version;
-	u32 osdmap_epoch;
-	int already_completed;
-	u32 bytes;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front.iov_len;
+	u16 version = le16_to_cpu(msg->hdr.version);
+	struct ceph_eversion bad_replay_version;
 	u8 decode_redir;
-	unsigned int i;
-
-	tid = le64_to_cpu(msg->hdr.tid);
-	dout("handle_reply %p tid %llu\n", msg, tid);
-
-	p = msg->front.iov_base;
-	end = p + msg->front.iov_len;
+	u32 len;
+	int ret;
+	int i;
 
-	ceph_decode_need(&p, end, 4, bad);
-	object_len = ceph_decode_32(&p);
-	ceph_decode_need(&p, end, object_len, bad);
-	p += object_len;
+	ceph_decode_32_safe(&p, end, len, e_inval);
+	ceph_decode_need(&p, end, len, e_inval);
+	p += len;	/* skip oid */
 
-	err = ceph_decode_pgid(&p, end, &pg);
-	if (err)
-		goto bad;
+	ret = ceph_decode_pgid(&p, end, &m->pgid);
+	if (ret)
+		return ret;
 
-	ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
-	flags = ceph_decode_64(&p);
-	result = ceph_decode_32(&p);
-	reassert_epoch = ceph_decode_32(&p);
-	reassert_version = ceph_decode_64(&p);
-	osdmap_epoch = ceph_decode_32(&p);
+	ceph_decode_64_safe(&p, end, m->flags, e_inval);
+	ceph_decode_32_safe(&p, end, m->result, e_inval);
+	ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
+	memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
+	p += sizeof(bad_replay_version);
+	ceph_decode_32_safe(&p, end, m->epoch, e_inval);
 
-	/* lookup */
-	down_read(&osdc->map_sem);
-	mutex_lock(&osdc->request_mutex);
-	req = lookup_request(&osdc->requests, tid);
-	if (req == NULL) {
-		dout("handle_reply tid %llu dne\n", tid);
-		goto bad_mutex;
-	}
-	ceph_osdc_get_request(req);
+	ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
+	if (m->num_ops > ARRAY_SIZE(m->outdata_len))
+		goto e_inval;
 
-	dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
-	     req, result);
-
-	ceph_decode_need(&p, end, 4, bad_put);
-	numops = ceph_decode_32(&p);
-	if (numops > CEPH_OSD_MAX_OPS)
-		goto bad_put;
-	if (numops != req->r_num_ops)
-		goto bad_put;
-	payload_len = 0;
-	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
-	for (i = 0; i < numops; i++) {
+	ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
+			 e_inval);
+	for (i = 0; i < m->num_ops; i++) {
 		struct ceph_osd_op *op = p;
-		int len;
 
-		len = le32_to_cpu(op->payload_len);
-		req->r_ops[i].outdata_len = len;
-		dout(" op %d has %d bytes\n", i, len);
-		payload_len += len;
+		m->outdata_len[i] = le32_to_cpu(op->payload_len);
 		p += sizeof(*op);
 	}
-	bytes = le32_to_cpu(msg->hdr.data_len);
-	if (payload_len != bytes) {
-		pr_warn("sum of op payload lens %d != data_len %d\n",
-			payload_len, bytes);
-		goto bad_put;
-	}
 
-	ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
-	retry_attempt = ceph_decode_32(&p);
-	for (i = 0; i < numops; i++)
-		req->r_ops[i].rval = ceph_decode_32(&p);
+	ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
+	for (i = 0; i < m->num_ops; i++)
+		ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
 
-	if (le16_to_cpu(msg->hdr.version) >= 6) {
-		p += 8 + 4; /* skip replay_version */
-		p += 8; /* skip user_version */
+	if (version >= 5) {
+		ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
+		memcpy(&m->replay_version, p, sizeof(m->replay_version));
+		p += sizeof(m->replay_version);
+		ceph_decode_64_safe(&p, end, m->user_version, e_inval);
+	} else {
+		m->replay_version = bad_replay_version; /* struct */
+		m->user_version = le64_to_cpu(m->replay_version.version);
+	}
 
-		if (le16_to_cpu(msg->hdr.version) >= 7)
-			ceph_decode_8_safe(&p, end, decode_redir, bad_put);
+	if (version >= 6) {
+		if (version >= 7)
+			ceph_decode_8_safe(&p, end, decode_redir, e_inval);
 		else
 			decode_redir = 1;
 	} else {
@@ -1983,19 +1960,96 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 	}
 
 	if (decode_redir) {
-		err = ceph_redirect_decode(&p, end, &redir);
-		if (err)
-			goto bad_put;
+		ret = ceph_redirect_decode(&p, end, &m->redirect);
+		if (ret)
+			return ret;
 	} else {
-		redir.oloc.pool = -1;
+		ceph_oloc_init(&m->redirect.oloc);
 	}
 
-	if (!ceph_oloc_empty(&redir.oloc)) {
-		dout("redirect pool %lld\n", redir.oloc.pool);
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+/*
+ * We are done with @req if
+ * - @m is a safe reply, or
+ * - @m is an unsafe reply and we didn't want a safe one
+ */
+static bool done_request(const struct ceph_osd_request *req,
+			 const struct MOSDOpReply *m)
+{
+	return (m->result < 0 ||
+		(m->flags & CEPH_OSD_FLAG_ONDISK) ||
+		!(req->r_flags & CEPH_OSD_FLAG_ONDISK));
+}
 
+/*
+ * handle osd op reply. either call the callback if it is specified,
+ * or do the completion to wake up the waiting thread.
+ *
+ * ->r_unsafe_callback is set?	yes			no
+ *
+ * first reply is OK (needed	r_cb/r_completion,	r_cb/r_completion,
+ * any or needed/got safe)	r_safe_completion	r_safe_completion
+ *
+ * first reply is unsafe	r_unsafe_cb(true)	(nothing)
+ *
+ * when we get the safe reply	r_unsafe_cb(false),	r_cb/r_completion,
+ *				r_safe_completion	r_safe_completion
+ */
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+	struct ceph_osd_request *req;
+	struct MOSDOpReply m;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+	u32 data_len = 0;
+	bool already_acked;
+	int ret;
+	int i;
+
+	dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
+	down_read(&osdc->map_sem);
+	mutex_lock(&osdc->request_mutex);
+	req = lookup_request(&osdc->requests, tid);
+	if (!req) {
+		dout("%s no tid %llu\n", __func__, tid);
+		goto out_unlock;
+	}
+	ceph_osdc_get_request(req);
+
+	ret = decode_MOSDOpReply(msg, &m);
+	if (ret) {
+		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
+		       req->r_tid, ret);
+		ceph_msg_dump(msg);
+		goto fail_request;
+	}
+	dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
+	     __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
+	     m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
+	     le64_to_cpu(m.replay_version.version), m.user_version);
+
+	if (m.retry_attempt >= 0) {
+		if (m.retry_attempt != req->r_attempts - 1) {
+			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
+			     req, req->r_tid, m.retry_attempt,
+			     req->r_attempts - 1);
+			goto out_put;
+		}
+	} else {
+		WARN_ON(1); /* MOSDOpReply v4 is assumed */
+	}
+
+	if (!ceph_oloc_empty(&m.redirect.oloc)) {
+		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
+		     m.redirect.oloc.pool);
 		__unregister_request(osdc, req);
 
-		ceph_oloc_copy(&req->r_t.target_oloc, &redir.oloc);
+		ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
 
 		/*
 		 * Start redirect requests with nofail=true. If
@@ -2005,85 +2059,85 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 		 * successfully. In the future we might want to follow
 		 * original request's nofail setting here.
 		 */
-		err = __ceph_osdc_start_request(osdc, req, true);
-		BUG_ON(err);
+		ret = __ceph_osdc_start_request(osdc, req, true);
+		BUG_ON(ret);
 
-		goto out_unlock;
+		goto out_put;
 	}
 
-	already_completed = req->r_got_reply;
-	if (!req->r_got_reply) {
-		req->r_result = result;
-		dout("handle_reply result %d bytes %d\n", req->r_result,
-		     bytes);
-		if (req->r_result == 0)
-			req->r_result = bytes;
-
-		/* in case this is a write and we need to replay, */
-		req->r_replay_version.epoch = cpu_to_le32(reassert_epoch);
-		req->r_replay_version.version = cpu_to_le64(reassert_version);
-
-		req->r_got_reply = 1;
-	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
-		dout("handle_reply tid %llu dup ack\n", tid);
-		goto out_unlock;
+	if (m.num_ops != req->r_num_ops) {
+		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
+		       req->r_num_ops, req->r_tid);
+		goto fail_request;
 	}
-
-	dout("handle_reply tid %llu flags %d\n", tid, flags);
-
-	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
-		__register_linger_request(osdc, req);
-
-	/* either this is a read, or we got the safe response */
-	if (result < 0 ||
-	    (flags & CEPH_OSD_FLAG_ONDISK) ||
-	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
+	for (i = 0; i < req->r_num_ops; i++) {
+		dout(" req %p tid %llu op %d rval %d len %u\n", req,
+		     req->r_tid, i, m.rval[i], m.outdata_len[i]);
+		req->r_ops[i].rval = m.rval[i];
+		req->r_ops[i].outdata_len = m.outdata_len[i];
+		data_len += m.outdata_len[i];
+	}
+	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
+		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
+		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
+		goto fail_request;
+	}
+	dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__,
+	     req, req->r_tid, req->r_got_reply, m.result, data_len);
+
+	already_acked = req->r_got_reply;
+	if (!already_acked) {
+		req->r_result = m.result ?: data_len;
+		req->r_replay_version = m.replay_version; /* struct */
+		req->r_got_reply = true;
+	} else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
+		dout("req %p tid %llu dup ack\n", req, req->r_tid);
+		goto out_put;
+	}
+
+	if (done_request(req, &m)) {
 		__unregister_request(osdc, req);
+		if (req->r_linger) {
+			WARN_ON(req->r_unsafe_callback);
+			__register_linger_request(osdc, req);
+		}
+	}
 
 	mutex_unlock(&osdc->request_mutex);
 	up_read(&osdc->map_sem);
 
-	if (!already_completed) {
-		if (req->r_unsafe_callback &&
-		    result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
-			req->r_unsafe_callback(req, true);
-		if (req->r_callback)
-			req->r_callback(req);
-		else
-			complete_all(&req->r_completion);
-	}
-
-	if (flags & CEPH_OSD_FLAG_ONDISK) {
-		if (req->r_unsafe_callback && already_completed)
+	if (done_request(req, &m)) {
+		if (already_acked && req->r_unsafe_callback) {
+			dout("req %p tid %llu safe-cb\n", req, req->r_tid);
 			req->r_unsafe_callback(req, false);
-		complete_request(req);
+		} else {
+			dout("req %p tid %llu cb\n", req, req->r_tid);
+			__complete_request(req);
+		}
+	} else {
+		if (req->r_unsafe_callback) {
+			dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
+			req->r_unsafe_callback(req, true);
+		} else {
+			WARN_ON(1);
+		}
 	}
+	if (m.flags & CEPH_OSD_FLAG_ONDISK)
+		complete_all(&req->r_safe_completion);
 
-out:
-	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
 	ceph_osdc_put_request(req);
 	return;
-out_unlock:
-	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
-	goto out;
 
-bad_put:
+fail_request:
 	req->r_result = -EIO;
 	__unregister_request(osdc, req);
-	if (req->r_callback)
-		req->r_callback(req);
-	else
-		complete_all(&req->r_completion);
-	complete_request(req);
+	__complete_request(req);
+	complete_all(&req->r_safe_completion);
+out_put:
 	ceph_osdc_put_request(req);
-bad_mutex:
+out_unlock:
 	mutex_unlock(&osdc->request_mutex);
 	up_read(&osdc->map_sem);
-bad:
-	pr_err("corrupt osd_op_reply got %d %d\n",
-	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
-	ceph_msg_dump(msg);
 }
 
 static void reset_changed_osds(struct ceph_osd_client *osdc)
@@ -2591,7 +2645,9 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
 	if (rc < 0) {
 		dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
 		ceph_osdc_cancel_request(req);
-		complete_request(req);
+
+		/* kludge - need to wake ceph_osdc_sync() */
+		complete_all(&req->r_safe_completion);
 		return rc;
 	}
 