path: root/drivers/block
author	Linus Torvalds <torvalds@linux-foundation.org>	2015-02-19 17:14:42 -0500
committer	Linus Torvalds <torvalds@linux-foundation.org>	2015-02-19 17:14:42 -0500
commit	4533f6e27a366ecc3da4876074ebfe0cc0ea4f0f (patch)
tree	8b6f1aeeda991e6a1ce98702d7cc35d2d2a444b1 /drivers/block
parent	89d3fa45b4add00cd0056361a2498e978cb1e119 (diff)
parent	0f5417cea6cfeafd5cdec4223df63ca79918fdea (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil:
 "On the RBD side, there is a conversion to blk-mq from Christoph, several
  long-standing bug fixes from Ilya, and some cleanup from Rickard
  Strandqvist.

  On the CephFS side there is a long list of fixes from Zheng, including
  improved session handling, a few IO path fixes, some dcache management
  correctness fixes, and several blocking while !TASK_RUNNING fixes.

  The core code gets a few cleanups and Chaitanya has added support for
  TCP_NODELAY (which has been used on the server side for ages but we
  somehow missed on the kernel client).

  There is also an update to MAINTAINERS to fix up some email addresses and
  reflect that Ilya and Zheng are doing most of the maintenance for RBD and
  CephFS these days.  Do not be surprised to see a pull request come from
  one of them in the future if I am unavailable for some reason"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (27 commits)
  MAINTAINERS: update Ceph and RBD maintainers
  libceph: kfree() in put_osd() shouldn't depend on authorizer
  libceph: fix double __remove_osd() problem
  rbd: convert to blk-mq
  ceph: return error for traceless reply race
  ceph: fix dentry leaks
  ceph: re-send requests when MDS enters reconnecting stage
  ceph: show nocephx_require_signatures and notcp_nodelay options
  libceph: tcp_nodelay support
  rbd: do not treat standalone as flatten
  ceph: fix atomic_open snapdir
  ceph: properly mark empty directory as complete
  client: include kernel version in client metadata
  ceph: provide seperate {inode,file}_operations for snapdir
  ceph: fix request time stamp encoding
  ceph: fix reading inline data when i_size > PAGE_SIZE
  ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_close_sessions)
  ceph: avoid block operation when !TASK_RUNNING (ceph_get_caps)
  ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_sync)
  rbd: fix error paths in rbd_dev_refresh()
  ...
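The TCP_NODELAY item above lands in net/ceph, so it is not part of the drivers/block diff shown below. For orientation only, disabling Nagle on an in-kernel socket in this era generally looks like the following sketch; the helper name and error handling are illustrative and not the actual libceph code.

#include <linux/kernel.h>
#include <linux/net.h>
#include <linux/socket.h>
#include <net/tcp.h>

/* Illustrative helper, not a libceph symbol. */
static void sketch_enable_tcp_nodelay(struct socket *sock)
{
	int optval = 1;
	int ret;

	/* kernel_setsockopt() is the in-kernel analogue of setsockopt(2). */
	ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
				(char *)&optval, sizeof(optval));
	if (ret)
		pr_warn("failed to set TCP_NODELAY: %d\n", ret);
}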
Diffstat (limited to 'drivers/block')
-rw-r--r--	drivers/block/rbd.c	| 193
1 file changed, 83 insertions(+), 110 deletions(-)
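For orientation before reading the diff: the rbd conversion follows the common blk-mq pattern of the time, keeping a per-request work_struct in the blk-mq PDU and having queue_rq defer the actual submission to a workqueue (the rbd submission path takes a semaphore and allocates, so it may sleep). A minimal sketch of that pattern against the 3.19-era blk-mq API; the sketch_* names are illustrative, not rbd symbols.

#include <linux/blk-mq.h>
#include <linux/blkdev.h>
#include <linux/workqueue.h>

static struct workqueue_struct *sketch_wq;	/* driver workqueue */

/* Runs in process context; recover the request from its PDU. */
static void sketch_workfn(struct work_struct *work)
{
	struct request *rq = blk_mq_rq_from_pdu(work);

	blk_mq_start_request(rq);
	/* ... translate the request into driver I/O here ... */
	blk_mq_end_request(rq, 0);
}

/* Called by the block layer; just hand the request off to the workqueue. */
static int sketch_queue_rq(struct blk_mq_hw_ctx *hctx,
			   const struct blk_mq_queue_data *bd)
{
	struct work_struct *work = blk_mq_rq_to_pdu(bd->rq);

	queue_work(sketch_wq, work);
	return BLK_MQ_RQ_QUEUE_OK;
}

/* One work_struct per request lives in the cmd_size-sized PDU area. */
static int sketch_init_request(void *data, struct request *rq,
			       unsigned int hctx_idx, unsigned int request_idx,
			       unsigned int numa_node)
{
	struct work_struct *work = blk_mq_rq_to_pdu(rq);

	INIT_WORK(work, sketch_workfn);
	return 0;
}

static struct blk_mq_ops sketch_mq_ops = {
	.queue_rq	= sketch_queue_rq,
	.map_queue	= blk_mq_map_queue,
	.init_request	= sketch_init_request,
};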
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8a86b62466f7..b40af3203089 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -38,6 +38,7 @@
 #include <linux/kernel.h>
 #include <linux/device.h>
 #include <linux/module.h>
+#include <linux/blk-mq.h>
 #include <linux/fs.h>
 #include <linux/blkdev.h>
 #include <linux/slab.h>
@@ -340,9 +341,7 @@ struct rbd_device {
 
 	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
-	struct list_head rq_queue;	/* incoming rq queue */
 	spinlock_t lock;		/* queue, flags, open_count */
-	struct work_struct rq_work;
 
 	struct rbd_image_header header;
 	unsigned long flags;		/* possibly lock protected */
@@ -360,6 +359,9 @@ struct rbd_device {
 	atomic_t parent_ref;
 	struct rbd_device *parent;
 
+	/* Block layer tags. */
+	struct blk_mq_tag_set tag_set;
+
 	/* protects updating the header */
 	struct rw_semaphore header_rwsem;
 
@@ -1817,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 
 	/*
 	 * We support a 64-bit length, but ultimately it has to be
-	 * passed to blk_end_request(), which takes an unsigned int.
+	 * passed to the block layer, which just supports a 32-bit
+	 * length field.
 	 */
 	obj_request->xferred = osd_req->r_reply_op_len[0];
 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
@@ -2275,7 +2278,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 		more = obj_request->which < img_request->obj_request_count - 1;
 	} else {
 		rbd_assert(img_request->rq != NULL);
-		more = blk_end_request(img_request->rq, result, xferred);
+
+		more = blk_update_request(img_request->rq, result, xferred);
+		if (!more)
+			__blk_mq_end_request(img_request->rq, result);
 	}
 
 	return more;
@@ -3304,8 +3310,10 @@ out:
 	return ret;
 }
 
-static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
+static void rbd_queue_workfn(struct work_struct *work)
 {
+	struct request *rq = blk_mq_rq_from_pdu(work);
+	struct rbd_device *rbd_dev = rq->q->queuedata;
 	struct rbd_img_request *img_request;
 	struct ceph_snap_context *snapc = NULL;
 	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
@@ -3314,6 +3322,13 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 	u64 mapping_size;
 	int result;
 
+	if (rq->cmd_type != REQ_TYPE_FS) {
+		dout("%s: non-fs request type %d\n", __func__,
+			(int) rq->cmd_type);
+		result = -EIO;
+		goto err;
+	}
+
 	if (rq->cmd_flags & REQ_DISCARD)
 		op_type = OBJ_OP_DISCARD;
 	else if (rq->cmd_flags & REQ_WRITE)
@@ -3359,6 +3374,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 		goto err_rq;	/* Shouldn't happen */
 	}
 
+	blk_mq_start_request(rq);
+
 	down_read(&rbd_dev->header_rwsem);
 	mapping_size = rbd_dev->mapping.size;
 	if (op_type != OBJ_OP_READ) {
@@ -3404,53 +3421,18 @@ err_rq:
 		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
 			 obj_op_name(op_type), length, offset, result);
 	ceph_put_snap_context(snapc);
-	blk_end_request_all(rq, result);
+err:
+	blk_mq_end_request(rq, result);
 }
 
-static void rbd_request_workfn(struct work_struct *work)
+static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
+		const struct blk_mq_queue_data *bd)
 {
-	struct rbd_device *rbd_dev =
-	    container_of(work, struct rbd_device, rq_work);
-	struct request *rq, *next;
-	LIST_HEAD(requests);
-
-	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
-	list_splice_init(&rbd_dev->rq_queue, &requests);
-	spin_unlock_irq(&rbd_dev->lock);
+	struct request *rq = bd->rq;
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
 
-	list_for_each_entry_safe(rq, next, &requests, queuelist) {
-		list_del_init(&rq->queuelist);
-		rbd_handle_request(rbd_dev, rq);
-	}
-}
-
-/*
- * Called with q->queue_lock held and interrupts disabled, possibly on
- * the way to schedule(). Do not sleep here!
- */
-static void rbd_request_fn(struct request_queue *q)
-{
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
-	int queued = 0;
-
-	rbd_assert(rbd_dev);
-
-	while ((rq = blk_fetch_request(q))) {
-		/* Ignore any non-FS requests that filter through. */
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
-		queued++;
-	}
-
-	if (queued)
-		queue_work(rbd_wq, &rbd_dev->rq_work);
+	queue_work(rbd_wq, work);
+	return BLK_MQ_RQ_QUEUE_OK;
 }
 
 /*
@@ -3511,6 +3493,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 		del_gendisk(disk);
 		if (disk->queue)
 			blk_cleanup_queue(disk->queue);
+		blk_mq_free_tag_set(&rbd_dev->tag_set);
 	}
 	put_disk(disk);
 }
@@ -3694,7 +3677,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 
 	ret = rbd_dev_header_info(rbd_dev);
 	if (ret)
-		return ret;
+		goto out;
 
 	/*
 	 * If there is a parent, see if it has disappeared due to the
@@ -3703,30 +3686,46 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 	if (rbd_dev->parent) {
 		ret = rbd_dev_v2_parent_info(rbd_dev);
 		if (ret)
-			return ret;
+			goto out;
 	}
 
 	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
-		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
-			rbd_dev->mapping.size = rbd_dev->header.image_size;
+		rbd_dev->mapping.size = rbd_dev->header.image_size;
 	} else {
 		/* validate mapped snapshot's EXISTS flag */
 		rbd_exists_validate(rbd_dev);
 	}
 
+out:
 	up_write(&rbd_dev->header_rwsem);
-
-	if (mapping_size != rbd_dev->mapping.size)
+	if (!ret && mapping_size != rbd_dev->mapping.size)
 		rbd_dev_update_size(rbd_dev);
 
+	return ret;
+}
+
+static int rbd_init_request(void *data, struct request *rq,
+		unsigned int hctx_idx, unsigned int request_idx,
+		unsigned int numa_node)
+{
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
+
+	INIT_WORK(work, rbd_queue_workfn);
 	return 0;
 }
 
+static struct blk_mq_ops rbd_mq_ops = {
+	.queue_rq	= rbd_queue_rq,
+	.map_queue	= blk_mq_map_queue,
+	.init_request	= rbd_init_request,
+};
+
 static int rbd_init_disk(struct rbd_device *rbd_dev)
 {
 	struct gendisk *disk;
 	struct request_queue *q;
 	u64 segment_size;
+	int err;
 
 	/* create gendisk info */
 	disk = alloc_disk(single_major ?
@@ -3744,10 +3743,25 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	disk->fops = &rbd_bd_ops;
 	disk->private_data = rbd_dev;
 
-	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
-	if (!q)
+	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
+	rbd_dev->tag_set.ops = &rbd_mq_ops;
+	rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ;
+	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
+	rbd_dev->tag_set.flags =
+		BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
+	rbd_dev->tag_set.nr_hw_queues = 1;
+	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
+
+	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
+	if (err)
 		goto out_disk;
 
+	q = blk_mq_init_queue(&rbd_dev->tag_set);
+	if (IS_ERR(q)) {
+		err = PTR_ERR(q);
+		goto out_tag_set;
+	}
+
 	/* We use the default size, but let's be explicit about it. */
 	blk_queue_physical_block_size(q, SECTOR_SIZE);
 
@@ -3773,10 +3787,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	rbd_dev->disk = disk;
 
 	return 0;
+out_tag_set:
+	blk_mq_free_tag_set(&rbd_dev->tag_set);
 out_disk:
 	put_disk(disk);
-
-	return -ENOMEM;
+	return err;
 }
 
 /*
@@ -4033,8 +4048,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
-	INIT_LIST_HEAD(&rbd_dev->rq_queue);
-	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
@@ -4274,32 +4287,22 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	}
 
 	/*
-	 * We always update the parent overlap.  If it's zero we
-	 * treat it specially.
+	 * We always update the parent overlap.  If it's zero we issue
+	 * a warning, as we will proceed as if there was no parent.
 	 */
-	rbd_dev->parent_overlap = overlap;
 	if (!overlap) {
-
-		/* A null parent_spec indicates it's the initial probe */
-
 		if (parent_spec) {
-			/*
-			 * The overlap has become zero, so the clone
-			 * must have been resized down to 0 at some
-			 * point.  Treat this the same as a flatten.
-			 */
-			rbd_dev_parent_put(rbd_dev);
-			pr_info("%s: clone image now standalone\n",
-				rbd_dev->disk->disk_name);
+			/* refresh, careful to warn just once */
+			if (rbd_dev->parent_overlap)
+				rbd_warn(rbd_dev,
+				    "clone now standalone (overlap became 0)");
 		} else {
-			/*
-			 * For the initial probe, if we find the
-			 * overlap is zero we just pretend there was
-			 * no parent image.
-			 */
-			rbd_warn(rbd_dev, "ignoring parent with overlap 0");
+			/* initial probe */
+			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
 		}
 	}
+	rbd_dev->parent_overlap = overlap;
+
 out:
 	ret = 0;
 out_err:
@@ -4771,36 +4774,6 @@ static inline size_t next_token(const char **buf)
 }
 
 /*
- * Finds the next token in *buf, and if the provided token buffer is
- * big enough, copies the found token into it.  The result, if
- * copied, is guaranteed to be terminated with '\0'.  Note that *buf
- * must be terminated with '\0' on entry.
- *
- * Returns the length of the token found (not including the '\0').
- * Return value will be 0 if no token is found, and it will be >=
- * token_size if the token would not fit.
- *
- * The *buf pointer will be updated to point beyond the end of the
- * found token.  Note that this occurs even if the token buffer is
- * too small to hold it.
- */
-static inline size_t copy_token(const char **buf,
-				char *token,
-				size_t token_size)
-{
-	size_t len;
-
-	len = next_token(buf);
-	if (len < token_size) {
-		memcpy(token, *buf, len);
-		*(token + len) = '\0';
-	}
-	*buf += len;
-
-	return len;
-}
-
-/*
  * Finds the next token in *buf, dynamically allocates a buffer big
  * enough to hold a copy of it, and copies the token into the new
  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note