aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
authorTejun Heo <tj@kernel.org>2011-01-03 08:49:46 -0500
committerSage Weil <sage@newdream.net>2011-01-12 18:15:14 -0500
commitf363e45fd1184219b472ea549cb7e192e24ef4d2 (patch)
tree1332feb2f7a0a47ce482a0fd4ee9afb547a27090 /net/ceph/messenger.c
parent01e6acc4ea4c284c44bfb3d46c76f4ae580c6435 (diff)
net/ceph: make ceph_msgr_wq non-reentrant
ceph messenger code does a rather complex dancing around multithread workqueue to make sure the same work item isn't executed concurrently on different CPUs. This restriction can be provided by workqueue with WQ_NON_REENTRANT. Make ceph_msgr_wq non-reentrant workqueue with the default concurrency level and remove the QUEUED/BUSY logic. * This removes backoff handling in con_work() but it couldn't reliably block execution of con_work() to begin with - queue_con() can be called after the work started but before BUSY is set. It seems that it was an optimization for a rather cold path and can be safely removed. * The number of concurrent work items is bound by the number of connections and connetions are independent from each other. With the default concurrency level, different connections will be executed independently. Signed-off-by: Tejun Heo <tj@kernel.org> Cc: Sage Weil <sage@newdream.net> Cc: ceph-devel@vger.kernel.org Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c46
1 files changed, 2 insertions, 44 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index b6ff4a1519ab..dff633d62e5b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -96,7 +96,7 @@ struct workqueue_struct *ceph_msgr_wq;
96 96
97int ceph_msgr_init(void) 97int ceph_msgr_init(void)
98{ 98{
99 ceph_msgr_wq = create_workqueue("ceph-msgr"); 99 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
100 if (!ceph_msgr_wq) { 100 if (!ceph_msgr_wq) {
101 pr_err("msgr_init failed to create workqueue\n"); 101 pr_err("msgr_init failed to create workqueue\n");
102 return -ENOMEM; 102 return -ENOMEM;
@@ -1920,20 +1920,6 @@ bad_tag:
1920/* 1920/*
1921 * Atomically queue work on a connection. Bump @con reference to 1921 * Atomically queue work on a connection. Bump @con reference to
1922 * avoid races with connection teardown. 1922 * avoid races with connection teardown.
1923 *
1924 * There is some trickery going on with QUEUED and BUSY because we
1925 * only want a _single_ thread operating on each connection at any
1926 * point in time, but we want to use all available CPUs.
1927 *
1928 * The worker thread only proceeds if it can atomically set BUSY. It
1929 * clears QUEUED and does it's thing. When it thinks it's done, it
1930 * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
1931 * (tries again to set BUSY).
1932 *
1933 * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
1934 * try to queue work. If that fails (work is already queued, or BUSY)
1935 * we give up (work also already being done or is queued) but leave QUEUED
1936 * set so that the worker thread will loop if necessary.
1937 */ 1923 */
1938static void queue_con(struct ceph_connection *con) 1924static void queue_con(struct ceph_connection *con)
1939{ 1925{
@@ -1948,11 +1934,7 @@ static void queue_con(struct ceph_connection *con)
1948 return; 1934 return;
1949 } 1935 }
1950 1936
1951 set_bit(QUEUED, &con->state); 1937 if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
1952 if (test_bit(BUSY, &con->state)) {
1953 dout("queue_con %p - already BUSY\n", con);
1954 con->ops->put(con);
1955 } else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
1956 dout("queue_con %p - already queued\n", con); 1938 dout("queue_con %p - already queued\n", con);
1957 con->ops->put(con); 1939 con->ops->put(con);
1958 } else { 1940 } else {
@@ -1967,15 +1949,6 @@ static void con_work(struct work_struct *work)
1967{ 1949{
1968 struct ceph_connection *con = container_of(work, struct ceph_connection, 1950 struct ceph_connection *con = container_of(work, struct ceph_connection,
1969 work.work); 1951 work.work);
1970 int backoff = 0;
1971
1972more:
1973 if (test_and_set_bit(BUSY, &con->state) != 0) {
1974 dout("con_work %p BUSY already set\n", con);
1975 goto out;
1976 }
1977 dout("con_work %p start, clearing QUEUED\n", con);
1978 clear_bit(QUEUED, &con->state);
1979 1952
1980 mutex_lock(&con->mutex); 1953 mutex_lock(&con->mutex);
1981 1954
@@ -1994,28 +1967,13 @@ more:
1994 try_read(con) < 0 || 1967 try_read(con) < 0 ||
1995 try_write(con) < 0) { 1968 try_write(con) < 0) {
1996 mutex_unlock(&con->mutex); 1969 mutex_unlock(&con->mutex);
1997 backoff = 1;
1998 ceph_fault(con); /* error/fault path */ 1970 ceph_fault(con); /* error/fault path */
1999 goto done_unlocked; 1971 goto done_unlocked;
2000 } 1972 }
2001 1973
2002done: 1974done:
2003 mutex_unlock(&con->mutex); 1975 mutex_unlock(&con->mutex);
2004
2005done_unlocked: 1976done_unlocked:
2006 clear_bit(BUSY, &con->state);
2007 dout("con->state=%lu\n", con->state);
2008 if (test_bit(QUEUED, &con->state)) {
2009 if (!backoff || test_bit(OPENING, &con->state)) {
2010 dout("con_work %p QUEUED reset, looping\n", con);
2011 goto more;
2012 }
2013 dout("con_work %p QUEUED reset, but just faulted\n", con);
2014 clear_bit(QUEUED, &con->state);
2015 }
2016 dout("con_work %p done\n", con);
2017
2018out:
2019 con->ops->put(con); 1977 con->ops->put(con);
2020} 1978}
2021 1979