author    Linus Torvalds <torvalds@linux-foundation.org>  2018-06-14 18:24:58 -0400
committer Linus Torvalds <torvalds@linux-foundation.org>  2018-06-14 18:24:58 -0400
commit    dc594c39f7a9dcdfd5dbb1a446ac6d06182e2472
tree      1296214ff63762d72e46acb1e8090e99608da746 /net/ceph
parent    e7655d2b25466c534ed1f539367dae595bb0bd20
parent    23edca864951250af845a11da86bb3ea63522ed2
Merge tag 'ceph-for-4.18-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "The main piece is a set of libceph changes that revamps how OSD
  requests are aborted, improving CephFS ENOSPC handling and making
  "umount -f" actually work (Zheng and myself).

  The rest is mostly mount option handling cleanups from Chengguang and
  assorted fixes from Zheng, Luis and Dongsheng."

* tag 'ceph-for-4.18-rc1' of git://github.com/ceph/ceph-client: (31 commits)
  rbd: flush rbd_dev->watch_dwork after watch is unregistered
  ceph: update description of some mount options
  ceph: show ino32 if the value is different with default
  ceph: strengthen rsize/wsize/readdir_max_bytes validation
  ceph: fix alignment of rasize
  ceph: fix use-after-free in ceph_statfs()
  ceph: prevent i_version from going back
  ceph: fix wrong check for the case of updating link count
  libceph: allocate the locator string with GFP_NOFAIL
  libceph: make abort_on_full a per-osdc setting
  libceph: don't abort reads in ceph_osdc_abort_on_full()
  libceph: avoid a use-after-free during map check
  libceph: don't warn if req->r_abort_on_full is set
  libceph: use for_each_request() in ceph_osdc_abort_on_full()
  libceph: defer __complete_request() to a workqueue
  libceph: move more code into __complete_request()
  libceph: no need to call flush_workqueue() before destruction
  ceph: flush pending works before shutdown super
  ceph: abort osd requests on force umount
  libceph: introduce ceph_osdc_abort_requests()
  ...
Diffstat (limited to 'net/ceph')
-rw-r--r--  net/ceph/messenger.c    31
-rw-r--r--  net/ceph/osd_client.c  216
-rw-r--r--  net/ceph/osdmap.c       19
3 files changed, 149 insertions(+), 117 deletions(-)
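The headline change replaces the per-request r_abort_on_full flag with a per-client abort_on_full setting and a new blanket-abort entry point, ceph_osdc_abort_requests(), which is what makes "umount -f" behave. A minimal sketch of how a filesystem client might drive it on forced unmount, assuming a hypothetical fs-client wrapper around struct ceph_client (the actual CephFS wiring is in the "ceph: abort osd requests on force umount" commit listed above):

    /* Hypothetical caller; the wrapper struct is illustrative, not fs/ceph's. */
    struct example_fs_client {
            struct ceph_client *client;
    };

    static void example_umount_begin(struct example_fs_client *fsc)
    {
            /*
             * Fail every in-flight OSD request with -EIO and record
             * osdc->abort_err so later submissions fail immediately too.
             */
            ceph_osdc_abort_requests(&fsc->client->osdc, -EIO);
    }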
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3b3d33ea9ed8..c6413c360771 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -168,12 +168,6 @@ static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
 static struct lock_class_key socket_class;
 #endif
 
-/*
- * When skipping (ignoring) a block of input we read it into a "skip
- * buffer," which is this many bytes in size.
- */
-#define SKIP_BUF_SIZE	1024
-
 static void queue_con(struct ceph_connection *con);
 static void cancel_con(struct ceph_connection *con);
 static void ceph_con_workfn(struct work_struct *);
@@ -520,12 +514,18 @@ static int ceph_tcp_connect(struct ceph_connection *con)
 	return 0;
 }
 
+/*
+ * If @buf is NULL, discard up to @len bytes.
+ */
 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
 {
 	struct kvec iov = {buf, len};
 	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
 	int r;
 
+	if (!buf)
+		msg.msg_flags |= MSG_TRUNC;
+
 	iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, len);
 	r = sock_recvmsg(sock, &msg, msg.msg_flags);
 	if (r == -EAGAIN)
@@ -2575,9 +2575,6 @@ static int try_write(struct ceph_connection *con)
 	    con->state != CON_STATE_OPEN)
 		return 0;
 
-more:
-	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
-
 	/* open the socket first? */
 	if (con->state == CON_STATE_PREOPEN) {
 		BUG_ON(con->sock);
@@ -2598,7 +2595,8 @@ more:
 		}
 	}
 
-more_kvec:
+more:
+	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 	BUG_ON(!con->sock);
 
 	/* kvec data queued? */
@@ -2623,7 +2621,7 @@ more_kvec:
 
 	ret = write_partial_message_data(con);
 	if (ret == 1)
-		goto more_kvec;	/* we need to send the footer, too! */
+		goto more;	/* we need to send the footer, too! */
 	if (ret == 0)
 		goto out;
 	if (ret < 0) {
@@ -2659,8 +2657,6 @@ out:
 	return ret;
 }
 
-
-
 /*
  * Read what we can from the socket.
  */
@@ -2721,16 +2717,11 @@ more:
 	if (con->in_base_pos < 0) {
 		/*
 		 * skipping + discarding content.
-		 *
-		 * FIXME: there must be a better way to do this!
 		 */
-		static char buf[SKIP_BUF_SIZE];
-		int skip = min((int) sizeof (buf), -con->in_base_pos);
-
-		dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
-		ret = ceph_tcp_recvmsg(con->sock, buf, skip);
+		ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
 		if (ret <= 0)
 			goto out;
+		dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
 		con->in_base_pos += ret;
 		if (con->in_base_pos)
 			goto more;
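The skip path above no longer drains unwanted bytes through a static bounce buffer: a NULL buffer now makes ceph_tcp_recvmsg() set MSG_TRUNC, so the TCP receive path discards the data and returns the count discarded. recv(2) documents the same flag for userspace TCP sockets; a rough analogue, assuming the documented discard behavior and that no destination buffer is needed when nothing is copied:

    #include <sys/socket.h>

    /*
     * Sketch: discard up to n already-queued bytes from a connected TCP
     * socket. Returns bytes discarded, 0 on EOF, or -1 with errno set
     * (EAGAIN/EWOULDBLOCK if nothing is queued).
     */
    static ssize_t discard_bytes(int fd, size_t n)
    {
            return recv(fd, NULL, n, MSG_TRUNC | MSG_DONTWAIT);
    }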
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 69a2581ddbba..a00c74f1154e 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -766,7 +766,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
 
-void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
+int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 			u16 opcode, const char *class, const char *method)
 {
 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
@@ -778,7 +778,9 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 	BUG_ON(opcode != CEPH_OSD_OP_CALL);
 
 	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
-	BUG_ON(!pagelist);
+	if (!pagelist)
+		return -ENOMEM;
+
 	ceph_pagelist_init(pagelist);
 
 	op->cls.class_name = class;
@@ -798,6 +800,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
 
 	op->indata_len = payload_len;
+	return 0;
 }
 EXPORT_SYMBOL(osd_req_op_cls_init);
 
@@ -1026,7 +1029,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 				       truncate_size, truncate_seq);
 	}
 
-	req->r_abort_on_full = true;
 	req->r_flags = flags;
 	req->r_base_oloc.pool = layout->pool_id;
 	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
@@ -1054,6 +1056,38 @@ EXPORT_SYMBOL(ceph_osdc_new_request);
 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
 DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
 
+/*
+ * Call @fn on each OSD request as long as @fn returns 0.
+ */
+static void for_each_request(struct ceph_osd_client *osdc,
+			int (*fn)(struct ceph_osd_request *req, void *arg),
+			void *arg)
+{
+	struct rb_node *n, *p;
+
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+		for (p = rb_first(&osd->o_requests); p; ) {
+			struct ceph_osd_request *req =
+			    rb_entry(p, struct ceph_osd_request, r_node);
+
+			p = rb_next(p);
+			if (fn(req, arg))
+				return;
+		}
+	}
+
+	for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
+		struct ceph_osd_request *req =
+		    rb_entry(p, struct ceph_osd_request, r_node);
+
+		p = rb_next(p);
+		if (fn(req, arg))
+			return;
+	}
+}
+
 static bool osd_homeless(struct ceph_osd *osd)
 {
 	return osd->o_osd == CEPH_HOMELESS_OSD;
@@ -1395,7 +1429,6 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 	bool recovery_deletes = ceph_osdmap_flag(osdc,
 						 CEPH_OSDMAP_RECOVERY_DELETES);
 	enum calc_target_result ct_res;
-	int ret;
 
 	t->epoch = osdc->osdmap->epoch;
 	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
@@ -1431,14 +1464,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 		}
 	}
 
-	ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
-					  &pgid);
-	if (ret) {
-		WARN_ON(ret != -ENOENT);
-		t->osd = CEPH_HOMELESS_OSD;
-		ct_res = CALC_TARGET_POOL_DNE;
-		goto out;
-	}
+	__ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
 	last_pgid.pool = pgid.pool;
 	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
 
@@ -2161,9 +2187,9 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
 	struct ceph_osd_client *osdc = req->r_osdc;
 	struct ceph_osd *osd;
 	enum calc_target_result ct_res;
+	int err = 0;
 	bool need_send = false;
 	bool promoted = false;
-	bool need_abort = false;
 
 	WARN_ON(req->r_tid);
 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
@@ -2179,7 +2205,10 @@ again:
 		goto promote;
 	}
 
-	if (osdc->osdmap->epoch < osdc->epoch_barrier) {
+	if (osdc->abort_err) {
+		dout("req %p abort_err %d\n", req, osdc->abort_err);
+		err = osdc->abort_err;
+	} else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
 		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
 		     osdc->epoch_barrier);
 		req->r_t.paused = true;
@@ -2200,11 +2229,13 @@ again:
 	    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
 	     pool_full(osdc, req->r_t.base_oloc.pool))) {
 		dout("req %p full/pool_full\n", req);
-		pr_warn_ratelimited("FULL or reached pool quota\n");
-		req->r_t.paused = true;
-		maybe_request_map(osdc);
-		if (req->r_abort_on_full)
-			need_abort = true;
+		if (osdc->abort_on_full) {
+			err = -ENOSPC;
+		} else {
+			pr_warn_ratelimited("FULL or reached pool quota\n");
+			req->r_t.paused = true;
+			maybe_request_map(osdc);
+		}
 	} else if (!osd_homeless(osd)) {
 		need_send = true;
 	} else {
@@ -2221,11 +2252,11 @@ again:
 	link_request(osd, req);
 	if (need_send)
 		send_request(req);
-	else if (need_abort)
-		complete_request(req, -ENOSPC);
+	else if (err)
+		complete_request(req, err);
 	mutex_unlock(&osd->lock);
 
-	if (ct_res == CALC_TARGET_POOL_DNE)
+	if (!err && ct_res == CALC_TARGET_POOL_DNE)
 		send_map_check(req);
 
 	if (promoted)
@@ -2281,11 +2312,21 @@ static void finish_request(struct ceph_osd_request *req)
 
 static void __complete_request(struct ceph_osd_request *req)
 {
-	if (req->r_callback) {
-		dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
-		     req->r_tid, req->r_callback, req->r_result);
+	dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
+	     req->r_tid, req->r_callback, req->r_result);
+
+	if (req->r_callback)
 		req->r_callback(req);
-	}
+	complete_all(&req->r_completion);
+	ceph_osdc_put_request(req);
+}
+
+static void complete_request_workfn(struct work_struct *work)
+{
+	struct ceph_osd_request *req =
+	    container_of(work, struct ceph_osd_request, r_complete_work);
+
+	__complete_request(req);
 }
 
 /*
@@ -2297,9 +2338,9 @@ static void complete_request(struct ceph_osd_request *req, int err)
 
 	req->r_result = err;
 	finish_request(req);
-	__complete_request(req);
-	complete_all(&req->r_completion);
-	ceph_osdc_put_request(req);
+
+	INIT_WORK(&req->r_complete_work, complete_request_workfn);
+	queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
 }
 
 static void cancel_map_check(struct ceph_osd_request *req)
@@ -2336,6 +2377,28 @@ static void abort_request(struct ceph_osd_request *req, int err)
 	complete_request(req, err);
 }
 
+static int abort_fn(struct ceph_osd_request *req, void *arg)
+{
+	int err = *(int *)arg;
+
+	abort_request(req, err);
+	return 0; /* continue iteration */
+}
+
+/*
+ * Abort all in-flight requests with @err and arrange for all future
+ * requests to be failed immediately.
+ */
+void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
+{
+	dout("%s osdc %p err %d\n", __func__, osdc, err);
+	down_write(&osdc->lock);
+	for_each_request(osdc, abort_fn, &err);
+	osdc->abort_err = err;
+	up_write(&osdc->lock);
+}
+EXPORT_SYMBOL(ceph_osdc_abort_requests);
+
 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
 {
 	if (likely(eb > osdc->epoch_barrier)) {
@@ -2363,6 +2426,30 @@ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
 
 /*
+ * We can end up releasing caps as a result of abort_request().
+ * In that case, we probably want to ensure that the cap release message
+ * has an updated epoch barrier in it, so set the epoch barrier prior to
+ * aborting the first request.
+ */
+static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+	bool *victims = arg;
+
+	if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+	    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
+	     pool_full(osdc, req->r_t.base_oloc.pool))) {
+		if (!*victims) {
+			update_epoch_barrier(osdc, osdc->osdmap->epoch);
+			*victims = true;
+		}
+		abort_request(req, -ENOSPC);
+	}
+
+	return 0; /* continue iteration */
+}
+
+/*
 * Drop all pending requests that are stalled waiting on a full condition to
 * clear, and complete them with ENOSPC as the return code. Set the
 * osdc->epoch_barrier to the latest map epoch that we've seen if any were
@@ -2370,61 +2457,11 @@ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
 */
static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
 {
-	struct rb_node *n;
 	bool victims = false;
 
-	dout("enter abort_on_full\n");
-
-	if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
-		goto out;
-
-	/* Scan list and see if there is anything to abort */
-	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
-		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
-		struct rb_node *m;
-
-		m = rb_first(&osd->o_requests);
-		while (m) {
-			struct ceph_osd_request *req = rb_entry(m,
-					struct ceph_osd_request, r_node);
-			m = rb_next(m);
-
-			if (req->r_abort_on_full) {
-				victims = true;
-				break;
-			}
-		}
-		if (victims)
-			break;
-	}
-
-	if (!victims)
-		goto out;
-
-	/*
-	 * Update the barrier to current epoch if it's behind that point,
-	 * since we know we have some calls to be aborted in the tree.
-	 */
-	update_epoch_barrier(osdc, osdc->osdmap->epoch);
-
-	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
-		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
-		struct rb_node *m;
-
-		m = rb_first(&osd->o_requests);
-		while (m) {
-			struct ceph_osd_request *req = rb_entry(m,
-					struct ceph_osd_request, r_node);
-			m = rb_next(m);
-
-			if (req->r_abort_on_full &&
-			    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
-			     pool_full(osdc, req->r_t.target_oloc.pool)))
-				abort_request(req, -ENOSPC);
-		}
-	}
-out:
-	dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
+	if (osdc->abort_on_full &&
+	    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
+		for_each_request(osdc, abort_on_full_fn, &victims);
 }
 
 static void check_pool_dne(struct ceph_osd_request *req)
@@ -3541,8 +3578,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 	up_read(&osdc->lock);
 
 	__complete_request(req);
-	complete_all(&req->r_completion);
-	ceph_osdc_put_request(req);
 	return;
 
 fail_request:
@@ -4927,7 +4962,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 	if (ret)
 		goto out_put_req;
 
-	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+	ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+	if (ret)
+		goto out_put_req;
+
 	if (req_page)
 		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
 						  0, false, false);
@@ -4996,6 +5034,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 	if (!osdc->notify_wq)
 		goto out_msgpool_reply;
 
+	osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
+	if (!osdc->completion_wq)
+		goto out_notify_wq;
+
 	schedule_delayed_work(&osdc->timeout_work,
 			      osdc->client->options->osd_keepalive_timeout);
 	schedule_delayed_work(&osdc->osds_timeout_work,
@@ -5003,6 +5045,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 
 	return 0;
 
+out_notify_wq:
+	destroy_workqueue(osdc->notify_wq);
 out_msgpool_reply:
 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 out_msgpool:
@@ -5017,7 +5061,7 @@ out:
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
-	flush_workqueue(osdc->notify_wq);
+	destroy_workqueue(osdc->completion_wq);
 	destroy_workqueue(osdc->notify_wq);
 	cancel_delayed_work_sync(&osdc->timeout_work);
 	cancel_delayed_work_sync(&osdc->osds_timeout_work);
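One contract in the new iterator is easy to miss: for_each_request() advances p = rb_next(p) before invoking the callback, so a callback that unlinks the request from its tree (as abort_fn() does via complete_request()) cannot invalidate the walk; and because completions now bounce through completion_wq, r_callback never runs under osd->lock. A sketch of a callback written against that contract (the counting helper is hypothetical, not part of the patch):

    /* Count in-flight writes; the walk stays safe even if a callback
     * were to unlink the current request. */
    static int count_writes_fn(struct ceph_osd_request *req, void *arg)
    {
            int *nr_writes = arg;

            if (req->r_flags & CEPH_OSD_FLAG_WRITE)
                    (*nr_writes)++;
            return 0;       /* non-zero would stop the walk early */
    }

    /* caller, with osdc->lock held:
     *      int n = 0;
     *      for_each_request(osdc, count_writes_fn, &n);
     */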
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index e22820e24f50..98c0ff3d6441 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -2146,10 +2146,10 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting,
 * Should only be called with target_oid and target_oloc (as opposed to
 * base_oid and base_oloc), since tiering isn't taken into account.
 */
-int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
+void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
 				const struct ceph_object_id *oid,
 				const struct ceph_object_locator *oloc,
 				struct ceph_pg *raw_pgid)
 {
 	WARN_ON(pi->id != oloc->pool);
 
@@ -2165,11 +2165,8 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
 		int nsl = oloc->pool_ns->len;
 		size_t total = nsl + 1 + oid->name_len;
 
-		if (total > sizeof(stack_buf)) {
-			buf = kmalloc(total, GFP_NOIO);
-			if (!buf)
-				return -ENOMEM;
-		}
+		if (total > sizeof(stack_buf))
+			buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
 		memcpy(buf, oloc->pool_ns->str, nsl);
 		buf[nsl] = '\037';
 		memcpy(buf + nsl + 1, oid->name, oid->name_len);
@@ -2181,7 +2178,6 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
 			     oid->name, nsl, oloc->pool_ns->str,
 			     raw_pgid->pool, raw_pgid->seed);
 	}
-	return 0;
 }
 
 int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
@@ -2195,7 +2191,8 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
 	if (!pi)
 		return -ENOENT;
 
-	return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
+	__ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
+	return 0;
 }
 EXPORT_SYMBOL(ceph_object_locator_to_pg);
 
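With __GFP_NOFAIL the locator-string allocation can no longer return NULL, which is why __ceph_object_locator_to_pg() drops its int return here and why calc_target() in osd_client.c above loses its error branch around this call. The caller-side effect, sketched on a hypothetical call site:

    /* before: every caller had to propagate a possible error */
    ret = __ceph_object_locator_to_pg(pi, &oid, &oloc, &pgid);
    if (ret)
            return ret;

    /* after: the mapping cannot fail, so the result is used directly */
    __ceph_object_locator_to_pg(pi, &oid, &oloc, &pgid);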