author	Zach Brown <zach.brown@oracle.com>	2006-12-10 05:20:54 -0500
committer	Linus Torvalds <torvalds@woody.osdl.org>	2006-12-10 12:57:21 -0500
commit	6d544bb4d9019c3a0d7ee4af1e4bbbd61a6e16dc (patch)
tree	cbf528f4270b60527961633c839ef62cfa69ebdf
parent	1757128438d41670ded8bc3bc735325cc07dc8f9 (diff)
[PATCH] dio: centralize completion in dio_complete()
There have been a lot of bugs recently due to the way direct_io_worker() tries to decide how to finish direct IO operations. In the worst examples it has failed to call aio_complete() at all (hang) or called it too many times (oops).

This set of patches cleans up the completion phase with the goal of removing the complexity that led to these bugs. We end up with one path that calculates the result of the operation after all of the bios have completed. We decide when to generate a result of the operation using that path based on the final release of a refcount on the dio structure.

I tried to progress towards the final state in steps that were relatively easy to understand. Each step should compile, but I only tested the final result of having all the patches applied.

I've tested these on low-end PC drives with aio-stress, the direct IO tests I could manage to get running in LTP, orasim, and some home-brew functional tests. In http://lkml.org/lkml/2006/9/21/103 IBM reports success with ext2 and ext3 running DIO LTP tests. They found an XFS bug which has since been addressed in the patch series.

This patch:

The mechanics which decide the result of a direct IO operation were duplicated in the sync and async paths.

The async path didn't check page_errors, which can manifest as silently returning success when the final pointer in an operation faults and its matching file region is filled with zeros.

The sync path and async path differed in whether they passed errors to the caller's dio->end_io operation. The async path was passing errors to it, which trips an assertion in XFS, though it is apparently harmless.

This centralizes the completion phase of dio ops in one place. AIO will now return EFAULT consistently, and all paths fall back to the previous sync behaviour of passing the number of bytes 'transferred' to the dio->end_io callback, regardless of errors.

dio_await_completion() no longer has to propagate EIO from non-uptodate bios, now that it is propagated through dio_complete() via dio->io_error. This lets it return void, which simplifies its sole caller.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
Cc: Badari Pulavarty <pbadari@us.ibm.com>
Cc: Suparna Bhattacharya <suparna@in.ibm.com>
Acked-by: Jeff Moyer <jmoyer@redhat.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>
-rw-r--r--	fs/direct-io.c	94
1 file changed, 42 insertions(+), 52 deletions(-)
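For reference, the result-selection order that the new dio_complete() establishes can be sketched as a small standalone C program. This is only an illustration of the precedence described above, not kernel code: the names dio_sketch, dio_complete_sketch, and the DIO_READ/DIO_WRITE enum are invented stand-ins for the real struct dio fields and the kernel's READ/WRITE constants.

	#include <stdio.h>

	enum dio_rw { DIO_READ, DIO_WRITE };	/* stand-ins for the kernel's READ/WRITE */

	/* Reduced view of the dio fields that the centralized completion consults. */
	struct dio_sketch {
		enum dio_rw rw;
		long long i_size;	/* file size at submission, for the short-read clamp */
		long result;		/* bytes the operation set up for IO */
		int page_errors;	/* e.g. -EFAULT while pinning user pages */
		int io_error;		/* first error recorded during bio completion */
	};

	/*
	 * Same precedence as the new dio_complete(): an error already held by
	 * the caller wins, then page faults, then bio errors; only if all of
	 * those are zero does the byte count (clamped at EOF for reads)
	 * become the return value.
	 */
	static long dio_complete_sketch(const struct dio_sketch *dio,
					long long offset, long ret)
	{
		long transferred = 0;

		if (dio->result) {
			transferred = dio->result;
			/* Short read case: never report bytes beyond i_size. */
			if (dio->rw == DIO_READ && offset + transferred > dio->i_size)
				transferred = dio->i_size - offset;
		}
		if (ret == 0)
			ret = dio->page_errors;
		if (ret == 0)
			ret = dio->io_error;
		if (ret == 0)
			ret = transferred;
		return ret;
	}

	int main(void)
	{
		/* A read whose final user page faulted: the fault wins over the byte count. */
		struct dio_sketch d = { DIO_READ, 4096, 4096, -14 /* -EFAULT */, 0 };
		printf("%ld\n", dio_complete_sketch(&d, 0, 0));	/* prints -14 */
		return 0;
	}

The worked case in main() is the silent-EFAULT scenario described in the commit message: the byte count is nonzero, but page_errors takes precedence, so the caller sees -EFAULT rather than an apparent success that hides a zero-filled region.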
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 45d34d807391..b57b671e1106 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -210,19 +210,46 @@ static struct page *dio_get_page(struct dio *dio)
 	return dio->pages[dio->head++];
 }
 
-/*
- * Called when all DIO BIO I/O has been completed - let the filesystem
- * know, if it registered an interest earlier via get_block. Pass the
- * private field of the map buffer_head so that filesystems can use it
- * to hold additional state between get_block calls and dio_complete.
+/**
+ * dio_complete() - called when all DIO BIO I/O has been completed
+ * @offset: the byte offset in the file of the completed operation
+ *
+ * This releases locks as dictated by the locking type, lets interested parties
+ * know that a DIO operation has completed, and calculates the resulting return
+ * code for the operation.
+ *
+ * It lets the filesystem know if it registered an interest earlier via
+ * get_block. Pass the private field of the map buffer_head so that
+ * filesystems can use it to hold additional state between get_block calls and
+ * dio_complete.
  */
-static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes)
+static int dio_complete(struct dio *dio, loff_t offset, int ret)
 {
+	ssize_t transferred = 0;
+
+	if (dio->result) {
+		transferred = dio->result;
+
+		/* Check for short read case */
+		if ((dio->rw == READ) && ((offset + transferred) > dio->i_size))
+			transferred = dio->i_size - offset;
+	}
+
 	if (dio->end_io && dio->result)
-		dio->end_io(dio->iocb, offset, bytes, dio->map_bh.b_private);
+		dio->end_io(dio->iocb, offset, transferred,
+			    dio->map_bh.b_private);
 	if (dio->lock_type == DIO_LOCKING)
 		/* lockdep: non-owner release */
 		up_read_non_owner(&dio->inode->i_alloc_sem);
+
+	if (ret == 0)
+		ret = dio->page_errors;
+	if (ret == 0)
+		ret = dio->io_error;
+	if (ret == 0)
+		ret = transferred;
+
+	return ret;
 }
 
 /*
@@ -236,8 +263,7 @@ static void finished_one_bio(struct dio *dio)
 	spin_lock_irqsave(&dio->bio_lock, flags);
 	if (dio->bio_count == 1) {
 		if (dio->is_async) {
-			ssize_t transferred;
-			loff_t offset;
+			int ret;
 
 			/*
 			 * Last reference to the dio is going away.
@@ -245,24 +271,12 @@ static void finished_one_bio(struct dio *dio)
 			 */
 			spin_unlock_irqrestore(&dio->bio_lock, flags);
 
-			/* Check for short read case */
-			transferred = dio->result;
-			offset = dio->iocb->ki_pos;
-
-			if ((dio->rw == READ) &&
-			    ((offset + transferred) > dio->i_size))
-				transferred = dio->i_size - offset;
-
-			/* check for error in completion path */
-			if (dio->io_error)
-				transferred = dio->io_error;
-
-			dio_complete(dio, offset, transferred);
+			ret = dio_complete(dio, dio->iocb->ki_pos, 0);
 
 			/* Complete AIO later if falling back to buffered i/o */
 			if (dio->result == dio->size ||
 				((dio->rw == READ) && dio->result)) {
-				aio_complete(dio->iocb, transferred, 0);
+				aio_complete(dio->iocb, ret, 0);
 				kfree(dio);
 				return;
 			} else {
@@ -434,10 +448,8 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
 /*
  * Wait on and process all in-flight BIOs.
  */
-static int dio_await_completion(struct dio *dio)
+static void dio_await_completion(struct dio *dio)
 {
-	int ret = 0;
-
 	if (dio->bio)
 		dio_bio_submit(dio);
 
@@ -448,13 +460,9 @@ static int dio_await_completion(struct dio *dio)
 	 */
 	while (dio->bio_count) {
 		struct bio *bio = dio_await_one(dio);
-		int ret2;
-
-		ret2 = dio_bio_complete(dio, bio);
-		if (ret == 0)
-			ret = ret2;
+		/* io errors are propogated through dio->io_error */
+		dio_bio_complete(dio, bio);
 	}
-	return ret;
 }
 
 /*
@@ -1127,28 +1135,10 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 			kfree(dio);
 		}
 	} else {
-		ssize_t transferred = 0;
-
 		finished_one_bio(dio);
-		ret2 = dio_await_completion(dio);
-		if (ret == 0)
-			ret = ret2;
-		if (ret == 0)
-			ret = dio->page_errors;
-		if (dio->result) {
-			loff_t i_size = i_size_read(inode);
+		dio_await_completion(dio);
 
-			transferred = dio->result;
-			/*
-			 * Adjust the return value if the read crossed a
-			 * non-block-aligned EOF.
-			 */
-			if (rw == READ && (offset + transferred > i_size))
-				transferred = i_size - offset;
-		}
-		dio_complete(dio, offset, transferred);
-		if (ret == 0)
-			ret = transferred;
+		ret = dio_complete(dio, offset, ret);
 
 		/* We could have also come here on an AIO file extend */
 		if (!is_sync_kiocb(iocb) && (rw & WRITE) &&