diff options
Diffstat (limited to 'net/ceph/messenger.c')
| -rw-r--r-- | net/ceph/messenger.c | 133 |
1 file changed, 85 insertions, 48 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index dff633d62e5b..05f357828a2f 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
| @@ -252,8 +252,12 @@ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) | |||
| 252 | { | 252 | { |
| 253 | struct kvec iov = {buf, len}; | 253 | struct kvec iov = {buf, len}; |
| 254 | struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; | 254 | struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; |
| 255 | int r; | ||
| 255 | 256 | ||
| 256 | return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); | 257 | r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); |
| 258 | if (r == -EAGAIN) | ||
| 259 | r = 0; | ||
| 260 | return r; | ||
| 257 | } | 261 | } |
| 258 | 262 | ||
| 259 | /* | 263 | /* |
| @@ -264,13 +268,17 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, | |||
| 264 | size_t kvlen, size_t len, int more) | 268 | size_t kvlen, size_t len, int more) |
| 265 | { | 269 | { |
| 266 | struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; | 270 | struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; |
| 271 | int r; | ||
| 267 | 272 | ||
| 268 | if (more) | 273 | if (more) |
| 269 | msg.msg_flags |= MSG_MORE; | 274 | msg.msg_flags |= MSG_MORE; |
| 270 | else | 275 | else |
| 271 | msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ | 276 | msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ |
| 272 | 277 | ||
| 273 | return kernel_sendmsg(sock, &msg, iov, kvlen, len); | 278 | r = kernel_sendmsg(sock, &msg, iov, kvlen, len); |
| 279 | if (r == -EAGAIN) | ||
| 280 | r = 0; | ||
| 281 | return r; | ||
| 274 | } | 282 | } |
| 275 | 283 | ||
| 276 | 284 | ||
| @@ -328,7 +336,6 @@ static void reset_connection(struct ceph_connection *con) | |||
| 328 | ceph_msg_put(con->out_msg); | 336 | ceph_msg_put(con->out_msg); |
| 329 | con->out_msg = NULL; | 337 | con->out_msg = NULL; |
| 330 | } | 338 | } |
| 331 | con->out_keepalive_pending = false; | ||
| 332 | con->in_seq = 0; | 339 | con->in_seq = 0; |
| 333 | con->in_seq_acked = 0; | 340 | con->in_seq_acked = 0; |
| 334 | } | 341 | } |
| @@ -847,6 +854,8 @@ static int write_partial_msg_pages(struct ceph_connection *con) | |||
| 847 | (msg->pages || msg->pagelist || msg->bio || in_trail)) | 854 | (msg->pages || msg->pagelist || msg->bio || in_trail)) |
| 848 | kunmap(page); | 855 | kunmap(page); |
| 849 | 856 | ||
| 857 | if (ret == -EAGAIN) | ||
| 858 | ret = 0; | ||
| 850 | if (ret <= 0) | 859 | if (ret <= 0) |
| 851 | goto out; | 860 | goto out; |
| 852 | 861 | ||
| @@ -1238,8 +1247,6 @@ static int process_connect(struct ceph_connection *con) | |||
| 1238 | con->auth_retry); | 1247 | con->auth_retry); |
| 1239 | if (con->auth_retry == 2) { | 1248 | if (con->auth_retry == 2) { |
| 1240 | con->error_msg = "connect authorization failure"; | 1249 | con->error_msg = "connect authorization failure"; |
| 1241 | reset_connection(con); | ||
| 1242 | set_bit(CLOSED, &con->state); | ||
| 1243 | return -1; | 1250 | return -1; |
| 1244 | } | 1251 | } |
| 1245 | con->auth_retry = 1; | 1252 | con->auth_retry = 1; |
| @@ -1705,14 +1712,6 @@ more: | |||
| 1705 | 1712 | ||
| 1706 | /* open the socket first? */ | 1713 | /* open the socket first? */ |
| 1707 | if (con->sock == NULL) { | 1714 | if (con->sock == NULL) { |
| 1708 | /* | ||
| 1709 | * if we were STANDBY and are reconnecting _this_ | ||
| 1710 | * connection, bump connect_seq now. Always bump | ||
| 1711 | * global_seq. | ||
| 1712 | */ | ||
| 1713 | if (test_and_clear_bit(STANDBY, &con->state)) | ||
| 1714 | con->connect_seq++; | ||
| 1715 | |||
| 1716 | prepare_write_banner(msgr, con); | 1715 | prepare_write_banner(msgr, con); |
| 1717 | prepare_write_connect(msgr, con, 1); | 1716 | prepare_write_connect(msgr, con, 1); |
| 1718 | prepare_read_banner(con); | 1717 | prepare_read_banner(con); |
| @@ -1737,16 +1736,12 @@ more_kvec: | |||
| 1737 | if (con->out_skip) { | 1736 | if (con->out_skip) { |
| 1738 | ret = write_partial_skip(con); | 1737 | ret = write_partial_skip(con); |
| 1739 | if (ret <= 0) | 1738 | if (ret <= 0) |
| 1740 | goto done; | 1739 | goto out; |
| 1741 | if (ret < 0) { | ||
| 1742 | dout("try_write write_partial_skip err %d\n", ret); | ||
| 1743 | goto done; | ||
| 1744 | } | ||
| 1745 | } | 1740 | } |
| 1746 | if (con->out_kvec_left) { | 1741 | if (con->out_kvec_left) { |
| 1747 | ret = write_partial_kvec(con); | 1742 | ret = write_partial_kvec(con); |
| 1748 | if (ret <= 0) | 1743 | if (ret <= 0) |
| 1749 | goto done; | 1744 | goto out; |
| 1750 | } | 1745 | } |
| 1751 | 1746 | ||
| 1752 | /* msg pages? */ | 1747 | /* msg pages? */ |
| @@ -1761,11 +1756,11 @@ more_kvec: | |||
| 1761 | if (ret == 1) | 1756 | if (ret == 1) |
| 1762 | goto more_kvec; /* we need to send the footer, too! */ | 1757 | goto more_kvec; /* we need to send the footer, too! */ |
| 1763 | if (ret == 0) | 1758 | if (ret == 0) |
| 1764 | goto done; | 1759 | goto out; |
| 1765 | if (ret < 0) { | 1760 | if (ret < 0) { |
| 1766 | dout("try_write write_partial_msg_pages err %d\n", | 1761 | dout("try_write write_partial_msg_pages err %d\n", |
| 1767 | ret); | 1762 | ret); |
| 1768 | goto done; | 1763 | goto out; |
| 1769 | } | 1764 | } |
| 1770 | } | 1765 | } |
| 1771 | 1766 | ||
| @@ -1789,10 +1784,9 @@ do_next: | |||
| 1789 | /* Nothing to do! */ | 1784 | /* Nothing to do! */ |
| 1790 | clear_bit(WRITE_PENDING, &con->state); | 1785 | clear_bit(WRITE_PENDING, &con->state); |
| 1791 | dout("try_write nothing else to write.\n"); | 1786 | dout("try_write nothing else to write.\n"); |
| 1792 | done: | ||
| 1793 | ret = 0; | 1787 | ret = 0; |
| 1794 | out: | 1788 | out: |
| 1795 | dout("try_write done on %p\n", con); | 1789 | dout("try_write done on %p ret %d\n", con, ret); |
| 1796 | return ret; | 1790 | return ret; |
| 1797 | } | 1791 | } |
| 1798 | 1792 | ||
| @@ -1821,19 +1815,17 @@ more: | |||
| 1821 | dout("try_read connecting\n"); | 1815 | dout("try_read connecting\n"); |
| 1822 | ret = read_partial_banner(con); | 1816 | ret = read_partial_banner(con); |
| 1823 | if (ret <= 0) | 1817 | if (ret <= 0) |
| 1824 | goto done; | ||
| 1825 | if (process_banner(con) < 0) { | ||
| 1826 | ret = -1; | ||
| 1827 | goto out; | 1818 | goto out; |
| 1828 | } | 1819 | ret = process_banner(con); |
| 1820 | if (ret < 0) | ||
| 1821 | goto out; | ||
| 1829 | } | 1822 | } |
| 1830 | ret = read_partial_connect(con); | 1823 | ret = read_partial_connect(con); |
| 1831 | if (ret <= 0) | 1824 | if (ret <= 0) |
| 1832 | goto done; | ||
| 1833 | if (process_connect(con) < 0) { | ||
| 1834 | ret = -1; | ||
| 1835 | goto out; | 1825 | goto out; |
| 1836 | } | 1826 | ret = process_connect(con); |
| 1827 | if (ret < 0) | ||
| 1828 | goto out; | ||
| 1837 | goto more; | 1829 | goto more; |
| 1838 | } | 1830 | } |
| 1839 | 1831 | ||
| @@ -1848,7 +1840,7 @@ more: | |||
| 1848 | dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); | 1840 | dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); |
| 1849 | ret = ceph_tcp_recvmsg(con->sock, buf, skip); | 1841 | ret = ceph_tcp_recvmsg(con->sock, buf, skip); |
| 1850 | if (ret <= 0) | 1842 | if (ret <= 0) |
| 1851 | goto done; | 1843 | goto out; |
| 1852 | con->in_base_pos += ret; | 1844 | con->in_base_pos += ret; |
| 1853 | if (con->in_base_pos) | 1845 | if (con->in_base_pos) |
| 1854 | goto more; | 1846 | goto more; |
| @@ -1859,7 +1851,7 @@ more: | |||
| 1859 | */ | 1851 | */ |
| 1860 | ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); | 1852 | ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); |
| 1861 | if (ret <= 0) | 1853 | if (ret <= 0) |
| 1862 | goto done; | 1854 | goto out; |
| 1863 | dout("try_read got tag %d\n", (int)con->in_tag); | 1855 | dout("try_read got tag %d\n", (int)con->in_tag); |
| 1864 | switch (con->in_tag) { | 1856 | switch (con->in_tag) { |
| 1865 | case CEPH_MSGR_TAG_MSG: | 1857 | case CEPH_MSGR_TAG_MSG: |
| @@ -1870,7 +1862,7 @@ more: | |||
| 1870 | break; | 1862 | break; |
| 1871 | case CEPH_MSGR_TAG_CLOSE: | 1863 | case CEPH_MSGR_TAG_CLOSE: |
| 1872 | set_bit(CLOSED, &con->state); /* fixme */ | 1864 | set_bit(CLOSED, &con->state); /* fixme */ |
| 1873 | goto done; | 1865 | goto out; |
| 1874 | default: | 1866 | default: |
| 1875 | goto bad_tag; | 1867 | goto bad_tag; |
| 1876 | } | 1868 | } |
| @@ -1882,13 +1874,12 @@ more: | |||
| 1882 | case -EBADMSG: | 1874 | case -EBADMSG: |
| 1883 | con->error_msg = "bad crc"; | 1875 | con->error_msg = "bad crc"; |
| 1884 | ret = -EIO; | 1876 | ret = -EIO; |
| 1885 | goto out; | 1877 | break; |
| 1886 | case -EIO: | 1878 | case -EIO: |
| 1887 | con->error_msg = "io error"; | 1879 | con->error_msg = "io error"; |
| 1888 | goto out; | 1880 | break; |
| 1889 | default: | ||
| 1890 | goto done; | ||
| 1891 | } | 1881 | } |
| 1882 | goto out; | ||
| 1892 | } | 1883 | } |
| 1893 | if (con->in_tag == CEPH_MSGR_TAG_READY) | 1884 | if (con->in_tag == CEPH_MSGR_TAG_READY) |
| 1894 | goto more; | 1885 | goto more; |
| @@ -1898,15 +1889,13 @@ more: | |||
| 1898 | if (con->in_tag == CEPH_MSGR_TAG_ACK) { | 1889 | if (con->in_tag == CEPH_MSGR_TAG_ACK) { |
| 1899 | ret = read_partial_ack(con); | 1890 | ret = read_partial_ack(con); |
| 1900 | if (ret <= 0) | 1891 | if (ret <= 0) |
| 1901 | goto done; | 1892 | goto out; |
| 1902 | process_ack(con); | 1893 | process_ack(con); |
| 1903 | goto more; | 1894 | goto more; |
| 1904 | } | 1895 | } |
| 1905 | 1896 | ||
| 1906 | done: | ||
| 1907 | ret = 0; | ||
| 1908 | out: | 1897 | out: |
| 1909 | dout("try_read done on %p\n", con); | 1898 | dout("try_read done on %p ret %d\n", con, ret); |
| 1910 | return ret; | 1899 | return ret; |
| 1911 | 1900 | ||
| 1912 | bad_tag: | 1901 | bad_tag: |
| @@ -1951,7 +1940,24 @@ static void con_work(struct work_struct *work) | |||
| 1951 | work.work); | 1940 | work.work); |
| 1952 | 1941 | ||
| 1953 | mutex_lock(&con->mutex); | 1942 | mutex_lock(&con->mutex); |
| 1943 | if (test_and_clear_bit(BACKOFF, &con->state)) { | ||
| 1944 | dout("con_work %p backing off\n", con); | ||
| 1945 | if (queue_delayed_work(ceph_msgr_wq, &con->work, | ||
| 1946 | round_jiffies_relative(con->delay))) { | ||
| 1947 | dout("con_work %p backoff %lu\n", con, con->delay); | ||
| 1948 | mutex_unlock(&con->mutex); | ||
| 1949 | return; | ||
| 1950 | } else { | ||
| 1951 | con->ops->put(con); | ||
| 1952 | dout("con_work %p FAILED to back off %lu\n", con, | ||
| 1953 | con->delay); | ||
| 1954 | } | ||
| 1955 | } | ||
| 1954 | 1956 | ||
| 1957 | if (test_bit(STANDBY, &con->state)) { | ||
| 1958 | dout("con_work %p STANDBY\n", con); | ||
| 1959 | goto done; | ||
| 1960 | } | ||
| 1955 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ | 1961 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ |
| 1956 | dout("con_work CLOSED\n"); | 1962 | dout("con_work CLOSED\n"); |
| 1957 | con_close_socket(con); | 1963 | con_close_socket(con); |
| @@ -2008,10 +2014,12 @@ static void ceph_fault(struct ceph_connection *con) | |||
| 2008 | /* Requeue anything that hasn't been acked */ | 2014 | /* Requeue anything that hasn't been acked */ |
| 2009 | list_splice_init(&con->out_sent, &con->out_queue); | 2015 | list_splice_init(&con->out_sent, &con->out_queue); |
| 2010 | 2016 | ||
| 2011 | /* If there are no messages in the queue, place the connection | 2017 | /* If there are no messages queued or keepalive pending, place |
| 2012 | * in a STANDBY state (i.e., don't try to reconnect just yet). */ | 2018 | * the connection in a STANDBY state */ |
| 2013 | if (list_empty(&con->out_queue) && !con->out_keepalive_pending) { | 2019 | if (list_empty(&con->out_queue) && |
| 2014 | dout("fault setting STANDBY\n"); | 2020 | !test_bit(KEEPALIVE_PENDING, &con->state)) { |
| 2021 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); | ||
| 2022 | clear_bit(WRITE_PENDING, &con->state); | ||
| 2015 | set_bit(STANDBY, &con->state); | 2023 | set_bit(STANDBY, &con->state); |
| 2016 | } else { | 2024 | } else { |
| 2017 | /* retry after a delay. */ | 2025 | /* retry after a delay. */ |
| @@ -2019,11 +2027,24 @@ static void ceph_fault(struct ceph_connection *con) | |||
| 2019 | con->delay = BASE_DELAY_INTERVAL; | 2027 | con->delay = BASE_DELAY_INTERVAL; |
| 2020 | else if (con->delay < MAX_DELAY_INTERVAL) | 2028 | else if (con->delay < MAX_DELAY_INTERVAL) |
| 2021 | con->delay *= 2; | 2029 | con->delay *= 2; |
| 2022 | dout("fault queueing %p delay %lu\n", con, con->delay); | ||
| 2023 | con->ops->get(con); | 2030 | con->ops->get(con); |
| 2024 | if (queue_delayed_work(ceph_msgr_wq, &con->work, | 2031 | if (queue_delayed_work(ceph_msgr_wq, &con->work, |
| 2025 | round_jiffies_relative(con->delay)) == 0) | 2032 | round_jiffies_relative(con->delay))) { |
| 2033 | dout("fault queued %p delay %lu\n", con, con->delay); | ||
| 2034 | } else { | ||
| 2026 | con->ops->put(con); | 2035 | con->ops->put(con); |
| 2036 | dout("fault failed to queue %p delay %lu, backoff\n", | ||
| 2037 | con, con->delay); | ||
| 2038 | /* | ||
| 2039 | * In many cases we see a socket state change | ||
| 2040 | * while con_work is running and end up | ||
| 2041 | * queuing (non-delayed) work, such that we | ||
| 2042 | * can't backoff with a delay. Set a flag so | ||
| 2043 | * that when con_work restarts we schedule the | ||
| 2044 | * delay then. | ||
| 2045 | */ | ||
| 2046 | set_bit(BACKOFF, &con->state); | ||
| 2047 | } | ||
| 2027 | } | 2048 | } |
| 2028 | 2049 | ||
| 2029 | out_unlock: | 2050 | out_unlock: |
| @@ -2094,6 +2115,19 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) | |||
| 2094 | } | 2115 | } |
| 2095 | EXPORT_SYMBOL(ceph_messenger_destroy); | 2116 | EXPORT_SYMBOL(ceph_messenger_destroy); |
| 2096 | 2117 | ||
| 2118 | static void clear_standby(struct ceph_connection *con) | ||
| 2119 | { | ||
| 2120 | /* come back from STANDBY? */ | ||
| 2121 | if (test_and_clear_bit(STANDBY, &con->state)) { | ||
| 2122 | mutex_lock(&con->mutex); | ||
| 2123 | dout("clear_standby %p and ++connect_seq\n", con); | ||
| 2124 | con->connect_seq++; | ||
| 2125 | WARN_ON(test_bit(WRITE_PENDING, &con->state)); | ||
| 2126 | WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); | ||
| 2127 | mutex_unlock(&con->mutex); | ||
| 2128 | } | ||
| 2129 | } | ||
| 2130 | |||
| 2097 | /* | 2131 | /* |
| 2098 | * Queue up an outgoing message on the given connection. | 2132 | * Queue up an outgoing message on the given connection. |
| 2099 | */ | 2133 | */ |
| @@ -2126,6 +2160,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
| 2126 | 2160 | ||
| 2127 | /* if there wasn't anything waiting to send before, queue | 2161 | /* if there wasn't anything waiting to send before, queue |
| 2128 | * new work */ | 2162 | * new work */ |
| 2163 | clear_standby(con); | ||
| 2129 | if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) | 2164 | if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) |
| 2130 | queue_con(con); | 2165 | queue_con(con); |
| 2131 | } | 2166 | } |
| @@ -2191,6 +2226,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) | |||
| 2191 | */ | 2226 | */ |
| 2192 | void ceph_con_keepalive(struct ceph_connection *con) | 2227 | void ceph_con_keepalive(struct ceph_connection *con) |
| 2193 | { | 2228 | { |
| 2229 | dout("con_keepalive %p\n", con); | ||
| 2230 | clear_standby(con); | ||
| 2194 | if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && | 2231 | if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && |
| 2195 | test_and_set_bit(WRITE_PENDING, &con->state) == 0) | 2232 | test_and_set_bit(WRITE_PENDING, &con->state) == 0) |
| 2196 | queue_con(con); | 2233 | queue_con(con); |
