author	Andy Grover <andy.grover@oracle.com>	2010-03-23 20:39:07 -0400
committer	Andy Grover <andy.grover@oracle.com>	2010-09-08 21:12:12 -0400
commit	049ee3f500954176a87f22e6ee3e98aecb1b8958 (patch)
tree	5dfd8cf3e6d9a7a15e80f6ddee7f4ce7c4aa7a8c
parent	f17a1a55fb672d7f64be7f2e940ef5669e5efa0a (diff)
RDS: Change send lock from a mutex to a spinlock
This change allows us to call rds_send_xmit() from a tasklet, which is crucial to our new operating model.

* Change c_send_lock to a spinlock
* Rename the stats fields from "sem" to "lock"
* Remove the now-unneeded rds_conn_is_sending()

About locking between shutdown and send: send checks whether the connection is up; shutdown puts the connection into DISCONNECTING. After this, all threads entering send will exit immediately. However, a thread could still be *in* send_xmit(), so shutdown acquires the c_send_lock to ensure everyone is out before proceeding with connection shutdown.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
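The shutdown/send interaction described above is essentially a lock-as-barrier pattern. The sketch below is illustrative only: the names (fake_conn, fake_send_xmit, FAKE_CONN_*) are hypothetical stand-ins rather than the real net/rds symbols, and it assumes only the standard kernel spinlock and atomic primitives. It shows why the send path can now run in atomic (tasklet) context, and how shutdown waits out any sender already inside the critical section.

/*
 * Minimal sketch of the locking model in this commit's message.
 * All identifiers here are simplified stand-ins, not net/rds code.
 */
#include <linux/atomic.h>
#include <linux/spinlock.h>

struct fake_conn {
	spinlock_t	send_lock;	/* plays the role of c_send_lock */
	atomic_t	state;		/* plays the role of c_state */
};

#define FAKE_CONN_UP		1
#define FAKE_CONN_DISCONNECTING	2

/*
 * Send path: safe to call from a tasklet because it never sleeps.
 * If another context already holds the lock, back off instead of
 * blocking, like the trylock in rds_send_xmit().
 */
static int fake_send_xmit(struct fake_conn *conn)
{
	unsigned long flags;

	if (atomic_read(&conn->state) != FAKE_CONN_UP)
		return 0;		/* shutdown has started; bail out */

	if (!spin_trylock_irqsave(&conn->send_lock, flags))
		return -EAGAIN;		/* someone else is already sending */

	/* ... walk the send queue, hand messages to the transport ... */

	spin_unlock_irqrestore(&conn->send_lock, flags);
	return 0;
}

/*
 * Shutdown path: once the state leaves UP, no new sender gets past the
 * check above.  Taking and immediately dropping the lock then acts as
 * a barrier that waits for any sender still inside fake_send_xmit().
 */
static void fake_conn_shutdown(struct fake_conn *conn)
{
	atomic_set(&conn->state, FAKE_CONN_DISCONNECTING);

	spin_lock_irq(&conn->send_lock);	/* verify everybody's out */
	spin_unlock_irq(&conn->send_lock);

	/* connection teardown can proceed safely from here */
}

The empty lock/unlock pair only works as a drain because every sender holds send_lock for its entire stay in the critical section, and because the state check keeps new senders from entering once shutdown has begun; those are the same two conditions the commit message relies on.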
-rw-r--r--	net/rds/connection.c	22
-rw-r--r--	net/rds/rds.h	6
-rw-r--r--	net/rds/send.c	15
-rw-r--r--	net/rds/stats.c	4
4 files changed, 18 insertions(+), 29 deletions(-)
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 88bcaf3f3e16..56aebe444ad3 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -62,18 +62,6 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
 		var |= RDS_INFO_CONNECTION_FLAG_##suffix;	\
 } while (0)
 
-static inline int rds_conn_is_sending(struct rds_connection *conn)
-{
-	int ret = 0;
-
-	if (!mutex_trylock(&conn->c_send_lock))
-		ret = 1;
-	else
-		mutex_unlock(&conn->c_send_lock);
-
-	return ret;
-}
-
 static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
 					      __be32 laddr, __be32 faddr,
 					      struct rds_transport *trans)
@@ -158,7 +146,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	spin_lock_init(&conn->c_lock);
 	conn->c_next_tx_seq = 1;
 
-	mutex_init(&conn->c_send_lock);
+	spin_lock_init(&conn->c_send_lock);
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_retrans);
 
@@ -283,10 +271,12 @@ void rds_conn_shutdown(struct rds_connection *conn)
 	}
 	mutex_unlock(&conn->c_cm_lock);
 
-	mutex_lock(&conn->c_send_lock);
+	/* verify everybody's out of rds_send_xmit() */
+	spin_lock_irq(&conn->c_send_lock);
+	spin_unlock_irq(&conn->c_send_lock);
+
 	conn->c_trans->conn_shutdown(conn);
 	rds_conn_reset(conn);
-	mutex_unlock(&conn->c_send_lock);
 
 	if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
 		/* This can happen - eg when we're in the middle of tearing
@@ -476,7 +466,7 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
 	cinfo->flags = 0;
 
 	rds_conn_info_set(cinfo->flags,
-			  rds_conn_is_sending(conn), SENDING);
+			  spin_is_locked(&conn->c_send_lock), SENDING);
 	/* XXX Future: return the state rather than these funky bits */
 	rds_conn_info_set(cinfo->flags,
 			  atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
diff --git a/net/rds/rds.h b/net/rds/rds.h
index e81d7e478474..c3a668b9cc14 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -91,7 +91,7 @@ struct rds_connection {
 	struct rds_cong_map	*c_lcong;
 	struct rds_cong_map	*c_fcong;
 
-	struct mutex		c_send_lock;	/* protect send ring */
+	spinlock_t		c_send_lock;	/* protect send ring */
 	struct rds_message	*c_xmit_rm;
 	unsigned long		c_xmit_sg;
 	unsigned int		c_xmit_hdr_off;
@@ -548,8 +548,8 @@ struct rds_statistics {
 	uint64_t	s_recv_ping;
 	uint64_t	s_send_queue_empty;
 	uint64_t	s_send_queue_full;
-	uint64_t	s_send_sem_contention;
-	uint64_t	s_send_sem_queue_raced;
+	uint64_t	s_send_lock_contention;
+	uint64_t	s_send_lock_queue_raced;
 	uint64_t	s_send_immediate_retry;
 	uint64_t	s_send_delayed_retry;
 	uint64_t	s_send_drop_acked;
diff --git a/net/rds/send.c b/net/rds/send.c
index 8a0647af5d95..d4feec6ad09c 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -116,19 +116,18 @@ int rds_send_xmit(struct rds_connection *conn)
 	int was_empty = 0;
 	LIST_HEAD(to_be_dropped);
 
+	if (!rds_conn_up(conn))
+		goto out;
+
 	/*
 	 * sendmsg calls here after having queued its message on the send
 	 * queue.  We only have one task feeding the connection at a time.  If
 	 * another thread is already feeding the queue then we back off.  This
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
-	 *
-	 * The sem holder will issue a retry if they notice that someone queued
-	 * a message after they stopped walking the send queue but before they
-	 * dropped the sem.
 	 */
-	if (!mutex_trylock(&conn->c_send_lock)) {
-		rds_stats_inc(s_send_sem_contention);
+	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
@@ -346,7 +345,7 @@ int rds_send_xmit(struct rds_connection *conn)
 		 * stop processing the loop when the transport hasn't taken
 		 * responsibility for forward progress.
 		 */
-	mutex_unlock(&conn->c_send_lock);
+	spin_unlock_irqrestore(&conn->c_send_lock, flags);
 
 	if (send_quota == 0 && !was_empty) {
 		/* We exhausted the send quota, but there's work left to
@@ -360,7 +359,7 @@ int rds_send_xmit(struct rds_connection *conn)
 		 * spin lock */
 		spin_lock_irqsave(&conn->c_lock, flags);
 		if (!list_empty(&conn->c_send_queue)) {
-			rds_stats_inc(s_send_sem_queue_raced);
+			rds_stats_inc(s_send_lock_queue_raced);
 			ret = -EAGAIN;
 		}
 		spin_unlock_irqrestore(&conn->c_lock, flags);
diff --git a/net/rds/stats.c b/net/rds/stats.c
index c66d95d9c262..b77be8be33ba 100644
--- a/net/rds/stats.c
+++ b/net/rds/stats.c
@@ -57,8 +57,8 @@ static const char *const rds_stat_names[] = {
 	"recv_ping",
 	"send_queue_empty",
 	"send_queue_full",
-	"send_sem_contention",
-	"send_sem_queue_raced",
+	"send_lock_contention",
+	"send_lock_queue_raced",
 	"send_immediate_retry",
 	"send_delayed_retry",
 	"send_drop_acked",