diff options
| author | Yehuda Sadeh <yehuda@hq.newdream.net> | 2010-02-26 18:32:31 -0500 |
|---|---|---|
| committer | Sage Weil <sage@newdream.net> | 2010-03-04 14:26:35 -0500 |
| commit | 422d2cb8f9afadba1ecd3614f658b6daaaa480fb (patch) | |
| tree | 22e1a61acdbbe1459b190c4dbb6019360464b2e9 /fs/ceph/osd_client.c | |
| parent | e9964c102312967a4bc1fd501cb628c4a3b19034 (diff) | |
ceph: reset osd after relevant messages timed out
This simplifies the process of timing out messages. We
keep an LRU list of the messages that are currently in
flight. If a timeout has passed, we reset the osd
connection, so that messages will be retransmitted. This is
a failsafe in case we hit some sort of problem sending out
a message to the OSD. Normally, we'll get notification via
an updated osdmap if there are problems.
If a request is older than the keepalive timeout, send a
keepalive to ensure we detect any breaks in the TCP connection.
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/osd_client.c')
| -rw-r--r-- | fs/ceph/osd_client.c | 153 |
1 file changed, 98 insertions, 55 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index c4763bff97b4..dbe63db9762f 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c | |||
| @@ -17,6 +17,8 @@ | |||
| 17 | #define OSD_OPREPLY_FRONT_LEN 512 | 17 | #define OSD_OPREPLY_FRONT_LEN 512 |
| 18 | 18 | ||
| 19 | const static struct ceph_connection_operations osd_con_ops; | 19 | const static struct ceph_connection_operations osd_con_ops; |
| 20 | static int __kick_requests(struct ceph_osd_client *osdc, | ||
| 21 | struct ceph_osd *kickosd); | ||
| 20 | 22 | ||
| 21 | static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); | 23 | static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); |
| 22 | 24 | ||
| @@ -339,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) | |||
| 339 | osd->o_con.ops = &osd_con_ops; | 341 | osd->o_con.ops = &osd_con_ops; |
| 340 | osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; | 342 | osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; |
| 341 | 343 | ||
| 344 | INIT_LIST_HEAD(&osd->o_keepalive_item); | ||
| 342 | return osd; | 345 | return osd; |
| 343 | } | 346 | } |
| 344 | 347 | ||
| @@ -461,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) | |||
| 461 | return NULL; | 464 | return NULL; |
| 462 | } | 465 | } |
| 463 | 466 | ||
| 467 | static void __schedule_osd_timeout(struct ceph_osd_client *osdc) | ||
| 468 | { | ||
| 469 | schedule_delayed_work(&osdc->timeout_work, | ||
| 470 | osdc->client->mount_args->osd_keepalive_timeout * HZ); | ||
| 471 | } | ||
| 472 | |||
| 473 | static void __cancel_osd_timeout(struct ceph_osd_client *osdc) | ||
| 474 | { | ||
| 475 | cancel_delayed_work(&osdc->timeout_work); | ||
| 476 | } | ||
| 464 | 477 | ||
| 465 | /* | 478 | /* |
| 466 | * Register request, assign tid. If this is the first request, set up | 479 | * Register request, assign tid. If this is the first request, set up |
| @@ -472,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc, | |||
| 472 | mutex_lock(&osdc->request_mutex); | 485 | mutex_lock(&osdc->request_mutex); |
| 473 | req->r_tid = ++osdc->last_tid; | 486 | req->r_tid = ++osdc->last_tid; |
| 474 | req->r_request->hdr.tid = cpu_to_le64(req->r_tid); | 487 | req->r_request->hdr.tid = cpu_to_le64(req->r_tid); |
| 488 | INIT_LIST_HEAD(&req->r_req_lru_item); | ||
| 475 | 489 | ||
| 476 | dout("register_request %p tid %lld\n", req, req->r_tid); | 490 | dout("register_request %p tid %lld\n", req, req->r_tid); |
| 477 | __insert_request(osdc, req); | 491 | __insert_request(osdc, req); |
| 478 | ceph_osdc_get_request(req); | 492 | ceph_osdc_get_request(req); |
| 479 | osdc->num_requests++; | 493 | osdc->num_requests++; |
| 480 | 494 | ||
| 481 | req->r_timeout_stamp = | ||
| 482 | jiffies + osdc->client->mount_args->osd_timeout*HZ; | ||
| 483 | |||
| 484 | if (osdc->num_requests == 1) { | 495 | if (osdc->num_requests == 1) { |
| 485 | osdc->timeout_tid = req->r_tid; | 496 | dout(" first request, scheduling timeout\n"); |
| 486 | dout(" timeout on tid %llu at %lu\n", req->r_tid, | 497 | __schedule_osd_timeout(osdc); |
| 487 | req->r_timeout_stamp); | ||
| 488 | schedule_delayed_work(&osdc->timeout_work, | ||
| 489 | round_jiffies_relative(req->r_timeout_stamp - jiffies)); | ||
| 490 | } | 498 | } |
| 491 | mutex_unlock(&osdc->request_mutex); | 499 | mutex_unlock(&osdc->request_mutex); |
| 492 | } | 500 | } |
| @@ -513,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc, | |||
| 513 | 521 | ||
| 514 | ceph_osdc_put_request(req); | 522 | ceph_osdc_put_request(req); |
| 515 | 523 | ||
| 516 | if (req->r_tid == osdc->timeout_tid) { | 524 | list_del_init(&req->r_req_lru_item); |
| 517 | if (osdc->num_requests == 0) { | 525 | if (osdc->num_requests == 0) { |
| 518 | dout("no requests, canceling timeout\n"); | 526 | dout(" no requests, canceling timeout\n"); |
| 519 | osdc->timeout_tid = 0; | 527 | __cancel_osd_timeout(osdc); |
| 520 | cancel_delayed_work(&osdc->timeout_work); | ||
| 521 | } else { | ||
| 522 | req = rb_entry(rb_first(&osdc->requests), | ||
| 523 | struct ceph_osd_request, r_node); | ||
| 524 | osdc->timeout_tid = req->r_tid; | ||
| 525 | dout("rescheduled timeout on tid %llu at %lu\n", | ||
| 526 | req->r_tid, req->r_timeout_stamp); | ||
| 527 | schedule_delayed_work(&osdc->timeout_work, | ||
| 528 | round_jiffies_relative(req->r_timeout_stamp - | ||
| 529 | jiffies)); | ||
| 530 | } | ||
| 531 | } | 528 | } |
| 532 | } | 529 | } |
| 533 | 530 | ||
| @@ -540,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req) | |||
| 540 | ceph_con_revoke(&req->r_osd->o_con, req->r_request); | 537 | ceph_con_revoke(&req->r_osd->o_con, req->r_request); |
| 541 | req->r_sent = 0; | 538 | req->r_sent = 0; |
| 542 | } | 539 | } |
| 540 | list_del_init(&req->r_req_lru_item); | ||
| 543 | } | 541 | } |
| 544 | 542 | ||
| 545 | /* | 543 | /* |
| @@ -635,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc, | |||
| 635 | reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ | 633 | reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ |
| 636 | reqhead->reassert_version = req->r_reassert_version; | 634 | reqhead->reassert_version = req->r_reassert_version; |
| 637 | 635 | ||
| 638 | req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ; | 636 | req->r_sent_stamp = jiffies; |
| 637 | list_move_tail(&osdc->req_lru, &req->r_req_lru_item); | ||
| 639 | 638 | ||
| 640 | ceph_msg_get(req->r_request); /* send consumes a ref */ | 639 | ceph_msg_get(req->r_request); /* send consumes a ref */ |
| 641 | ceph_con_send(&req->r_osd->o_con, req->r_request); | 640 | ceph_con_send(&req->r_osd->o_con, req->r_request); |
| @@ -656,11 +655,14 @@ static void handle_timeout(struct work_struct *work) | |||
| 656 | { | 655 | { |
| 657 | struct ceph_osd_client *osdc = | 656 | struct ceph_osd_client *osdc = |
| 658 | container_of(work, struct ceph_osd_client, timeout_work.work); | 657 | container_of(work, struct ceph_osd_client, timeout_work.work); |
| 659 | struct ceph_osd_request *req; | 658 | struct ceph_osd_request *req, *last_req = NULL; |
| 660 | struct ceph_osd *osd; | 659 | struct ceph_osd *osd; |
| 661 | unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; | 660 | unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; |
| 662 | unsigned long next_timeout = timeout + jiffies; | 661 | unsigned long keepalive = |
| 662 | osdc->client->mount_args->osd_keepalive_timeout * HZ; | ||
| 663 | unsigned long last_sent = 0; | ||
| 663 | struct rb_node *p; | 664 | struct rb_node *p; |
| 665 | struct list_head slow_osds; | ||
| 664 | 666 | ||
| 665 | dout("timeout\n"); | 667 | dout("timeout\n"); |
| 666 | down_read(&osdc->map_sem); | 668 | down_read(&osdc->map_sem); |
| @@ -683,25 +685,56 @@ static void handle_timeout(struct work_struct *work) | |||
| 683 | continue; | 685 | continue; |
| 684 | } | 686 | } |
| 685 | } | 687 | } |
| 686 | for (p = rb_first(&osdc->osds); p; p = rb_next(p)) { | ||
| 687 | osd = rb_entry(p, struct ceph_osd, o_node); | ||
| 688 | if (list_empty(&osd->o_requests)) | ||
| 689 | continue; | ||
| 690 | req = list_first_entry(&osd->o_requests, | ||
| 691 | struct ceph_osd_request, r_osd_item); | ||
| 692 | if (time_before(jiffies, req->r_timeout_stamp)) | ||
| 693 | continue; | ||
| 694 | 688 | ||
| 695 | dout(" tid %llu (at least) timed out on osd%d\n", | 689 | /* |
| 690 | * reset osds that appear to be _really_ unresponsive. this | ||
| 691 | * is a failsafe measure.. we really shouldn't be getting to | ||
| 692 | * this point if the system is working properly. the monitors | ||
| 693 | * should mark the osd as failed and we should find out about | ||
| 694 | * it from an updated osd map. | ||
| 695 | */ | ||
| 696 | while (!list_empty(&osdc->req_lru)) { | ||
| 697 | req = list_entry(osdc->req_lru.next, struct ceph_osd_request, | ||
| 698 | r_req_lru_item); | ||
| 699 | |||
| 700 | if (time_before(jiffies, req->r_sent_stamp + timeout)) | ||
| 701 | break; | ||
| 702 | |||
| 703 | BUG_ON(req == last_req && req->r_sent_stamp == last_sent); | ||
| 704 | last_req = req; | ||
| 705 | last_sent = req->r_sent_stamp; | ||
| 706 | |||
| 707 | osd = req->r_osd; | ||
| 708 | BUG_ON(!osd); | ||
| 709 | pr_warning(" tid %llu timed out on osd%d, will reset osd\n", | ||
| 710 | req->r_tid, osd->o_osd); | ||
| 711 | __kick_requests(osdc, osd); | ||
| 712 | } | ||
| 713 | |||
| 714 | /* | ||
| 715 | * ping osds that are a bit slow. this ensures that if there | ||
| 716 | * is a break in the TCP connection we will notice, and reopen | ||
| 717 | * a connection with that osd (from the fault callback). | ||
| 718 | */ | ||
| 719 | INIT_LIST_HEAD(&slow_osds); | ||
| 720 | list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { | ||
| 721 | if (time_before(jiffies, req->r_sent_stamp + keepalive)) | ||
| 722 | break; | ||
| 723 | |||
| 724 | osd = req->r_osd; | ||
| 725 | BUG_ON(!osd); | ||
| 726 | dout(" tid %llu is slow, will send keepalive on osd%d\n", | ||
| 696 | req->r_tid, osd->o_osd); | 727 | req->r_tid, osd->o_osd); |
| 697 | req->r_timeout_stamp = next_timeout; | 728 | list_move_tail(&osd->o_keepalive_item, &slow_osds); |
| 729 | } | ||
| 730 | while (!list_empty(&slow_osds)) { | ||
| 731 | osd = list_entry(slow_osds.next, struct ceph_osd, | ||
| 732 | o_keepalive_item); | ||
| 733 | list_del_init(&osd->o_keepalive_item); | ||
| 698 | ceph_con_keepalive(&osd->o_con); | 734 | ceph_con_keepalive(&osd->o_con); |
| 699 | } | 735 | } |
| 700 | 736 | ||
| 701 | if (osdc->timeout_tid) | 737 | __schedule_osd_timeout(osdc); |
| 702 | schedule_delayed_work(&osdc->timeout_work, | ||
| 703 | round_jiffies_relative(timeout)); | ||
| 704 | |||
| 705 | mutex_unlock(&osdc->request_mutex); | 738 | mutex_unlock(&osdc->request_mutex); |
| 706 | 739 | ||
| 707 | up_read(&osdc->map_sem); | 740 | up_read(&osdc->map_sem); |
| @@ -819,18 +852,7 @@ bad: | |||
| 819 | } | 852 | } |
| 820 | 853 | ||
| 821 | 854 | ||
| 822 | /* | 855 | static int __kick_requests(struct ceph_osd_client *osdc, |
| 823 | * Resubmit osd requests whose osd or osd address has changed. Request | ||
| 824 | * a new osd map if osds are down, or we are otherwise unable to determine | ||
| 825 | * how to direct a request. | ||
| 826 | * | ||
| 827 | * Close connections to down osds. | ||
| 828 | * | ||
| 829 | * If @who is specified, resubmit requests for that specific osd. | ||
| 830 | * | ||
| 831 | * Caller should hold map_sem for read and request_mutex. | ||
| 832 | */ | ||
| 833 | static void kick_requests(struct ceph_osd_client *osdc, | ||
| 834 | struct ceph_osd *kickosd) | 856 | struct ceph_osd *kickosd) |
| 835 | { | 857 | { |
| 836 | struct ceph_osd_request *req; | 858 | struct ceph_osd_request *req; |
| @@ -839,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc, | |||
| 839 | int err; | 861 | int err; |
| 840 | 862 | ||
| 841 | dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); | 863 | dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); |
| 842 | mutex_lock(&osdc->request_mutex); | ||
| 843 | if (kickosd) { | 864 | if (kickosd) { |
| 844 | __reset_osd(osdc, kickosd); | 865 | __reset_osd(osdc, kickosd); |
| 845 | } else { | 866 | } else { |
| @@ -900,14 +921,36 @@ kick: | |||
| 900 | req->r_resend = true; | 921 | req->r_resend = true; |
| 901 | } | 922 | } |
| 902 | } | 923 | } |
| 924 | |||
| 925 | return needmap; | ||
| 926 | } | ||
| 927 | |||
| 928 | /* | ||
| 929 | * Resubmit osd requests whose osd or osd address has changed. Request | ||
| 930 | * a new osd map if osds are down, or we are otherwise unable to determine | ||
| 931 | * how to direct a request. | ||
| 932 | * | ||
| 933 | * Close connections to down osds. | ||
| 934 | * | ||
| 935 | * If @who is specified, resubmit requests for that specific osd. | ||
| 936 | * | ||
| 937 | * Caller should hold map_sem for read and request_mutex. | ||
| 938 | */ | ||
| 939 | static void kick_requests(struct ceph_osd_client *osdc, | ||
| 940 | struct ceph_osd *kickosd) | ||
| 941 | { | ||
| 942 | int needmap; | ||
| 943 | |||
| 944 | mutex_lock(&osdc->request_mutex); | ||
| 945 | needmap = __kick_requests(osdc, kickosd); | ||
| 903 | mutex_unlock(&osdc->request_mutex); | 946 | mutex_unlock(&osdc->request_mutex); |
| 904 | 947 | ||
| 905 | if (needmap) { | 948 | if (needmap) { |
| 906 | dout("%d requests for down osds, need new map\n", needmap); | 949 | dout("%d requests for down osds, need new map\n", needmap); |
| 907 | ceph_monc_request_next_osdmap(&osdc->client->monc); | 950 | ceph_monc_request_next_osdmap(&osdc->client->monc); |
| 908 | } | 951 | } |
| 909 | } | ||
| 910 | 952 | ||
| 953 | } | ||
| 911 | /* | 954 | /* |
| 912 | * Process updated osd map. | 955 | * Process updated osd map. |
| 913 | * | 956 | * |
| @@ -1164,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) | |||
| 1164 | init_completion(&osdc->map_waiters); | 1207 | init_completion(&osdc->map_waiters); |
| 1165 | osdc->last_requested_map = 0; | 1208 | osdc->last_requested_map = 0; |
| 1166 | mutex_init(&osdc->request_mutex); | 1209 | mutex_init(&osdc->request_mutex); |
| 1167 | osdc->timeout_tid = 0; | ||
| 1168 | osdc->last_tid = 0; | 1210 | osdc->last_tid = 0; |
| 1169 | osdc->osds = RB_ROOT; | 1211 | osdc->osds = RB_ROOT; |
| 1170 | INIT_LIST_HEAD(&osdc->osd_lru); | 1212 | INIT_LIST_HEAD(&osdc->osd_lru); |
| 1171 | osdc->requests = RB_ROOT; | 1213 | osdc->requests = RB_ROOT; |
| 1214 | INIT_LIST_HEAD(&osdc->req_lru); | ||
| 1172 | osdc->num_requests = 0; | 1215 | osdc->num_requests = 0; |
| 1173 | INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); | 1216 | INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); |
| 1174 | INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); | 1217 | INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); |
