aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2012-06-01 15:56:43 -0400
committerAlex Elder <elder@dreamhost.com>2012-06-06 10:23:54 -0400
commit38941f8031bf042dba3ced6394ba3a3b16c244ea (patch)
tree575613cf05d5a0f652fb0cd337d110590c465cb2 /net/ceph
parent1c20f2d26795803fc4f5155fe4fca5717a5944b6 (diff)
libceph: have messages point to their connection
When a ceph message is queued for sending it is placed on a list of pending messages (ceph_connection->out_queue). When they are actually sent over the wire, they are moved from that list to another (ceph_connection->out_sent). When acknowledgement for the message is received, it is removed from the sent messages list. During that entire time the message is "in the possession" of a single ceph connection. Keep track of that connection in the message. This will be used in the next patch (and is a helpful bit of information for debugging anyway). Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Sage Weil <sage@inktank.com>
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c27
1 files changed, 25 insertions, 2 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 98ca23726ea6..68b49b5b8e86 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -414,6 +414,9 @@ static int con_close_socket(struct ceph_connection *con)
414static void ceph_msg_remove(struct ceph_msg *msg) 414static void ceph_msg_remove(struct ceph_msg *msg)
415{ 415{
416 list_del_init(&msg->list_head); 416 list_del_init(&msg->list_head);
417 BUG_ON(msg->con == NULL);
418 msg->con = NULL;
419
417 ceph_msg_put(msg); 420 ceph_msg_put(msg);
418} 421}
419static void ceph_msg_remove_list(struct list_head *head) 422static void ceph_msg_remove_list(struct list_head *head)
@@ -433,6 +436,8 @@ static void reset_connection(struct ceph_connection *con)
433 ceph_msg_remove_list(&con->out_sent); 436 ceph_msg_remove_list(&con->out_sent);
434 437
435 if (con->in_msg) { 438 if (con->in_msg) {
439 BUG_ON(con->in_msg->con != con);
440 con->in_msg->con = NULL;
436 ceph_msg_put(con->in_msg); 441 ceph_msg_put(con->in_msg);
437 con->in_msg = NULL; 442 con->in_msg = NULL;
438 } 443 }
@@ -625,8 +630,10 @@ static void prepare_write_message(struct ceph_connection *con)
625 &con->out_temp_ack); 630 &con->out_temp_ack);
626 } 631 }
627 632
633 BUG_ON(list_empty(&con->out_queue));
628 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); 634 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
629 con->out_msg = m; 635 con->out_msg = m;
636 BUG_ON(m->con != con);
630 637
631 /* put message on sent list */ 638 /* put message on sent list */
632 ceph_msg_get(m); 639 ceph_msg_get(m);
@@ -1806,6 +1813,8 @@ static int read_partial_message(struct ceph_connection *con)
1806 "error allocating memory for incoming message"; 1813 "error allocating memory for incoming message";
1807 return -ENOMEM; 1814 return -ENOMEM;
1808 } 1815 }
1816
1817 BUG_ON(con->in_msg->con != con);
1809 m = con->in_msg; 1818 m = con->in_msg;
1810 m->front.iov_len = 0; /* haven't read it yet */ 1819 m->front.iov_len = 0; /* haven't read it yet */
1811 if (m->middle) 1820 if (m->middle)
@@ -1901,6 +1910,8 @@ static void process_message(struct ceph_connection *con)
1901{ 1910{
1902 struct ceph_msg *msg; 1911 struct ceph_msg *msg;
1903 1912
1913 BUG_ON(con->in_msg->con != con);
1914 con->in_msg->con = NULL;
1904 msg = con->in_msg; 1915 msg = con->in_msg;
1905 con->in_msg = NULL; 1916 con->in_msg = NULL;
1906 1917
@@ -2260,6 +2271,8 @@ static void ceph_fault(struct ceph_connection *con)
2260 con_close_socket(con); 2271 con_close_socket(con);
2261 2272
2262 if (con->in_msg) { 2273 if (con->in_msg) {
2274 BUG_ON(con->in_msg->con != con);
2275 con->in_msg->con = NULL;
2263 ceph_msg_put(con->in_msg); 2276 ceph_msg_put(con->in_msg);
2264 con->in_msg = NULL; 2277 con->in_msg = NULL;
2265 } 2278 }
@@ -2378,6 +2391,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2378 2391
2379 /* queue */ 2392 /* queue */
2380 mutex_lock(&con->mutex); 2393 mutex_lock(&con->mutex);
2394 BUG_ON(msg->con != NULL);
2395 msg->con = con;
2381 BUG_ON(!list_empty(&msg->list_head)); 2396 BUG_ON(!list_empty(&msg->list_head));
2382 list_add_tail(&msg->list_head, &con->out_queue); 2397 list_add_tail(&msg->list_head, &con->out_queue);
2383 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 2398 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -2403,13 +2418,16 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
2403{ 2418{
2404 mutex_lock(&con->mutex); 2419 mutex_lock(&con->mutex);
2405 if (!list_empty(&msg->list_head)) { 2420 if (!list_empty(&msg->list_head)) {
2406 dout("con_revoke %p msg %p - was on queue\n", con, msg); 2421 dout("%s %p msg %p - was on queue\n", __func__, con, msg);
2407 list_del_init(&msg->list_head); 2422 list_del_init(&msg->list_head);
2423 BUG_ON(msg->con == NULL);
2424 msg->con = NULL;
2425
2408 ceph_msg_put(msg); 2426 ceph_msg_put(msg);
2409 msg->hdr.seq = 0; 2427 msg->hdr.seq = 0;
2410 } 2428 }
2411 if (con->out_msg == msg) { 2429 if (con->out_msg == msg) {
2412 dout("con_revoke %p msg %p - was sending\n", con, msg); 2430 dout("%s %p msg %p - was sending\n", __func__, con, msg);
2413 con->out_msg = NULL; 2431 con->out_msg = NULL;
2414 if (con->out_kvec_is_msg) { 2432 if (con->out_kvec_is_msg) {
2415 con->out_skip = con->out_kvec_bytes; 2433 con->out_skip = con->out_kvec_bytes;
@@ -2478,6 +2496,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2478 if (m == NULL) 2496 if (m == NULL)
2479 goto out; 2497 goto out;
2480 kref_init(&m->kref); 2498 kref_init(&m->kref);
2499
2500 m->con = NULL;
2481 INIT_LIST_HEAD(&m->list_head); 2501 INIT_LIST_HEAD(&m->list_head);
2482 2502
2483 m->hdr.tid = 0; 2503 m->hdr.tid = 0;
@@ -2598,6 +2618,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
2598 mutex_unlock(&con->mutex); 2618 mutex_unlock(&con->mutex);
2599 con->in_msg = con->ops->alloc_msg(con, hdr, &skip); 2619 con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
2600 mutex_lock(&con->mutex); 2620 mutex_lock(&con->mutex);
2621 if (con->in_msg)
2622 con->in_msg->con = con;
2601 if (skip) 2623 if (skip)
2602 con->in_msg = NULL; 2624 con->in_msg = NULL;
2603 2625
@@ -2611,6 +2633,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
2611 type, front_len); 2633 type, front_len);
2612 return false; 2634 return false;
2613 } 2635 }
2636 con->in_msg->con = con;
2614 con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); 2637 con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
2615 } 2638 }
2616 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2639 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));