author    Ilya Dryomov <idryomov@gmail.com>    2019-05-30 10:07:48 -0400
committer Ilya Dryomov <idryomov@gmail.com>    2019-07-08 08:01:45 -0400
commit    e1fddc8fdd22ed5a55fc7e7a81437c4663c7ba8c (patch)
tree      8b89d209d035aa2538c4bf32facf32f4e0486e92 /drivers/block
parent    a2b1da09793d003410b57f96eaf7e83e43b7a50a (diff)
rbd: quiescing lock should wait for image requests
Syncing OSD requests doesn't really work. A single image request may be comprised of multiple object requests, each of which can go through a series of OSD requests (original, copyups, etc). On top of that, the OSD client may be shared with other rbd devices.

What we want is to ensure that all in-flight image requests complete. Introduce rbd_dev->running_list and block in RBD_LOCK_STATE_RELEASING until that happens. New OSD requests may be started during this time.

Note that __rbd_img_handle_request() acquires rbd_dev->lock_rwsem only if need_exclusive_lock() returns true. This avoids a deadlock similar to the one outlined in the previous commit between unlock and I/O that doesn't require the lock, such as a read with the object-map feature disabled.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Reviewed-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
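The pattern the patch relies on can be shown outside the kernel: track every in-flight image request while it holds the exclusive lock, and have the quiescing path block until that set drains. Below is a minimal userspace C sketch of that pattern, not driver code; the names (inflight, releasing, quiesce_and_release) are illustrative only, and a pthread mutex/condvar stands in for the driver's lock_lists_lock spinlock and releasing_wait completion.

/*
 * Illustrative userspace sketch of the quiesce-and-drain pattern.
 * Build with: cc sketch.c -lpthread
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t drained = PTHREAD_COND_INITIALIZER;
static int inflight;      /* stands in for rbd_dev->running_list */
static bool releasing;    /* stands in for RBD_LOCK_STATE_RELEASING */

/* an image request that needs the lock registers itself when started */
static void request_start(void)
{
        pthread_mutex_lock(&lock);
        inflight++;
        pthread_mutex_unlock(&lock);
}

/* ... and deregisters itself when it completes */
static void request_finish(void)
{
        pthread_mutex_lock(&lock);
        if (--inflight == 0 && releasing)
                pthread_cond_signal(&drained);  /* complete(&releasing_wait) */
        pthread_mutex_unlock(&lock);
}

/* quiesce: flip to "releasing", then block until all in-flight requests finish */
static void quiesce_and_release(void)
{
        pthread_mutex_lock(&lock);
        releasing = true;
        while (inflight > 0)
                pthread_cond_wait(&drained, &lock);  /* wait_for_completion() */
        pthread_mutex_unlock(&lock);
        printf("all image requests drained, lock can be released\n");
}

int main(void)
{
        request_start();
        request_finish();
        quiesce_and_release();
        return 0;
}

The driver keeps an actual list (running_list) rather than a counter so it can assert per-request membership, but the drain-then-release ordering is the same.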
Diffstat (limited to 'drivers/block')
-rw-r--r--  drivers/block/rbd.c  104
1 file changed, 90 insertions, 14 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4ae1cdf40b27..a1bb8f3100a8 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -331,6 +331,7 @@ struct rbd_img_request {
                 struct rbd_obj_request  *obj_request;  /* obj req initiator */
         };
 
+        struct list_head        lock_item;
         struct list_head        object_extents; /* obj_req.ex structs */
 
         struct mutex            state_mutex;
@@ -410,6 +411,9 @@ struct rbd_device {
         struct work_struct      released_lock_work;
         struct delayed_work     lock_dwork;
         struct work_struct      unlock_work;
+        spinlock_t              lock_lists_lock;
+        struct list_head        running_list;
+        struct completion       releasing_wait;
         wait_queue_head_t       lock_waitq;
 
         struct workqueue_struct *task_wq;
@@ -1726,6 +1730,7 @@ static struct rbd_img_request *rbd_img_request_create(
         if (rbd_dev_parent_get(rbd_dev))
                 img_request_layered_set(img_request);
 
+        INIT_LIST_HEAD(&img_request->lock_item);
         INIT_LIST_HEAD(&img_request->object_extents);
         mutex_init(&img_request->state_mutex);
         kref_init(&img_request->kref);
@@ -1745,6 +1750,7 @@ static void rbd_img_request_destroy(struct kref *kref)
 
         dout("%s: img %p\n", __func__, img_request);
 
+        WARN_ON(!list_empty(&img_request->lock_item));
         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                 rbd_img_obj_request_del(img_request, obj_request);
 
@@ -2872,6 +2878,50 @@ static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
                 rbd_img_handle_request(obj_req->img_request, result);
 }
 
+static bool need_exclusive_lock(struct rbd_img_request *img_req)
+{
+        struct rbd_device *rbd_dev = img_req->rbd_dev;
+
+        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
+                return false;
+
+        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+                return false;
+
+        rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
+        if (rbd_dev->opts->lock_on_read)
+                return true;
+
+        return rbd_img_is_write(img_req);
+}
+
+static void rbd_lock_add_request(struct rbd_img_request *img_req)
+{
+        struct rbd_device *rbd_dev = img_req->rbd_dev;
+
+        lockdep_assert_held(&rbd_dev->lock_rwsem);
+        spin_lock(&rbd_dev->lock_lists_lock);
+        rbd_assert(list_empty(&img_req->lock_item));
+        list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
+        spin_unlock(&rbd_dev->lock_lists_lock);
+}
+
+static void rbd_lock_del_request(struct rbd_img_request *img_req)
+{
+        struct rbd_device *rbd_dev = img_req->rbd_dev;
+        bool need_wakeup;
+
+        lockdep_assert_held(&rbd_dev->lock_rwsem);
+        spin_lock(&rbd_dev->lock_lists_lock);
+        rbd_assert(!list_empty(&img_req->lock_item));
+        list_del_init(&img_req->lock_item);
+        need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
+                       list_empty(&rbd_dev->running_list));
+        spin_unlock(&rbd_dev->lock_lists_lock);
+        if (need_wakeup)
+                complete(&rbd_dev->releasing_wait);
+}
+
 static void rbd_img_object_requests(struct rbd_img_request *img_req)
 {
         struct rbd_obj_request *obj_req;
@@ -2927,9 +2977,19 @@ static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
         struct rbd_device *rbd_dev = img_req->rbd_dev;
         bool done;
 
-        mutex_lock(&img_req->state_mutex);
-        done = rbd_img_advance(img_req, result);
-        mutex_unlock(&img_req->state_mutex);
+        if (need_exclusive_lock(img_req)) {
+                down_read(&rbd_dev->lock_rwsem);
+                mutex_lock(&img_req->state_mutex);
+                done = rbd_img_advance(img_req, result);
+                if (done)
+                        rbd_lock_del_request(img_req);
+                mutex_unlock(&img_req->state_mutex);
+                up_read(&rbd_dev->lock_rwsem);
+        } else {
+                mutex_lock(&img_req->state_mutex);
+                done = rbd_img_advance(img_req, result);
+                mutex_unlock(&img_req->state_mutex);
+        }
 
         if (done && *result) {
                 rbd_assert(*result < 0);
@@ -3413,30 +3473,40 @@ again:
 
 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
 {
+        bool need_wait;
+
         dout("%s rbd_dev %p\n", __func__, rbd_dev);
         lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
 
         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
                 return false;
 
-        rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
-        downgrade_write(&rbd_dev->lock_rwsem);
         /*
          * Ensure that all in-flight IO is flushed.
-         *
-         * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
-         * may be shared with other devices.
          */
-        ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
+        rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+        rbd_assert(!completion_done(&rbd_dev->releasing_wait));
+        need_wait = !list_empty(&rbd_dev->running_list);
+        downgrade_write(&rbd_dev->lock_rwsem);
+        if (need_wait)
+                wait_for_completion(&rbd_dev->releasing_wait);
         up_read(&rbd_dev->lock_rwsem);
 
         down_write(&rbd_dev->lock_rwsem);
         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
                 return false;
 
+        rbd_assert(list_empty(&rbd_dev->running_list));
         return true;
 }
 
+static void __rbd_release_lock(struct rbd_device *rbd_dev)
+{
+        rbd_assert(list_empty(&rbd_dev->running_list));
+
+        rbd_unlock(rbd_dev);
+}
+
 /*
  * lock_rwsem must be held for write
  */
@@ -3445,7 +3515,7 @@ static void rbd_release_lock(struct rbd_device *rbd_dev)
         if (!rbd_quiesce_lock(rbd_dev))
                 return;
 
-        rbd_unlock(rbd_dev);
+        __rbd_release_lock(rbd_dev);
 
         /*
          * Give others a chance to grab the lock - we would re-acquire
@@ -3819,7 +3889,7 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
                  * Lock cookie cannot be updated on older OSDs, so do
                  * a manual release and queue an acquire.
                  */
-                rbd_unlock(rbd_dev);
+                __rbd_release_lock(rbd_dev);
                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
         } else {
                 __rbd_lock(rbd_dev, cookie);
@@ -4085,9 +4155,12 @@ static void rbd_queue_workfn(struct work_struct *work)
         if (result)
                 goto err_img_request;
 
-        rbd_img_handle_request(img_request, 0);
-        if (must_be_locked)
+        if (must_be_locked) {
+                rbd_lock_add_request(img_request);
                 up_read(&rbd_dev->lock_rwsem);
+        }
+
+        rbd_img_handle_request(img_request, 0);
         return;
 
 err_img_request:
@@ -4761,6 +4834,9 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
+        spin_lock_init(&rbd_dev->lock_lists_lock);
+        INIT_LIST_HEAD(&rbd_dev->running_list);
+        init_completion(&rbd_dev->releasing_wait);
         init_waitqueue_head(&rbd_dev->lock_waitq);
 
         rbd_dev->dev.bus = &rbd_bus_type;
@@ -5777,7 +5853,7 @@ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
 {
         down_write(&rbd_dev->lock_rwsem);
         if (__rbd_is_lock_owner(rbd_dev))
-                rbd_unlock(rbd_dev);
+                __rbd_release_lock(rbd_dev);
         up_write(&rbd_dev->lock_rwsem);
 }
 