aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2017-04-13 06:17:38 -0400
committerIlya Dryomov <idryomov@gmail.com>2017-05-04 03:19:23 -0400
commit14bb211d324d6c8140167bd6b2b8a80757348a2f (patch)
treeaafb98077b59c9a8488b2e15082099fec2ce9a9d
parentcbbfb0ff115159847121afe9c7553bd5c86f6062 (diff)
rbd: support updating the lock cookie without releasing the lock
As we no longer release the lock before potentially raising BLACKLISTED in rbd_reregister_watch(), the "either locked or blacklisted" assert in rbd_queue_workfn() needs to go: we can be both locked and blacklisted at that point now. Signed-off-by: Ilya Dryomov <idryomov@gmail.com> Reviewed-by: Jason Dillaman <dillaman@redhat.com>
-rw-r--r--drivers/block/rbd.c66
-rw-r--r--include/linux/ceph/cls_lock_client.h5
-rw-r--r--net/ceph/cls_lock_client.c51
3 files changed, 97 insertions, 25 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 5f563db59820..063c8f06fb9c 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3820,24 +3820,51 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3820 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); 3820 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3821} 3821}
3822 3822
3823/*
3824 * lock_rwsem must be held for write
3825 */
3826static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3827{
3828 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3829 char cookie[32];
3830 int ret;
3831
3832 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3833
3834 format_lock_cookie(rbd_dev, cookie);
3835 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3836 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3837 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3838 RBD_LOCK_TAG, cookie);
3839 if (ret) {
3840 if (ret != -EOPNOTSUPP)
3841 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3842 ret);
3843
3844 /*
3845 * Lock cookie cannot be updated on older OSDs, so do
3846 * a manual release and queue an acquire.
3847 */
3848 if (rbd_release_lock(rbd_dev))
3849 queue_delayed_work(rbd_dev->task_wq,
3850 &rbd_dev->lock_dwork, 0);
3851 } else {
3852 strcpy(rbd_dev->lock_cookie, cookie);
3853 }
3854}
3855
3823static void rbd_reregister_watch(struct work_struct *work) 3856static void rbd_reregister_watch(struct work_struct *work)
3824{ 3857{
3825 struct rbd_device *rbd_dev = container_of(to_delayed_work(work), 3858 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3826 struct rbd_device, watch_dwork); 3859 struct rbd_device, watch_dwork);
3827 bool was_lock_owner = false;
3828 bool need_to_wake = false;
3829 int ret; 3860 int ret;
3830 3861
3831 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3862 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3832 3863
3833 down_write(&rbd_dev->lock_rwsem);
3834 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3835 was_lock_owner = rbd_release_lock(rbd_dev);
3836
3837 mutex_lock(&rbd_dev->watch_mutex); 3864 mutex_lock(&rbd_dev->watch_mutex);
3838 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) { 3865 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3839 mutex_unlock(&rbd_dev->watch_mutex); 3866 mutex_unlock(&rbd_dev->watch_mutex);
3840 goto out; 3867 return;
3841 } 3868 }
3842 3869
3843 ret = __rbd_register_watch(rbd_dev); 3870 ret = __rbd_register_watch(rbd_dev);
@@ -3845,36 +3872,28 @@ static void rbd_reregister_watch(struct work_struct *work)
3845 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); 3872 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3846 if (ret == -EBLACKLISTED || ret == -ENOENT) { 3873 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3847 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); 3874 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3848 need_to_wake = true; 3875 wake_requests(rbd_dev, true);
3849 } else { 3876 } else {
3850 queue_delayed_work(rbd_dev->task_wq, 3877 queue_delayed_work(rbd_dev->task_wq,
3851 &rbd_dev->watch_dwork, 3878 &rbd_dev->watch_dwork,
3852 RBD_RETRY_DELAY); 3879 RBD_RETRY_DELAY);
3853 } 3880 }
3854 mutex_unlock(&rbd_dev->watch_mutex); 3881 mutex_unlock(&rbd_dev->watch_mutex);
3855 goto out; 3882 return;
3856 } 3883 }
3857 3884
3858 need_to_wake = true;
3859 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; 3885 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3860 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; 3886 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3861 mutex_unlock(&rbd_dev->watch_mutex); 3887 mutex_unlock(&rbd_dev->watch_mutex);
3862 3888
3889 down_write(&rbd_dev->lock_rwsem);
3890 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3891 rbd_reacquire_lock(rbd_dev);
3892 up_write(&rbd_dev->lock_rwsem);
3893
3863 ret = rbd_dev_refresh(rbd_dev); 3894 ret = rbd_dev_refresh(rbd_dev);
3864 if (ret) 3895 if (ret)
3865 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); 3896 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
3866
3867 if (was_lock_owner) {
3868 ret = rbd_try_lock(rbd_dev);
3869 if (ret)
3870 rbd_warn(rbd_dev, "reregisteration lock failed: %d",
3871 ret);
3872 }
3873
3874out:
3875 up_write(&rbd_dev->lock_rwsem);
3876 if (need_to_wake)
3877 wake_requests(rbd_dev, true);
3878} 3897}
3879 3898
3880/* 3899/*
@@ -4052,9 +4071,6 @@ static void rbd_queue_workfn(struct work_struct *work)
4052 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && 4071 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4053 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) 4072 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
4054 rbd_wait_state_locked(rbd_dev); 4073 rbd_wait_state_locked(rbd_dev);
4055
4056 WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
4057 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4058 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { 4074 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4059 result = -EBLACKLISTED; 4075 result = -EBLACKLISTED;
4060 goto err_unlock; 4076 goto err_unlock;
diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
index 84884d8d4710..0594d3bba774 100644
--- a/include/linux/ceph/cls_lock_client.h
+++ b/include/linux/ceph/cls_lock_client.h
@@ -37,6 +37,11 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
37 struct ceph_object_locator *oloc, 37 struct ceph_object_locator *oloc,
38 char *lock_name, char *cookie, 38 char *lock_name, char *cookie,
39 struct ceph_entity_name *locker); 39 struct ceph_entity_name *locker);
40int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
41 struct ceph_object_id *oid,
42 struct ceph_object_locator *oloc,
43 char *lock_name, u8 type, char *old_cookie,
44 char *tag, char *new_cookie);
40 45
41void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers); 46void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
42 47
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
index b9233b990399..08ada893f01e 100644
--- a/net/ceph/cls_lock_client.c
+++ b/net/ceph/cls_lock_client.c
@@ -179,6 +179,57 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
179} 179}
180EXPORT_SYMBOL(ceph_cls_break_lock); 180EXPORT_SYMBOL(ceph_cls_break_lock);
181 181
182int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
183 struct ceph_object_id *oid,
184 struct ceph_object_locator *oloc,
185 char *lock_name, u8 type, char *old_cookie,
186 char *tag, char *new_cookie)
187{
188 int cookie_op_buf_size;
189 int name_len = strlen(lock_name);
190 int old_cookie_len = strlen(old_cookie);
191 int tag_len = strlen(tag);
192 int new_cookie_len = strlen(new_cookie);
193 void *p, *end;
194 struct page *cookie_op_page;
195 int ret;
196
197 cookie_op_buf_size = name_len + sizeof(__le32) +
198 old_cookie_len + sizeof(__le32) +
199 tag_len + sizeof(__le32) +
200 new_cookie_len + sizeof(__le32) +
201 sizeof(u8) + CEPH_ENCODING_START_BLK_LEN;
202 if (cookie_op_buf_size > PAGE_SIZE)
203 return -E2BIG;
204
205 cookie_op_page = alloc_page(GFP_NOIO);
206 if (!cookie_op_page)
207 return -ENOMEM;
208
209 p = page_address(cookie_op_page);
210 end = p + cookie_op_buf_size;
211
212 /* encode cls_lock_set_cookie_op struct */
213 ceph_start_encoding(&p, 1, 1,
214 cookie_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
215 ceph_encode_string(&p, end, lock_name, name_len);
216 ceph_encode_8(&p, type);
217 ceph_encode_string(&p, end, old_cookie, old_cookie_len);
218 ceph_encode_string(&p, end, tag, tag_len);
219 ceph_encode_string(&p, end, new_cookie, new_cookie_len);
220
221 dout("%s lock_name %s type %d old_cookie %s tag %s new_cookie %s\n",
222 __func__, lock_name, type, old_cookie, tag, new_cookie);
223 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "set_cookie",
224 CEPH_OSD_FLAG_WRITE, cookie_op_page,
225 cookie_op_buf_size, NULL, NULL);
226
227 dout("%s: status %d\n", __func__, ret);
228 __free_page(cookie_op_page);
229 return ret;
230}
231EXPORT_SYMBOL(ceph_cls_set_cookie);
232
182void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers) 233void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
183{ 234{
184 int i; 235 int i;