author	Kent Overstreet <kmo@daterainc.com>	2013-10-10 22:31:47 -0400
committer	Kent Overstreet <kmo@daterainc.com>	2013-10-10 22:31:47 -0400
commit	e34ecee2ae791df674dfb466ce40692ca6218e43 (patch)
tree	0ce1c1ad368f8e0562f0b656142dcfd5c070d66c /fs
parent	d0e639c9e06d44e713170031fe05fb60ebe680af (diff)
aio: Fix a trinity splat
aio kiocb refcounting was broken - it was relying on keeping track of the number of available ring buffer entries, which it needs to do anyway; then at shutdown time it'd wait for completions to be delivered until the number of available ring buffer entries equalled what it was initialized to.

Problem with that is that the ring buffer is mapped writable into userspace, so userspace could futz with the head and tail pointers to cause the kernel to see extra completions, and cause free_ioctx() to return while there were still outstanding kiocbs. Which would be bad.

Fix is just to directly refcount the kiocbs - which is more straightforward, and with the new percpu refcounting code doesn't cost us any cacheline bouncing, which was the whole point of the original scheme.

Also clean up ioctx_alloc()'s error path and fix a bug where it wasn't subtracting from aio_nr if ioctx_add_table() failed.

Signed-off-by: Kent Overstreet <kmo@daterainc.com>
Diffstat (limited to 'fs')
-rw-r--r--	fs/aio.c	129
1 file changed, 48 insertions(+), 81 deletions(-)
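The pattern the patch moves to is the stock percpu_ref lifecycle: every in-flight kiocb holds a reference on ctx->reqs, shutdown kills the ref and drops the initial reference, and the release callback (which may run from interrupt context) punts the actual teardown to a workqueue. The sketch below is not from the patch; it is a minimal illustration of that lifecycle using hypothetical my_ctx/my_release names and the two-argument percpu_ref_init() of this kernel generation.

/*
 * Minimal sketch of the percpu_ref lifecycle this patch switches to;
 * the my_* names are illustrative, not from fs/aio.c.
 */
#include <linux/percpu-refcount.h>
#include <linux/slab.h>
#include <linux/workqueue.h>

struct my_ctx {
	struct percpu_ref	reqs;		/* one get per in-flight request */
	struct work_struct	free_work;
};

static void my_ctx_free(struct work_struct *work)
{
	struct my_ctx *ctx = container_of(work, struct my_ctx, free_work);

	kfree(ctx);				/* real teardown (ring pages, percpu data) goes here */
}

/* Runs once every get has been matched by a put after percpu_ref_kill(). */
static void my_release(struct percpu_ref *ref)
{
	struct my_ctx *ctx = container_of(ref, struct my_ctx, reqs);

	/* may be called from irq context, so punt the freeing to a workqueue */
	INIT_WORK(&ctx->free_work, my_ctx_free);
	schedule_work(&ctx->free_work);
}

static struct my_ctx *my_ctx_alloc(void)
{
	struct my_ctx *ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);

	if (!ctx)
		return NULL;
	/* two-argument percpu_ref_init(), as in kernels of this era */
	if (percpu_ref_init(&ctx->reqs, my_release)) {
		kfree(ctx);
		return NULL;
	}
	return ctx;
}

/* Submission and completion paths take and drop per-request references. */
static void my_submit(struct my_ctx *ctx)	{ percpu_ref_get(&ctx->reqs); }
static void my_complete(struct my_ctx *ctx)	{ percpu_ref_put(&ctx->reqs); }

/*
 * Shutdown: kill switches the ref to atomic counting, then the initial
 * reference from percpu_ref_init() is dropped; my_release() fires only
 * after the last outstanding request has also done its put.
 */
static void my_ctx_destroy(struct my_ctx *ctx)
{
	percpu_ref_kill(&ctx->reqs);
	percpu_ref_put(&ctx->reqs);
}

Deferring the free to process context is what lets aio_complete() drop its reference straight from the completion path, even in interrupt context, while free_ioctx() is still allowed to sleep.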
diff --git a/fs/aio.c b/fs/aio.c
index 067e3d340c35..ee77dc13d5b2 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -80,6 +80,8 @@ struct kioctx {
 	struct percpu_ref	users;
 	atomic_t		dead;
 
+	struct percpu_ref	reqs;
+
 	unsigned long		user_id;
 
 	struct __percpu kioctx_cpu *cpu;
@@ -107,7 +109,6 @@ struct kioctx {
 	struct page		**ring_pages;
 	long			nr_pages;
 
-	struct rcu_head		rcu_head;
 	struct work_struct	free_work;
 
 	struct {
@@ -412,26 +413,34 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb)
 	return cancel(kiocb);
 }
 
-static void free_ioctx_rcu(struct rcu_head *head)
+static void free_ioctx(struct work_struct *work)
 {
-	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+	struct kioctx *ctx = container_of(work, struct kioctx, free_work);
+
+	pr_debug("freeing %p\n", ctx);
 
+	aio_free_ring(ctx);
 	free_percpu(ctx->cpu);
 	kmem_cache_free(kioctx_cachep, ctx);
 }
 
+static void free_ioctx_reqs(struct percpu_ref *ref)
+{
+	struct kioctx *ctx = container_of(ref, struct kioctx, reqs);
+
+	INIT_WORK(&ctx->free_work, free_ioctx);
+	schedule_work(&ctx->free_work);
+}
+
 /*
  * When this function runs, the kioctx has been removed from the "hash table"
  * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
  * now it's safe to cancel any that need to be.
  */
-static void free_ioctx(struct work_struct *work)
+static void free_ioctx_users(struct percpu_ref *ref)
 {
-	struct kioctx *ctx = container_of(work, struct kioctx, free_work);
-	struct aio_ring *ring;
+	struct kioctx *ctx = container_of(ref, struct kioctx, users);
 	struct kiocb *req;
-	unsigned cpu, avail;
-	DEFINE_WAIT(wait);
 
 	spin_lock_irq(&ctx->ctx_lock);
 
@@ -445,54 +454,8 @@ static void free_ioctx(struct work_struct *work)
 
 	spin_unlock_irq(&ctx->ctx_lock);
 
-	for_each_possible_cpu(cpu) {
-		struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
-
-		atomic_add(kcpu->reqs_available, &ctx->reqs_available);
-		kcpu->reqs_available = 0;
-	}
-
-	while (1) {
-		prepare_to_wait(&ctx->wait, &wait, TASK_UNINTERRUPTIBLE);
-
-		ring = kmap_atomic(ctx->ring_pages[0]);
-		avail = (ring->head <= ring->tail)
-			 ? ring->tail - ring->head
-			 : ctx->nr_events - ring->head + ring->tail;
-
-		atomic_add(avail, &ctx->reqs_available);
-		ring->head = ring->tail;
-		kunmap_atomic(ring);
-
-		if (atomic_read(&ctx->reqs_available) >= ctx->nr_events - 1)
-			break;
-
-		schedule();
-	}
-	finish_wait(&ctx->wait, &wait);
-
-	WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr_events - 1);
-
-	aio_free_ring(ctx);
-
-	pr_debug("freeing %p\n", ctx);
-
-	/*
-	 * Here the call_rcu() is between the wait_event() for reqs_active to
-	 * hit 0, and freeing the ioctx.
-	 *
-	 * aio_complete() decrements reqs_active, but it has to touch the ioctx
-	 * after to issue a wakeup so we use rcu.
-	 */
-	call_rcu(&ctx->rcu_head, free_ioctx_rcu);
-}
-
-static void free_ioctx_ref(struct percpu_ref *ref)
-{
-	struct kioctx *ctx = container_of(ref, struct kioctx, users);
-
-	INIT_WORK(&ctx->free_work, free_ioctx);
-	schedule_work(&ctx->free_work);
+	percpu_ref_kill(&ctx->reqs);
+	percpu_ref_put(&ctx->reqs);
 }
 
 static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
@@ -551,6 +514,16 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 	}
 }
 
+static void aio_nr_sub(unsigned nr)
+{
+	spin_lock(&aio_nr_lock);
+	if (WARN_ON(aio_nr - nr > aio_nr))
+		aio_nr = 0;
+	else
+		aio_nr -= nr;
+	spin_unlock(&aio_nr_lock);
+}
+
 /* ioctx_alloc
  *	Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
  */
@@ -588,8 +561,11 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	ctx->max_reqs = nr_events;
 
-	if (percpu_ref_init(&ctx->users, free_ioctx_ref))
-		goto out_freectx;
+	if (percpu_ref_init(&ctx->users, free_ioctx_users))
+		goto err;
+
+	if (percpu_ref_init(&ctx->reqs, free_ioctx_reqs))
+		goto err;
 
 	spin_lock_init(&ctx->ctx_lock);
 	spin_lock_init(&ctx->completion_lock);
@@ -600,10 +576,10 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	ctx->cpu = alloc_percpu(struct kioctx_cpu);
 	if (!ctx->cpu)
-		goto out_freeref;
+		goto err;
 
 	if (aio_setup_ring(ctx) < 0)
-		goto out_freepcpu;
+		goto err;
 
 	atomic_set(&ctx->reqs_available, ctx->nr_events - 1);
 	ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4);
@@ -615,7 +591,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	if (aio_nr + nr_events > (aio_max_nr * 2UL) ||
 	    aio_nr + nr_events < aio_nr) {
 		spin_unlock(&aio_nr_lock);
-		goto out_cleanup;
+		err = -EAGAIN;
+		goto err;
 	}
 	aio_nr += ctx->max_reqs;
 	spin_unlock(&aio_nr_lock);
@@ -624,23 +601,19 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	err = ioctx_add_table(ctx, mm);
 	if (err)
-		goto out_cleanup_put;
+		goto err_cleanup;
 
 	pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
 		 ctx, ctx->user_id, mm, ctx->nr_events);
 	return ctx;
 
-out_cleanup_put:
-	percpu_ref_put(&ctx->users);
-out_cleanup:
-	err = -EAGAIN;
+err_cleanup:
+	aio_nr_sub(ctx->max_reqs);
+err:
 	aio_free_ring(ctx);
-out_freepcpu:
 	free_percpu(ctx->cpu);
-out_freeref:
+	free_percpu(ctx->reqs.pcpu_count);
 	free_percpu(ctx->users.pcpu_count);
-out_freectx:
-	put_aio_ring_file(ctx);
 	kmem_cache_free(kioctx_cachep, ctx);
 	pr_debug("error allocating ioctx %d\n", err);
 	return ERR_PTR(err);
@@ -675,10 +648,7 @@ static void kill_ioctx(struct mm_struct *mm, struct kioctx *ctx)
 		 * -EAGAIN with no ioctxs actually in use (as far as userspace
 		 * could tell).
 		 */
-		spin_lock(&aio_nr_lock);
-		BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
-		aio_nr -= ctx->max_reqs;
-		spin_unlock(&aio_nr_lock);
+		aio_nr_sub(ctx->max_reqs);
 
 		if (ctx->mmap_size)
 			vm_munmap(ctx->mmap_base, ctx->mmap_size);
@@ -810,6 +780,8 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 	if (unlikely(!req))
 		goto out_put;
 
+	percpu_ref_get(&ctx->reqs);
+
 	req->ki_ctx = ctx;
 	return req;
 out_put:
@@ -879,12 +851,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 		return;
 	}
 
-	/*
-	 * Take rcu_read_lock() in case the kioctx is being destroyed, as we
-	 * need to issue a wakeup after incrementing reqs_available.
-	 */
-	rcu_read_lock();
-
 	if (iocb->ki_list.next) {
 		unsigned long flags;
 
@@ -959,7 +925,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
 
-	rcu_read_unlock();
+	percpu_ref_put(&ctx->reqs);
 }
 EXPORT_SYMBOL(aio_complete);
 
@@ -1370,6 +1336,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	return 0;
 out_put_req:
 	put_reqs_available(ctx, 1);
+	percpu_ref_put(&ctx->reqs);
 	kiocb_free(req);
 	return ret;
 }