aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZach Brown <zach.brown@oracle.com>2006-12-10 05:20:59 -0500
committerLinus Torvalds <torvalds@woody.osdl.org>2006-12-10 12:57:21 -0500
commit0273201e693fd62381f6b1e85b15ffc117d8a46e (patch)
tree2dff76f33cbcaf7b7d88187da122d7aa98268b11
parent17a7b1d74b1207f8f1af40b5d184989076d08f8b (diff)
[PATCH] dio: formalize bio counters as a dio reference count
Previously we had two confusing counts of bio progress. 'bio_count' was decremented as bios were processed and freed by the dio core. It was used to indicate final completion of the dio operation. 'bios_in_flight' reflected how many bios were between submit_bio() and bio->end_io. It was used by the sync path to decide when to wake up and finish completing bios and was ignored by the async path. This patch collapses the two notions into one notion of a dio reference count. bios hold a dio reference when they're between submit_bio and bio->end_io. Since bios_in_flight was only used in the sync path it is now equivalent to dio->refcount - 1 which accounts for direct_io_worker() holding a reference for the duration of the operation. dio_bio_complete() -> finished_one_bio() was called from the sync path after finding bios on the list that the bio->end_io function had deposited. finished_one_bio() can not drop the dio reference on behalf of these bios now because bio->end_io already has. The is_async test in finished_one_bio() meant that it never actually did anything other than drop the bio_count for sync callers. So we remove its refcount decrement, don't call it from dio_bio_complete(), and hoist its call up into the async dio_bio_complete() caller after an explicit refcount decrement. It is renamed dio_complete_aio() to reflect the remaining work it actually does. Signed-off-by: Zach Brown <zach.brown@oracle.com> Cc: Badari Pulavarty <pbadari@us.ibm.com> Cc: Suparna Bhattacharya <suparna@in.ibm.com> Acked-by: Jeff Moyer <jmoyer@redhat.com> Signed-off-by: Andrew Morton <akpm@osdl.org> Signed-off-by: Linus Torvalds <torvalds@osdl.org>
-rw-r--r--fs/direct-io.c140
1 files changed, 66 insertions, 74 deletions
diff --git a/fs/direct-io.c b/fs/direct-io.c
index b296942ff7d5..bc1cbf9149f7 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -121,9 +121,8 @@ struct dio {
121 int page_errors; /* errno from get_user_pages() */ 121 int page_errors; /* errno from get_user_pages() */
122 122
123 /* BIO completion state */ 123 /* BIO completion state */
124 atomic_t refcount; /* direct_io_worker() and bios */
124 spinlock_t bio_lock; /* protects BIO fields below */ 125 spinlock_t bio_lock; /* protects BIO fields below */
125 int bio_count; /* nr bios to be completed */
126 int bios_in_flight; /* nr bios in flight */
127 struct bio *bio_list; /* singly linked via bi_private */ 126 struct bio *bio_list; /* singly linked via bi_private */
128 struct task_struct *waiter; /* waiting task (NULL if none) */ 127 struct task_struct *waiter; /* waiting task (NULL if none) */
129 128
@@ -256,44 +255,27 @@ static int dio_complete(struct dio *dio, loff_t offset, int ret)
256 * Called when a BIO has been processed. If the count goes to zero then IO is 255 * Called when a BIO has been processed. If the count goes to zero then IO is
257 * complete and we can signal this to the AIO layer. 256 * complete and we can signal this to the AIO layer.
258 */ 257 */
259static void finished_one_bio(struct dio *dio) 258static void dio_complete_aio(struct dio *dio)
260{ 259{
261 unsigned long flags; 260 unsigned long flags;
261 int ret;
262 262
263 spin_lock_irqsave(&dio->bio_lock, flags); 263 ret = dio_complete(dio, dio->iocb->ki_pos, 0);
264 if (dio->bio_count == 1) {
265 if (dio->is_async) {
266 int ret;
267
268 /*
269 * Last reference to the dio is going away.
270 * Drop spinlock and complete the DIO.
271 */
272 spin_unlock_irqrestore(&dio->bio_lock, flags);
273
274 ret = dio_complete(dio, dio->iocb->ki_pos, 0);
275 264
276 /* Complete AIO later if falling back to buffered i/o */ 265 /* Complete AIO later if falling back to buffered i/o */
277 if (dio->result == dio->size || 266 if (dio->result == dio->size ||
278 ((dio->rw == READ) && dio->result)) { 267 ((dio->rw == READ) && dio->result)) {
279 aio_complete(dio->iocb, ret, 0); 268 aio_complete(dio->iocb, ret, 0);
280 kfree(dio); 269 kfree(dio);
281 return; 270 } else {
282 } else { 271 /*
283 /* 272 * Falling back to buffered
284 * Falling back to buffered 273 */
285 */ 274 spin_lock_irqsave(&dio->bio_lock, flags);
286 spin_lock_irqsave(&dio->bio_lock, flags); 275 if (dio->waiter)
287 dio->bio_count--; 276 wake_up_process(dio->waiter);
288 if (dio->waiter) 277 spin_unlock_irqrestore(&dio->bio_lock, flags);
289 wake_up_process(dio->waiter);
290 spin_unlock_irqrestore(&dio->bio_lock, flags);
291 return;
292 }
293 }
294 } 278 }
295 dio->bio_count--;
296 spin_unlock_irqrestore(&dio->bio_lock, flags);
297} 279}
298 280
299static int dio_bio_complete(struct dio *dio, struct bio *bio); 281static int dio_bio_complete(struct dio *dio, struct bio *bio);
@@ -309,6 +291,10 @@ static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
309 291
310 /* cleanup the bio */ 292 /* cleanup the bio */
311 dio_bio_complete(dio, bio); 293 dio_bio_complete(dio, bio);
294
295 if (atomic_dec_and_test(&dio->refcount))
296 dio_complete_aio(dio);
297
312 return 0; 298 return 0;
313} 299}
314 300
@@ -330,8 +316,7 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
330 spin_lock_irqsave(&dio->bio_lock, flags); 316 spin_lock_irqsave(&dio->bio_lock, flags);
331 bio->bi_private = dio->bio_list; 317 bio->bi_private = dio->bio_list;
332 dio->bio_list = bio; 318 dio->bio_list = bio;
333 dio->bios_in_flight--; 319 if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
334 if (dio->waiter && dio->bios_in_flight == 0)
335 wake_up_process(dio->waiter); 320 wake_up_process(dio->waiter);
336 spin_unlock_irqrestore(&dio->bio_lock, flags); 321 spin_unlock_irqrestore(&dio->bio_lock, flags);
337 return 0; 322 return 0;
@@ -362,17 +347,15 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
362 * In the AIO read case we speculatively dirty the pages before starting IO. 347 * In the AIO read case we speculatively dirty the pages before starting IO.
363 * During IO completion, any of these pages which happen to have been written 348 * During IO completion, any of these pages which happen to have been written
364 * back will be redirtied by bio_check_pages_dirty(). 349 * back will be redirtied by bio_check_pages_dirty().
350 *
351 * bios hold a dio reference between submit_bio and ->end_io.
365 */ 352 */
366static void dio_bio_submit(struct dio *dio) 353static void dio_bio_submit(struct dio *dio)
367{ 354{
368 struct bio *bio = dio->bio; 355 struct bio *bio = dio->bio;
369 unsigned long flags;
370 356
371 bio->bi_private = dio; 357 bio->bi_private = dio;
372 spin_lock_irqsave(&dio->bio_lock, flags); 358 atomic_inc(&dio->refcount);
373 dio->bio_count++;
374 dio->bios_in_flight++;
375 spin_unlock_irqrestore(&dio->bio_lock, flags);
376 if (dio->is_async && dio->rw == READ) 359 if (dio->is_async && dio->rw == READ)
377 bio_set_pages_dirty(bio); 360 bio_set_pages_dirty(bio);
378 submit_bio(dio->rw, bio); 361 submit_bio(dio->rw, bio);
@@ -390,18 +373,28 @@ static void dio_cleanup(struct dio *dio)
390 page_cache_release(dio_get_page(dio)); 373 page_cache_release(dio_get_page(dio));
391} 374}
392 375
376static int wait_for_more_bios(struct dio *dio)
377{
378 assert_spin_locked(&dio->bio_lock);
379
380 return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
381}
382
393/* 383/*
394 * Wait for the next BIO to complete. Remove it and return it. 384 * Wait for the next BIO to complete. Remove it and return it. NULL is
385 * returned once all BIOs have been completed. This must only be called once
386 * all bios have been issued so that dio->refcount can only decrease. This
387 * requires that the caller hold a reference on the dio.
395 */ 388 */
396static struct bio *dio_await_one(struct dio *dio) 389static struct bio *dio_await_one(struct dio *dio)
397{ 390{
398 unsigned long flags; 391 unsigned long flags;
399 struct bio *bio; 392 struct bio *bio = NULL;
400 393
401 spin_lock_irqsave(&dio->bio_lock, flags); 394 spin_lock_irqsave(&dio->bio_lock, flags);
402 while (dio->bio_list == NULL) { 395 while (wait_for_more_bios(dio)) {
403 set_current_state(TASK_UNINTERRUPTIBLE); 396 set_current_state(TASK_UNINTERRUPTIBLE);
404 if (dio->bio_list == NULL) { 397 if (wait_for_more_bios(dio)) {
405 dio->waiter = current; 398 dio->waiter = current;
406 spin_unlock_irqrestore(&dio->bio_lock, flags); 399 spin_unlock_irqrestore(&dio->bio_lock, flags);
407 io_schedule(); 400 io_schedule();
@@ -410,8 +403,10 @@ static struct bio *dio_await_one(struct dio *dio)
410 } 403 }
411 set_current_state(TASK_RUNNING); 404 set_current_state(TASK_RUNNING);
412 } 405 }
413 bio = dio->bio_list; 406 if (dio->bio_list) {
414 dio->bio_list = bio->bi_private; 407 bio = dio->bio_list;
408 dio->bio_list = bio->bi_private;
409 }
415 spin_unlock_irqrestore(&dio->bio_lock, flags); 410 spin_unlock_irqrestore(&dio->bio_lock, flags);
416 return bio; 411 return bio;
417} 412}
@@ -440,25 +435,24 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
440 } 435 }
441 bio_put(bio); 436 bio_put(bio);
442 } 437 }
443 finished_one_bio(dio);
444 return uptodate ? 0 : -EIO; 438 return uptodate ? 0 : -EIO;
445} 439}
446 440
447/* 441/*
448 * Wait on and process all in-flight BIOs. 442 * Wait on and process all in-flight BIOs. This must only be called once
443 * all bios have been issued so that the refcount can only decrease.
444 * This just waits for all bios to make it through dio_bio_complete. IO
445 * errors are propagated through dio->io_error and should be propagated via
446 * dio_complete().
449 */ 447 */
450static void dio_await_completion(struct dio *dio) 448static void dio_await_completion(struct dio *dio)
451{ 449{
452 /* 450 struct bio *bio;
453 * The bio_lock is not held for the read of bio_count. 451 do {
454 * This is ok since it is the dio_bio_complete() that changes 452 bio = dio_await_one(dio);
455 * bio_count. 453 if (bio)
456 */ 454 dio_bio_complete(dio, bio);
457 while (dio->bio_count) { 455 } while (bio);
458 struct bio *bio = dio_await_one(dio);
459 /* io errors are propogated through dio->io_error */
460 dio_bio_complete(dio, bio);
461 }
462} 456}
463 457
464/* 458/*
@@ -995,16 +989,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
995 dio->iocb = iocb; 989 dio->iocb = iocb;
996 dio->i_size = i_size_read(inode); 990 dio->i_size = i_size_read(inode);
997 991
998 /* 992 atomic_set(&dio->refcount, 1);
999 * BIO completion state.
1000 *
1001 * ->bio_count starts out at one, and we decrement it to zero after all
1002 * BIOs are submitted. This to avoid the situation where a really fast
1003 * (or synchronous) device could take the count to zero while we're
1004 * still submitting BIOs.
1005 */
1006 dio->bio_count = 1;
1007 dio->bios_in_flight = 0;
1008 spin_lock_init(&dio->bio_lock); 993 spin_lock_init(&dio->bio_lock);
1009 dio->bio_list = NULL; 994 dio->bio_list = NULL;
1010 dio->waiter = NULL; 995 dio->waiter = NULL;
@@ -1111,7 +1096,11 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
1111 } 1096 }
1112 if (ret == 0) 1097 if (ret == 0)
1113 ret = dio->result; 1098 ret = dio->result;
1114 finished_one_bio(dio); /* This can free the dio */ 1099
1100 /* this can free the dio */
1101 if (atomic_dec_and_test(&dio->refcount))
1102 dio_complete_aio(dio);
1103
1115 if (should_wait) { 1104 if (should_wait) {
1116 unsigned long flags; 1105 unsigned long flags;
1117 /* 1106 /*
@@ -1122,7 +1111,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
1122 1111
1123 spin_lock_irqsave(&dio->bio_lock, flags); 1112 spin_lock_irqsave(&dio->bio_lock, flags);
1124 set_current_state(TASK_UNINTERRUPTIBLE); 1113 set_current_state(TASK_UNINTERRUPTIBLE);
1125 while (dio->bio_count) { 1114 while (atomic_read(&dio->refcount)) {
1126 spin_unlock_irqrestore(&dio->bio_lock, flags); 1115 spin_unlock_irqrestore(&dio->bio_lock, flags);
1127 io_schedule(); 1116 io_schedule();
1128 spin_lock_irqsave(&dio->bio_lock, flags); 1117 spin_lock_irqsave(&dio->bio_lock, flags);
@@ -1133,7 +1122,6 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
1133 kfree(dio); 1122 kfree(dio);
1134 } 1123 }
1135 } else { 1124 } else {
1136 finished_one_bio(dio);
1137 dio_await_completion(dio); 1125 dio_await_completion(dio);
1138 1126
1139 ret = dio_complete(dio, offset, ret); 1127 ret = dio_complete(dio, offset, ret);
@@ -1146,7 +1134,11 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
1146 * i/o, we have to mark the the aio complete. 1134 * i/o, we have to mark the the aio complete.
1147 */ 1135 */
1148 aio_complete(iocb, ret, 0); 1136 aio_complete(iocb, ret, 0);
1149 kfree(dio); 1137
1138 if (atomic_dec_and_test(&dio->refcount))
1139 kfree(dio);
1140 else
1141 BUG();
1150 } 1142 }
1151 return ret; 1143 return ret;
1152} 1144}