aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2012-06-04 15:43:33 -0400
committerAlex Elder <elder@dreamhost.com>2012-06-06 10:23:54 -0400
commit92ce034b5a740046cc643a21ea21eaad589e0043 (patch)
treec4c15d59a5dda0e2e1350dd2afbe6a3a8c9e9170 /net/ceph/messenger.c
parent38941f8031bf042dba3ced6394ba3a3b16c244ea (diff)
libceph: have messages take a connection reference
There are essentially two types of ceph messages: incoming and outgoing. Outgoing messages are always allocated via ceph_msg_new(), and at the time of their allocation they are not associated with any particular connection. Incoming messages are always allocated via ceph_con_in_msg_alloc(), and they are initially associated with the connection from which incoming data will be placed into the message. When an outgoing message gets sent, it becomes associated with a connection and remains that way until the message is successfully sent. The association of an incoming message goes away at the point it is sent to an upper layer via a con->ops->dispatch method. This patch implements reference counting for all ceph messages, such that every message holds a reference (and a pointer) to a connection if and only if it is associated with that connection (as described above). For background, here is an explanation of the ceph message lifecycle, emphasizing when an association exists between a message and a connection. Outgoing Messages An outgoing message is "owned" by its allocator, from the time it is allocated in ceph_msg_new() up to the point it gets queued for sending in ceph_con_send(). Prior to that point the message's msg->con pointer is null; at the point it is queued for sending its message pointer is assigned to refer to the connection. At that time the message is inserted into a connection's out_queue list. When a message on the out_queue list has been sent to the socket layer to be put on the wire, it is transferred out of that list and into the connection's out_sent list. At that point it is still owned by the connection, and will remain so until an acknowledgement is received from the recipient that indicates the message was successfully transferred. When such an acknowledgement is received (in process_ack()), the message is removed from its list (in ceph_msg_remove()), at which point it is no longer associated with the connection. So basically, any time a message is on one of a connection's lists, it is associated with that connection. Reference counting outgoing messages can thus be done at the points a message is added to the out_queue (in ceph_con_send()) and the point it is removed from either its two lists (in ceph_msg_remove())--at which point its connection pointer becomes null. Incoming Messages When an incoming message on a connection is getting read (in read_partial_message()) and there is no message in con->in_msg, a new one is allocated using ceph_con_in_msg_alloc(). At that point the message is associated with the connection. Once that message has been completely and successfully read, it is passed to upper layer code using the connection's con->ops->dispatch method. At that point the association between the message and the connection no longer exists. Reference counting of connections for incoming messages can be done by taking a reference to the connection when the message gets allocated, and releasing that reference when it gets handed off using the dispatch method. We should never fail to get a connection reference for a message--the since the caller should already hold one. Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Sage Weil <sage@inktank.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c24
1 files changed, 18 insertions, 6 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 68b49b5b8e86..88ac083bb995 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -415,6 +415,7 @@ static void ceph_msg_remove(struct ceph_msg *msg)
415{ 415{
416 list_del_init(&msg->list_head); 416 list_del_init(&msg->list_head);
417 BUG_ON(msg->con == NULL); 417 BUG_ON(msg->con == NULL);
418 ceph_con_put(msg->con);
418 msg->con = NULL; 419 msg->con = NULL;
419 420
420 ceph_msg_put(msg); 421 ceph_msg_put(msg);
@@ -440,6 +441,7 @@ static void reset_connection(struct ceph_connection *con)
440 con->in_msg->con = NULL; 441 con->in_msg->con = NULL;
441 ceph_msg_put(con->in_msg); 442 ceph_msg_put(con->in_msg);
442 con->in_msg = NULL; 443 con->in_msg = NULL;
444 ceph_con_put(con->in_msg->con);
443 } 445 }
444 446
445 con->connect_seq = 0; 447 con->connect_seq = 0;
@@ -1914,6 +1916,7 @@ static void process_message(struct ceph_connection *con)
1914 con->in_msg->con = NULL; 1916 con->in_msg->con = NULL;
1915 msg = con->in_msg; 1917 msg = con->in_msg;
1916 con->in_msg = NULL; 1918 con->in_msg = NULL;
1919 ceph_con_put(con);
1917 1920
1918 /* if first message, set peer_name */ 1921 /* if first message, set peer_name */
1919 if (con->peer_name.type == 0) 1922 if (con->peer_name.type == 0)
@@ -2275,6 +2278,7 @@ static void ceph_fault(struct ceph_connection *con)
2275 con->in_msg->con = NULL; 2278 con->in_msg->con = NULL;
2276 ceph_msg_put(con->in_msg); 2279 ceph_msg_put(con->in_msg);
2277 con->in_msg = NULL; 2280 con->in_msg = NULL;
2281 ceph_con_put(con);
2278 } 2282 }
2279 2283
2280 /* Requeue anything that hasn't been acked */ 2284 /* Requeue anything that hasn't been acked */
@@ -2391,8 +2395,11 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2391 2395
2392 /* queue */ 2396 /* queue */
2393 mutex_lock(&con->mutex); 2397 mutex_lock(&con->mutex);
2398
2394 BUG_ON(msg->con != NULL); 2399 BUG_ON(msg->con != NULL);
2395 msg->con = con; 2400 msg->con = ceph_con_get(con);
2401 BUG_ON(msg->con == NULL);
2402
2396 BUG_ON(!list_empty(&msg->list_head)); 2403 BUG_ON(!list_empty(&msg->list_head));
2397 list_add_tail(&msg->list_head, &con->out_queue); 2404 list_add_tail(&msg->list_head, &con->out_queue);
2398 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 2405 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -2421,10 +2428,11 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
2421 dout("%s %p msg %p - was on queue\n", __func__, con, msg); 2428 dout("%s %p msg %p - was on queue\n", __func__, con, msg);
2422 list_del_init(&msg->list_head); 2429 list_del_init(&msg->list_head);
2423 BUG_ON(msg->con == NULL); 2430 BUG_ON(msg->con == NULL);
2431 ceph_con_put(msg->con);
2424 msg->con = NULL; 2432 msg->con = NULL;
2433 msg->hdr.seq = 0;
2425 2434
2426 ceph_msg_put(msg); 2435 ceph_msg_put(msg);
2427 msg->hdr.seq = 0;
2428 } 2436 }
2429 if (con->out_msg == msg) { 2437 if (con->out_msg == msg) {
2430 dout("%s %p msg %p - was sending\n", __func__, con, msg); 2438 dout("%s %p msg %p - was sending\n", __func__, con, msg);
@@ -2433,8 +2441,9 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
2433 con->out_skip = con->out_kvec_bytes; 2441 con->out_skip = con->out_kvec_bytes;
2434 con->out_kvec_is_msg = false; 2442 con->out_kvec_is_msg = false;
2435 } 2443 }
2436 ceph_msg_put(msg);
2437 msg->hdr.seq = 0; 2444 msg->hdr.seq = 0;
2445
2446 ceph_msg_put(msg);
2438 } 2447 }
2439 mutex_unlock(&con->mutex); 2448 mutex_unlock(&con->mutex);
2440} 2449}
@@ -2618,8 +2627,10 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
2618 mutex_unlock(&con->mutex); 2627 mutex_unlock(&con->mutex);
2619 con->in_msg = con->ops->alloc_msg(con, hdr, &skip); 2628 con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
2620 mutex_lock(&con->mutex); 2629 mutex_lock(&con->mutex);
2621 if (con->in_msg) 2630 if (con->in_msg) {
2622 con->in_msg->con = con; 2631 con->in_msg->con = ceph_con_get(con);
2632 BUG_ON(con->in_msg->con == NULL);
2633 }
2623 if (skip) 2634 if (skip)
2624 con->in_msg = NULL; 2635 con->in_msg = NULL;
2625 2636
@@ -2633,7 +2644,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
2633 type, front_len); 2644 type, front_len);
2634 return false; 2645 return false;
2635 } 2646 }
2636 con->in_msg->con = con; 2647 con->in_msg->con = ceph_con_get(con);
2648 BUG_ON(con->in_msg->con == NULL);
2637 con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); 2649 con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
2638 } 2650 }
2639 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2651 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));