aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2016-09-17 05:49:11 -0400
committerDavid Howells <dhowells@redhat.com>2016-09-17 05:51:54 -0400
commit816c9fce12f3745abc959c0fca8ace1c2c51421c (patch)
tree98af1e193f2abe3a28347fbbbc4cc12a45cdefb7 /net
parent2e2ea51dec2ab6a81950d4b436eb66ebf45dd507 (diff)
rxrpc: Fix handling of the last packet in rxrpc_recvmsg_data()
The code for determining the last packet in rxrpc_recvmsg_data() has been using the RXRPC_CALL_RX_LAST flag to determine if the rx_top pointer points to the last packet or not. This isn't a good idea, however, as the input code may be running simultaneously on another CPU and that sets the flag *before* updating the top pointer. Fix this by the following means: (1) Restrict the use of RXRPC_CALL_RX_LAST to the input routines only. There's otherwise a synchronisation problem between detecting the flag and checking tx_top. This could probably be dealt with by appropriate application of memory barriers, but there's a simpler way. (2) Set RXRPC_CALL_RX_LAST after setting rx_top. (3) Make rxrpc_rotate_rx_window() consult the flags header field of the DATA packet it's about to discard to see if that was the last packet. Use this as the basis for ending the Rx phase. This shouldn't be a problem because the recvmsg side of things is guaranteed to see the packets in order. (4) Make rxrpc_recvmsg_data() return 1 to indicate the end of the data if: (a) the packet it has just processed is marked as RXRPC_LAST_PACKET (b) the call's Rx phase has been ended. Signed-off-by: David Howells <dhowells@redhat.com>
Diffstat (limited to 'net')
-rw-r--r--net/rxrpc/input.c4
-rw-r--r--net/rxrpc/recvmsg.c49
2 files changed, 36 insertions, 17 deletions
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 75af0bd316c7..f0d9115b9b7e 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -238,7 +238,7 @@ next_subpacket:
238 len = RXRPC_JUMBO_DATALEN; 238 len = RXRPC_JUMBO_DATALEN;
239 239
240 if (flags & RXRPC_LAST_PACKET) { 240 if (flags & RXRPC_LAST_PACKET) {
241 if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) && 241 if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
242 seq != call->rx_top) 242 seq != call->rx_top)
243 return rxrpc_proto_abort("LSN", call, seq); 243 return rxrpc_proto_abort("LSN", call, seq);
244 } else { 244 } else {
@@ -282,6 +282,8 @@ next_subpacket:
282 call->rxtx_buffer[ix] = skb; 282 call->rxtx_buffer[ix] = skb;
283 if (after(seq, call->rx_top)) 283 if (after(seq, call->rx_top))
284 smp_store_release(&call->rx_top, seq); 284 smp_store_release(&call->rx_top, seq);
285 if (flags & RXRPC_LAST_PACKET)
286 set_bit(RXRPC_CALL_RX_LAST, &call->flags);
285 queued = true; 287 queued = true;
286 288
287 if (after_eq(seq, call->rx_expect_next)) { 289 if (after_eq(seq, call->rx_expect_next)) {
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 1edf2cf62cc5..8b8d7e14f800 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -134,6 +134,8 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call)
134{ 134{
135 _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]); 135 _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
136 136
137 ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
138
137 if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) { 139 if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
138 rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, 0, true, false); 140 rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, 0, true, false);
139 rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK); 141 rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK);
@@ -163,8 +165,10 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call)
163 */ 165 */
164static void rxrpc_rotate_rx_window(struct rxrpc_call *call) 166static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
165{ 167{
168 struct rxrpc_skb_priv *sp;
166 struct sk_buff *skb; 169 struct sk_buff *skb;
167 rxrpc_seq_t hard_ack, top; 170 rxrpc_seq_t hard_ack, top;
171 u8 flags;
168 int ix; 172 int ix;
169 173
170 _enter("%d", call->debug_id); 174 _enter("%d", call->debug_id);
@@ -177,6 +181,8 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
177 ix = hard_ack & RXRPC_RXTX_BUFF_MASK; 181 ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
178 skb = call->rxtx_buffer[ix]; 182 skb = call->rxtx_buffer[ix];
179 rxrpc_see_skb(skb); 183 rxrpc_see_skb(skb);
184 sp = rxrpc_skb(skb);
185 flags = sp->hdr.flags;
180 call->rxtx_buffer[ix] = NULL; 186 call->rxtx_buffer[ix] = NULL;
181 call->rxtx_annotations[ix] = 0; 187 call->rxtx_annotations[ix] = 0;
182 /* Barrier against rxrpc_input_data(). */ 188 /* Barrier against rxrpc_input_data(). */
@@ -184,8 +190,8 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
184 190
185 rxrpc_free_skb(skb); 191 rxrpc_free_skb(skb);
186 192
187 _debug("%u,%u,%lx", hard_ack, top, call->flags); 193 _debug("%u,%u,%02x", hard_ack, top, flags);
188 if (hard_ack == top && test_bit(RXRPC_CALL_RX_LAST, &call->flags)) 194 if (flags & RXRPC_LAST_PACKET)
189 rxrpc_end_rx_phase(call); 195 rxrpc_end_rx_phase(call);
190} 196}
191 197
@@ -278,13 +284,19 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
278 size_t remain; 284 size_t remain;
279 bool last; 285 bool last;
280 unsigned int rx_pkt_offset, rx_pkt_len; 286 unsigned int rx_pkt_offset, rx_pkt_len;
281 int ix, copy, ret = 0; 287 int ix, copy, ret = -EAGAIN, ret2;
282 288
283 _enter(""); 289 _enter("");
284 290
285 rx_pkt_offset = call->rx_pkt_offset; 291 rx_pkt_offset = call->rx_pkt_offset;
286 rx_pkt_len = call->rx_pkt_len; 292 rx_pkt_len = call->rx_pkt_len;
287 293
294 if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
295 seq = call->rx_hard_ack;
296 ret = 1;
297 goto done;
298 }
299
288 /* Barriers against rxrpc_input_data(). */ 300 /* Barriers against rxrpc_input_data(). */
289 hard_ack = call->rx_hard_ack; 301 hard_ack = call->rx_hard_ack;
290 top = smp_load_acquire(&call->rx_top); 302 top = smp_load_acquire(&call->rx_top);
@@ -301,11 +313,13 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
301 sock_recv_timestamp(msg, sock->sk, skb); 313 sock_recv_timestamp(msg, sock->sk, skb);
302 314
303 if (rx_pkt_offset == 0) { 315 if (rx_pkt_offset == 0) {
304 ret = rxrpc_locate_data(call, skb, 316 ret2 = rxrpc_locate_data(call, skb,
305 &call->rxtx_annotations[ix], 317 &call->rxtx_annotations[ix],
306 &rx_pkt_offset, &rx_pkt_len); 318 &rx_pkt_offset, &rx_pkt_len);
307 if (ret < 0) 319 if (ret2 < 0) {
320 ret = ret2;
308 goto out; 321 goto out;
322 }
309 } 323 }
310 _debug("recvmsg %x DATA #%u { %d, %d }", 324 _debug("recvmsg %x DATA #%u { %d, %d }",
311 sp->hdr.callNumber, seq, rx_pkt_offset, rx_pkt_len); 325 sp->hdr.callNumber, seq, rx_pkt_offset, rx_pkt_len);
@@ -316,10 +330,12 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
316 if (copy > remain) 330 if (copy > remain)
317 copy = remain; 331 copy = remain;
318 if (copy > 0) { 332 if (copy > 0) {
319 ret = skb_copy_datagram_iter(skb, rx_pkt_offset, iter, 333 ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
320 copy); 334 copy);
321 if (ret < 0) 335 if (ret2 < 0) {
336 ret = ret2;
322 goto out; 337 goto out;
338 }
323 339
324 /* handle piecemeal consumption of data packets */ 340 /* handle piecemeal consumption of data packets */
325 _debug("copied %d @%zu", copy, *_offset); 341 _debug("copied %d @%zu", copy, *_offset);
@@ -332,6 +348,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
332 if (rx_pkt_len > 0) { 348 if (rx_pkt_len > 0) {
333 _debug("buffer full"); 349 _debug("buffer full");
334 ASSERTCMP(*_offset, ==, len); 350 ASSERTCMP(*_offset, ==, len);
351 ret = 0;
335 break; 352 break;
336 } 353 }
337 354
@@ -342,19 +359,19 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
342 rx_pkt_offset = 0; 359 rx_pkt_offset = 0;
343 rx_pkt_len = 0; 360 rx_pkt_len = 0;
344 361
345 ASSERTIFCMP(last, seq, ==, top); 362 if (last) {
346 } 363 ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
347
348 if (after(seq, top)) {
349 ret = -EAGAIN;
350 if (test_bit(RXRPC_CALL_RX_LAST, &call->flags))
351 ret = 1; 364 ret = 1;
365 goto out;
366 }
352 } 367 }
368
353out: 369out:
354 if (!(flags & MSG_PEEK)) { 370 if (!(flags & MSG_PEEK)) {
355 call->rx_pkt_offset = rx_pkt_offset; 371 call->rx_pkt_offset = rx_pkt_offset;
356 call->rx_pkt_len = rx_pkt_len; 372 call->rx_pkt_len = rx_pkt_len;
357 } 373 }
374done:
358 _leave(" = %d [%u/%u]", ret, seq, top); 375 _leave(" = %d [%u/%u]", ret, seq, top);
359 return ret; 376 return ret;
360} 377}