aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-02-07 15:35:56 -0500
committerLinus Torvalds <torvalds@linux-foundation.org>2014-02-07 15:35:56 -0500
commit41f76d8bee06be5a9b301ebf13f61a719ba548c4 (patch)
treea582e83d1706b7f69ad6707f7102b5dd59e0fd60
parent42be3f35a3ef0b64af80394afc5873aa5cf70871 (diff)
parent0ec1d15ec6ed513ab2cc86c67d94761d71228a32 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull ceph fixes from Sage Weil: "There is an RBD fix for a crash due to the immutable bio changes, an error path fix, and a locking fix in the recent redirect support" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: libceph: do not dereference a NULL bio pointer libceph: take map_sem for read in handle_reply() libceph: factor out logic from ceph_osdc_start_request() libceph: fix error handling in ceph_osdc_init()
-rw-r--r--net/ceph/messenger.c8
-rw-r--r--net/ceph/osd_client.c84
2 files changed, 60 insertions, 32 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 0e478a0f4204..30efc5c18622 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -840,9 +840,13 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
840 840
841 if (!cursor->bvec_iter.bi_size) { 841 if (!cursor->bvec_iter.bi_size) {
842 bio = bio->bi_next; 842 bio = bio->bi_next;
843 cursor->bvec_iter = bio->bi_iter; 843 cursor->bio = bio;
844 if (bio)
845 cursor->bvec_iter = bio->bi_iter;
846 else
847 memset(&cursor->bvec_iter, 0,
848 sizeof(cursor->bvec_iter));
844 } 849 }
845 cursor->bio = bio;
846 850
847 if (!cursor->last_piece) { 851 if (!cursor->last_piece) {
848 BUG_ON(!cursor->resid); 852 BUG_ON(!cursor->resid);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 010ff3bd58ad..0676f2b199d6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1427,6 +1427,40 @@ static void __send_queued(struct ceph_osd_client *osdc)
1427} 1427}
1428 1428
1429/* 1429/*
1430 * Caller should hold map_sem for read and request_mutex.
1431 */
1432static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
1433 struct ceph_osd_request *req,
1434 bool nofail)
1435{
1436 int rc;
1437
1438 __register_request(osdc, req);
1439 req->r_sent = 0;
1440 req->r_got_reply = 0;
1441 rc = __map_request(osdc, req, 0);
1442 if (rc < 0) {
1443 if (nofail) {
1444 dout("osdc_start_request failed map, "
1445 " will retry %lld\n", req->r_tid);
1446 rc = 0;
1447 } else {
1448 __unregister_request(osdc, req);
1449 }
1450 return rc;
1451 }
1452
1453 if (req->r_osd == NULL) {
1454 dout("send_request %p no up osds in pg\n", req);
1455 ceph_monc_request_next_osdmap(&osdc->client->monc);
1456 } else {
1457 __send_queued(osdc);
1458 }
1459
1460 return 0;
1461}
1462
1463/*
1430 * Timeout callback, called every N seconds when 1 or more osd 1464 * Timeout callback, called every N seconds when 1 or more osd
1431 * requests has been active for more than N seconds. When this 1465 * requests has been active for more than N seconds. When this
1432 * happens, we ping all OSDs with requests who have timed out to 1466 * happens, we ping all OSDs with requests who have timed out to
@@ -1653,6 +1687,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1653 osdmap_epoch = ceph_decode_32(&p); 1687 osdmap_epoch = ceph_decode_32(&p);
1654 1688
1655 /* lookup */ 1689 /* lookup */
1690 down_read(&osdc->map_sem);
1656 mutex_lock(&osdc->request_mutex); 1691 mutex_lock(&osdc->request_mutex);
1657 req = __lookup_request(osdc, tid); 1692 req = __lookup_request(osdc, tid);
1658 if (req == NULL) { 1693 if (req == NULL) {
@@ -1709,7 +1744,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1709 dout("redirect pool %lld\n", redir.oloc.pool); 1744 dout("redirect pool %lld\n", redir.oloc.pool);
1710 1745
1711 __unregister_request(osdc, req); 1746 __unregister_request(osdc, req);
1712 mutex_unlock(&osdc->request_mutex);
1713 1747
1714 req->r_target_oloc = redir.oloc; /* struct */ 1748 req->r_target_oloc = redir.oloc; /* struct */
1715 1749
@@ -1721,10 +1755,10 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1721 * successfully. In the future we might want to follow 1755 * successfully. In the future we might want to follow
1722 * original request's nofail setting here. 1756 * original request's nofail setting here.
1723 */ 1757 */
1724 err = ceph_osdc_start_request(osdc, req, true); 1758 err = __ceph_osdc_start_request(osdc, req, true);
1725 BUG_ON(err); 1759 BUG_ON(err);
1726 1760
1727 goto done; 1761 goto out_unlock;
1728 } 1762 }
1729 1763
1730 already_completed = req->r_got_reply; 1764 already_completed = req->r_got_reply;
@@ -1742,8 +1776,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1742 req->r_got_reply = 1; 1776 req->r_got_reply = 1;
1743 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1777 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1744 dout("handle_reply tid %llu dup ack\n", tid); 1778 dout("handle_reply tid %llu dup ack\n", tid);
1745 mutex_unlock(&osdc->request_mutex); 1779 goto out_unlock;
1746 goto done;
1747 } 1780 }
1748 1781
1749 dout("handle_reply tid %llu flags %d\n", tid, flags); 1782 dout("handle_reply tid %llu flags %d\n", tid, flags);
@@ -1758,6 +1791,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1758 __unregister_request(osdc, req); 1791 __unregister_request(osdc, req);
1759 1792
1760 mutex_unlock(&osdc->request_mutex); 1793 mutex_unlock(&osdc->request_mutex);
1794 up_read(&osdc->map_sem);
1761 1795
1762 if (!already_completed) { 1796 if (!already_completed) {
1763 if (req->r_unsafe_callback && 1797 if (req->r_unsafe_callback &&
@@ -1775,10 +1809,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1775 complete_request(req); 1809 complete_request(req);
1776 } 1810 }
1777 1811
1778done: 1812out:
1779 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1813 dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1780 ceph_osdc_put_request(req); 1814 ceph_osdc_put_request(req);
1781 return; 1815 return;
1816out_unlock:
1817 mutex_unlock(&osdc->request_mutex);
1818 up_read(&osdc->map_sem);
1819 goto out;
1782 1820
1783bad_put: 1821bad_put:
1784 req->r_result = -EIO; 1822 req->r_result = -EIO;
@@ -1791,6 +1829,7 @@ bad_put:
1791 ceph_osdc_put_request(req); 1829 ceph_osdc_put_request(req);
1792bad_mutex: 1830bad_mutex:
1793 mutex_unlock(&osdc->request_mutex); 1831 mutex_unlock(&osdc->request_mutex);
1832 up_read(&osdc->map_sem);
1794bad: 1833bad:
1795 pr_err("corrupt osd_op_reply got %d %d\n", 1834 pr_err("corrupt osd_op_reply got %d %d\n",
1796 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1835 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
@@ -2351,34 +2390,16 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
2351 struct ceph_osd_request *req, 2390 struct ceph_osd_request *req,
2352 bool nofail) 2391 bool nofail)
2353{ 2392{
2354 int rc = 0; 2393 int rc;
2355 2394
2356 down_read(&osdc->map_sem); 2395 down_read(&osdc->map_sem);
2357 mutex_lock(&osdc->request_mutex); 2396 mutex_lock(&osdc->request_mutex);
2358 __register_request(osdc, req); 2397
2359 req->r_sent = 0; 2398 rc = __ceph_osdc_start_request(osdc, req, nofail);
2360 req->r_got_reply = 0; 2399
2361 rc = __map_request(osdc, req, 0);
2362 if (rc < 0) {
2363 if (nofail) {
2364 dout("osdc_start_request failed map, "
2365 " will retry %lld\n", req->r_tid);
2366 rc = 0;
2367 } else {
2368 __unregister_request(osdc, req);
2369 }
2370 goto out_unlock;
2371 }
2372 if (req->r_osd == NULL) {
2373 dout("send_request %p no up osds in pg\n", req);
2374 ceph_monc_request_next_osdmap(&osdc->client->monc);
2375 } else {
2376 __send_queued(osdc);
2377 }
2378 rc = 0;
2379out_unlock:
2380 mutex_unlock(&osdc->request_mutex); 2400 mutex_unlock(&osdc->request_mutex);
2381 up_read(&osdc->map_sem); 2401 up_read(&osdc->map_sem);
2402
2382 return rc; 2403 return rc;
2383} 2404}
2384EXPORT_SYMBOL(ceph_osdc_start_request); 2405EXPORT_SYMBOL(ceph_osdc_start_request);
@@ -2504,9 +2525,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
2504 err = -ENOMEM; 2525 err = -ENOMEM;
2505 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2526 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
2506 if (!osdc->notify_wq) 2527 if (!osdc->notify_wq)
2507 goto out_msgpool; 2528 goto out_msgpool_reply;
2529
2508 return 0; 2530 return 0;
2509 2531
2532out_msgpool_reply:
2533 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
2510out_msgpool: 2534out_msgpool:
2511 ceph_msgpool_destroy(&osdc->msgpool_op); 2535 ceph_msgpool_destroy(&osdc->msgpool_op);
2512out_mempool: 2536out_mempool: