diff options
author | Yehuda Sadeh <yehuda@hq.newdream.net> | 2010-02-26 18:32:31 -0500 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2010-03-04 14:26:35 -0500 |
commit | 422d2cb8f9afadba1ecd3614f658b6daaaa480fb (patch) | |
tree | 22e1a61acdbbe1459b190c4dbb6019360464b2e9 /fs/ceph/osd_client.c | |
parent | e9964c102312967a4bc1fd501cb628c4a3b19034 (diff) |
ceph: reset osd after relevant messages timed out
This simplifies the process of timing out messages. We
keep lru of current messages that are in flight. If a
timeout has passed, we reset the osd connection, so that
messages will be retransmitted. This is a failsafe in case
we hit some sort of problem sending out message to the OSD.
Normally, we'll get notification via an updated osdmap if
there are problems.
If a request is older than the keepalive timeout, send a
keepalive to ensure we detect any breaks in the TCP connection.
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/osd_client.c')
-rw-r--r-- | fs/ceph/osd_client.c | 153 |
1 files changed, 98 insertions, 55 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index c4763bff97b4..dbe63db9762f 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c | |||
@@ -17,6 +17,8 @@ | |||
17 | #define OSD_OPREPLY_FRONT_LEN 512 | 17 | #define OSD_OPREPLY_FRONT_LEN 512 |
18 | 18 | ||
19 | const static struct ceph_connection_operations osd_con_ops; | 19 | const static struct ceph_connection_operations osd_con_ops; |
20 | static int __kick_requests(struct ceph_osd_client *osdc, | ||
21 | struct ceph_osd *kickosd); | ||
20 | 22 | ||
21 | static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); | 23 | static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); |
22 | 24 | ||
@@ -339,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) | |||
339 | osd->o_con.ops = &osd_con_ops; | 341 | osd->o_con.ops = &osd_con_ops; |
340 | osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; | 342 | osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; |
341 | 343 | ||
344 | INIT_LIST_HEAD(&osd->o_keepalive_item); | ||
342 | return osd; | 345 | return osd; |
343 | } | 346 | } |
344 | 347 | ||
@@ -461,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) | |||
461 | return NULL; | 464 | return NULL; |
462 | } | 465 | } |
463 | 466 | ||
467 | static void __schedule_osd_timeout(struct ceph_osd_client *osdc) | ||
468 | { | ||
469 | schedule_delayed_work(&osdc->timeout_work, | ||
470 | osdc->client->mount_args->osd_keepalive_timeout * HZ); | ||
471 | } | ||
472 | |||
473 | static void __cancel_osd_timeout(struct ceph_osd_client *osdc) | ||
474 | { | ||
475 | cancel_delayed_work(&osdc->timeout_work); | ||
476 | } | ||
464 | 477 | ||
465 | /* | 478 | /* |
466 | * Register request, assign tid. If this is the first request, set up | 479 | * Register request, assign tid. If this is the first request, set up |
@@ -472,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc, | |||
472 | mutex_lock(&osdc->request_mutex); | 485 | mutex_lock(&osdc->request_mutex); |
473 | req->r_tid = ++osdc->last_tid; | 486 | req->r_tid = ++osdc->last_tid; |
474 | req->r_request->hdr.tid = cpu_to_le64(req->r_tid); | 487 | req->r_request->hdr.tid = cpu_to_le64(req->r_tid); |
488 | INIT_LIST_HEAD(&req->r_req_lru_item); | ||
475 | 489 | ||
476 | dout("register_request %p tid %lld\n", req, req->r_tid); | 490 | dout("register_request %p tid %lld\n", req, req->r_tid); |
477 | __insert_request(osdc, req); | 491 | __insert_request(osdc, req); |
478 | ceph_osdc_get_request(req); | 492 | ceph_osdc_get_request(req); |
479 | osdc->num_requests++; | 493 | osdc->num_requests++; |
480 | 494 | ||
481 | req->r_timeout_stamp = | ||
482 | jiffies + osdc->client->mount_args->osd_timeout*HZ; | ||
483 | |||
484 | if (osdc->num_requests == 1) { | 495 | if (osdc->num_requests == 1) { |
485 | osdc->timeout_tid = req->r_tid; | 496 | dout(" first request, scheduling timeout\n"); |
486 | dout(" timeout on tid %llu at %lu\n", req->r_tid, | 497 | __schedule_osd_timeout(osdc); |
487 | req->r_timeout_stamp); | ||
488 | schedule_delayed_work(&osdc->timeout_work, | ||
489 | round_jiffies_relative(req->r_timeout_stamp - jiffies)); | ||
490 | } | 498 | } |
491 | mutex_unlock(&osdc->request_mutex); | 499 | mutex_unlock(&osdc->request_mutex); |
492 | } | 500 | } |
@@ -513,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc, | |||
513 | 521 | ||
514 | ceph_osdc_put_request(req); | 522 | ceph_osdc_put_request(req); |
515 | 523 | ||
516 | if (req->r_tid == osdc->timeout_tid) { | 524 | list_del_init(&req->r_req_lru_item); |
517 | if (osdc->num_requests == 0) { | 525 | if (osdc->num_requests == 0) { |
518 | dout("no requests, canceling timeout\n"); | 526 | dout(" no requests, canceling timeout\n"); |
519 | osdc->timeout_tid = 0; | 527 | __cancel_osd_timeout(osdc); |
520 | cancel_delayed_work(&osdc->timeout_work); | ||
521 | } else { | ||
522 | req = rb_entry(rb_first(&osdc->requests), | ||
523 | struct ceph_osd_request, r_node); | ||
524 | osdc->timeout_tid = req->r_tid; | ||
525 | dout("rescheduled timeout on tid %llu at %lu\n", | ||
526 | req->r_tid, req->r_timeout_stamp); | ||
527 | schedule_delayed_work(&osdc->timeout_work, | ||
528 | round_jiffies_relative(req->r_timeout_stamp - | ||
529 | jiffies)); | ||
530 | } | ||
531 | } | 528 | } |
532 | } | 529 | } |
533 | 530 | ||
@@ -540,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req) | |||
540 | ceph_con_revoke(&req->r_osd->o_con, req->r_request); | 537 | ceph_con_revoke(&req->r_osd->o_con, req->r_request); |
541 | req->r_sent = 0; | 538 | req->r_sent = 0; |
542 | } | 539 | } |
540 | list_del_init(&req->r_req_lru_item); | ||
543 | } | 541 | } |
544 | 542 | ||
545 | /* | 543 | /* |
@@ -635,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc, | |||
635 | reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ | 633 | reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ |
636 | reqhead->reassert_version = req->r_reassert_version; | 634 | reqhead->reassert_version = req->r_reassert_version; |
637 | 635 | ||
638 | req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ; | 636 | req->r_sent_stamp = jiffies; |
637 | list_move_tail(&osdc->req_lru, &req->r_req_lru_item); | ||
639 | 638 | ||
640 | ceph_msg_get(req->r_request); /* send consumes a ref */ | 639 | ceph_msg_get(req->r_request); /* send consumes a ref */ |
641 | ceph_con_send(&req->r_osd->o_con, req->r_request); | 640 | ceph_con_send(&req->r_osd->o_con, req->r_request); |
@@ -656,11 +655,14 @@ static void handle_timeout(struct work_struct *work) | |||
656 | { | 655 | { |
657 | struct ceph_osd_client *osdc = | 656 | struct ceph_osd_client *osdc = |
658 | container_of(work, struct ceph_osd_client, timeout_work.work); | 657 | container_of(work, struct ceph_osd_client, timeout_work.work); |
659 | struct ceph_osd_request *req; | 658 | struct ceph_osd_request *req, *last_req = NULL; |
660 | struct ceph_osd *osd; | 659 | struct ceph_osd *osd; |
661 | unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; | 660 | unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; |
662 | unsigned long next_timeout = timeout + jiffies; | 661 | unsigned long keepalive = |
662 | osdc->client->mount_args->osd_keepalive_timeout * HZ; | ||
663 | unsigned long last_sent = 0; | ||
663 | struct rb_node *p; | 664 | struct rb_node *p; |
665 | struct list_head slow_osds; | ||
664 | 666 | ||
665 | dout("timeout\n"); | 667 | dout("timeout\n"); |
666 | down_read(&osdc->map_sem); | 668 | down_read(&osdc->map_sem); |
@@ -683,25 +685,56 @@ static void handle_timeout(struct work_struct *work) | |||
683 | continue; | 685 | continue; |
684 | } | 686 | } |
685 | } | 687 | } |
686 | for (p = rb_first(&osdc->osds); p; p = rb_next(p)) { | ||
687 | osd = rb_entry(p, struct ceph_osd, o_node); | ||
688 | if (list_empty(&osd->o_requests)) | ||
689 | continue; | ||
690 | req = list_first_entry(&osd->o_requests, | ||
691 | struct ceph_osd_request, r_osd_item); | ||
692 | if (time_before(jiffies, req->r_timeout_stamp)) | ||
693 | continue; | ||
694 | 688 | ||
695 | dout(" tid %llu (at least) timed out on osd%d\n", | 689 | /* |
690 | * reset osds that appear to be _really_ unresponsive. this | ||
691 | * is a failsafe measure.. we really shouldn't be getting to | ||
692 | * this point if the system is working properly. the monitors | ||
693 | * should mark the osd as failed and we should find out about | ||
694 | * it from an updated osd map. | ||
695 | */ | ||
696 | while (!list_empty(&osdc->req_lru)) { | ||
697 | req = list_entry(osdc->req_lru.next, struct ceph_osd_request, | ||
698 | r_req_lru_item); | ||
699 | |||
700 | if (time_before(jiffies, req->r_sent_stamp + timeout)) | ||
701 | break; | ||
702 | |||
703 | BUG_ON(req == last_req && req->r_sent_stamp == last_sent); | ||
704 | last_req = req; | ||
705 | last_sent = req->r_sent_stamp; | ||
706 | |||
707 | osd = req->r_osd; | ||
708 | BUG_ON(!osd); | ||
709 | pr_warning(" tid %llu timed out on osd%d, will reset osd\n", | ||
710 | req->r_tid, osd->o_osd); | ||
711 | __kick_requests(osdc, osd); | ||
712 | } | ||
713 | |||
714 | /* | ||
715 | * ping osds that are a bit slow. this ensures that if there | ||
716 | * is a break in the TCP connection we will notice, and reopen | ||
717 | * a connection with that osd (from the fault callback). | ||
718 | */ | ||
719 | INIT_LIST_HEAD(&slow_osds); | ||
720 | list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { | ||
721 | if (time_before(jiffies, req->r_sent_stamp + keepalive)) | ||
722 | break; | ||
723 | |||
724 | osd = req->r_osd; | ||
725 | BUG_ON(!osd); | ||
726 | dout(" tid %llu is slow, will send keepalive on osd%d\n", | ||
696 | req->r_tid, osd->o_osd); | 727 | req->r_tid, osd->o_osd); |
697 | req->r_timeout_stamp = next_timeout; | 728 | list_move_tail(&osd->o_keepalive_item, &slow_osds); |
729 | } | ||
730 | while (!list_empty(&slow_osds)) { | ||
731 | osd = list_entry(slow_osds.next, struct ceph_osd, | ||
732 | o_keepalive_item); | ||
733 | list_del_init(&osd->o_keepalive_item); | ||
698 | ceph_con_keepalive(&osd->o_con); | 734 | ceph_con_keepalive(&osd->o_con); |
699 | } | 735 | } |
700 | 736 | ||
701 | if (osdc->timeout_tid) | 737 | __schedule_osd_timeout(osdc); |
702 | schedule_delayed_work(&osdc->timeout_work, | ||
703 | round_jiffies_relative(timeout)); | ||
704 | |||
705 | mutex_unlock(&osdc->request_mutex); | 738 | mutex_unlock(&osdc->request_mutex); |
706 | 739 | ||
707 | up_read(&osdc->map_sem); | 740 | up_read(&osdc->map_sem); |
@@ -819,18 +852,7 @@ bad: | |||
819 | } | 852 | } |
820 | 853 | ||
821 | 854 | ||
822 | /* | 855 | static int __kick_requests(struct ceph_osd_client *osdc, |
823 | * Resubmit osd requests whose osd or osd address has changed. Request | ||
824 | * a new osd map if osds are down, or we are otherwise unable to determine | ||
825 | * how to direct a request. | ||
826 | * | ||
827 | * Close connections to down osds. | ||
828 | * | ||
829 | * If @who is specified, resubmit requests for that specific osd. | ||
830 | * | ||
831 | * Caller should hold map_sem for read and request_mutex. | ||
832 | */ | ||
833 | static void kick_requests(struct ceph_osd_client *osdc, | ||
834 | struct ceph_osd *kickosd) | 856 | struct ceph_osd *kickosd) |
835 | { | 857 | { |
836 | struct ceph_osd_request *req; | 858 | struct ceph_osd_request *req; |
@@ -839,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc, | |||
839 | int err; | 861 | int err; |
840 | 862 | ||
841 | dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); | 863 | dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); |
842 | mutex_lock(&osdc->request_mutex); | ||
843 | if (kickosd) { | 864 | if (kickosd) { |
844 | __reset_osd(osdc, kickosd); | 865 | __reset_osd(osdc, kickosd); |
845 | } else { | 866 | } else { |
@@ -900,14 +921,36 @@ kick: | |||
900 | req->r_resend = true; | 921 | req->r_resend = true; |
901 | } | 922 | } |
902 | } | 923 | } |
924 | |||
925 | return needmap; | ||
926 | } | ||
927 | |||
928 | /* | ||
929 | * Resubmit osd requests whose osd or osd address has changed. Request | ||
930 | * a new osd map if osds are down, or we are otherwise unable to determine | ||
931 | * how to direct a request. | ||
932 | * | ||
933 | * Close connections to down osds. | ||
934 | * | ||
935 | * If @who is specified, resubmit requests for that specific osd. | ||
936 | * | ||
937 | * Caller should hold map_sem for read and request_mutex. | ||
938 | */ | ||
939 | static void kick_requests(struct ceph_osd_client *osdc, | ||
940 | struct ceph_osd *kickosd) | ||
941 | { | ||
942 | int needmap; | ||
943 | |||
944 | mutex_lock(&osdc->request_mutex); | ||
945 | needmap = __kick_requests(osdc, kickosd); | ||
903 | mutex_unlock(&osdc->request_mutex); | 946 | mutex_unlock(&osdc->request_mutex); |
904 | 947 | ||
905 | if (needmap) { | 948 | if (needmap) { |
906 | dout("%d requests for down osds, need new map\n", needmap); | 949 | dout("%d requests for down osds, need new map\n", needmap); |
907 | ceph_monc_request_next_osdmap(&osdc->client->monc); | 950 | ceph_monc_request_next_osdmap(&osdc->client->monc); |
908 | } | 951 | } |
909 | } | ||
910 | 952 | ||
953 | } | ||
911 | /* | 954 | /* |
912 | * Process updated osd map. | 955 | * Process updated osd map. |
913 | * | 956 | * |
@@ -1164,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) | |||
1164 | init_completion(&osdc->map_waiters); | 1207 | init_completion(&osdc->map_waiters); |
1165 | osdc->last_requested_map = 0; | 1208 | osdc->last_requested_map = 0; |
1166 | mutex_init(&osdc->request_mutex); | 1209 | mutex_init(&osdc->request_mutex); |
1167 | osdc->timeout_tid = 0; | ||
1168 | osdc->last_tid = 0; | 1210 | osdc->last_tid = 0; |
1169 | osdc->osds = RB_ROOT; | 1211 | osdc->osds = RB_ROOT; |
1170 | INIT_LIST_HEAD(&osdc->osd_lru); | 1212 | INIT_LIST_HEAD(&osdc->osd_lru); |
1171 | osdc->requests = RB_ROOT; | 1213 | osdc->requests = RB_ROOT; |
1214 | INIT_LIST_HEAD(&osdc->req_lru); | ||
1172 | osdc->num_requests = 0; | 1215 | osdc->num_requests = 0; |
1173 | INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); | 1216 | INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); |
1174 | INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); | 1217 | INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); |