summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2019-08-27 04:51:30 -0400
committerDavid Howells <dhowells@redhat.com>2019-08-27 04:51:30 -0400
commite2de6c4048989007b353164b19d6b7d5be4fa9e3 (patch)
tree09df8d01294fbeb0477e7eebe82ff213b46e7288
parentc3c9e3df49f8d83db09d1f61c8bed54e7fed8662 (diff)
rxrpc: Use info in skbuff instead of reparsing a jumbo packet
Use the information now cached in the skbuff private data to avoid the need to reparse a jumbo packet. We can find all the subpackets by dead reckoning, so it's only necessary to note how many there are, whether the last one is flagged as LAST_PACKET and whether any have the REQUEST_ACK flag set. This is necessary as once recvmsg() can see the packet, it can start modifying it, such as doing in-place decryption. Fixes: 248f219cb8bc ("rxrpc: Rewrite the data and ack handling code") Signed-off-by: David Howells <dhowells@redhat.com>
-rw-r--r--net/rxrpc/ar-internal.h3
-rw-r--r--net/rxrpc/input.c231
-rw-r--r--net/rxrpc/recvmsg.c41
3 files changed, 139 insertions, 136 deletions
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 87cff6c218b6..20d7907a5bc6 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -617,8 +617,7 @@ struct rxrpc_call {
617#define RXRPC_TX_ANNO_LAST 0x04 617#define RXRPC_TX_ANNO_LAST 0x04
618#define RXRPC_TX_ANNO_RESENT 0x08 618#define RXRPC_TX_ANNO_RESENT 0x08
619 619
620#define RXRPC_RX_ANNO_JUMBO 0x3f /* Jumbo subpacket number + 1 if not zero */ 620#define RXRPC_RX_ANNO_SUBPACKET 0x3f /* Subpacket number in jumbogram */
621#define RXRPC_RX_ANNO_JLAST 0x40 /* Set if last element of a jumbo packet */
622#define RXRPC_RX_ANNO_VERIFIED 0x80 /* Set if verified and decrypted */ 621#define RXRPC_RX_ANNO_VERIFIED 0x80 /* Set if verified and decrypted */
623 rxrpc_seq_t tx_hard_ack; /* Dead slot in buffer; the first transmitted but 622 rxrpc_seq_t tx_hard_ack; /* Dead slot in buffer; the first transmitted but
624 * not hard-ACK'd packet follows this. 623 * not hard-ACK'd packet follows this.
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index ffcec5117954..35b1a9368d80 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -405,10 +405,10 @@ protocol_error:
405 * (that information is encoded in the ACK packet). 405 * (that information is encoded in the ACK packet).
406 */ 406 */
407static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq, 407static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq,
408 u8 annotation, bool *_jumbo_bad) 408 bool is_jumbo, bool *_jumbo_bad)
409{ 409{
410 /* Discard normal packets that are duplicates. */ 410 /* Discard normal packets that are duplicates. */
411 if (annotation == 0) 411 if (is_jumbo)
412 return; 412 return;
413 413
414 /* Skip jumbo subpackets that are duplicates. When we've had three or 414 /* Skip jumbo subpackets that are duplicates. When we've had three or
@@ -428,19 +428,17 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
428{ 428{
429 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 429 struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
430 enum rxrpc_call_state state; 430 enum rxrpc_call_state state;
431 unsigned int offset = sizeof(struct rxrpc_wire_header); 431 unsigned int j;
432 unsigned int ix;
433 rxrpc_serial_t serial = sp->hdr.serial, ack_serial = 0; 432 rxrpc_serial_t serial = sp->hdr.serial, ack_serial = 0;
434 rxrpc_seq_t seq = sp->hdr.seq, hard_ack; 433 rxrpc_seq_t seq0 = sp->hdr.seq, hard_ack;
435 bool immediate_ack = false, jumbo_bad = false, queued; 434 bool immediate_ack = false, jumbo_bad = false;
436 u16 len; 435 u8 ack = 0;
437 u8 ack = 0, flags, annotation = 0;
438 436
439 _enter("{%u,%u},{%u,%u}", 437 _enter("{%u,%u},{%u,%u}",
440 call->rx_hard_ack, call->rx_top, skb->len, seq); 438 call->rx_hard_ack, call->rx_top, skb->len, seq0);
441 439
442 _proto("Rx DATA %%%u { #%u f=%02x }", 440 _proto("Rx DATA %%%u { #%u f=%02x n=%u }",
443 sp->hdr.serial, seq, sp->hdr.flags); 441 sp->hdr.serial, seq0, sp->hdr.flags, sp->nr_subpackets);
444 442
445 state = READ_ONCE(call->state); 443 state = READ_ONCE(call->state);
446 if (state >= RXRPC_CALL_COMPLETE) 444 if (state >= RXRPC_CALL_COMPLETE)
@@ -469,137 +467,136 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
469 !rxrpc_receiving_reply(call)) 467 !rxrpc_receiving_reply(call))
470 goto unlock; 468 goto unlock;
471 469
472 call->ackr_prev_seq = seq; 470 call->ackr_prev_seq = seq0;
473
474 hard_ack = READ_ONCE(call->rx_hard_ack); 471 hard_ack = READ_ONCE(call->rx_hard_ack);
475 if (after(seq, hard_ack + call->rx_winsize)) {
476 ack = RXRPC_ACK_EXCEEDS_WINDOW;
477 ack_serial = serial;
478 goto ack;
479 }
480 472
481 flags = sp->hdr.flags; 473 if (sp->nr_subpackets > 1) {
482 if (flags & RXRPC_JUMBO_PACKET) {
483 if (call->nr_jumbo_bad > 3) { 474 if (call->nr_jumbo_bad > 3) {
484 ack = RXRPC_ACK_NOSPACE; 475 ack = RXRPC_ACK_NOSPACE;
485 ack_serial = serial; 476 ack_serial = serial;
486 goto ack; 477 goto ack;
487 } 478 }
488 annotation = 1;
489 } 479 }
490 480
491next_subpacket: 481 for (j = 0; j < sp->nr_subpackets; j++) {
492 queued = false; 482 rxrpc_serial_t serial = sp->hdr.serial + j;
493 ix = seq & RXRPC_RXTX_BUFF_MASK; 483 rxrpc_seq_t seq = seq0 + j;
494 len = skb->len; 484 unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
495 if (flags & RXRPC_JUMBO_PACKET) 485 bool terminal = (j == sp->nr_subpackets - 1);
496 len = RXRPC_JUMBO_DATALEN; 486 bool last = terminal && (sp->rx_flags & RXRPC_SKB_INCL_LAST);
497 487 u8 flags, annotation = j;
498 if (flags & RXRPC_LAST_PACKET) { 488
499 if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) && 489 _proto("Rx DATA+%u %%%u { #%x t=%u l=%u }",
500 seq != call->rx_top) { 490 j, serial, seq, terminal, last);
501 rxrpc_proto_abort("LSN", call, seq); 491
502 goto unlock; 492 if (last) {
503 } 493 if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
504 } else { 494 seq != call->rx_top) {
505 if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) && 495 rxrpc_proto_abort("LSN", call, seq);
506 after_eq(seq, call->rx_top)) { 496 goto unlock;
507 rxrpc_proto_abort("LSA", call, seq); 497 }
508 goto unlock; 498 } else {
499 if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
500 after_eq(seq, call->rx_top)) {
501 rxrpc_proto_abort("LSA", call, seq);
502 goto unlock;
503 }
509 } 504 }
510 }
511
512 trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
513 if (before_eq(seq, hard_ack)) {
514 ack = RXRPC_ACK_DUPLICATE;
515 ack_serial = serial;
516 goto skip;
517 }
518 505
519 if (flags & RXRPC_REQUEST_ACK && !ack) { 506 flags = 0;
520 ack = RXRPC_ACK_REQUESTED; 507 if (last)
521 ack_serial = serial; 508 flags |= RXRPC_LAST_PACKET;
522 } 509 if (!terminal)
510 flags |= RXRPC_JUMBO_PACKET;
511 if (test_bit(j, sp->rx_req_ack))
512 flags |= RXRPC_REQUEST_ACK;
513 trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
523 514
524 if (call->rxtx_buffer[ix]) { 515 if (before_eq(seq, hard_ack)) {
525 rxrpc_input_dup_data(call, seq, annotation, &jumbo_bad);
526 if (ack != RXRPC_ACK_DUPLICATE) {
527 ack = RXRPC_ACK_DUPLICATE; 516 ack = RXRPC_ACK_DUPLICATE;
528 ack_serial = serial; 517 ack_serial = serial;
518 continue;
529 } 519 }
530 immediate_ack = true;
531 goto skip;
532 }
533
534 /* Queue the packet. We use a couple of memory barriers here as need
535 * to make sure that rx_top is perceived to be set after the buffer
536 * pointer and that the buffer pointer is set after the annotation and
537 * the skb data.
538 *
539 * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
540 * and also rxrpc_fill_out_ack().
541 */
542 rxrpc_get_skb(skb, rxrpc_skb_rx_got);
543 call->rxtx_annotations[ix] = annotation;
544 smp_wmb();
545 call->rxtx_buffer[ix] = skb;
546 if (after(seq, call->rx_top)) {
547 smp_store_release(&call->rx_top, seq);
548 } else if (before(seq, call->rx_top)) {
549 /* Send an immediate ACK if we fill in a hole */
550 if (!ack) {
551 ack = RXRPC_ACK_DELAY;
552 ack_serial = serial;
553 }
554 immediate_ack = true;
555 }
556 if (flags & RXRPC_LAST_PACKET) {
557 set_bit(RXRPC_CALL_RX_LAST, &call->flags);
558 trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
559 } else {
560 trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
561 }
562 queued = true;
563 520
564 if (after_eq(seq, call->rx_expect_next)) { 521 if (call->rxtx_buffer[ix]) {
565 if (after(seq, call->rx_expect_next)) { 522 rxrpc_input_dup_data(call, seq, sp->nr_subpackets > 1,
566 _net("OOS %u > %u", seq, call->rx_expect_next); 523 &jumbo_bad);
567 ack = RXRPC_ACK_OUT_OF_SEQUENCE; 524 if (ack != RXRPC_ACK_DUPLICATE) {
568 ack_serial = serial; 525 ack = RXRPC_ACK_DUPLICATE;
526 ack_serial = serial;
527 }
528 immediate_ack = true;
529 continue;
569 } 530 }
570 call->rx_expect_next = seq + 1;
571 }
572 531
573skip:
574 offset += len;
575 if (flags & RXRPC_JUMBO_PACKET) {
576 if (skb_copy_bits(skb, offset, &flags, 1) < 0) {
577 rxrpc_proto_abort("XJF", call, seq);
578 goto unlock;
579 }
580 offset += sizeof(struct rxrpc_jumbo_header);
581 seq++;
582 serial++;
583 annotation++;
584 if (flags & RXRPC_JUMBO_PACKET)
585 annotation |= RXRPC_RX_ANNO_JLAST;
586 if (after(seq, hard_ack + call->rx_winsize)) { 532 if (after(seq, hard_ack + call->rx_winsize)) {
587 ack = RXRPC_ACK_EXCEEDS_WINDOW; 533 ack = RXRPC_ACK_EXCEEDS_WINDOW;
588 ack_serial = serial; 534 ack_serial = serial;
589 if (!jumbo_bad) { 535 if (flags & RXRPC_JUMBO_PACKET) {
590 call->nr_jumbo_bad++; 536 if (!jumbo_bad) {
591 jumbo_bad = true; 537 call->nr_jumbo_bad++;
538 jumbo_bad = true;
539 }
592 } 540 }
541
593 goto ack; 542 goto ack;
594 } 543 }
595 544
596 _proto("Rx DATA Jumbo %%%u", serial); 545 if (flags & RXRPC_REQUEST_ACK && !ack) {
597 goto next_subpacket; 546 ack = RXRPC_ACK_REQUESTED;
598 } 547 ack_serial = serial;
548 }
549
550 /* Queue the packet. We use a couple of memory barriers here as need
551 * to make sure that rx_top is perceived to be set after the buffer
552 * pointer and that the buffer pointer is set after the annotation and
553 * the skb data.
554 *
555 * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
556 * and also rxrpc_fill_out_ack().
557 */
558 rxrpc_get_skb(skb, rxrpc_skb_rx_got);
559 call->rxtx_annotations[ix] = annotation;
560 smp_wmb();
561 call->rxtx_buffer[ix] = skb;
562 if (after(seq, call->rx_top)) {
563 smp_store_release(&call->rx_top, seq);
564 } else if (before(seq, call->rx_top)) {
565 /* Send an immediate ACK if we fill in a hole */
566 if (!ack) {
567 ack = RXRPC_ACK_DELAY;
568 ack_serial = serial;
569 }
570 immediate_ack = true;
571 }
572
573 if (terminal) {
574 /* From this point on, we're not allowed to touch the
575 * packet any longer as its ref now belongs to the Rx
576 * ring.
577 */
578 skb = NULL;
579 }
599 580
600 if (queued && flags & RXRPC_LAST_PACKET && !ack) { 581 if (last) {
601 ack = RXRPC_ACK_DELAY; 582 set_bit(RXRPC_CALL_RX_LAST, &call->flags);
602 ack_serial = serial; 583 if (!ack) {
584 ack = RXRPC_ACK_DELAY;
585 ack_serial = serial;
586 }
587 trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
588 } else {
589 trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
590 }
591
592 if (after_eq(seq, call->rx_expect_next)) {
593 if (after(seq, call->rx_expect_next)) {
594 _net("OOS %u > %u", seq, call->rx_expect_next);
595 ack = RXRPC_ACK_OUT_OF_SEQUENCE;
596 ack_serial = serial;
597 }
598 call->rx_expect_next = seq + 1;
599 }
603 } 600 }
604 601
605ack: 602ack:
@@ -612,7 +609,7 @@ ack:
612 false, true, 609 false, true,
613 rxrpc_propose_ack_input_data); 610 rxrpc_propose_ack_input_data);
614 611
615 if (sp->hdr.seq == READ_ONCE(call->rx_hard_ack) + 1) { 612 if (seq0 == READ_ONCE(call->rx_hard_ack) + 1) {
616 trace_rxrpc_notify_socket(call->debug_id, serial); 613 trace_rxrpc_notify_socket(call->debug_id, serial);
617 rxrpc_notify_socket(call); 614 rxrpc_notify_socket(call);
618 } 615 }
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 9a7e1bc9791d..e49eacfaf4d6 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -177,7 +177,8 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
177 struct sk_buff *skb; 177 struct sk_buff *skb;
178 rxrpc_serial_t serial; 178 rxrpc_serial_t serial;
179 rxrpc_seq_t hard_ack, top; 179 rxrpc_seq_t hard_ack, top;
180 u8 flags; 180 bool last = false;
181 u8 subpacket;
181 int ix; 182 int ix;
182 183
183 _enter("%d", call->debug_id); 184 _enter("%d", call->debug_id);
@@ -191,10 +192,13 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
191 skb = call->rxtx_buffer[ix]; 192 skb = call->rxtx_buffer[ix];
192 rxrpc_see_skb(skb, rxrpc_skb_rx_rotated); 193 rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
193 sp = rxrpc_skb(skb); 194 sp = rxrpc_skb(skb);
194 flags = sp->hdr.flags; 195
195 serial = sp->hdr.serial; 196 subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
196 if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) 197 serial = sp->hdr.serial + subpacket;
197 serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1; 198
199 if (subpacket == sp->nr_subpackets - 1 &&
200 sp->rx_flags & RXRPC_SKB_INCL_LAST)
201 last = true;
198 202
199 call->rxtx_buffer[ix] = NULL; 203 call->rxtx_buffer[ix] = NULL;
200 call->rxtx_annotations[ix] = 0; 204 call->rxtx_annotations[ix] = 0;
@@ -203,9 +207,8 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
203 207
204 rxrpc_free_skb(skb, rxrpc_skb_rx_freed); 208 rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
205 209
206 _debug("%u,%u,%02x", hard_ack, top, flags);
207 trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack); 210 trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
208 if (flags & RXRPC_LAST_PACKET) { 211 if (last) {
209 rxrpc_end_rx_phase(call, serial); 212 rxrpc_end_rx_phase(call, serial);
210 } else { 213 } else {
211 /* Check to see if there's an ACK that needs sending. */ 214 /* Check to see if there's an ACK that needs sending. */
@@ -233,18 +236,19 @@ static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
233 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 236 struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
234 rxrpc_seq_t seq = sp->hdr.seq; 237 rxrpc_seq_t seq = sp->hdr.seq;
235 u16 cksum = sp->hdr.cksum; 238 u16 cksum = sp->hdr.cksum;
239 u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
236 240
237 _enter(""); 241 _enter("");
238 242
239 /* For all but the head jumbo subpacket, the security checksum is in a 243 /* For all but the head jumbo subpacket, the security checksum is in a
240 * jumbo header immediately prior to the data. 244 * jumbo header immediately prior to the data.
241 */ 245 */
242 if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) { 246 if (subpacket > 0) {
243 __be16 tmp; 247 __be16 tmp;
244 if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0) 248 if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
245 BUG(); 249 BUG();
246 cksum = ntohs(tmp); 250 cksum = ntohs(tmp);
247 seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1; 251 seq += subpacket;
248 } 252 }
249 253
250 return call->conn->security->verify_packet(call, skb, offset, len, 254 return call->conn->security->verify_packet(call, skb, offset, len,
@@ -265,19 +269,18 @@ static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
265 u8 *_annotation, 269 u8 *_annotation,
266 unsigned int *_offset, unsigned int *_len) 270 unsigned int *_offset, unsigned int *_len)
267{ 271{
272 struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
268 unsigned int offset = sizeof(struct rxrpc_wire_header); 273 unsigned int offset = sizeof(struct rxrpc_wire_header);
269 unsigned int len; 274 unsigned int len;
270 int ret; 275 int ret;
271 u8 annotation = *_annotation; 276 u8 annotation = *_annotation;
277 u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
272 278
273 /* Locate the subpacket */ 279 /* Locate the subpacket */
280 offset += subpacket * RXRPC_JUMBO_SUBPKTLEN;
274 len = skb->len - offset; 281 len = skb->len - offset;
275 if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) { 282 if (subpacket < sp->nr_subpackets - 1)
276 offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) * 283 len = RXRPC_JUMBO_DATALEN;
277 RXRPC_JUMBO_SUBPKTLEN);
278 len = (annotation & RXRPC_RX_ANNO_JLAST) ?
279 skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
280 }
281 284
282 if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) { 285 if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
283 ret = rxrpc_verify_packet(call, skb, annotation, offset, len); 286 ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
@@ -303,6 +306,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
303{ 306{
304 struct rxrpc_skb_priv *sp; 307 struct rxrpc_skb_priv *sp;
305 struct sk_buff *skb; 308 struct sk_buff *skb;
309 rxrpc_serial_t serial;
306 rxrpc_seq_t hard_ack, top, seq; 310 rxrpc_seq_t hard_ack, top, seq;
307 size_t remain; 311 size_t remain;
308 bool last; 312 bool last;
@@ -339,9 +343,12 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
339 rxrpc_see_skb(skb, rxrpc_skb_rx_seen); 343 rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
340 sp = rxrpc_skb(skb); 344 sp = rxrpc_skb(skb);
341 345
342 if (!(flags & MSG_PEEK)) 346 if (!(flags & MSG_PEEK)) {
347 serial = sp->hdr.serial;
348 serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
343 trace_rxrpc_receive(call, rxrpc_receive_front, 349 trace_rxrpc_receive(call, rxrpc_receive_front,
344 sp->hdr.serial, seq); 350 serial, seq);
351 }
345 352
346 if (msg) 353 if (msg)
347 sock_recv_timestamp(msg, sock->sk, skb); 354 sock_recv_timestamp(msg, sock->sk, skb);