aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/osd_client.c
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2010-02-26 18:32:31 -0500
committerSage Weil <sage@newdream.net>2010-03-04 14:26:35 -0500
commit422d2cb8f9afadba1ecd3614f658b6daaaa480fb (patch)
tree22e1a61acdbbe1459b190c4dbb6019360464b2e9 /fs/ceph/osd_client.c
parente9964c102312967a4bc1fd501cb628c4a3b19034 (diff)
ceph: reset osd after relevant messages timed out
This simplifies the process of timing out messages. We keep lru of current messages that are in flight. If a timeout has passed, we reset the osd connection, so that messages will be retransmitted. This is a failsafe in case we hit some sort of problem sending out message to the OSD. Normally, we'll get notification via an updated osdmap if there are problems. If a request is older than the keepalive timeout, send a keepalive to ensure we detect any breaks in the TCP connection. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net> Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/osd_client.c')
-rw-r--r--fs/ceph/osd_client.c153
1 files changed, 98 insertions, 55 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index c4763bff97b4..dbe63db9762f 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -17,6 +17,8 @@
17#define OSD_OPREPLY_FRONT_LEN 512 17#define OSD_OPREPLY_FRONT_LEN 512
18 18
19const static struct ceph_connection_operations osd_con_ops; 19const static struct ceph_connection_operations osd_con_ops;
20static int __kick_requests(struct ceph_osd_client *osdc,
21 struct ceph_osd *kickosd);
20 22
21static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 23static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
22 24
@@ -339,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
339 osd->o_con.ops = &osd_con_ops; 341 osd->o_con.ops = &osd_con_ops;
340 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; 342 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
341 343
344 INIT_LIST_HEAD(&osd->o_keepalive_item);
342 return osd; 345 return osd;
343} 346}
344 347
@@ -461,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
461 return NULL; 464 return NULL;
462} 465}
463 466
467static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
468{
469 schedule_delayed_work(&osdc->timeout_work,
470 osdc->client->mount_args->osd_keepalive_timeout * HZ);
471}
472
473static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
474{
475 cancel_delayed_work(&osdc->timeout_work);
476}
464 477
465/* 478/*
466 * Register request, assign tid. If this is the first request, set up 479 * Register request, assign tid. If this is the first request, set up
@@ -472,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc,
472 mutex_lock(&osdc->request_mutex); 485 mutex_lock(&osdc->request_mutex);
473 req->r_tid = ++osdc->last_tid; 486 req->r_tid = ++osdc->last_tid;
474 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 487 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
488 INIT_LIST_HEAD(&req->r_req_lru_item);
475 489
476 dout("register_request %p tid %lld\n", req, req->r_tid); 490 dout("register_request %p tid %lld\n", req, req->r_tid);
477 __insert_request(osdc, req); 491 __insert_request(osdc, req);
478 ceph_osdc_get_request(req); 492 ceph_osdc_get_request(req);
479 osdc->num_requests++; 493 osdc->num_requests++;
480 494
481 req->r_timeout_stamp =
482 jiffies + osdc->client->mount_args->osd_timeout*HZ;
483
484 if (osdc->num_requests == 1) { 495 if (osdc->num_requests == 1) {
485 osdc->timeout_tid = req->r_tid; 496 dout(" first request, scheduling timeout\n");
486 dout(" timeout on tid %llu at %lu\n", req->r_tid, 497 __schedule_osd_timeout(osdc);
487 req->r_timeout_stamp);
488 schedule_delayed_work(&osdc->timeout_work,
489 round_jiffies_relative(req->r_timeout_stamp - jiffies));
490 } 498 }
491 mutex_unlock(&osdc->request_mutex); 499 mutex_unlock(&osdc->request_mutex);
492} 500}
@@ -513,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc,
513 521
514 ceph_osdc_put_request(req); 522 ceph_osdc_put_request(req);
515 523
516 if (req->r_tid == osdc->timeout_tid) { 524 list_del_init(&req->r_req_lru_item);
517 if (osdc->num_requests == 0) { 525 if (osdc->num_requests == 0) {
518 dout("no requests, canceling timeout\n"); 526 dout(" no requests, canceling timeout\n");
519 osdc->timeout_tid = 0; 527 __cancel_osd_timeout(osdc);
520 cancel_delayed_work(&osdc->timeout_work);
521 } else {
522 req = rb_entry(rb_first(&osdc->requests),
523 struct ceph_osd_request, r_node);
524 osdc->timeout_tid = req->r_tid;
525 dout("rescheduled timeout on tid %llu at %lu\n",
526 req->r_tid, req->r_timeout_stamp);
527 schedule_delayed_work(&osdc->timeout_work,
528 round_jiffies_relative(req->r_timeout_stamp -
529 jiffies));
530 }
531 } 528 }
532} 529}
533 530
@@ -540,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req)
540 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 537 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
541 req->r_sent = 0; 538 req->r_sent = 0;
542 } 539 }
540 list_del_init(&req->r_req_lru_item);
543} 541}
544 542
545/* 543/*
@@ -635,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc,
635 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ 633 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
636 reqhead->reassert_version = req->r_reassert_version; 634 reqhead->reassert_version = req->r_reassert_version;
637 635
638 req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ; 636 req->r_sent_stamp = jiffies;
637 list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
639 638
640 ceph_msg_get(req->r_request); /* send consumes a ref */ 639 ceph_msg_get(req->r_request); /* send consumes a ref */
641 ceph_con_send(&req->r_osd->o_con, req->r_request); 640 ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -656,11 +655,14 @@ static void handle_timeout(struct work_struct *work)
656{ 655{
657 struct ceph_osd_client *osdc = 656 struct ceph_osd_client *osdc =
658 container_of(work, struct ceph_osd_client, timeout_work.work); 657 container_of(work, struct ceph_osd_client, timeout_work.work);
659 struct ceph_osd_request *req; 658 struct ceph_osd_request *req, *last_req = NULL;
660 struct ceph_osd *osd; 659 struct ceph_osd *osd;
661 unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; 660 unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
662 unsigned long next_timeout = timeout + jiffies; 661 unsigned long keepalive =
662 osdc->client->mount_args->osd_keepalive_timeout * HZ;
663 unsigned long last_sent = 0;
663 struct rb_node *p; 664 struct rb_node *p;
665 struct list_head slow_osds;
664 666
665 dout("timeout\n"); 667 dout("timeout\n");
666 down_read(&osdc->map_sem); 668 down_read(&osdc->map_sem);
@@ -683,25 +685,56 @@ static void handle_timeout(struct work_struct *work)
683 continue; 685 continue;
684 } 686 }
685 } 687 }
686 for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
687 osd = rb_entry(p, struct ceph_osd, o_node);
688 if (list_empty(&osd->o_requests))
689 continue;
690 req = list_first_entry(&osd->o_requests,
691 struct ceph_osd_request, r_osd_item);
692 if (time_before(jiffies, req->r_timeout_stamp))
693 continue;
694 688
695 dout(" tid %llu (at least) timed out on osd%d\n", 689 /*
690 * reset osds that appear to be _really_ unresponsive. this
691 * is a failsafe measure.. we really shouldn't be getting to
692 * this point if the system is working properly. the monitors
693 * should mark the osd as failed and we should find out about
694 * it from an updated osd map.
695 */
696 while (!list_empty(&osdc->req_lru)) {
697 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
698 r_req_lru_item);
699
700 if (time_before(jiffies, req->r_sent_stamp + timeout))
701 break;
702
703 BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
704 last_req = req;
705 last_sent = req->r_sent_stamp;
706
707 osd = req->r_osd;
708 BUG_ON(!osd);
709 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
710 req->r_tid, osd->o_osd);
711 __kick_requests(osdc, osd);
712 }
713
714 /*
715 * ping osds that are a bit slow. this ensures that if there
716 * is a break in the TCP connection we will notice, and reopen
717 * a connection with that osd (from the fault callback).
718 */
719 INIT_LIST_HEAD(&slow_osds);
720 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
721 if (time_before(jiffies, req->r_sent_stamp + keepalive))
722 break;
723
724 osd = req->r_osd;
725 BUG_ON(!osd);
726 dout(" tid %llu is slow, will send keepalive on osd%d\n",
696 req->r_tid, osd->o_osd); 727 req->r_tid, osd->o_osd);
697 req->r_timeout_stamp = next_timeout; 728 list_move_tail(&osd->o_keepalive_item, &slow_osds);
729 }
730 while (!list_empty(&slow_osds)) {
731 osd = list_entry(slow_osds.next, struct ceph_osd,
732 o_keepalive_item);
733 list_del_init(&osd->o_keepalive_item);
698 ceph_con_keepalive(&osd->o_con); 734 ceph_con_keepalive(&osd->o_con);
699 } 735 }
700 736
701 if (osdc->timeout_tid) 737 __schedule_osd_timeout(osdc);
702 schedule_delayed_work(&osdc->timeout_work,
703 round_jiffies_relative(timeout));
704
705 mutex_unlock(&osdc->request_mutex); 738 mutex_unlock(&osdc->request_mutex);
706 739
707 up_read(&osdc->map_sem); 740 up_read(&osdc->map_sem);
@@ -819,18 +852,7 @@ bad:
819} 852}
820 853
821 854
822/* 855static int __kick_requests(struct ceph_osd_client *osdc,
823 * Resubmit osd requests whose osd or osd address has changed. Request
824 * a new osd map if osds are down, or we are otherwise unable to determine
825 * how to direct a request.
826 *
827 * Close connections to down osds.
828 *
829 * If @who is specified, resubmit requests for that specific osd.
830 *
831 * Caller should hold map_sem for read and request_mutex.
832 */
833static void kick_requests(struct ceph_osd_client *osdc,
834 struct ceph_osd *kickosd) 856 struct ceph_osd *kickosd)
835{ 857{
836 struct ceph_osd_request *req; 858 struct ceph_osd_request *req;
@@ -839,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
839 int err; 861 int err;
840 862
841 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); 863 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
842 mutex_lock(&osdc->request_mutex);
843 if (kickosd) { 864 if (kickosd) {
844 __reset_osd(osdc, kickosd); 865 __reset_osd(osdc, kickosd);
845 } else { 866 } else {
@@ -900,14 +921,36 @@ kick:
900 req->r_resend = true; 921 req->r_resend = true;
901 } 922 }
902 } 923 }
924
925 return needmap;
926}
927
928/*
929 * Resubmit osd requests whose osd or osd address has changed. Request
930 * a new osd map if osds are down, or we are otherwise unable to determine
931 * how to direct a request.
932 *
933 * Close connections to down osds.
934 *
935 * If @who is specified, resubmit requests for that specific osd.
936 *
937 * Caller should hold map_sem for read and request_mutex.
938 */
939static void kick_requests(struct ceph_osd_client *osdc,
940 struct ceph_osd *kickosd)
941{
942 int needmap;
943
944 mutex_lock(&osdc->request_mutex);
945 needmap = __kick_requests(osdc, kickosd);
903 mutex_unlock(&osdc->request_mutex); 946 mutex_unlock(&osdc->request_mutex);
904 947
905 if (needmap) { 948 if (needmap) {
906 dout("%d requests for down osds, need new map\n", needmap); 949 dout("%d requests for down osds, need new map\n", needmap);
907 ceph_monc_request_next_osdmap(&osdc->client->monc); 950 ceph_monc_request_next_osdmap(&osdc->client->monc);
908 } 951 }
909}
910 952
953}
911/* 954/*
912 * Process updated osd map. 955 * Process updated osd map.
913 * 956 *
@@ -1164,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1164 init_completion(&osdc->map_waiters); 1207 init_completion(&osdc->map_waiters);
1165 osdc->last_requested_map = 0; 1208 osdc->last_requested_map = 0;
1166 mutex_init(&osdc->request_mutex); 1209 mutex_init(&osdc->request_mutex);
1167 osdc->timeout_tid = 0;
1168 osdc->last_tid = 0; 1210 osdc->last_tid = 0;
1169 osdc->osds = RB_ROOT; 1211 osdc->osds = RB_ROOT;
1170 INIT_LIST_HEAD(&osdc->osd_lru); 1212 INIT_LIST_HEAD(&osdc->osd_lru);
1171 osdc->requests = RB_ROOT; 1213 osdc->requests = RB_ROOT;
1214 INIT_LIST_HEAD(&osdc->req_lru);
1172 osdc->num_requests = 0; 1215 osdc->num_requests = 0;
1173 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1216 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1174 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1217 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);