author     Ilya Dryomov <ilya.dryomov@inktank.com>    2014-08-04 10:04:39 -0400
committer  Ilya Dryomov <ilya.dryomov@inktank.com>    2014-08-07 06:56:20 -0400
commit     bc1ecc65a259fa9333dc8bd6a4ba0cf03b7d4bf8 (patch)
tree       270e618c8c98afb1bb591415a343fab58060f36c /drivers/block/rbd.c
parent     282c105225ec3229f344c5fced795b9e1e634440 (diff)
rbd: rework rbd_request_fn()
While it was never a good idea to sleep in request_fn(), commit
34c6bc2c919a ("locking/mutexes: Add extra reschedule point") made it
a *bad* idea.  mutex_lock() since 3.15 may reschedule *before* putting
task on the mutex wait queue, which for tasks in !TASK_RUNNING state
means block forever.  request_fn() may be called with !TASK_RUNNING
on the way to schedule() in io_schedule().

Offload request handling to a workqueue, one per rbd device, to avoid
calling blocking primitives from rbd_request_fn().

Fixes: http://tracker.ceph.com/issues/8818
Cc: stable@vger.kernel.org # 3.16, needs backporting for 3.15
Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
Tested-by: Eric Eastman <eric0e@aol.com>
Tested-by: Greg Wilson <greg.wilson@keepertech.com>
Reviewed-by: Alex Elder <elder@linaro.org>
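In outline, the fix keeps rbd_request_fn() from ever calling blocking primitives while it may be in !TASK_RUNNING state: the request function only moves incoming requests onto a per-device list and kicks a work item, and the blocking work runs later in process context.  The following is a minimal sketch of that offload pattern, using hypothetical mydev_* names rather than the actual rbd code (the real change follows in the diff):

/*
 * Illustrative sketch only (hypothetical mydev_* names).  request_fn()
 * runs under q->queue_lock, possibly with the task already set to
 * !TASK_RUNNING, so it must not sleep: it only queues requests and
 * schedules a work item; blocking work happens in process context.
 */
#include <linux/blkdev.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>

struct mydev {
	spinlock_t		lock;		/* also the queue's queue_lock */
	struct list_head	rq_queue;	/* incoming requests */
	struct workqueue_struct	*rq_wq;
	struct work_struct	rq_work;
};

static void mydev_handle_request(struct mydev *dev, struct request *rq)
{
	/* Process context: sleeping (allocations, mutex_lock()) is fine here. */
	blk_end_request_all(rq, 0);
}

static void mydev_request_workfn(struct work_struct *work)
{
	struct mydev *dev = container_of(work, struct mydev, rq_work);
	struct request *rq, *next;
	LIST_HEAD(requests);

	/* Grab everything queued so far and drop the lock before working. */
	spin_lock_irq(&dev->lock);
	list_splice_init(&dev->rq_queue, &requests);
	spin_unlock_irq(&dev->lock);

	list_for_each_entry_safe(rq, next, &requests, queuelist) {
		list_del_init(&rq->queuelist);
		mydev_handle_request(dev, rq);
	}
}

/* Called with q->queue_lock (== &dev->lock, passed to blk_init_queue()) held. */
static void mydev_request_fn(struct request_queue *q)
{
	struct mydev *dev = q->queuedata;
	struct request *rq;
	int queued = 0;

	while ((rq = blk_fetch_request(q))) {
		list_add_tail(&rq->queuelist, &dev->rq_queue);
		queued++;
	}

	if (queued)
		queue_work(dev->rq_wq, &dev->rq_work);
}

The rbd patch below follows this shape: rbd_request_fn() only queues onto rbd_dev->rq_queue, rbd_request_workfn() splices the list and calls rbd_handle_request(), and rbd_dev->rq_wq is allocated in rbd_dev_device_setup() and destroyed in rbd_dev_device_release().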
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--    drivers/block/rbd.c    196
1 file changed, 119 insertions(+), 77 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index cbc89fa9a677..4515b128d0b4 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -42,6 +42,7 @@
 #include <linux/blkdev.h>
 #include <linux/slab.h>
 #include <linux/idr.h>
+#include <linux/workqueue.h>
 
 #include "rbd_types.h"
 
@@ -332,7 +333,10 @@ struct rbd_device {
 
 	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
+	struct list_head	rq_queue;	/* incoming rq queue */
 	spinlock_t		lock;		/* queue, flags, open_count */
+	struct workqueue_struct	*rq_wq;
+	struct work_struct	rq_work;
 
 	struct rbd_image_header	header;
 	unsigned long		flags;		/* possibly lock protected */
@@ -3176,102 +3180,129 @@ out:
 	return ret;
 }
 
-static void rbd_request_fn(struct request_queue *q)
-		__releases(q->queue_lock) __acquires(q->queue_lock)
-{
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
-	int result;
-
-	while ((rq = blk_fetch_request(q))) {
-		bool write_request = rq_data_dir(rq) == WRITE;
-		struct rbd_img_request *img_request;
-		u64 offset;
-		u64 length;
-
-		/* Ignore any non-FS requests that filter through. */
-
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		/* Ignore/skip any zero-length requests */
-
-		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
-		length = (u64) blk_rq_bytes(rq);
-
-		if (!length) {
-			dout("%s: zero-length request\n", __func__);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		spin_unlock_irq(q->queue_lock);
-
-		/* Disallow writes to a read-only device */
-
-		if (write_request) {
-			result = -EROFS;
-			if (rbd_dev->mapping.read_only)
-				goto end_request;
-			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
-		}
-
-		/*
-		 * Quit early if the mapped snapshot no longer
-		 * exists.  It's still possible the snapshot will
-		 * have disappeared by the time our request arrives
-		 * at the osd, but there's no sense in sending it if
-		 * we already know.
-		 */
-		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
-			dout("request for non-existent snapshot");
-			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-			result = -ENXIO;
-			goto end_request;
-		}
-
-		result = -EINVAL;
-		if (offset && length > U64_MAX - offset + 1) {
-			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
-				offset, length);
-			goto end_request;	/* Shouldn't happen */
-		}
-
-		result = -EIO;
-		if (offset + length > rbd_dev->mapping.size) {
-			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
-				offset, length, rbd_dev->mapping.size);
-			goto end_request;
-		}
-
-		result = -ENOMEM;
-		img_request = rbd_img_request_create(rbd_dev, offset, length,
-							write_request);
-		if (!img_request)
-			goto end_request;
-
-		img_request->rq = rq;
-
-		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-						rq->bio);
-		if (!result)
-			result = rbd_img_request_submit(img_request);
-		if (result)
-			rbd_img_request_put(img_request);
-end_request:
-		spin_lock_irq(q->queue_lock);
-		if (result < 0) {
-			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
-				write_request ? "write" : "read",
-				length, offset, result);
-
-			__blk_end_request_all(rq, result);
-		}
-	}
+static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
+{
+	struct rbd_img_request *img_request;
+	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
+	u64 length = blk_rq_bytes(rq);
+	bool wr = rq_data_dir(rq) == WRITE;
+	int result;
+
+	/* Ignore/skip any zero-length requests */
+
+	if (!length) {
+		dout("%s: zero-length request\n", __func__);
+		result = 0;
+		goto err_rq;
+	}
+
+	/* Disallow writes to a read-only device */
+
+	if (wr) {
+		if (rbd_dev->mapping.read_only) {
+			result = -EROFS;
+			goto err_rq;
+		}
+		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+	}
+
+	/*
+	 * Quit early if the mapped snapshot no longer exists.  It's
+	 * still possible the snapshot will have disappeared by the
+	 * time our request arrives at the osd, but there's no sense in
+	 * sending it if we already know.
+	 */
+	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+		dout("request for non-existent snapshot");
+		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+		result = -ENXIO;
+		goto err_rq;
+	}
+
+	if (offset && length > U64_MAX - offset + 1) {
+		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
+			 length);
+		result = -EINVAL;
+		goto err_rq;	/* Shouldn't happen */
+	}
+
+	if (offset + length > rbd_dev->mapping.size) {
+		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+			 length, rbd_dev->mapping.size);
+		result = -EIO;
+		goto err_rq;
+	}
+
+	img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+	if (!img_request) {
+		result = -ENOMEM;
+		goto err_rq;
+	}
+	img_request->rq = rq;
+
+	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+	if (result)
+		goto err_img_request;
+
+	result = rbd_img_request_submit(img_request);
+	if (result)
+		goto err_img_request;
+
+	return;
+
+err_img_request:
+	rbd_img_request_put(img_request);
+err_rq:
+	if (result)
+		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+			 wr ? "write" : "read", length, offset, result);
+	blk_end_request_all(rq, result);
+}
+
+static void rbd_request_workfn(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev =
+	    container_of(work, struct rbd_device, rq_work);
+	struct request *rq, *next;
+	LIST_HEAD(requests);
+
+	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
+	list_splice_init(&rbd_dev->rq_queue, &requests);
+	spin_unlock_irq(&rbd_dev->lock);
+
+	list_for_each_entry_safe(rq, next, &requests, queuelist) {
+		list_del_init(&rq->queuelist);
+		rbd_handle_request(rbd_dev, rq);
+	}
+}
+
+/*
+ * Called with q->queue_lock held and interrupts disabled, possibly on
+ * the way to schedule().  Do not sleep here!
+ */
+static void rbd_request_fn(struct request_queue *q)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	struct request *rq;
+	int queued = 0;
+
+	rbd_assert(rbd_dev);
+
+	while ((rq = blk_fetch_request(q))) {
+		/* Ignore any non-FS requests that filter through. */
+		if (rq->cmd_type != REQ_TYPE_FS) {
+			dout("%s: non-fs request type %d\n", __func__,
+			     (int) rq->cmd_type);
+			__blk_end_request_all(rq, 0);
+			continue;
+		}
+
+		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
+		queued++;
+	}
+
+	if (queued)
+		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
 }
 
 /*
@@ -3847,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->rq_queue);
+	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
@@ -5051,12 +5084,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	ret = rbd_dev_mapping_set(rbd_dev);
 	if (ret)
 		goto err_out_disk;
+
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
+	rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0);
+	if (!rbd_dev->rq_wq)
+		goto err_out_mapping;
+
 	ret = rbd_bus_add_dev(rbd_dev);
 	if (ret)
-		goto err_out_mapping;
+		goto err_out_workqueue;
 
 	/* Everything's ready.  Announce the disk to the world. */
 
@@ -5068,6 +5106,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
 	return ret;
 
+err_out_workqueue:
+	destroy_workqueue(rbd_dev->rq_wq);
+	rbd_dev->rq_wq = NULL;
 err_out_mapping:
 	rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
@@ -5314,6 +5355,7 @@ static void rbd_dev_device_release(struct device *dev)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
+	destroy_workqueue(rbd_dev->rq_wq);
 	rbd_free_disk(rbd_dev);
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	rbd_dev_mapping_clear(rbd_dev);