aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/direct-io.c140
1 files changed, 66 insertions, 74 deletions
diff --git a/fs/direct-io.c b/fs/direct-io.c
index b296942ff7d5..bc1cbf9149f7 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -121,9 +121,8 @@ struct dio {
121 int page_errors; /* errno from get_user_pages() */ 121 int page_errors; /* errno from get_user_pages() */
122 122
123 /* BIO completion state */ 123 /* BIO completion state */
124 atomic_t refcount; /* direct_io_worker() and bios */
124 spinlock_t bio_lock; /* protects BIO fields below */ 125 spinlock_t bio_lock; /* protects BIO fields below */
125 int bio_count; /* nr bios to be completed */
126 int bios_in_flight; /* nr bios in flight */
127 struct bio *bio_list; /* singly linked via bi_private */ 126 struct bio *bio_list; /* singly linked via bi_private */
128 struct task_struct *waiter; /* waiting task (NULL if none) */ 127 struct task_struct *waiter; /* waiting task (NULL if none) */
129 128
@@ -256,44 +255,27 @@ static int dio_complete(struct dio *dio, loff_t offset, int ret)
256 * Called when a BIO has been processed. If the count goes to zero then IO is 255 * Called when a BIO has been processed. If the count goes to zero then IO is
257 * complete and we can signal this to the AIO layer. 256 * complete and we can signal this to the AIO layer.
258 */ 257 */
259static void finished_one_bio(struct dio *dio) 258static void dio_complete_aio(struct dio *dio)
260{ 259{
261 unsigned long flags; 260 unsigned long flags;
261 int ret;
262 262
263 spin_lock_irqsave(&dio->bio_lock, flags); 263 ret = dio_complete(dio, dio->iocb->ki_pos, 0);
264 if (dio->bio_count == 1) {
265 if (dio->is_async) {
266 int ret;
267
268 /*
269 * Last reference to the dio is going away.
270 * Drop spinlock and complete the DIO.
271 */
272 spin_unlock_irqrestore(&dio->bio_lock, flags);
273
274 ret = dio_complete(dio, dio->iocb->ki_pos, 0);
275 264
276 /* Complete AIO later if falling back to buffered i/o */ 265 /* Complete AIO later if falling back to buffered i/o */
277 if (dio->result == dio->size || 266 if (dio->result == dio->size ||
278 ((dio->rw == READ) && dio->result)) { 267 ((dio->rw == READ) && dio->result)) {
279 aio_complete(dio->iocb, ret, 0); 268 aio_complete(dio->iocb, ret, 0);
280 kfree(dio); 269 kfree(dio);
281 return; 270 } else {
282 } else { 271 /*
283 /* 272 * Falling back to buffered
284 * Falling back to buffered 273 */
285 */ 274 spin_lock_irqsave(&dio->bio_lock, flags);
286 spin_lock_irqsave(&dio->bio_lock, flags); 275 if (dio->waiter)
287 dio->bio_count--; 276 wake_up_process(dio->waiter);
288 if (dio->waiter) 277 spin_unlock_irqrestore(&dio->bio_lock, flags);
289 wake_up_process(dio->waiter);
290 spin_unlock_irqrestore(&dio->bio_lock, flags);
291 return;
292 }
293 }
294 } 278 }
295 dio->bio_count--;
296 spin_unlock_irqrestore(&dio->bio_lock, flags);
297} 279}
298 280
299static int dio_bio_complete(struct dio *dio, struct bio *bio); 281static int dio_bio_complete(struct dio *dio, struct bio *bio);
@@ -309,6 +291,10 @@ static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
309 291
310 /* cleanup the bio */ 292 /* cleanup the bio */
311 dio_bio_complete(dio, bio); 293 dio_bio_complete(dio, bio);
294
295 if (atomic_dec_and_test(&dio->refcount))
296 dio_complete_aio(dio);
297
312 return 0; 298 return 0;
313} 299}
314 300
@@ -330,8 +316,7 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
330 spin_lock_irqsave(&dio->bio_lock, flags); 316 spin_lock_irqsave(&dio->bio_lock, flags);
331 bio->bi_private = dio->bio_list; 317 bio->bi_private = dio->bio_list;
332 dio->bio_list = bio; 318 dio->bio_list = bio;
333 dio->bios_in_flight--; 319 if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
334 if (dio->waiter && dio->bios_in_flight == 0)
335 wake_up_process(dio->waiter); 320 wake_up_process(dio->waiter);
336 spin_unlock_irqrestore(&dio->bio_lock, flags); 321 spin_unlock_irqrestore(&dio->bio_lock, flags);
337 return 0; 322 return 0;
@@ -362,17 +347,15 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
362 * In the AIO read case we speculatively dirty the pages before starting IO. 347 * In the AIO read case we speculatively dirty the pages before starting IO.
363 * During IO completion, any of these pages which happen to have been written 348 * During IO completion, any of these pages which happen to have been written
364 * back will be redirtied by bio_check_pages_dirty(). 349 * back will be redirtied by bio_check_pages_dirty().
350 *
351 * bios hold a dio reference between submit_bio and ->end_io.
365 */ 352 */
366static void dio_bio_submit(struct dio *dio) 353static void dio_bio_submit(struct dio *dio)
367{ 354{
368 struct bio *bio = dio->bio; 355 struct bio *bio = dio->bio;
369 unsigned long flags;
370 356
371 bio->bi_private = dio; 357 bio->bi_private = dio;
372 spin_lock_irqsave(&dio->bio_lock, flags); 358 atomic_inc(&dio->refcount);
373 dio->bio_count++;
374 dio->bios_in_flight++;
375 spin_unlock_irqrestore(&dio->bio_lock, flags);
376 if (dio->is_async && dio->rw == READ) 359 if (dio->is_async && dio->rw == READ)
377 bio_set_pages_dirty(bio); 360 bio_set_pages_dirty(bio);
378 submit_bio(dio->rw, bio); 361 submit_bio(dio->rw, bio);
@@ -390,18 +373,28 @@ static void dio_cleanup(struct dio *dio)
390 page_cache_release(dio_get_page(dio)); 373 page_cache_release(dio_get_page(dio));
391} 374}
392 375
376static int wait_for_more_bios(struct dio *dio)
377{
378 assert_spin_locked(&dio->bio_lock);
379
380 return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
381}
382
393/* 383/*
394 * Wait for the next BIO to complete. Remove it and return it. 384 * Wait for the next BIO to complete. Remove it and return it. NULL is
385 * returned once all BIOs have been completed. This must only be called once
386 * all bios have been issued so that dio->refcount can only decrease. This
387 * requires that the caller hold a reference on the dio.
395 */ 388 */
396static struct bio *dio_await_one(struct dio *dio) 389static struct bio *dio_await_one(struct dio *dio)
397{ 390{
398 unsigned long flags; 391 unsigned long flags;
399 struct bio *bio; 392 struct bio *bio = NULL;
400 393
401 spin_lock_irqsave(&dio->bio_lock, flags); 394 spin_lock_irqsave(&dio->bio_lock, flags);
402 while (dio->bio_list == NULL) { 395 while (wait_for_more_bios(dio)) {
403 set_current_state(TASK_UNINTERRUPTIBLE); 396 set_current_state(TASK_UNINTERRUPTIBLE);
404 if (dio->bio_list == NULL) { 397 if (wait_for_more_bios(dio)) {
405 dio->waiter = current; 398 dio->waiter = current;
406 spin_unlock_irqrestore(&dio->bio_lock, flags); 399 spin_unlock_irqrestore(&dio->bio_lock, flags);
407 io_schedule(); 400 io_schedule();
@@ -410,8 +403,10 @@ static struct bio *dio_await_one(struct dio *dio)
410 } 403 }
411 set_current_state(TASK_RUNNING); 404 set_current_state(TASK_RUNNING);
412 } 405 }
413 bio = dio->bio_list; 406 if (dio->bio_list) {
414 dio->bio_list = bio->bi_private; 407 bio = dio->bio_list;
408 dio->bio_list = bio->bi_private;
409 }
415 spin_unlock_irqrestore(&dio->bio_lock, flags); 410 spin_unlock_irqrestore(&dio->bio_lock, flags);
416 return bio; 411 return bio;
417} 412}
@@ -440,25 +435,24 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
440 } 435 }
441 bio_put(bio); 436 bio_put(bio);
442 } 437 }
443 finished_one_bio(dio);
444 return uptodate ? 0 : -EIO; 438 return uptodate ? 0 : -EIO;
445} 439}
446 440
447/* 441/*
448 * Wait on and process all in-flight BIOs. 442 * Wait on and process all in-flight BIOs. This must only be called once
443 * all bios have been issued so that the refcount can only decrease.
444 * This just waits for all bios to make it through dio_bio_complete. IO
445 * errors are propagated through dio->io_error and should be propagated via
446 * dio_complete().
449 */ 447 */
450static void dio_await_completion(struct dio *dio) 448static void dio_await_completion(struct dio *dio)
451{ 449{
452 /* 450 struct bio *bio;
453 * The bio_lock is not held for the read of bio_count. 451 do {
454 * This is ok since it is the dio_bio_complete() that changes 452 bio = dio_await_one(dio);
455 * bio_count. 453 if (bio)
456 */ 454 dio_bio_complete(dio, bio);
457 while (dio->bio_count) { 455 } while (bio);
458 struct bio *bio = dio_await_one(dio);
459 /* io errors are propagated through dio->io_error */
460 dio_bio_complete(dio, bio);
461 }
462} 456}
463 457
464/* 458/*
@@ -995,16 +989,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
995 dio->iocb = iocb; 989 dio->iocb = iocb;
996 dio->i_size = i_size_read(inode); 990 dio->i_size = i_size_read(inode);
997 991
998 /* 992 atomic_set(&dio->refcount, 1);
999 * BIO completion state.
1000 *
1001 * ->bio_count starts out at one, and we decrement it to zero after all
1002 * BIOs are submitted. This to avoid the situation where a really fast
1003 * (or synchronous) device could take the count to zero while we're
1004 * still submitting BIOs.
1005 */
1006 dio->bio_count = 1;
1007 dio->bios_in_flight = 0;
1008 spin_lock_init(&dio->bio_lock); 993 spin_lock_init(&dio->bio_lock);
1009 dio->bio_list = NULL; 994 dio->bio_list = NULL;
1010 dio->waiter = NULL; 995 dio->waiter = NULL;
@@ -1111,7 +1096,11 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
1111 } 1096 }
1112 if (ret == 0) 1097 if (ret == 0)
1113 ret = dio->result; 1098 ret = dio->result;
1114 finished_one_bio(dio); /* This can free the dio */ 1099
1100 /* this can free the dio */
1101 if (atomic_dec_and_test(&dio->refcount))
1102 dio_complete_aio(dio);
1103
1115 if (should_wait) { 1104 if (should_wait) {
1116 unsigned long flags; 1105 unsigned long flags;
1117 /* 1106 /*
@@ -1122,7 +1111,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
1122 1111
1123 spin_lock_irqsave(&dio->bio_lock, flags); 1112 spin_lock_irqsave(&dio->bio_lock, flags);
1124 set_current_state(TASK_UNINTERRUPTIBLE); 1113 set_current_state(TASK_UNINTERRUPTIBLE);
1125 while (dio->bio_count) { 1114 while (atomic_read(&dio->refcount)) {
1126 spin_unlock_irqrestore(&dio->bio_lock, flags); 1115 spin_unlock_irqrestore(&dio->bio_lock, flags);
1127 io_schedule(); 1116 io_schedule();
1128 spin_lock_irqsave(&dio->bio_lock, flags); 1117 spin_lock_irqsave(&dio->bio_lock, flags);
@@ -1133,7 +1122,6 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
1133 kfree(dio); 1122 kfree(dio);
1134 } 1123 }
1135 } else { 1124 } else {
1136 finished_one_bio(dio);
1137 dio_await_completion(dio); 1125 dio_await_completion(dio);
1138 1126
1139 ret = dio_complete(dio, offset, ret); 1127 ret = dio_complete(dio, offset, ret);
@@ -1146,7 +1134,11 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
1146 * i/o, we have to mark the aio complete. 1134 * i/o, we have to mark the aio complete.
1147 */ 1135 */
1148 aio_complete(iocb, ret, 0); 1136 aio_complete(iocb, ret, 0);
1149 kfree(dio); 1137
1138 if (atomic_dec_and_test(&dio->refcount))
1139 kfree(dio);
1140 else
1141 BUG();
1150 } 1142 }
1151 return ret; 1143 return ret;
1152} 1144}