author     Ilya Dryomov <idryomov@gmail.com>    2019-06-06 11:14:49 -0400
committer  Ilya Dryomov <idryomov@gmail.com>    2019-07-08 08:01:45 -0400
commit     637cd060537d0c40bcf4f164b8a2c6e9b747e1ad (patch)
tree       0f3a8d38285d2013f97c0d54fbb2ff1d3da0439c /drivers/block
parent     e1fddc8fdd22ed5a55fc7e7a81437c4663c7ba8c (diff)
rbd: new exclusive lock wait/wake code
rbd_wait_state_locked() is built around rbd_dev->lock_waitq and blocks
rbd worker threads while waiting for the lock, potentially impacting
other rbd devices.  There is no good way to pass an error code into
image request state machines when acquisition fails, hence the use of
RBD_DEV_FLAG_BLACKLISTED for everything and various other issues.

Introduce rbd_dev->acquiring_list and move acquisition into image
request state machine.  Use rbd_img_schedule() for kicking and passing
error codes.  No blocking occurs while waiting for the lock, but
rbd_dev->lock_rwsem is still held across lock, unlock and set_cookie
calls.

Always acquire the lock on "rbd map" to avoid associating the latency
of acquiring the lock with the first I/O request.

A slight regression is that lock_timeout is now respected only if lock
acquisition is triggered by "rbd map" and not by I/O.  This is somewhat
compensated by the fact that we no longer block if the peer refuses to
release lock -- I/O is failed with EROFS right away.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Reviewed-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
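[Editor's note: a minimal sketch, not part of the patch, of the completion-based
wait/wake pattern the new "rbd map" path relies on: kick the delayed worker,
sleep killably with a timeout, and read the error the worker stored before
completing.  All demo_* names and the use of system_wq are hypothetical.]

    #include <linux/completion.h>
    #include <linux/workqueue.h>
    #include <linux/errno.h>

    struct demo_dev {
            struct delayed_work lock_dwork;  /* worker that tries to take the lock */
            struct completion acquire_wait;  /* completed once the attempt is resolved */
            int acquire_err;                 /* outcome stored for the waiter */
    };

    /* worker side: record the outcome, then wake whoever is waiting in "map" */
    static void demo_wake_waiter(struct demo_dev *d, int result)
    {
            d->acquire_err = result;
            complete_all(&d->acquire_wait);
    }

    /* "rbd map" side: kick the worker and honour the configured timeout */
    static int demo_add_acquire_lock(struct demo_dev *d, unsigned long timeout)
    {
            long ret;

            queue_delayed_work(system_wq, &d->lock_dwork, 0);
            ret = wait_for_completion_killable_timeout(&d->acquire_wait, timeout);
            if (ret > 0)
                    return d->acquire_err;  /* worker finished; 0 or an error */
            if (!ret)
                    return -ETIMEDOUT;      /* lock_timeout elapsed */
            return ret;                     /* interrupted by a fatal signal */
    }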
Diffstat (limited to 'drivers/block')
-rw-r--r--  drivers/block/rbd.c | 329
1 file changed, 186 insertions(+), 143 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index a1bb8f3100a8..6d1df82eb883 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -312,6 +312,7 @@ enum img_req_flags {
 
 enum rbd_img_state {
         RBD_IMG_START = 1,
+        RBD_IMG_EXCLUSIVE_LOCK,
         __RBD_IMG_OBJECT_REQUESTS,
         RBD_IMG_OBJECT_REQUESTS,
 };
@@ -412,9 +413,11 @@ struct rbd_device {
         struct delayed_work     lock_dwork;
         struct work_struct      unlock_work;
         spinlock_t              lock_lists_lock;
+        struct list_head        acquiring_list;
         struct list_head        running_list;
+        struct completion       acquire_wait;
+        int                     acquire_err;
         struct completion       releasing_wait;
-        wait_queue_head_t       lock_waitq;
 
         struct workqueue_struct *task_wq;
 
@@ -442,12 +445,10 @@ struct rbd_device {
  * Flag bits for rbd_dev->flags:
  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
  *   by rbd_dev->lock
- * - BLACKLISTED is protected by rbd_dev->lock_rwsem
  */
 enum rbd_dev_flags {
         RBD_DEV_FLAG_EXISTS,            /* mapped snapshot has not been deleted */
         RBD_DEV_FLAG_REMOVING,          /* this mapping is being removed */
-        RBD_DEV_FLAG_BLACKLISTED,       /* our ceph_client is blacklisted */
 };
 
 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
@@ -500,6 +501,8 @@ static int minor_to_rbd_dev_id(int minor)
 
 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 {
+        lockdep_assert_held(&rbd_dev->lock_rwsem);
+
         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 }
@@ -2895,15 +2898,21 @@ static bool need_exclusive_lock(struct rbd_img_request *img_req)
         return rbd_img_is_write(img_req);
 }
 
-static void rbd_lock_add_request(struct rbd_img_request *img_req)
+static bool rbd_lock_add_request(struct rbd_img_request *img_req)
 {
         struct rbd_device *rbd_dev = img_req->rbd_dev;
+        bool locked;
 
         lockdep_assert_held(&rbd_dev->lock_rwsem);
+        locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
         spin_lock(&rbd_dev->lock_lists_lock);
         rbd_assert(list_empty(&img_req->lock_item));
-        list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
+        if (!locked)
+                list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
+        else
+                list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
         spin_unlock(&rbd_dev->lock_lists_lock);
+        return locked;
 }
 
 static void rbd_lock_del_request(struct rbd_img_request *img_req)
@@ -2922,6 +2931,30 @@ static void rbd_lock_del_request(struct rbd_img_request *img_req)
                 complete(&rbd_dev->releasing_wait);
 }
 
+static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
+{
+        struct rbd_device *rbd_dev = img_req->rbd_dev;
+
+        if (!need_exclusive_lock(img_req))
+                return 1;
+
+        if (rbd_lock_add_request(img_req))
+                return 1;
+
+        if (rbd_dev->opts->exclusive) {
+                WARN_ON(1); /* lock got released? */
+                return -EROFS;
+        }
+
+        /*
+         * Note the use of mod_delayed_work() in rbd_acquire_lock()
+         * and cancel_delayed_work() in wake_lock_waiters().
+         */
+        dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
+        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+        return 0;
+}
+
 static void rbd_img_object_requests(struct rbd_img_request *img_req)
 {
         struct rbd_obj_request *obj_req;
@@ -2944,11 +2977,30 @@ static void rbd_img_object_requests(struct rbd_img_request *img_req)
 
 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
 {
+        struct rbd_device *rbd_dev = img_req->rbd_dev;
+        int ret;
+
 again:
         switch (img_req->state) {
         case RBD_IMG_START:
                 rbd_assert(!*result);
 
+                ret = rbd_img_exclusive_lock(img_req);
+                if (ret < 0) {
+                        *result = ret;
+                        return true;
+                }
+                img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
+                if (ret > 0)
+                        goto again;
+                return false;
+        case RBD_IMG_EXCLUSIVE_LOCK:
+                if (*result)
+                        return true;
+
+                rbd_assert(!need_exclusive_lock(img_req) ||
+                           __rbd_is_lock_owner(rbd_dev));
+
                 rbd_img_object_requests(img_req);
                 if (!img_req->pending.num_pending) {
                         *result = img_req->pending.result;
@@ -3107,7 +3159,7 @@ static void rbd_unlock(struct rbd_device *rbd_dev)
         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
         if (ret && ret != -ENOENT)
-                rbd_warn(rbd_dev, "failed to unlock: %d", ret);
+                rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
 
         /* treat errors as the image is unlocked */
         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
@@ -3234,15 +3286,34 @@ e_inval:
         goto out;
 }
 
-static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+/*
+ * Either image request state machine(s) or rbd_add_acquire_lock()
+ * (i.e. "rbd map").
+ */
+static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
 {
-        dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+        struct rbd_img_request *img_req;
+
+        dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+        lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
 
         cancel_delayed_work(&rbd_dev->lock_dwork);
-        if (wake_all)
-                wake_up_all(&rbd_dev->lock_waitq);
-        else
-                wake_up(&rbd_dev->lock_waitq);
+        if (!completion_done(&rbd_dev->acquire_wait)) {
+                rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
+                           list_empty(&rbd_dev->running_list));
+                rbd_dev->acquire_err = result;
+                complete_all(&rbd_dev->acquire_wait);
+                return;
+        }
+
+        list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
+                mutex_lock(&img_req->state_mutex);
+                rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
+                rbd_img_schedule(img_req, result);
+                mutex_unlock(&img_req->state_mutex);
+        }
+
+        list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
 }
 
 static int get_lock_owner_info(struct rbd_device *rbd_dev,
@@ -3357,11 +3428,8 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
                 goto again;
 
         ret = find_watcher(rbd_dev, lockers);
-        if (ret) {
-                if (ret > 0)
-                        ret = 0; /* have to request lock */
-                goto out;
-        }
+        if (ret)
+                goto out; /* request lock or error */
 
         rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
                  ENTITY_NAME(lockers[0].id.name));
@@ -3391,52 +3459,65 @@ out:
 }
 
 /*
- * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
+ * Return:
+ *   0 - lock acquired
+ *   1 - caller should call rbd_request_lock()
+ *  <0 - error
  */
-static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
-                                                int *pret)
+static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
 {
-        enum rbd_lock_state lock_state;
+        int ret;
 
         down_read(&rbd_dev->lock_rwsem);
         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
              rbd_dev->lock_state);
         if (__rbd_is_lock_owner(rbd_dev)) {
-                lock_state = rbd_dev->lock_state;
                 up_read(&rbd_dev->lock_rwsem);
-                return lock_state;
+                return 0;
         }
 
         up_read(&rbd_dev->lock_rwsem);
         down_write(&rbd_dev->lock_rwsem);
         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
              rbd_dev->lock_state);
-        if (!__rbd_is_lock_owner(rbd_dev)) {
-                *pret = rbd_try_lock(rbd_dev);
-                if (*pret)
-                        rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+        if (__rbd_is_lock_owner(rbd_dev)) {
+                up_write(&rbd_dev->lock_rwsem);
+                return 0;
         }
 
-        lock_state = rbd_dev->lock_state;
+        ret = rbd_try_lock(rbd_dev);
+        if (ret < 0) {
+                rbd_warn(rbd_dev, "failed to lock header: %d", ret);
+                if (ret == -EBLACKLISTED)
+                        goto out;
+
+                ret = 1; /* request lock anyway */
+        }
+        if (ret > 0) {
+                up_write(&rbd_dev->lock_rwsem);
+                return ret;
+        }
+
+        rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
+        rbd_assert(list_empty(&rbd_dev->running_list));
+
+out:
+        wake_lock_waiters(rbd_dev, ret);
         up_write(&rbd_dev->lock_rwsem);
-        return lock_state;
+        return ret;
 }
 
 static void rbd_acquire_lock(struct work_struct *work)
 {
         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
                                             struct rbd_device, lock_dwork);
-        enum rbd_lock_state lock_state;
-        int ret = 0;
+        int ret;
 
         dout("%s rbd_dev %p\n", __func__, rbd_dev);
 again:
-        lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
-        if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
-                if (lock_state == RBD_LOCK_STATE_LOCKED)
-                        wake_requests(rbd_dev, true);
-                dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
-                     rbd_dev, lock_state, ret);
+        ret = rbd_try_acquire_lock(rbd_dev);
+        if (ret <= 0) {
+                dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
                 return;
         }
 
@@ -3445,16 +3526,9 @@ again:
                 goto again; /* treat this as a dead client */
         } else if (ret == -EROFS) {
                 rbd_warn(rbd_dev, "peer will not release lock");
-                /*
-                 * If this is rbd_add_acquire_lock(), we want to fail
-                 * immediately -- reuse BLACKLISTED flag.  Otherwise we
-                 * want to block.
-                 */
-                if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
-                        set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-                        /* wake "rbd map --exclusive" process */
-                        wake_requests(rbd_dev, false);
-                }
+                down_write(&rbd_dev->lock_rwsem);
+                wake_lock_waiters(rbd_dev, ret);
+                up_write(&rbd_dev->lock_rwsem);
         } else if (ret < 0) {
                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@ -3519,10 +3593,10 @@ static void rbd_release_lock(struct rbd_device *rbd_dev)
 
         /*
          * Give others a chance to grab the lock - we would re-acquire
-         * almost immediately if we got new IO during ceph_osdc_sync()
-         * otherwise. We need to ack our own notifications, so this
-         * lock_dwork will be requeued from rbd_wait_state_locked()
-         * after wake_requests() in rbd_handle_released_lock().
+         * almost immediately if we got new IO while draining the running
+         * list otherwise. We need to ack our own notifications, so this
+         * lock_dwork will be requeued from rbd_handle_released_lock() by
+         * way of maybe_kick_acquire().
          */
         cancel_delayed_work(&rbd_dev->lock_dwork);
 }
@@ -3537,6 +3611,23 @@ static void rbd_release_lock_work(struct work_struct *work)
         up_write(&rbd_dev->lock_rwsem);
 }
 
+static void maybe_kick_acquire(struct rbd_device *rbd_dev)
+{
+        bool have_requests;
+
+        dout("%s rbd_dev %p\n", __func__, rbd_dev);
+        if (__rbd_is_lock_owner(rbd_dev))
+                return;
+
+        spin_lock(&rbd_dev->lock_lists_lock);
+        have_requests = !list_empty(&rbd_dev->acquiring_list);
+        spin_unlock(&rbd_dev->lock_lists_lock);
+        if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
+                dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
+                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+        }
+}
+
 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
                                      void **p)
 {
@@ -3566,8 +3657,7 @@ static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
                 down_read(&rbd_dev->lock_rwsem);
         }
 
-        if (!__rbd_is_lock_owner(rbd_dev))
-                wake_requests(rbd_dev, false);
+        maybe_kick_acquire(rbd_dev);
         up_read(&rbd_dev->lock_rwsem);
 }
 
@@ -3599,8 +3689,7 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
                 down_read(&rbd_dev->lock_rwsem);
         }
 
-        if (!__rbd_is_lock_owner(rbd_dev))
-                wake_requests(rbd_dev, false);
+        maybe_kick_acquire(rbd_dev);
         up_read(&rbd_dev->lock_rwsem);
 }
 
@@ -3850,7 +3939,6 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev)
 
 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
-        WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
         cancel_tasks_sync(rbd_dev);
 
         mutex_lock(&rbd_dev->watch_mutex);
@@ -3893,6 +3981,7 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
         } else {
                 __rbd_lock(rbd_dev, cookie);
+                wake_lock_waiters(rbd_dev, 0);
         }
 }
 
@@ -3913,15 +4002,18 @@ static void rbd_reregister_watch(struct work_struct *work)
         ret = __rbd_register_watch(rbd_dev);
         if (ret) {
                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
-                if (ret == -EBLACKLISTED || ret == -ENOENT) {
-                        set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-                        wake_requests(rbd_dev, true);
-                } else {
+                if (ret != -EBLACKLISTED && ret != -ENOENT) {
                         queue_delayed_work(rbd_dev->task_wq,
                                            &rbd_dev->watch_dwork,
                                            RBD_RETRY_DELAY);
+                        mutex_unlock(&rbd_dev->watch_mutex);
+                        return;
                 }
+
                 mutex_unlock(&rbd_dev->watch_mutex);
+                down_write(&rbd_dev->lock_rwsem);
+                wake_lock_waiters(rbd_dev, ret);
+                up_write(&rbd_dev->lock_rwsem);
                 return;
         }
 
@@ -3996,54 +4088,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
         return ret;
 }
 
-/*
- * lock_rwsem must be held for read
- */
-static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
-{
-        DEFINE_WAIT(wait);
-        unsigned long timeout;
-        int ret = 0;
-
-        if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
-                return -EBLACKLISTED;
-
-        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
-                return 0;
-
-        if (!may_acquire) {
-                rbd_warn(rbd_dev, "exclusive lock required");
-                return -EROFS;
-        }
-
-        do {
-                /*
-                 * Note the use of mod_delayed_work() in rbd_acquire_lock()
-                 * and cancel_delayed_work() in wake_requests().
-                 */
-                dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
-                queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
-                prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
-                                          TASK_UNINTERRUPTIBLE);
-                up_read(&rbd_dev->lock_rwsem);
-                timeout = schedule_timeout(ceph_timeout_jiffies(
-                                                rbd_dev->opts->lock_timeout));
-                down_read(&rbd_dev->lock_rwsem);
-                if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
-                        ret = -EBLACKLISTED;
-                        break;
-                }
-                if (!timeout) {
-                        rbd_warn(rbd_dev, "timed out waiting for lock");
-                        ret = -ETIMEDOUT;
-                        break;
-                }
-        } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
-
-        finish_wait(&rbd_dev->lock_waitq, &wait);
-        return ret;
-}
-
 static void rbd_queue_workfn(struct work_struct *work)
 {
         struct request *rq = blk_mq_rq_from_pdu(work);
@@ -4054,7 +4098,6 @@ static void rbd_queue_workfn(struct work_struct *work)
         u64 length = blk_rq_bytes(rq);
         enum obj_operation_type op_type;
         u64 mapping_size;
-        bool must_be_locked;
         int result;
 
         switch (req_op(rq)) {
@@ -4128,21 +4171,10 @@ static void rbd_queue_workfn(struct work_struct *work)
                 goto err_rq;
         }
 
-        must_be_locked =
-            (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
-            (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
-        if (must_be_locked) {
-                down_read(&rbd_dev->lock_rwsem);
-                result = rbd_wait_state_locked(rbd_dev,
-                                               !rbd_dev->opts->exclusive);
-                if (result)
-                        goto err_unlock;
-        }
-
         img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
         if (!img_request) {
                 result = -ENOMEM;
-                goto err_unlock;
+                goto err_rq;
         }
         img_request->rq = rq;
         snapc = NULL; /* img_request consumes a ref */
@@ -4155,19 +4187,11 @@ static void rbd_queue_workfn(struct work_struct *work)
         if (result)
                 goto err_img_request;
 
-        if (must_be_locked) {
-                rbd_lock_add_request(img_request);
-                up_read(&rbd_dev->lock_rwsem);
-        }
-
         rbd_img_handle_request(img_request, 0);
         return;
 
 err_img_request:
         rbd_img_request_put(img_request);
-err_unlock:
-        if (must_be_locked)
-                up_read(&rbd_dev->lock_rwsem);
 err_rq:
         if (result)
                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
@@ -4835,9 +4859,10 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
         spin_lock_init(&rbd_dev->lock_lists_lock);
+        INIT_LIST_HEAD(&rbd_dev->acquiring_list);
         INIT_LIST_HEAD(&rbd_dev->running_list);
+        init_completion(&rbd_dev->acquire_wait);
         init_completion(&rbd_dev->releasing_wait);
-        init_waitqueue_head(&rbd_dev->lock_waitq);
 
         rbd_dev->dev.bus = &rbd_bus_type;
         rbd_dev->dev.type = &rbd_device_type;
@@ -5857,24 +5882,45 @@ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
         up_write(&rbd_dev->lock_rwsem);
 }
 
+/*
+ * If the wait is interrupted, an error is returned even if the lock
+ * was successfully acquired. rbd_dev_image_unlock() will release it
+ * if needed.
+ */
 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
 {
-        int ret;
+        long ret;
 
         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+                if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
+                        return 0;
+
                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
                 return -EINVAL;
         }
 
-        /* FIXME: "rbd map --exclusive" should be in interruptible */
-        down_read(&rbd_dev->lock_rwsem);
-        ret = rbd_wait_state_locked(rbd_dev, true);
-        up_read(&rbd_dev->lock_rwsem);
+        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+                return 0;
+
+        rbd_assert(!rbd_is_lock_owner(rbd_dev));
+        queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+        ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
+                            ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
+        if (ret > 0)
+                ret = rbd_dev->acquire_err;
+        else if (!ret)
+                ret = -ETIMEDOUT;
+
         if (ret) {
-                rbd_warn(rbd_dev, "failed to acquire exclusive lock");
-                return -EROFS;
+                rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
+                return ret;
         }
 
+        /*
+         * The lock may have been released by now, unless automatic lock
+         * transitions are disabled.
+         */
+        rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
         return 0;
 }
 
@@ -6319,11 +6365,9 @@ static ssize_t do_rbd_add(struct bus_type *bus,
         if (rc)
                 goto err_out_image_probe;
 
-        if (rbd_dev->opts->exclusive) {
-                rc = rbd_add_acquire_lock(rbd_dev);
-                if (rc)
-                        goto err_out_device_setup;
-        }
+        rc = rbd_add_acquire_lock(rbd_dev);
+        if (rc)
+                goto err_out_image_lock;
 
         /* Everything's ready. Announce the disk to the world. */
 
@@ -6349,7 +6393,6 @@ out:
 
 err_out_image_lock:
         rbd_dev_image_unlock(rbd_dev);
-err_out_device_setup:
         rbd_dev_device_release(rbd_dev);
 err_out_image_probe:
         rbd_dev_image_release(rbd_dev);