aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
authorAlex Elder <elder@dreamhost.com>2012-02-14 15:05:33 -0500
committerAlex Elder <elder@dreamhost.com>2012-03-22 11:47:50 -0400
commit859eb7994876f26fd9f52d9589fbcab8e2fe8069 (patch)
tree128067d342a4937228ac5c307c153b9b95ec254e /net/ceph/messenger.c
parent963be4d7709f84d865f76d12d5b0ec7edad1c498 (diff)
libceph: encapsulate connection kvec operations
Encapsulate the operation of adding a new chunk of data to the next open slot in a ceph_connection's out_kvec array. Also add a "reset" operation to make subsequent add operations start at the beginning of the array again. Use these routines throughout, avoiding duplicate code and ensuring all calls are handled consistently. Signed-off-by: Alex Elder <elder@dreamhost.com> Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c117
1 files changed, 56 insertions, 61 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 0665945b6468..04d2b975ab0c 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -469,14 +469,35 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
469 return ret; 469 return ret;
470} 470}
471 471
472static void ceph_con_out_kvec_reset(struct ceph_connection *con)
473{
474 con->out_kvec_left = 0;
475 con->out_kvec_bytes = 0;
476 con->out_kvec_cur = &con->out_kvec[0];
477}
478
479static void ceph_con_out_kvec_add(struct ceph_connection *con,
480 size_t size, void *data)
481{
482 int index;
483
484 index = con->out_kvec_left;
485 BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
486
487 con->out_kvec[index].iov_len = size;
488 con->out_kvec[index].iov_base = data;
489 con->out_kvec_left++;
490 con->out_kvec_bytes += size;
491}
472 492
473/* 493/*
474 * Prepare footer for currently outgoing message, and finish things 494 * Prepare footer for currently outgoing message, and finish things
475 * off. Assumes out_kvec* are already valid.. we just add on to the end. 495 * off. Assumes out_kvec* are already valid.. we just add on to the end.
476 */ 496 */
477static void prepare_write_message_footer(struct ceph_connection *con, int v) 497static void prepare_write_message_footer(struct ceph_connection *con)
478{ 498{
479 struct ceph_msg *m = con->out_msg; 499 struct ceph_msg *m = con->out_msg;
500 int v = con->out_kvec_left;
480 501
481 dout("prepare_write_message_footer %p\n", con); 502 dout("prepare_write_message_footer %p\n", con);
482 con->out_kvec_is_msg = true; 503 con->out_kvec_is_msg = true;
@@ -494,9 +515,8 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v)
494static void prepare_write_message(struct ceph_connection *con) 515static void prepare_write_message(struct ceph_connection *con)
495{ 516{
496 struct ceph_msg *m; 517 struct ceph_msg *m;
497 int v = 0;
498 518
499 con->out_kvec_bytes = 0; 519 ceph_con_out_kvec_reset(con);
500 con->out_kvec_is_msg = true; 520 con->out_kvec_is_msg = true;
501 con->out_msg_done = false; 521 con->out_msg_done = false;
502 522
@@ -504,16 +524,13 @@ static void prepare_write_message(struct ceph_connection *con)
504 * TCP packet that's a good thing. */ 524 * TCP packet that's a good thing. */
505 if (con->in_seq > con->in_seq_acked) { 525 if (con->in_seq > con->in_seq_acked) {
506 con->in_seq_acked = con->in_seq; 526 con->in_seq_acked = con->in_seq;
507 con->out_kvec[v].iov_base = &tag_ack; 527 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
508 con->out_kvec[v++].iov_len = 1;
509 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 528 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
510 con->out_kvec[v].iov_base = &con->out_temp_ack; 529 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
511 con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack); 530 &con->out_temp_ack);
512 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
513 } 531 }
514 532
515 m = list_first_entry(&con->out_queue, 533 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
516 struct ceph_msg, list_head);
517 con->out_msg = m; 534 con->out_msg = m;
518 535
519 /* put message on sent list */ 536 /* put message on sent list */
@@ -537,17 +554,13 @@ static void prepare_write_message(struct ceph_connection *con)
537 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 554 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
538 555
539 /* tag + hdr + front + middle */ 556 /* tag + hdr + front + middle */
540 con->out_kvec[v].iov_base = &tag_msg; 557 ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
541 con->out_kvec[v++].iov_len = 1; 558 ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
542 con->out_kvec[v].iov_base = &m->hdr; 559 ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
543 con->out_kvec[v++].iov_len = sizeof(m->hdr); 560
544 con->out_kvec[v++] = m->front;
545 if (m->middle) 561 if (m->middle)
546 con->out_kvec[v++] = m->middle->vec; 562 ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
547 con->out_kvec_left = v; 563 m->middle->vec.iov_base);
548 con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
549 (m->middle ? m->middle->vec.iov_len : 0);
550 con->out_kvec_cur = con->out_kvec;
551 564
552 /* fill in crc (except data pages), footer */ 565 /* fill in crc (except data pages), footer */
553 con->out_msg->hdr.crc = 566 con->out_msg->hdr.crc =
@@ -580,7 +593,7 @@ static void prepare_write_message(struct ceph_connection *con)
580 con->out_more = 1; /* data + footer will follow */ 593 con->out_more = 1; /* data + footer will follow */
581 } else { 594 } else {
582 /* no, queue up footer too and be done */ 595 /* no, queue up footer too and be done */
583 prepare_write_message_footer(con, v); 596 prepare_write_message_footer(con);
584 } 597 }
585 598
586 set_bit(WRITE_PENDING, &con->state); 599 set_bit(WRITE_PENDING, &con->state);
@@ -595,14 +608,14 @@ static void prepare_write_ack(struct ceph_connection *con)
595 con->in_seq_acked, con->in_seq); 608 con->in_seq_acked, con->in_seq);
596 con->in_seq_acked = con->in_seq; 609 con->in_seq_acked = con->in_seq;
597 610
598 con->out_kvec[0].iov_base = &tag_ack; 611 ceph_con_out_kvec_reset(con);
599 con->out_kvec[0].iov_len = 1; 612
613 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
614
600 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 615 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
601 con->out_kvec[1].iov_base = &con->out_temp_ack; 616 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
602 con->out_kvec[1].iov_len = sizeof(con->out_temp_ack); 617 &con->out_temp_ack);
603 con->out_kvec_left = 2; 618
604 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
605 con->out_kvec_cur = con->out_kvec;
606 con->out_more = 1; /* more will follow.. eventually.. */ 619 con->out_more = 1; /* more will follow.. eventually.. */
607 set_bit(WRITE_PENDING, &con->state); 620 set_bit(WRITE_PENDING, &con->state);
608} 621}
@@ -613,11 +626,8 @@ static void prepare_write_ack(struct ceph_connection *con)
613static void prepare_write_keepalive(struct ceph_connection *con) 626static void prepare_write_keepalive(struct ceph_connection *con)
614{ 627{
615 dout("prepare_write_keepalive %p\n", con); 628 dout("prepare_write_keepalive %p\n", con);
616 con->out_kvec[0].iov_base = &tag_keepalive; 629 ceph_con_out_kvec_reset(con);
617 con->out_kvec[0].iov_len = 1; 630 ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
618 con->out_kvec_left = 1;
619 con->out_kvec_bytes = 1;
620 con->out_kvec_cur = con->out_kvec;
621 set_bit(WRITE_PENDING, &con->state); 631 set_bit(WRITE_PENDING, &con->state);
622} 632}
623 633
@@ -646,12 +656,9 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
646 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 656 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
647 con->out_connect.authorizer_len = cpu_to_le32(auth_len); 657 con->out_connect.authorizer_len = cpu_to_le32(auth_len);
648 658
649 if (auth_len) { 659 if (auth_len)
650 con->out_kvec[con->out_kvec_left].iov_base = auth_buf; 660 ceph_con_out_kvec_add(con, auth_len, auth_buf);
651 con->out_kvec[con->out_kvec_left].iov_len = auth_len; 661
652 con->out_kvec_left++;
653 con->out_kvec_bytes += auth_len;
654 }
655 return 0; 662 return 0;
656} 663}
657 664
@@ -661,15 +668,11 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
661static void prepare_write_banner(struct ceph_messenger *msgr, 668static void prepare_write_banner(struct ceph_messenger *msgr,
662 struct ceph_connection *con) 669 struct ceph_connection *con)
663{ 670{
664 int len = strlen(CEPH_BANNER); 671 ceph_con_out_kvec_reset(con);
672 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
673 ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
674 &msgr->my_enc_addr);
665 675
666 con->out_kvec[0].iov_base = CEPH_BANNER;
667 con->out_kvec[0].iov_len = len;
668 con->out_kvec[1].iov_base = &msgr->my_enc_addr;
669 con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
670 con->out_kvec_left = 2;
671 con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
672 con->out_kvec_cur = con->out_kvec;
673 con->out_more = 0; 676 con->out_more = 0;
674 set_bit(WRITE_PENDING, &con->state); 677 set_bit(WRITE_PENDING, &con->state);
675} 678}
@@ -707,22 +710,16 @@ static int prepare_write_connect(struct ceph_messenger *msgr,
707 710
708 if (include_banner) 711 if (include_banner)
709 prepare_write_banner(msgr, con); 712 prepare_write_banner(msgr, con);
710 else { 713 else
711 con->out_kvec_left = 0; 714 ceph_con_out_kvec_reset(con);
712 con->out_kvec_bytes = 0; 715 ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
713 } 716
714 con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
715 con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
716 con->out_kvec_left++;
717 con->out_kvec_bytes += sizeof(con->out_connect);
718 con->out_kvec_cur = con->out_kvec;
719 con->out_more = 0; 717 con->out_more = 0;
720 set_bit(WRITE_PENDING, &con->state); 718 set_bit(WRITE_PENDING, &con->state);
721 719
722 return prepare_connect_authorizer(con); 720 return prepare_connect_authorizer(con);
723} 721}
724 722
725
726/* 723/*
727 * write as much of pending kvecs to the socket as we can. 724 * write as much of pending kvecs to the socket as we can.
728 * 1 -> done 725 * 1 -> done
@@ -919,10 +916,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
919 /* prepare and queue up footer, too */ 916 /* prepare and queue up footer, too */
920 if (!crc) 917 if (!crc)
921 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 918 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
922 con->out_kvec_bytes = 0; 919 ceph_con_out_kvec_reset(con);
923 con->out_kvec_left = 0; 920 prepare_write_message_footer(con);
924 con->out_kvec_cur = con->out_kvec;
925 prepare_write_message_footer(con, 0);
926 ret = 1; 921 ret = 1;
927out: 922out:
928 return ret; 923 return ret;