diff options
author | Andy Grover <andy.grover@oracle.com> | 2010-02-03 22:36:44 -0500 |
---|---|---|
committer | Andy Grover <andy.grover@oracle.com> | 2010-09-08 21:12:01 -0400 |
commit | 5b2366bd2835919e2e6a836e837eab4a9274bd46 (patch) | |
tree | ee0be5166dfec4acc006a23ddc9ea5788dbb90ea /net/rds/send.c | |
parent | 6c7cc6e4694dc464ae884332f2a322973497e3cf (diff) |
RDS: Rewrite rds_send_xmit
Simplify rds_send_xmit().
Send a congestion map (via xmit_cong_map) without
decrementing send_quota.
Move resetting of conn xmit variables to end of loop.
Update comments.
Implement a special case to turn off sending an rds header
when there is an atomic op and no other data.
Signed-off-by: Andy Grover <andy.grover@oracle.com>
Diffstat (limited to 'net/rds/send.c')
-rw-r--r-- | net/rds/send.c | 131 |
1 files changed, 71 insertions, 60 deletions
diff --git a/net/rds/send.c b/net/rds/send.c index d60d31309032..66dc6b045261 100644 --- a/net/rds/send.c +++ b/net/rds/send.c | |||
@@ -72,8 +72,9 @@ void rds_send_reset(struct rds_connection *conn) | |||
72 | conn->c_xmit_sg = 0; | 72 | conn->c_xmit_sg = 0; |
73 | conn->c_xmit_hdr_off = 0; | 73 | conn->c_xmit_hdr_off = 0; |
74 | conn->c_xmit_data_off = 0; | 74 | conn->c_xmit_data_off = 0; |
75 | conn->c_xmit_rdma_sent = 0; | ||
76 | conn->c_xmit_atomic_sent = 0; | 75 | conn->c_xmit_atomic_sent = 0; |
76 | conn->c_xmit_rdma_sent = 0; | ||
77 | conn->c_xmit_data_sent = 0; | ||
77 | 78 | ||
78 | conn->c_map_queued = 0; | 79 | conn->c_map_queued = 0; |
79 | 80 | ||
@@ -137,69 +138,54 @@ int rds_send_xmit(struct rds_connection *conn) | |||
137 | 138 | ||
138 | /* | 139 | /* |
139 | * spin trying to push headers and data down the connection until | 140 | * spin trying to push headers and data down the connection until |
140 | * the connection doens't make forward progress. | 141 | * the connection doesn't make forward progress. |
141 | */ | 142 | */ |
142 | while (--send_quota) { | 143 | while (--send_quota) { |
143 | /* | ||
144 | * See if need to send a congestion map update if we're | ||
145 | * between sending messages. The send_sem protects our sole | ||
146 | * use of c_map_offset and _bytes. | ||
147 | * Note this is used only by transports that define a special | ||
148 | * xmit_cong_map function. For all others, we create allocate | ||
149 | * a cong_map message and treat it just like any other send. | ||
150 | */ | ||
151 | if (conn->c_map_bytes) { | ||
152 | ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong, | ||
153 | conn->c_map_offset); | ||
154 | if (ret <= 0) | ||
155 | break; | ||
156 | 144 | ||
157 | conn->c_map_offset += ret; | ||
158 | conn->c_map_bytes -= ret; | ||
159 | if (conn->c_map_bytes) | ||
160 | continue; | ||
161 | } | ||
162 | |||
163 | /* If we're done sending the current message, clear the | ||
164 | * offset and S/G temporaries. | ||
165 | */ | ||
166 | rm = conn->c_xmit_rm; | 145 | rm = conn->c_xmit_rm; |
167 | if (rm && | ||
168 | conn->c_xmit_hdr_off == sizeof(struct rds_header) && | ||
169 | conn->c_xmit_sg == rm->data.op_nents) { | ||
170 | conn->c_xmit_rm = NULL; | ||
171 | conn->c_xmit_sg = 0; | ||
172 | conn->c_xmit_hdr_off = 0; | ||
173 | conn->c_xmit_data_off = 0; | ||
174 | conn->c_xmit_rdma_sent = 0; | ||
175 | conn->c_xmit_atomic_sent = 0; | ||
176 | |||
177 | /* Release the reference to the previous message. */ | ||
178 | rds_message_put(rm); | ||
179 | rm = NULL; | ||
180 | } | ||
181 | 146 | ||
182 | /* If we're asked to send a cong map update, do so. | 147 | /* |
148 | * If between sending messages, we can send a pending congestion | ||
149 | * map update. | ||
150 | * | ||
151 | * Transports either define a special xmit_cong_map function, | ||
152 | * or we allocate a cong_map message and treat it just like any | ||
153 | * other send. | ||
183 | */ | 154 | */ |
184 | if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) { | 155 | if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) { |
185 | if (conn->c_trans->xmit_cong_map) { | 156 | if (conn->c_trans->xmit_cong_map) { |
186 | conn->c_map_offset = 0; | 157 | unsigned long map_offset = 0; |
187 | conn->c_map_bytes = sizeof(struct rds_header) + | 158 | unsigned long map_bytes = sizeof(struct rds_header) + |
188 | RDS_CONG_MAP_BYTES; | 159 | RDS_CONG_MAP_BYTES; |
189 | continue; | ||
190 | } | ||
191 | 160 | ||
192 | rm = rds_cong_update_alloc(conn); | 161 | while (map_bytes) { |
193 | if (IS_ERR(rm)) { | 162 | ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong, |
194 | ret = PTR_ERR(rm); | 163 | map_offset); |
195 | break; | 164 | if (ret <= 0) { |
196 | } | 165 | /* too far down the rabbithole! */ |
166 | mutex_unlock(&conn->c_send_lock); | ||
167 | rds_conn_error(conn, "Cong map xmit failed\n"); | ||
168 | goto out; | ||
169 | } | ||
170 | |||
171 | map_offset += ret; | ||
172 | map_bytes -= ret; | ||
173 | } | ||
174 | } else { | ||
175 | /* send cong update like a normal rm */ | ||
176 | rm = rds_cong_update_alloc(conn); | ||
177 | if (IS_ERR(rm)) { | ||
178 | ret = PTR_ERR(rm); | ||
179 | break; | ||
180 | } | ||
181 | rm->data.op_active = 1; | ||
197 | 182 | ||
198 | conn->c_xmit_rm = rm; | 183 | conn->c_xmit_rm = rm; |
184 | } | ||
199 | } | 185 | } |
200 | 186 | ||
201 | /* | 187 | /* |
202 | * Grab the next message from the send queue, if there is one. | 188 | * If not already working on one, grab the next message. |
203 | * | 189 | * |
204 | * c_xmit_rm holds a ref while we're sending this message down | 190 | * c_xmit_rm holds a ref while we're sending this message down |
205 | * the connction. We can use this ref while holding the | 191 | * the connction. We can use this ref while holding the |
@@ -264,7 +250,6 @@ int rds_send_xmit(struct rds_connection *conn) | |||
264 | conn->c_xmit_rm = rm; | 250 | conn->c_xmit_rm = rm; |
265 | } | 251 | } |
266 | 252 | ||
267 | |||
268 | if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { | 253 | if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { |
269 | ret = conn->c_trans->xmit_atomic(conn, rm); | 254 | ret = conn->c_trans->xmit_atomic(conn, rm); |
270 | if (ret) | 255 | if (ret) |
@@ -273,13 +258,20 @@ int rds_send_xmit(struct rds_connection *conn) | |||
273 | /* The transport owns the mapped memory for now. | 258 | /* The transport owns the mapped memory for now. |
274 | * You can't unmap it while it's on the send queue */ | 259 | * You can't unmap it while it's on the send queue */ |
275 | set_bit(RDS_MSG_MAPPED, &rm->m_flags); | 260 | set_bit(RDS_MSG_MAPPED, &rm->m_flags); |
261 | |||
262 | /* | ||
263 | * This is evil, muahaha. | ||
264 | * We permit 0-byte sends. (rds-ping depends on this.) | ||
265 | * BUT if there is an atomic op and no sent data, | ||
266 | * we turn off sending the header, to achieve | ||
267 | * "silent" atomics. | ||
268 | * But see below; RDMA op might toggle this back on! | ||
269 | */ | ||
270 | if (rm->data.op_nents == 0) | ||
271 | rm->data.op_active = 0; | ||
276 | } | 272 | } |
277 | 273 | ||
278 | /* | 274 | /* The transport either sends the whole rdma or none of it */ |
279 | * Try and send an rdma message. Let's see if we can | ||
280 | * keep this simple and require that the transport either | ||
281 | * send the whole rdma or none of it. | ||
282 | */ | ||
283 | if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { | 275 | if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { |
284 | ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); | 276 | ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); |
285 | if (ret) | 277 | if (ret) |
@@ -294,9 +286,7 @@ int rds_send_xmit(struct rds_connection *conn) | |||
294 | set_bit(RDS_MSG_MAPPED, &rm->m_flags); | 286 | set_bit(RDS_MSG_MAPPED, &rm->m_flags); |
295 | } | 287 | } |
296 | 288 | ||
297 | if (rm->data.op_active | 289 | if (rm->data.op_active && !conn->c_xmit_data_sent) { |
298 | && (conn->c_xmit_hdr_off < sizeof(struct rds_header) || | ||
299 | conn->c_xmit_sg < rm->data.op_nents)) { | ||
300 | ret = conn->c_trans->xmit(conn, rm, | 290 | ret = conn->c_trans->xmit(conn, rm, |
301 | conn->c_xmit_hdr_off, | 291 | conn->c_xmit_hdr_off, |
302 | conn->c_xmit_sg, | 292 | conn->c_xmit_sg, |
@@ -326,6 +316,27 @@ int rds_send_xmit(struct rds_connection *conn) | |||
326 | conn->c_xmit_sg == rm->data.op_nents); | 316 | conn->c_xmit_sg == rm->data.op_nents); |
327 | } | 317 | } |
328 | } | 318 | } |
319 | |||
320 | if (conn->c_xmit_hdr_off == sizeof(struct rds_header) && | ||
321 | (conn->c_xmit_sg == rm->data.op_nents)) | ||
322 | conn->c_xmit_data_sent = 1; | ||
323 | } | ||
324 | |||
325 | /* | ||
326 | * A rm will only take multiple times through this loop | ||
327 | * if there is a data op. Thus, if the data is sent (or there was | ||
328 | * none), then we're done with the rm. | ||
329 | */ | ||
330 | if (!rm->data.op_active || conn->c_xmit_data_sent) { | ||
331 | conn->c_xmit_rm = NULL; | ||
332 | conn->c_xmit_sg = 0; | ||
333 | conn->c_xmit_hdr_off = 0; | ||
334 | conn->c_xmit_data_off = 0; | ||
335 | conn->c_xmit_rdma_sent = 0; | ||
336 | conn->c_xmit_atomic_sent = 0; | ||
337 | conn->c_xmit_data_sent = 0; | ||
338 | |||
339 | rds_message_put(rm); | ||
329 | } | 340 | } |
330 | } | 341 | } |
331 | 342 | ||
@@ -350,7 +361,7 @@ int rds_send_xmit(struct rds_connection *conn) | |||
350 | */ | 361 | */ |
351 | mutex_unlock(&conn->c_send_lock); | 362 | mutex_unlock(&conn->c_send_lock); |
352 | 363 | ||
353 | if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) { | 364 | if (send_quota == 0 && !was_empty) { |
354 | /* We exhausted the send quota, but there's work left to | 365 | /* We exhausted the send quota, but there's work left to |
355 | * do. Return and (re-)schedule the send worker. | 366 | * do. Return and (re-)schedule the send worker. |
356 | */ | 367 | */ |