From e5e372da9a469dfe3ece40277090a7056c566838 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 22 May 2012 11:41:43 -0500 Subject: libceph: eliminate connection state "DEAD" The ceph connection state "DEAD" is never set and is therefore not needed. Eliminate it. Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 6 ------ 1 file changed, 6 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 1a80907282cc..42ca8aab6dcf 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2087,12 +2087,6 @@ bad_tag: */ static void queue_con(struct ceph_connection *con) { - if (test_bit(DEAD, &con->state)) { - dout("queue_con %p ignoring: DEAD\n", - con); - return; - } - if (!con->ops->get(con)) { dout("queue_con %p ref count 0\n", con); return; -- cgit v1.2.2 From 6384bb8b8e88a9c6bf2ae0d9517c2c0199177c34 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 29 May 2012 21:47:38 -0500 Subject: libceph: kill bad_proto ceph connection op No code sets a bad_proto method in its ceph connection operations vector, so just get rid of it. Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 5 ----- 1 file changed, 5 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 42ca8aab6dcf..07af9948e3f7 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1356,11 +1356,6 @@ static void fail_protocol(struct ceph_connection *con) { reset_connection(con); set_bit(CLOSED, &con->state); /* in case there's queued work */ - - mutex_unlock(&con->mutex); - if (con->ops->bad_proto) - con->ops->bad_proto(con); - mutex_lock(&con->mutex); } static int process_connect(struct ceph_connection *con) -- cgit v1.2.2 From 327800bdc2cb9b71f4b458ca07aa9d522668dde0 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 22 May 2012 11:41:43 -0500 Subject: libceph: rename socket callbacks Change the names of the three socket callback functions to make it more obvious they're specifically associated with a connection's socket (not the ceph connection that uses it). Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh Reviewed-by: Sage Weil --- net/ceph/messenger.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 07af9948e3f7..545255823de2 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -153,46 +153,46 @@ EXPORT_SYMBOL(ceph_msgr_flush); */ /* data available on socket, or listen socket received a connect */ -static void ceph_data_ready(struct sock *sk, int count_unused) +static void ceph_sock_data_ready(struct sock *sk, int count_unused) { struct ceph_connection *con = sk->sk_user_data; if (sk->sk_state != TCP_CLOSE_WAIT) { - dout("ceph_data_ready on %p state = %lu, queueing work\n", + dout("%s on %p state = %lu, queueing work\n", __func__, con, con->state); queue_con(con); } } /* socket has buffer space for writing */ -static void ceph_write_space(struct sock *sk) +static void ceph_sock_write_space(struct sock *sk) { struct ceph_connection *con = sk->sk_user_data; /* only queue to workqueue if there is data we want to write, * and there is sufficient space in the socket buffer to accept - * more data. clear SOCK_NOSPACE so that ceph_write_space() + * more data. clear SOCK_NOSPACE so that ceph_sock_write_space() * doesn't get called again until try_write() fills the socket * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ if (test_bit(WRITE_PENDING, &con->state)) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { - dout("ceph_write_space %p queueing write work\n", con); + dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); queue_con(con); } } else { - dout("ceph_write_space %p nothing to write\n", con); + dout("%s %p nothing to write\n", __func__, con); } } /* socket's state has changed */ -static void ceph_state_change(struct sock *sk) +static void ceph_sock_state_change(struct sock *sk) { struct ceph_connection *con = sk->sk_user_data; - dout("ceph_state_change %p state = %lu sk_state = %u\n", + dout("%s %p state = %lu sk_state = %u\n", __func__, con, con->state, sk->sk_state); if (test_bit(CLOSED, &con->state)) @@ -200,9 +200,9 @@ static void ceph_state_change(struct sock *sk) switch (sk->sk_state) { case TCP_CLOSE: - dout("ceph_state_change TCP_CLOSE\n"); + dout("%s TCP_CLOSE\n", __func__); case TCP_CLOSE_WAIT: - dout("ceph_state_change TCP_CLOSE_WAIT\n"); + dout("%s TCP_CLOSE_WAIT\n", __func__); if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { if (test_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; @@ -212,7 +212,7 @@ static void ceph_state_change(struct sock *sk) } break; case TCP_ESTABLISHED: - dout("ceph_state_change TCP_ESTABLISHED\n"); + dout("%s TCP_ESTABLISHED\n", __func__); queue_con(con); break; default: /* Everything else is uninteresting */ @@ -228,9 +228,9 @@ static void set_sock_callbacks(struct socket *sock, { struct sock *sk = sock->sk; sk->sk_user_data = con; - sk->sk_data_ready = ceph_data_ready; - sk->sk_write_space = ceph_write_space; - sk->sk_state_change = ceph_state_change; + sk->sk_data_ready = ceph_sock_data_ready; + sk->sk_write_space = ceph_sock_write_space; + sk->sk_state_change = ceph_sock_state_change; } -- cgit v1.2.2 From e22004235a900213625acd6583ac913d5a30c155 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 23 May 2012 14:35:23 -0500 Subject: libceph: rename kvec_reset and kvec_add functions The functions ceph_con_out_kvec_reset() and ceph_con_out_kvec_add() are entirely private functions, so drop the "ceph_" prefix in their name to make them slightly more wieldy. Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh Reviewed-by: Sage Weil --- net/ceph/messenger.c | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 545255823de2..2ca491fc50e2 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -486,14 +486,14 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) return ret; } -static void ceph_con_out_kvec_reset(struct ceph_connection *con) +static void con_out_kvec_reset(struct ceph_connection *con) { con->out_kvec_left = 0; con->out_kvec_bytes = 0; con->out_kvec_cur = &con->out_kvec[0]; } -static void ceph_con_out_kvec_add(struct ceph_connection *con, +static void con_out_kvec_add(struct ceph_connection *con, size_t size, void *data) { int index; @@ -534,7 +534,7 @@ static void prepare_write_message(struct ceph_connection *con) struct ceph_msg *m; u32 crc; - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); con->out_kvec_is_msg = true; con->out_msg_done = false; @@ -542,9 +542,9 @@ static void prepare_write_message(struct ceph_connection *con) * TCP packet that's a good thing. */ if (con->in_seq > con->in_seq_acked) { con->in_seq_acked = con->in_seq; - ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); + con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), + con_out_kvec_add(con, sizeof (con->out_temp_ack), &con->out_temp_ack); } @@ -572,12 +572,12 @@ static void prepare_write_message(struct ceph_connection *con) BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); /* tag + hdr + front + middle */ - ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); - ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); - ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); + con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); + con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); + con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); if (m->middle) - ceph_con_out_kvec_add(con, m->middle->vec.iov_len, + con_out_kvec_add(con, m->middle->vec.iov_len, m->middle->vec.iov_base); /* fill in crc (except data pages), footer */ @@ -626,12 +626,12 @@ static void prepare_write_ack(struct ceph_connection *con) con->in_seq_acked, con->in_seq); con->in_seq_acked = con->in_seq; - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); - ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); + con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), + con_out_kvec_add(con, sizeof (con->out_temp_ack), &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ @@ -644,8 +644,8 @@ static void prepare_write_ack(struct ceph_connection *con) static void prepare_write_keepalive(struct ceph_connection *con) { dout("prepare_write_keepalive %p\n", con); - ceph_con_out_kvec_reset(con); - ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); + con_out_kvec_reset(con); + con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); set_bit(WRITE_PENDING, &con->state); } @@ -690,8 +690,8 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection */ static void prepare_write_banner(struct ceph_connection *con) { - ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); - ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), + con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); + con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), &con->msgr->my_enc_addr); con->out_more = 0; @@ -738,10 +738,10 @@ static int prepare_write_connect(struct ceph_connection *con) con->out_connect.authorizer_len = auth ? cpu_to_le32(auth->authorizer_buf_len) : 0; - ceph_con_out_kvec_add(con, sizeof (con->out_connect), + con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); if (auth && auth->authorizer_buf_len) - ceph_con_out_kvec_add(con, auth->authorizer_buf_len, + con_out_kvec_add(con, auth->authorizer_buf_len, auth->authorizer_buf); con->out_more = 0; @@ -935,7 +935,7 @@ static int write_partial_msg_pages(struct ceph_connection *con) /* prepare and queue up footer, too */ if (!do_datacrc) con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); prepare_write_message_footer(con); ret = 1; out: @@ -1398,7 +1398,7 @@ static int process_connect(struct ceph_connection *con) return -1; } con->auth_retry = 1; - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1419,7 +1419,7 @@ static int process_connect(struct ceph_connection *con) ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr)); reset_connection(con); - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1445,7 +1445,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect.connect_seq), le32_to_cpu(con->in_connect.connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1462,7 +1462,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_connect.global_seq)); get_global_seq(con->msgr, le32_to_cpu(con->in_connect.global_seq)); - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1869,7 +1869,7 @@ more: /* open the socket first? */ if (con->sock == NULL) { - ceph_con_out_kvec_reset(con); + con_out_kvec_reset(con); prepare_write_banner(con); ret = prepare_write_connect(con); if (ret < 0) -- cgit v1.2.2 From 15d9882c336db2db73ccf9871ae2398e452f694c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 26 May 2012 23:26:43 -0500 Subject: libceph: embed ceph messenger structure in ceph_client A ceph client has a pointer to a ceph messenger structure in it. There is always exactly one ceph messenger for a ceph client, so there is no need to allocate it separate from the ceph client structure. Switch the ceph_client structure to embed its ceph_messenger structure. Signed-off-by: Alex Elder Reviewed-by: Yehuda Sadeh Reviewed-by: Sage Weil --- net/ceph/messenger.c | 30 +++++++++--------------------- 1 file changed, 9 insertions(+), 21 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 2ca491fc50e2..d8423a3f6698 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2245,18 +2245,14 @@ out: /* - * create a new messenger instance + * initialize a new messenger instance */ -struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, - u32 supported_features, - u32 required_features) +void ceph_messenger_init(struct ceph_messenger *msgr, + struct ceph_entity_addr *myaddr, + u32 supported_features, + u32 required_features, + bool nocrc) { - struct ceph_messenger *msgr; - - msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); - if (msgr == NULL) - return ERR_PTR(-ENOMEM); - msgr->supported_features = supported_features; msgr->required_features = required_features; @@ -2269,19 +2265,11 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, msgr->inst.addr.type = 0; get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); encode_my_addr(msgr); + msgr->nocrc = nocrc; - dout("messenger_create %p\n", msgr); - return msgr; -} -EXPORT_SYMBOL(ceph_messenger_create); - -void ceph_messenger_destroy(struct ceph_messenger *msgr) -{ - dout("destroy %p\n", msgr); - kfree(msgr); - dout("destroyed messenger %p\n", msgr); + dout("%s %p\n", __func__, msgr); } -EXPORT_SYMBOL(ceph_messenger_destroy); +EXPORT_SYMBOL(ceph_messenger_init); static void clear_standby(struct ceph_connection *con) { -- cgit v1.2.2 From 928443cd9644e7cfd46f687dbeffda2d1a357ff9 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 22 May 2012 11:41:43 -0500 Subject: libceph: start separating connection flags from state A ceph_connection holds a mixture of connection state (as in "state machine" state) and connection flags in a single "state" field. To make the distinction more clear, define a new "flags" field and use it rather than the "state" field to hold Boolean flag values. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index d8423a3f6698..e84e4fd86bb7 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -176,7 +176,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ - if (test_bit(WRITE_PENDING, &con->state)) { + if (test_bit(WRITE_PENDING, &con->flags)) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); @@ -203,7 +203,7 @@ static void ceph_sock_state_change(struct sock *sk) dout("%s TCP_CLOSE\n", __func__); case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); - if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { + if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) { if (test_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; else @@ -395,9 +395,9 @@ void ceph_con_close(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); set_bit(CLOSED, &con->state); /* in case there's queued work */ clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ - clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ - clear_bit(KEEPALIVE_PENDING, &con->state); - clear_bit(WRITE_PENDING, &con->state); + clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ + clear_bit(KEEPALIVE_PENDING, &con->flags); + clear_bit(WRITE_PENDING, &con->flags); mutex_lock(&con->mutex); reset_connection(con); con->peer_global_seq = 0; @@ -614,7 +614,7 @@ static void prepare_write_message(struct ceph_connection *con) prepare_write_message_footer(con); } - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); } /* @@ -635,7 +635,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); } /* @@ -646,7 +646,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); } /* @@ -675,7 +675,7 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection if (IS_ERR(auth)) return auth; - if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) + if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) return ERR_PTR(-EAGAIN); con->auth_reply_buf = auth->authorizer_reply_buf; @@ -695,7 +695,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); } static int prepare_write_connect(struct ceph_connection *con) @@ -745,7 +745,7 @@ static int prepare_write_connect(struct ceph_connection *con) auth->authorizer_buf); con->out_more = 0; - set_bit(WRITE_PENDING, &con->state); + set_bit(WRITE_PENDING, &con->flags); return 0; } @@ -1492,7 +1492,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - set_bit(LOSSYTX, &con->state); + set_bit(LOSSYTX, &con->flags); prepare_read_tag(con); break; @@ -1933,14 +1933,14 @@ do_next: prepare_write_ack(con); goto more; } - if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { + if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) { prepare_write_keepalive(con); goto more; } } /* Nothing to do! */ - clear_bit(WRITE_PENDING, &con->state); + clear_bit(WRITE_PENDING, &con->flags); dout("try_write nothing else to write.\n"); ret = 0; out: @@ -2106,7 +2106,7 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: - if (test_and_clear_bit(BACKOFF, &con->state)) { + if (test_and_clear_bit(BACKOFF, &con->flags)) { dout("con_work %p backing off\n", con); if (queue_delayed_work(ceph_msgr_wq, &con->work, round_jiffies_relative(con->delay))) { @@ -2135,7 +2135,7 @@ restart: con_close_socket(con); } - if (test_and_clear_bit(SOCK_CLOSED, &con->state)) + if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) goto fault; ret = try_read(con); @@ -2174,7 +2174,7 @@ static void ceph_fault(struct ceph_connection *con) dout("fault %p state %lu to peer %s\n", con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); - if (test_bit(LOSSYTX, &con->state)) { + if (test_bit(LOSSYTX, &con->flags)) { dout("fault on LOSSYTX channel\n"); goto out; } @@ -2196,9 +2196,9 @@ static void ceph_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && - !test_bit(KEEPALIVE_PENDING, &con->state)) { + !test_bit(KEEPALIVE_PENDING, &con->flags)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); - clear_bit(WRITE_PENDING, &con->state); + clear_bit(WRITE_PENDING, &con->flags); set_bit(STANDBY, &con->state); } else { /* retry after a delay. */ @@ -2222,7 +2222,7 @@ static void ceph_fault(struct ceph_connection *con) * that when con_work restarts we schedule the * delay then. */ - set_bit(BACKOFF, &con->state); + set_bit(BACKOFF, &con->flags); } } @@ -2278,8 +2278,8 @@ static void clear_standby(struct ceph_connection *con) mutex_lock(&con->mutex); dout("clear_standby %p and ++connect_seq\n", con); con->connect_seq++; - WARN_ON(test_bit(WRITE_PENDING, &con->state)); - WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); + WARN_ON(test_bit(WRITE_PENDING, &con->flags)); + WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); mutex_unlock(&con->mutex); } } @@ -2317,7 +2317,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ clear_standby(con); - if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) + if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); @@ -2384,8 +2384,8 @@ void ceph_con_keepalive(struct ceph_connection *con) { dout("con_keepalive %p\n", con); clear_standby(con); - if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && - test_and_set_bit(WRITE_PENDING, &con->state) == 0) + if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 && + test_and_set_bit(WRITE_PENDING, &con->flags) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); -- cgit v1.2.2 From ce2c8903e76e690846a00a0284e4bd9ee954d680 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 22 May 2012 22:15:49 -0500 Subject: libceph: start tracking connection socket state Start explicitly keeping track of the state of a ceph connection's socket, separate from the state of the connection itself. Create placeholder functions to encapsulate the state transitions. -------- | NEW* | transient initial state -------- | con_sock_state_init() v ---------- | CLOSED | initialized, but no socket (and no ---------- TCP connection) ^ \ | \ con_sock_state_connecting() | ---------------------- | \ + con_sock_state_closed() \ |\ \ | \ \ | ----------- \ | | CLOSING | socket event; \ | ----------- await close \ | ^ | | | | | + con_sock_state_closing() | | / \ | | / --------------- | | / \ v | / -------------- | / -----------------| CONNECTING | socket created, TCP | | / -------------- connect initiated | | | con_sock_state_connected() | | v ------------- | CONNECTED | TCP connection established ------------- Make the socket state an atomic variable, reinforcing that it's a distinct transtion with no possible "intermediate/both" states. This is almost certainly overkill at this point, though the transitions into CONNECTED and CLOSING state do get called via socket callback (the rest of the transitions occur with the connection mutex held). We can back out the atomicity later. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e84e4fd86bb7..a4ac3deec161 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -29,6 +29,14 @@ * the sender. */ +/* State values for ceph_connection->sock_state; NEW is assumed to be 0 */ + +#define CON_SOCK_STATE_NEW 0 /* -> CLOSED */ +#define CON_SOCK_STATE_CLOSED 1 /* -> CONNECTING */ +#define CON_SOCK_STATE_CONNECTING 2 /* -> CONNECTED or -> CLOSING */ +#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ +#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ + /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; @@ -147,6 +155,55 @@ void ceph_msgr_flush(void) } EXPORT_SYMBOL(ceph_msgr_flush); +/* Connection socket state transition functions */ + +static void con_sock_state_init(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); + if (WARN_ON(old_state != CON_SOCK_STATE_NEW)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} + +static void con_sock_state_connecting(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); + if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} + +static void con_sock_state_connected(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); + if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} + +static void con_sock_state_closing(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); + if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING && + old_state != CON_SOCK_STATE_CONNECTED && + old_state != CON_SOCK_STATE_CLOSING)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} + +static void con_sock_state_closed(struct ceph_connection *con) +{ + int old_state; + + old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); + if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED && + old_state != CON_SOCK_STATE_CLOSING)) + printk("%s: unexpected old state %d\n", __func__, old_state); +} /* * socket callback functions @@ -203,6 +260,7 @@ static void ceph_sock_state_change(struct sock *sk) dout("%s TCP_CLOSE\n", __func__); case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); + con_sock_state_closing(con); if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) { if (test_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; @@ -213,6 +271,7 @@ static void ceph_sock_state_change(struct sock *sk) break; case TCP_ESTABLISHED: dout("%s TCP_ESTABLISHED\n", __func__); + con_sock_state_connected(con); queue_con(con); break; default: /* Everything else is uninteresting */ @@ -277,6 +336,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) return ret; } con->sock = sock; + con_sock_state_connecting(con); return 0; } @@ -343,6 +403,7 @@ static int con_close_socket(struct ceph_connection *con) sock_release(con->sock); con->sock = NULL; clear_bit(SOCK_CLOSED, &con->state); + con_sock_state_closed(con); return rc; } @@ -462,6 +523,9 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) memset(con, 0, sizeof(*con)); atomic_set(&con->nref, 1); con->msgr = msgr; + + con_sock_state_init(con); + mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); -- cgit v1.2.2 From a5988c490ef66cb04ea2f610681949b25c773b3c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 29 May 2012 11:04:58 -0500 Subject: libceph: set CLOSED state bit in con_init Once a connection is fully initialized, it is really in a CLOSED state, so make that explicit by setting the bit in its state field. It is possible for a connection in NEGOTIATING state to get a failure, leading to ceph_fault() and ultimately ceph_con_close(). Clear that bits if it is set in that case, to reflect that the connection truly is closed and is no longer participating in a connect sequence. Issue a warning if ceph_con_open() is called on a connection that is not in CLOSED state. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index a4ac3deec161..36b440a00cc2 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -454,11 +454,14 @@ void ceph_con_close(struct ceph_connection *con) { dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr.in_addr)); - set_bit(CLOSED, &con->state); /* in case there's queued work */ + clear_bit(NEGOTIATING, &con->state); clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ + set_bit(CLOSED, &con->state); + clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ clear_bit(KEEPALIVE_PENDING, &con->flags); clear_bit(WRITE_PENDING, &con->flags); + mutex_lock(&con->mutex); reset_connection(con); con->peer_global_seq = 0; @@ -475,7 +478,8 @@ void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) { dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); set_bit(OPENING, &con->state); - clear_bit(CLOSED, &con->state); + WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); + memcpy(&con->peer_addr, addr, sizeof(*addr)); con->delay = 0; /* reset backoff memory */ queue_con(con); @@ -530,6 +534,8 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); INIT_DELAYED_WORK(&con->work, con_work); + + set_bit(CLOSED, &con->state); } EXPORT_SYMBOL(ceph_con_init); @@ -1933,14 +1939,15 @@ more: /* open the socket first? */ if (con->sock == NULL) { + clear_bit(NEGOTIATING, &con->state); + set_bit(CONNECTING, &con->state); + con_out_kvec_reset(con); prepare_write_banner(con); ret = prepare_write_connect(con); if (ret < 0) goto out; prepare_read_banner(con); - set_bit(CONNECTING, &con->state); - clear_bit(NEGOTIATING, &con->state); BUG_ON(con->in_msg); con->in_tag = CEPH_MSGR_TAG_READY; -- cgit v1.2.2 From 1bfd89f4e6e1adc6a782d94aa5d4c53be1e404d7 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Sat, 26 May 2012 23:26:43 -0500 Subject: libceph: fully initialize connection in con_init() Move the initialization of a ceph connection's private pointer, operations vector pointer, and peer name information into ceph_con_init(). Rearrange the arguments so the connection pointer is first. Hide the byte-swapping of the peer entity number inside ceph_con_init() Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 36b440a00cc2..3b65f6e6911b 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -521,15 +521,22 @@ void ceph_con_put(struct ceph_connection *con) /* * initialize a new connection. */ -void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) +void ceph_con_init(struct ceph_connection *con, void *private, + const struct ceph_connection_operations *ops, + struct ceph_messenger *msgr, __u8 entity_type, __u64 entity_num) { dout("con_init %p\n", con); memset(con, 0, sizeof(*con)); + con->private = private; + con->ops = ops; atomic_set(&con->nref, 1); con->msgr = msgr; con_sock_state_init(con); + con->peer_name.type = (__u8) entity_type; + con->peer_name.num = cpu_to_le64(entity_num); + mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); -- cgit v1.2.2 From 1c20f2d26795803fc4f5155fe4fca5717a5944b6 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 4 Jun 2012 14:43:32 -0500 Subject: libceph: tweak ceph_alloc_msg() The function ceph_alloc_msg() is only used to allocate a message that will be assigned to a connection's in_msg pointer. Rename the function so this implied usage is more clear. In addition, make that assignment inside the function (again, since that's precisely what it's intended to be used for). This allows us to return what is now provided via the passed-in address of a "skip" variable. The return type is now Boolean to be explicit that there are only two possible outcomes. Make sure the result of an ->alloc_msg method call always sets the value of *skip properly. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 61 +++++++++++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 27 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3b65f6e6911b..98ca23726ea6 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1655,9 +1655,8 @@ static int read_partial_message_section(struct ceph_connection *con, return 1; } -static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, - struct ceph_msg_header *hdr, - int *skip); +static bool ceph_con_in_msg_alloc(struct ceph_connection *con, + struct ceph_msg_header *hdr); static int read_partial_message_pages(struct ceph_connection *con, @@ -1740,7 +1739,6 @@ static int read_partial_message(struct ceph_connection *con) int ret; unsigned front_len, middle_len, data_len; bool do_datacrc = !con->msgr->nocrc; - int skip; u64 seq; u32 crc; @@ -1793,9 +1791,7 @@ static int read_partial_message(struct ceph_connection *con) if (!con->in_msg) { dout("got hdr type %d front %d data %d\n", con->in_hdr.type, con->in_hdr.front_len, con->in_hdr.data_len); - skip = 0; - con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); - if (skip) { + if (ceph_con_in_msg_alloc(con, &con->in_hdr)) { /* skip this message */ dout("alloc_msg said skip message\n"); BUG_ON(con->in_msg); @@ -2577,46 +2573,57 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) } /* - * Generic message allocator, for incoming messages. + * Allocate a message for receiving an incoming message on a + * connection, and save the result in con->in_msg. Uses the + * connection's private alloc_msg op if available. + * + * Returns true if the message should be skipped, false otherwise. + * If true is returned (skip message), con->in_msg will be NULL. + * If false is returned, con->in_msg will contain a pointer to the + * newly-allocated message, or NULL in case of memory exhaustion. */ -static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, - struct ceph_msg_header *hdr, - int *skip) +static bool ceph_con_in_msg_alloc(struct ceph_connection *con, + struct ceph_msg_header *hdr) { int type = le16_to_cpu(hdr->type); int front_len = le32_to_cpu(hdr->front_len); int middle_len = le32_to_cpu(hdr->middle_len); - struct ceph_msg *msg = NULL; int ret; + BUG_ON(con->in_msg != NULL); + if (con->ops->alloc_msg) { + int skip = 0; + mutex_unlock(&con->mutex); - msg = con->ops->alloc_msg(con, hdr, skip); + con->in_msg = con->ops->alloc_msg(con, hdr, &skip); mutex_lock(&con->mutex); - if (!msg || *skip) - return NULL; + if (skip) + con->in_msg = NULL; + + if (!con->in_msg) + return skip != 0; } - if (!msg) { - *skip = 0; - msg = ceph_msg_new(type, front_len, GFP_NOFS, false); - if (!msg) { + if (!con->in_msg) { + con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false); + if (!con->in_msg) { pr_err("unable to allocate msg type %d len %d\n", type, front_len); - return NULL; + return false; } - msg->page_alignment = le16_to_cpu(hdr->data_off); + con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); } - memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); + memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); - if (middle_len && !msg->middle) { - ret = ceph_alloc_middle(con, msg); + if (middle_len && !con->in_msg->middle) { + ret = ceph_alloc_middle(con, con->in_msg); if (ret < 0) { - ceph_msg_put(msg); - return NULL; + ceph_msg_put(con->in_msg); + con->in_msg = NULL; } } - return msg; + return false; } -- cgit v1.2.2 From 38941f8031bf042dba3ced6394ba3a3b16c244ea Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 1 Jun 2012 14:56:43 -0500 Subject: libceph: have messages point to their connection When a ceph message is queued for sending it is placed on a list of pending messages (ceph_connection->out_queue). When they are actually sent over the wire, they are moved from that list to another (ceph_connection->out_sent). When acknowledgement for the message is received, it is removed from the sent messages list. During that entire time the message is "in the possession" of a single ceph connection. Keep track of that connection in the message. This will be used in the next patch (and is a helpful bit of information for debugging anyway). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 98ca23726ea6..68b49b5b8e86 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -414,6 +414,9 @@ static int con_close_socket(struct ceph_connection *con) static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); + BUG_ON(msg->con == NULL); + msg->con = NULL; + ceph_msg_put(msg); } static void ceph_msg_remove_list(struct list_head *head) @@ -433,6 +436,8 @@ static void reset_connection(struct ceph_connection *con) ceph_msg_remove_list(&con->out_sent); if (con->in_msg) { + BUG_ON(con->in_msg->con != con); + con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; } @@ -625,8 +630,10 @@ static void prepare_write_message(struct ceph_connection *con) &con->out_temp_ack); } + BUG_ON(list_empty(&con->out_queue)); m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); con->out_msg = m; + BUG_ON(m->con != con); /* put message on sent list */ ceph_msg_get(m); @@ -1806,6 +1813,8 @@ static int read_partial_message(struct ceph_connection *con) "error allocating memory for incoming message"; return -ENOMEM; } + + BUG_ON(con->in_msg->con != con); m = con->in_msg; m->front.iov_len = 0; /* haven't read it yet */ if (m->middle) @@ -1901,6 +1910,8 @@ static void process_message(struct ceph_connection *con) { struct ceph_msg *msg; + BUG_ON(con->in_msg->con != con); + con->in_msg->con = NULL; msg = con->in_msg; con->in_msg = NULL; @@ -2260,6 +2271,8 @@ static void ceph_fault(struct ceph_connection *con) con_close_socket(con); if (con->in_msg) { + BUG_ON(con->in_msg->con != con); + con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; } @@ -2378,6 +2391,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* queue */ mutex_lock(&con->mutex); + BUG_ON(msg->con != NULL); + msg->con = con; BUG_ON(!list_empty(&msg->list_head)); list_add_tail(&msg->list_head, &con->out_queue); dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, @@ -2403,13 +2418,16 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) { mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { - dout("con_revoke %p msg %p - was on queue\n", con, msg); + dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); + BUG_ON(msg->con == NULL); + msg->con = NULL; + ceph_msg_put(msg); msg->hdr.seq = 0; } if (con->out_msg == msg) { - dout("con_revoke %p msg %p - was sending\n", con, msg); + dout("%s %p msg %p - was sending\n", __func__, con, msg); con->out_msg = NULL; if (con->out_kvec_is_msg) { con->out_skip = con->out_kvec_bytes; @@ -2478,6 +2496,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, if (m == NULL) goto out; kref_init(&m->kref); + + m->con = NULL; INIT_LIST_HEAD(&m->list_head); m->hdr.tid = 0; @@ -2598,6 +2618,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, mutex_unlock(&con->mutex); con->in_msg = con->ops->alloc_msg(con, hdr, &skip); mutex_lock(&con->mutex); + if (con->in_msg) + con->in_msg->con = con; if (skip) con->in_msg = NULL; @@ -2611,6 +2633,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, type, front_len); return false; } + con->in_msg->con = con; con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); } memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); -- cgit v1.2.2 From 92ce034b5a740046cc643a21ea21eaad589e0043 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 4 Jun 2012 14:43:33 -0500 Subject: libceph: have messages take a connection reference There are essentially two types of ceph messages: incoming and outgoing. Outgoing messages are always allocated via ceph_msg_new(), and at the time of their allocation they are not associated with any particular connection. Incoming messages are always allocated via ceph_con_in_msg_alloc(), and they are initially associated with the connection from which incoming data will be placed into the message. When an outgoing message gets sent, it becomes associated with a connection and remains that way until the message is successfully sent. The association of an incoming message goes away at the point it is sent to an upper layer via a con->ops->dispatch method. This patch implements reference counting for all ceph messages, such that every message holds a reference (and a pointer) to a connection if and only if it is associated with that connection (as described above). For background, here is an explanation of the ceph message lifecycle, emphasizing when an association exists between a message and a connection. Outgoing Messages An outgoing message is "owned" by its allocator, from the time it is allocated in ceph_msg_new() up to the point it gets queued for sending in ceph_con_send(). Prior to that point the message's msg->con pointer is null; at the point it is queued for sending its message pointer is assigned to refer to the connection. At that time the message is inserted into a connection's out_queue list. When a message on the out_queue list has been sent to the socket layer to be put on the wire, it is transferred out of that list and into the connection's out_sent list. At that point it is still owned by the connection, and will remain so until an acknowledgement is received from the recipient that indicates the message was successfully transferred. When such an acknowledgement is received (in process_ack()), the message is removed from its list (in ceph_msg_remove()), at which point it is no longer associated with the connection. So basically, any time a message is on one of a connection's lists, it is associated with that connection. Reference counting outgoing messages can thus be done at the points a message is added to the out_queue (in ceph_con_send()) and the point it is removed from either its two lists (in ceph_msg_remove())--at which point its connection pointer becomes null. Incoming Messages When an incoming message on a connection is getting read (in read_partial_message()) and there is no message in con->in_msg, a new one is allocated using ceph_con_in_msg_alloc(). At that point the message is associated with the connection. Once that message has been completely and successfully read, it is passed to upper layer code using the connection's con->ops->dispatch method. At that point the association between the message and the connection no longer exists. Reference counting of connections for incoming messages can be done by taking a reference to the connection when the message gets allocated, and releasing that reference when it gets handed off using the dispatch method. We should never fail to get a connection reference for a message--the since the caller should already hold one. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 68b49b5b8e86..88ac083bb995 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -415,6 +415,7 @@ static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); BUG_ON(msg->con == NULL); + ceph_con_put(msg->con); msg->con = NULL; ceph_msg_put(msg); @@ -440,6 +441,7 @@ static void reset_connection(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; + ceph_con_put(con->in_msg->con); } con->connect_seq = 0; @@ -1914,6 +1916,7 @@ static void process_message(struct ceph_connection *con) con->in_msg->con = NULL; msg = con->in_msg; con->in_msg = NULL; + ceph_con_put(con); /* if first message, set peer_name */ if (con->peer_name.type == 0) @@ -2275,6 +2278,7 @@ static void ceph_fault(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; + ceph_con_put(con); } /* Requeue anything that hasn't been acked */ @@ -2391,8 +2395,11 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* queue */ mutex_lock(&con->mutex); + BUG_ON(msg->con != NULL); - msg->con = con; + msg->con = ceph_con_get(con); + BUG_ON(msg->con == NULL); + BUG_ON(!list_empty(&msg->list_head)); list_add_tail(&msg->list_head, &con->out_queue); dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, @@ -2421,10 +2428,11 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); BUG_ON(msg->con == NULL); + ceph_con_put(msg->con); msg->con = NULL; + msg->hdr.seq = 0; ceph_msg_put(msg); - msg->hdr.seq = 0; } if (con->out_msg == msg) { dout("%s %p msg %p - was sending\n", __func__, con, msg); @@ -2433,8 +2441,9 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) con->out_skip = con->out_kvec_bytes; con->out_kvec_is_msg = false; } - ceph_msg_put(msg); msg->hdr.seq = 0; + + ceph_msg_put(msg); } mutex_unlock(&con->mutex); } @@ -2618,8 +2627,10 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, mutex_unlock(&con->mutex); con->in_msg = con->ops->alloc_msg(con, hdr, &skip); mutex_lock(&con->mutex); - if (con->in_msg) - con->in_msg->con = con; + if (con->in_msg) { + con->in_msg->con = ceph_con_get(con); + BUG_ON(con->in_msg->con == NULL); + } if (skip) con->in_msg = NULL; @@ -2633,7 +2644,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, type, front_len); return false; } - con->in_msg->con = con; + con->in_msg->con = ceph_con_get(con); + BUG_ON(con->in_msg->con == NULL); con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); } memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); -- cgit v1.2.2 From 6740a845b2543cc46e1902ba21bac743fbadd0dc Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 1 Jun 2012 14:56:43 -0500 Subject: libceph: make ceph_con_revoke() a msg operation ceph_con_revoke() is passed both a message and a ceph connection. Now that any message associated with a connection holds a pointer to that connection, there's no need to provide the connection when revoking a message. This has the added benefit of precluding the possibility of the providing the wrong connection pointer. If the message's connection pointer is null, it is not being tracked by any connection, so revoking it is a no-op. This is supported as a convenience for upper layers, so they can revoke a message that is not actually "in flight." Rename the function ceph_msg_revoke() to reflect that it is really an operation on a message, not a connection. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 88ac083bb995..d636903ad4b2 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2421,8 +2421,13 @@ EXPORT_SYMBOL(ceph_con_send); /* * Revoke a message that was previously queued for send */ -void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) +void ceph_msg_revoke(struct ceph_msg *msg) { + struct ceph_connection *con = msg->con; + + if (!con) + return; /* Message not in our possession */ + mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { dout("%s %p msg %p - was on queue\n", __func__, con, msg); -- cgit v1.2.2 From 8921d114f5574c6da2cdd00749d185633ecf88f3 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 1 Jun 2012 14:56:43 -0500 Subject: libceph: make ceph_con_revoke_message() a msg op ceph_con_revoke_message() is passed both a message and a ceph connection. A ceph_msg allocated for incoming messages on a connection always has a pointer to that connection, so there's no need to provide the connection when revoking such a message. Note that the existing logic does not preclude the message supplied being a null/bogus message pointer. The only user of this interface is the OSD client, and the only value an osd client passes is a request's r_reply field. That is always non-null (except briefly in an error path in ceph_osdc_alloc_request(), and that drops the only reference so the request won't ever have a reply to revoke). So we can safely assume the passed-in message is non-null, but add a BUG_ON() to make it very obvious we are imposing this restriction. Rename the function ceph_msg_revoke_incoming() to reflect that it is really an operation on an incoming message. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index d636903ad4b2..3857f815c035 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2456,17 +2456,27 @@ void ceph_msg_revoke(struct ceph_msg *msg) /* * Revoke a message that we may be reading data into */ -void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) +void ceph_msg_revoke_incoming(struct ceph_msg *msg) { + struct ceph_connection *con; + + BUG_ON(msg == NULL); + if (!msg->con) { + dout("%s msg %p null con\n", __func__, msg); + + return; /* Message not in our possession */ + } + + con = msg->con; mutex_lock(&con->mutex); - if (con->in_msg && con->in_msg == msg) { + if (con->in_msg == msg) { unsigned front_len = le32_to_cpu(con->in_hdr.front_len); unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len); unsigned data_len = le32_to_cpu(con->in_hdr.data_len); /* skip rest of message */ - dout("con_revoke_pages %p msg %p revoked\n", con, msg); - con->in_base_pos = con->in_base_pos - + dout("%s %p msg %p revoked\n", __func__, con, msg); + con->in_base_pos = con->in_base_pos - sizeof(struct ceph_msg_header) - front_len - middle_len - @@ -2477,8 +2487,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) con->in_tag = CEPH_MSGR_TAG_READY; con->in_seq++; } else { - dout("con_revoke_pages %p msg %p pages %p no-op\n", - con, con->in_msg, msg); + dout("%s %p in_msg %p msg %p no-op\n", + __func__, con, con->in_msg, msg); } mutex_unlock(&con->mutex); } -- cgit v1.2.2 From 43643528cce60ca184fe8197efa8e8da7c89a037 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 6 Jun 2012 19:35:55 -0500 Subject: rbd: Clear ceph_msg->bio_iter for retransmitted message The bug can cause NULL pointer dereference in write_partial_msg_pages Signed-off-by: Zheng Yan Reviewed-by: Alex Elder --- net/ceph/messenger.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3857f815c035..769a2c9fe1af 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -649,6 +649,10 @@ static void prepare_write_message(struct ceph_connection *con) m->hdr.seq = cpu_to_le64(++con->out_seq); m->needs_out_seq = false; } +#ifdef CONFIG_BLOCK + else + m->bio_iter = NULL; +#endif dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", m, con->out_seq, le16_to_cpu(m->hdr.type), -- cgit v1.2.2 From 89a86be0ce20022f6ede8bccec078dbb3d63caaa Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 9 Jun 2012 14:19:21 -0700 Subject: libceph: transition socket state prior to actual connect Once we call ->connect(), we are racing against the actual connection, and a subsequent transition from CONNECTING -> CONNECTED. Set the state to CONNECTING before that, under the protection of the mutex, to avoid the race. This was introduced in 928443cd9644e7cfd46f687dbeffda2d1a357ff9, with the original socket state code. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 769a2c9fe1af..bdbecac2d69d 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -321,6 +321,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); + con_sock_state_connecting(con); ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), O_NONBLOCK); if (ret == -EINPROGRESS) { @@ -336,8 +337,6 @@ static int ceph_tcp_connect(struct ceph_connection *con) return ret; } con->sock = sock; - con_sock_state_connecting(con); - return 0; } -- cgit v1.2.2 From 26ce171915f348abd1f41da1ed139d93750d987f Mon Sep 17 00:00:00 2001 From: Dan Carpenter Date: Tue, 19 Jun 2012 08:52:33 -0500 Subject: libceph: fix NULL dereference in reset_connection() We dereference "con->in_msg" on the line after it was set to NULL. Signed-off-by: Dan Carpenter Reviewed-by: Alex Elder --- net/ceph/messenger.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5e9f61d6d234..23073cff6481 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -440,7 +440,7 @@ static void reset_connection(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - ceph_con_put(con->in_msg->con); + ceph_con_put(con); } con->connect_seq = 0; -- cgit v1.2.2 From 36eb71aa57e6a33d61fd90a2fd87f00c6844bc86 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 21 Jun 2012 12:47:08 -0700 Subject: libceph: use con get/put methods The ceph_con_get/put() helpers manipulate the embedded con ref count, which isn't used now that ceph_connections are embedded in other structures. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 23073cff6481..fc0cee7c9aa2 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -414,7 +414,7 @@ static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); BUG_ON(msg->con == NULL); - ceph_con_put(msg->con); + msg->con->ops->put(msg->con); msg->con = NULL; ceph_msg_put(msg); @@ -440,7 +440,7 @@ static void reset_connection(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - ceph_con_put(con); + con->ops->put(con); } con->connect_seq = 0; @@ -1919,7 +1919,7 @@ static void process_message(struct ceph_connection *con) con->in_msg->con = NULL; msg = con->in_msg; con->in_msg = NULL; - ceph_con_put(con); + con->ops->put(con); /* if first message, set peer_name */ if (con->peer_name.type == 0) @@ -2281,7 +2281,7 @@ static void ceph_fault(struct ceph_connection *con) con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - ceph_con_put(con); + con->ops->put(con); } /* Requeue anything that hasn't been acked */ @@ -2400,7 +2400,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) mutex_lock(&con->mutex); BUG_ON(msg->con != NULL); - msg->con = ceph_con_get(con); + msg->con = con->ops->get(con); BUG_ON(msg->con == NULL); BUG_ON(!list_empty(&msg->list_head)); @@ -2436,7 +2436,7 @@ void ceph_msg_revoke(struct ceph_msg *msg) dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); BUG_ON(msg->con == NULL); - ceph_con_put(msg->con); + msg->con->ops->put(msg->con); msg->con = NULL; msg->hdr.seq = 0; @@ -2646,7 +2646,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, con->in_msg = con->ops->alloc_msg(con, hdr, &skip); mutex_lock(&con->mutex); if (con->in_msg) { - con->in_msg->con = ceph_con_get(con); + con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); } if (skip) @@ -2662,7 +2662,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, type, front_len); return false; } - con->in_msg->con = ceph_con_get(con); + con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); } -- cgit v1.2.2 From d59315ca8c0de00df9b363f94a2641a30961ca1c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 21 Jun 2012 12:49:23 -0700 Subject: libceph: drop ceph_con_get/put helpers and nref member These are no longer used. Every ceph_connection instance is embedded in another structure, and refcounts manipulated via the get/put ops. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 28 +--------------------------- 1 file changed, 1 insertion(+), 27 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index fc0cee7c9aa2..ab690e2e1206 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -500,30 +500,6 @@ bool ceph_con_opened(struct ceph_connection *con) return con->connect_seq > 0; } -/* - * generic get/put - */ -struct ceph_connection *ceph_con_get(struct ceph_connection *con) -{ - int nref = __atomic_add_unless(&con->nref, 1, 0); - - dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1); - - return nref ? con : NULL; -} - -void ceph_con_put(struct ceph_connection *con) -{ - int nref = atomic_dec_return(&con->nref); - - BUG_ON(nref < 0); - if (nref == 0) { - BUG_ON(con->sock); - kfree(con); - } - dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref); -} - /* * initialize a new connection. */ @@ -535,7 +511,6 @@ void ceph_con_init(struct ceph_connection *con, void *private, memset(con, 0, sizeof(*con)); con->private = private; con->ops = ops; - atomic_set(&con->nref, 1); con->msgr = msgr; con_sock_state_init(con); @@ -1951,8 +1926,7 @@ static int try_write(struct ceph_connection *con) { int ret = 1; - dout("try_write start %p state %lu nref %d\n", con, con->state, - atomic_read(&con->nref)); + dout("try_write start %p state %lu\n", con, con->state); more: dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); -- cgit v1.2.2 From 739c905baa018c99003564ebc367d93aa44d4861 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: encapsulate out message data setup Move the code that prepares to write the data portion of a message into its own function. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index ab690e2e1206..56448660883d 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -565,6 +565,24 @@ static void con_out_kvec_add(struct ceph_connection *con, con->out_kvec_bytes += size; } +static void prepare_write_message_data(struct ceph_connection *con) +{ + struct ceph_msg *msg = con->out_msg; + + BUG_ON(!msg); + BUG_ON(!msg->hdr.data_len); + + /* initialize page iterator */ + con->out_msg_pos.page = 0; + if (msg->pages) + con->out_msg_pos.page_pos = msg->page_alignment; + else + con->out_msg_pos.page_pos = 0; + con->out_msg_pos.data_pos = 0; + con->out_msg_pos.did_page_crc = false; + con->out_more = 1; /* data + footer will follow */ +} + /* * Prepare footer for currently outgoing message, and finish things * off. Assumes out_kvec* are already valid.. we just add on to the end. @@ -657,26 +675,17 @@ static void prepare_write_message(struct ceph_connection *con) con->out_msg->footer.middle_crc = cpu_to_le32(crc); } else con->out_msg->footer.middle_crc = 0; - con->out_msg->footer.data_crc = 0; - dout("prepare_write_message front_crc %u data_crc %u\n", + dout("%s front_crc %u middle_crc %u\n", __func__, le32_to_cpu(con->out_msg->footer.front_crc), le32_to_cpu(con->out_msg->footer.middle_crc)); /* is there a data payload? */ - if (le32_to_cpu(m->hdr.data_len) > 0) { - /* initialize page iterator */ - con->out_msg_pos.page = 0; - if (m->pages) - con->out_msg_pos.page_pos = m->page_alignment; - else - con->out_msg_pos.page_pos = 0; - con->out_msg_pos.data_pos = 0; - con->out_msg_pos.did_page_crc = false; - con->out_more = 1; /* data + footer will follow */ - } else { + con->out_msg->footer.data_crc = 0; + if (m->hdr.data_len) + prepare_write_message_data(con); + else /* no, queue up footer too and be done */ prepare_write_message_footer(con); - } set_bit(WRITE_PENDING, &con->flags); } -- cgit v1.2.2 From 84ca8fc87fcf4ab97bb8acdb59bf97bb4820cb14 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: encapsulate advancing msg page In write_partial_msg_pages(), once all the data from a page has been sent we advance to the next one. Put the code that takes care of this into its own function. While modifying write_partial_msg_pages(), make its local variable "in_trail" be Boolean, and use the local variable "msg" (which is just the connection's current out_msg pointer) consistently. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 58 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 24 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 56448660883d..1b92e3b16c0d 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -891,6 +891,33 @@ static void iter_bio_next(struct bio **bio_iter, int *seg) } #endif +static void out_msg_pos_next(struct ceph_connection *con, struct page *page, + size_t len, size_t sent, bool in_trail) +{ + struct ceph_msg *msg = con->out_msg; + + BUG_ON(!msg); + BUG_ON(!sent); + + con->out_msg_pos.data_pos += sent; + con->out_msg_pos.page_pos += sent; + if (sent == len) { + con->out_msg_pos.page_pos = 0; + con->out_msg_pos.page++; + con->out_msg_pos.did_page_crc = false; + if (in_trail) + list_move_tail(&page->lru, + &msg->trail->head); + else if (msg->pagelist) + list_move_tail(&page->lru, + &msg->pagelist->head); +#ifdef CONFIG_BLOCK + else if (msg->bio) + iter_bio_next(&msg->bio_iter, &msg->bio_seg); +#endif + } +} + /* * Write as much message data payload as we can. If we finish, queue * up the footer. @@ -906,11 +933,11 @@ static int write_partial_msg_pages(struct ceph_connection *con) bool do_datacrc = !con->msgr->nocrc; int ret; int total_max_write; - int in_trail = 0; + bool in_trail = false; size_t trail_len = (msg->trail ? msg->trail->length : 0); dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", - con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, + con, msg, con->out_msg_pos.page, msg->nr_pages, con->out_msg_pos.page_pos); #ifdef CONFIG_BLOCK @@ -934,13 +961,12 @@ static int write_partial_msg_pages(struct ceph_connection *con) /* have we reached the trail part of the data? */ if (con->out_msg_pos.data_pos >= data_len - trail_len) { - in_trail = 1; + in_trail = true; total_max_write = data_len - con->out_msg_pos.data_pos; page = list_first_entry(&msg->trail->head, struct page, lru); - max_write = PAGE_SIZE; } else if (msg->pages) { page = msg->pages[con->out_msg_pos.page]; } else if (msg->pagelist) { @@ -964,14 +990,14 @@ static int write_partial_msg_pages(struct ceph_connection *con) if (do_datacrc && !con->out_msg_pos.did_page_crc) { void *base; u32 crc; - u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); + u32 tmpcrc = le32_to_cpu(msg->footer.data_crc); char *kaddr; kaddr = kmap(page); BUG_ON(kaddr == NULL); base = kaddr + con->out_msg_pos.page_pos + bio_offset; crc = crc32c(tmpcrc, base, len); - con->out_msg->footer.data_crc = cpu_to_le32(crc); + msg->footer.data_crc = cpu_to_le32(crc); con->out_msg_pos.did_page_crc = true; } ret = ceph_tcp_sendpage(con->sock, page, @@ -984,30 +1010,14 @@ static int write_partial_msg_pages(struct ceph_connection *con) if (ret <= 0) goto out; - con->out_msg_pos.data_pos += ret; - con->out_msg_pos.page_pos += ret; - if (ret == len) { - con->out_msg_pos.page_pos = 0; - con->out_msg_pos.page++; - con->out_msg_pos.did_page_crc = false; - if (in_trail) - list_move_tail(&page->lru, - &msg->trail->head); - else if (msg->pagelist) - list_move_tail(&page->lru, - &msg->pagelist->head); -#ifdef CONFIG_BLOCK - else if (msg->bio) - iter_bio_next(&msg->bio_iter, &msg->bio_seg); -#endif - } + out_msg_pos_next(con, page, len, (size_t) ret, in_trail); } dout("write_partial_msg_pages %p msg %p done\n", con, msg); /* prepare and queue up footer, too */ if (!do_datacrc) - con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; + msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; con_out_kvec_reset(con); prepare_write_message_footer(con); ret = 1; -- cgit v1.2.2 From fd154f3c75465abd83b7a395033e3755908a1e6e Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: don't mark footer complete before it is This is a nit, but prepare_write_message() sets the FOOTER_COMPLETE flag before the CRC for the data portion (recorded in the footer) has been completely computed. Hold off setting the complete flag until we've decided it's ready to send. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 1b92e3b16c0d..5354d59ba8b9 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -592,6 +592,8 @@ static void prepare_write_message_footer(struct ceph_connection *con) struct ceph_msg *m = con->out_msg; int v = con->out_kvec_left; + m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; + dout("prepare_write_message_footer %p\n", con); con->out_kvec_is_msg = true; con->out_kvec[v].iov_base = &m->footer; @@ -665,7 +667,7 @@ static void prepare_write_message(struct ceph_connection *con) /* fill in crc (except data pages), footer */ crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); con->out_msg->hdr.crc = cpu_to_le32(crc); - con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; + con->out_msg->footer.flags = 0; crc = crc32c(0, m->front.iov_base, m->front.iov_len); con->out_msg->footer.front_crc = cpu_to_le32(crc); -- cgit v1.2.2 From df6ad1f97342ebc4270128222e896541405eecdb Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: move init_bio_*() functions up Move init_bio_iter() and iter_bio_next() up in their source file so the'll be defined before they're needed. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5354d59ba8b9..7b5ff4545bf3 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -565,6 +565,31 @@ static void con_out_kvec_add(struct ceph_connection *con, con->out_kvec_bytes += size; } +#ifdef CONFIG_BLOCK +static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) +{ + if (!bio) { + *iter = NULL; + *seg = 0; + return; + } + *iter = bio; + *seg = bio->bi_idx; +} + +static void iter_bio_next(struct bio **bio_iter, int *seg) +{ + if (*bio_iter == NULL) + return; + + BUG_ON(*seg >= (*bio_iter)->bi_vcnt); + + (*seg)++; + if (*seg == (*bio_iter)->bi_vcnt) + init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); +} +#endif + static void prepare_write_message_data(struct ceph_connection *con) { struct ceph_msg *msg = con->out_msg; @@ -868,31 +893,6 @@ out: return ret; /* done! */ } -#ifdef CONFIG_BLOCK -static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) -{ - if (!bio) { - *iter = NULL; - *seg = 0; - return; - } - *iter = bio; - *seg = bio->bi_idx; -} - -static void iter_bio_next(struct bio **bio_iter, int *seg) -{ - if (*bio_iter == NULL) - return; - - BUG_ON(*seg >= (*bio_iter)->bi_vcnt); - - (*seg)++; - if (*seg == (*bio_iter)->bi_vcnt) - init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); -} -#endif - static void out_msg_pos_next(struct ceph_connection *con, struct page *page, size_t len, size_t sent, bool in_trail) { -- cgit v1.2.2 From 572c588edadaa3da3992bd8a0fed830bbcc861f8 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: move init of bio_iter If a message has a non-null bio pointer, its bio_iter field is initialized in write_partial_msg_pages() if this has not been done already. This is really a one-time setup operation for sending a message's (bio) data, so move that initialization code into prepare_write_message_data() which serves that purpose. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 7b5ff4545bf3..fedad914b238 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -603,6 +603,10 @@ static void prepare_write_message_data(struct ceph_connection *con) con->out_msg_pos.page_pos = msg->page_alignment; else con->out_msg_pos.page_pos = 0; +#ifdef CONFIG_BLOCK + if (msg->bio && !msg->bio_iter) + init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); +#endif con->out_msg_pos.data_pos = 0; con->out_msg_pos.did_page_crc = false; con->out_more = 1; /* data + footer will follow */ @@ -942,11 +946,6 @@ static int write_partial_msg_pages(struct ceph_connection *con) con, msg, con->out_msg_pos.page, msg->nr_pages, con->out_msg_pos.page_pos); -#ifdef CONFIG_BLOCK - if (msg->bio && !msg->bio_iter) - init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); -#endif - while (data_len > con->out_msg_pos.data_pos) { struct page *page = NULL; int max_write = PAGE_SIZE; -- cgit v1.2.2 From abdaa6a849af1d63153682c11f5bbb22dacb1f6b Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: don't use bio_iter as a flag Recently a bug was fixed in which the bio_iter field in a ceph message was not being properly re-initialized when a message got re-transmitted: commit 43643528cce60ca184fe8197efa8e8da7c89a037 Author: Yan, Zheng rbd: Clear ceph_msg->bio_iter for retransmitted message We are now only initializing the bio_iter field when we are about to start to write message data (in prepare_write_message_data()), rather than every time we are attempting to write any portion of the message data (in write_partial_msg_pages()). This means we no longer need to use the msg->bio_iter field as a flag. So just don't do that any more. Trust prepare_write_message_data() to ensure msg->bio_iter is properly initialized, every time we are about to begin writing (or re-writing) a message's bio data. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index fedad914b238..3a4330371d88 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -604,7 +604,7 @@ static void prepare_write_message_data(struct ceph_connection *con) else con->out_msg_pos.page_pos = 0; #ifdef CONFIG_BLOCK - if (msg->bio && !msg->bio_iter) + if (msg->bio) init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); #endif con->out_msg_pos.data_pos = 0; @@ -672,10 +672,6 @@ static void prepare_write_message(struct ceph_connection *con) m->hdr.seq = cpu_to_le64(++con->out_seq); m->needs_out_seq = false; } -#ifdef CONFIG_BLOCK - else - m->bio_iter = NULL; -#endif dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", m, con->out_seq, le16_to_cpu(m->hdr.type), -- cgit v1.2.2 From a8d00e3cdef4c1c4f194414b72b24cd995439a05 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: SOCK_CLOSED is a flag, not a state The following commit changed it so SOCK_CLOSED bit was stored in a connection's new "flags" field rather than its "state" field. libceph: start separating connection flags from state commit 928443cd That bit is used in con_close_socket() to protect against setting an error message more than once in the socket event handler function. Unfortunately, the field being operated on in that function was not updated to be "flags" as it should have been. This fixes that error. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3a4330371d88..39653944f21b 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -397,11 +397,11 @@ static int con_close_socket(struct ceph_connection *con) dout("con_close_socket on %p sock %p\n", con, con->sock); if (!con->sock) return 0; - set_bit(SOCK_CLOSED, &con->state); + set_bit(SOCK_CLOSED, &con->flags); rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); sock_release(con->sock); con->sock = NULL; - clear_bit(SOCK_CLOSED, &con->state); + clear_bit(SOCK_CLOSED, &con->flags); con_sock_state_closed(con); return rc; } -- cgit v1.2.2 From 188048bce311ee41e5178bc3255415d0eae28423 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: don't change socket state on sock event Currently the socket state change event handler records an error message on a connection to distinguish a close while connecting from a close while a connection was already established. Changing connection information during handling of a socket event is not very clean, so instead move this assignment inside con_work(), where it can be done during normal connection-level processing (and under protection of the connection mutex as well). Move the handling of a socket closed event up to the top of the processing loop in con_work(); there's no point in handling backoff etc. if we have a newly-closed socket to take care of. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 39653944f21b..56381b973d02 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -261,13 +261,8 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) { - if (test_bit(CONNECTING, &con->state)) - con->error_msg = "connection failed"; - else - con->error_msg = "socket closed"; + if (!test_and_set_bit(SOCK_CLOSED, &con->flags)) queue_con(con); - } break; case TCP_ESTABLISHED: dout("%s TCP_ESTABLISHED\n", __func__); @@ -2187,6 +2182,14 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: + if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { + if (test_bit(CONNECTING, &con->state)) + con->error_msg = "connection failed"; + else + con->error_msg = "socket closed"; + goto fault; + } + if (test_and_clear_bit(BACKOFF, &con->flags)) { dout("con_work %p backing off\n", con); if (queue_delayed_work(ceph_msgr_wq, &con->work, @@ -2216,9 +2219,6 @@ restart: con_close_socket(con); } - if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) - goto fault; - ret = try_read(con); if (ret == -EAGAIN) goto restart; -- cgit v1.2.2 From d65c9e0b9eb43d14ece9dd843506ccba06162ee7 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: just set SOCK_CLOSED when state changes When a TCP_CLOSE or TCP_CLOSE_WAIT event occurs, the SOCK_CLOSED connection flag bit is set, and if it had not been previously set queue_con() is called to ensure con_work() will get a chance to handle the changed state. con_work() atomically checks--and if set, clears--the SOCK_CLOSED bit if it was set. This means that even if the bit were set repeatedly, the related processing in con_work() only gets called once per transition of the bit from 0 to 1. What's important then is that we ensure con_work() gets called *at least* once when a socket close event occurs, not that it gets called *exactly* once. The work queue mechanism already takes care of queueing work only if it is not already queued, so there's no need for us to call queue_con() conditionally. So this patch just makes it so the SOCK_CLOSED flag gets set unconditionally in ceph_sock_state_change(). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 56381b973d02..cebef8560586 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -261,8 +261,8 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - if (!test_and_set_bit(SOCK_CLOSED, &con->flags)) - queue_con(con); + set_bit(SOCK_CLOSED, &con->flags); + queue_con(con); break; case TCP_ESTABLISHED: dout("%s TCP_ESTABLISHED\n", __func__); -- cgit v1.2.2 From 456ea46865787283088b23a8a7f69244513b95f0 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: don't touch con state in con_close_socket() In con_close_socket(), a connection's SOCK_CLOSED flag gets set and then cleared while its shutdown method is called and its reference gets dropped. Previously, that flag got set only if it had not already been set, so setting it in con_close_socket() might have prevented additional processing being done on a socket being shut down. We no longer set SOCK_CLOSED in the socket event routine conditionally, so setting that bit here no longer provides whatever benefit it might have provided before. A race condition could still leave the SOCK_CLOSED bit set even after we've issued the call to con_close_socket(), so we still clear that bit after shutting the socket down. Add a comment explaining the reason for this. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index cebef8560586..cfcca1f5be67 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -392,10 +392,16 @@ static int con_close_socket(struct ceph_connection *con) dout("con_close_socket on %p sock %p\n", con, con->sock); if (!con->sock) return 0; - set_bit(SOCK_CLOSED, &con->flags); rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); sock_release(con->sock); con->sock = NULL; + + /* + * Forcibly clear the SOCK_CLOSE flag. It gets set + * independent of the connection mutex, and we could have + * received a socket close event before we had the chance to + * shut the socket down. + */ clear_bit(SOCK_CLOSED, &con->flags); con_sock_state_closed(con); return rc; -- cgit v1.2.2 From bb9e6bba5d8b85b631390f8dbe8a24ae1ff5b48a Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: clear CONNECTING in ceph_con_close() A connection that is closed will no longer be connecting. So clear the CONNECTING state bit in ceph_con_close(). Similarly, if the socket has been closed we no longer are in connecting state (a new connect sequence will need to be initiated). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index cfcca1f5be67..beee382d784e 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -462,6 +462,7 @@ void ceph_con_close(struct ceph_connection *con) dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr.in_addr)); clear_bit(NEGOTIATING, &con->state); + clear_bit(CONNECTING, &con->state); clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ set_bit(CLOSED, &con->state); @@ -2189,7 +2190,7 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { - if (test_bit(CONNECTING, &con->state)) + if (test_and_clear_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; else con->error_msg = "socket closed"; -- cgit v1.2.2 From 3ec50d1868a9e0493046400bb1fdd054c7f64ebd Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 23 May 2012 14:35:23 -0500 Subject: libceph: clear NEGOTIATING when done A connection state's NEGOTIATING bit gets set while in CONNECTING state after we have successfully exchanged a ceph banner and IP addresses with the connection's peer (the server). But that bit is not cleared again--at least not until another connection attempt is initiated. Instead, clear it as soon as the connection is fully established. Also, clear it when a socket connection gets prematurely closed in the midst of establishing a ceph connection (in case we had reached the point where it was set). Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index beee382d784e..500207bad5d6 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1562,6 +1562,7 @@ static int process_connect(struct ceph_connection *con) fail_protocol(con); return -1; } + clear_bit(NEGOTIATING, &con->state); clear_bit(CONNECTING, &con->state); con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; @@ -1951,7 +1952,6 @@ more: /* open the socket first? */ if (con->sock == NULL) { - clear_bit(NEGOTIATING, &con->state); set_bit(CONNECTING, &con->state); con_out_kvec_reset(con); @@ -2190,10 +2190,12 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { - if (test_and_clear_bit(CONNECTING, &con->state)) + if (test_and_clear_bit(CONNECTING, &con->state)) { + clear_bit(NEGOTIATING, &con->state); con->error_msg = "connection failed"; - else + } else { con->error_msg = "socket closed"; + } goto fault; } -- cgit v1.2.2 From e27947c767f5bed15048f4e4dad3e2eb69133697 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 23 May 2012 14:35:23 -0500 Subject: libceph: define and use an explicit CONNECTED state There is no state explicitly defined when a ceph connection is fully operational. So define one. It's set when the connection sequence completes successfully, and is cleared when the connection gets closed. Be a little more careful when examining the old state when a socket disconnect event is reported. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 500207bad5d6..83bcf977e9b9 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -463,6 +463,7 @@ void ceph_con_close(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); clear_bit(NEGOTIATING, &con->state); clear_bit(CONNECTING, &con->state); + clear_bit(CONNECTED, &con->state); clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ set_bit(CLOSED, &con->state); @@ -1564,6 +1565,7 @@ static int process_connect(struct ceph_connection *con) } clear_bit(NEGOTIATING, &con->state); clear_bit(CONNECTING, &con->state); + set_bit(CONNECTED, &con->state); con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; con->peer_features = server_feat; @@ -2114,6 +2116,7 @@ more: prepare_read_ack(con); break; case CEPH_MSGR_TAG_CLOSE: + clear_bit(CONNECTED, &con->state); set_bit(CLOSED, &con->state); /* fixme */ goto out; default: @@ -2190,11 +2193,13 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { - if (test_and_clear_bit(CONNECTING, &con->state)) { + if (test_and_clear_bit(CONNECTED, &con->state)) + con->error_msg = "socket closed"; + else if (test_and_clear_bit(CONNECTING, &con->state)) { clear_bit(NEGOTIATING, &con->state); con->error_msg = "connection failed"; } else { - con->error_msg = "socket closed"; + con->error_msg = "unrecognized con state"; } goto fault; } -- cgit v1.2.2 From ab166d5aa3bc036fba7efaca6e4e43a7e9510acf Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 31 May 2012 11:37:29 -0500 Subject: libceph: separate banner and connect writes There are two phases in the process of linking together the two ends of a ceph connection. The first involves exchanging a banner and IP addresses, and if that is successful a second phase exchanges some detail about each side's connection capabilities. When initiating a connection, the client side now queues to send its information for both phases of this process at the same time. This is probably a bit more efficient, but it is slightly messier from a layering perspective in the code. So rearrange things so that the client doesn't send the connection information until it has received and processed the response in the initial banner phase (in process_banner()). Move the code (in the (con->sock == NULL) case in try_write()) that prepares for writing the connection information, delaying doing that until the banner exchange has completed. Move the code that begins the transition to this second "NEGOTIATING" phase out of process_banner() and into its caller, so preparing to write the connection information and preparing to read the response are adjacent to each other. Finally, preparing to write the connection information now requires the output kvec to be reset in all cases, so move that into the prepare_write_connect() and delete it from all callers. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 83bcf977e9b9..5e67be3fa296 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -841,6 +841,7 @@ static int prepare_write_connect(struct ceph_connection *con) con->out_connect.authorizer_len = auth ? cpu_to_le32(auth->authorizer_buf_len) : 0; + con_out_kvec_reset(con); con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); if (auth && auth->authorizer_buf_len) @@ -1430,8 +1431,6 @@ static int process_banner(struct ceph_connection *con) ceph_pr_addr(&con->msgr->inst.addr.in_addr)); } - set_bit(NEGOTIATING, &con->state); - prepare_read_connect(con); return 0; } @@ -1481,7 +1480,6 @@ static int process_connect(struct ceph_connection *con) return -1; } con->auth_retry = 1; - con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1502,7 +1500,6 @@ static int process_connect(struct ceph_connection *con) ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr)); reset_connection(con); - con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1528,7 +1525,6 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect.connect_seq), le32_to_cpu(con->in_connect.connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); - con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1545,7 +1541,6 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_connect.global_seq)); get_global_seq(con->msgr, le32_to_cpu(con->in_connect.global_seq)); - con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1958,9 +1953,6 @@ more: con_out_kvec_reset(con); prepare_write_banner(con); - ret = prepare_write_connect(con); - if (ret < 0) - goto out; prepare_read_banner(con); BUG_ON(con->in_msg); @@ -2073,6 +2065,16 @@ more: ret = process_banner(con); if (ret < 0) goto out; + + /* Banner is good, exchange connection info */ + ret = prepare_write_connect(con); + if (ret < 0) + goto out; + prepare_read_connect(con); + set_bit(NEGOTIATING, &con->state); + + /* Send connection info before awaiting response */ + goto out; } ret = read_partial_connect(con); if (ret <= 0) -- cgit v1.2.2 From 7593af920baac37752190a0db703d2732bed4a3b Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 24 May 2012 11:55:03 -0500 Subject: libceph: distinguish two phases of connect sequence Currently a ceph connection enters a "CONNECTING" state when it begins the process of (re-)connecting with its peer. Once the two ends have successfully exchanged their banner and addresses, an additional NEGOTIATING bit is set in the ceph connection's state to indicate the connection information exhange has begun. The CONNECTING bit/state continues to be set during this phase. Rather than have the CONNECTING state continue while the NEGOTIATING bit is set, interpret these two phases as distinct states. In other words, when NEGOTIATING is set, clear CONNECTING. That way only one of them will be active at a time. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 52 ++++++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 24 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5e67be3fa296..32a3a2a72580 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1559,7 +1559,6 @@ static int process_connect(struct ceph_connection *con) return -1; } clear_bit(NEGOTIATING, &con->state); - clear_bit(CONNECTING, &con->state); set_bit(CONNECTED, &con->state); con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; @@ -2000,7 +1999,8 @@ more_kvec: } do_next: - if (!test_bit(CONNECTING, &con->state)) { + if (!test_bit(CONNECTING, &con->state) && + !test_bit(NEGOTIATING, &con->state)) { /* is anything else pending? */ if (!list_empty(&con->out_queue)) { prepare_write_message(con); @@ -2057,25 +2057,29 @@ more: } if (test_bit(CONNECTING, &con->state)) { - if (!test_bit(NEGOTIATING, &con->state)) { - dout("try_read connecting\n"); - ret = read_partial_banner(con); - if (ret <= 0) - goto out; - ret = process_banner(con); - if (ret < 0) - goto out; - - /* Banner is good, exchange connection info */ - ret = prepare_write_connect(con); - if (ret < 0) - goto out; - prepare_read_connect(con); - set_bit(NEGOTIATING, &con->state); - - /* Send connection info before awaiting response */ + dout("try_read connecting\n"); + ret = read_partial_banner(con); + if (ret <= 0) goto out; - } + ret = process_banner(con); + if (ret < 0) + goto out; + + clear_bit(CONNECTING, &con->state); + set_bit(NEGOTIATING, &con->state); + + /* Banner is good, exchange connection info */ + ret = prepare_write_connect(con); + if (ret < 0) + goto out; + prepare_read_connect(con); + + /* Send connection info before awaiting response */ + goto out; + } + + if (test_bit(NEGOTIATING, &con->state)) { + dout("try_read negotiating\n"); ret = read_partial_connect(con); if (ret <= 0) goto out; @@ -2197,12 +2201,12 @@ restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { if (test_and_clear_bit(CONNECTED, &con->state)) con->error_msg = "socket closed"; - else if (test_and_clear_bit(CONNECTING, &con->state)) { - clear_bit(NEGOTIATING, &con->state); + else if (test_and_clear_bit(NEGOTIATING, &con->state)) + con->error_msg = "negotiation failed"; + else if (test_and_clear_bit(CONNECTING, &con->state)) con->error_msg = "connection failed"; - } else { + else con->error_msg = "unrecognized con state"; - } goto fault; } -- cgit v1.2.2 From 5821bd8ccdf5d17ab2c391c773756538603838c3 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 11 Jun 2012 14:57:13 -0500 Subject: libceph: small changes to messenger.c This patch gathers a few small changes in "net/ceph/messenger.c": out_msg_pos_next() - small logic change that mostly affects indentation write_partial_msg_pages(). - use a local variable trail_off to represent the offset into a message of the trail portion of the data (if present) - once we are in the trail portion we will always be there, so we don't always need to check against our data position - avoid computing len twice after we've reached the trail - get rid of the variable tmpcrc, which is not needed - trail_off and trail_len never change so mark them const - update some comments read_partial_message_bio() - bio_iovec_idx() will never return an error, so don't bother checking for it Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 63 ++++++++++++++++++++++++++-------------------------- 1 file changed, 31 insertions(+), 32 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 32a3a2a72580..4578e99bbef1 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -907,21 +907,23 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, con->out_msg_pos.data_pos += sent; con->out_msg_pos.page_pos += sent; - if (sent == len) { - con->out_msg_pos.page_pos = 0; - con->out_msg_pos.page++; - con->out_msg_pos.did_page_crc = false; - if (in_trail) - list_move_tail(&page->lru, - &msg->trail->head); - else if (msg->pagelist) - list_move_tail(&page->lru, - &msg->pagelist->head); + if (sent < len) + return; + + BUG_ON(sent != len); + con->out_msg_pos.page_pos = 0; + con->out_msg_pos.page++; + con->out_msg_pos.did_page_crc = false; + if (in_trail) + list_move_tail(&page->lru, + &msg->trail->head); + else if (msg->pagelist) + list_move_tail(&page->lru, + &msg->pagelist->head); #ifdef CONFIG_BLOCK - else if (msg->bio) - iter_bio_next(&msg->bio_iter, &msg->bio_seg); + else if (msg->bio) + iter_bio_next(&msg->bio_iter, &msg->bio_seg); #endif - } } /* @@ -940,30 +942,31 @@ static int write_partial_msg_pages(struct ceph_connection *con) int ret; int total_max_write; bool in_trail = false; - size_t trail_len = (msg->trail ? msg->trail->length : 0); + const size_t trail_len = (msg->trail ? msg->trail->length : 0); + const size_t trail_off = data_len - trail_len; dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", con, msg, con->out_msg_pos.page, msg->nr_pages, con->out_msg_pos.page_pos); + /* + * Iterate through each page that contains data to be + * written, and send as much as possible for each. + * + * If we are calculating the data crc (the default), we will + * need to map the page. If we have no pages, they have + * been revoked, so use the zero page. + */ while (data_len > con->out_msg_pos.data_pos) { struct page *page = NULL; int max_write = PAGE_SIZE; int bio_offset = 0; - total_max_write = data_len - trail_len - - con->out_msg_pos.data_pos; - - /* - * if we are calculating the data crc (the default), we need - * to map the page. if our pages[] has been revoked, use the - * zero page. - */ - - /* have we reached the trail part of the data? */ - if (con->out_msg_pos.data_pos >= data_len - trail_len) { - in_trail = true; + in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off; + if (!in_trail) + total_max_write = trail_off - con->out_msg_pos.data_pos; + if (in_trail) { total_max_write = data_len - con->out_msg_pos.data_pos; page = list_first_entry(&msg->trail->head, @@ -990,14 +993,13 @@ static int write_partial_msg_pages(struct ceph_connection *con) if (do_datacrc && !con->out_msg_pos.did_page_crc) { void *base; - u32 crc; - u32 tmpcrc = le32_to_cpu(msg->footer.data_crc); + u32 crc = le32_to_cpu(msg->footer.data_crc); char *kaddr; kaddr = kmap(page); BUG_ON(kaddr == NULL); base = kaddr + con->out_msg_pos.page_pos + bio_offset; - crc = crc32c(tmpcrc, base, len); + crc = crc32c(crc, base, len); msg->footer.data_crc = cpu_to_le32(crc); con->out_msg_pos.did_page_crc = true; } @@ -1702,9 +1704,6 @@ static int read_partial_message_bio(struct ceph_connection *con, void *p; int ret, left; - if (IS_ERR(bv)) - return PTR_ERR(bv); - left = min((int)(data_len - con->in_msg_pos.data_pos), (int)(bv->bv_len - con->in_msg_pos.page_pos)); -- cgit v1.2.2 From bc18f4b1c850ab355e38373fbb60fd28568d84b5 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Jun 2012 21:53:53 -0500 Subject: libceph: add some fine ASCII art Sage liked the state diagram I put in my commit description so I'm putting it in with the code. Signed-off-by: Alex Elder Reviewed-by: Sage Weil --- net/ceph/messenger.c | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 4578e99bbef1..dcc50e4cd5cd 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -29,7 +29,47 @@ * the sender. */ -/* State values for ceph_connection->sock_state; NEW is assumed to be 0 */ +/* + * We track the state of the socket on a given connection using + * values defined below. The transition to a new socket state is + * handled by a function which verifies we aren't coming from an + * unexpected state. + * + * -------- + * | NEW* | transient initial state + * -------- + * | con_sock_state_init() + * v + * ---------- + * | CLOSED | initialized, but no socket (and no + * ---------- TCP connection) + * ^ \ + * | \ con_sock_state_connecting() + * | ---------------------- + * | \ + * + con_sock_state_closed() \ + * |\ \ + * | \ \ + * | ----------- \ + * | | CLOSING | socket event; \ + * | ----------- await close \ + * | ^ | + * | | | + * | + con_sock_state_closing() | + * | / \ | + * | / --------------- | + * | / \ v + * | / -------------- + * | / -----------------| CONNECTING | socket created, TCP + * | | / -------------- connect initiated + * | | | con_sock_state_connected() + * | | v + * ------------- + * | CONNECTED | TCP connection established + * ------------- + * + * State values for ceph_connection->sock_state; NEW is assumed to be 0. + */ #define CON_SOCK_STATE_NEW 0 /* -> CLOSED */ #define CON_SOCK_STATE_CLOSED 1 /* -> CONNECTING */ -- cgit v1.2.2 From b7a9e5dd40f17a48a72f249b8bbc989b63bae5fd Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 27 Jun 2012 12:24:08 -0700 Subject: libceph: set peer name on con_open, not init The peer name may change on each open attempt, even when the connection is reused. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index dcc50e4cd5cd..ae082d95fc72 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -523,12 +523,17 @@ EXPORT_SYMBOL(ceph_con_close); /* * Reopen a closed connection, with a new peer address. */ -void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) +void ceph_con_open(struct ceph_connection *con, + __u8 entity_type, __u64 entity_num, + struct ceph_entity_addr *addr) { dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); set_bit(OPENING, &con->state); WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); + con->peer_name.type = (__u8) entity_type; + con->peer_name.num = cpu_to_le64(entity_num); + memcpy(&con->peer_addr, addr, sizeof(*addr)); con->delay = 0; /* reset backoff memory */ queue_con(con); @@ -548,7 +553,7 @@ bool ceph_con_opened(struct ceph_connection *con) */ void ceph_con_init(struct ceph_connection *con, void *private, const struct ceph_connection_operations *ops, - struct ceph_messenger *msgr, __u8 entity_type, __u64 entity_num) + struct ceph_messenger *msgr) { dout("con_init %p\n", con); memset(con, 0, sizeof(*con)); @@ -558,9 +563,6 @@ void ceph_con_init(struct ceph_connection *con, void *private, con_sock_state_init(con); - con->peer_name.type = (__u8) entity_type; - con->peer_name.num = cpu_to_le64(entity_num); - mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); -- cgit v1.2.2 From fbb85a478f6d4cce6942f1c25c6a68ec5b1e7e7f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 27 Jun 2012 12:31:02 -0700 Subject: libceph: allow sock transition from CONNECTING to CLOSED It is possible to close a socket that is in the OPENING state. For example, it can happen if ceph_con_close() is called on the con before the TCP connection is established. con_work() will come around and shut down the socket. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index ae082d95fc72..09ada7924874 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -48,17 +48,17 @@ * | ---------------------- * | \ * + con_sock_state_closed() \ - * |\ \ - * | \ \ - * | ----------- \ - * | | CLOSING | socket event; \ - * | ----------- await close \ - * | ^ | - * | | | - * | + con_sock_state_closing() | - * | / \ | - * | / --------------- | - * | / \ v + * |+--------------------------- \ + * | \ \ \ + * | ----------- \ \ + * | | CLOSING | socket event; \ \ + * | ----------- await close \ \ + * | ^ \ | + * | | \ | + * | + con_sock_state_closing() \ | + * | / \ | | + * | / --------------- | | + * | / \ v v * | / -------------- * | / -----------------| CONNECTING | socket created, TCP * | | / -------------- connect initiated @@ -241,7 +241,8 @@ static void con_sock_state_closed(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED && - old_state != CON_SOCK_STATE_CLOSING)) + old_state != CON_SOCK_STATE_CLOSING && + old_state != CON_SOCK_STATE_CONNECTING)) printk("%s: unexpected old state %d\n", __func__, old_state); } -- cgit v1.2.2 From a16cb1f70799c851410d9dca0a24122e258df06c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 10 Jul 2012 11:53:34 -0700 Subject: libceph: fix messenger retry In ancient times, the messenger could both initiate and accept connections. An artifact if that was data structures to store/process an incoming ceph_msg_connect request and send an outgoing ceph_msg_connect_reply. Sadly, the negotiation code was referencing those structures and ignoring important information (like the peer's connect_seq) from the correct ones. Among other things, this fixes tight reconnect loops where the server sends RETRY_SESSION and we (the client) retries with the same connect_seq as last time. This bug pretty easily triggered by injecting socket failures on the MDS and running some fs workload like workunits/direct_io/test_sync_io. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 09ada7924874..16814d1f4774 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1540,7 +1540,7 @@ static int process_connect(struct ceph_connection *con) * dropped messages. */ dout("process_connect got RESET peer seq %u\n", - le32_to_cpu(con->in_connect.connect_seq)); + le32_to_cpu(con->in_reply.connect_seq)); pr_err("%s%lld %s connection reset\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr)); @@ -1566,10 +1566,10 @@ static int process_connect(struct ceph_connection *con) * If we sent a smaller connect_seq than the peer has, try * again with a larger value. */ - dout("process_connect got RETRY my seq = %u, peer_seq = %u\n", + dout("process_connect got RETRY_SESSION my seq %u, peer %u\n", le32_to_cpu(con->out_connect.connect_seq), - le32_to_cpu(con->in_connect.connect_seq)); - con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); + le32_to_cpu(con->in_reply.connect_seq)); + con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); ret = prepare_write_connect(con); if (ret < 0) return ret; @@ -1583,9 +1583,9 @@ static int process_connect(struct ceph_connection *con) */ dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", con->peer_global_seq, - le32_to_cpu(con->in_connect.global_seq)); + le32_to_cpu(con->in_reply.global_seq)); get_global_seq(con->msgr, - le32_to_cpu(con->in_connect.global_seq)); + le32_to_cpu(con->in_reply.global_seq)); ret = prepare_write_connect(con); if (ret < 0) return ret; -- cgit v1.2.2 From a2a3258417eb6a1799cf893350771428875a8287 Mon Sep 17 00:00:00 2001 From: Guanjun He Date: Sun, 8 Jul 2012 19:50:33 -0700 Subject: libceph: prevent the race of incoming work during teardown Add an atomic variable 'stopping' as flag in struct ceph_messenger, set this flag to 1 in function ceph_destroy_client(), and add the condition code in function ceph_data_ready() to test the flag value, if true(1), just return. Signed-off-by: Guanjun He Reviewed-by: Sage Weil --- net/ceph/messenger.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 16814d1f4774..63e1252d3af5 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -254,6 +254,9 @@ static void con_sock_state_closed(struct ceph_connection *con) static void ceph_sock_data_ready(struct sock *sk, int count_unused) { struct ceph_connection *con = sk->sk_user_data; + if (atomic_read(&con->msgr->stopping)) { + return; + } if (sk->sk_state != TCP_CLOSE_WAIT) { dout("%s on %p state = %lu, queueing work\n", __func__, @@ -2413,6 +2416,8 @@ void ceph_messenger_init(struct ceph_messenger *msgr, encode_my_addr(msgr); msgr->nocrc = nocrc; + atomic_set(&msgr->stopping, 0); + dout("%s %p\n", __func__, msgr); } EXPORT_SYMBOL(ceph_messenger_init); -- cgit v1.2.2 From 3a140a0d5c4b9e35373b016e41dfc85f1e526bdb Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:24:21 -0700 Subject: libceph: report socket read/write error message We need to set error_msg to something useful before calling ceph_fault(); do so here for try_{read,write}(). This is more informative than libceph: osd0 192.168.106.220:6801 (null) Signed-off-by: Sage Weil Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 63e1252d3af5..6e2f67816f61 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2287,14 +2287,18 @@ restart: ret = try_read(con); if (ret == -EAGAIN) goto restart; - if (ret < 0) + if (ret < 0) { + con->error_msg = "socket error on read"; goto fault; + } ret = try_write(con); if (ret == -EAGAIN) goto restart; - if (ret < 0) + if (ret < 0) { + con->error_msg = "socket error on write"; goto fault; + } done: mutex_unlock(&con->mutex); -- cgit v1.2.2 From 8c50c817566dfa4581f82373aac39f3e608a7dc8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:24:37 -0700 Subject: libceph: fix mutex coverage for ceph_con_close Hold the mutex while twiddling all of the state bits to avoid possible races. While we're here, make not of why we cannot close the socket directly. Signed-off-by: Sage Weil Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 6e2f67816f61..e65b15d5d8b9 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -503,6 +503,7 @@ static void reset_connection(struct ceph_connection *con) */ void ceph_con_close(struct ceph_connection *con) { + mutex_lock(&con->mutex); dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr.in_addr)); clear_bit(NEGOTIATING, &con->state); @@ -515,11 +516,16 @@ void ceph_con_close(struct ceph_connection *con) clear_bit(KEEPALIVE_PENDING, &con->flags); clear_bit(WRITE_PENDING, &con->flags); - mutex_lock(&con->mutex); reset_connection(con); con->peer_global_seq = 0; cancel_delayed_work(&con->work); mutex_unlock(&con->mutex); + + /* + * We cannot close the socket directly from here because the + * work threads use it without holding the mutex. Instead, let + * con_work() do it. + */ queue_con(con); } EXPORT_SYMBOL(ceph_con_close); -- cgit v1.2.2 From a4107026976f06c9a6ce8cc84a763564ee39d901 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:20:25 -0700 Subject: libceph: (re)initialize bio_iter on start of message receive Previously, we were opportunistically initializing the bio_iter if it appeared to be uninitialized in the middle of the read path. The problem is that a sequence like: - start reading message - initialize bio_iter - read half a message - messenger fault, reconnect - restart reading message - ** bio_iter now non-NULL, not reinitialized ** - read past end of bio, crash Instead, initialize the bio_iter unconditionally when we allocate/claim the message for read. Signed-off-by: Sage Weil Reviewed-by: Alex Elder Reviewed-by: Yehuda Sadeh --- net/ceph/messenger.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e65b15d5d8b9..f1bd3bbb0c46 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1872,6 +1872,11 @@ static int read_partial_message(struct ceph_connection *con) else con->in_msg_pos.page_pos = 0; con->in_msg_pos.data_pos = 0; + +#ifdef CONFIG_BLOCK + if (m->bio) + init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); +#endif } /* front */ @@ -1888,10 +1893,6 @@ static int read_partial_message(struct ceph_connection *con) if (ret <= 0) return ret; } -#ifdef CONFIG_BLOCK - if (m->bio && !m->bio_iter) - init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); -#endif /* (page) data */ while (con->in_msg_pos.data_pos < data_len) { @@ -1902,7 +1903,7 @@ static int read_partial_message(struct ceph_connection *con) return ret; #ifdef CONFIG_BLOCK } else if (m->bio) { - + BUG_ON(!m->bio_iter); ret = read_partial_message_bio(con, &m->bio_iter, &m->bio_seg, data_len, do_datacrc); -- cgit v1.2.2 From 5469155f2bc83bb2c88b0a0370c3d54d87eed06e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:21:40 -0700 Subject: libceph: protect ceph_con_open() with mutex Take the con mutex while we are initiating a ceph open. This is necessary because the may have previously been in use and then closed, which could result in a racing workqueue running con_work(). Signed-off-by: Sage Weil Reviewed-by: Yehuda Sadeh Reviewed-by: Alex Elder --- net/ceph/messenger.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index f1bd3bbb0c46..a4779988c847 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -537,6 +537,7 @@ void ceph_con_open(struct ceph_connection *con, __u8 entity_type, __u64 entity_num, struct ceph_entity_addr *addr) { + mutex_lock(&con->mutex); dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); set_bit(OPENING, &con->state); WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); @@ -546,6 +547,7 @@ void ceph_con_open(struct ceph_connection *con, memcpy(&con->peer_addr, addr, sizeof(*addr)); con->delay = 0; /* reset backoff memory */ + mutex_unlock(&con->mutex); queue_con(con); } EXPORT_SYMBOL(ceph_con_open); -- cgit v1.2.2 From 85effe183dd45854d1ad1a370b88cddb403c4c91 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 16:22:05 -0700 Subject: libceph: reset connection retry on successfully negotiation We exponentially back off when we encounter connection errors. If several errors accumulate, we will eventually wait ages before even trying to reconnect. Fix this by resetting the backoff counter after a successful negotiation/ connection with the remote node. Fixes ceph issue #2802. Signed-off-by: Sage Weil Reviewed-by: Yehuda Sadeh Reviewed-by: Alex Elder --- net/ceph/messenger.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index a4779988c847..07204f19e856 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1629,6 +1629,8 @@ static int process_connect(struct ceph_connection *con) if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) set_bit(LOSSYTX, &con->flags); + con->delay = 0; /* reset backoff memory */ + prepare_read_tag(con); break; -- cgit v1.2.2 From 3b5ede07b55b52c3be27749d183d87257d032065 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 15:22:53 -0700 Subject: libceph: fix fault locking; close socket on lossy fault If we fault on a lossy connection, we should still close the socket immediately, and do so under the con mutex. We should also take the con mutex before printing out the state bits in the debug output. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 07204f19e856..9aaf539942ac 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2330,22 +2330,23 @@ fault: */ static void ceph_fault(struct ceph_connection *con) { + mutex_lock(&con->mutex); + pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); dout("fault %p state %lu to peer %s\n", con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); - if (test_bit(LOSSYTX, &con->flags)) { - dout("fault on LOSSYTX channel\n"); - goto out; - } - - mutex_lock(&con->mutex); if (test_bit(CLOSED, &con->state)) goto out_unlock; con_close_socket(con); + if (test_bit(LOSSYTX, &con->flags)) { + dout("fault on LOSSYTX channel\n"); + goto out_unlock; + } + if (con->in_msg) { BUG_ON(con->in_msg->con != con); con->in_msg->con = NULL; @@ -2392,7 +2393,6 @@ static void ceph_fault(struct ceph_connection *con) out_unlock: mutex_unlock(&con->mutex); -out: /* * in case we faulted due to authentication, invalidate our * current tickets so that we can get new ones. -- cgit v1.2.2 From 00650931e52e97fe64096bec167f5a6780dfd94a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 15:33:04 -0700 Subject: libceph: move msgr clear_standby under con mutex protection Avoid dropping and retaking con->mutex in the ceph_con_send() case by leaving locking up to the caller. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 9aaf539942ac..1a3cb4a4f180 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2441,12 +2441,10 @@ static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ if (test_and_clear_bit(STANDBY, &con->state)) { - mutex_lock(&con->mutex); dout("clear_standby %p and ++connect_seq\n", con); con->connect_seq++; WARN_ON(test_bit(WRITE_PENDING, &con->flags)); WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); - mutex_unlock(&con->mutex); } } @@ -2483,11 +2481,12 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) le32_to_cpu(msg->hdr.front_len), le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len)); + + clear_standby(con); mutex_unlock(&con->mutex); /* if there wasn't anything waiting to send before, queue * new work */ - clear_standby(con); if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0) queue_con(con); } @@ -2574,7 +2573,9 @@ void ceph_msg_revoke_incoming(struct ceph_msg *msg) void ceph_con_keepalive(struct ceph_connection *con) { dout("con_keepalive %p\n", con); + mutex_lock(&con->mutex); clear_standby(con); + mutex_unlock(&con->mutex); if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 && test_and_set_bit(WRITE_PENDING, &con->flags) == 0) queue_con(con); -- cgit v1.2.2 From a59b55a602b6c741052d79c1e3643f8440cddd27 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 15:34:04 -0700 Subject: libceph: move ceph_con_send() closed check under the con mutex Take the con mutex before checking whether the connection is closed to avoid racing with someone else closing it. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 1a3cb4a4f180..20e60a80bc29 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2453,22 +2453,20 @@ static void clear_standby(struct ceph_connection *con) */ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) { - if (test_bit(CLOSED, &con->state)) { - dout("con_send %p closed, dropping %p\n", con, msg); - ceph_msg_put(msg); - return; - } - /* set src+dst */ msg->hdr.src = con->msgr->inst.name; - BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); - msg->needs_out_seq = true; - /* queue */ mutex_lock(&con->mutex); + if (test_bit(CLOSED, &con->state)) { + dout("con_send %p closed, dropping %p\n", con, msg); + ceph_msg_put(msg); + mutex_unlock(&con->mutex); + return; + } + BUG_ON(msg->con != NULL); msg->con = con->ops->get(con); BUG_ON(msg->con == NULL); -- cgit v1.2.2 From 2e8cb10063820af7ed7638e3fd9013eee21266e7 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 15:40:04 -0700 Subject: libceph: drop gratuitous socket close calls in con_work If the state is CLOSED or OPENING, we shouldn't have a socket. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 20e60a80bc29..32ab7cd089a3 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2284,15 +2284,15 @@ restart: dout("con_work %p STANDBY\n", con); goto done; } - if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ - dout("con_work CLOSED\n"); - con_close_socket(con); + if (test_bit(CLOSED, &con->state)) { + dout("con_work %p CLOSED\n", con); + BUG_ON(con->sock); goto done; } if (test_and_clear_bit(OPENING, &con->state)) { /* reopen w/ new peer */ dout("con_work OPENING\n"); - con_close_socket(con); + BUG_ON(con->sock); } ret = try_read(con); -- cgit v1.2.2 From ee76e0736db8455e3b11827d6899bd2a4e1d0584 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 16:45:49 -0700 Subject: libceph: close socket directly from ceph_con_close() It is simpler to do this immediately, since we already hold the con mutex. It also avoids the need to deal with a not-quite-CLOSED socket in con_work. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 32ab7cd089a3..46ce113732e6 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -519,14 +519,8 @@ void ceph_con_close(struct ceph_connection *con) reset_connection(con); con->peer_global_seq = 0; cancel_delayed_work(&con->work); + con_close_socket(con); mutex_unlock(&con->mutex); - - /* - * We cannot close the socket directly from here because the - * work threads use it without holding the mutex. Instead, let - * con_work() do it. - */ - queue_con(con); } EXPORT_SYMBOL(ceph_con_close); -- cgit v1.2.2 From d7353dd5aaf22ed611fbcd0d4a4a12fb30659290 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 17:19:43 -0700 Subject: libceph: drop unnecessary CLOSED check in socket state change callback If we are CLOSED, the socket is closed and we won't get these. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 3 --- 1 file changed, 3 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 46ce113732e6..e7320cd5fdbc 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -296,9 +296,6 @@ static void ceph_sock_state_change(struct sock *sk) dout("%s %p state = %lu sk_state = %u\n", __func__, con, con->state, sk->sk_state); - if (test_bit(CLOSED, &con->state)) - return; - switch (sk->sk_state) { case TCP_CLOSE: dout("%s TCP_CLOSE\n", __func__); -- cgit v1.2.2 From 8dacc7da69a491c515851e68de6036f21b5663ce Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 17:24:40 -0700 Subject: libceph: replace connection state bits with states Use a simple set of 6 enumerated values for the socket states (CON_STATE_*) and use those instead of the state bits. All of the con->state checks are now under the protection of the con mutex, so this is safe. It also simplifies many of the state checks because we can check for anything other than the expected state instead of various bits for races we can think of. This appears to hold up well to stress testing both with and without socket failure injection on the server side. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 130 +++++++++++++++++++++++++++------------------------ 1 file changed, 68 insertions(+), 62 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e7320cd5fdbc..563e46aa4d6d 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -77,6 +77,17 @@ #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ +/* + * connection states + */ +#define CON_STATE_CLOSED 1 /* -> PREOPEN */ +#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */ +#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */ +#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */ +#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ +#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ + + /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; @@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con) mutex_lock(&con->mutex); dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr.in_addr)); - clear_bit(NEGOTIATING, &con->state); - clear_bit(CONNECTING, &con->state); - clear_bit(CONNECTED, &con->state); - clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ - set_bit(CLOSED, &con->state); + con->state = CON_STATE_CLOSED; clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ clear_bit(KEEPALIVE_PENDING, &con->flags); @@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con, { mutex_lock(&con->mutex); dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); - set_bit(OPENING, &con->state); - WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); + + BUG_ON(con->state != CON_STATE_CLOSED); + con->state = CON_STATE_PREOPEN; con->peer_name.type = (__u8) entity_type; con->peer_name.num = cpu_to_le64(entity_num); @@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, INIT_LIST_HEAD(&con->out_sent); INIT_DELAYED_WORK(&con->work, con_work); - set_bit(CLOSED, &con->state); + con->state = CON_STATE_CLOSED; } EXPORT_SYMBOL(ceph_con_init); @@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection if (!con->ops->get_authorizer) { con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; con->out_connect.authorizer_len = 0; - return NULL; } /* Can't hold the mutex while getting authorizer */ - mutex_unlock(&con->mutex); - auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); - mutex_lock(&con->mutex); if (IS_ERR(auth)) return auth; - if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) + if (con->state != CON_STATE_NEGOTIATING) return ERR_PTR(-EAGAIN); con->auth_reply_buf = auth->authorizer_reply_buf; con->auth_reply_buf_len = auth->authorizer_reply_buf_len; - - return auth; } @@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con) static void fail_protocol(struct ceph_connection *con) { reset_connection(con); - set_bit(CLOSED, &con->state); /* in case there's queued work */ + BUG_ON(con->state != CON_STATE_NEGOTIATING); + con->state = CON_STATE_CLOSED; } static int process_connect(struct ceph_connection *con) @@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con) if (con->ops->peer_reset) con->ops->peer_reset(con); mutex_lock(&con->mutex); - if (test_bit(CLOSED, &con->state) || - test_bit(OPENING, &con->state)) + if (con->state != CON_STATE_NEGOTIATING) return -EAGAIN; break; @@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con) fail_protocol(con); return -1; } - clear_bit(NEGOTIATING, &con->state); - set_bit(CONNECTED, &con->state); + + BUG_ON(con->state != CON_STATE_NEGOTIATING); + con->state = CON_STATE_OPEN; + con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; con->peer_features = server_feat; @@ -1994,8 +1998,9 @@ more: dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); /* open the socket first? */ - if (con->sock == NULL) { - set_bit(CONNECTING, &con->state); + if (con->state == CON_STATE_PREOPEN) { + BUG_ON(con->sock); + con->state = CON_STATE_CONNECTING; con_out_kvec_reset(con); prepare_write_banner(con); @@ -2046,8 +2051,7 @@ more_kvec: } do_next: - if (!test_bit(CONNECTING, &con->state) && - !test_bit(NEGOTIATING, &con->state)) { + if (con->state == CON_STATE_OPEN) { /* is anything else pending? */ if (!list_empty(&con->out_queue)) { prepare_write_message(con); @@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con) { int ret = -1; - if (!con->sock) - return 0; - - if (test_bit(STANDBY, &con->state)) +more: + dout("try_read start on %p state %lu\n", con, con->state); + if (con->state != CON_STATE_CONNECTING && + con->state != CON_STATE_NEGOTIATING && + con->state != CON_STATE_OPEN) return 0; - dout("try_read start on %p\n", con); + BUG_ON(!con->sock); -more: dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, con->in_base_pos); - /* - * process_connect and process_message drop and re-take - * con->mutex. make sure we handle a racing close or reopen. - */ - if (test_bit(CLOSED, &con->state) || - test_bit(OPENING, &con->state)) { - ret = -EAGAIN; - goto out; - } - - if (test_bit(CONNECTING, &con->state)) { + if (con->state == CON_STATE_CONNECTING) { dout("try_read connecting\n"); ret = read_partial_banner(con); if (ret <= 0) @@ -2112,8 +2106,8 @@ more: if (ret < 0) goto out; - clear_bit(CONNECTING, &con->state); - set_bit(NEGOTIATING, &con->state); + BUG_ON(con->state != CON_STATE_CONNECTING); + con->state = CON_STATE_NEGOTIATING; /* Banner is good, exchange connection info */ ret = prepare_write_connect(con); @@ -2125,7 +2119,7 @@ more: goto out; } - if (test_bit(NEGOTIATING, &con->state)) { + if (con->state == CON_STATE_NEGOTIATING) { dout("try_read negotiating\n"); ret = read_partial_connect(con); if (ret <= 0) @@ -2136,6 +2130,8 @@ more: goto more; } + BUG_ON(con->state != CON_STATE_OPEN); + if (con->in_base_pos < 0) { /* * skipping + discarding content. @@ -2169,8 +2165,8 @@ more: prepare_read_ack(con); break; case CEPH_MSGR_TAG_CLOSE: - clear_bit(CONNECTED, &con->state); - set_bit(CLOSED, &con->state); /* fixme */ + con_close_socket(con); + con->state = CON_STATE_CLOSED; goto out; default: goto bad_tag; @@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { - if (test_and_clear_bit(CONNECTED, &con->state)) - con->error_msg = "socket closed"; - else if (test_and_clear_bit(NEGOTIATING, &con->state)) - con->error_msg = "negotiation failed"; - else if (test_and_clear_bit(CONNECTING, &con->state)) + switch (con->state) { + case CON_STATE_CONNECTING: con->error_msg = "connection failed"; - else + break; + case CON_STATE_NEGOTIATING: + con->error_msg = "negotiation failed"; + break; + case CON_STATE_OPEN: + con->error_msg = "socket closed"; + break; + default: + dout("unrecognized con state %d\n", (int)con->state); con->error_msg = "unrecognized con state"; + BUG(); + } goto fault; } @@ -2271,17 +2274,16 @@ restart: } } - if (test_bit(STANDBY, &con->state)) { + if (con->state == CON_STATE_STANDBY) { dout("con_work %p STANDBY\n", con); goto done; } - if (test_bit(CLOSED, &con->state)) { + if (con->state == CON_STATE_CLOSED) { dout("con_work %p CLOSED\n", con); BUG_ON(con->sock); goto done; } - if (test_and_clear_bit(OPENING, &con->state)) { - /* reopen w/ new peer */ + if (con->state == CON_STATE_PREOPEN) { dout("con_work OPENING\n"); BUG_ON(con->sock); } @@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con) dout("fault %p state %lu to peer %s\n", con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); - if (test_bit(CLOSED, &con->state)) - goto out_unlock; + BUG_ON(con->state != CON_STATE_CONNECTING && + con->state != CON_STATE_NEGOTIATING && + con->state != CON_STATE_OPEN); con_close_socket(con); if (test_bit(LOSSYTX, &con->flags)) { - dout("fault on LOSSYTX channel\n"); + dout("fault on LOSSYTX channel, marking CLOSED\n"); + con->state = CON_STATE_CLOSED; goto out_unlock; } @@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con) !test_bit(KEEPALIVE_PENDING, &con->flags)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); clear_bit(WRITE_PENDING, &con->flags); - set_bit(STANDBY, &con->state); + con->state = CON_STATE_STANDBY; } else { /* retry after a delay. */ + con->state = CON_STATE_PREOPEN; if (con->delay == 0) con->delay = BASE_DELAY_INTERVAL; else if (con->delay < MAX_DELAY_INTERVAL) @@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init); static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ - if (test_and_clear_bit(STANDBY, &con->state)) { + if (con->state == CON_STATE_STANDBY) { dout("clear_standby %p and ++connect_seq\n", con); + con->state = CON_STATE_PREOPEN; con->connect_seq++; WARN_ON(test_bit(WRITE_PENDING, &con->flags)); WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); @@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) mutex_lock(&con->mutex); - if (test_bit(CLOSED, &con->state)) { + if (con->state == CON_STATE_CLOSED) { dout("con_send %p closed, dropping %p\n", con, msg); ceph_msg_put(msg); mutex_unlock(&con->mutex); -- cgit v1.2.2 From 4a8616920860920abaa51193146fe36b38ef09aa Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 17:29:55 -0700 Subject: libceph: clean up con flags Rename flags with CON_FLAG prefix, move the definitions into the c file, and (better) document their meaning. Signed-off-by: Sage Weil --- net/ceph/messenger.c | 62 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 26 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 563e46aa4d6d..b872db5c4989 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -87,6 +87,15 @@ #define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ #define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ +/* + * ceph_connection flag bits + */ +#define CON_FLAG_LOSSYTX 0 /* we can close channel or drop + * messages on errors */ +#define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */ +#define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */ +#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ +#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; @@ -288,7 +297,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ - if (test_bit(WRITE_PENDING, &con->flags)) { + if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); @@ -313,7 +322,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - set_bit(SOCK_CLOSED, &con->flags); + set_bit(CON_FLAG_SOCK_CLOSED, &con->flags); queue_con(con); break; case TCP_ESTABLISHED: @@ -449,12 +458,12 @@ static int con_close_socket(struct ceph_connection *con) con->sock = NULL; /* - * Forcibly clear the SOCK_CLOSE flag. It gets set + * Forcibly clear the SOCK_CLOSED flag. It gets set * independent of the connection mutex, and we could have * received a socket close event before we had the chance to * shut the socket down. */ - clear_bit(SOCK_CLOSED, &con->flags); + clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); con_sock_state_closed(con); return rc; } @@ -516,9 +525,9 @@ void ceph_con_close(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); con->state = CON_STATE_CLOSED; - clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ - clear_bit(KEEPALIVE_PENDING, &con->flags); - clear_bit(WRITE_PENDING, &con->flags); + clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ + clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); + clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); reset_connection(con); con->peer_global_seq = 0; @@ -770,7 +779,7 @@ static void prepare_write_message(struct ceph_connection *con) /* no, queue up footer too and be done */ prepare_write_message_footer(con); - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); } /* @@ -791,7 +800,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); } /* @@ -802,7 +811,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); } /* @@ -845,7 +854,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); } static int prepare_write_connect(struct ceph_connection *con) @@ -896,7 +905,7 @@ static int prepare_write_connect(struct ceph_connection *con) auth->authorizer_buf); con->out_more = 0; - set_bit(WRITE_PENDING, &con->flags); + set_bit(CON_FLAG_WRITE_PENDING, &con->flags); return 0; } @@ -1622,7 +1631,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - set_bit(LOSSYTX, &con->flags); + set_bit(CON_FLAG_LOSSYTX, &con->flags); con->delay = 0; /* reset backoff memory */ @@ -2061,14 +2070,15 @@ do_next: prepare_write_ack(con); goto more; } - if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) { + if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING, + &con->flags)) { prepare_write_keepalive(con); goto more; } } /* Nothing to do! */ - clear_bit(WRITE_PENDING, &con->flags); + clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); dout("try_write nothing else to write.\n"); ret = 0; out: @@ -2241,7 +2251,7 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: - if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { + if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) { switch (con->state) { case CON_STATE_CONNECTING: con->error_msg = "connection failed"; @@ -2260,7 +2270,7 @@ restart: goto fault; } - if (test_and_clear_bit(BACKOFF, &con->flags)) { + if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { dout("con_work %p backing off\n", con); if (queue_delayed_work(ceph_msgr_wq, &con->work, round_jiffies_relative(con->delay))) { @@ -2336,7 +2346,7 @@ static void ceph_fault(struct ceph_connection *con) con_close_socket(con); - if (test_bit(LOSSYTX, &con->flags)) { + if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CON_STATE_CLOSED; goto out_unlock; @@ -2356,9 +2366,9 @@ static void ceph_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && - !test_bit(KEEPALIVE_PENDING, &con->flags)) { + !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); - clear_bit(WRITE_PENDING, &con->flags); + clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); con->state = CON_STATE_STANDBY; } else { /* retry after a delay. */ @@ -2383,7 +2393,7 @@ static void ceph_fault(struct ceph_connection *con) * that when con_work restarts we schedule the * delay then. */ - set_bit(BACKOFF, &con->flags); + set_bit(CON_FLAG_BACKOFF, &con->flags); } } @@ -2440,8 +2450,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CON_STATE_PREOPEN; con->connect_seq++; - WARN_ON(test_bit(WRITE_PENDING, &con->flags)); - WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); + WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags)); + WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)); } } @@ -2482,7 +2492,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ - if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0) + if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); @@ -2571,8 +2581,8 @@ void ceph_con_keepalive(struct ceph_connection *con) mutex_lock(&con->mutex); clear_standby(con); mutex_unlock(&con->mutex); - if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 && - test_and_set_bit(WRITE_PENDING, &con->flags) == 0) + if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 && + test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); -- cgit v1.2.2 From 43c7427d100769451601b8a36988ac0528ce0124 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jul 2012 17:30:40 -0700 Subject: libceph: clear all flags on con_close Signed-off-by: Sage Weil --- net/ceph/messenger.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index b872db5c4989..fa16f2cc35fe 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -528,6 +528,8 @@ void ceph_con_close(struct ceph_connection *con) clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); + clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); + clear_bit(CON_FLAG_BACKOFF, &con->flags); reset_connection(con); con->peer_global_seq = 0; -- cgit v1.2.2 From 8007b8d626b49c34fb146ec16dc639d8b10c862f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:16:16 -0700 Subject: libceph: fix handling of immediate socket connect failure If the connect() call immediately fails such that sock == NULL, we still need con_close_socket() to reset our socket state to CLOSED. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index fa16f2cc35fe..a6a0c7a0a979 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -224,6 +224,8 @@ static void con_sock_state_init(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); if (WARN_ON(old_state != CON_SOCK_STATE_NEW)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CLOSED); } static void con_sock_state_connecting(struct ceph_connection *con) @@ -233,6 +235,8 @@ static void con_sock_state_connecting(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CONNECTING); } static void con_sock_state_connected(struct ceph_connection *con) @@ -242,6 +246,8 @@ static void con_sock_state_connected(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CONNECTED); } static void con_sock_state_closing(struct ceph_connection *con) @@ -253,6 +259,8 @@ static void con_sock_state_closing(struct ceph_connection *con) old_state != CON_SOCK_STATE_CONNECTED && old_state != CON_SOCK_STATE_CLOSING)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CLOSING); } static void con_sock_state_closed(struct ceph_connection *con) @@ -262,8 +270,11 @@ static void con_sock_state_closed(struct ceph_connection *con) old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED && old_state != CON_SOCK_STATE_CLOSING && - old_state != CON_SOCK_STATE_CONNECTING)) + old_state != CON_SOCK_STATE_CONNECTING && + old_state != CON_SOCK_STATE_CLOSED)) printk("%s: unexpected old state %d\n", __func__, old_state); + dout("%s con %p sock %d -> %d\n", __func__, con, old_state, + CON_SOCK_STATE_CLOSED); } /* @@ -448,14 +459,14 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, */ static int con_close_socket(struct ceph_connection *con) { - int rc; + int rc = 0; dout("con_close_socket on %p sock %p\n", con, con->sock); - if (!con->sock) - return 0; - rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); - sock_release(con->sock); - con->sock = NULL; + if (con->sock) { + rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); + sock_release(con->sock); + con->sock = NULL; + } /* * Forcibly clear the SOCK_CLOSED flag. It gets set @@ -464,6 +475,7 @@ static int con_close_socket(struct ceph_connection *con) * shut the socket down. */ clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); + con_sock_state_closed(con); return rc; } -- cgit v1.2.2 From 7b862e07b1a4d5c963d19027f10ea78085f27f9b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:16:56 -0700 Subject: libceph: verify state after retaking con lock after dispatch We drop the con mutex when delivering a message. When we retake the lock, we need to verify we are still in the OPEN state before preparing to read the next tag, or else we risk stepping on a connection that has been closed. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index a6a0c7a0a979..feb5a2ac724c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2003,7 +2003,6 @@ static void process_message(struct ceph_connection *con) con->ops->dispatch(con, msg); mutex_lock(&con->mutex); - prepare_read_tag(con); } @@ -2213,6 +2212,8 @@ more: if (con->in_tag == CEPH_MSGR_TAG_READY) goto more; process_message(con); + if (con->state == CON_STATE_OPEN) + prepare_read_tag(con); goto more; } if (con->in_tag == CEPH_MSGR_TAG_ACK) { -- cgit v1.2.2 From 8636ea672f0c5ab7478c42c5b6705ebd1db7eb6a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:17:13 -0700 Subject: libceph: avoid dropping con mutex before fault The ceph_fault() function takes the con mutex, so we should avoid dropping it before calling it. This fixes a potential race with another thread calling ceph_con_close(), or _open(), or similar (we don't reverify con->state after retaking the lock). Add annotation so that lockdep realizes we will drop the mutex before returning. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index feb5a2ac724c..c3b628c76194 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2336,7 +2336,6 @@ done_unlocked: return; fault: - mutex_unlock(&con->mutex); ceph_fault(con); /* error/fault path */ goto done_unlocked; } @@ -2347,9 +2346,8 @@ fault: * exponential backoff */ static void ceph_fault(struct ceph_connection *con) + __releases(con->mutex) { - mutex_lock(&con->mutex); - pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); dout("fault %p state %lu to peer %s\n", -- cgit v1.2.2 From 4740a623d20c51d167da7f752b63e2b8714b2543 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:19:30 -0700 Subject: libceph: change ceph_con_in_msg_alloc convention to be less weird This function's calling convention is very limiting. In particular, we can't return any error other than ENOMEM (and only implicitly), which is a problem (see next patch). Instead, return an normal 0 or error code, and make the skip a pointer output parameter. Drop the useless in_hdr argument (we have the con pointer). Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 56 +++++++++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 25 deletions(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index c3b628c76194..13b549b6b1bf 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1733,9 +1733,7 @@ static int read_partial_message_section(struct ceph_connection *con, return 1; } -static bool ceph_con_in_msg_alloc(struct ceph_connection *con, - struct ceph_msg_header *hdr); - +static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip); static int read_partial_message_pages(struct ceph_connection *con, struct page **pages, @@ -1864,9 +1862,14 @@ static int read_partial_message(struct ceph_connection *con) /* allocate message? */ if (!con->in_msg) { + int skip = 0; + dout("got hdr type %d front %d data %d\n", con->in_hdr.type, con->in_hdr.front_len, con->in_hdr.data_len); - if (ceph_con_in_msg_alloc(con, &con->in_hdr)) { + ret = ceph_con_in_msg_alloc(con, &skip); + if (ret < 0) + return ret; + if (skip) { /* skip this message */ dout("alloc_msg said skip message\n"); BUG_ON(con->in_msg); @@ -1876,12 +1879,8 @@ static int read_partial_message(struct ceph_connection *con) con->in_seq++; return 0; } - if (!con->in_msg) { - con->error_msg = - "error allocating memory for incoming message"; - return -ENOMEM; - } + BUG_ON(!con->in_msg); BUG_ON(con->in_msg->con != con); m = con->in_msg; m->front.iov_len = 0; /* haven't read it yet */ @@ -2715,43 +2714,50 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) * connection, and save the result in con->in_msg. Uses the * connection's private alloc_msg op if available. * - * Returns true if the message should be skipped, false otherwise. - * If true is returned (skip message), con->in_msg will be NULL. - * If false is returned, con->in_msg will contain a pointer to the - * newly-allocated message, or NULL in case of memory exhaustion. + * Returns 0 on success, or a negative error code. + * + * On success, if we set *skip = 1: + * - the next message should be skipped and ignored. + * - con->in_msg == NULL + * or if we set *skip = 0: + * - con->in_msg is non-null. + * On error (ENOMEM, EAGAIN, ...), + * - con->in_msg == NULL */ -static bool ceph_con_in_msg_alloc(struct ceph_connection *con, - struct ceph_msg_header *hdr) +static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) { + struct ceph_msg_header *hdr = &con->in_hdr; int type = le16_to_cpu(hdr->type); int front_len = le32_to_cpu(hdr->front_len); int middle_len = le32_to_cpu(hdr->middle_len); - int ret; + int ret = 0; BUG_ON(con->in_msg != NULL); if (con->ops->alloc_msg) { - int skip = 0; - mutex_unlock(&con->mutex); - con->in_msg = con->ops->alloc_msg(con, hdr, &skip); + con->in_msg = con->ops->alloc_msg(con, hdr, skip); mutex_lock(&con->mutex); if (con->in_msg) { con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); } - if (skip) + if (*skip) { con->in_msg = NULL; - - if (!con->in_msg) - return skip != 0; + return 0; + } + if (!con->in_msg) { + con->error_msg = + "error allocating memory for incoming message"; + return -ENOMEM; + } } if (!con->in_msg) { con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false); if (!con->in_msg) { pr_err("unable to allocate msg type %d len %d\n", type, front_len); - return false; + return -ENOMEM; } con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); @@ -2767,7 +2773,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, } } - return false; + return ret; } -- cgit v1.2.2 From 6139919133377652992a5fe134e22abce3e9c25e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jul 2012 18:19:45 -0700 Subject: libceph: recheck con state after allocating incoming message We drop the lock when calling the ->alloc_msg() con op, which means we need to (a) not clobber con->in_msg without the mutex held, and (b) we need to verify that we are still in the OPEN state when we retake it to avoid causing any mayhem. If the state does change, -EAGAIN will get us back to con_work() and loop. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/messenger.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'net/ceph/messenger.c') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 13b549b6b1bf..b6655b131558 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2735,9 +2735,16 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) BUG_ON(con->in_msg != NULL); if (con->ops->alloc_msg) { + struct ceph_msg *msg; + mutex_unlock(&con->mutex); - con->in_msg = con->ops->alloc_msg(con, hdr, skip); + msg = con->ops->alloc_msg(con, hdr, skip); mutex_lock(&con->mutex); + if (con->state != CON_STATE_OPEN) { + ceph_msg_put(msg); + return -EAGAIN; + } + con->in_msg = msg; if (con->in_msg) { con->in_msg->con = con->ops->get(con); BUG_ON(con->in_msg->con == NULL); -- cgit v1.2.2