aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2011-01-17 23:34:08 -0500
committerSage Weil <sage@newdream.net>2011-03-21 15:24:19 -0400
commit6f6c7006755b667f9f6c1f3b6f08cd65f75cc471 (patch)
tree233e96acdc3b627c97267992368ae1cb6cd66a5f /net
parent09adc80c611bb8902daa8ccfe34dbbc009d6befe (diff)
libceph: fix osd request queuing on osdmap updates
If we send a request to osd A, and the request's pg remaps to osd B and then back to A in quick succession, we need to resend the request to A. The old code was only calling kick_requests after processing all incremental maps in a message, so it was very possible to not resend a request that needed to be resent. This would make the osd eventually time out (at least with the current default of osd timeouts enabled). The correct approach is to scan requests on every map incremental. This patch refactors the kick code in a few ways: - all requests are either on req_lru (in flight), req_unsent (ready to send), or req_notarget (currently map to no up osd) - mapping always done by map_request (previous map_osds) - if the mapping changes, we requeue. requests are resent only after all map incrementals are processed. - some osd reset code is moved out of kick_requests into a separate function - the "kick this osd" functionality is moved to kick_osd_requests, as it is unrelated to scanning for request->pg->osd mapping changes Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/osd_client.c255
1 files changed, 122 insertions, 133 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3e20a122ffa2..b85ed5a5503d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -22,10 +22,9 @@
22#define OSD_OPREPLY_FRONT_LEN 512 22#define OSD_OPREPLY_FRONT_LEN 512
23 23
24static const struct ceph_connection_operations osd_con_ops; 24static const struct ceph_connection_operations osd_con_ops;
25static int __kick_requests(struct ceph_osd_client *osdc,
26 struct ceph_osd *kickosd);
27 25
28static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 26static void send_queued(struct ceph_osd_client *osdc);
27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
29 28
30static int op_needs_trail(int op) 29static int op_needs_trail(int op)
31{ 30{
@@ -529,6 +528,35 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
529 return NULL; 528 return NULL;
530} 529}
531 530
531/*
532 * Resubmit requests pending on the given osd.
533 */
534static void __kick_osd_requests(struct ceph_osd_client *osdc,
535 struct ceph_osd *osd)
536{
537 struct ceph_osd_request *req;
538 int err;
539
540 dout("__kick_osd_requests osd%d\n", osd->o_osd);
541 err = __reset_osd(osdc, osd);
542 if (err == -EAGAIN)
543 return;
544
545 list_for_each_entry(req, &osd->o_requests, r_osd_item) {
546 list_move(&req->r_req_lru_item, &osdc->req_unsent);
547 dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
548 osd->o_osd);
549 req->r_flags |= CEPH_OSD_FLAG_RETRY;
550 }
551}
552
553static void kick_osd_requests(struct ceph_osd_client *osdc,
554 struct ceph_osd *kickosd)
555{
556 mutex_lock(&osdc->request_mutex);
557 __kick_osd_requests(osdc, kickosd);
558 mutex_unlock(&osdc->request_mutex);
559}
532 560
533/* 561/*
534 * If the osd connection drops, we need to resubmit all requests. 562 * If the osd connection drops, we need to resubmit all requests.
@@ -543,7 +571,8 @@ static void osd_reset(struct ceph_connection *con)
543 dout("osd_reset osd%d\n", osd->o_osd); 571 dout("osd_reset osd%d\n", osd->o_osd);
544 osdc = osd->o_osdc; 572 osdc = osd->o_osdc;
545 down_read(&osdc->map_sem); 573 down_read(&osdc->map_sem);
546 kick_requests(osdc, osd); 574 kick_osd_requests(osdc, osd);
575 send_queued(osdc);
547 up_read(&osdc->map_sem); 576 up_read(&osdc->map_sem);
548} 577}
549 578
@@ -781,20 +810,20 @@ static void __cancel_request(struct ceph_osd_request *req)
781 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 810 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
782 req->r_sent = 0; 811 req->r_sent = 0;
783 } 812 }
784 list_del_init(&req->r_req_lru_item);
785} 813}
786 814
787/* 815/*
788 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 816 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
789 * (as needed), and set the request r_osd appropriately. If there is 817 * (as needed), and set the request r_osd appropriately. If there is
790 * no up osd, set r_osd to NULL. 818 * no up osd, set r_osd to NULL. Move the request to the appropiate list
819 * (unsent, homeless) or leave on in-flight lru.
791 * 820 *
792 * Return 0 if unchanged, 1 if changed, or negative on error. 821 * Return 0 if unchanged, 1 if changed, or negative on error.
793 * 822 *
794 * Caller should hold map_sem for read and request_mutex. 823 * Caller should hold map_sem for read and request_mutex.
795 */ 824 */
796static int __map_osds(struct ceph_osd_client *osdc, 825static int __map_request(struct ceph_osd_client *osdc,
797 struct ceph_osd_request *req) 826 struct ceph_osd_request *req)
798{ 827{
799 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 828 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
800 struct ceph_pg pgid; 829 struct ceph_pg pgid;
@@ -802,11 +831,13 @@ static int __map_osds(struct ceph_osd_client *osdc,
802 int o = -1, num = 0; 831 int o = -1, num = 0;
803 int err; 832 int err;
804 833
805 dout("map_osds %p tid %lld\n", req, req->r_tid); 834 dout("map_request %p tid %lld\n", req, req->r_tid);
806 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, 835 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
807 &req->r_file_layout, osdc->osdmap); 836 &req->r_file_layout, osdc->osdmap);
808 if (err) 837 if (err) {
838 list_move(&req->r_req_lru_item, &osdc->req_notarget);
809 return err; 839 return err;
840 }
810 pgid = reqhead->layout.ol_pgid; 841 pgid = reqhead->layout.ol_pgid;
811 req->r_pgid = pgid; 842 req->r_pgid = pgid;
812 843
@@ -823,7 +854,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
823 (req->r_osd == NULL && o == -1)) 854 (req->r_osd == NULL && o == -1))
824 return 0; /* no change */ 855 return 0; /* no change */
825 856
826 dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n", 857 dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
827 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, 858 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
828 req->r_osd ? req->r_osd->o_osd : -1); 859 req->r_osd ? req->r_osd->o_osd : -1);
829 860
@@ -841,10 +872,12 @@ static int __map_osds(struct ceph_osd_client *osdc,
841 if (!req->r_osd && o >= 0) { 872 if (!req->r_osd && o >= 0) {
842 err = -ENOMEM; 873 err = -ENOMEM;
843 req->r_osd = create_osd(osdc); 874 req->r_osd = create_osd(osdc);
844 if (!req->r_osd) 875 if (!req->r_osd) {
876 list_move(&req->r_req_lru_item, &osdc->req_notarget);
845 goto out; 877 goto out;
878 }
846 879
847 dout("map_osds osd %p is osd%d\n", req->r_osd, o); 880 dout("map_request osd %p is osd%d\n", req->r_osd, o);
848 req->r_osd->o_osd = o; 881 req->r_osd->o_osd = o;
849 req->r_osd->o_con.peer_name.num = cpu_to_le64(o); 882 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
850 __insert_osd(osdc, req->r_osd); 883 __insert_osd(osdc, req->r_osd);
@@ -855,6 +888,9 @@ static int __map_osds(struct ceph_osd_client *osdc,
855 if (req->r_osd) { 888 if (req->r_osd) {
856 __remove_osd_from_lru(req->r_osd); 889 __remove_osd_from_lru(req->r_osd);
857 list_add(&req->r_osd_item, &req->r_osd->o_requests); 890 list_add(&req->r_osd_item, &req->r_osd->o_requests);
891 list_move(&req->r_req_lru_item, &osdc->req_unsent);
892 } else {
893 list_move(&req->r_req_lru_item, &osdc->req_notarget);
858 } 894 }
859 err = 1; /* osd or pg changed */ 895 err = 1; /* osd or pg changed */
860 896
@@ -869,16 +905,6 @@ static int __send_request(struct ceph_osd_client *osdc,
869 struct ceph_osd_request *req) 905 struct ceph_osd_request *req)
870{ 906{
871 struct ceph_osd_request_head *reqhead; 907 struct ceph_osd_request_head *reqhead;
872 int err;
873
874 err = __map_osds(osdc, req);
875 if (err < 0)
876 return err;
877 if (req->r_osd == NULL) {
878 dout("send_request %p no up osds in pg\n", req);
879 ceph_monc_request_next_osdmap(&osdc->client->monc);
880 return 0;
881 }
882 908
883 dout("send_request %p tid %llu to osd%d flags %d\n", 909 dout("send_request %p tid %llu to osd%d flags %d\n",
884 req, req->r_tid, req->r_osd->o_osd, req->r_flags); 910 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
@@ -898,6 +924,21 @@ static int __send_request(struct ceph_osd_client *osdc,
898} 924}
899 925
900/* 926/*
927 * Send any requests in the queue (req_unsent).
928 */
929static void send_queued(struct ceph_osd_client *osdc)
930{
931 struct ceph_osd_request *req, *tmp;
932
933 dout("send_queued\n");
934 mutex_lock(&osdc->request_mutex);
935 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
936 __send_request(osdc, req);
937 }
938 mutex_unlock(&osdc->request_mutex);
939}
940
941/*
901 * Timeout callback, called every N seconds when 1 or more osd 942 * Timeout callback, called every N seconds when 1 or more osd
902 * requests has been active for more than N seconds. When this 943 * requests has been active for more than N seconds. When this
903 * happens, we ping all OSDs with requests who have timed out to 944 * happens, we ping all OSDs with requests who have timed out to
@@ -916,7 +957,6 @@ static void handle_timeout(struct work_struct *work)
916 unsigned long keepalive = 957 unsigned long keepalive =
917 osdc->client->options->osd_keepalive_timeout * HZ; 958 osdc->client->options->osd_keepalive_timeout * HZ;
918 unsigned long last_stamp = 0; 959 unsigned long last_stamp = 0;
919 struct rb_node *p;
920 struct list_head slow_osds; 960 struct list_head slow_osds;
921 961
922 dout("timeout\n"); 962 dout("timeout\n");
@@ -925,21 +965,6 @@ static void handle_timeout(struct work_struct *work)
925 ceph_monc_request_next_osdmap(&osdc->client->monc); 965 ceph_monc_request_next_osdmap(&osdc->client->monc);
926 966
927 mutex_lock(&osdc->request_mutex); 967 mutex_lock(&osdc->request_mutex);
928 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
929 req = rb_entry(p, struct ceph_osd_request, r_node);
930
931 if (req->r_resend) {
932 int err;
933
934 dout("osdc resending prev failed %lld\n", req->r_tid);
935 err = __send_request(osdc, req);
936 if (err)
937 dout("osdc failed again on %lld\n", req->r_tid);
938 else
939 req->r_resend = false;
940 continue;
941 }
942 }
943 968
944 /* 969 /*
945 * reset osds that appear to be _really_ unresponsive. this 970 * reset osds that appear to be _really_ unresponsive. this
@@ -963,7 +988,7 @@ static void handle_timeout(struct work_struct *work)
963 BUG_ON(!osd); 988 BUG_ON(!osd);
964 pr_warning(" tid %llu timed out on osd%d, will reset osd\n", 989 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
965 req->r_tid, osd->o_osd); 990 req->r_tid, osd->o_osd);
966 __kick_requests(osdc, osd); 991 __kick_osd_requests(osdc, osd);
967 } 992 }
968 993
969 /* 994 /*
@@ -991,7 +1016,7 @@ static void handle_timeout(struct work_struct *work)
991 1016
992 __schedule_osd_timeout(osdc); 1017 __schedule_osd_timeout(osdc);
993 mutex_unlock(&osdc->request_mutex); 1018 mutex_unlock(&osdc->request_mutex);
994 1019 send_queued(osdc);
995 up_read(&osdc->map_sem); 1020 up_read(&osdc->map_sem);
996} 1021}
997 1022
@@ -1109,108 +1134,61 @@ bad:
1109 ceph_msg_dump(msg); 1134 ceph_msg_dump(msg);
1110} 1135}
1111 1136
1112 1137static void reset_changed_osds(struct ceph_osd_client *osdc)
1113static int __kick_requests(struct ceph_osd_client *osdc,
1114 struct ceph_osd *kickosd)
1115{ 1138{
1116 struct ceph_osd_request *req;
1117 struct rb_node *p, *n; 1139 struct rb_node *p, *n;
1118 int needmap = 0;
1119 int err;
1120
1121 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
1122 if (kickosd) {
1123 err = __reset_osd(osdc, kickosd);
1124 if (err == -EAGAIN)
1125 return 1;
1126 } else {
1127 for (p = rb_first(&osdc->osds); p; p = n) {
1128 struct ceph_osd *osd =
1129 rb_entry(p, struct ceph_osd, o_node);
1130
1131 n = rb_next(p);
1132 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1133 memcmp(&osd->o_con.peer_addr,
1134 ceph_osd_addr(osdc->osdmap,
1135 osd->o_osd),
1136 sizeof(struct ceph_entity_addr)) != 0)
1137 __reset_osd(osdc, osd);
1138 }
1139 }
1140
1141 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1142 req = rb_entry(p, struct ceph_osd_request, r_node);
1143
1144 if (req->r_resend) {
1145 dout(" r_resend set on tid %llu\n", req->r_tid);
1146 __cancel_request(req);
1147 goto kick;
1148 }
1149 if (req->r_osd && kickosd == req->r_osd) {
1150 __cancel_request(req);
1151 goto kick;
1152 }
1153 1140
1154 err = __map_osds(osdc, req); 1141 for (p = rb_first(&osdc->osds); p; p = n) {
1155 if (err == 0) 1142 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1156 continue; /* no change */
1157 if (err < 0) {
1158 /*
1159 * FIXME: really, we should set the request
1160 * error and fail if this isn't a 'nofail'
1161 * request, but that's a fair bit more
1162 * complicated to do. So retry!
1163 */
1164 dout(" setting r_resend on %llu\n", req->r_tid);
1165 req->r_resend = true;
1166 continue;
1167 }
1168 if (req->r_osd == NULL) {
1169 dout("tid %llu maps to no valid osd\n", req->r_tid);
1170 needmap++; /* request a newer map */
1171 continue;
1172 }
1173 1143
1174kick: 1144 n = rb_next(p);
1175 dout("kicking %p tid %llu osd%d\n", req, req->r_tid, 1145 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1176 req->r_osd ? req->r_osd->o_osd : -1); 1146 memcmp(&osd->o_con.peer_addr,
1177 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1147 ceph_osd_addr(osdc->osdmap,
1178 err = __send_request(osdc, req); 1148 osd->o_osd),
1179 if (err) { 1149 sizeof(struct ceph_entity_addr)) != 0)
1180 dout(" setting r_resend on %llu\n", req->r_tid); 1150 __reset_osd(osdc, osd);
1181 req->r_resend = true;
1182 }
1183 } 1151 }
1184
1185 return needmap;
1186} 1152}
1187 1153
1188/* 1154/*
1189 * Resubmit osd requests whose osd or osd address has changed. Request 1155 * Requeue requests whose mapping to an OSD has changed. If requests map to
1190 * a new osd map if osds are down, or we are otherwise unable to determine 1156 * no osd, request a new map.
1191 * how to direct a request.
1192 *
1193 * Close connections to down osds.
1194 *
1195 * If @who is specified, resubmit requests for that specific osd.
1196 * 1157 *
1197 * Caller should hold map_sem for read and request_mutex. 1158 * Caller should hold map_sem for read and request_mutex.
1198 */ 1159 */
1199static void kick_requests(struct ceph_osd_client *osdc, 1160static void kick_requests(struct ceph_osd_client *osdc)
1200 struct ceph_osd *kickosd)
1201{ 1161{
1202 int needmap; 1162 struct ceph_osd_request *req;
1163 struct rb_node *p;
1164 int needmap = 0;
1165 int err;
1203 1166
1167 dout("kick_requests\n");
1204 mutex_lock(&osdc->request_mutex); 1168 mutex_lock(&osdc->request_mutex);
1205 needmap = __kick_requests(osdc, kickosd); 1169 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1170 req = rb_entry(p, struct ceph_osd_request, r_node);
1171 err = __map_request(osdc, req);
1172 if (err < 0)
1173 continue; /* error */
1174 if (req->r_osd == NULL) {
1175 dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1176 needmap++; /* request a newer map */
1177 } else if (err > 0) {
1178 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
1179 req->r_osd ? req->r_osd->o_osd : -1);
1180 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1181 }
1182 }
1206 mutex_unlock(&osdc->request_mutex); 1183 mutex_unlock(&osdc->request_mutex);
1207 1184
1208 if (needmap) { 1185 if (needmap) {
1209 dout("%d requests for down osds, need new map\n", needmap); 1186 dout("%d requests for down osds, need new map\n", needmap);
1210 ceph_monc_request_next_osdmap(&osdc->client->monc); 1187 ceph_monc_request_next_osdmap(&osdc->client->monc);
1211 } 1188 }
1212
1213} 1189}
1190
1191
1214/* 1192/*
1215 * Process updated osd map. 1193 * Process updated osd map.
1216 * 1194 *
@@ -1263,6 +1241,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1263 ceph_osdmap_destroy(osdc->osdmap); 1241 ceph_osdmap_destroy(osdc->osdmap);
1264 osdc->osdmap = newmap; 1242 osdc->osdmap = newmap;
1265 } 1243 }
1244 kick_requests(osdc);
1245 reset_changed_osds(osdc);
1266 } else { 1246 } else {
1267 dout("ignoring incremental map %u len %d\n", 1247 dout("ignoring incremental map %u len %d\n",
1268 epoch, maplen); 1248 epoch, maplen);
@@ -1300,6 +1280,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1300 osdc->osdmap = newmap; 1280 osdc->osdmap = newmap;
1301 if (oldmap) 1281 if (oldmap)
1302 ceph_osdmap_destroy(oldmap); 1282 ceph_osdmap_destroy(oldmap);
1283 kick_requests(osdc);
1303 } 1284 }
1304 p += maplen; 1285 p += maplen;
1305 nr_maps--; 1286 nr_maps--;
@@ -1308,8 +1289,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1308done: 1289done:
1309 downgrade_write(&osdc->map_sem); 1290 downgrade_write(&osdc->map_sem);
1310 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1291 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1311 if (newmap) 1292 send_queued(osdc);
1312 kick_requests(osdc, NULL);
1313 up_read(&osdc->map_sem); 1293 up_read(&osdc->map_sem);
1314 wake_up_all(&osdc->client->auth_wq); 1294 wake_up_all(&osdc->client->auth_wq);
1315 return; 1295 return;
@@ -1347,15 +1327,22 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1347 * the request still han't been touched yet. 1327 * the request still han't been touched yet.
1348 */ 1328 */
1349 if (req->r_sent == 0) { 1329 if (req->r_sent == 0) {
1350 rc = __send_request(osdc, req); 1330 rc = __map_request(osdc, req);
1351 if (rc) { 1331 if (rc < 0)
1352 if (nofail) { 1332 return rc;
1353 dout("osdc_start_request failed send, " 1333 if (req->r_osd == NULL) {
1354 " marking %lld\n", req->r_tid); 1334 dout("send_request %p no up osds in pg\n", req);
1355 req->r_resend = true; 1335 ceph_monc_request_next_osdmap(&osdc->client->monc);
1356 rc = 0; 1336 } else {
1357 } else { 1337 rc = __send_request(osdc, req);
1358 __unregister_request(osdc, req); 1338 if (rc) {
1339 if (nofail) {
1340 dout("osdc_start_request failed send, "
1341 " will retry %lld\n", req->r_tid);
1342 rc = 0;
1343 } else {
1344 __unregister_request(osdc, req);
1345 }
1359 } 1346 }
1360 } 1347 }
1361 } 1348 }
@@ -1441,6 +1428,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1441 INIT_LIST_HEAD(&osdc->osd_lru); 1428 INIT_LIST_HEAD(&osdc->osd_lru);
1442 osdc->requests = RB_ROOT; 1429 osdc->requests = RB_ROOT;
1443 INIT_LIST_HEAD(&osdc->req_lru); 1430 INIT_LIST_HEAD(&osdc->req_lru);
1431 INIT_LIST_HEAD(&osdc->req_unsent);
1432 INIT_LIST_HEAD(&osdc->req_notarget);
1444 osdc->num_requests = 0; 1433 osdc->num_requests = 0;
1445 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1434 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1446 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1435 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);