Diffstat (limited to 'fs/io_uring.c')
-rw-r--r--	fs/io_uring.c	162
1 file changed, 119 insertions, 43 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c
index e2a66e12fbc6..cfb48bd088e1 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -202,7 +202,7 @@ struct async_list {
 
 	struct file *file;
 	off_t io_end;
-	size_t io_pages;
+	size_t io_len;
 };
 
 struct io_ring_ctx {
@@ -333,7 +333,8 @@ struct io_kiocb {
 #define REQ_F_IO_DRAIN 16 /* drain existing IO first */
 #define REQ_F_IO_DRAINED 32 /* drain done */
 #define REQ_F_LINK 64 /* linked sqes */
-#define REQ_F_FAIL_LINK 128 /* fail rest of links */
+#define REQ_F_LINK_DONE 128 /* linked sqes done */
+#define REQ_F_FAIL_LINK 256 /* fail rest of links */
 	u64 user_data;
 	u32 result;
 	u32 sequence;
@@ -429,7 +430,7 @@ static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
 	if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
 		return false;
 
-	return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
+	return req->sequence != ctx->cached_cq_tail + ctx->sq_ring->dropped;
 }
 
 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
@@ -632,6 +633,7 @@ static void io_req_link_next(struct io_kiocb *req)
 			nxt->flags |= REQ_F_LINK;
 		}
 
+		nxt->flags |= REQ_F_LINK_DONE;
 		INIT_WORK(&nxt->work, io_sq_wq_submit_work);
 		queue_work(req->ctx->sqo_wq, &nxt->work);
 	}
@@ -677,6 +679,13 @@ static void io_put_req(struct io_kiocb *req)
 		io_free_req(req);
 }
 
+static unsigned io_cqring_events(struct io_cq_ring *ring)
+{
+	/* See comment at the top of this file */
+	smp_rmb();
+	return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
+}
+
 /*
  * Find and free completed poll iocbs
  */
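The helper above is moved up in the file unchanged (its old definition is removed further down) so that io_iopoll_check() can call it. The tail - head subtraction stays correct even when the 32-bit ring counters wrap, since unsigned arithmetic is modulo 2^32; a standalone sketch of that property (plain C, not kernel code):

#include <stdint.h>
#include <stdio.h>

/* Ring occupancy as tail - head, well-defined modulo 2^32. */
static uint32_t ring_events(uint32_t head, uint32_t tail)
{
	return tail - head;
}

int main(void)
{
	printf("%u\n", (unsigned)ring_events(10, 13));          /* 3 */
	printf("%u\n", (unsigned)ring_events(4294967294u, 1));  /* 3, across the wrap */
	return 0;
}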
@@ -769,7 +778,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
 				long min)
 {
-	while (!list_empty(&ctx->poll_list)) {
+	while (!list_empty(&ctx->poll_list) && !need_resched()) {
 		int ret;
 
 		ret = io_do_iopoll(ctx, nr_events, min);
@@ -796,6 +805,12 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
 		unsigned int nr_events = 0;
 
 		io_iopoll_getevents(ctx, &nr_events, 1);
+
+		/*
+		 * Ensure we allow local-to-the-cpu processing to take place,
+		 * in this case we need to ensure that we reap all events.
+		 */
+		cond_resched();
 	}
 	mutex_unlock(&ctx->uring_lock);
 }
@@ -803,11 +818,42 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
 				long min)
 {
-	int ret = 0;
+	int iters, ret = 0;
+
+	/*
+	 * We disallow the app entering submit/complete with polling, but we
+	 * still need to lock the ring to prevent racing with polled issue
+	 * that got punted to a workqueue.
+	 */
+	mutex_lock(&ctx->uring_lock);
 
+	iters = 0;
 	do {
 		int tmin = 0;
 
+		/*
+		 * Don't enter poll loop if we already have events pending.
+		 * If we do, we can potentially be spinning for commands that
+		 * already triggered a CQE (eg in error).
+		 */
+		if (io_cqring_events(ctx->cq_ring))
+			break;
+
+		/*
+		 * If a submit got punted to a workqueue, we can have the
+		 * application entering polling for a command before it gets
+		 * issued. That app will hold the uring_lock for the duration
+		 * of the poll right here, so we need to take a breather every
+		 * now and then to ensure that the issue has a chance to add
+		 * the poll to the issued list. Otherwise we can spin here
+		 * forever, while the workqueue is stuck trying to acquire the
+		 * very same mutex.
+		 */
+		if (!(++iters & 7)) {
+			mutex_unlock(&ctx->uring_lock);
+			mutex_lock(&ctx->uring_lock);
+		}
+
 		if (*nr_events < min)
 			tmin = min - *nr_events;
 
@@ -817,6 +863,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
 		ret = 0;
 	} while (min && !*nr_events && !need_resched());
 
+	mutex_unlock(&ctx->uring_lock);
 	return ret;
 }
 
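The two hunks above move the uring_lock acquisition into io_iopoll_check() itself (the callers' external locking is removed further down) and add two safeguards: bail out early if completions are already on the CQ ring, and drop/retake the mutex every eighth iteration so a punted submission blocked on the same lock can get in. A user-space sketch of that breather pattern, with a hypothetical poller type and a pthread mutex standing in for uring_lock:

#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for the ring: a lock plus an "events pending" probe. */
struct poller {
	pthread_mutex_t lock;
	bool (*have_events)(void *arg);
	void *arg;
};

/*
 * Spin for events while holding the lock, but release it every 8th pass
 * (the !(++iters & 7) test, as in the hunk above) so another thread
 * blocked on the same lock can make progress instead of waiting forever.
 */
static void poll_until_event(struct poller *p)
{
	int iters = 0;

	pthread_mutex_lock(&p->lock);
	while (!p->have_events(p->arg)) {
		if (!(++iters & 7)) {
			pthread_mutex_unlock(&p->lock);
			pthread_mutex_lock(&p->lock);
		}
	}
	pthread_mutex_unlock(&p->lock);
}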
@@ -1064,8 +1111,42 @@ static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
 	 */
 	offset = buf_addr - imu->ubuf;
 	iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
-	if (offset)
-		iov_iter_advance(iter, offset);
+
+	if (offset) {
+		/*
+		 * Don't use iov_iter_advance() here, as it's really slow for
+		 * using the latter parts of a big fixed buffer - it iterates
+		 * over each segment manually. We can cheat a bit here, because
+		 * we know that:
+		 *
+		 * 1) it's a BVEC iter, we set it up
+		 * 2) all bvecs are PAGE_SIZE in size, except potentially the
+		 *    first and last bvec
+		 *
+		 * So just find our index, and adjust the iterator afterwards.
+		 * If the offset is within the first bvec (or the whole first
+		 * bvec, just use iov_iter_advance(). This makes it easier
+		 * since we can just skip the first segment, which may not
+		 * be PAGE_SIZE aligned.
+		 */
+		const struct bio_vec *bvec = imu->bvec;
+
+		if (offset <= bvec->bv_len) {
+			iov_iter_advance(iter, offset);
+		} else {
+			unsigned long seg_skip;
+
+			/* skip first vec */
+			offset -= bvec->bv_len;
+			seg_skip = 1 + (offset >> PAGE_SHIFT);
+
+			iter->bvec = bvec + seg_skip;
+			iter->nr_segs -= seg_skip;
+			iter->count -= bvec->bv_len + offset;
+			iter->iov_offset = offset & ~PAGE_MASK;
+		}
+	}
+
 	return 0;
 }
 
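A worked example of the segment-skip arithmetic introduced above, with assumed values (4 KiB pages, a 2048-byte first bvec, and an offset of 10000 bytes into the fixed buffer):

#include <stdio.h>

#define PAGE_SHIFT	12
#define PAGE_SIZE	(1UL << PAGE_SHIFT)
#define PAGE_MASK	(~(PAGE_SIZE - 1))

int main(void)
{
	unsigned long first_bv_len = 2048;	/* unaligned first segment (assumed) */
	unsigned long offset = 10000;		/* offset into the fixed buffer (assumed) */
	unsigned long seg_skip, iov_offset;

	/* Same steps as the 'else' branch in the hunk above. */
	offset -= first_bv_len;			/* 7952 bytes past the first vec */
	seg_skip = 1 + (offset >> PAGE_SHIFT);	/* skip first vec + 1 full page = 2 */
	iov_offset = offset & ~PAGE_MASK;	/* 3856 bytes into the new first segment */

	printf("seg_skip=%lu iov_offset=%lu\n", seg_skip, iov_offset);
	return 0;
}

Walking the iterator with iov_iter_advance() would touch every intermediate segment, whereas this adjustment is O(1) no matter how deep into the buffer the I/O starts.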
@@ -1120,28 +1201,26 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
 	off_t io_end = kiocb->ki_pos + len;
 
 	if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
-		unsigned long max_pages;
+		unsigned long max_bytes;
 
 		/* Use 8x RA size as a decent limiter for both reads/writes */
-		max_pages = filp->f_ra.ra_pages;
-		if (!max_pages)
-			max_pages = VM_READAHEAD_PAGES;
-		max_pages *= 8;
-
-		/* If max pages are exceeded, reset the state */
-		len >>= PAGE_SHIFT;
-		if (async_list->io_pages + len <= max_pages) {
+		max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3);
+		if (!max_bytes)
+			max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3);
+
+		/* If max len are exceeded, reset the state */
+		if (async_list->io_len + len <= max_bytes) {
 			req->flags |= REQ_F_SEQ_PREV;
-			async_list->io_pages += len;
+			async_list->io_len += len;
 		} else {
 			io_end = 0;
-			async_list->io_pages = 0;
+			async_list->io_len = 0;
 		}
 	}
 
 	/* New file? Reset state. */
 	if (async_list->file != filp) {
-		async_list->io_pages = 0;
+		async_list->io_len = 0;
 		async_list->file = filp;
 	}
 	async_list->io_end = io_end;
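The limiter above is now tracked in bytes rather than pages, so request lengths are no longer truncated by the old len >>= PAGE_SHIFT before being accounted. The shift by (PAGE_SHIFT + 3) folds the old "* 8" into the page-to-byte conversion; with assumed 4 KiB pages and a 32-page VM_READAHEAD_PAGES default, for instance:

#include <stdio.h>

#define PAGE_SHIFT		12	/* assumed 4 KiB pages */
#define VM_READAHEAD_PAGES	32	/* assumed default: 128 KiB of readahead */

int main(void)
{
	unsigned long ra_pages = 0;	/* e.g. a file with no per-file readahead state */
	unsigned long max_bytes;

	/* Same computation as the hunk above: 8x the readahead window, in bytes. */
	max_bytes = ra_pages << (PAGE_SHIFT + 3);
	if (!max_bytes)
		max_bytes = (unsigned long)VM_READAHEAD_PAGES << (PAGE_SHIFT + 3);

	printf("max_bytes = %lu (%lu KiB)\n", max_bytes, max_bytes >> 10);	/* 1048576, 1024 KiB */
	return 0;
}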
@@ -1630,6 +1709,8 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	INIT_LIST_HEAD(&poll->wait.entry);
 	init_waitqueue_func_entry(&poll->wait, io_poll_wake);
 
+	INIT_LIST_HEAD(&req->list);
+
 	mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
 
 	spin_lock_irq(&ctx->completion_lock);
@@ -1800,6 +1881,7 @@ restart:
 	do {
 		struct sqe_submit *s = &req->submit;
 		const struct io_uring_sqe *sqe = s->sqe;
+		unsigned int flags = req->flags;
 
 		/* Ensure we clear previously set non-block flag */
 		req->rw.ki_flags &= ~IOCB_NOWAIT;
@@ -1844,6 +1926,10 @@ restart:
 		/* async context always use a copy of the sqe */
 		kfree(sqe);
 
+		/* req from defer and link list needn't decrease async cnt */
+		if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE))
+			goto out;
+
 		if (!async_list)
 			break;
 		if (!list_empty(&req_list)) {
@@ -1891,6 +1977,7 @@ restart:
 		}
 	}
 
+out:
 	if (cur_mm) {
 		set_fs(old_fs);
 		unuse_mm(cur_mm);
@@ -1917,6 +2004,10 @@ static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
 	ret = true;
 	spin_lock(&list->lock);
 	list_add_tail(&req->list, &list->list);
+	/*
+	 * Ensure we see a simultaneous modification from io_sq_wq_submit_work()
+	 */
+	smp_mb();
 	if (!atomic_read(&list->cnt)) {
 		list_del_init(&req->list);
 		ret = false;
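The full barrier above orders the list_add_tail() store before the atomic_read() of list->cnt, the idea being that either this side still sees the worker running, or the worker's final rescan sees the newly added entry; without it, both checks could miss each other. A user-space analogy using C11 atomics (hypothetical names, a seq_cst fence standing in for smp_mb()):

#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in: a worker count plus a "work queued" flag. */
struct handoff {
	atomic_int	worker_cnt;	/* > 0 while the async worker is alive */
	atomic_bool	queued;		/* stands in for the request list */
};

/*
 * Publish the entry, then check whether a worker is still there to pick
 * it up. The fence keeps the store and the following load from being
 * reordered against the worker's own "drop count, then rescan" sequence.
 */
static bool producer_add(struct handoff *h)
{
	atomic_store_explicit(&h->queued, true, memory_order_relaxed);
	atomic_thread_fence(memory_order_seq_cst);
	if (!atomic_load_explicit(&h->worker_cnt, memory_order_relaxed)) {
		/* Worker already gone: take the entry back, run it ourselves. */
		atomic_store_explicit(&h->queued, false, memory_order_relaxed);
		return false;
	}
	return true;
}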
@@ -1977,6 +2068,15 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 {
 	int ret;
 
+	ret = io_req_defer(ctx, req, s->sqe);
+	if (ret) {
+		if (ret != -EIOCBQUEUED) {
+			io_free_req(req);
+			io_cqring_add_event(ctx, s->sqe->user_data, ret);
+		}
+		return 0;
+	}
+
 	ret = __io_submit_sqe(ctx, req, s, true);
 	if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
 		struct io_uring_sqe *sqe_copy;
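With the deferral check moved here from io_submit_sqe() (see the removal below), a defer failure frees the request and posts the error as a completion, so user space observes it as a CQE whose res field is negative. A minimal consumer sketch using liburing (not part of this patch) showing how such errors are picked up:

#include <liburing.h>
#include <stdio.h>

int main(void)
{
	struct io_uring ring;
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;

	if (io_uring_queue_init(8, &ring, 0) < 0)
		return 1;

	sqe = io_uring_get_sqe(&ring);
	io_uring_prep_nop(sqe);			/* any opcode; the shape is the same */
	io_uring_sqe_set_data(sqe, (void *)0x1234);
	io_uring_submit(&ring);

	io_uring_wait_cqe(&ring, &cqe);
	if (cqe->res < 0)			/* errors arrive as a CQE with negative res */
		fprintf(stderr, "request %p failed: %d\n",
			io_uring_cqe_get_data(cqe), cqe->res);
	io_uring_cqe_seen(&ring, cqe);

	io_uring_queue_exit(&ring);
	return 0;
}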
@@ -2049,13 +2149,6 @@ err:
 		return;
 	}
 
-	ret = io_req_defer(ctx, req, s->sqe);
-	if (ret) {
-		if (ret != -EIOCBQUEUED)
-			goto err_req;
-		return;
-	}
-
 	/*
 	 * If we already have a head request, queue this one for async
 	 * submittal once the head completes. If we don't have a head but
@@ -2232,15 +2325,7 @@ static int io_sq_thread(void *data)
 			unsigned nr_events = 0;
 
 			if (ctx->flags & IORING_SETUP_IOPOLL) {
-				/*
-				 * We disallow the app entering submit/complete
-				 * with polling, but we still need to lock the
-				 * ring to prevent racing with polled issue
-				 * that got punted to a workqueue.
-				 */
-				mutex_lock(&ctx->uring_lock);
 				io_iopoll_check(ctx, &nr_events, 0);
-				mutex_unlock(&ctx->uring_lock);
 			} else {
 				/*
 				 * Normal IO, just pretend everything completed.
@@ -2385,13 +2470,6 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 	return submit;
 }
 
-static unsigned io_cqring_events(struct io_cq_ring *ring)
-{
-	/* See comment at the top of this file */
-	smp_rmb();
-	return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
-}
-
 /*
  * Wait until events become available, if we don't already have some. The
  * application must reap them itself, as they reside on the shared cq ring.
@@ -3142,9 +3220,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 		min_complete = min(min_complete, ctx->cq_entries);
 
 		if (ctx->flags & IORING_SETUP_IOPOLL) {
-			mutex_lock(&ctx->uring_lock);
 			ret = io_iopoll_check(ctx, &nr_events, min_complete);
-			mutex_unlock(&ctx->uring_lock);
 		} else {
 			ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
 		}