author    Kent Overstreet <koverstreet@google.com>  2013-04-25 20:58:39 -0400
committer Benjamin LaHaise <bcrl@kvack.org>  2013-07-30 11:53:11 -0400
commit    e1bdd5f27a5b14e24a658d5511bebceb67679d83
tree      3c18d12918a5ebe02bc38f63dd29031ea40673e0 /fs
parent    34e83fc618085e00dc9803286c581f51966673bd
aio: percpu reqs_available
See the previous patch ("aio: reqs_active -> reqs_available") for why we want to do this - this basically implements a per-cpu allocator for reqs_available that doesn't actually allocate anything.

Note that we need to increase the size of the ringbuffer we allocate, since a single thread won't necessarily be able to use all the reqs_available slots - some (up to about half) might be on other per-cpu lists, unavailable to the current thread.

We size the ringbuffer based on the nr_events userspace passed to io_setup(), so this is a slight behaviour change - but nr_events wasn't being used as a hard limit before; it was being rounded up to the next page anyway, so this doesn't change the actual semantics.

Signed-off-by: Kent Overstreet <koverstreet@google.com>
Cc: Zach Brown <zab@redhat.com>
Cc: Felipe Balbi <balbi@ti.com>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Cc: Jens Axboe <axboe@kernel.dk>
Cc: Asai Thambi S P <asamymuthupa@micron.com>
Cc: Selvan Mani <smani@micron.com>
Cc: Sam Bradshaw <sbradshaw@micron.com>
Cc: Jeff Moyer <jmoyer@redhat.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Reviewed-by: "Theodore Ts'o" <tytso@mit.edu>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Benjamin LaHaise <bcrl@kvack.org>
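The batching scheme is easiest to see in isolation. Below is a minimal user-space sketch of the same idea, with hypothetical names (struct pool, get_slot(), put_slot()); the kernel versions are get_reqs_available() and put_reqs_available() in the diff below, which use a real percpu variable and preempt_disable() rather than thread-local storage:

/*
 * Illustrative user-space sketch of the batching scheme (hypothetical
 * names, not from the patch).  Each thread keeps a private cache of
 * slots and only touches the shared atomic counter one batch at a time.
 */
#include <stdatomic.h>
#include <stdbool.h>

struct pool {
	atomic_int shared;	/* analogous to ctx->reqs_available */
	int batch;		/* analogous to ctx->req_batch */
};

static _Thread_local int local_avail;	/* analogous to struct kioctx_cpu */

static bool get_slot(struct pool *p)
{
	if (!local_avail) {
		int avail = atomic_load(&p->shared);

		/* grab a whole batch from the shared counter, or fail */
		do {
			if (avail < p->batch)
				return false;
		} while (!atomic_compare_exchange_weak(&p->shared, &avail,
						       avail - p->batch));
		local_avail = p->batch;
	}

	local_avail--;
	return true;
}

static void put_slot(struct pool *p, int nr)
{
	local_avail += nr;

	/* cap the private cache at two batches; return the excess */
	while (local_avail >= p->batch * 2) {
		local_avail -= p->batch;
		atomic_fetch_add(&p->shared, p->batch);
	}
}

On the fast path a thread only touches its own cache; the shared atomic is hit once every req_batch operations, which is what eliminates the cacheline contention on reqs_available.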
Diffstat (limited to 'fs')
-rw-r--r--  fs/aio.c  106
1 file changed, 99 insertions(+), 7 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 0e23dfa77b0e..bb1a6c433110 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -26,6 +26,7 @@
 #include <linux/mm.h>
 #include <linux/mman.h>
 #include <linux/mmu_context.h>
+#include <linux/percpu.h>
 #include <linux/slab.h>
 #include <linux/timer.h>
 #include <linux/aio.h>
@@ -64,6 +65,10 @@ struct aio_ring {
 
 #define AIO_RING_PAGES	8
 
+struct kioctx_cpu {
+	unsigned		reqs_available;
+};
+
 struct kioctx {
 	atomic_t		users;
 	atomic_t		dead;
@@ -72,6 +77,13 @@ struct kioctx {
 	unsigned long		user_id;
 	struct hlist_node	list;
 
+	struct __percpu kioctx_cpu *cpu;
+
+	/*
+	 * For percpu reqs_available, number of slots we move to/from global
+	 * counter at a time:
+	 */
+	unsigned		req_batch;
 	/*
 	 * This is what userspace passed to io_setup(), it's not used for
 	 * anything but counting against the global max_reqs quota.
@@ -99,6 +111,8 @@ struct kioctx {
 	 * so we avoid overflowing it: it's decremented (if positive)
 	 * when allocating a kiocb and incremented when the resulting
 	 * io_event is pulled off the ringbuffer.
+	 *
+	 * We batch accesses to it with a percpu version.
 	 */
 	atomic_t		reqs_available;
 } ____cacheline_aligned_in_smp;
@@ -379,6 +393,8 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 static void free_ioctx_rcu(struct rcu_head *head)
 {
 	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+
+	free_percpu(ctx->cpu);
 	kmem_cache_free(kioctx_cachep, ctx);
 }
 
@@ -392,7 +408,7 @@ static void free_ioctx(struct kioctx *ctx)
 	struct aio_ring *ring;
 	struct io_event res;
 	struct kiocb *req;
-	unsigned head, avail;
+	unsigned cpu, head, avail;
 
 	spin_lock_irq(&ctx->ctx_lock);
 
@@ -406,6 +422,13 @@ static void free_ioctx(struct kioctx *ctx)
 
 	spin_unlock_irq(&ctx->ctx_lock);
 
+	for_each_possible_cpu(cpu) {
+		struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
+
+		atomic_add(kcpu->reqs_available, &ctx->reqs_available);
+		kcpu->reqs_available = 0;
+	}
+
 	ring = kmap_atomic(ctx->ring_pages[0]);
 	head = ring->head;
 	kunmap_atomic(ring);
@@ -454,6 +477,18 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	struct kioctx *ctx;
 	int err = -ENOMEM;
 
+	/*
+	 * We keep track of the number of available ringbuffer slots, to prevent
+	 * overflow (reqs_available), and we also use percpu counters for this.
+	 *
+	 * So since up to half the slots might be on other cpu's percpu counters
+	 * and unavailable, double nr_events so userspace sees what they
+	 * expected: additionally, we move req_batch slots to/from percpu
+	 * counters at a time, so make sure that isn't 0:
+	 */
+	nr_events = max(nr_events, num_possible_cpus() * 4);
+	nr_events *= 2;
+
 	/* Prevent overflows */
 	if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
 	    (nr_events > (0x10000000U / sizeof(struct kiocb)))) {
@@ -479,10 +514,16 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	INIT_LIST_HEAD(&ctx->active_reqs);
 
-	if (aio_setup_ring(ctx) < 0)
+	ctx->cpu = alloc_percpu(struct kioctx_cpu);
+	if (!ctx->cpu)
 		goto out_freectx;
 
+	if (aio_setup_ring(ctx) < 0)
+		goto out_freepcpu;
+
 	atomic_set(&ctx->reqs_available, ctx->nr_events - 1);
+	ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4);
+	BUG_ON(!ctx->req_batch);
 
 	/* limit the number of system wide aios */
 	spin_lock(&aio_nr_lock);
@@ -506,6 +547,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 out_cleanup:
 	err = -EAGAIN;
 	aio_free_ring(ctx);
+out_freepcpu:
+	free_percpu(ctx->cpu);
 out_freectx:
 	if (ctx->aio_ring_file)
 		fput(ctx->aio_ring_file);
@@ -610,6 +653,52 @@ void exit_aio(struct mm_struct *mm)
 	}
 }
 
+static void put_reqs_available(struct kioctx *ctx, unsigned nr)
+{
+	struct kioctx_cpu *kcpu;
+
+	preempt_disable();
+	kcpu = this_cpu_ptr(ctx->cpu);
+
+	kcpu->reqs_available += nr;
+	while (kcpu->reqs_available >= ctx->req_batch * 2) {
+		kcpu->reqs_available -= ctx->req_batch;
+		atomic_add(ctx->req_batch, &ctx->reqs_available);
+	}
+
+	preempt_enable();
+}
+
+static bool get_reqs_available(struct kioctx *ctx)
+{
+	struct kioctx_cpu *kcpu;
+	bool ret = false;
+
+	preempt_disable();
+	kcpu = this_cpu_ptr(ctx->cpu);
+
+	if (!kcpu->reqs_available) {
+		int old, avail = atomic_read(&ctx->reqs_available);
+
+		do {
+			if (avail < ctx->req_batch)
+				goto out;
+
+			old = avail;
+			avail = atomic_cmpxchg(&ctx->reqs_available,
+					       avail, avail - ctx->req_batch);
+		} while (avail != old);
+
+		kcpu->reqs_available += ctx->req_batch;
+	}
+
+	ret = true;
+	kcpu->reqs_available--;
+out:
+	preempt_enable();
+	return ret;
+}
+
 /* aio_get_req
  *	Allocate a slot for an aio request.  Increments the ki_users count
  * of the kioctx so that the kioctx stays around until all requests are
@@ -624,7 +713,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 {
 	struct kiocb *req;
 
-	if (atomic_dec_if_positive(&ctx->reqs_available) <= 0)
+	if (!get_reqs_available(ctx))
 		return NULL;
 
 	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
@@ -633,10 +722,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 
 	atomic_set(&req->ki_users, 2);
 	req->ki_ctx = ctx;
-
 	return req;
 out_put:
-	atomic_inc(&ctx->reqs_available);
+	put_reqs_available(ctx, 1);
 	return NULL;
 }
 
@@ -725,6 +813,10 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	 */
 	if (unlikely(xchg(&iocb->ki_cancel,
 			  KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
+		/*
+		 * Can't use the percpu reqs_available here - could race with
+		 * free_ioctx()
+		 */
 		atomic_inc(&ctx->reqs_available);
 		/* Still need the wake_up in case free_ioctx is waiting */
 		goto put_rq;
@@ -863,7 +955,7 @@ static long aio_read_events_ring(struct kioctx *ctx,
 
 	pr_debug("%li h%u t%u\n", ret, head, ctx->tail);
 
-	atomic_add(ret, &ctx->reqs_available);
+	put_reqs_available(ctx, ret);
 out:
 	mutex_unlock(&ctx->ring_lock);
 
@@ -1247,7 +1339,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	aio_put_req(req);	/* drop extra ref to req */
 	return 0;
 out_put_req:
-	atomic_inc(&ctx->reqs_available);
+	put_reqs_available(ctx, 1);
 	aio_put_req(req);	/* drop extra ref to req */
 	aio_put_req(req);	/* drop i/o ref to req */
 	return ret;
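To put illustrative numbers (not from the patch) on the sizing logic in ioctx_alloc(): on a box where num_possible_cpus() is 4, io_setup(64) gives nr_events = max(64, 4 * 4) = 64, doubled to 128. Assuming for simplicity that aio_setup_ring() leaves ctx->nr_events at 128, req_batch = 127 / (4 * 4) = 7, so after a put_reqs_available() drain each CPU caches at most 2 * 7 - 1 = 13 slots, and at most 3 * 13 = 39 of the 127 slots can be stranded on other CPUs' caches - within the "up to about half" bound the doubling compensates for.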