diff options
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 362 |
1 files changed, 209 insertions, 153 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 2a30c0bb3045..baf2844b00d6 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c | |||
@@ -1693,6 +1693,14 @@ static int __ceph_osdc_start_request(struct ceph_osd_client *osdc, | |||
1693 | return 0; | 1693 | return 0; |
1694 | } | 1694 | } |
1695 | 1695 | ||
1696 | static void __complete_request(struct ceph_osd_request *req) | ||
1697 | { | ||
1698 | if (req->r_callback) | ||
1699 | req->r_callback(req); | ||
1700 | else | ||
1701 | complete_all(&req->r_completion); | ||
1702 | } | ||
1703 | |||
1696 | /* | 1704 | /* |
1697 | * Timeout callback, called every N seconds when 1 or more osd | 1705 | * Timeout callback, called every N seconds when 1 or more osd |
1698 | * requests has been active for more than N seconds. When this | 1706 | * requests has been active for more than N seconds. When this |
@@ -1875,107 +1883,76 @@ e_inval: | |||
1875 | goto out; | 1883 | goto out; |
1876 | } | 1884 | } |
1877 | 1885 | ||
1878 | static void complete_request(struct ceph_osd_request *req) | 1886 | struct MOSDOpReply { |
1879 | { | 1887 | struct ceph_pg pgid; |
1880 | complete_all(&req->r_safe_completion); /* fsync waiter */ | 1888 | u64 flags; |
1881 | } | 1889 | int result; |
1890 | u32 epoch; | ||
1891 | int num_ops; | ||
1892 | u32 outdata_len[CEPH_OSD_MAX_OPS]; | ||
1893 | s32 rval[CEPH_OSD_MAX_OPS]; | ||
1894 | int retry_attempt; | ||
1895 | struct ceph_eversion replay_version; | ||
1896 | u64 user_version; | ||
1897 | struct ceph_request_redirect redirect; | ||
1898 | }; | ||
1882 | 1899 | ||
1883 | /* | 1900 | static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m) |
1884 | * handle osd op reply. either call the callback if it is specified, | ||
1885 | * or do the completion to wake up the waiting thread. | ||
1886 | */ | ||
1887 | static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) | ||
1888 | { | 1901 | { |
1889 | void *p, *end; | 1902 | void *p = msg->front.iov_base; |
1890 | struct ceph_osd_request *req; | 1903 | void *const end = p + msg->front.iov_len; |
1891 | struct ceph_request_redirect redir; | 1904 | u16 version = le16_to_cpu(msg->hdr.version); |
1892 | u64 tid; | 1905 | struct ceph_eversion bad_replay_version; |
1893 | int object_len; | ||
1894 | unsigned int numops; | ||
1895 | int payload_len, flags; | ||
1896 | s32 result; | ||
1897 | s32 retry_attempt; | ||
1898 | struct ceph_pg pg; | ||
1899 | int err; | ||
1900 | u32 reassert_epoch; | ||
1901 | u64 reassert_version; | ||
1902 | u32 osdmap_epoch; | ||
1903 | int already_completed; | ||
1904 | u32 bytes; | ||
1905 | u8 decode_redir; | 1906 | u8 decode_redir; |
1906 | unsigned int i; | 1907 | u32 len; |
1907 | 1908 | int ret; | |
1908 | tid = le64_to_cpu(msg->hdr.tid); | 1909 | int i; |
1909 | dout("handle_reply %p tid %llu\n", msg, tid); | ||
1910 | |||
1911 | p = msg->front.iov_base; | ||
1912 | end = p + msg->front.iov_len; | ||
1913 | 1910 | ||
1914 | ceph_decode_need(&p, end, 4, bad); | 1911 | ceph_decode_32_safe(&p, end, len, e_inval); |
1915 | object_len = ceph_decode_32(&p); | 1912 | ceph_decode_need(&p, end, len, e_inval); |
1916 | ceph_decode_need(&p, end, object_len, bad); | 1913 | p += len; /* skip oid */ |
1917 | p += object_len; | ||
1918 | 1914 | ||
1919 | err = ceph_decode_pgid(&p, end, &pg); | 1915 | ret = ceph_decode_pgid(&p, end, &m->pgid); |
1920 | if (err) | 1916 | if (ret) |
1921 | goto bad; | 1917 | return ret; |
1922 | 1918 | ||
1923 | ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); | 1919 | ceph_decode_64_safe(&p, end, m->flags, e_inval); |
1924 | flags = ceph_decode_64(&p); | 1920 | ceph_decode_32_safe(&p, end, m->result, e_inval); |
1925 | result = ceph_decode_32(&p); | 1921 | ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval); |
1926 | reassert_epoch = ceph_decode_32(&p); | 1922 | memcpy(&bad_replay_version, p, sizeof(bad_replay_version)); |
1927 | reassert_version = ceph_decode_64(&p); | 1923 | p += sizeof(bad_replay_version); |
1928 | osdmap_epoch = ceph_decode_32(&p); | 1924 | ceph_decode_32_safe(&p, end, m->epoch, e_inval); |
1929 | 1925 | ||
1930 | /* lookup */ | 1926 | ceph_decode_32_safe(&p, end, m->num_ops, e_inval); |
1931 | down_read(&osdc->map_sem); | 1927 | if (m->num_ops > ARRAY_SIZE(m->outdata_len)) |
1932 | mutex_lock(&osdc->request_mutex); | 1928 | goto e_inval; |
1933 | req = lookup_request(&osdc->requests, tid); | ||
1934 | if (req == NULL) { | ||
1935 | dout("handle_reply tid %llu dne\n", tid); | ||
1936 | goto bad_mutex; | ||
1937 | } | ||
1938 | ceph_osdc_get_request(req); | ||
1939 | 1929 | ||
1940 | dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, | 1930 | ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op), |
1941 | req, result); | 1931 | e_inval); |
1942 | 1932 | for (i = 0; i < m->num_ops; i++) { | |
1943 | ceph_decode_need(&p, end, 4, bad_put); | ||
1944 | numops = ceph_decode_32(&p); | ||
1945 | if (numops > CEPH_OSD_MAX_OPS) | ||
1946 | goto bad_put; | ||
1947 | if (numops != req->r_num_ops) | ||
1948 | goto bad_put; | ||
1949 | payload_len = 0; | ||
1950 | ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put); | ||
1951 | for (i = 0; i < numops; i++) { | ||
1952 | struct ceph_osd_op *op = p; | 1933 | struct ceph_osd_op *op = p; |
1953 | int len; | ||
1954 | 1934 | ||
1955 | len = le32_to_cpu(op->payload_len); | 1935 | m->outdata_len[i] = le32_to_cpu(op->payload_len); |
1956 | req->r_ops[i].outdata_len = len; | ||
1957 | dout(" op %d has %d bytes\n", i, len); | ||
1958 | payload_len += len; | ||
1959 | p += sizeof(*op); | 1936 | p += sizeof(*op); |
1960 | } | 1937 | } |
1961 | bytes = le32_to_cpu(msg->hdr.data_len); | ||
1962 | if (payload_len != bytes) { | ||
1963 | pr_warn("sum of op payload lens %d != data_len %d\n", | ||
1964 | payload_len, bytes); | ||
1965 | goto bad_put; | ||
1966 | } | ||
1967 | 1938 | ||
1968 | ceph_decode_need(&p, end, 4 + numops * 4, bad_put); | 1939 | ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval); |
1969 | retry_attempt = ceph_decode_32(&p); | 1940 | for (i = 0; i < m->num_ops; i++) |
1970 | for (i = 0; i < numops; i++) | 1941 | ceph_decode_32_safe(&p, end, m->rval[i], e_inval); |
1971 | req->r_ops[i].rval = ceph_decode_32(&p); | ||
1972 | 1942 | ||
1973 | if (le16_to_cpu(msg->hdr.version) >= 6) { | 1943 | if (version >= 5) { |
1974 | p += 8 + 4; /* skip replay_version */ | 1944 | ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval); |
1975 | p += 8; /* skip user_version */ | 1945 | memcpy(&m->replay_version, p, sizeof(m->replay_version)); |
1946 | p += sizeof(m->replay_version); | ||
1947 | ceph_decode_64_safe(&p, end, m->user_version, e_inval); | ||
1948 | } else { | ||
1949 | m->replay_version = bad_replay_version; /* struct */ | ||
1950 | m->user_version = le64_to_cpu(m->replay_version.version); | ||
1951 | } | ||
1976 | 1952 | ||
1977 | if (le16_to_cpu(msg->hdr.version) >= 7) | 1953 | if (version >= 6) { |
1978 | ceph_decode_8_safe(&p, end, decode_redir, bad_put); | 1954 | if (version >= 7) |
1955 | ceph_decode_8_safe(&p, end, decode_redir, e_inval); | ||
1979 | else | 1956 | else |
1980 | decode_redir = 1; | 1957 | decode_redir = 1; |
1981 | } else { | 1958 | } else { |
@@ -1983,19 +1960,96 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) | |||
1983 | } | 1960 | } |
1984 | 1961 | ||
1985 | if (decode_redir) { | 1962 | if (decode_redir) { |
1986 | err = ceph_redirect_decode(&p, end, &redir); | 1963 | ret = ceph_redirect_decode(&p, end, &m->redirect); |
1987 | if (err) | 1964 | if (ret) |
1988 | goto bad_put; | 1965 | return ret; |
1989 | } else { | 1966 | } else { |
1990 | redir.oloc.pool = -1; | 1967 | ceph_oloc_init(&m->redirect.oloc); |
1991 | } | 1968 | } |
1992 | 1969 | ||
1993 | if (!ceph_oloc_empty(&redir.oloc)) { | 1970 | return 0; |
1994 | dout("redirect pool %lld\n", redir.oloc.pool); | 1971 | |
1972 | e_inval: | ||
1973 | return -EINVAL; | ||
1974 | } | ||
1975 | |||
1976 | /* | ||
1977 | * We are done with @req if | ||
1978 | * - @m is a safe reply, or | ||
1979 | * - @m is an unsafe reply and we didn't want a safe one | ||
1980 | */ | ||
1981 | static bool done_request(const struct ceph_osd_request *req, | ||
1982 | const struct MOSDOpReply *m) | ||
1983 | { | ||
1984 | return (m->result < 0 || | ||
1985 | (m->flags & CEPH_OSD_FLAG_ONDISK) || | ||
1986 | !(req->r_flags & CEPH_OSD_FLAG_ONDISK)); | ||
1987 | } | ||
1995 | 1988 | ||
1989 | /* | ||
1990 | * handle osd op reply. either call the callback if it is specified, | ||
1991 | * or do the completion to wake up the waiting thread. | ||
1992 | * | ||
1993 | * ->r_unsafe_callback is set? yes no | ||
1994 | * | ||
1995 | * first reply is OK (needed r_cb/r_completion, r_cb/r_completion, | ||
1996 | * any or needed/got safe) r_safe_completion r_safe_completion | ||
1997 | * | ||
1998 | * first reply is unsafe r_unsafe_cb(true) (nothing) | ||
1999 | * | ||
2000 | * when we get the safe reply r_unsafe_cb(false), r_cb/r_completion, | ||
2001 | * r_safe_completion r_safe_completion | ||
2002 | */ | ||
2003 | static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) | ||
2004 | { | ||
2005 | struct ceph_osd_request *req; | ||
2006 | struct MOSDOpReply m; | ||
2007 | u64 tid = le64_to_cpu(msg->hdr.tid); | ||
2008 | u32 data_len = 0; | ||
2009 | bool already_acked; | ||
2010 | int ret; | ||
2011 | int i; | ||
2012 | |||
2013 | dout("%s msg %p tid %llu\n", __func__, msg, tid); | ||
2014 | |||
2015 | down_read(&osdc->map_sem); | ||
2016 | mutex_lock(&osdc->request_mutex); | ||
2017 | req = lookup_request(&osdc->requests, tid); | ||
2018 | if (!req) { | ||
2019 | dout("%s no tid %llu\n", __func__, tid); | ||
2020 | goto out_unlock; | ||
2021 | } | ||
2022 | ceph_osdc_get_request(req); | ||
2023 | |||
2024 | ret = decode_MOSDOpReply(msg, &m); | ||
2025 | if (ret) { | ||
2026 | pr_err("failed to decode MOSDOpReply for tid %llu: %d\n", | ||
2027 | req->r_tid, ret); | ||
2028 | ceph_msg_dump(msg); | ||
2029 | goto fail_request; | ||
2030 | } | ||
2031 | dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n", | ||
2032 | __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed, | ||
2033 | m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch), | ||
2034 | le64_to_cpu(m.replay_version.version), m.user_version); | ||
2035 | |||
2036 | if (m.retry_attempt >= 0) { | ||
2037 | if (m.retry_attempt != req->r_attempts - 1) { | ||
2038 | dout("req %p tid %llu retry_attempt %d != %d, ignoring\n", | ||
2039 | req, req->r_tid, m.retry_attempt, | ||
2040 | req->r_attempts - 1); | ||
2041 | goto out_put; | ||
2042 | } | ||
2043 | } else { | ||
2044 | WARN_ON(1); /* MOSDOpReply v4 is assumed */ | ||
2045 | } | ||
2046 | |||
2047 | if (!ceph_oloc_empty(&m.redirect.oloc)) { | ||
2048 | dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid, | ||
2049 | m.redirect.oloc.pool); | ||
1996 | __unregister_request(osdc, req); | 2050 | __unregister_request(osdc, req); |
1997 | 2051 | ||
1998 | ceph_oloc_copy(&req->r_t.target_oloc, &redir.oloc); | 2052 | ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc); |
1999 | 2053 | ||
2000 | /* | 2054 | /* |
2001 | * Start redirect requests with nofail=true. If | 2055 | * Start redirect requests with nofail=true. If |
@@ -2005,85 +2059,85 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) | |||
2005 | * successfully. In the future we might want to follow | 2059 | * successfully. In the future we might want to follow |
2006 | * original request's nofail setting here. | 2060 | * original request's nofail setting here. |
2007 | */ | 2061 | */ |
2008 | err = __ceph_osdc_start_request(osdc, req, true); | 2062 | ret = __ceph_osdc_start_request(osdc, req, true); |
2009 | BUG_ON(err); | 2063 | BUG_ON(ret); |
2010 | 2064 | ||
2011 | goto out_unlock; | 2065 | goto out_put; |
2012 | } | 2066 | } |
2013 | 2067 | ||
2014 | already_completed = req->r_got_reply; | 2068 | if (m.num_ops != req->r_num_ops) { |
2015 | if (!req->r_got_reply) { | 2069 | pr_err("num_ops %d != %d for tid %llu\n", m.num_ops, |
2016 | req->r_result = result; | 2070 | req->r_num_ops, req->r_tid); |
2017 | dout("handle_reply result %d bytes %d\n", req->r_result, | 2071 | goto fail_request; |
2018 | bytes); | ||
2019 | if (req->r_result == 0) | ||
2020 | req->r_result = bytes; | ||
2021 | |||
2022 | /* in case this is a write and we need to replay, */ | ||
2023 | req->r_replay_version.epoch = cpu_to_le32(reassert_epoch); | ||
2024 | req->r_replay_version.version = cpu_to_le64(reassert_version); | ||
2025 | |||
2026 | req->r_got_reply = 1; | ||
2027 | } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { | ||
2028 | dout("handle_reply tid %llu dup ack\n", tid); | ||
2029 | goto out_unlock; | ||
2030 | } | 2072 | } |
2031 | 2073 | for (i = 0; i < req->r_num_ops; i++) { | |
2032 | dout("handle_reply tid %llu flags %d\n", tid, flags); | 2074 | dout(" req %p tid %llu op %d rval %d len %u\n", req, |
2033 | 2075 | req->r_tid, i, m.rval[i], m.outdata_len[i]); | |
2034 | if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) | 2076 | req->r_ops[i].rval = m.rval[i]; |
2035 | __register_linger_request(osdc, req); | 2077 | req->r_ops[i].outdata_len = m.outdata_len[i]; |
2036 | 2078 | data_len += m.outdata_len[i]; | |
2037 | /* either this is a read, or we got the safe response */ | 2079 | } |
2038 | if (result < 0 || | 2080 | if (data_len != le32_to_cpu(msg->hdr.data_len)) { |
2039 | (flags & CEPH_OSD_FLAG_ONDISK) || | 2081 | pr_err("sum of lens %u != %u for tid %llu\n", data_len, |
2040 | ((flags & CEPH_OSD_FLAG_WRITE) == 0)) | 2082 | le32_to_cpu(msg->hdr.data_len), req->r_tid); |
2083 | goto fail_request; | ||
2084 | } | ||
2085 | dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__, | ||
2086 | req, req->r_tid, req->r_got_reply, m.result, data_len); | ||
2087 | |||
2088 | already_acked = req->r_got_reply; | ||
2089 | if (!already_acked) { | ||
2090 | req->r_result = m.result ?: data_len; | ||
2091 | req->r_replay_version = m.replay_version; /* struct */ | ||
2092 | req->r_got_reply = true; | ||
2093 | } else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) { | ||
2094 | dout("req %p tid %llu dup ack\n", req, req->r_tid); | ||
2095 | goto out_put; | ||
2096 | } | ||
2097 | |||
2098 | if (done_request(req, &m)) { | ||
2041 | __unregister_request(osdc, req); | 2099 | __unregister_request(osdc, req); |
2100 | if (req->r_linger) { | ||
2101 | WARN_ON(req->r_unsafe_callback); | ||
2102 | __register_linger_request(osdc, req); | ||
2103 | } | ||
2104 | } | ||
2042 | 2105 | ||
2043 | mutex_unlock(&osdc->request_mutex); | 2106 | mutex_unlock(&osdc->request_mutex); |
2044 | up_read(&osdc->map_sem); | 2107 | up_read(&osdc->map_sem); |
2045 | 2108 | ||
2046 | if (!already_completed) { | 2109 | if (done_request(req, &m)) { |
2047 | if (req->r_unsafe_callback && | 2110 | if (already_acked && req->r_unsafe_callback) { |
2048 | result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK)) | 2111 | dout("req %p tid %llu safe-cb\n", req, req->r_tid); |
2049 | req->r_unsafe_callback(req, true); | ||
2050 | if (req->r_callback) | ||
2051 | req->r_callback(req); | ||
2052 | else | ||
2053 | complete_all(&req->r_completion); | ||
2054 | } | ||
2055 | |||
2056 | if (flags & CEPH_OSD_FLAG_ONDISK) { | ||
2057 | if (req->r_unsafe_callback && already_completed) | ||
2058 | req->r_unsafe_callback(req, false); | 2112 | req->r_unsafe_callback(req, false); |
2059 | complete_request(req); | 2113 | } else { |
2114 | dout("req %p tid %llu cb\n", req, req->r_tid); | ||
2115 | __complete_request(req); | ||
2116 | } | ||
2117 | } else { | ||
2118 | if (req->r_unsafe_callback) { | ||
2119 | dout("req %p tid %llu unsafe-cb\n", req, req->r_tid); | ||
2120 | req->r_unsafe_callback(req, true); | ||
2121 | } else { | ||
2122 | WARN_ON(1); | ||
2123 | } | ||
2060 | } | 2124 | } |
2125 | if (m.flags & CEPH_OSD_FLAG_ONDISK) | ||
2126 | complete_all(&req->r_safe_completion); | ||
2061 | 2127 | ||
2062 | out: | ||
2063 | dout("req=%p req->r_linger=%d\n", req, req->r_linger); | ||
2064 | ceph_osdc_put_request(req); | 2128 | ceph_osdc_put_request(req); |
2065 | return; | 2129 | return; |
2066 | out_unlock: | ||
2067 | mutex_unlock(&osdc->request_mutex); | ||
2068 | up_read(&osdc->map_sem); | ||
2069 | goto out; | ||
2070 | 2130 | ||
2071 | bad_put: | 2131 | fail_request: |
2072 | req->r_result = -EIO; | 2132 | req->r_result = -EIO; |
2073 | __unregister_request(osdc, req); | 2133 | __unregister_request(osdc, req); |
2074 | if (req->r_callback) | 2134 | __complete_request(req); |
2075 | req->r_callback(req); | 2135 | complete_all(&req->r_safe_completion); |
2076 | else | 2136 | out_put: |
2077 | complete_all(&req->r_completion); | ||
2078 | complete_request(req); | ||
2079 | ceph_osdc_put_request(req); | 2137 | ceph_osdc_put_request(req); |
2080 | bad_mutex: | 2138 | out_unlock: |
2081 | mutex_unlock(&osdc->request_mutex); | 2139 | mutex_unlock(&osdc->request_mutex); |
2082 | up_read(&osdc->map_sem); | 2140 | up_read(&osdc->map_sem); |
2083 | bad: | ||
2084 | pr_err("corrupt osd_op_reply got %d %d\n", | ||
2085 | (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); | ||
2086 | ceph_msg_dump(msg); | ||
2087 | } | 2141 | } |
2088 | 2142 | ||
2089 | static void reset_changed_osds(struct ceph_osd_client *osdc) | 2143 | static void reset_changed_osds(struct ceph_osd_client *osdc) |
@@ -2591,7 +2645,9 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc, | |||
2591 | if (rc < 0) { | 2645 | if (rc < 0) { |
2592 | dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid); | 2646 | dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid); |
2593 | ceph_osdc_cancel_request(req); | 2647 | ceph_osdc_cancel_request(req); |
2594 | complete_request(req); | 2648 | |
2649 | /* kludge - need to to wake ceph_osdc_sync() */ | ||
2650 | complete_all(&req->r_safe_completion); | ||
2595 | return rc; | 2651 | return rc; |
2596 | } | 2652 | } |
2597 | 2653 | ||