aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--net/ceph/osd_client.c93
1 files changed, 33 insertions, 60 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index c1d756cc7448..eb9a44478764 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -221,6 +221,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
221 kref_init(&req->r_kref); 221 kref_init(&req->r_kref);
222 init_completion(&req->r_completion); 222 init_completion(&req->r_completion);
223 init_completion(&req->r_safe_completion); 223 init_completion(&req->r_safe_completion);
224 RB_CLEAR_NODE(&req->r_node);
224 INIT_LIST_HEAD(&req->r_unsafe_item); 225 INIT_LIST_HEAD(&req->r_unsafe_item);
225 INIT_LIST_HEAD(&req->r_linger_item); 226 INIT_LIST_HEAD(&req->r_linger_item);
226 INIT_LIST_HEAD(&req->r_linger_osd); 227 INIT_LIST_HEAD(&req->r_linger_osd);
@@ -580,7 +581,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
580 581
581 dout("__kick_osd_requests osd%d\n", osd->o_osd); 582 dout("__kick_osd_requests osd%d\n", osd->o_osd);
582 err = __reset_osd(osdc, osd); 583 err = __reset_osd(osdc, osd);
583 if (err == -EAGAIN) 584 if (err)
584 return; 585 return;
585 586
586 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 587 list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -607,14 +608,6 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
607 } 608 }
608} 609}
609 610
610static void kick_osd_requests(struct ceph_osd_client *osdc,
611 struct ceph_osd *kickosd)
612{
613 mutex_lock(&osdc->request_mutex);
614 __kick_osd_requests(osdc, kickosd);
615 mutex_unlock(&osdc->request_mutex);
616}
617
618/* 611/*
619 * If the osd connection drops, we need to resubmit all requests. 612 * If the osd connection drops, we need to resubmit all requests.
620 */ 613 */
@@ -628,7 +621,9 @@ static void osd_reset(struct ceph_connection *con)
628 dout("osd_reset osd%d\n", osd->o_osd); 621 dout("osd_reset osd%d\n", osd->o_osd);
629 osdc = osd->o_osdc; 622 osdc = osd->o_osdc;
630 down_read(&osdc->map_sem); 623 down_read(&osdc->map_sem);
631 kick_osd_requests(osdc, osd); 624 mutex_lock(&osdc->request_mutex);
625 __kick_osd_requests(osdc, osd);
626 mutex_unlock(&osdc->request_mutex);
632 send_queued(osdc); 627 send_queued(osdc);
633 up_read(&osdc->map_sem); 628 up_read(&osdc->map_sem);
634} 629}
@@ -647,6 +642,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
647 atomic_set(&osd->o_ref, 1); 642 atomic_set(&osd->o_ref, 1);
648 osd->o_osdc = osdc; 643 osd->o_osdc = osdc;
649 osd->o_osd = onum; 644 osd->o_osd = onum;
645 RB_CLEAR_NODE(&osd->o_node);
650 INIT_LIST_HEAD(&osd->o_requests); 646 INIT_LIST_HEAD(&osd->o_requests);
651 INIT_LIST_HEAD(&osd->o_linger_requests); 647 INIT_LIST_HEAD(&osd->o_linger_requests);
652 INIT_LIST_HEAD(&osd->o_osd_lru); 648 INIT_LIST_HEAD(&osd->o_osd_lru);
@@ -750,6 +746,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
750 if (list_empty(&osd->o_requests) && 746 if (list_empty(&osd->o_requests) &&
751 list_empty(&osd->o_linger_requests)) { 747 list_empty(&osd->o_linger_requests)) {
752 __remove_osd(osdc, osd); 748 __remove_osd(osdc, osd);
749 ret = -ENODEV;
753 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 750 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
754 &osd->o_con.peer_addr, 751 &osd->o_con.peer_addr,
755 sizeof(osd->o_con.peer_addr)) == 0 && 752 sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -876,9 +873,9 @@ static void __unregister_request(struct ceph_osd_client *osdc,
876 req->r_osd = NULL; 873 req->r_osd = NULL;
877 } 874 }
878 875
876 list_del_init(&req->r_req_lru_item);
879 ceph_osdc_put_request(req); 877 ceph_osdc_put_request(req);
880 878
881 list_del_init(&req->r_req_lru_item);
882 if (osdc->num_requests == 0) { 879 if (osdc->num_requests == 0) {
883 dout(" no requests, canceling timeout\n"); 880 dout(" no requests, canceling timeout\n");
884 __cancel_osd_timeout(osdc); 881 __cancel_osd_timeout(osdc);
@@ -910,8 +907,8 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
910 struct ceph_osd_request *req) 907 struct ceph_osd_request *req)
911{ 908{
912 dout("__unregister_linger_request %p\n", req); 909 dout("__unregister_linger_request %p\n", req);
910 list_del_init(&req->r_linger_item);
913 if (req->r_osd) { 911 if (req->r_osd) {
914 list_del_init(&req->r_linger_item);
915 list_del_init(&req->r_linger_osd); 912 list_del_init(&req->r_linger_osd);
916 913
917 if (list_empty(&req->r_osd->o_requests) && 914 if (list_empty(&req->r_osd->o_requests) &&
@@ -1090,12 +1087,10 @@ static void handle_timeout(struct work_struct *work)
1090{ 1087{
1091 struct ceph_osd_client *osdc = 1088 struct ceph_osd_client *osdc =
1092 container_of(work, struct ceph_osd_client, timeout_work.work); 1089 container_of(work, struct ceph_osd_client, timeout_work.work);
1093 struct ceph_osd_request *req, *last_req = NULL; 1090 struct ceph_osd_request *req;
1094 struct ceph_osd *osd; 1091 struct ceph_osd *osd;
1095 unsigned long timeout = osdc->client->options->osd_timeout * HZ;
1096 unsigned long keepalive = 1092 unsigned long keepalive =
1097 osdc->client->options->osd_keepalive_timeout * HZ; 1093 osdc->client->options->osd_keepalive_timeout * HZ;
1098 unsigned long last_stamp = 0;
1099 struct list_head slow_osds; 1094 struct list_head slow_osds;
1100 dout("timeout\n"); 1095 dout("timeout\n");
1101 down_read(&osdc->map_sem); 1096 down_read(&osdc->map_sem);
@@ -1105,37 +1100,6 @@ static void handle_timeout(struct work_struct *work)
1105 mutex_lock(&osdc->request_mutex); 1100 mutex_lock(&osdc->request_mutex);
1106 1101
1107 /* 1102 /*
1108 * reset osds that appear to be _really_ unresponsive. this
1109 * is a failsafe measure.. we really shouldn't be getting to
1110 * this point if the system is working properly. the monitors
1111 * should mark the osd as failed and we should find out about
1112 * it from an updated osd map.
1113 */
1114 while (timeout && !list_empty(&osdc->req_lru)) {
1115 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
1116 r_req_lru_item);
1117
1118 /* hasn't been long enough since we sent it? */
1119 if (time_before(jiffies, req->r_stamp + timeout))
1120 break;
1121
1122 /* hasn't been long enough since it was acked? */
1123 if (req->r_request->ack_stamp == 0 ||
1124 time_before(jiffies, req->r_request->ack_stamp + timeout))
1125 break;
1126
1127 BUG_ON(req == last_req && req->r_stamp == last_stamp);
1128 last_req = req;
1129 last_stamp = req->r_stamp;
1130
1131 osd = req->r_osd;
1132 BUG_ON(!osd);
1133 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
1134 req->r_tid, osd->o_osd);
1135 __kick_osd_requests(osdc, osd);
1136 }
1137
1138 /*
1139 * ping osds that are a bit slow. this ensures that if there 1103 * ping osds that are a bit slow. this ensures that if there
1140 * is a break in the TCP connection we will notice, and reopen 1104 * is a break in the TCP connection we will notice, and reopen
1141 * a connection with that osd (from the fault callback). 1105 * a connection with that osd (from the fault callback).
@@ -1306,7 +1270,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
1306 * Requeue requests whose mapping to an OSD has changed. If requests map to 1270 * Requeue requests whose mapping to an OSD has changed. If requests map to
1307 * no osd, request a new map. 1271 * no osd, request a new map.
1308 * 1272 *
1309 * Caller should hold map_sem for read and request_mutex. 1273 * Caller should hold map_sem for read.
1310 */ 1274 */
1311static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1275static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1312{ 1276{
@@ -1320,6 +1284,24 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1320 for (p = rb_first(&osdc->requests); p; ) { 1284 for (p = rb_first(&osdc->requests); p; ) {
1321 req = rb_entry(p, struct ceph_osd_request, r_node); 1285 req = rb_entry(p, struct ceph_osd_request, r_node);
1322 p = rb_next(p); 1286 p = rb_next(p);
1287
1288 /*
1289 * For linger requests that have not yet been
1290 * registered, move them to the linger list; they'll
1291 * be sent to the osd in the loop below. Unregister
1292 * the request before re-registering it as a linger
1293 * request to ensure the __map_request() below
1294 * will decide it needs to be sent.
1295 */
1296 if (req->r_linger && list_empty(&req->r_linger_item)) {
1297 dout("%p tid %llu restart on osd%d\n",
1298 req, req->r_tid,
1299 req->r_osd ? req->r_osd->o_osd : -1);
1300 __unregister_request(osdc, req);
1301 __register_linger_request(osdc, req);
1302 continue;
1303 }
1304
1323 err = __map_request(osdc, req, force_resend); 1305 err = __map_request(osdc, req, force_resend);
1324 if (err < 0) 1306 if (err < 0)
1325 continue; /* error */ 1307 continue; /* error */
@@ -1334,17 +1316,6 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1334 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1316 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1335 } 1317 }
1336 } 1318 }
1337 if (req->r_linger && list_empty(&req->r_linger_item)) {
1338 /*
1339 * register as a linger so that we will
1340 * re-submit below and get a new tid
1341 */
1342 dout("%p tid %llu restart on osd%d\n",
1343 req, req->r_tid,
1344 req->r_osd ? req->r_osd->o_osd : -1);
1345 __register_linger_request(osdc, req);
1346 __unregister_request(osdc, req);
1347 }
1348 } 1319 }
1349 1320
1350 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1321 list_for_each_entry_safe(req, nreq, &osdc->req_linger,
@@ -1352,6 +1323,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1352 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1323 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1353 1324
1354 err = __map_request(osdc, req, force_resend); 1325 err = __map_request(osdc, req, force_resend);
1326 dout("__map_request returned %d\n", err);
1355 if (err == 0) 1327 if (err == 0)
1356 continue; /* no change and no osd was specified */ 1328 continue; /* no change and no osd was specified */
1357 if (err < 0) 1329 if (err < 0)
@@ -1364,8 +1336,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1364 1336
1365 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1337 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1366 req->r_osd ? req->r_osd->o_osd : -1); 1338 req->r_osd ? req->r_osd->o_osd : -1);
1367 __unregister_linger_request(osdc, req);
1368 __register_request(osdc, req); 1339 __register_request(osdc, req);
1340 __unregister_linger_request(osdc, req);
1369 } 1341 }
1370 mutex_unlock(&osdc->request_mutex); 1342 mutex_unlock(&osdc->request_mutex);
1371 1343
@@ -1373,6 +1345,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1373 dout("%d requests for down osds, need new map\n", needmap); 1345 dout("%d requests for down osds, need new map\n", needmap);
1374 ceph_monc_request_next_osdmap(&osdc->client->monc); 1346 ceph_monc_request_next_osdmap(&osdc->client->monc);
1375 } 1347 }
1348 reset_changed_osds(osdc);
1376} 1349}
1377 1350
1378 1351
@@ -1429,7 +1402,6 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1429 osdc->osdmap = newmap; 1402 osdc->osdmap = newmap;
1430 } 1403 }
1431 kick_requests(osdc, 0); 1404 kick_requests(osdc, 0);
1432 reset_changed_osds(osdc);
1433 } else { 1405 } else {
1434 dout("ignoring incremental map %u len %d\n", 1406 dout("ignoring incremental map %u len %d\n",
1435 epoch, maplen); 1407 epoch, maplen);
@@ -1599,6 +1571,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1599 event->data = data; 1571 event->data = data;
1600 event->osdc = osdc; 1572 event->osdc = osdc;
1601 INIT_LIST_HEAD(&event->osd_node); 1573 INIT_LIST_HEAD(&event->osd_node);
1574 RB_CLEAR_NODE(&event->node);
1602 kref_init(&event->kref); /* one ref for us */ 1575 kref_init(&event->kref); /* one ref for us */
1603 kref_get(&event->kref); /* one ref for the caller */ 1576 kref_get(&event->kref); /* one ref for the caller */
1604 init_completion(&event->completion); 1577 init_completion(&event->completion);