aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/messenger.c
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-11-10 17:34:36 -0500
committerSage Weil <sage@newdream.net>2009-11-10 17:34:48 -0500
commiteed0ef2caf928327332da54d23579debe629d5bc (patch)
tree396319ef2e65cc775b1466f446edc535401c5cea /fs/ceph/messenger.c
parent685f9a5d14194fc35db73e5e7370740ccc14b64a (diff)
ceph: separate banner and connect during handshake into distinct stages
We need to make sure we only swab the address during the banner once. So break process_banner out of process_connect, and clean up the surrounding code so that these are distinct phases of the handshake. Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r--fs/ceph/messenger.c117
1 files changed, 75 insertions, 42 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 5cc374850d8f..e389656b2a6b 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -564,10 +564,26 @@ static void prepare_write_keepalive(struct ceph_connection *con)
564/* 564/*
565 * We connected to a peer and are saying hello. 565 * We connected to a peer and are saying hello.
566 */ 566 */
567static void prepare_write_connect(struct ceph_messenger *msgr, 567static void prepare_write_banner(struct ceph_messenger *msgr,
568 struct ceph_connection *con) 568 struct ceph_connection *con)
569{ 569{
570 int len = strlen(CEPH_BANNER); 570 int len = strlen(CEPH_BANNER);
571
572 con->out_kvec[0].iov_base = CEPH_BANNER;
573 con->out_kvec[0].iov_len = len;
574 con->out_kvec[1].iov_base = &msgr->my_enc_addr;
575 con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
576 con->out_kvec_left = 2;
577 con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
578 con->out_kvec_cur = con->out_kvec;
579 con->out_more = 0;
580 set_bit(WRITE_PENDING, &con->state);
581}
582
583static void prepare_write_connect(struct ceph_messenger *msgr,
584 struct ceph_connection *con,
585 int after_banner)
586{
571 unsigned global_seq = get_global_seq(con->msgr, 0); 587 unsigned global_seq = get_global_seq(con->msgr, 0);
572 int proto; 588 int proto;
573 589
@@ -595,32 +611,14 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
595 if (test_bit(LOSSYTX, &con->state)) 611 if (test_bit(LOSSYTX, &con->state))
596 con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY; 612 con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY;
597 613
598 con->out_kvec[0].iov_base = CEPH_BANNER; 614 if (!after_banner) {
599 con->out_kvec[0].iov_len = len; 615 con->out_kvec_left = 0;
600 con->out_kvec[1].iov_base = &msgr->my_enc_addr; 616 con->out_kvec_bytes = 0;
601 con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr); 617 }
602 con->out_kvec[2].iov_base = &con->out_connect; 618 con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
603 con->out_kvec[2].iov_len = sizeof(con->out_connect); 619 con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
604 con->out_kvec_left = 3; 620 con->out_kvec_left++;
605 con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr) + 621 con->out_kvec_bytes += sizeof(con->out_connect);
606 sizeof(con->out_connect);
607 con->out_kvec_cur = con->out_kvec;
608 con->out_more = 0;
609 set_bit(WRITE_PENDING, &con->state);
610}
611
612static void prepare_write_connect_retry(struct ceph_messenger *msgr,
613 struct ceph_connection *con)
614{
615 dout("prepare_write_connect_retry %p\n", con);
616 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
617 con->out_connect.global_seq =
618 cpu_to_le32(get_global_seq(con->msgr, 0));
619
620 con->out_kvec[0].iov_base = &con->out_connect;
621 con->out_kvec[0].iov_len = sizeof(con->out_connect);
622 con->out_kvec_left = 1;
623 con->out_kvec_bytes = sizeof(con->out_connect);
624 con->out_kvec_cur = con->out_kvec; 622 con->out_kvec_cur = con->out_kvec;
625 con->out_more = 0; 623 con->out_more = 0;
626 set_bit(WRITE_PENDING, &con->state); 624 set_bit(WRITE_PENDING, &con->state);
@@ -778,6 +776,12 @@ out:
778/* 776/*
779 * Prepare to read connection handshake, or an ack. 777 * Prepare to read connection handshake, or an ack.
780 */ 778 */
779static void prepare_read_banner(struct ceph_connection *con)
780{
781 dout("prepare_read_banner %p\n", con);
782 con->in_base_pos = 0;
783}
784
781static void prepare_read_connect(struct ceph_connection *con) 785static void prepare_read_connect(struct ceph_connection *con)
782{ 786{
783 dout("prepare_read_connect %p\n", con); 787 dout("prepare_read_connect %p\n", con);
@@ -829,11 +833,11 @@ static int read_partial(struct ceph_connection *con,
829/* 833/*
830 * Read all or part of the connect-side handshake on a new connection 834 * Read all or part of the connect-side handshake on a new connection
831 */ 835 */
832static int read_partial_connect(struct ceph_connection *con) 836static int read_partial_banner(struct ceph_connection *con)
833{ 837{
834 int ret, to = 0; 838 int ret, to = 0;
835 839
836 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 840 dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
837 841
838 /* peer's banner */ 842 /* peer's banner */
839 ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); 843 ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
@@ -847,6 +851,16 @@ static int read_partial_connect(struct ceph_connection *con)
847 &con->peer_addr_for_me); 851 &con->peer_addr_for_me);
848 if (ret <= 0) 852 if (ret <= 0)
849 goto out; 853 goto out;
854out:
855 return ret;
856}
857
858static int read_partial_connect(struct ceph_connection *con)
859{
860 int ret, to = 0;
861
862 dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
863
850 ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); 864 ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
851 if (ret <= 0) 865 if (ret <= 0)
852 goto out; 866 goto out;
@@ -856,6 +870,7 @@ static int read_partial_connect(struct ceph_connection *con)
856 le32_to_cpu(con->in_reply.global_seq)); 870 le32_to_cpu(con->in_reply.global_seq));
857out: 871out:
858 return ret; 872 return ret;
873
859} 874}
860 875
861/* 876/*
@@ -976,9 +991,9 @@ bad:
976 return -EINVAL; 991 return -EINVAL;
977} 992}
978 993
979static int process_connect(struct ceph_connection *con) 994static int process_banner(struct ceph_connection *con)
980{ 995{
981 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 996 dout("process_banner on %p\n", con);
982 997
983 if (verify_hello(con) < 0) 998 if (verify_hello(con) < 0)
984 return -1; 999 return -1;
@@ -1016,10 +1031,19 @@ static int process_connect(struct ceph_connection *con)
1016 sizeof(con->peer_addr_for_me.in_addr)); 1031 sizeof(con->peer_addr_for_me.in_addr));
1017 addr_set_port(&con->msgr->inst.addr.in_addr, port); 1032 addr_set_port(&con->msgr->inst.addr.in_addr, port);
1018 encode_my_addr(con->msgr); 1033 encode_my_addr(con->msgr);
1019 dout("process_connect learned my addr is %s\n", 1034 dout("process_banner learned my addr is %s\n",
1020 pr_addr(&con->msgr->inst.addr.in_addr)); 1035 pr_addr(&con->msgr->inst.addr.in_addr));
1021 } 1036 }
1022 1037
1038 set_bit(NEGOTIATING, &con->state);
1039 prepare_read_connect(con);
1040 return 0;
1041}
1042
1043static int process_connect(struct ceph_connection *con)
1044{
1045 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
1046
1023 switch (con->in_reply.tag) { 1047 switch (con->in_reply.tag) {
1024 case CEPH_MSGR_TAG_BADPROTOVER: 1048 case CEPH_MSGR_TAG_BADPROTOVER:
1025 dout("process_connect got BADPROTOVER my %d != their %d\n", 1049 dout("process_connect got BADPROTOVER my %d != their %d\n",
@@ -1053,7 +1077,7 @@ static int process_connect(struct ceph_connection *con)
1053 ENTITY_NAME(con->peer_name), 1077 ENTITY_NAME(con->peer_name),
1054 pr_addr(&con->peer_addr.in_addr)); 1078 pr_addr(&con->peer_addr.in_addr));
1055 reset_connection(con); 1079 reset_connection(con);
1056 prepare_write_connect_retry(con->msgr, con); 1080 prepare_write_connect(con->msgr, con, 0);
1057 prepare_read_connect(con); 1081 prepare_read_connect(con);
1058 1082
1059 /* Tell ceph about it. */ 1083 /* Tell ceph about it. */
@@ -1071,7 +1095,7 @@ static int process_connect(struct ceph_connection *con)
1071 le32_to_cpu(con->out_connect.connect_seq), 1095 le32_to_cpu(con->out_connect.connect_seq),
1072 le32_to_cpu(con->in_connect.connect_seq)); 1096 le32_to_cpu(con->in_connect.connect_seq));
1073 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); 1097 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
1074 prepare_write_connect_retry(con->msgr, con); 1098 prepare_write_connect(con->msgr, con, 0);
1075 prepare_read_connect(con); 1099 prepare_read_connect(con);
1076 break; 1100 break;
1077 1101
@@ -1080,19 +1104,17 @@ static int process_connect(struct ceph_connection *con)
1080 * If we sent a smaller global_seq than the peer has, try 1104 * If we sent a smaller global_seq than the peer has, try
1081 * again with a larger value. 1105 * again with a larger value.
1082 */ 1106 */
1083 dout("process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n", 1107 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
1084 con->peer_global_seq, 1108 con->peer_global_seq,
1085 le32_to_cpu(con->in_connect.global_seq)); 1109 le32_to_cpu(con->in_connect.global_seq));
1086 get_global_seq(con->msgr, 1110 get_global_seq(con->msgr,
1087 le32_to_cpu(con->in_connect.global_seq)); 1111 le32_to_cpu(con->in_connect.global_seq));
1088 prepare_write_connect_retry(con->msgr, con); 1112 prepare_write_connect(con->msgr, con, 0);
1089 prepare_read_connect(con); 1113 prepare_read_connect(con);
1090 break; 1114 break;
1091 1115
1092 case CEPH_MSGR_TAG_READY: 1116 case CEPH_MSGR_TAG_READY:
1093 clear_bit(CONNECTING, &con->state); 1117 clear_bit(CONNECTING, &con->state);
1094 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
1095 set_bit(LOSSYRX, &con->state);
1096 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1118 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1097 con->connect_seq++; 1119 con->connect_seq++;
1098 dout("process_connect got READY gseq %d cseq %d (%d)\n", 1120 dout("process_connect got READY gseq %d cseq %d (%d)\n",
@@ -1420,9 +1442,11 @@ more:
1420 if (test_and_clear_bit(STANDBY, &con->state)) 1442 if (test_and_clear_bit(STANDBY, &con->state))
1421 con->connect_seq++; 1443 con->connect_seq++;
1422 1444
1423 prepare_write_connect(msgr, con); 1445 prepare_write_banner(msgr, con);
1424 prepare_read_connect(con); 1446 prepare_write_connect(msgr, con, 1);
1447 prepare_read_banner(con);
1425 set_bit(CONNECTING, &con->state); 1448 set_bit(CONNECTING, &con->state);
1449 clear_bit(NEGOTIATING, &con->state);
1426 1450
1427 con->in_tag = CEPH_MSGR_TAG_READY; 1451 con->in_tag = CEPH_MSGR_TAG_READY;
1428 dout("try_write initiating connect on %p new state %lu\n", 1452 dout("try_write initiating connect on %p new state %lu\n",
@@ -1521,7 +1545,16 @@ more:
1521 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1545 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
1522 con->in_base_pos); 1546 con->in_base_pos);
1523 if (test_bit(CONNECTING, &con->state)) { 1547 if (test_bit(CONNECTING, &con->state)) {
1524 dout("try_read connecting\n"); 1548 if (!test_bit(NEGOTIATING, &con->state)) {
1549 dout("try_read connecting\n");
1550 ret = read_partial_banner(con);
1551 if (ret <= 0)
1552 goto done;
1553 if (process_banner(con) < 0) {
1554 ret = -1;
1555 goto out;
1556 }
1557 }
1525 ret = read_partial_connect(con); 1558 ret = read_partial_connect(con);
1526 if (ret <= 0) 1559 if (ret <= 0)
1527 goto done; 1560 goto done;