author		Christoph Hellwig <hch@infradead.org>	2013-09-04 09:04:39 -0400
committer	Al Viro <viro@zeniv.linux.org.uk>	2013-09-04 09:23:46 -0400
commit		7b7a8665edd8db733980389b098530f9e4f630b2
tree		968d570a9f0c4d861226aefed2f5f97a131c8d53 /fs/direct-io.c
parent		4b6ccca701ef5977d0ffbc2c932430dea88b38b6
direct-io: Implement generic deferred AIO completions
Add support to the core direct-io code to defer AIO completions to user
context using a workqueue.  This replaces opencoded and less efficient
code in XFS and ext4 (we save a memory allocation for each direct IO)
and will be needed to properly support O_(D)SYNC for AIO.

The communication between the filesystem and the direct I/O code requires
a new buffer head flag, which is a bit ugly but not avoidable until the
direct I/O code stops abusing the buffer_head structure for communicating
with the filesystems.

Currently this creates a per-superblock unbound workqueue for these
completions, which is taken from an earlier patch by Jan Kara.  I'm not
really convinced about this use and would prefer a "normal" global
workqueue with a high concurrency limit, but this needs further discussion.

JK: Fixed ext4 part, dynamic allocation of the workqueue.

Signed-off-by: Christoph Hellwig <hch@lst.de>
Signed-off-by: Jan Kara <jack@suse.cz>
Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
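For context on the filesystem side of the handshake described above: opting in
is essentially a one-line change in the block-mapping callback.  The sketch
below is hypothetical, not part of this patch: myfs_map_blocks() is an invented
stand-in for a filesystem's mapper, while set_buffer_defer_completion() is the
accessor for the new buffer head flag this series introduces.

	#include <linux/fs.h>
	#include <linux/buffer_head.h>

	/*
	 * Hypothetical get_block callback used for direct I/O.  A write into
	 * an unwritten extent needs conversion at completion time, which can
	 * sleep, so the filesystem tags the buffer_head; the direct-io core
	 * then runs dio_complete() from the per-superblock workqueue instead
	 * of from interrupt context.
	 */
	static int myfs_get_block_direct(struct inode *inode, sector_t iblock,
					 struct buffer_head *bh_result, int create)
	{
		int ret = myfs_map_blocks(inode, iblock, bh_result, create);

		if (ret == 0 && create && buffer_unwritten(bh_result))
			set_buffer_defer_completion(bh_result);
		return ret;
	}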
Diffstat (limited to 'fs/direct-io.c')
-rw-r--r--	fs/direct-io.c	85
1 file changed, 69 insertions, 16 deletions
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 7ab90f5081ee..8b31b9f449f4 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -127,6 +127,7 @@ struct dio {
 	spinlock_t bio_lock;		/* protects BIO fields below */
 	int page_errors;		/* errno from get_user_pages() */
 	int is_async;			/* is IO async ? */
+	bool defer_completion;		/* defer AIO completion to workqueue? */
 	int io_error;			/* IO error in completion path */
 	unsigned long refcount;		/* direct_io_worker() and bios */
 	struct bio *bio_list;		/* singly linked via bi_private */
@@ -141,7 +142,10 @@ struct dio {
 	 * allocation time.  Don't add new fields after pages[] unless you
 	 * wish that they not be zeroed.
 	 */
-	struct page *pages[DIO_PAGES];	/* page buffer */
+	union {
+		struct page *pages[DIO_PAGES];	/* page buffer */
+		struct work_struct complete_work;/* deferred AIO completion */
+	};
 } ____cacheline_aligned_in_smp;
 
 static struct kmem_cache *dio_cache __read_mostly;
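The union is safe because the two members are never live at the same time:
pages[] holds pinned user pages only while bios are being built and submitted,
while complete_work is touched only after the final bio completes, so deferred
completion adds no size to struct dio.  A standalone, runnable illustration of
the same overlay idea (all names invented for the example):

	#include <stdio.h>
	#include <string.h>

	/*
	 * Two phase-disjoint members share one allocation, like pages[] and
	 * complete_work in struct dio: the submission scratch space is dead
	 * by the time the completion callback slot is needed.
	 */
	struct request {
		union {
			char scratch[64];		/* submission phase only */
			void (*complete)(struct request *); /* completion phase only */
		};
	};

	static void done(struct request *req)
	{
		puts("request complete");
	}

	int main(void)
	{
		struct request req;

		memset(req.scratch, 0, sizeof(req.scratch)); /* submission phase */
		req.complete = done;			/* reuse the storage */
		req.complete(&req);
		return 0;
	}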
@@ -221,16 +225,16 @@ static inline struct page *dio_get_page(struct dio *dio,
  * dio_complete() - called when all DIO BIO I/O has been completed
  * @offset: the byte offset in the file of the completed operation
  *
- * This releases locks as dictated by the locking type, lets interested parties
- * know that a DIO operation has completed, and calculates the resulting return
- * code for the operation.
+ * This drops i_dio_count, lets interested parties know that a DIO operation
+ * has completed, and calculates the resulting return code for the operation.
  *
  * It lets the filesystem know if it registered an interest earlier via
  * get_block.  Pass the private field of the map buffer_head so that
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret,
+		bool is_async)
 {
 	ssize_t transferred = 0;
 
@@ -258,19 +262,26 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is
 	if (ret == 0)
 		ret = transferred;
 
-	if (dio->end_io && dio->result) {
-		dio->end_io(dio->iocb, offset, transferred,
-			    dio->private, ret, is_async);
-	} else {
-		inode_dio_done(dio->inode);
-		if (is_async)
-			aio_complete(dio->iocb, ret, 0);
-	}
+	if (dio->end_io && dio->result)
+		dio->end_io(dio->iocb, offset, transferred, dio->private);
+
+	inode_dio_done(dio->inode);
+	if (is_async)
+		aio_complete(dio->iocb, ret, 0);
 
+	kmem_cache_free(dio_cache, dio);
 	return ret;
 }
 
+static void dio_aio_complete_work(struct work_struct *work)
+{
+	struct dio *dio = container_of(work, struct dio, complete_work);
+
+	dio_complete(dio, dio->iocb->ki_pos, 0, true);
+}
+
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
+
 /*
  * Asynchronous IO callback.
  */
@@ -290,8 +301,13 @@ static void dio_bio_end_aio(struct bio *bio, int error)
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (remaining == 0) {
-		dio_complete(dio, dio->iocb->ki_pos, 0, true);
-		kmem_cache_free(dio_cache, dio);
+		if (dio->result && dio->defer_completion) {
+			INIT_WORK(&dio->complete_work, dio_aio_complete_work);
+			queue_work(dio->inode->i_sb->s_dio_done_wq,
+				   &dio->complete_work);
+		} else {
+			dio_complete(dio, dio->iocb->ki_pos, 0, true);
+		}
 	}
 }
 
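The dio->result guard means a zero-byte result still completes inline,
presumably because with no data transferred there is no filesystem completion
work worth deferring.  The punt itself is the standard shape for completion
handlers that run in interrupt context but need to sleep.  A hedged sketch of
that general pattern follows; system_unbound_wq, INIT_WORK, queue_work and
container_of are real kernel APIs, everything prefixed my_ is illustrative:

	#include <linux/kernel.h>
	#include <linux/workqueue.h>
	#include <linux/slab.h>

	struct my_req {
		struct work_struct work;
		int status;
	};

	/* Process context: may sleep, take mutexes, start more I/O. */
	static void my_req_complete_work(struct work_struct *work)
	{
		struct my_req *req = container_of(work, struct my_req, work);

		/* ... heavyweight completion work goes here ... */
		kfree(req);
	}

	/* Interrupt context: must not sleep, so hand off to a workqueue. */
	static void my_req_irq_done(struct my_req *req, int status)
	{
		req->status = status;
		INIT_WORK(&req->work, my_req_complete_work);
		queue_work(system_unbound_wq, &req->work);
	}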
@@ -511,6 +527,41 @@ static inline int dio_bio_reap(struct dio *dio, struct dio_submit *sdio)
 }
 
 /*
+ * Create workqueue for deferred direct IO completions. We allocate the
+ * workqueue when it's first needed. This avoids creating a workqueue for
+ * filesystems that don't need it and also allows us to create the workqueue
+ * late enough so that we can include s_id in the name of the workqueue.
+ */
+static int sb_init_dio_done_wq(struct super_block *sb)
+{
+	struct workqueue_struct *wq = alloc_workqueue("dio/%s",
+						      WQ_MEM_RECLAIM, 0,
+						      sb->s_id);
+	if (!wq)
+		return -ENOMEM;
+	/*
+	 * This has to be atomic as more DIOs can race to create the workqueue
+	 */
+	cmpxchg(&sb->s_dio_done_wq, NULL, wq);
+	/* Someone created workqueue before us? Free ours... */
+	if (wq != sb->s_dio_done_wq)
+		destroy_workqueue(wq);
+	return 0;
+}
+
+static int dio_set_defer_completion(struct dio *dio)
+{
+	struct super_block *sb = dio->inode->i_sb;
+
+	if (dio->defer_completion)
+		return 0;
+	dio->defer_completion = true;
+	if (!sb->s_dio_done_wq)
+		return sb_init_dio_done_wq(sb);
+	return 0;
+}
+
+/*
  * Call into the fs to map some more disk blocks.  We record the current number
  * of available blocks at sdio->blocks_available.  These are in units of the
  * fs blocksize, (1 << inode->i_blkbits).
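sb_init_dio_done_wq() above is a publish-or-discard idiom: every racing caller
allocates its own workqueue, exactly one cmpxchg() publishes its pointer into
the superblock, and losers destroy their copy.  Note it returns 0 even when it
loses the race, because a usable workqueue now exists either way.  A runnable
userspace analogue using C11 atomics (the compare-exchange returns a boolean
instead of the old value, so the post-check differs slightly; all names are
invented for illustration):

	#include <stdatomic.h>
	#include <stdio.h>
	#include <stdlib.h>

	struct resource {
		int id;
	};

	static _Atomic(struct resource *) shared;

	static struct resource *get_resource(void)
	{
		struct resource *cur = atomic_load(&shared);
		struct resource *mine;

		if (cur)			/* fast path: already created */
			return cur;

		mine = calloc(1, sizeof(*mine));
		if (!mine)
			return NULL;

		/* atomic equivalent of cmpxchg(&shared, NULL, mine) */
		if (atomic_compare_exchange_strong(&shared, &cur, mine))
			return mine;		/* we published ours */

		free(mine);			/* lost the race; use the winner's */
		return cur;
	}

	int main(void)
	{
		/* single-threaded here, but get_resource() is safe to race */
		printf("resource at %p\n", (void *)get_resource());
		printf("resource at %p\n", (void *)get_resource());
		return 0;
	}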
@@ -581,6 +632,9 @@ static int get_more_blocks(struct dio *dio, struct dio_submit *sdio,
 
 		/* Store for completion */
 		dio->private = map_bh->b_private;
+
+		if (ret == 0 && buffer_defer_completion(map_bh))
+			ret = dio_set_defer_completion(dio);
 	}
 	return ret;
 }
@@ -1269,7 +1323,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 
 	if (drop_refcount(dio) == 0) {
 		retval = dio_complete(dio, offset, retval, false);
-		kmem_cache_free(dio_cache, dio);
 	} else
 		BUG_ON(retval != -EIOCBQUEUED);
 