| author | Christoph Hellwig <hch@infradead.org> | 2013-09-04 09:04:39 -0400 |
|---|---|---|
| committer | Al Viro <viro@zeniv.linux.org.uk> | 2013-09-04 09:23:46 -0400 |
| commit | 7b7a8665edd8db733980389b098530f9e4f630b2 (patch) | |
| tree | 968d570a9f0c4d861226aefed2f5f97a131c8d53 /fs/direct-io.c | |
| parent | 4b6ccca701ef5977d0ffbc2c932430dea88b38b6 (diff) | |
direct-io: Implement generic deferred AIO completions
Add support to the core direct-io code to defer AIO completions to user
context using a workqueue. This replaces opencoded and less efficient
code in XFS and ext4 (we save a memory allocation for each direct IO)
and will be needed to properly support O_(D)SYNC for AIO.
The communication between the filesystem and the direct I/O code requires
a new buffer head flag, which is a bit ugly but not avoidable until the
direct I/O code stops abusing the buffer_head structure for communicating
with the filesystems.
Currently this creates a per-superblock unbound workqueue for these
completions, which is taken from an earlier patch by Jan Kara. I'm
not really convinced about this use and would prefer a "normal" global
workqueue with a high concurrency limit, but this needs further discussion.
JK: Fixed ext4 part, dynamic allocation of the workqueue.
Signed-off-by: Christoph Hellwig <hch@lst.de>
Signed-off-by: Jan Kara <jack@suse.cz>
Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
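For illustration only (not part of this commit): the buffer_head flag mentioned above is how a filesystem asks for the deferred completion from its get_block callback. The sketch below uses made-up names (example_get_block(), example_map_blocks()) and assumes the new BH_Defer_Completion bit exposes the usual BUFFER_FNS helpers, i.e. that set_buffer_defer_completion() is the setter matching the buffer_defer_completion() test used in get_more_blocks() in the diff below.

```c
/*
 * Hypothetical filesystem get_block callback (sketch, not from this patch).
 * It marks the mapping buffer_head so that the direct-io core defers the
 * AIO completion to the per-superblock workqueue.
 */
static int example_get_block(struct inode *inode, sector_t iblock,
			     struct buffer_head *bh_result, int create)
{
	int ret;

	/* example_map_blocks() stands in for the filesystem's block mapping */
	ret = example_map_blocks(inode, iblock, bh_result, create);
	if (ret)
		return ret;

	/*
	 * Writes into unwritten/preallocated extents need conversion work at
	 * I/O completion that may sleep, so it must not run from bio end_io
	 * (interrupt) context.  Assumed helper following the usual BUFFER_FNS
	 * convention for the new BH_Defer_Completion flag:
	 */
	if (create && buffer_unwritten(bh_result))
		set_buffer_defer_completion(bh_result);

	return 0;
}
```

get_more_blocks() then picks the flag up via buffer_defer_completion(map_bh) and calls dio_set_defer_completion(), which creates the per-superblock workqueue on first use.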
Diffstat (limited to 'fs/direct-io.c')
-rw-r--r-- fs/direct-io.c | 85
1 file changed, 69 insertions, 16 deletions
```diff
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 7ab90f5081ee..8b31b9f449f4 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -127,6 +127,7 @@ struct dio {
 	spinlock_t bio_lock;		/* protects BIO fields below */
 	int page_errors;		/* errno from get_user_pages() */
 	int is_async;			/* is IO async ? */
+	bool defer_completion;		/* defer AIO completion to workqueue? */
 	int io_error;			/* IO error in completion path */
 	unsigned long refcount;		/* direct_io_worker() and bios */
 	struct bio *bio_list;		/* singly linked via bi_private */
@@ -141,7 +142,10 @@ struct dio {
 	 * allocation time.  Don't add new fields after pages[] unless you
 	 * wish that they not be zeroed.
 	 */
-	struct page *pages[DIO_PAGES];	/* page buffer */
+	union {
+		struct page *pages[DIO_PAGES];	/* page buffer */
+		struct work_struct complete_work;/* deferred AIO completion */
+	};
 } ____cacheline_aligned_in_smp;
 
 static struct kmem_cache *dio_cache __read_mostly;
@@ -221,16 +225,16 @@ static inline struct page *dio_get_page(struct dio *dio,
  * dio_complete() - called when all DIO BIO I/O has been completed
  * @offset: the byte offset in the file of the completed operation
  *
- * This releases locks as dictated by the locking type, lets interested parties
- * know that a DIO operation has completed, and calculates the resulting return
- * code for the operation.
+ * This drops i_dio_count, lets interested parties know that a DIO operation
+ * has completed, and calculates the resulting return code for the operation.
  *
  * It lets the filesystem know if it registered an interest earlier via
  * get_block.  Pass the private field of the map buffer_head so that
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret,
+	bool is_async)
 {
 	ssize_t transferred = 0;
 
@@ -258,19 +262,26 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is
 	if (ret == 0)
 		ret = transferred;
 
-	if (dio->end_io && dio->result) {
-		dio->end_io(dio->iocb, offset, transferred,
-			    dio->private, ret, is_async);
-	} else {
-		inode_dio_done(dio->inode);
-		if (is_async)
-			aio_complete(dio->iocb, ret, 0);
-	}
+	if (dio->end_io && dio->result)
+		dio->end_io(dio->iocb, offset, transferred, dio->private);
+
+	inode_dio_done(dio->inode);
+	if (is_async)
+		aio_complete(dio->iocb, ret, 0);
 
+	kmem_cache_free(dio_cache, dio);
 	return ret;
 }
 
+static void dio_aio_complete_work(struct work_struct *work)
+{
+	struct dio *dio = container_of(work, struct dio, complete_work);
+
+	dio_complete(dio, dio->iocb->ki_pos, 0, true);
+}
+
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
+
 /*
  * Asynchronous IO callback.
  */
@@ -290,8 +301,13 @@ static void dio_bio_end_aio(struct bio *bio, int error)
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (remaining == 0) {
-		dio_complete(dio, dio->iocb->ki_pos, 0, true);
-		kmem_cache_free(dio_cache, dio);
+		if (dio->result && dio->defer_completion) {
+			INIT_WORK(&dio->complete_work, dio_aio_complete_work);
+			queue_work(dio->inode->i_sb->s_dio_done_wq,
+				   &dio->complete_work);
+		} else {
+			dio_complete(dio, dio->iocb->ki_pos, 0, true);
+		}
 	}
 }
 
@@ -511,6 +527,41 @@ static inline int dio_bio_reap(struct dio *dio, struct dio_submit *sdio)
 }
 
 /*
+ * Create workqueue for deferred direct IO completions. We allocate the
+ * workqueue when it's first needed. This avoids creating workqueue for
+ * filesystems that don't need it and also allows us to create the workqueue
+ * late enough so the we can include s_id in the name of the workqueue.
+ */
+static int sb_init_dio_done_wq(struct super_block *sb)
+{
+	struct workqueue_struct *wq = alloc_workqueue("dio/%s",
+						      WQ_MEM_RECLAIM, 0,
+						      sb->s_id);
+	if (!wq)
+		return -ENOMEM;
+	/*
+	 * This has to be atomic as more DIOs can race to create the workqueue
+	 */
+	cmpxchg(&sb->s_dio_done_wq, NULL, wq);
+	/* Someone created workqueue before us? Free ours... */
+	if (wq != sb->s_dio_done_wq)
+		destroy_workqueue(wq);
+	return 0;
+}
+
+static int dio_set_defer_completion(struct dio *dio)
+{
+	struct super_block *sb = dio->inode->i_sb;
+
+	if (dio->defer_completion)
+		return 0;
+	dio->defer_completion = true;
+	if (!sb->s_dio_done_wq)
+		return sb_init_dio_done_wq(sb);
+	return 0;
+}
+
+/*
  * Call into the fs to map some more disk blocks.  We record the current number
  * of available blocks at sdio->blocks_available.  These are in units of the
  * fs blocksize, (1 << inode->i_blkbits).
@@ -581,6 +632,9 @@ static int get_more_blocks(struct dio *dio, struct dio_submit *sdio,
 
 		/* Store for completion */
 		dio->private = map_bh->b_private;
+
+		if (ret == 0 && buffer_defer_completion(map_bh))
+			ret = dio_set_defer_completion(dio);
 	}
 	return ret;
 }
@@ -1269,7 +1323,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 
 	if (drop_refcount(dio) == 0) {
 		retval = dio_complete(dio, offset, retval, false);
-		kmem_cache_free(dio_cache, dio);
 	} else
 		BUG_ON(retval != -EIOCBQUEUED);
 
```
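Also illustrative rather than part of this diff: once a filesystem defers completion, dio_complete() — and therefore its ->end_io callback, invoked above as dio->end_io(dio->iocb, offset, transferred, dio->private) — runs from dio_aio_complete_work() on the workqueue instead of from bio end_io context. A sketch of a callback using that four-argument form follows; the example_* names and the conversion helper are assumptions, not real kernel APIs.

```c
/*
 * Hypothetical filesystem ->end_io callback under the new scheme.  The
 * four-argument form matches the call site in dio_complete() above; the
 * ret and is_async arguments are no longer passed.
 */
static void example_dio_end_io(struct kiocb *iocb, loff_t offset,
			       ssize_t size, void *private)
{
	struct inode *inode = file_inode(iocb->ki_filp);

	/*
	 * When the filesystem set the defer-completion flag, this runs in
	 * process context (the "dio/%s" workqueue), so blocking work such as
	 * unwritten-extent conversion can be done here directly instead of
	 * bouncing through a filesystem-private completion workqueue, as XFS
	 * and ext4 needed before this patch.
	 */
	if (private)
		example_convert_unwritten(inode, offset, size, private);
}
```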