author      Jeff Moyer <jmoyer@redhat.com>                    2011-11-02 16:40:10 -0400
committer   Linus Torvalds <torvalds@linux-foundation.org>    2011-11-02 19:07:03 -0400
commit      080d676de095a14ecba14c0b9a91acb5bbb634df (patch)
tree        4a4c56bc86a8edf4a42f8ec7c65ba795997e50ab
parent      2ca02df6b098be2d33a99a65531dcd84a10b6e21 (diff)
aio: allocate kiocbs in batches
In testing aio on a fast storage device, I found that the context lock
takes up a fair amount of cpu time in the I/O submission path.  The reason
is that we take it for every I/O submitted (see __aio_get_req).  Since we
know how many I/Os are passed to io_submit, we can preallocate the kiocbs
in batches, reducing the number of times we take and release the lock.

In my testing, I was able to reduce the amount of time spent in
_raw_spin_lock_irq by .56% (average of 3 runs).  The command I used to
test this was:

   aio-stress -O -o 2 -o 3 -r 8 -d 128 -b 32 -i 32 -s 16384 <dev>

I also tested the patch with various numbers of events passed to
io_submit, and I ran the xfstests aio group of tests to ensure I didn't
break anything.

Signed-off-by: Jeff Moyer <jmoyer@redhat.com>
Cc: Daniel Ehrenberg <dehrenberg@google.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
-rw-r--r--   fs/aio.c              | 136
-rw-r--r--   include/linux/aio.h   |   1
2 files changed, 108 insertions(+), 29 deletions(-)
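The pattern the patch introduces is generic: instead of taking ctx->ctx_lock once per I/O to reserve a completion-ring slot, reserve slots for up to KIOCB_BATCH_SIZE requests under a single lock acquisition. With the batch size of 32 introduced in the diff below, submitting 128 I/Os costs 4 lock round-trips instead of 128. The following is a minimal userspace sketch of that idea only, not the kernel code: the pthread mutex stands in for ctx->ctx_lock, and RING_SLOTS, reserve_one() and reserve_batch() are illustrative names invented for the demonstration.

/*
 * Minimal userspace sketch of the batching idea only (not the kernel
 * code): reserve completion-ring slots for up to BATCH requests under
 * one lock acquisition instead of locking once per request.  The mutex
 * stands in for ctx->ctx_lock; RING_SLOTS, reserve_one() and
 * reserve_batch() are illustrative names.
 */
#include <pthread.h>
#include <stdio.h>

#define RING_SLOTS      128     /* stand-in for aio_ring_avail() */
#define BATCH           32      /* mirrors KIOCB_BATCH_SIZE */

static pthread_mutex_t ring_lock = PTHREAD_MUTEX_INITIALIZER;
static long reqs_active;        /* slots currently reserved */

/* One lock round-trip per request, like the old __aio_get_req() path. */
static int reserve_one(void)
{
        int ok = 0;

        pthread_mutex_lock(&ring_lock);
        if (reqs_active < RING_SLOTS) {
                reqs_active++;
                ok = 1;
        }
        pthread_mutex_unlock(&ring_lock);
        return ok;
}

/*
 * One lock round-trip for a whole batch, like kiocb_batch_refill():
 * reserve min(want, BATCH) slots, trimmed to what the ring can hold.
 */
static long reserve_batch(long want)
{
        long take = want < BATCH ? want : BATCH;
        long avail, got;

        pthread_mutex_lock(&ring_lock);
        avail = RING_SLOTS - reqs_active;
        got = take < avail ? take : avail;
        reqs_active += got;
        pthread_mutex_unlock(&ring_lock);
        return got;
}

int main(void)
{
        long nr = 100, i, remaining, got;
        long per_req_trips = 0, batched_trips = 0;

        /* Old-style path: one lock acquisition for every request. */
        for (i = 0; i < nr; i++) {
                reserve_one();
                per_req_trips++;
        }
        reqs_active = 0;                /* reset for the second run */

        /* Batched path: one lock acquisition per refill. */
        remaining = nr;
        while (remaining > 0) {
                got = reserve_batch(remaining);
                if (got == 0)
                        break;          /* ring full; caller would back off */
                batched_trips++;
                remaining -= got;
        }

        printf("per-request: %ld lock trips, batched: %ld lock trips\n",
               per_req_trips, batched_trips);
        return 0;
}

Built with gcc -pthread, the sketch reports 100 lock round-trips for the per-request path against 4 for the batched one, which is the same effect the _raw_spin_lock_irq profile in the commit message measures in the kernel.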
diff --git a/fs/aio.c b/fs/aio.c
index 632b235f4fbe..78c514cfd212 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -440,8 +440,6 @@ void exit_aio(struct mm_struct *mm)
 static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
         struct kiocb *req = NULL;
-        struct aio_ring *ring;
-        int okay = 0;
 
         req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
         if (unlikely(!req))
@@ -459,39 +457,114 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
         INIT_LIST_HEAD(&req->ki_run_list);
         req->ki_eventfd = NULL;
 
-        /* Check if the completion queue has enough free space to
-         * accept an event from this io.
-         */
+        return req;
+}
+
+/*
+ * struct kiocb's are allocated in batches to reduce the number of
+ * times the ctx lock is acquired and released.
+ */
+#define KIOCB_BATCH_SIZE        32L
+struct kiocb_batch {
+        struct list_head head;
+        long count; /* number of requests left to allocate */
+};
+
+static void kiocb_batch_init(struct kiocb_batch *batch, long total)
+{
+        INIT_LIST_HEAD(&batch->head);
+        batch->count = total;
+}
+
+static void kiocb_batch_free(struct kiocb_batch *batch)
+{
+        struct kiocb *req, *n;
+
+        list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
+                list_del(&req->ki_batch);
+                kmem_cache_free(kiocb_cachep, req);
+        }
+}
+
+/*
+ * Allocate a batch of kiocbs.  This avoids taking and dropping the
+ * context lock a lot during setup.
+ */
+static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
+{
+        unsigned short allocated, to_alloc;
+        long avail;
+        bool called_fput = false;
+        struct kiocb *req, *n;
+        struct aio_ring *ring;
+
+        to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
+        for (allocated = 0; allocated < to_alloc; allocated++) {
+                req = __aio_get_req(ctx);
+                if (!req)
+                        /* allocation failed, go with what we've got */
+                        break;
+                list_add(&req->ki_batch, &batch->head);
+        }
+
+        if (allocated == 0)
+                goto out;
+
+retry:
         spin_lock_irq(&ctx->ctx_lock);
-        ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
-        if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) {
+        ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
+
+        avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
+        BUG_ON(avail < 0);
+        if (avail == 0 && !called_fput) {
+                /*
+                 * Handle a potential starvation case.  It is possible that
+                 * we hold the last reference on a struct file, causing us
+                 * to delay the final fput to non-irq context.  In this case,
+                 * ctx->reqs_active is artificially high.  Calling the fput
+                 * routine here may free up a slot in the event completion
+                 * ring, allowing this allocation to succeed.
+                 */
+                kunmap_atomic(ring);
+                spin_unlock_irq(&ctx->ctx_lock);
+                aio_fput_routine(NULL);
+                called_fput = true;
+                goto retry;
+        }
+
+        if (avail < allocated) {
+                /* Trim back the number of requests. */
+                list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
+                        list_del(&req->ki_batch);
+                        kmem_cache_free(kiocb_cachep, req);
+                        if (--allocated <= avail)
+                                break;
+                }
+        }
+
+        batch->count -= allocated;
+        list_for_each_entry(req, &batch->head, ki_batch) {
                 list_add(&req->ki_list, &ctx->active_reqs);
                 ctx->reqs_active++;
-                okay = 1;
         }
-        kunmap_atomic(ring, KM_USER0);
-        spin_unlock_irq(&ctx->ctx_lock);
 
-        if (!okay) {
-                kmem_cache_free(kiocb_cachep, req);
-                req = NULL;
-        }
+        kunmap_atomic(ring);
+        spin_unlock_irq(&ctx->ctx_lock);
 
-        return req;
+out:
+        return allocated;
 }
 
-static inline struct kiocb *aio_get_req(struct kioctx *ctx)
+static inline struct kiocb *aio_get_req(struct kioctx *ctx,
+                                        struct kiocb_batch *batch)
 {
         struct kiocb *req;
-        /* Handle a potential starvation case -- should be exceedingly rare as
-         * requests will be stuck on fput_head only if the aio_fput_routine is
-         * delayed and the requests were the last user of the struct file.
-         */
-        req = __aio_get_req(ctx);
-        if (unlikely(NULL == req)) {
-                aio_fput_routine(NULL);
-                req = __aio_get_req(ctx);
-        }
+
+        if (list_empty(&batch->head))
+                if (kiocb_batch_refill(ctx, batch) == 0)
+                        return NULL;
+        req = list_first_entry(&batch->head, struct kiocb, ki_batch);
+        list_del(&req->ki_batch);
         return req;
 }
 
@@ -1515,7 +1588,8 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
 }
 
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
-                         struct iocb *iocb, bool compat)
+                         struct iocb *iocb, struct kiocb_batch *batch,
+                         bool compat)
 {
         struct kiocb *req;
         struct file *file;
@@ -1541,7 +1615,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
         if (unlikely(!file))
                 return -EBADF;
 
-        req = aio_get_req(ctx);         /* returns with 2 references to req */
+        req = aio_get_req(ctx, batch);  /* returns with 2 references to req */
         if (unlikely(!req)) {
                 fput(file);
                 return -EAGAIN;
@@ -1621,8 +1695,9 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 {
         struct kioctx *ctx;
         long ret = 0;
-        int i;
+        int i = 0;
         struct blk_plug plug;
+        struct kiocb_batch batch;
 
         if (unlikely(nr < 0))
                 return -EINVAL;
@@ -1639,6 +1714,8 @@ long do_io_submit(aio_context_t ctx_id, long nr,
                 return -EINVAL;
         }
 
+        kiocb_batch_init(&batch, nr);
+
         blk_start_plug(&plug);
 
         /*
@@ -1659,12 +1736,13 @@ long do_io_submit(aio_context_t ctx_id, long nr,
                         break;
                 }
 
-                ret = io_submit_one(ctx, user_iocb, &tmp, compat);
+                ret = io_submit_one(ctx, user_iocb, &tmp, &batch, compat);
                 if (ret)
                         break;
         }
         blk_finish_plug(&plug);
 
+        kiocb_batch_free(&batch);
         put_ioctx(ctx);
         return i ? i : ret;
 }
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 2dcb72bff4b6..2314ad8b3c9c 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -117,6 +117,7 @@ struct kiocb {
 
         struct list_head        ki_list;        /* the aio core uses this
                                                  * for cancellation */
+        struct list_head        ki_batch;       /* batch allocation */
 
         /*
          * If the aio_resfd field of the userspace iocb is not zero,
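The batching only pays off when several iocbs arrive in one io_submit(2) call, since kiocb_batch_init() sizes the batch from that call's nr (the aio-stress run in the commit message keeps 128 requests in flight for the same reason). What follows is a hedged userspace sketch of that usage pattern with libaio; the libaio calls themselves are real, but the file path, read sizes and batch of 32 iocbs are illustrative assumptions and nothing here is part of the patch itself.

/*
 * Hedged usage sketch with libaio (link with -laio): submit a batch of
 * O_DIRECT reads with a single io_submit(2) call.  The file path, the
 * 4k read size and the batch of 32 iocbs are illustrative choices, not
 * part of the patch; O_DIRECT needs a file system/device that supports it.
 */
#define _GNU_SOURCE
#include <libaio.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define NR_IOS  32              /* one io_submit() carrying 32 iocbs */
#define IO_SIZE 4096

int main(int argc, char **argv)
{
        const char *path = argc > 1 ? argv[1] : "/tmp/aio-test"; /* arbitrary */
        struct iocb cbs[NR_IOS], *cbp[NR_IOS];
        struct io_event events[NR_IOS];
        void *bufs[NR_IOS];
        io_context_t ctx = 0;
        int fd, i, ret;

        fd = open(path, O_RDONLY | O_DIRECT);
        if (fd < 0) {
                perror("open");
                return 1;
        }

        ret = io_setup(NR_IOS, &ctx);
        if (ret < 0) {
                fprintf(stderr, "io_setup: %s\n", strerror(-ret));
                return 1;
        }

        for (i = 0; i < NR_IOS; i++) {
                if (posix_memalign(&bufs[i], 4096, IO_SIZE))
                        return 1;
                /* one read per iocb, at consecutive 4k offsets */
                io_prep_pread(&cbs[i], fd, bufs[i], IO_SIZE,
                              (long long)i * IO_SIZE);
                cbp[i] = &cbs[i];
        }

        /*
         * All NR_IOS requests go down in one system call, so the kernel
         * can allocate their kiocbs from a single batch refill.
         */
        ret = io_submit(ctx, NR_IOS, cbp);
        if (ret < 0) {
                fprintf(stderr, "io_submit: %s\n", strerror(-ret));
                return 1;
        }

        ret = io_getevents(ctx, ret, NR_IOS, events, NULL);
        if (ret < 0) {
                fprintf(stderr, "io_getevents: %s\n", strerror(-ret));
                return 1;
        }
        printf("completed %d of %d reads\n", ret, NR_IOS);

        io_destroy(ctx);
        close(fd);
        return 0;
}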