path: root/fs/direct-io.c
author	Zach Brown <zach.brown@oracle.com>	2006-12-10 05:21:07 -0500
committer	Linus Torvalds <torvalds@woody.osdl.org>	2006-12-10 12:57:21 -0500
commit	5eb6c7a2ab413dea1ee6c08dd58263a1c2c2efa3 (patch)
tree	f1e2f7994321290f4e6e641894cc21176c16767b /fs/direct-io.c
parent	8459d86aff04fa53c2ab6a6b9f355b3063cc8014 (diff)
[PATCH] dio: lock refcount operations
The wait_for_more_bios() function name was poorly chosen. While looking to clean it up, I noticed that the dio struct refcounting between the bio completion and dio submission paths was racy.

The bio submission path was simply freeing the dio struct if atomic_dec_and_test() indicated that it had dropped the final reference. The aio bio completion path was dereferencing its dio struct pointer *after dropping its reference*, based on the remaining number of references. These two paths could race and result in the aio bio completion path dereferencing a freed dio, though this was not observed in the wild.

This patch moves the refcount under the bio lock so that bio completion can drop its reference and decide whether to wake the waiter in one atomic step. Once testing and waking are done under the lock, dio_await_one() can test its sleeping condition and mark itself uninterruptible while holding the lock. It gets simpler, and wait_for_more_bios() disappears.

The addition of the interrupt-masking spin lock acquisition in dio_bio_submit() looks alarming, but this lock acquisition existed in that path before the recent dio completion patch set. We shouldn't expect a significant performance regression from returning to the behaviour that existed before the completion clean-up work.

This passed 4k-block ext3 O_DIRECT fsx and aio-stress on an SMP machine.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
Cc: Badari Pulavarty <pbadari@us.ibm.com>
Cc: Suparna Bhattacharya <suparna@in.ibm.com>
Cc: Jeff Moyer <jmoyer@redhat.com>
Cc: <xfs-masters@oss.sgi.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>
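For readers who want to see the wake/free protocol outside the kernel, the following is a minimal userspace sketch of the pattern the patch adopts, written with POSIX threads. struct dio_like, dio_like_alloc(), dio_like_put() and dio_like_wait() are invented names used only for illustration; they are not part of this patch or of the kernel.

#include <pthread.h>
#include <stdlib.h>

/*
 * Userspace stand-in for struct dio: one lock guards both the reference
 * count and the waiter state, so "drop a ref" and "decide whether to wake
 * the waiter" happen as a single atomic step, as in the patch.
 */
struct dio_like {
	pthread_mutex_t lock;		/* plays the role of dio->bio_lock */
	pthread_cond_t wake;		/* stands in for wake_up_process() */
	unsigned long refcount;		/* submitter + one per unit in flight */
	int waiting;			/* stands in for dio->waiter */
};

/* One reference for the submitter plus one per in-flight unit of work. */
static struct dio_like *dio_like_alloc(unsigned long nr_units)
{
	struct dio_like *d = calloc(1, sizeof(*d));

	if (!d)
		return NULL;
	pthread_mutex_init(&d->lock, NULL);
	pthread_cond_init(&d->wake, NULL);
	d->refcount = 1 + nr_units;
	return d;
}

/*
 * Drop one reference and, under the same lock, decide whether the waiter
 * should be woken.  Only the caller that saw the count reach zero frees
 * the structure; nobody touches it after dropping their reference, which
 * is exactly the race the old atomic_sub_return() version allowed.
 */
static unsigned long dio_like_put(struct dio_like *d)
{
	unsigned long remaining;

	pthread_mutex_lock(&d->lock);
	remaining = --d->refcount;
	if (remaining == 1 && d->waiting)
		pthread_cond_signal(&d->wake);
	pthread_mutex_unlock(&d->lock);

	if (remaining == 0) {
		pthread_cond_destroy(&d->wake);
		pthread_mutex_destroy(&d->lock);
		free(d);
	}
	return remaining;
}

/*
 * Submitter side, modelled on dio_await_one(): the sleep condition is
 * tested under the same lock the completion path holds while waking, so
 * no wakeup can be lost.  (The kernel loop also checks dio->bio_list;
 * that detail is dropped here.)
 */
static void dio_like_wait(struct dio_like *d)
{
	pthread_mutex_lock(&d->lock);
	while (d->refcount > 1) {
		d->waiting = 1;
		pthread_cond_wait(&d->wake, &d->lock);
		d->waiting = 0;
	}
	pthread_mutex_unlock(&d->lock);
}

The point carried over from the patch is that the decrement and the wake decision happen under one lock, and only the path that observes the count reaching zero frees the structure.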
Diffstat (limited to 'fs/direct-io.c')
-rw-r--r--	fs/direct-io.c	76
1 file changed, 45 insertions(+), 31 deletions(-)
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 71f4aeac7632..d9d0833444f5 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -121,8 +121,8 @@ struct dio {
 	int page_errors;		/* errno from get_user_pages() */
 
 	/* BIO completion state */
-	atomic_t refcount;		/* direct_io_worker() and bios */
 	spinlock_t bio_lock;		/* protects BIO fields below */
+	unsigned long refcount;		/* direct_io_worker() and bios */
 	struct bio *bio_list;		/* singly linked via bi_private */
 	struct task_struct *waiter;	/* waiting task (NULL if none) */
 
@@ -267,8 +267,8 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio);
 static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
 {
 	struct dio *dio = bio->bi_private;
-	int waiter_holds_ref = 0;
-	int remaining;
+	unsigned long remaining;
+	unsigned long flags;
 
 	if (bio->bi_size)
 		return 1;
@@ -276,10 +276,11 @@ static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
 	/* cleanup the bio */
 	dio_bio_complete(dio, bio);
 
-	waiter_holds_ref = !!dio->waiter;
-	remaining = atomic_sub_return(1, (&dio->refcount));
-	if (remaining == 1 && waiter_holds_ref)
+	spin_lock_irqsave(&dio->bio_lock, flags);
+	remaining = --dio->refcount;
+	if (remaining == 1 && dio->waiter)
 		wake_up_process(dio->waiter);
+	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (remaining == 0) {
 		int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
@@ -308,7 +309,7 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
 	spin_lock_irqsave(&dio->bio_lock, flags);
 	bio->bi_private = dio->bio_list;
 	dio->bio_list = bio;
-	if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
+	if (--dio->refcount == 1 && dio->waiter)
 		wake_up_process(dio->waiter);
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 	return 0;
@@ -345,11 +346,17 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
 static void dio_bio_submit(struct dio *dio)
 {
 	struct bio *bio = dio->bio;
+	unsigned long flags;
 
 	bio->bi_private = dio;
-	atomic_inc(&dio->refcount);
+
+	spin_lock_irqsave(&dio->bio_lock, flags);
+	dio->refcount++;
+	spin_unlock_irqrestore(&dio->bio_lock, flags);
+
 	if (dio->is_async && dio->rw == READ)
 		bio_set_pages_dirty(bio);
+
 	submit_bio(dio->rw, bio);
 
 	dio->bio = NULL;
@@ -365,13 +372,6 @@ static void dio_cleanup(struct dio *dio)
 		page_cache_release(dio_get_page(dio));
 }
 
-static int wait_for_more_bios(struct dio *dio)
-{
-	assert_spin_locked(&dio->bio_lock);
-
-	return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
-}
-
 /*
  * Wait for the next BIO to complete. Remove it and return it. NULL is
  * returned once all BIOs have been completed. This must only be called once
@@ -384,16 +384,21 @@ static struct bio *dio_await_one(struct dio *dio)
 	struct bio *bio = NULL;
 
 	spin_lock_irqsave(&dio->bio_lock, flags);
-	while (wait_for_more_bios(dio)) {
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		if (wait_for_more_bios(dio)) {
-			dio->waiter = current;
-			spin_unlock_irqrestore(&dio->bio_lock, flags);
-			io_schedule();
-			spin_lock_irqsave(&dio->bio_lock, flags);
-			dio->waiter = NULL;
-		}
-		set_current_state(TASK_RUNNING);
+
+	/*
+	 * Wait as long as the list is empty and there are bios in flight. bio
+	 * completion drops the count, maybe adds to the list, and wakes while
+	 * holding the bio_lock so we don't need set_current_state()'s barrier
+	 * and can call it after testing our condition.
+	 */
+	while (dio->refcount > 1 && dio->bio_list == NULL) {
+		__set_current_state(TASK_UNINTERRUPTIBLE);
+		dio->waiter = current;
+		spin_unlock_irqrestore(&dio->bio_lock, flags);
+		io_schedule();
+		/* wake up sets us TASK_RUNNING */
+		spin_lock_irqsave(&dio->bio_lock, flags);
+		dio->waiter = NULL;
 	}
 	if (dio->bio_list) {
 		bio = dio->bio_list;
@@ -951,6 +956,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 		struct dio *dio)
 {
 	unsigned long user_addr;
+	unsigned long flags;
 	int seg;
 	ssize_t ret = 0;
 	ssize_t ret2;
@@ -981,8 +987,8 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 	dio->iocb = iocb;
 	dio->i_size = i_size_read(inode);
 
-	atomic_set(&dio->refcount, 1);
 	spin_lock_init(&dio->bio_lock);
+	dio->refcount = 1;
 	dio->bio_list = NULL;
 	dio->waiter = NULL;
 
@@ -1092,12 +1098,20 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 
 	/*
 	 * Sync will always be dropping the final ref and completing the
-	 * operation. AIO can if it was a broken operation described above
-	 * or in fact if all the bios race to complete before we get here.
-	 * In that case dio_complete() translates the EIOCBQUEUED into
-	 * the proper return code that the caller will hand to aio_complete().
+	 * operation. AIO can if it was a broken operation described above or
+	 * in fact if all the bios race to complete before we get here. In
+	 * that case dio_complete() translates the EIOCBQUEUED into the proper
+	 * return code that the caller will hand to aio_complete().
+	 *
+	 * This is managed by the bio_lock instead of being an atomic_t so that
+	 * completion paths can drop their ref and use the remaining count to
+	 * decide to wake the submission path atomically.
 	 */
-	if (atomic_dec_and_test(&dio->refcount)) {
+	spin_lock_irqsave(&dio->bio_lock, flags);
+	ret2 = --dio->refcount;
+	spin_unlock_irqrestore(&dio->bio_lock, flags);
+	BUG_ON(!dio->is_async && ret2 != 0);
+	if (ret2 == 0) {
 		ret = dio_complete(dio, offset, ret);
 		kfree(dio);
 	} else