author     Christoph Hellwig <hch@lst.de>        2015-01-13 11:20:04 -0500
committer  Ilya Dryomov <idryomov@gmail.com>     2015-02-19 06:27:42 -0500
commit     7ad18afad02f9802f1eeade91cf880b97e7a9902
tree       20e2423a0fd60f005c7035e9d05460e041fae0c4
parent     4d41cef279f72f3965140fffa6b48f2a7d51408c
rbd: convert to blk-mq
This converts the rbd driver to use the blk-mq infrastructure. Except
for switching to a per-request work item this is almost mechanical.

This was tested by Alexandre DERUMIER in November, and found to give
him 120000 iops, although the only comparison available was an old
3.10 kernel which gave 80000 iops.

Signed-off-by: Christoph Hellwig <hch@lst.de>
Reviewed-by: Alex Elder <elder@linaro.org>
[idryomov@gmail.com: context, blk_mq_init_queue() EH]
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
-rw-r--r--  drivers/block/rbd.c  122
1 file changed, 68 insertions, 54 deletions
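
Before the diff itself, a condensed sketch of the per-request work item
pattern the commit message describes. This sketch is not part of the
commit: the sketch_* names and sketch_wq are illustrative stand-ins for
the driver's rbd_queue_rq(), rbd_init_request(), rbd_queue_workfn() and
rbd_wq in the diff below, and the real driver completes the request
from its OSD callback rather than inline.

#include <linux/blk-mq.h>
#include <linux/workqueue.h>

static struct workqueue_struct *sketch_wq;	/* stands in for rbd_wq */

static void sketch_workfn(struct work_struct *work)
{
	/* The work item lives in the request's blk-mq PDU, so the
	 * request can be recovered from it in process context. */
	struct request *rq = blk_mq_rq_from_pdu(work);

	blk_mq_start_request(rq);
	/* ... build and submit the backend I/O here; the rbd driver
	 * completes the request later from its OSD callback, this
	 * sketch just completes it inline ... */
	blk_mq_end_request(rq, 0);
}

static int sketch_queue_rq(struct blk_mq_hw_ctx *hctx,
			   const struct blk_mq_queue_data *bd)
{
	struct work_struct *work = blk_mq_rq_to_pdu(bd->rq);

	/* Hand each request off to process context via its own work item. */
	queue_work(sketch_wq, work);
	return BLK_MQ_RQ_QUEUE_OK;
}

static int sketch_init_request(void *data, struct request *rq,
			       unsigned int hctx_idx,
			       unsigned int request_idx,
			       unsigned int numa_node)
{
	struct work_struct *work = blk_mq_rq_to_pdu(rq);

	/* One work item per request, initialized once when the tag set
	 * is allocated (cmd_size = sizeof(struct work_struct)). */
	INIT_WORK(work, sketch_workfn);
	return 0;
}

static struct blk_mq_ops sketch_mq_ops = {
	.queue_rq	= sketch_queue_rq,
	.map_queue	= blk_mq_map_queue,
	.init_request	= sketch_init_request,
};
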
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index e818c2a6ffb1..b40af3203089 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -38,6 +38,7 @@
 #include <linux/kernel.h>
 #include <linux/device.h>
 #include <linux/module.h>
+#include <linux/blk-mq.h>
 #include <linux/fs.h>
 #include <linux/blkdev.h>
 #include <linux/slab.h>
@@ -340,9 +341,7 @@ struct rbd_device {
 
 	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
-	struct list_head	rq_queue;	/* incoming rq queue */
 	spinlock_t		lock;		/* queue, flags, open_count */
-	struct work_struct	rq_work;
 
 	struct rbd_image_header	header;
 	unsigned long		flags;		/* possibly lock protected */
@@ -360,6 +359,9 @@ struct rbd_device {
 	atomic_t		parent_ref;
 	struct rbd_device	*parent;
 
+	/* Block layer tags. */
+	struct blk_mq_tag_set	tag_set;
+
 	/* protects updating the header */
 	struct rw_semaphore     header_rwsem;
 
@@ -1817,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 
 	/*
 	 * We support a 64-bit length, but ultimately it has to be
-	 * passed to blk_end_request(), which takes an unsigned int.
+	 * passed to the block layer, which just supports a 32-bit
+	 * length field.
 	 */
 	obj_request->xferred = osd_req->r_reply_op_len[0];
 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
@@ -2275,7 +2278,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 		more = obj_request->which < img_request->obj_request_count - 1;
 	} else {
 		rbd_assert(img_request->rq != NULL);
-		more = blk_end_request(img_request->rq, result, xferred);
+
+		more = blk_update_request(img_request->rq, result, xferred);
+		if (!more)
+			__blk_mq_end_request(img_request->rq, result);
 	}
 
 	return more;
@@ -3304,8 +3310,10 @@ out:
 	return ret;
 }
 
-static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
+static void rbd_queue_workfn(struct work_struct *work)
 {
+	struct request *rq = blk_mq_rq_from_pdu(work);
+	struct rbd_device *rbd_dev = rq->q->queuedata;
 	struct rbd_img_request *img_request;
 	struct ceph_snap_context *snapc = NULL;
 	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
@@ -3314,6 +3322,13 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 	u64 mapping_size;
 	int result;
 
+	if (rq->cmd_type != REQ_TYPE_FS) {
+		dout("%s: non-fs request type %d\n", __func__,
+			(int) rq->cmd_type);
+		result = -EIO;
+		goto err;
+	}
+
 	if (rq->cmd_flags & REQ_DISCARD)
 		op_type = OBJ_OP_DISCARD;
 	else if (rq->cmd_flags & REQ_WRITE)
@@ -3359,6 +3374,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 		goto err_rq;	/* Shouldn't happen */
 	}
 
+	blk_mq_start_request(rq);
+
 	down_read(&rbd_dev->header_rwsem);
 	mapping_size = rbd_dev->mapping.size;
 	if (op_type != OBJ_OP_READ) {
@@ -3404,53 +3421,18 @@ err_rq:
 	rbd_warn(rbd_dev, "%s %llx at %llx result %d",
 		 obj_op_name(op_type), length, offset, result);
 	ceph_put_snap_context(snapc);
-	blk_end_request_all(rq, result);
+err:
+	blk_mq_end_request(rq, result);
 }
 
-static void rbd_request_workfn(struct work_struct *work)
+static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
+		const struct blk_mq_queue_data *bd)
 {
-	struct rbd_device *rbd_dev =
-	    container_of(work, struct rbd_device, rq_work);
-	struct request *rq, *next;
-	LIST_HEAD(requests);
-
-	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
-	list_splice_init(&rbd_dev->rq_queue, &requests);
-	spin_unlock_irq(&rbd_dev->lock);
+	struct request *rq = bd->rq;
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
 
-	list_for_each_entry_safe(rq, next, &requests, queuelist) {
-		list_del_init(&rq->queuelist);
-		rbd_handle_request(rbd_dev, rq);
-	}
-}
-
-/*
- * Called with q->queue_lock held and interrupts disabled, possibly on
- * the way to schedule(). Do not sleep here!
- */
-static void rbd_request_fn(struct request_queue *q)
-{
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
-	int queued = 0;
-
-	rbd_assert(rbd_dev);
-
-	while ((rq = blk_fetch_request(q))) {
-		/* Ignore any non-FS requests that filter through. */
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
-		queued++;
-	}
-
-	if (queued)
-		queue_work(rbd_wq, &rbd_dev->rq_work);
+	queue_work(rbd_wq, work);
+	return BLK_MQ_RQ_QUEUE_OK;
 }
 
 /*
@@ -3511,6 +3493,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 		del_gendisk(disk);
 		if (disk->queue)
 			blk_cleanup_queue(disk->queue);
+		blk_mq_free_tag_set(&rbd_dev->tag_set);
 	}
 	put_disk(disk);
 }
@@ -3721,11 +3704,28 @@ out:
 	return ret;
 }
 
+static int rbd_init_request(void *data, struct request *rq,
+		unsigned int hctx_idx, unsigned int request_idx,
+		unsigned int numa_node)
+{
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
+
+	INIT_WORK(work, rbd_queue_workfn);
+	return 0;
+}
+
+static struct blk_mq_ops rbd_mq_ops = {
+	.queue_rq	= rbd_queue_rq,
+	.map_queue	= blk_mq_map_queue,
+	.init_request	= rbd_init_request,
+};
+
 static int rbd_init_disk(struct rbd_device *rbd_dev)
 {
 	struct gendisk *disk;
 	struct request_queue *q;
 	u64 segment_size;
+	int err;
 
 	/* create gendisk info */
 	disk = alloc_disk(single_major ?
@@ -3743,10 +3743,25 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	disk->fops = &rbd_bd_ops;
 	disk->private_data = rbd_dev;
 
-	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
-	if (!q)
+	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
+	rbd_dev->tag_set.ops = &rbd_mq_ops;
+	rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ;
+	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
+	rbd_dev->tag_set.flags =
+		BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
+	rbd_dev->tag_set.nr_hw_queues = 1;
+	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
+
+	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
+	if (err)
 		goto out_disk;
 
+	q = blk_mq_init_queue(&rbd_dev->tag_set);
+	if (IS_ERR(q)) {
+		err = PTR_ERR(q);
+		goto out_tag_set;
+	}
+
 	/* We use the default size, but let's be explicit about it. */
 	blk_queue_physical_block_size(q, SECTOR_SIZE);
 
@@ -3772,10 +3787,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	rbd_dev->disk = disk;
 
 	return 0;
+out_tag_set:
+	blk_mq_free_tag_set(&rbd_dev->tag_set);
 out_disk:
 	put_disk(disk);
-
-	return -ENOMEM;
+	return err;
 }
 
 /*
@@ -4032,8 +4048,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
-	INIT_LIST_HEAD(&rbd_dev->rq_queue);
-	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
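
For readers skimming the scattered hunks, the queue setup that
rbd_init_disk() ends up with after this patch reads as the sequence
below. This is a paraphrase of the lines added above with explanatory
comments; nothing here is new code beyond the comments.

	/* Describe the tag set: one hw queue, default depth, and a
	 * work_struct-sized per-request PDU to hold the work item. */
	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
	rbd_dev->tag_set.ops = &rbd_mq_ops;
	rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ;
	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
	rbd_dev->tag_set.nr_hw_queues = 1;
	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);

	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
	if (err)
		goto out_disk;

	/* blk_mq_init_queue() reports failure via ERR_PTR(), hence the
	 * IS_ERR()/PTR_ERR() handling noted in the commit message. */
	q = blk_mq_init_queue(&rbd_dev->tag_set);
	if (IS_ERR(q)) {
		err = PTR_ERR(q);
		goto out_tag_set;	/* undoes blk_mq_alloc_tag_set() */
	}

Teardown is symmetric: rbd_free_disk() calls blk_cleanup_queue() on the
disk's queue before freeing the tag set with blk_mq_free_tag_set().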