aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Andy Grover <andy.grover@oracle.com> 2010-02-03 22:36:44 -0500
committer Andy Grover <andy.grover@oracle.com> 2010-09-08 21:12:01 -0400
commit 5b2366bd2835919e2e6a836e837eab4a9274bd46 (patch)
tree ee0be5166dfec4acc006a23ddc9ea5788dbb90ea
parent 6c7cc6e4694dc464ae884332f2a322973497e3cf (diff)
RDS: Rewrite rds_send_xmit
Simplify rds_send_xmit().

Send a congestion map (via xmit_cong_map) without decrementing send_quota.

Move resetting of conn xmit variables to end of loop.

Update comments.

Implement a special case to turn off sending an rds header when there is an atomic op and no other data.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
-rw-r--r-- net/rds/rds.h 5
-rw-r--r-- net/rds/send.c 131
2 files changed, 73 insertions, 63 deletions
diff --git a/net/rds/rds.h b/net/rds/rds.h
index d70284989124..13ed30ac424d 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -96,8 +96,9 @@ struct rds_connection {
96 unsigned long c_xmit_sg; 96 unsigned long c_xmit_sg;
97 unsigned int c_xmit_hdr_off; 97 unsigned int c_xmit_hdr_off;
98 unsigned int c_xmit_data_off; 98 unsigned int c_xmit_data_off;
99 unsigned int c_xmit_rdma_sent;
100 unsigned int c_xmit_atomic_sent; 99 unsigned int c_xmit_atomic_sent;
100 unsigned int c_xmit_rdma_sent;
101 unsigned int c_xmit_data_sent;
101 102
102 spinlock_t c_lock; /* protect msg queues */ 103 spinlock_t c_lock; /* protect msg queues */
103 u64 c_next_tx_seq; 104 u64 c_next_tx_seq;
@@ -120,8 +121,6 @@ struct rds_connection {
120 121
121 struct list_head c_map_item; 122 struct list_head c_map_item;
122 unsigned long c_map_queued; 123 unsigned long c_map_queued;
123 unsigned long c_map_offset;
124 unsigned long c_map_bytes;
125 124
126 unsigned int c_unacked_packets; 125 unsigned int c_unacked_packets;
127 unsigned int c_unacked_bytes; 126 unsigned int c_unacked_bytes;
diff --git a/net/rds/send.c b/net/rds/send.c
index d60d31309032..66dc6b045261 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -72,8 +72,9 @@ void rds_send_reset(struct rds_connection *conn)
72 conn->c_xmit_sg = 0; 72 conn->c_xmit_sg = 0;
73 conn->c_xmit_hdr_off = 0; 73 conn->c_xmit_hdr_off = 0;
74 conn->c_xmit_data_off = 0; 74 conn->c_xmit_data_off = 0;
75 conn->c_xmit_rdma_sent = 0;
76 conn->c_xmit_atomic_sent = 0; 75 conn->c_xmit_atomic_sent = 0;
76 conn->c_xmit_rdma_sent = 0;
77 conn->c_xmit_data_sent = 0;
77 78
78 conn->c_map_queued = 0; 79 conn->c_map_queued = 0;
79 80
@@ -137,69 +138,54 @@ int rds_send_xmit(struct rds_connection *conn)
137 138
138 /* 139 /*
139 * spin trying to push headers and data down the connection until 140 * spin trying to push headers and data down the connection until
140 * the connection doens't make forward progress. 141 * the connection doesn't make forward progress.
141 */ 142 */
142 while (--send_quota) { 143 while (--send_quota) {
143 /*
144 * See if need to send a congestion map update if we're
145 * between sending messages. The send_sem protects our sole
146 * use of c_map_offset and _bytes.
147 * Note this is used only by transports that define a special
148 * xmit_cong_map function. For all others, we create allocate
149 * a cong_map message and treat it just like any other send.
150 */
151 if (conn->c_map_bytes) {
152 ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
153 conn->c_map_offset);
154 if (ret <= 0)
155 break;
156 144
157 conn->c_map_offset += ret;
158 conn->c_map_bytes -= ret;
159 if (conn->c_map_bytes)
160 continue;
161 }
162
163 /* If we're done sending the current message, clear the
164 * offset and S/G temporaries.
165 */
166 rm = conn->c_xmit_rm; 145 rm = conn->c_xmit_rm;
167 if (rm &&
168 conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
169 conn->c_xmit_sg == rm->data.op_nents) {
170 conn->c_xmit_rm = NULL;
171 conn->c_xmit_sg = 0;
172 conn->c_xmit_hdr_off = 0;
173 conn->c_xmit_data_off = 0;
174 conn->c_xmit_rdma_sent = 0;
175 conn->c_xmit_atomic_sent = 0;
176
177 /* Release the reference to the previous message. */
178 rds_message_put(rm);
179 rm = NULL;
180 }
181 146
182 /* If we're asked to send a cong map update, do so. 147 /*
148 * If between sending messages, we can send a pending congestion
149 * map update.
150 *
151 * Transports either define a special xmit_cong_map function,
152 * or we allocate a cong_map message and treat it just like any
153 * other send.
183 */ 154 */
184 if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) { 155 if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
185 if (conn->c_trans->xmit_cong_map) { 156 if (conn->c_trans->xmit_cong_map) {
186 conn->c_map_offset = 0; 157 unsigned long map_offset = 0;
187 conn->c_map_bytes = sizeof(struct rds_header) + 158 unsigned long map_bytes = sizeof(struct rds_header) +
188 RDS_CONG_MAP_BYTES; 159 RDS_CONG_MAP_BYTES;
189 continue;
190 }
191 160
192 rm = rds_cong_update_alloc(conn); 161 while (map_bytes) {
193 if (IS_ERR(rm)) { 162 ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
194 ret = PTR_ERR(rm); 163 map_offset);
195 break; 164 if (ret <= 0) {
196 } 165 /* too far down the rabbithole! */
166 mutex_unlock(&conn->c_send_lock);
167 rds_conn_error(conn, "Cong map xmit failed\n");
168 goto out;
169 }
170
171 map_offset += ret;
172 map_bytes -= ret;
173 }
174 } else {
175 /* send cong update like a normal rm */
176 rm = rds_cong_update_alloc(conn);
177 if (IS_ERR(rm)) {
178 ret = PTR_ERR(rm);
179 break;
180 }
181 rm->data.op_active = 1;
197 182
198 conn->c_xmit_rm = rm; 183 conn->c_xmit_rm = rm;
184 }
199 } 185 }
200 186
201 /* 187 /*
202 * Grab the next message from the send queue, if there is one. 188 * If not already working on one, grab the next message.
203 * 189 *
204 * c_xmit_rm holds a ref while we're sending this message down 190 * c_xmit_rm holds a ref while we're sending this message down
205 * the connction. We can use this ref while holding the 191 * the connction. We can use this ref while holding the
@@ -264,7 +250,6 @@ int rds_send_xmit(struct rds_connection *conn)
264 conn->c_xmit_rm = rm; 250 conn->c_xmit_rm = rm;
265 } 251 }
266 252
267
268 if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { 253 if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
269 ret = conn->c_trans->xmit_atomic(conn, rm); 254 ret = conn->c_trans->xmit_atomic(conn, rm);
270 if (ret) 255 if (ret)
@@ -273,13 +258,20 @@ int rds_send_xmit(struct rds_connection *conn)
273 /* The transport owns the mapped memory for now. 258 /* The transport owns the mapped memory for now.
274 * You can't unmap it while it's on the send queue */ 259 * You can't unmap it while it's on the send queue */
275 set_bit(RDS_MSG_MAPPED, &rm->m_flags); 260 set_bit(RDS_MSG_MAPPED, &rm->m_flags);
261
262 /*
263 * This is evil, muahaha.
264 * We permit 0-byte sends. (rds-ping depends on this.)
265 * BUT if there is an atomic op and no sent data,
266 * we turn off sending the header, to achieve
267 * "silent" atomics.
268 * But see below; RDMA op might toggle this back on!
269 */
270 if (rm->data.op_nents == 0)
271 rm->data.op_active = 0;
276 } 272 }
277 273
278 /* 274 /* The transport either sends the whole rdma or none of it */
279 * Try and send an rdma message. Let's see if we can
280 * keep this simple and require that the transport either
281 * send the whole rdma or none of it.
282 */
283 if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { 275 if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
284 ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); 276 ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
285 if (ret) 277 if (ret)
@@ -294,9 +286,7 @@ int rds_send_xmit(struct rds_connection *conn)
294 set_bit(RDS_MSG_MAPPED, &rm->m_flags); 286 set_bit(RDS_MSG_MAPPED, &rm->m_flags);
295 } 287 }
296 288
297 if (rm->data.op_active 289 if (rm->data.op_active && !conn->c_xmit_data_sent) {
298 && (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
299 conn->c_xmit_sg < rm->data.op_nents)) {
300 ret = conn->c_trans->xmit(conn, rm, 290 ret = conn->c_trans->xmit(conn, rm,
301 conn->c_xmit_hdr_off, 291 conn->c_xmit_hdr_off,
302 conn->c_xmit_sg, 292 conn->c_xmit_sg,
@@ -326,6 +316,27 @@ int rds_send_xmit(struct rds_connection *conn)
326 conn->c_xmit_sg == rm->data.op_nents); 316 conn->c_xmit_sg == rm->data.op_nents);
327 } 317 }
328 } 318 }
319
320 if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
321 (conn->c_xmit_sg == rm->data.op_nents))
322 conn->c_xmit_data_sent = 1;
323 }
324
325 /*
326 * A rm will only take multiple times through this loop
327 * if there is a data op. Thus, if the data is sent (or there was
328 * none), then we're done with the rm.
329 */
330 if (!rm->data.op_active || conn->c_xmit_data_sent) {
331 conn->c_xmit_rm = NULL;
332 conn->c_xmit_sg = 0;
333 conn->c_xmit_hdr_off = 0;
334 conn->c_xmit_data_off = 0;
335 conn->c_xmit_rdma_sent = 0;
336 conn->c_xmit_atomic_sent = 0;
337 conn->c_xmit_data_sent = 0;
338
339 rds_message_put(rm);
329 } 340 }
330 } 341 }
331 342
@@ -350,7 +361,7 @@ int rds_send_xmit(struct rds_connection *conn)
350 */ 361 */
351 mutex_unlock(&conn->c_send_lock); 362 mutex_unlock(&conn->c_send_lock);
352 363
353 if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) { 364 if (send_quota == 0 && !was_empty) {
354 /* We exhausted the send quota, but there's work left to 365 /* We exhausted the send quota, but there's work left to
355 * do. Return and (re-)schedule the send worker. 366 * do. Return and (re-)schedule the send worker.
356 */ 367 */