about summary refs log tree commit diff stats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- net/ceph/messenger.c | 67
1 file changed, 11 insertions(+), 56 deletions(-)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 0e8157ee5d43..dff633d62e5b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -96,12 +96,10 @@ struct workqueue_struct *ceph_msgr_wq;
 
 int ceph_msgr_init(void)
 {
-	ceph_msgr_wq = create_workqueue("ceph-msgr");
-	if (IS_ERR(ceph_msgr_wq)) {
-		int ret = PTR_ERR(ceph_msgr_wq);
-		pr_err("msgr_init failed to create workqueue: %d\n", ret);
-		ceph_msgr_wq = NULL;
-		return ret;
+	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
+	if (!ceph_msgr_wq) {
+		pr_err("msgr_init failed to create workqueue\n");
+		return -ENOMEM;
 	}
 	return 0;
 }
@@ -540,8 +538,7 @@ static void prepare_write_message(struct ceph_connection *con)
 	/* initialize page iterator */
 	con->out_msg_pos.page = 0;
 	if (m->pages)
-		con->out_msg_pos.page_pos =
-			le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+		con->out_msg_pos.page_pos = m->page_alignment;
 	else
 		con->out_msg_pos.page_pos = 0;
 	con->out_msg_pos.data_pos = 0;
@@ -1491,7 +1488,7 @@ static int read_partial_message(struct ceph_connection *con)
 	struct ceph_msg *m = con->in_msg;
 	int ret;
 	int to, left;
-	unsigned front_len, middle_len, data_len, data_off;
+	unsigned front_len, middle_len, data_len;
 	int datacrc = con->msgr->nocrc;
 	int skip;
 	u64 seq;
@@ -1527,19 +1524,17 @@ static int read_partial_message(struct ceph_connection *con)
 	data_len = le32_to_cpu(con->in_hdr.data_len);
 	if (data_len > CEPH_MSG_MAX_DATA_LEN)
 		return -EIO;
-	data_off = le16_to_cpu(con->in_hdr.data_off);
 
 	/* verify seq# */
 	seq = le64_to_cpu(con->in_hdr.seq);
 	if ((s64)seq - (s64)con->in_seq < 1) {
-		pr_info("skipping %s%lld %s seq %lld, expected %lld\n",
+		pr_info("skipping %s%lld %s seq %lld expected %lld\n",
 			ENTITY_NAME(con->peer_name),
 			ceph_pr_addr(&con->peer_addr.in_addr),
 			seq, con->in_seq + 1);
 		con->in_base_pos = -front_len - middle_len - data_len -
 			sizeof(m->footer);
 		con->in_tag = CEPH_MSGR_TAG_READY;
-		con->in_seq++;
 		return 0;
 	} else if ((s64)seq - (s64)con->in_seq > 1) {
 		pr_err("read_partial_message bad seq %lld expected %lld\n",
@@ -1576,7 +1571,7 @@ static int read_partial_message(struct ceph_connection *con)
 
 	con->in_msg_pos.page = 0;
 	if (m->pages)
-		con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+		con->in_msg_pos.page_pos = m->page_alignment;
 	else
 		con->in_msg_pos.page_pos = 0;
 	con->in_msg_pos.data_pos = 0;
@@ -1925,20 +1920,6 @@ bad_tag:
 /*
  * Atomically queue work on a connection. Bump @con reference to
  * avoid races with connection teardown.
- *
- * There is some trickery going on with QUEUED and BUSY because we
- * only want a _single_ thread operating on each connection at any
- * point in time, but we want to use all available CPUs.
- *
- * The worker thread only proceeds if it can atomically set BUSY. It
- * clears QUEUED and does it's thing. When it thinks it's done, it
- * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
- * (tries again to set BUSY).
- *
- * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
- * try to queue work. If that fails (work is already queued, or BUSY)
- * we give up (work also already being done or is queued) but leave QUEUED
- * set so that the worker thread will loop if necessary.
 */
static void queue_con(struct ceph_connection *con)
{
@@ -1953,11 +1934,7 @@ static void queue_con(struct ceph_connection *con)
 		return;
 	}
 
-	set_bit(QUEUED, &con->state);
-	if (test_bit(BUSY, &con->state)) {
-		dout("queue_con %p - already BUSY\n", con);
-		con->ops->put(con);
-	} else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
+	if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
 		dout("queue_con %p - already queued\n", con);
 		con->ops->put(con);
 	} else {
@@ -1972,15 +1949,6 @@ static void con_work(struct work_struct *work)
 {
 	struct ceph_connection *con = container_of(work, struct ceph_connection,
 						   work.work);
-	int backoff = 0;
-
-more:
-	if (test_and_set_bit(BUSY, &con->state) != 0) {
-		dout("con_work %p BUSY already set\n", con);
-		goto out;
-	}
-	dout("con_work %p start, clearing QUEUED\n", con);
-	clear_bit(QUEUED, &con->state);
 
 	mutex_lock(&con->mutex);
 
@@ -1999,28 +1967,13 @@ more:
 	    try_read(con) < 0 ||
 	    try_write(con) < 0) {
 		mutex_unlock(&con->mutex);
-		backoff = 1;
 		ceph_fault(con); /* error/fault path */
 		goto done_unlocked;
 	}
 
done:
 	mutex_unlock(&con->mutex);
-
done_unlocked:
-	clear_bit(BUSY, &con->state);
-	dout("con->state=%lu\n", con->state);
-	if (test_bit(QUEUED, &con->state)) {
-		if (!backoff || test_bit(OPENING, &con->state)) {
-			dout("con_work %p QUEUED reset, looping\n", con);
-			goto more;
-		}
-		dout("con_work %p QUEUED reset, but just faulted\n", con);
-		clear_bit(QUEUED, &con->state);
-	}
-	dout("con_work %p done\n", con);
-
-out:
 	con->ops->put(con);
 }
 
@@ -2301,6 +2254,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
 
 	/* data */
 	m->nr_pages = 0;
+	m->page_alignment = 0;
 	m->pages = NULL;
 	m->pagelist = NULL;
 	m->bio = NULL;
@@ -2370,6 +2324,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
 			       type, front_len);
 			return NULL;
 		}
+		msg->page_alignment = le16_to_cpu(hdr->data_off);
 	}
 	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 