author    Linus Torvalds <torvalds@linux-foundation.org>  2017-05-10 11:42:33 -0400
committer Linus Torvalds <torvalds@linux-foundation.org>  2017-05-10 11:42:33 -0400
commit    26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19 (patch)
tree      070c518340ae308dce62695a06a118a1df78be15 /drivers/block
parent    1176032cb12bb89ad558a3e57e82f2f25b817eff (diff)
parent    eeca958dce0a9231d1969f86196653eb50fcc9b3 (diff)
Merge tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "The two main items are support for disabling automatic rbd exclusive
  lock transfers from myself and the long awaited -ENOSPC handling
  series from Jeff.  The former will allow rbd users to take advantage
  of exclusive lock's built-in blacklist/break-lock functionality while
  staying in control of who owns the lock.  With the latter in place,
  we will abort filesystem writes on -ENOSPC instead of having them
  block indefinitely.

  Beyond that we've got the usual pile of filesystem fixes from Zheng,
  some refcount_t conversion patches from Elena and a patch for an
  ancient open() flags handling bug from Alexander"

* tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client: (31 commits)
  ceph: fix memory leak in __ceph_setxattr()
  ceph: fix file open flags on ppc64
  ceph: choose readdir frag based on previous readdir reply
  rbd: exclusive map option
  rbd: return ResponseMessage result from rbd_handle_request_lock()
  rbd: kill rbd_is_lock_supported()
  rbd: support updating the lock cookie without releasing the lock
  rbd: store lock cookie
  rbd: ignore unlock errors
  rbd: fix error handling around rbd_init_disk()
  rbd: move rbd_unregister_watch() call into rbd_dev_image_release()
  rbd: move rbd_dev_destroy() call out of rbd_dev_image_release()
  ceph: when seeing write errors on an inode, switch to sync writes
  Revert "ceph: SetPageError() for writeback pages if writepages fails"
  ceph: handle epoch barriers in cap messages
  libceph: add an epoch_barrier field to struct ceph_osd_client
  libceph: abort already submitted but abortable requests when map or pool goes full
  libceph: allow requests to return immediately on full conditions if caller wishes
  libceph: remove req->r_replay_version
  ceph: make seeky readdir more efficient
  ...
Diffstat (limited to 'drivers/block')
-rw-r--r--  drivers/block/rbd.c | 359
1 file changed, 215 insertions(+), 144 deletions(-)
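As a quick orientation before the diff: the new "exclusive" map option boils down to a small decision in the REQUEST_LOCK notification handler -- the lock owner either queues a release of the lock or, when the image was mapped exclusively, refuses and answers -EROFS. The stand-alone C sketch below only models that decision; handle_request_lock(), its bool parameters and the main() driver are hypothetical and exist purely to illustrate the ResponseMessage convention (<= 0 is encoded back to the peer, 1 means no ResponseMessage) used by rbd_handle_request_lock() in the diff below.

#include <errno.h>
#include <stdbool.h>
#include <stdio.h>

/*
 * Illustrative model of the decision made by rbd_handle_request_lock():
 * return <= 0 to encode a ResponseMessage with that result, or 1 when no
 * ResponseMessage is needed (we are not the lock owner).
 */
static int handle_request_lock(bool is_lock_owner, bool mapped_exclusive)
{
        if (!is_lock_owner)
                return 1;       /* nothing to answer */

        if (!mapped_exclusive)
                return 0;       /* lock will be released, ack with 0 */

        return -EROFS;          /* refuse to release the lock */
}

int main(void)
{
        printf("not owner        -> %d\n", handle_request_lock(false, false));
        printf("owner, default   -> %d\n", handle_request_lock(true, false));
        printf("owner, exclusive -> %d\n", handle_request_lock(true, true));
        return 0;
}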
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 26812c1ed0cf..454bf9c34882 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -387,6 +387,7 @@ struct rbd_device {
 
         struct rw_semaphore     lock_rwsem;
         enum rbd_lock_state     lock_state;
+        char                    lock_cookie[32];
         struct rbd_client_id    owner_cid;
         struct work_struct      acquired_lock_work;
         struct work_struct      released_lock_work;
@@ -477,13 +478,6 @@ static int minor_to_rbd_dev_id(int minor)
         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 }
 
-static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
-{
-        return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
-               rbd_dev->spec->snap_id == CEPH_NOSNAP &&
-               !rbd_dev->mapping.read_only;
-}
-
 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 {
         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
@@ -731,7 +725,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
         kref_init(&rbdc->kref);
         INIT_LIST_HEAD(&rbdc->node);
 
-        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
+        rbdc->client = ceph_create_client(ceph_opts, rbdc);
         if (IS_ERR(rbdc->client))
                 goto out_rbdc;
         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
@@ -804,6 +798,7 @@ enum {
         Opt_read_only,
         Opt_read_write,
         Opt_lock_on_read,
+        Opt_exclusive,
         Opt_err
 };
 
@@ -816,6 +811,7 @@ static match_table_t rbd_opts_tokens = {
         {Opt_read_write, "read_write"},
         {Opt_read_write, "rw"},         /* Alternate spelling */
         {Opt_lock_on_read, "lock_on_read"},
+        {Opt_exclusive, "exclusive"},
         {Opt_err, NULL}
 };
 
@@ -823,11 +819,13 @@ struct rbd_options {
         int     queue_depth;
         bool    read_only;
         bool    lock_on_read;
+        bool    exclusive;
 };
 
 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
 #define RBD_READ_ONLY_DEFAULT   false
 #define RBD_LOCK_ON_READ_DEFAULT false
+#define RBD_EXCLUSIVE_DEFAULT   false
 
 static int parse_rbd_opts_token(char *c, void *private)
 {
@@ -866,6 +864,9 @@ static int parse_rbd_opts_token(char *c, void *private)
         case Opt_lock_on_read:
                 rbd_opts->lock_on_read = true;
                 break;
+        case Opt_exclusive:
+                rbd_opts->exclusive = true;
+                break;
         default:
                 /* libceph prints "bad option" msg */
                 return -EINVAL;
@@ -3079,7 +3080,8 @@ static int rbd_lock(struct rbd_device *rbd_dev)
         char cookie[32];
         int ret;
 
-        WARN_ON(__rbd_is_lock_owner(rbd_dev));
+        WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
+                rbd_dev->lock_cookie[0] != '\0');
 
         format_lock_cookie(rbd_dev, cookie);
         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
@@ -3089,6 +3091,7 @@ static int rbd_lock(struct rbd_device *rbd_dev)
                 return ret;
 
         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+        strcpy(rbd_dev->lock_cookie, cookie);
         rbd_set_owner_cid(rbd_dev, &cid);
         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
         return 0;
@@ -3097,27 +3100,24 @@ static int rbd_lock(struct rbd_device *rbd_dev)
 /*
  * lock_rwsem must be held for write
  */
-static int rbd_unlock(struct rbd_device *rbd_dev)
+static void rbd_unlock(struct rbd_device *rbd_dev)
 {
         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-        char cookie[32];
         int ret;
 
-        WARN_ON(!__rbd_is_lock_owner(rbd_dev));
-
-        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+        WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
+                rbd_dev->lock_cookie[0] == '\0');
 
-        format_lock_cookie(rbd_dev, cookie);
         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
-                              RBD_LOCK_NAME, cookie);
-        if (ret && ret != -ENOENT) {
-                rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
-                return ret;
-        }
+                              RBD_LOCK_NAME, rbd_dev->lock_cookie);
+        if (ret && ret != -ENOENT)
+                rbd_warn(rbd_dev, "failed to unlock: %d", ret);
 
+        /* treat errors as the image is unlocked */
+        rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+        rbd_dev->lock_cookie[0] = '\0';
         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
-        return 0;
 }
 
 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
@@ -3447,6 +3447,18 @@ again:
                 ret = rbd_request_lock(rbd_dev);
                 if (ret == -ETIMEDOUT) {
                         goto again; /* treat this as a dead client */
+                } else if (ret == -EROFS) {
+                        rbd_warn(rbd_dev, "peer will not release lock");
+                        /*
+                         * If this is rbd_add_acquire_lock(), we want to fail
+                         * immediately -- reuse BLACKLISTED flag.  Otherwise we
+                         * want to block.
+                         */
+                        if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
+                                set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
+                                /* wake "rbd map --exclusive" process */
+                                wake_requests(rbd_dev, false);
+                        }
                 } else if (ret < 0) {
                         rbd_warn(rbd_dev, "error requesting lock: %d", ret);
                         mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@ -3490,16 +3502,15 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
                 return false;
 
-        if (!rbd_unlock(rbd_dev))
+        rbd_unlock(rbd_dev);
         /*
          * Give others a chance to grab the lock - we would re-acquire
          * almost immediately if we got new IO during ceph_osdc_sync()
          * otherwise.  We need to ack our own notifications, so this
          * lock_dwork will be requeued from rbd_wait_state_locked()
          * after wake_requests() in rbd_handle_released_lock().
          */
         cancel_delayed_work(&rbd_dev->lock_dwork);
-
         return true;
 }
 
@@ -3580,12 +3591,16 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
         up_read(&rbd_dev->lock_rwsem);
 }
 
-static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
-                                    void **p)
+/*
+ * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
+ * ResponseMessage is needed.
+ */
+static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+                                   void **p)
 {
         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
         struct rbd_client_id cid = { 0 };
-        bool need_to_send;
+        int result = 1;
 
         if (struct_v >= 2) {
                 cid.gid = ceph_decode_64(p);
@@ -3595,19 +3610,36 @@ static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
              cid.handle);
         if (rbd_cid_equal(&cid, &my_cid))
-                return false;
+                return result;
 
         down_read(&rbd_dev->lock_rwsem);
-        need_to_send = __rbd_is_lock_owner(rbd_dev);
-        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
-                if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
-                        dout("%s rbd_dev %p queueing unlock_work\n", __func__,
-                             rbd_dev);
-                        queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+        if (__rbd_is_lock_owner(rbd_dev)) {
+                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
+                    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
+                        goto out_unlock;
+
+                /*
+                 * encode ResponseMessage(0) so the peer can detect
+                 * a missing owner
+                 */
+                result = 0;
+
+                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+                        if (!rbd_dev->opts->exclusive) {
+                                dout("%s rbd_dev %p queueing unlock_work\n",
+                                     __func__, rbd_dev);
+                                queue_work(rbd_dev->task_wq,
+                                           &rbd_dev->unlock_work);
+                        } else {
+                                /* refuse to release the lock */
+                                result = -EROFS;
+                        }
                 }
         }
+
+out_unlock:
         up_read(&rbd_dev->lock_rwsem);
-        return need_to_send;
+        return result;
 }
 
 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
@@ -3690,13 +3722,10 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
                 break;
         case RBD_NOTIFY_OP_REQUEST_LOCK:
-                if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
-                        /*
-                         * send ResponseMessage(0) back so the client
-                         * can detect a missing owner
-                         */
+                ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
+                if (ret <= 0)
                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
-                                                      cookie, 0);
+                                                      cookie, ret);
                 else
                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
                 break;
@@ -3821,24 +3850,51 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
 }
 
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
+{
+        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+        char cookie[32];
+        int ret;
+
+        WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+
+        format_lock_cookie(rbd_dev, cookie);
+        ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
+                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
+                                  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
+                                  RBD_LOCK_TAG, cookie);
+        if (ret) {
+                if (ret != -EOPNOTSUPP)
+                        rbd_warn(rbd_dev, "failed to update lock cookie: %d",
+                                 ret);
+
+                /*
+                 * Lock cookie cannot be updated on older OSDs, so do
+                 * a manual release and queue an acquire.
+                 */
+                if (rbd_release_lock(rbd_dev))
+                        queue_delayed_work(rbd_dev->task_wq,
+                                           &rbd_dev->lock_dwork, 0);
+        } else {
+                strcpy(rbd_dev->lock_cookie, cookie);
+        }
+}
+
 static void rbd_reregister_watch(struct work_struct *work)
 {
         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
                                             struct rbd_device, watch_dwork);
-        bool was_lock_owner = false;
-        bool need_to_wake = false;
         int ret;
 
         dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
-        down_write(&rbd_dev->lock_rwsem);
-        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
-                was_lock_owner = rbd_release_lock(rbd_dev);
-
         mutex_lock(&rbd_dev->watch_mutex);
         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
                 mutex_unlock(&rbd_dev->watch_mutex);
-                goto out;
+                return;
         }
 
         ret = __rbd_register_watch(rbd_dev);
@@ -3846,36 +3902,28 @@ static void rbd_reregister_watch(struct work_struct *work)
                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-                        need_to_wake = true;
+                        wake_requests(rbd_dev, true);
                 } else {
                         queue_delayed_work(rbd_dev->task_wq,
                                            &rbd_dev->watch_dwork,
                                            RBD_RETRY_DELAY);
                 }
                 mutex_unlock(&rbd_dev->watch_mutex);
-                goto out;
+                return;
         }
 
-        need_to_wake = true;
         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
         mutex_unlock(&rbd_dev->watch_mutex);
 
+        down_write(&rbd_dev->lock_rwsem);
+        if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+                rbd_reacquire_lock(rbd_dev);
+        up_write(&rbd_dev->lock_rwsem);
+
         ret = rbd_dev_refresh(rbd_dev);
         if (ret)
                 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
-
-        if (was_lock_owner) {
-                ret = rbd_try_lock(rbd_dev);
-                if (ret)
-                        rbd_warn(rbd_dev, "reregisteration lock failed: %d",
-                                 ret);
-        }
-
-out:
-        up_write(&rbd_dev->lock_rwsem);
-        if (need_to_wake)
-                wake_requests(rbd_dev, true);
 }
 
 /*
@@ -4034,10 +4082,6 @@ static void rbd_queue_workfn(struct work_struct *work)
         if (op_type != OBJ_OP_READ) {
                 snapc = rbd_dev->header.snapc;
                 ceph_get_snap_context(snapc);
-                must_be_locked = rbd_is_lock_supported(rbd_dev);
-        } else {
-                must_be_locked = rbd_dev->opts->lock_on_read &&
-                                        rbd_is_lock_supported(rbd_dev);
         }
         up_read(&rbd_dev->header_rwsem);
 
@@ -4048,14 +4092,20 @@ static void rbd_queue_workfn(struct work_struct *work)
                 goto err_rq;
         }
 
+        must_be_locked =
+            (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+            (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
         if (must_be_locked) {
                 down_read(&rbd_dev->lock_rwsem);
                 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
-                    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
+                    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+                        if (rbd_dev->opts->exclusive) {
+                                rbd_warn(rbd_dev, "exclusive lock required");
+                                result = -EROFS;
+                                goto err_unlock;
+                        }
                         rbd_wait_state_locked(rbd_dev);
-
-                WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
-                        !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
+                }
                 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
                         result = -EBLACKLISTED;
                         goto err_unlock;
@@ -4114,19 +4164,10 @@ static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
 
 static void rbd_free_disk(struct rbd_device *rbd_dev)
 {
-        struct gendisk *disk = rbd_dev->disk;
-
-        if (!disk)
-                return;
-
+        blk_cleanup_queue(rbd_dev->disk->queue);
+        blk_mq_free_tag_set(&rbd_dev->tag_set);
+        put_disk(rbd_dev->disk);
         rbd_dev->disk = NULL;
-        if (disk->flags & GENHD_FL_UP) {
-                del_gendisk(disk);
-                if (disk->queue)
-                        blk_cleanup_queue(disk->queue);
-                blk_mq_free_tag_set(&rbd_dev->tag_set);
-        }
-        put_disk(disk);
 }
 
 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
@@ -4383,8 +4424,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
 
+        /*
+         * disk_release() expects a queue ref from add_disk() and will
+         * put it.  Hold an extra ref until add_disk() is called.
+         */
+        WARN_ON(!blk_get_queue(q));
         disk->queue = q;
-
         q->queuedata = rbd_dev;
 
         rbd_dev->disk = disk;
@@ -5624,6 +5669,7 @@ static int rbd_add_parse_args(const char *buf,
         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
         rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
         rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
+        rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
 
         copts = ceph_parse_options(options, mon_addrs,
                                    mon_addrs + mon_addrs_size - 1,
@@ -5682,6 +5728,33 @@ again:
         return ret;
 }
 
+static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
+{
+        down_write(&rbd_dev->lock_rwsem);
+        if (__rbd_is_lock_owner(rbd_dev))
+                rbd_unlock(rbd_dev);
+        up_write(&rbd_dev->lock_rwsem);
+}
+
+static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
+{
+        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+                rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
+                return -EINVAL;
+        }
+
+        /* FIXME: "rbd map --exclusive" should be in interruptible */
+        down_read(&rbd_dev->lock_rwsem);
+        rbd_wait_state_locked(rbd_dev);
+        up_read(&rbd_dev->lock_rwsem);
+        if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+                rbd_warn(rbd_dev, "failed to acquire exclusive lock");
+                return -EROFS;
+        }
+
+        return 0;
+}
+
 /*
  * An rbd format 2 image has a unique identifier, distinct from the
  * name given to it by the user.  Internally, that identifier is
@@ -5873,6 +5946,15 @@ out_err:
         return ret;
 }
 
+static void rbd_dev_device_release(struct rbd_device *rbd_dev)
+{
+        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+        rbd_dev_mapping_clear(rbd_dev);
+        rbd_free_disk(rbd_dev);
+        if (!single_major)
+                unregister_blkdev(rbd_dev->major, rbd_dev->name);
+}
+
 /*
  * rbd_dev->header_rwsem must be locked for write and will be unlocked
  * upon return.
@@ -5908,26 +5990,13 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
-        dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
-        ret = device_add(&rbd_dev->dev);
+        ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
         if (ret)
                 goto err_out_mapping;
 
-        /* Everything's ready.  Announce the disk to the world. */
-
         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
         up_write(&rbd_dev->header_rwsem);
-
-        spin_lock(&rbd_dev_list_lock);
-        list_add_tail(&rbd_dev->node, &rbd_dev_list);
-        spin_unlock(&rbd_dev_list_lock);
-
-        add_disk(rbd_dev->disk);
-        pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
-                (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
-                rbd_dev->header.features);
-
-        return ret;
+        return 0;
 
 err_out_mapping:
         rbd_dev_mapping_clear(rbd_dev);
@@ -5962,11 +6031,11 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
         rbd_dev_unprobe(rbd_dev);
+        if (rbd_dev->opts)
+                rbd_unregister_watch(rbd_dev);
         rbd_dev->image_format = 0;
         kfree(rbd_dev->spec->image_id);
         rbd_dev->spec->image_id = NULL;
-
-        rbd_dev_destroy(rbd_dev);
 }
 
 /*
@@ -6126,22 +6195,43 @@ static ssize_t do_rbd_add(struct bus_type *bus,
         rbd_dev->mapping.read_only = read_only;
 
         rc = rbd_dev_device_setup(rbd_dev);
-        if (rc) {
-                /*
-                 * rbd_unregister_watch() can't be moved into
-                 * rbd_dev_image_release() without refactoring, see
-                 * commit 1f3ef78861ac.
-                 */
-                rbd_unregister_watch(rbd_dev);
-                rbd_dev_image_release(rbd_dev);
-                goto out;
+        if (rc)
+                goto err_out_image_probe;
+
+        if (rbd_dev->opts->exclusive) {
+                rc = rbd_add_acquire_lock(rbd_dev);
+                if (rc)
+                        goto err_out_device_setup;
         }
 
+        /* Everything's ready.  Announce the disk to the world. */
+
+        rc = device_add(&rbd_dev->dev);
+        if (rc)
+                goto err_out_image_lock;
+
+        add_disk(rbd_dev->disk);
+        /* see rbd_init_disk() */
+        blk_put_queue(rbd_dev->disk->queue);
+
+        spin_lock(&rbd_dev_list_lock);
+        list_add_tail(&rbd_dev->node, &rbd_dev_list);
+        spin_unlock(&rbd_dev_list_lock);
+
+        pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
+                (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
+                rbd_dev->header.features);
         rc = count;
 out:
         module_put(THIS_MODULE);
         return rc;
 
+err_out_image_lock:
+        rbd_dev_image_unlock(rbd_dev);
+err_out_device_setup:
+        rbd_dev_device_release(rbd_dev);
+err_out_image_probe:
+        rbd_dev_image_release(rbd_dev);
 err_out_rbd_dev:
         rbd_dev_destroy(rbd_dev);
 err_out_client:
@@ -6169,21 +6259,6 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
         return do_rbd_add(bus, buf, count);
 }
 
-static void rbd_dev_device_release(struct rbd_device *rbd_dev)
-{
-        rbd_free_disk(rbd_dev);
-
-        spin_lock(&rbd_dev_list_lock);
-        list_del_init(&rbd_dev->node);
-        spin_unlock(&rbd_dev_list_lock);
-
-        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-        device_del(&rbd_dev->dev);
-        rbd_dev_mapping_clear(rbd_dev);
-        if (!single_major)
-                unregister_blkdev(rbd_dev->major, rbd_dev->name);
-}
-
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
 {
         while (rbd_dev->parent) {
@@ -6201,6 +6276,7 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
                 }
                 rbd_assert(second);
                 rbd_dev_image_release(second);
+                rbd_dev_destroy(second);
                 first->parent = NULL;
                 first->parent_overlap = 0;
 
@@ -6269,21 +6345,16 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
                 blk_set_queue_dying(rbd_dev->disk->queue);
         }
 
-        down_write(&rbd_dev->lock_rwsem);
-        if (__rbd_is_lock_owner(rbd_dev))
-                rbd_unlock(rbd_dev);
-        up_write(&rbd_dev->lock_rwsem);
-        rbd_unregister_watch(rbd_dev);
+        del_gendisk(rbd_dev->disk);
+        spin_lock(&rbd_dev_list_lock);
+        list_del_init(&rbd_dev->node);
+        spin_unlock(&rbd_dev_list_lock);
+        device_del(&rbd_dev->dev);
 
-        /*
-         * Don't free anything from rbd_dev->disk until after all
-         * notifies are completely processed.  Otherwise
-         * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
-         * in a potential use after free of rbd_dev->disk or rbd_dev.
-         */
+        rbd_dev_image_unlock(rbd_dev);
         rbd_dev_device_release(rbd_dev);
         rbd_dev_image_release(rbd_dev);
-
+        rbd_dev_destroy(rbd_dev);
         return count;
 }
 