aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/block
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2016-08-12 09:45:52 -0400
committerIlya Dryomov <idryomov@gmail.com>2016-08-24 17:49:16 -0400
commit1643dfa4c2c827d6e2aa419df8c17b0f24090278 (patch)
treed7fa6499301913a0f2bb965c40251cfcb30befdd /drivers/block
parent033268a5f01270f0ef20d1a9a078b157f4af97f8 (diff)
rbd: introduce a per-device ordered workqueue
This is going to be used for re-registering watch requests and exclusive-lock tasks: acquire/request lock, notify-acquired, release lock, notify-released. Some refactoring in the map/unmap paths was necessary to give this workqueue a meaningful name: "rbdX-tasks". Signed-off-by: Ilya Dryomov <idryomov@gmail.com> Reviewed-by: Mike Christie <mchristi@redhat.com>
Diffstat (limited to 'drivers/block')
-rw-r--r--drivers/block/rbd.c151
1 files changed, 71 insertions, 80 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index e0585e9040f1..1c805eea6767 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -128,11 +128,8 @@ static int atomic_dec_return_safe(atomic_t *v)
128/* 128/*
129 * An RBD device name will be "rbd#", where the "rbd" comes from 129 * An RBD device name will be "rbd#", where the "rbd" comes from
130 * RBD_DRV_NAME above, and # is a unique integer identifier. 130 * RBD_DRV_NAME above, and # is a unique integer identifier.
131 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
132 * enough to hold all possible device names.
133 */ 131 */
134#define DEV_NAME_LEN 32 132#define DEV_NAME_LEN 32
135#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
136 133
137/* 134/*
138 * block device image metadata (in-memory version) 135 * block device image metadata (in-memory version)
@@ -353,10 +350,12 @@ struct rbd_device {
353 struct ceph_object_id header_oid; 350 struct ceph_object_id header_oid;
354 struct ceph_object_locator header_oloc; 351 struct ceph_object_locator header_oloc;
355 352
356 struct ceph_file_layout layout; 353 struct ceph_file_layout layout; /* used for all rbd requests */
357 354
358 struct ceph_osd_linger_request *watch_handle; 355 struct ceph_osd_linger_request *watch_handle;
359 356
357 struct workqueue_struct *task_wq;
358
360 struct rbd_spec *parent_spec; 359 struct rbd_spec *parent_spec;
361 u64 parent_overlap; 360 u64 parent_overlap;
362 atomic_t parent_ref; 361 atomic_t parent_ref;
@@ -3944,11 +3943,8 @@ static void rbd_spec_free(struct kref *kref)
3944 kfree(spec); 3943 kfree(spec);
3945} 3944}
3946 3945
3947static void rbd_dev_release(struct device *dev) 3946static void rbd_dev_free(struct rbd_device *rbd_dev)
3948{ 3947{
3949 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3950 bool need_put = !!rbd_dev->opts;
3951
3952 ceph_oid_destroy(&rbd_dev->header_oid); 3948 ceph_oid_destroy(&rbd_dev->header_oid);
3953 ceph_oloc_destroy(&rbd_dev->header_oloc); 3949 ceph_oloc_destroy(&rbd_dev->header_oloc);
3954 3950
@@ -3956,6 +3952,19 @@ static void rbd_dev_release(struct device *dev)
3956 rbd_spec_put(rbd_dev->spec); 3952 rbd_spec_put(rbd_dev->spec);
3957 kfree(rbd_dev->opts); 3953 kfree(rbd_dev->opts);
3958 kfree(rbd_dev); 3954 kfree(rbd_dev);
3955}
3956
3957static void rbd_dev_release(struct device *dev)
3958{
3959 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3960 bool need_put = !!rbd_dev->opts;
3961
3962 if (need_put) {
3963 destroy_workqueue(rbd_dev->task_wq);
3964 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
3965 }
3966
3967 rbd_dev_free(rbd_dev);
3959 3968
3960 /* 3969 /*
3961 * This is racy, but way better than putting module outside of 3970 * This is racy, but way better than putting module outside of
@@ -3966,19 +3975,16 @@ static void rbd_dev_release(struct device *dev)
3966 module_put(THIS_MODULE); 3975 module_put(THIS_MODULE);
3967} 3976}
3968 3977
3969static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 3978static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
3970 struct rbd_spec *spec, 3979 struct rbd_spec *spec)
3971 struct rbd_options *opts)
3972{ 3980{
3973 struct rbd_device *rbd_dev; 3981 struct rbd_device *rbd_dev;
3974 3982
3975 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); 3983 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3976 if (!rbd_dev) 3984 if (!rbd_dev)
3977 return NULL; 3985 return NULL;
3978 3986
3979 spin_lock_init(&rbd_dev->lock); 3987 spin_lock_init(&rbd_dev->lock);
3980 rbd_dev->flags = 0;
3981 atomic_set(&rbd_dev->parent_ref, 0);
3982 INIT_LIST_HEAD(&rbd_dev->node); 3988 INIT_LIST_HEAD(&rbd_dev->node);
3983 init_rwsem(&rbd_dev->header_rwsem); 3989 init_rwsem(&rbd_dev->header_rwsem);
3984 3990
@@ -3992,9 +3998,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3992 3998
3993 rbd_dev->rbd_client = rbdc; 3999 rbd_dev->rbd_client = rbdc;
3994 rbd_dev->spec = spec; 4000 rbd_dev->spec = spec;
3995 rbd_dev->opts = opts;
3996
3997 /* Initialize the layout used for all rbd requests */
3998 4001
3999 rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER; 4002 rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
4000 rbd_dev->layout.stripe_count = 1; 4003 rbd_dev->layout.stripe_count = 1;
@@ -4002,15 +4005,48 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4002 rbd_dev->layout.pool_id = spec->pool_id; 4005 rbd_dev->layout.pool_id = spec->pool_id;
4003 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); 4006 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
4004 4007
4005 /* 4008 return rbd_dev;
4006 * If this is a mapping rbd_dev (as opposed to a parent one), 4009}
4007 * pin our module. We have a ref from do_rbd_add(), so use 4010
4008 * __module_get(). 4011/*
4009 */ 4012 * Create a mapping rbd_dev.
4010 if (rbd_dev->opts) 4013 */
4011 __module_get(THIS_MODULE); 4014static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4015 struct rbd_spec *spec,
4016 struct rbd_options *opts)
4017{
4018 struct rbd_device *rbd_dev;
4019
4020 rbd_dev = __rbd_dev_create(rbdc, spec);
4021 if (!rbd_dev)
4022 return NULL;
4023
4024 rbd_dev->opts = opts;
4025
4026 /* get an id and fill in device name */
4027 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4028 minor_to_rbd_dev_id(1 << MINORBITS),
4029 GFP_KERNEL);
4030 if (rbd_dev->dev_id < 0)
4031 goto fail_rbd_dev;
4032
4033 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4034 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4035 rbd_dev->name);
4036 if (!rbd_dev->task_wq)
4037 goto fail_dev_id;
4012 4038
4039 /* we have a ref from do_rbd_add() */
4040 __module_get(THIS_MODULE);
4041
4042 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4013 return rbd_dev; 4043 return rbd_dev;
4044
4045fail_dev_id:
4046 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4047fail_rbd_dev:
4048 rbd_dev_free(rbd_dev);
4049 return NULL;
4014} 4050}
4015 4051
4016static void rbd_dev_destroy(struct rbd_device *rbd_dev) 4052static void rbd_dev_destroy(struct rbd_device *rbd_dev)
@@ -4646,46 +4682,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4646} 4682}
4647 4683
4648/* 4684/*
4649 * Get a unique rbd identifier for the given new rbd_dev, and add
4650 * the rbd_dev to the global list.
4651 */
4652static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4653{
4654 int new_dev_id;
4655
4656 new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4657 0, minor_to_rbd_dev_id(1 << MINORBITS),
4658 GFP_KERNEL);
4659 if (new_dev_id < 0)
4660 return new_dev_id;
4661
4662 rbd_dev->dev_id = new_dev_id;
4663
4664 spin_lock(&rbd_dev_list_lock);
4665 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4666 spin_unlock(&rbd_dev_list_lock);
4667
4668 dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4669
4670 return 0;
4671}
4672
4673/*
4674 * Remove an rbd_dev from the global list, and record that its
4675 * identifier is no longer in use.
4676 */
4677static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4678{
4679 spin_lock(&rbd_dev_list_lock);
4680 list_del_init(&rbd_dev->node);
4681 spin_unlock(&rbd_dev_list_lock);
4682
4683 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4684
4685 dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4686}
4687
4688/*
4689 * Skips over white space at *buf, and updates *buf to point to the 4685 * Skips over white space at *buf, and updates *buf to point to the
4690 * first found non-space character (if any). Returns the length of 4686 * first found non-space character (if any). Returns the length of
4691 * the token (string of non-white space characters) found. Note 4687 * the token (string of non-white space characters) found. Note
@@ -5077,8 +5073,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5077 goto out_err; 5073 goto out_err;
5078 } 5074 }
5079 5075
5080 parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec, 5076 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5081 NULL);
5082 if (!parent) { 5077 if (!parent) {
5083 ret = -ENOMEM; 5078 ret = -ENOMEM;
5084 goto out_err; 5079 goto out_err;
@@ -5113,22 +5108,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5113{ 5108{
5114 int ret; 5109 int ret;
5115 5110
5116 /* Get an id and fill in device name. */
5117
5118 ret = rbd_dev_id_get(rbd_dev);
5119 if (ret)
5120 goto err_out_unlock;
5121
5122 BUILD_BUG_ON(DEV_NAME_LEN
5123 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
5124 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
5125
5126 /* Record our major and minor device numbers. */ 5111 /* Record our major and minor device numbers. */
5127 5112
5128 if (!single_major) { 5113 if (!single_major) {
5129 ret = register_blkdev(0, rbd_dev->name); 5114 ret = register_blkdev(0, rbd_dev->name);
5130 if (ret < 0) 5115 if (ret < 0)
5131 goto err_out_id; 5116 goto err_out_unlock;
5132 5117
5133 rbd_dev->major = ret; 5118 rbd_dev->major = ret;
5134 rbd_dev->minor = 0; 5119 rbd_dev->minor = 0;
@@ -5160,6 +5145,10 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5160 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5145 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5161 up_write(&rbd_dev->header_rwsem); 5146 up_write(&rbd_dev->header_rwsem);
5162 5147
5148 spin_lock(&rbd_dev_list_lock);
5149 list_add_tail(&rbd_dev->node, &rbd_dev_list);
5150 spin_unlock(&rbd_dev_list_lock);
5151
5163 add_disk(rbd_dev->disk); 5152 add_disk(rbd_dev->disk);
5164 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 5153 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
5165 (unsigned long long) rbd_dev->mapping.size); 5154 (unsigned long long) rbd_dev->mapping.size);
@@ -5173,8 +5162,6 @@ err_out_disk:
5173err_out_blkdev: 5162err_out_blkdev:
5174 if (!single_major) 5163 if (!single_major)
5175 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5164 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5176err_out_id:
5177 rbd_dev_id_put(rbd_dev);
5178err_out_unlock: 5165err_out_unlock:
5179 up_write(&rbd_dev->header_rwsem); 5166 up_write(&rbd_dev->header_rwsem);
5180 return ret; 5167 return ret;
@@ -5406,12 +5393,16 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
5406static void rbd_dev_device_release(struct rbd_device *rbd_dev) 5393static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5407{ 5394{
5408 rbd_free_disk(rbd_dev); 5395 rbd_free_disk(rbd_dev);
5396
5397 spin_lock(&rbd_dev_list_lock);
5398 list_del_init(&rbd_dev->node);
5399 spin_unlock(&rbd_dev_list_lock);
5400
5409 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5401 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5410 device_del(&rbd_dev->dev); 5402 device_del(&rbd_dev->dev);
5411 rbd_dev_mapping_clear(rbd_dev); 5403 rbd_dev_mapping_clear(rbd_dev);
5412 if (!single_major) 5404 if (!single_major)
5413 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5405 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5414 rbd_dev_id_put(rbd_dev);
5415} 5406}
5416 5407
5417static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) 5408static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)