aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2010-02-26 18:32:31 -0500
committerSage Weil <sage@newdream.net>2010-03-04 14:26:35 -0500
commit422d2cb8f9afadba1ecd3614f658b6daaaa480fb (patch)
tree22e1a61acdbbe1459b190c4dbb6019360464b2e9 /fs
parente9964c102312967a4bc1fd501cb628c4a3b19034 (diff)
ceph: reset osd after relevant messages timed out
This simplifies the process of timing out messages. We keep an LRU list of the messages that are currently in flight. If a request has been in flight for longer than the OSD timeout, we reset the connection to that OSD so that its messages will be retransmitted. This is a failsafe in case we hit some sort of problem sending out a message to the OSD. Normally, we'll get notification via an updated osdmap if there are problems. If a request is older than the keepalive timeout, we send a keepalive to ensure we detect any breaks in the TCP connection. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net> Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/osd_client.c153
-rw-r--r--fs/ceph/osd_client.h6
-rw-r--r--fs/ceph/super.c8
-rw-r--r--fs/ceph/super.h3
4 files changed, 113 insertions, 57 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index c4763bff97b4..dbe63db9762f 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -17,6 +17,8 @@
17#define OSD_OPREPLY_FRONT_LEN 512 17#define OSD_OPREPLY_FRONT_LEN 512
18 18
19const static struct ceph_connection_operations osd_con_ops; 19const static struct ceph_connection_operations osd_con_ops;
20static int __kick_requests(struct ceph_osd_client *osdc,
21 struct ceph_osd *kickosd);
20 22
21static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 23static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
22 24
@@ -339,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
339 osd->o_con.ops = &osd_con_ops; 341 osd->o_con.ops = &osd_con_ops;
340 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; 342 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
341 343
344 INIT_LIST_HEAD(&osd->o_keepalive_item);
342 return osd; 345 return osd;
343} 346}
344 347
@@ -461,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
461 return NULL; 464 return NULL;
462} 465}
463 466
467static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
468{
469 schedule_delayed_work(&osdc->timeout_work,
470 osdc->client->mount_args->osd_keepalive_timeout * HZ);
471}
472
473static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
474{
475 cancel_delayed_work(&osdc->timeout_work);
476}
464 477
465/* 478/*
466 * Register request, assign tid. If this is the first request, set up 479 * Register request, assign tid. If this is the first request, set up
@@ -472,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc,
472 mutex_lock(&osdc->request_mutex); 485 mutex_lock(&osdc->request_mutex);
473 req->r_tid = ++osdc->last_tid; 486 req->r_tid = ++osdc->last_tid;
474 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 487 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
488 INIT_LIST_HEAD(&req->r_req_lru_item);
475 489
476 dout("register_request %p tid %lld\n", req, req->r_tid); 490 dout("register_request %p tid %lld\n", req, req->r_tid);
477 __insert_request(osdc, req); 491 __insert_request(osdc, req);
478 ceph_osdc_get_request(req); 492 ceph_osdc_get_request(req);
479 osdc->num_requests++; 493 osdc->num_requests++;
480 494
481 req->r_timeout_stamp =
482 jiffies + osdc->client->mount_args->osd_timeout*HZ;
483
484 if (osdc->num_requests == 1) { 495 if (osdc->num_requests == 1) {
485 osdc->timeout_tid = req->r_tid; 496 dout(" first request, scheduling timeout\n");
486 dout(" timeout on tid %llu at %lu\n", req->r_tid, 497 __schedule_osd_timeout(osdc);
487 req->r_timeout_stamp);
488 schedule_delayed_work(&osdc->timeout_work,
489 round_jiffies_relative(req->r_timeout_stamp - jiffies));
490 } 498 }
491 mutex_unlock(&osdc->request_mutex); 499 mutex_unlock(&osdc->request_mutex);
492} 500}
@@ -513,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc,
513 521
514 ceph_osdc_put_request(req); 522 ceph_osdc_put_request(req);
515 523
516 if (req->r_tid == osdc->timeout_tid) { 524 list_del_init(&req->r_req_lru_item);
517 if (osdc->num_requests == 0) { 525 if (osdc->num_requests == 0) {
518 dout("no requests, canceling timeout\n"); 526 dout(" no requests, canceling timeout\n");
519 osdc->timeout_tid = 0; 527 __cancel_osd_timeout(osdc);
520 cancel_delayed_work(&osdc->timeout_work);
521 } else {
522 req = rb_entry(rb_first(&osdc->requests),
523 struct ceph_osd_request, r_node);
524 osdc->timeout_tid = req->r_tid;
525 dout("rescheduled timeout on tid %llu at %lu\n",
526 req->r_tid, req->r_timeout_stamp);
527 schedule_delayed_work(&osdc->timeout_work,
528 round_jiffies_relative(req->r_timeout_stamp -
529 jiffies));
530 }
531 } 528 }
532} 529}
533 530
@@ -540,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req)
540 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 537 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
541 req->r_sent = 0; 538 req->r_sent = 0;
542 } 539 }
540 list_del_init(&req->r_req_lru_item);
543} 541}
544 542
545/* 543/*
@@ -635,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc,
635 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ 633 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
636 reqhead->reassert_version = req->r_reassert_version; 634 reqhead->reassert_version = req->r_reassert_version;
637 635
638 req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ; 636 req->r_sent_stamp = jiffies;
637 list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
639 638
640 ceph_msg_get(req->r_request); /* send consumes a ref */ 639 ceph_msg_get(req->r_request); /* send consumes a ref */
641 ceph_con_send(&req->r_osd->o_con, req->r_request); 640 ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -656,11 +655,14 @@ static void handle_timeout(struct work_struct *work)
656{ 655{
657 struct ceph_osd_client *osdc = 656 struct ceph_osd_client *osdc =
658 container_of(work, struct ceph_osd_client, timeout_work.work); 657 container_of(work, struct ceph_osd_client, timeout_work.work);
659 struct ceph_osd_request *req; 658 struct ceph_osd_request *req, *last_req = NULL;
660 struct ceph_osd *osd; 659 struct ceph_osd *osd;
661 unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; 660 unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
662 unsigned long next_timeout = timeout + jiffies; 661 unsigned long keepalive =
662 osdc->client->mount_args->osd_keepalive_timeout * HZ;
663 unsigned long last_sent = 0;
663 struct rb_node *p; 664 struct rb_node *p;
665 struct list_head slow_osds;
664 666
665 dout("timeout\n"); 667 dout("timeout\n");
666 down_read(&osdc->map_sem); 668 down_read(&osdc->map_sem);
@@ -683,25 +685,56 @@ static void handle_timeout(struct work_struct *work)
683 continue; 685 continue;
684 } 686 }
685 } 687 }
686 for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
687 osd = rb_entry(p, struct ceph_osd, o_node);
688 if (list_empty(&osd->o_requests))
689 continue;
690 req = list_first_entry(&osd->o_requests,
691 struct ceph_osd_request, r_osd_item);
692 if (time_before(jiffies, req->r_timeout_stamp))
693 continue;
694 688
695 dout(" tid %llu (at least) timed out on osd%d\n", 689 /*
690 * reset osds that appear to be _really_ unresponsive. this
691 * is a failsafe measure.. we really shouldn't be getting to
692 * this point if the system is working properly. the monitors
693 * should mark the osd as failed and we should find out about
694 * it from an updated osd map.
695 */
696 while (!list_empty(&osdc->req_lru)) {
697 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
698 r_req_lru_item);
699
700 if (time_before(jiffies, req->r_sent_stamp + timeout))
701 break;
702
703 BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
704 last_req = req;
705 last_sent = req->r_sent_stamp;
706
707 osd = req->r_osd;
708 BUG_ON(!osd);
709 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
710 req->r_tid, osd->o_osd);
711 __kick_requests(osdc, osd);
712 }
713
714 /*
715 * ping osds that are a bit slow. this ensures that if there
716 * is a break in the TCP connection we will notice, and reopen
717 * a connection with that osd (from the fault callback).
718 */
719 INIT_LIST_HEAD(&slow_osds);
720 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
721 if (time_before(jiffies, req->r_sent_stamp + keepalive))
722 break;
723
724 osd = req->r_osd;
725 BUG_ON(!osd);
726 dout(" tid %llu is slow, will send keepalive on osd%d\n",
696 req->r_tid, osd->o_osd); 727 req->r_tid, osd->o_osd);
697 req->r_timeout_stamp = next_timeout; 728 list_move_tail(&osd->o_keepalive_item, &slow_osds);
729 }
730 while (!list_empty(&slow_osds)) {
731 osd = list_entry(slow_osds.next, struct ceph_osd,
732 o_keepalive_item);
733 list_del_init(&osd->o_keepalive_item);
698 ceph_con_keepalive(&osd->o_con); 734 ceph_con_keepalive(&osd->o_con);
699 } 735 }
700 736
701 if (osdc->timeout_tid) 737 __schedule_osd_timeout(osdc);
702 schedule_delayed_work(&osdc->timeout_work,
703 round_jiffies_relative(timeout));
704
705 mutex_unlock(&osdc->request_mutex); 738 mutex_unlock(&osdc->request_mutex);
706 739
707 up_read(&osdc->map_sem); 740 up_read(&osdc->map_sem);
@@ -819,18 +852,7 @@ bad:
819} 852}
820 853
821 854
822/* 855static int __kick_requests(struct ceph_osd_client *osdc,
823 * Resubmit osd requests whose osd or osd address has changed. Request
824 * a new osd map if osds are down, or we are otherwise unable to determine
825 * how to direct a request.
826 *
827 * Close connections to down osds.
828 *
829 * If @who is specified, resubmit requests for that specific osd.
830 *
831 * Caller should hold map_sem for read and request_mutex.
832 */
833static void kick_requests(struct ceph_osd_client *osdc,
834 struct ceph_osd *kickosd) 856 struct ceph_osd *kickosd)
835{ 857{
836 struct ceph_osd_request *req; 858 struct ceph_osd_request *req;
@@ -839,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
839 int err; 861 int err;
840 862
841 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); 863 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
842 mutex_lock(&osdc->request_mutex);
843 if (kickosd) { 864 if (kickosd) {
844 __reset_osd(osdc, kickosd); 865 __reset_osd(osdc, kickosd);
845 } else { 866 } else {
@@ -900,14 +921,36 @@ kick:
900 req->r_resend = true; 921 req->r_resend = true;
901 } 922 }
902 } 923 }
924
925 return needmap;
926}
927
928/*
929 * Resubmit osd requests whose osd or osd address has changed. Request
930 * a new osd map if osds are down, or we are otherwise unable to determine
931 * how to direct a request.
932 *
933 * Close connections to down osds.
934 *
935 * If @who is specified, resubmit requests for that specific osd.
936 *
937 * Caller should hold map_sem for read and request_mutex.
938 */
939static void kick_requests(struct ceph_osd_client *osdc,
940 struct ceph_osd *kickosd)
941{
942 int needmap;
943
944 mutex_lock(&osdc->request_mutex);
945 needmap = __kick_requests(osdc, kickosd);
903 mutex_unlock(&osdc->request_mutex); 946 mutex_unlock(&osdc->request_mutex);
904 947
905 if (needmap) { 948 if (needmap) {
906 dout("%d requests for down osds, need new map\n", needmap); 949 dout("%d requests for down osds, need new map\n", needmap);
907 ceph_monc_request_next_osdmap(&osdc->client->monc); 950 ceph_monc_request_next_osdmap(&osdc->client->monc);
908 } 951 }
909}
910 952
953}
911/* 954/*
912 * Process updated osd map. 955 * Process updated osd map.
913 * 956 *
@@ -1164,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1164 init_completion(&osdc->map_waiters); 1207 init_completion(&osdc->map_waiters);
1165 osdc->last_requested_map = 0; 1208 osdc->last_requested_map = 0;
1166 mutex_init(&osdc->request_mutex); 1209 mutex_init(&osdc->request_mutex);
1167 osdc->timeout_tid = 0;
1168 osdc->last_tid = 0; 1210 osdc->last_tid = 0;
1169 osdc->osds = RB_ROOT; 1211 osdc->osds = RB_ROOT;
1170 INIT_LIST_HEAD(&osdc->osd_lru); 1212 INIT_LIST_HEAD(&osdc->osd_lru);
1171 osdc->requests = RB_ROOT; 1213 osdc->requests = RB_ROOT;
1214 INIT_LIST_HEAD(&osdc->req_lru);
1172 osdc->num_requests = 0; 1215 osdc->num_requests = 0;
1173 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1216 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1174 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1217 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index f256eba6fe7a..1b1a3ca43afc 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -36,12 +36,15 @@ struct ceph_osd {
36 void *o_authorizer_buf, *o_authorizer_reply_buf; 36 void *o_authorizer_buf, *o_authorizer_reply_buf;
37 size_t o_authorizer_buf_len, o_authorizer_reply_buf_len; 37 size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
38 unsigned long lru_ttl; 38 unsigned long lru_ttl;
39 int o_marked_for_keepalive;
40 struct list_head o_keepalive_item;
39}; 41};
40 42
41/* an in-flight request */ 43/* an in-flight request */
42struct ceph_osd_request { 44struct ceph_osd_request {
43 u64 r_tid; /* unique for this client */ 45 u64 r_tid; /* unique for this client */
44 struct rb_node r_node; 46 struct rb_node r_node;
47 struct list_head r_req_lru_item;
45 struct list_head r_osd_item; 48 struct list_head r_osd_item;
46 struct ceph_osd *r_osd; 49 struct ceph_osd *r_osd;
47 struct ceph_pg r_pgid; 50 struct ceph_pg r_pgid;
@@ -67,7 +70,7 @@ struct ceph_osd_request {
67 70
68 char r_oid[40]; /* object name */ 71 char r_oid[40]; /* object name */
69 int r_oid_len; 72 int r_oid_len;
70 unsigned long r_timeout_stamp; 73 unsigned long r_sent_stamp;
71 bool r_resend; /* msg send failed, needs retry */ 74 bool r_resend; /* msg send failed, needs retry */
72 75
73 struct ceph_file_layout r_file_layout; 76 struct ceph_file_layout r_file_layout;
@@ -92,6 +95,7 @@ struct ceph_osd_client {
92 u64 timeout_tid; /* tid of timeout triggering rq */ 95 u64 timeout_tid; /* tid of timeout triggering rq */
93 u64 last_tid; /* tid of last request */ 96 u64 last_tid; /* tid of last request */
94 struct rb_root requests; /* pending requests */ 97 struct rb_root requests; /* pending requests */
98 struct list_head req_lru; /* pending requests lru */
95 int num_requests; 99 int num_requests;
96 struct delayed_work timeout_work; 100 struct delayed_work timeout_work;
97 struct delayed_work osds_timeout_work; 101 struct delayed_work osds_timeout_work;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 74953be75f8f..4290a6e860b0 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -292,6 +292,7 @@ enum {
292 Opt_wsize, 292 Opt_wsize,
293 Opt_rsize, 293 Opt_rsize,
294 Opt_osdtimeout, 294 Opt_osdtimeout,
295 Opt_osdkeepalivetimeout,
295 Opt_mount_timeout, 296 Opt_mount_timeout,
296 Opt_osd_idle_ttl, 297 Opt_osd_idle_ttl,
297 Opt_caps_wanted_delay_min, 298 Opt_caps_wanted_delay_min,
@@ -322,6 +323,7 @@ static match_table_t arg_tokens = {
322 {Opt_wsize, "wsize=%d"}, 323 {Opt_wsize, "wsize=%d"},
323 {Opt_rsize, "rsize=%d"}, 324 {Opt_rsize, "rsize=%d"},
324 {Opt_osdtimeout, "osdtimeout=%d"}, 325 {Opt_osdtimeout, "osdtimeout=%d"},
326 {Opt_osdkeepalivetimeout, "osdkeepalive=%d"},
325 {Opt_mount_timeout, "mount_timeout=%d"}, 327 {Opt_mount_timeout, "mount_timeout=%d"},
326 {Opt_osd_idle_ttl, "osd_idle_ttl=%d"}, 328 {Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
327 {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, 329 {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
@@ -367,7 +369,8 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
367 /* start with defaults */ 369 /* start with defaults */
368 args->sb_flags = flags; 370 args->sb_flags = flags;
369 args->flags = CEPH_OPT_DEFAULT; 371 args->flags = CEPH_OPT_DEFAULT;
370 args->osd_timeout = 5; /* seconds */ 372 args->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
373 args->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
371 args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */ 374 args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
372 args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */ 375 args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */
373 args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT; 376 args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
@@ -468,6 +471,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
468 case Opt_osdtimeout: 471 case Opt_osdtimeout:
469 args->osd_timeout = intval; 472 args->osd_timeout = intval;
470 break; 473 break;
474 case Opt_osdkeepalivetimeout:
475 args->osd_keepalive_timeout = intval;
476 break;
471 case Opt_mount_timeout: 477 case Opt_mount_timeout:
472 args->mount_timeout = intval; 478 args->mount_timeout = intval;
473 break; 479 break;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 6a778f2c3f6e..02c0ddcf3eaf 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -62,6 +62,7 @@ struct ceph_mount_args {
62 int max_readdir; /* max readdir size */ 62 int max_readdir; /* max readdir size */
63 int congestion_kb; /* max readdir size */ 63 int congestion_kb; /* max readdir size */
64 int osd_timeout; 64 int osd_timeout;
65 int osd_keepalive_timeout;
65 char *snapdir_name; /* default ".snap" */ 66 char *snapdir_name; /* default ".snap" */
66 char *name; 67 char *name;
67 char *secret; 68 char *secret;
@@ -72,6 +73,8 @@ struct ceph_mount_args {
72 * defaults 73 * defaults
73 */ 74 */
74#define CEPH_MOUNT_TIMEOUT_DEFAULT 60 75#define CEPH_MOUNT_TIMEOUT_DEFAULT 60
76#define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */
77#define CEPH_OSD_KEEPALIVE_DEFAULT 5
75#define CEPH_OSD_IDLE_TTL_DEFAULT 60 78#define CEPH_OSD_IDLE_TTL_DEFAULT 60
76#define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */ 79#define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */
77 80