author	Zach Brown <zab@redhat.com>	2013-05-07 19:18:25 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2013-05-07 21:38:27 -0400
commit	41003a7bcfed1255032e1e7c7b487e505b22e298 (patch)
tree	b09cb3e5efaeaabbee5a27daff490f77c78aa27b
parent	a80bf61ef36da48285850974f30700d1e8efbfc0 (diff)
aio: remove retry-based AIO
This removes the retry-based AIO infrastructure now that nothing in tree is using it.

We want to remove retry-based AIO because it is fundamentally unsafe. It retries IO submission from a kernel thread that has only assumed the mm of the submitting task. All other task_struct references in the IO submission path will see the kernel thread, not the submitting task. This design flaw means that nothing of any meaningful complexity can use retry-based AIO.

This removes all the code and data associated with the retry machinery. The most significant benefit of this is the removal of the locking around the unused run list in the submission path.

[akpm@linux-foundation.org: coding-style fixes]
Signed-off-by: Kent Overstreet <koverstreet@google.com>
Signed-off-by: Zach Brown <zab@redhat.com>
Cc: Zach Brown <zab@redhat.com>
Cc: Felipe Balbi <balbi@ti.com>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Cc: Jens Axboe <axboe@kernel.dk>
Cc: Asai Thambi S P <asamymuthupa@micron.com>
Cc: Selvan Mani <smani@micron.com>
Cc: Sam Bradshaw <sbradshaw@micron.com>
Acked-by: Jeff Moyer <jmoyer@redhat.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Reviewed-by: "Theodore Ts'o" <tytso@mit.edu>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
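For context on the flaw described above: the removed aio_kick_handler() later in this diff adopts the submitter's mm with use_mm(), which lets copy_*_user() resolve the right user addresses, but "current" still names the workqueue thread, so credentials, open files, rlimits and signal state all belong to the wrong task. A minimal sketch of that pattern follows (illustrative only, not part of the patch; retry_one_iocb() is a hypothetical stand-in for the per-iocb retry call):

static void retry_worker(struct work_struct *work)
{
	struct kioctx *ctx = container_of(work, struct kioctx, wq.work);
	mm_segment_t oldfs = get_fs();

	set_fs(USER_DS);
	use_mm(ctx->mm);		/* borrow the submitting task's address space */

	/*
	 * copy_*_user() now targets the submitter's mm, but "current"
	 * is still this kernel worker, so any task_struct dereference
	 * in the submission path sees the worker, not the submitter.
	 */
	retry_one_iocb(ctx);		/* hypothetical per-iocb retry call */

	unuse_mm(ctx->mm);
	set_fs(oldfs);
}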
-rw-r--r--  fs/aio.c               | 351
-rw-r--r--  fs/ocfs2/dlmglue.c     |   2
-rw-r--r--  fs/read_write.c        |  34
-rw-r--r--  include/linux/aio.h    |  22
-rw-r--r--  include/linux/errno.h  |   1
5 files changed, 31 insertions(+), 379 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 351afe7ac78e..6e095a95a7c6 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -54,11 +54,6 @@ unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio request
 static struct kmem_cache	*kiocb_cachep;
 static struct kmem_cache	*kioctx_cachep;
 
-static struct workqueue_struct *aio_wq;
-
-static void aio_kick_handler(struct work_struct *);
-static void aio_queue_work(struct kioctx *);
-
 /* aio_setup
  *	Creates the slab caches used by the aio routines, panic on
  *	failure as this is done early during the boot sequence.
@@ -68,9 +63,6 @@ static int __init aio_setup(void)
 	kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 	kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 
-	aio_wq = alloc_workqueue("aio", 0, 1);	/* used to limit concurrency */
-	BUG_ON(!aio_wq);
-
 	pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
 
 	return 0;
@@ -86,7 +78,6 @@ static void aio_free_ring(struct kioctx *ctx)
 		put_page(info->ring_pages[i]);
 
 	if (info->mmap_size) {
-		BUG_ON(ctx->mm != current->mm);
 		vm_munmap(info->mmap_base, info->mmap_size);
 	}
 
@@ -101,6 +92,7 @@ static int aio_setup_ring(struct kioctx *ctx)
 	struct aio_ring *ring;
 	struct aio_ring_info *info = &ctx->ring_info;
 	unsigned nr_events = ctx->max_reqs;
+	struct mm_struct *mm = current->mm;
 	unsigned long size, populate;
 	int nr_pages;
 
@@ -126,23 +118,22 @@ static int aio_setup_ring(struct kioctx *ctx)
 
 	info->mmap_size = nr_pages * PAGE_SIZE;
 	dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
-	down_write(&ctx->mm->mmap_sem);
+	down_write(&mm->mmap_sem);
 	info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size,
 					PROT_READ|PROT_WRITE,
 					MAP_ANONYMOUS|MAP_PRIVATE, 0,
 					&populate);
 	if (IS_ERR((void *)info->mmap_base)) {
-		up_write(&ctx->mm->mmap_sem);
+		up_write(&mm->mmap_sem);
 		info->mmap_size = 0;
 		aio_free_ring(ctx);
 		return -EAGAIN;
 	}
 
 	dprintk("mmap address: 0x%08lx\n", info->mmap_base);
-	info->nr_pages = get_user_pages(current, ctx->mm,
-					info->mmap_base, nr_pages,
+	info->nr_pages = get_user_pages(current, mm, info->mmap_base, nr_pages,
 					1, 0, info->ring_pages, NULL);
-	up_write(&ctx->mm->mmap_sem);
+	up_write(&mm->mmap_sem);
 
 	if (unlikely(info->nr_pages != nr_pages)) {
 		aio_free_ring(ctx);
@@ -206,10 +197,7 @@ static void __put_ioctx(struct kioctx *ctx)
 	unsigned nr_events = ctx->max_reqs;
 	BUG_ON(ctx->reqs_active);
 
-	cancel_delayed_work_sync(&ctx->wq);
 	aio_free_ring(ctx);
-	mmdrop(ctx->mm);
-	ctx->mm = NULL;
 	if (nr_events) {
 		spin_lock(&aio_nr_lock);
 		BUG_ON(aio_nr - nr_events > aio_nr);
@@ -237,7 +225,7 @@ static inline void put_ioctx(struct kioctx *kioctx)
  */
 static struct kioctx *ioctx_alloc(unsigned nr_events)
 {
-	struct mm_struct *mm;
+	struct mm_struct *mm = current->mm;
 	struct kioctx *ctx;
 	int err = -ENOMEM;
 
@@ -256,8 +244,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 		return ERR_PTR(-ENOMEM);
 
 	ctx->max_reqs = nr_events;
-	mm = ctx->mm = current->mm;
-	atomic_inc(&mm->mm_count);
 
 	atomic_set(&ctx->users, 2);
 	spin_lock_init(&ctx->ctx_lock);
@@ -265,8 +251,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	init_waitqueue_head(&ctx->wait);
 
 	INIT_LIST_HEAD(&ctx->active_reqs);
-	INIT_LIST_HEAD(&ctx->run_list);
-	INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
 
 	if (aio_setup_ring(ctx) < 0)
 		goto out_freectx;
@@ -287,14 +271,13 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	spin_unlock(&mm->ioctx_lock);
 
 	dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
-		ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
+		ctx, ctx->user_id, mm, ctx->ring_info.nr);
 	return ctx;
 
 out_cleanup:
 	err = -EAGAIN;
 	aio_free_ring(ctx);
 out_freectx:
-	mmdrop(mm);
 	kmem_cache_free(kioctx_cachep, ctx);
 	dprintk("aio: error allocating ioctx %d\n", err);
 	return ERR_PTR(err);
@@ -391,8 +374,6 @@ void exit_aio(struct mm_struct *mm)
 		 * as indicator that it needs to unmap the area,
 		 * just set it to 0; aio_free_ring() is the only
 		 * place that uses ->mmap_size, so it's safe.
-		 * That way we get all munmap done to current->mm -
-		 * all other callers have ctx->mm == current->mm.
 		 */
 		ctx->ring_info.mmap_size = 0;
 		put_ioctx(ctx);
@@ -426,7 +407,6 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
 	req->ki_dtor = NULL;
 	req->private = NULL;
 	req->ki_iovec = NULL;
-	INIT_LIST_HEAD(&req->ki_run_list);
 	req->ki_eventfd = NULL;
 
 	return req;
@@ -611,281 +591,6 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 	return ret;
 }
 
-/*
- * Queue up a kiocb to be retried. Assumes that the kiocb
- * has already been marked as kicked, and places it on
- * the retry run list for the corresponding ioctx, if it
- * isn't already queued. Returns 1 if it actually queued
- * the kiocb (to tell the caller to activate the work
- * queue to process it), or 0, if it found that it was
- * already queued.
- */
-static inline int __queue_kicked_iocb(struct kiocb *iocb)
-{
-	struct kioctx *ctx = iocb->ki_ctx;
-
-	assert_spin_locked(&ctx->ctx_lock);
-
-	if (list_empty(&iocb->ki_run_list)) {
-		list_add_tail(&iocb->ki_run_list,
-			&ctx->run_list);
-		return 1;
-	}
-	return 0;
-}
-
-/* aio_run_iocb
- *	This is the core aio execution routine. It is
- *	invoked both for initial i/o submission and
- *	subsequent retries via the aio_kick_handler.
- *	Expects to be invoked with iocb->ki_ctx->lock
- *	already held. The lock is released and reacquired
- *	as needed during processing.
- *
- *	Calls the iocb retry method (already setup for the
- *	iocb on initial submission) for operation specific
- *	handling, but takes care of most of common retry
- *	execution details for a given iocb. The retry method
- *	needs to be non-blocking as far as possible, to avoid
- *	holding up other iocbs waiting to be serviced by the
- *	retry kernel thread.
- *
- *	The trickier parts in this code have to do with
- *	ensuring that only one retry instance is in progress
- *	for a given iocb at any time. Providing that guarantee
- *	simplifies the coding of individual aio operations as
- *	it avoids various potential races.
- */
-static ssize_t aio_run_iocb(struct kiocb *iocb)
-{
-	struct kioctx	*ctx = iocb->ki_ctx;
-	ssize_t (*retry)(struct kiocb *);
-	ssize_t ret;
-
-	if (!(retry = iocb->ki_retry)) {
-		printk("aio_run_iocb: iocb->ki_retry = NULL\n");
-		return 0;
-	}
-
-	/*
-	 * We don't want the next retry iteration for this
-	 * operation to start until this one has returned and
-	 * updated the iocb state. However, wait_queue functions
-	 * can trigger a kick_iocb from interrupt context in the
-	 * meantime, indicating that data is available for the next
-	 * iteration. We want to remember that and enable the
-	 * next retry iteration _after_ we are through with
-	 * this one.
-	 *
-	 * So, in order to be able to register a "kick", but
-	 * prevent it from being queued now, we clear the kick
-	 * flag, but make the kick code *think* that the iocb is
-	 * still on the run list until we are actually done.
-	 * When we are done with this iteration, we check if
-	 * the iocb was kicked in the meantime and if so, queue
-	 * it up afresh.
-	 */
-
-	kiocbClearKicked(iocb);
-
-	/*
-	 * This is so that aio_complete knows it doesn't need to
-	 * pull the iocb off the run list (We can't just call
-	 * INIT_LIST_HEAD because we don't want a kick_iocb to
-	 * queue this on the run list yet)
-	 */
-	iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
-	spin_unlock_irq(&ctx->ctx_lock);
-
-	/* Quit retrying if the i/o has been cancelled */
-	if (kiocbIsCancelled(iocb)) {
-		ret = -EINTR;
-		aio_complete(iocb, ret, 0);
-		/* must not access the iocb after this */
-		goto out;
-	}
-
-	/*
-	 * Now we are all set to call the retry method in async
-	 * context.
-	 */
-	ret = retry(iocb);
-
-	if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
-		/*
-		 * There's no easy way to restart the syscall since other AIO's
-		 * may be already running. Just fail this IO with EINTR.
-		 */
-		if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
-			     ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
-			ret = -EINTR;
-		aio_complete(iocb, ret, 0);
-	}
-out:
-	spin_lock_irq(&ctx->ctx_lock);
-
-	if (-EIOCBRETRY == ret) {
-		/*
-		 * OK, now that we are done with this iteration
-		 * and know that there is more left to go,
-		 * this is where we let go so that a subsequent
-		 * "kick" can start the next iteration
-		 */
-
-		/* will make __queue_kicked_iocb succeed from here on */
-		INIT_LIST_HEAD(&iocb->ki_run_list);
-		/* we must queue the next iteration ourselves, if it
-		 * has already been kicked */
-		if (kiocbIsKicked(iocb)) {
-			__queue_kicked_iocb(iocb);
-
-			/*
-			 * __queue_kicked_iocb will always return 1 here, because
-			 * iocb->ki_run_list is empty at this point so it should
-			 * be safe to unconditionally queue the context into the
-			 * work queue.
-			 */
-			aio_queue_work(ctx);
-		}
-	}
-	return ret;
-}
-
-/*
- * __aio_run_iocbs:
- *	Process all pending retries queued on the ioctx
- *	run list.
- *	Assumes it is operating within the aio issuer's mm
- *	context.
- */
-static int __aio_run_iocbs(struct kioctx *ctx)
-{
-	struct kiocb *iocb;
-	struct list_head run_list;
-
-	assert_spin_locked(&ctx->ctx_lock);
-
-	list_replace_init(&ctx->run_list, &run_list);
-	while (!list_empty(&run_list)) {
-		iocb = list_entry(run_list.next, struct kiocb,
-			ki_run_list);
-		list_del(&iocb->ki_run_list);
-		/*
-		 * Hold an extra reference while retrying i/o.
-		 */
-		iocb->ki_users++;	/* grab extra reference */
-		aio_run_iocb(iocb);
-		__aio_put_req(ctx, iocb);
-	}
-	if (!list_empty(&ctx->run_list))
-		return 1;
-	return 0;
-}
-
-static void aio_queue_work(struct kioctx * ctx)
-{
-	unsigned long timeout;
-	/*
-	 * if someone is waiting, get the work started right
-	 * away, otherwise, use a longer delay
-	 */
-	smp_mb();
-	if (waitqueue_active(&ctx->wait))
-		timeout = 1;
-	else
-		timeout = HZ/10;
-	queue_delayed_work(aio_wq, &ctx->wq, timeout);
-}
-
-/*
- * aio_run_all_iocbs:
- *	Process all pending retries queued on the ioctx
- *	run list, and keep running them until the list
- *	stays empty.
- *	Assumes it is operating within the aio issuer's mm context.
- */
-static inline void aio_run_all_iocbs(struct kioctx *ctx)
-{
-	spin_lock_irq(&ctx->ctx_lock);
-	while (__aio_run_iocbs(ctx))
-		;
-	spin_unlock_irq(&ctx->ctx_lock);
-}
-
-/*
- * aio_kick_handler:
- *	Work queue handler triggered to process pending
- *	retries on an ioctx. Takes on the aio issuer's
- *	mm context before running the iocbs, so that
- *	copy_xxx_user operates on the issuer's address
- *	space.
- * Run on aiod's context.
- */
-static void aio_kick_handler(struct work_struct *work)
-{
-	struct kioctx *ctx = container_of(work, struct kioctx, wq.work);
-	mm_segment_t oldfs = get_fs();
-	struct mm_struct *mm;
-	int requeue;
-
-	set_fs(USER_DS);
-	use_mm(ctx->mm);
-	spin_lock_irq(&ctx->ctx_lock);
-	requeue =__aio_run_iocbs(ctx);
-	mm = ctx->mm;
-	spin_unlock_irq(&ctx->ctx_lock);
-	unuse_mm(mm);
-	set_fs(oldfs);
-	/*
-	 * we're in a worker thread already; no point using non-zero delay
-	 */
-	if (requeue)
-		queue_delayed_work(aio_wq, &ctx->wq, 0);
-}
-
-
-/*
- * Called by kick_iocb to queue the kiocb for retry
- * and if required activate the aio work queue to process
- * it
- */
-static void try_queue_kicked_iocb(struct kiocb *iocb)
-{
-	struct kioctx	*ctx = iocb->ki_ctx;
-	unsigned long flags;
-	int run = 0;
-
-	spin_lock_irqsave(&ctx->ctx_lock, flags);
-	/* set this inside the lock so that we can't race with aio_run_iocb()
-	 * testing it and putting the iocb on the run list under the lock */
-	if (!kiocbTryKick(iocb))
-		run = __queue_kicked_iocb(iocb);
-	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-	if (run)
-		aio_queue_work(ctx);
-}
-
-/*
- * kick_iocb:
- *	Called typically from a wait queue callback context
- *	to trigger a retry of the iocb.
- *	The retry is usually executed by aio workqueue
- *	threads (See aio_kick_handler).
- */
-void kick_iocb(struct kiocb *iocb)
-{
-	/* sync iocbs are easy: they can only ever be executing from a
-	 * single context. */
-	if (is_sync_kiocb(iocb)) {
-		kiocbSetKicked(iocb);
-		wake_up_process(iocb->ki_obj.tsk);
-		return;
-	}
-
-	try_queue_kicked_iocb(iocb);
-}
-EXPORT_SYMBOL(kick_iocb);
-
 /* aio_complete
  *	Called when the io request on the given iocb is complete.
  *	Returns true if this is the last user of the request. The
@@ -926,9 +631,6 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
 	 */
 	spin_lock_irqsave(&ctx->ctx_lock, flags);
 
-	if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
-		list_del_init(&iocb->ki_run_list);
-
 	/*
 	 * cancelled requests don't get events, userland was given one
 	 * when the event got cancelled.
@@ -1083,13 +785,11 @@ static int read_events(struct kioctx *ctx,
 	int			i = 0;
 	struct io_event		ent;
 	struct aio_timeout	to;
-	int			retry = 0;
 
 	/* needed to zero any padding within an entry (there shouldn't be
 	 * any, but C is fun!
 	 */
 	memset(&ent, 0, sizeof(ent));
-retry:
 	ret = 0;
 	while (likely(i < nr)) {
 		ret = aio_read_evt(ctx, &ent);
@@ -1119,13 +819,6 @@ retry:
 
 	/* End fast path */
 
-	/* racey check, but it gets redone */
-	if (!retry && unlikely(!list_empty(&ctx->run_list))) {
-		retry = 1;
-		aio_run_all_iocbs(ctx);
-		goto retry;
-	}
-
 	init_timeout(&to);
 	if (timeout) {
 		struct timespec	ts;
@@ -1349,7 +1042,7 @@ static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
 	/* If we managed to write some out we return that, rather than
 	 * the eventual error. */
 	if (opcode == IOCB_CMD_PWRITEV
-	    && ret < 0 && ret != -EIOCBQUEUED && ret != -EIOCBRETRY
+	    && ret < 0 && ret != -EIOCBQUEUED
 	    && iocb->ki_nbytes - iocb->ki_left)
 		ret = iocb->ki_nbytes - iocb->ki_left;
 
@@ -1591,18 +1284,28 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	 * don't see ctx->dead set here, io_destroy() waits for our IO to
 	 * finish.
 	 */
-	if (ctx->dead) {
-		spin_unlock_irq(&ctx->ctx_lock);
+	if (ctx->dead)
 		ret = -EINVAL;
+	spin_unlock_irq(&ctx->ctx_lock);
+	if (ret)
 		goto out_put_req;
+
+	if (unlikely(kiocbIsCancelled(req)))
+		ret = -EINTR;
+	else
+		ret = req->ki_retry(req);
+
+	if (ret != -EIOCBQUEUED) {
+		/*
+		 * There's no easy way to restart the syscall since other AIO's
+		 * may be already running. Just fail this IO with EINTR.
+		 */
+		if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
+			     ret == -ERESTARTNOHAND ||
+			     ret == -ERESTART_RESTARTBLOCK))
+			ret = -EINTR;
+		aio_complete(req, ret, 0);
 	}
-	aio_run_iocb(req);
-	if (!list_empty(&ctx->run_list)) {
-		/* drain the run list */
-		while (__aio_run_iocbs(ctx))
-			;
-	}
-	spin_unlock_irq(&ctx->ctx_lock);
 
 	aio_put_req(req);	/* drop extra ref to req */
 	return 0;
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 12ae194ac943..3a44a648dae7 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -2322,7 +2322,7 @@ int ocfs2_inode_lock_full_nested(struct inode *inode,
 		status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
 					      arg_flags, subclass, _RET_IP_);
 		if (status < 0) {
-			if (status != -EAGAIN && status != -EIOCBRETRY)
+			if (status != -EAGAIN)
 				mlog_errno(status);
 			goto bail;
 		}
diff --git a/fs/read_write.c b/fs/read_write.c
index 90ba3b350e50..bce289afd21a 100644
--- a/fs/read_write.c
+++ b/fs/read_write.c
@@ -329,16 +329,6 @@ int rw_verify_area(int read_write, struct file *file, loff_t *ppos, size_t count
 	return count > MAX_RW_COUNT ? MAX_RW_COUNT : count;
 }
 
-static void wait_on_retry_sync_kiocb(struct kiocb *iocb)
-{
-	set_current_state(TASK_UNINTERRUPTIBLE);
-	if (!kiocbIsKicked(iocb))
-		schedule();
-	else
-		kiocbClearKicked(iocb);
-	__set_current_state(TASK_RUNNING);
-}
-
 ssize_t do_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos)
 {
 	struct iovec iov = { .iov_base = buf, .iov_len = len };
@@ -350,13 +340,7 @@ ssize_t do_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *pp
 	kiocb.ki_left = len;
 	kiocb.ki_nbytes = len;
 
-	for (;;) {
-		ret = filp->f_op->aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
-		if (ret != -EIOCBRETRY)
-			break;
-		wait_on_retry_sync_kiocb(&kiocb);
-	}
-
+	ret = filp->f_op->aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
 	if (-EIOCBQUEUED == ret)
 		ret = wait_on_sync_kiocb(&kiocb);
 	*ppos = kiocb.ki_pos;
@@ -406,13 +390,7 @@ ssize_t do_sync_write(struct file *filp, const char __user *buf, size_t len, lof
 	kiocb.ki_left = len;
 	kiocb.ki_nbytes = len;
 
-	for (;;) {
-		ret = filp->f_op->aio_write(&kiocb, &iov, 1, kiocb.ki_pos);
-		if (ret != -EIOCBRETRY)
-			break;
-		wait_on_retry_sync_kiocb(&kiocb);
-	}
-
+	ret = filp->f_op->aio_write(&kiocb, &iov, 1, kiocb.ki_pos);
 	if (-EIOCBQUEUED == ret)
 		ret = wait_on_sync_kiocb(&kiocb);
 	*ppos = kiocb.ki_pos;
@@ -592,13 +570,7 @@ static ssize_t do_sync_readv_writev(struct file *filp, const struct iovec *iov,
 	kiocb.ki_left = len;
 	kiocb.ki_nbytes = len;
 
-	for (;;) {
-		ret = fn(&kiocb, iov, nr_segs, kiocb.ki_pos);
-		if (ret != -EIOCBRETRY)
-			break;
-		wait_on_retry_sync_kiocb(&kiocb);
-	}
-
+	ret = fn(&kiocb, iov, nr_segs, kiocb.ki_pos);
 	if (ret == -EIOCBQUEUED)
 		ret = wait_on_sync_kiocb(&kiocb);
 	*ppos = kiocb.ki_pos;
diff --git a/include/linux/aio.h b/include/linux/aio.h
index b46a09f73f1d..019204e46c11 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -14,18 +14,12 @@ struct kioctx;
 #define KIOCB_SYNC_KEY		(~0U)
 
 /* ki_flags bits */
-#define KIF_KICKED		1
 #define KIF_CANCELLED		2
 
-#define kiocbTryKick(iocb)	test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags)
-
-#define kiocbSetKicked(iocb)	set_bit(KIF_KICKED, &(iocb)->ki_flags)
 #define kiocbSetCancelled(iocb)	set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
 
-#define kiocbClearKicked(iocb)	clear_bit(KIF_KICKED, &(iocb)->ki_flags)
 #define kiocbClearCancelled(iocb)	clear_bit(KIF_CANCELLED, &(iocb)->ki_flags)
 
-#define kiocbIsKicked(iocb)	test_bit(KIF_KICKED, &(iocb)->ki_flags)
 #define kiocbIsCancelled(iocb)	test_bit(KIF_CANCELLED, &(iocb)->ki_flags)
 
 /* is there a better place to document function pointer methods? */
@@ -52,18 +46,8 @@ struct kioctx;
  *	not ask the method again -- ki_retry must ensure forward progress.
  *	aio_complete() must be called once and only once in the future, multiple
  *	calls may result in undefined behaviour.
- *
- *	If ki_retry returns -EIOCBRETRY it has made a promise that kick_iocb()
- *	will be called on the kiocb pointer in the future. This may happen
- *	through generic helpers that associate kiocb->ki_wait with a wait
- *	queue head that ki_retry uses via current->io_wait. It can also happen
- *	with custom tracking and manual calls to kick_iocb(), though that is
- *	discouraged. In either case, kick_iocb() must be called once and only
- *	once. ki_retry must ensure forward progress, the AIO core will wait
- *	indefinitely for kick_iocb() to be called.
  */
 struct kiocb {
-	struct list_head	ki_run_list;
 	unsigned long		ki_flags;
 	int			ki_users;
 	unsigned		ki_key;		/* id of this request */
@@ -160,7 +144,6 @@ static inline unsigned aio_ring_avail(struct aio_ring_info *info,
 struct kioctx {
 	atomic_t		users;
 	int			dead;
-	struct mm_struct	*mm;
 
 	/* This needs improving */
 	unsigned long		user_id;
@@ -172,15 +155,12 @@ struct kioctx {
 
 	int			reqs_active;
 	struct list_head	active_reqs;	/* used for cancellation */
-	struct list_head	run_list;	/* used for kicked reqs */
 
 	/* sys_io_setup currently limits this to an unsigned int */
 	unsigned		max_reqs;
 
 	struct aio_ring_info	ring_info;
 
-	struct delayed_work	wq;
-
 	struct rcu_head		rcu_head;
 };
 
@@ -188,7 +168,6 @@ struct kioctx {
 #ifdef CONFIG_AIO
 extern ssize_t wait_on_sync_kiocb(struct kiocb *iocb);
 extern int aio_put_req(struct kiocb *iocb);
-extern void kick_iocb(struct kiocb *iocb);
 extern int aio_complete(struct kiocb *iocb, long res, long res2);
 struct mm_struct;
 extern void exit_aio(struct mm_struct *mm);
@@ -197,7 +176,6 @@ extern long do_io_submit(aio_context_t ctx_id, long nr,
 #else
 static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
 static inline int aio_put_req(struct kiocb *iocb) { return 0; }
-static inline void kick_iocb(struct kiocb *iocb) { }
 static inline int aio_complete(struct kiocb *iocb, long res, long res2) { return 0; }
 struct mm_struct;
 static inline void exit_aio(struct mm_struct *mm) { }
diff --git a/include/linux/errno.h b/include/linux/errno.h
index f6bf082d4d4f..89627b9187f9 100644
--- a/include/linux/errno.h
+++ b/include/linux/errno.h
@@ -28,6 +28,5 @@
 #define EBADTYPE	527	/* Type not supported by server */
 #define EJUKEBOX	528	/* Request initiated, but will not complete before timeout */
 #define EIOCBQUEUED	529	/* iocb queued, will get completion event */
-#define EIOCBRETRY	530	/* iocb queued, will trigger a retry */
 
 #endif