Diffstat (limited to 'fs/direct-io.c')
 fs/direct-io.c | 126
 1 file changed, 103 insertions(+), 23 deletions(-)
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 7ab90f5081ee..1782023bd68a 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -127,6 +127,7 @@ struct dio {
 	spinlock_t bio_lock;		/* protects BIO fields below */
 	int page_errors;		/* errno from get_user_pages() */
 	int is_async;			/* is IO async ? */
+	bool defer_completion;		/* defer AIO completion to workqueue? */
 	int io_error;			/* IO error in completion path */
 	unsigned long refcount;		/* direct_io_worker() and bios */
 	struct bio *bio_list;		/* singly linked via bi_private */
@@ -141,7 +142,10 @@ struct dio {
141 * allocation time. Don't add new fields after pages[] unless you 142 * allocation time. Don't add new fields after pages[] unless you
142 * wish that they not be zeroed. 143 * wish that they not be zeroed.
143 */ 144 */
144 struct page *pages[DIO_PAGES]; /* page buffer */ 145 union {
146 struct page *pages[DIO_PAGES]; /* page buffer */
147 struct work_struct complete_work;/* deferred AIO completion */
148 };
145} ____cacheline_aligned_in_smp; 149} ____cacheline_aligned_in_smp;
146 150
147static struct kmem_cache *dio_cache __read_mostly; 151static struct kmem_cache *dio_cache __read_mostly;
@@ -221,16 +225,16 @@ static inline struct page *dio_get_page(struct dio *dio,
  * dio_complete() - called when all DIO BIO I/O has been completed
  * @offset: the byte offset in the file of the completed operation
  *
- * This releases locks as dictated by the locking type, lets interested parties
- * know that a DIO operation has completed, and calculates the resulting return
- * code for the operation.
+ * This drops i_dio_count, lets interested parties know that a DIO operation
+ * has completed, and calculates the resulting return code for the operation.
  *
  * It lets the filesystem know if it registered an interest earlier via
  * get_block.  Pass the private field of the map buffer_head so that
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret,
+		bool is_async)
 {
 	ssize_t transferred = 0;
 
@@ -258,19 +262,36 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is
 	if (ret == 0)
 		ret = transferred;
 
-	if (dio->end_io && dio->result) {
-		dio->end_io(dio->iocb, offset, transferred,
-			    dio->private, ret, is_async);
-	} else {
-		inode_dio_done(dio->inode);
-		if (is_async)
-			aio_complete(dio->iocb, ret, 0);
+	if (dio->end_io && dio->result)
+		dio->end_io(dio->iocb, offset, transferred, dio->private);
+
+	inode_dio_done(dio->inode);
+	if (is_async) {
+		if (dio->rw & WRITE) {
+			int err;
+
+			err = generic_write_sync(dio->iocb->ki_filp, offset,
+						 transferred);
+			if (err < 0 && ret > 0)
+				ret = err;
+		}
+
+		aio_complete(dio->iocb, ret, 0);
 	}
 
+	kmem_cache_free(dio_cache, dio);
 	return ret;
 }
 
+static void dio_aio_complete_work(struct work_struct *work)
+{
+	struct dio *dio = container_of(work, struct dio, complete_work);
+
+	dio_complete(dio, dio->iocb->ki_pos, 0, true);
+}
+
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
+
 /*
  * Asynchronous IO callback.
  */
@@ -290,8 +311,13 @@ static void dio_bio_end_aio(struct bio *bio, int error)
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (remaining == 0) {
-		dio_complete(dio, dio->iocb->ki_pos, 0, true);
-		kmem_cache_free(dio_cache, dio);
+		if (dio->result && dio->defer_completion) {
+			INIT_WORK(&dio->complete_work, dio_aio_complete_work);
+			queue_work(dio->inode->i_sb->s_dio_done_wq,
+				   &dio->complete_work);
+		} else {
+			dio_complete(dio, dio->iocb->ki_pos, 0, true);
+		}
 	}
 }
 
@@ -511,6 +537,41 @@ static inline int dio_bio_reap(struct dio *dio, struct dio_submit *sdio)
 }
 
 /*
+ * Create workqueue for deferred direct IO completions. We allocate the
+ * workqueue when it's first needed. This avoids creating the workqueue for
+ * filesystems that don't need it and also allows us to create the workqueue
+ * late enough so that we can include s_id in the name of the workqueue.
+ */
+static int sb_init_dio_done_wq(struct super_block *sb)
+{
+	struct workqueue_struct *wq = alloc_workqueue("dio/%s",
+						      WQ_MEM_RECLAIM, 0,
+						      sb->s_id);
+	if (!wq)
+		return -ENOMEM;
+	/*
+	 * This has to be atomic as more DIOs can race to create the workqueue
+	 */
+	cmpxchg(&sb->s_dio_done_wq, NULL, wq);
+	/* Someone created workqueue before us? Free ours... */
+	if (wq != sb->s_dio_done_wq)
+		destroy_workqueue(wq);
+	return 0;
+}
+
+static int dio_set_defer_completion(struct dio *dio)
+{
+	struct super_block *sb = dio->inode->i_sb;
+
+	if (dio->defer_completion)
+		return 0;
+	dio->defer_completion = true;
+	if (!sb->s_dio_done_wq)
+		return sb_init_dio_done_wq(sb);
+	return 0;
+}
+
+/*
  * Call into the fs to map some more disk blocks.  We record the current number
  * of available blocks at sdio->blocks_available.  These are in units of the
  * fs blocksize, (1 << inode->i_blkbits).
@@ -581,6 +642,9 @@ static int get_more_blocks(struct dio *dio, struct dio_submit *sdio,
 
 		/* Store for completion */
 		dio->private = map_bh->b_private;
+
+		if (ret == 0 && buffer_defer_completion(map_bh))
+			ret = dio_set_defer_completion(dio);
 	}
 	return ret;
 }
@@ -1129,11 +1193,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 	}
 
 	/*
-	 * Will be decremented at I/O completion time.
-	 */
-	atomic_inc(&inode->i_dio_count);
-
-	/*
 	 * For file extending writes updating i_size before data
 	 * writeouts complete can expose uninitialized blocks. So
 	 * even for AIO, we need to wait for i/o to complete before
@@ -1141,11 +1200,33 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 	 */
 	dio->is_async = !is_sync_kiocb(iocb) && !((rw & WRITE) &&
 		(end > i_size_read(inode)));
-
-	retval = 0;
-
 	dio->inode = inode;
 	dio->rw = rw;
+
+	/*
+	 * For AIO O_(D)SYNC writes we need to defer completions to a workqueue
+	 * so that we can call ->fsync.
+	 */
+	if (dio->is_async && (rw & WRITE) &&
+	    ((iocb->ki_filp->f_flags & O_DSYNC) ||
+	     IS_SYNC(iocb->ki_filp->f_mapping->host))) {
+		retval = dio_set_defer_completion(dio);
+		if (retval) {
+			/*
+			 * We grab i_mutex only for reads so we don't have
+			 * to release it here
+			 */
+			kmem_cache_free(dio_cache, dio);
+			goto out;
+		}
+	}
+
+	/*
+	 * Will be decremented at I/O completion time.
+	 */
+	atomic_inc(&inode->i_dio_count);
+
+	retval = 0;
 	sdio.blkbits = blkbits;
 	sdio.blkfactor = i_blkbits - blkbits;
 	sdio.block_in_file = offset >> blkbits;
@@ -1269,7 +1350,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 
 	if (drop_refcount(dio) == 0) {
 		retval = dio_complete(dio, offset, retval, false);
-		kmem_cache_free(dio_cache, dio);
 	} else
 		BUG_ON(retval != -EIOCBQUEUED);
 
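
For illustration only (not part of this patch): the new buffer_defer_completion(map_bh) check in get_more_blocks() means a filesystem opts in to the deferred-completion path from its get_block callback by marking the mapped buffer_head. The sketch below is a minimal, hypothetical example of such a callback; example_get_block() and example_map_blocks() are invented names, and set_buffer_defer_completion() is assumed to be the BUFFER_FNS(Defer_Completion, defer_completion) helper added to include/linux/buffer_head.h elsewhere in this series.

#include <linux/fs.h>
#include <linux/buffer_head.h>

/* Hypothetical helper that fills bh_result from the filesystem's extent map. */
extern int example_map_blocks(struct inode *inode, sector_t iblock,
			      struct buffer_head *bh_result, int create);

static int example_get_block(struct inode *inode, sector_t iblock,
			     struct buffer_head *bh_result, int create)
{
	int ret = example_map_blocks(inode, iblock, bh_result, create);

	if (ret)
		return ret;

	/*
	 * Completion work such as unwritten-extent conversion may need to
	 * block, so ask direct-io to run dio_complete() from the "dio/%s"
	 * workqueue rather than from bio completion context.
	 * set_buffer_defer_completion() is assumed from the companion
	 * buffer_head.h change in this series.
	 */
	if (create && buffer_unwritten(bh_result))
		set_buffer_defer_completion(bh_result);

	return 0;
}

With the buffer_head flagged this way, dio_set_defer_completion() marks the dio and makes sure the per-superblock "dio/%s" workqueue exists, and dio_bio_end_aio() then queues dio_aio_complete_work() instead of completing the AIO from interrupt context.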