diff options
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- | net/ceph/messenger.c | 67 |
1 files changed, 11 insertions, 56 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 0e8157ee5d43..dff633d62e5b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -96,12 +96,10 @@ struct workqueue_struct *ceph_msgr_wq; | |||
96 | 96 | ||
97 | int ceph_msgr_init(void) | 97 | int ceph_msgr_init(void) |
98 | { | 98 | { |
99 | ceph_msgr_wq = create_workqueue("ceph-msgr"); | 99 | ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); |
100 | if (IS_ERR(ceph_msgr_wq)) { | 100 | if (!ceph_msgr_wq) { |
101 | int ret = PTR_ERR(ceph_msgr_wq); | 101 | pr_err("msgr_init failed to create workqueue\n"); |
102 | pr_err("msgr_init failed to create workqueue: %d\n", ret); | 102 | return -ENOMEM; |
103 | ceph_msgr_wq = NULL; | ||
104 | return ret; | ||
105 | } | 103 | } |
106 | return 0; | 104 | return 0; |
107 | } | 105 | } |
@@ -540,8 +538,7 @@ static void prepare_write_message(struct ceph_connection *con) | |||
540 | /* initialize page iterator */ | 538 | /* initialize page iterator */ |
541 | con->out_msg_pos.page = 0; | 539 | con->out_msg_pos.page = 0; |
542 | if (m->pages) | 540 | if (m->pages) |
543 | con->out_msg_pos.page_pos = | 541 | con->out_msg_pos.page_pos = m->page_alignment; |
544 | le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK; | ||
545 | else | 542 | else |
546 | con->out_msg_pos.page_pos = 0; | 543 | con->out_msg_pos.page_pos = 0; |
547 | con->out_msg_pos.data_pos = 0; | 544 | con->out_msg_pos.data_pos = 0; |
@@ -1491,7 +1488,7 @@ static int read_partial_message(struct ceph_connection *con) | |||
1491 | struct ceph_msg *m = con->in_msg; | 1488 | struct ceph_msg *m = con->in_msg; |
1492 | int ret; | 1489 | int ret; |
1493 | int to, left; | 1490 | int to, left; |
1494 | unsigned front_len, middle_len, data_len, data_off; | 1491 | unsigned front_len, middle_len, data_len; |
1495 | int datacrc = con->msgr->nocrc; | 1492 | int datacrc = con->msgr->nocrc; |
1496 | int skip; | 1493 | int skip; |
1497 | u64 seq; | 1494 | u64 seq; |
@@ -1527,19 +1524,17 @@ static int read_partial_message(struct ceph_connection *con) | |||
1527 | data_len = le32_to_cpu(con->in_hdr.data_len); | 1524 | data_len = le32_to_cpu(con->in_hdr.data_len); |
1528 | if (data_len > CEPH_MSG_MAX_DATA_LEN) | 1525 | if (data_len > CEPH_MSG_MAX_DATA_LEN) |
1529 | return -EIO; | 1526 | return -EIO; |
1530 | data_off = le16_to_cpu(con->in_hdr.data_off); | ||
1531 | 1527 | ||
1532 | /* verify seq# */ | 1528 | /* verify seq# */ |
1533 | seq = le64_to_cpu(con->in_hdr.seq); | 1529 | seq = le64_to_cpu(con->in_hdr.seq); |
1534 | if ((s64)seq - (s64)con->in_seq < 1) { | 1530 | if ((s64)seq - (s64)con->in_seq < 1) { |
1535 | pr_info("skipping %s%lld %s seq %lld, expected %lld\n", | 1531 | pr_info("skipping %s%lld %s seq %lld expected %lld\n", |
1536 | ENTITY_NAME(con->peer_name), | 1532 | ENTITY_NAME(con->peer_name), |
1537 | ceph_pr_addr(&con->peer_addr.in_addr), | 1533 | ceph_pr_addr(&con->peer_addr.in_addr), |
1538 | seq, con->in_seq + 1); | 1534 | seq, con->in_seq + 1); |
1539 | con->in_base_pos = -front_len - middle_len - data_len - | 1535 | con->in_base_pos = -front_len - middle_len - data_len - |
1540 | sizeof(m->footer); | 1536 | sizeof(m->footer); |
1541 | con->in_tag = CEPH_MSGR_TAG_READY; | 1537 | con->in_tag = CEPH_MSGR_TAG_READY; |
1542 | con->in_seq++; | ||
1543 | return 0; | 1538 | return 0; |
1544 | } else if ((s64)seq - (s64)con->in_seq > 1) { | 1539 | } else if ((s64)seq - (s64)con->in_seq > 1) { |
1545 | pr_err("read_partial_message bad seq %lld expected %lld\n", | 1540 | pr_err("read_partial_message bad seq %lld expected %lld\n", |
@@ -1576,7 +1571,7 @@ static int read_partial_message(struct ceph_connection *con) | |||
1576 | 1571 | ||
1577 | con->in_msg_pos.page = 0; | 1572 | con->in_msg_pos.page = 0; |
1578 | if (m->pages) | 1573 | if (m->pages) |
1579 | con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; | 1574 | con->in_msg_pos.page_pos = m->page_alignment; |
1580 | else | 1575 | else |
1581 | con->in_msg_pos.page_pos = 0; | 1576 | con->in_msg_pos.page_pos = 0; |
1582 | con->in_msg_pos.data_pos = 0; | 1577 | con->in_msg_pos.data_pos = 0; |
@@ -1925,20 +1920,6 @@ bad_tag: | |||
1925 | /* | 1920 | /* |
1926 | * Atomically queue work on a connection. Bump @con reference to | 1921 | * Atomically queue work on a connection. Bump @con reference to |
1927 | * avoid races with connection teardown. | 1922 | * avoid races with connection teardown. |
1928 | * | ||
1929 | * There is some trickery going on with QUEUED and BUSY because we | ||
1930 | * only want a _single_ thread operating on each connection at any | ||
1931 | * point in time, but we want to use all available CPUs. | ||
1932 | * | ||
1933 | * The worker thread only proceeds if it can atomically set BUSY. It | ||
1934 | * clears QUEUED and does it's thing. When it thinks it's done, it | ||
1935 | * clears BUSY, then rechecks QUEUED.. if it's set again, it loops | ||
1936 | * (tries again to set BUSY). | ||
1937 | * | ||
1938 | * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we | ||
1939 | * try to queue work. If that fails (work is already queued, or BUSY) | ||
1940 | * we give up (work also already being done or is queued) but leave QUEUED | ||
1941 | * set so that the worker thread will loop if necessary. | ||
1942 | */ | 1923 | */ |
1943 | static void queue_con(struct ceph_connection *con) | 1924 | static void queue_con(struct ceph_connection *con) |
1944 | { | 1925 | { |
@@ -1953,11 +1934,7 @@ static void queue_con(struct ceph_connection *con) | |||
1953 | return; | 1934 | return; |
1954 | } | 1935 | } |
1955 | 1936 | ||
1956 | set_bit(QUEUED, &con->state); | 1937 | if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { |
1957 | if (test_bit(BUSY, &con->state)) { | ||
1958 | dout("queue_con %p - already BUSY\n", con); | ||
1959 | con->ops->put(con); | ||
1960 | } else if (!queue_work(ceph_msgr_wq, &con->work.work)) { | ||
1961 | dout("queue_con %p - already queued\n", con); | 1938 | dout("queue_con %p - already queued\n", con); |
1962 | con->ops->put(con); | 1939 | con->ops->put(con); |
1963 | } else { | 1940 | } else { |
@@ -1972,15 +1949,6 @@ static void con_work(struct work_struct *work) | |||
1972 | { | 1949 | { |
1973 | struct ceph_connection *con = container_of(work, struct ceph_connection, | 1950 | struct ceph_connection *con = container_of(work, struct ceph_connection, |
1974 | work.work); | 1951 | work.work); |
1975 | int backoff = 0; | ||
1976 | |||
1977 | more: | ||
1978 | if (test_and_set_bit(BUSY, &con->state) != 0) { | ||
1979 | dout("con_work %p BUSY already set\n", con); | ||
1980 | goto out; | ||
1981 | } | ||
1982 | dout("con_work %p start, clearing QUEUED\n", con); | ||
1983 | clear_bit(QUEUED, &con->state); | ||
1984 | 1952 | ||
1985 | mutex_lock(&con->mutex); | 1953 | mutex_lock(&con->mutex); |
1986 | 1954 | ||
@@ -1999,28 +1967,13 @@ more: | |||
1999 | try_read(con) < 0 || | 1967 | try_read(con) < 0 || |
2000 | try_write(con) < 0) { | 1968 | try_write(con) < 0) { |
2001 | mutex_unlock(&con->mutex); | 1969 | mutex_unlock(&con->mutex); |
2002 | backoff = 1; | ||
2003 | ceph_fault(con); /* error/fault path */ | 1970 | ceph_fault(con); /* error/fault path */ |
2004 | goto done_unlocked; | 1971 | goto done_unlocked; |
2005 | } | 1972 | } |
2006 | 1973 | ||
2007 | done: | 1974 | done: |
2008 | mutex_unlock(&con->mutex); | 1975 | mutex_unlock(&con->mutex); |
2009 | |||
2010 | done_unlocked: | 1976 | done_unlocked: |
2011 | clear_bit(BUSY, &con->state); | ||
2012 | dout("con->state=%lu\n", con->state); | ||
2013 | if (test_bit(QUEUED, &con->state)) { | ||
2014 | if (!backoff || test_bit(OPENING, &con->state)) { | ||
2015 | dout("con_work %p QUEUED reset, looping\n", con); | ||
2016 | goto more; | ||
2017 | } | ||
2018 | dout("con_work %p QUEUED reset, but just faulted\n", con); | ||
2019 | clear_bit(QUEUED, &con->state); | ||
2020 | } | ||
2021 | dout("con_work %p done\n", con); | ||
2022 | |||
2023 | out: | ||
2024 | con->ops->put(con); | 1977 | con->ops->put(con); |
2025 | } | 1978 | } |
2026 | 1979 | ||
@@ -2301,6 +2254,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) | |||
2301 | 2254 | ||
2302 | /* data */ | 2255 | /* data */ |
2303 | m->nr_pages = 0; | 2256 | m->nr_pages = 0; |
2257 | m->page_alignment = 0; | ||
2304 | m->pages = NULL; | 2258 | m->pages = NULL; |
2305 | m->pagelist = NULL; | 2259 | m->pagelist = NULL; |
2306 | m->bio = NULL; | 2260 | m->bio = NULL; |
@@ -2370,6 +2324,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | |||
2370 | type, front_len); | 2324 | type, front_len); |
2371 | return NULL; | 2325 | return NULL; |
2372 | } | 2326 | } |
2327 | msg->page_alignment = le16_to_cpu(hdr->data_off); | ||
2373 | } | 2328 | } |
2374 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); | 2329 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); |
2375 | 2330 | ||