author		Linus Torvalds <torvalds@linux-foundation.org>	2019-03-08 17:48:40 -0500
committer	Linus Torvalds <torvalds@linux-foundation.org>	2019-03-08 17:48:40 -0500
commit		38e7571c07be01f9f19b355a9306a4e3d5cb0f5b (patch)
tree		48812ba46a6fe37ee59d31e0de418f336bbb15ca
parent		80201fe175cbf7f3e372f53eba0a881a702ad926 (diff)
parent		21b4aa5d20fd07207e73270cadffed5c63fb4343 (diff)
Merge tag 'io_uring-2019-03-06' of git://git.kernel.dk/linux-block
Pull io_uring IO interface from Jens Axboe:
 "Second attempt at adding the io_uring interface.

  Since the first one, we've added basic unit testing of the three
  system calls, which resides in liburing like the other unit tests we
  have so far. It'll take a while to get full coverage, but we're
  working towards it. I've also added two basic test programs to
  tools/io_uring. One uses the raw interface and has support for all
  the various features that io_uring supports outside of standard IO,
  like fixed files, fixed IO buffers, and polled IO. The other uses the
  liburing API, and is a simplified version of cp(1).

  This adds support for a new IO interface, io_uring.

  io_uring allows an application to communicate with the kernel through
  two rings, the submission queue (SQ) and completion queue (CQ) ring.
  This allows for very efficient handling of IOs, see the v5 posting
  for some basic numbers:

    https://lore.kernel.org/linux-block/20190116175003.17880-1-axboe@kernel.dk/

  Outside of just efficiency, the interface is also flexible and
  extendable, and allows for future use cases like the upcoming NVMe
  key-value store API, networked IO, and so on. It also supports async
  buffered IO, something that we've always failed to support in the
  kernel.

  Outside of basic IO features, it supports async polled IO as well.
  This particular feature has already been tested at Facebook months
  ago for flash storage boxes, with 25-33% improvements. It makes
  polled IO actually useful for real world use cases, where even basic
  flash sees a nice win in terms of efficiency, latency, and
  performance. These boxes were IOPS bound before, now they are not.

  This series adds three new system calls: one for setting up an
  io_uring instance (io_uring_setup(2)), one for submitting/completing
  IO (io_uring_enter(2)), and one for aux functions like registering
  file sets, buffers, etc (io_uring_register(2)). Through the help of
  Arnd, I've coordinated the syscall numbers so the merge on that
  front should be painless.

  Jon did a writeup of the interface a while back, which (except for
  minor details that have been tweaked) is still accurate. Find that
  here:

    https://lwn.net/Articles/776703/

  Huge thanks to Al Viro for helping get the reference cycle code
  correct, and to Jann Horn for his extensive reviews focused on both
  security and bugs in general.

  There's a userspace library that provides basic functionality for
  applications that don't need or want to care about how to fiddle
  with the rings directly. It has helpers to allow applications to
  easily set up an io_uring instance, and submit/complete IO through
  it without knowing about the intricacies of the rings. It also
  includes man pages (thanks to Jeff Moyer), and will continue to grow
  support helper functions and features as time progresses. Find it
  here:

    git://git.kernel.dk/liburing

  Fio has full support for the raw interface, both in the form of an
  IO engine (io_uring) and a small test application (t/io_uring) that
  can exercise and benchmark the interface"

* tag 'io_uring-2019-03-06' of git://git.kernel.dk/linux-block:
  io_uring: add a few test tools
  io_uring: allow workqueue item to handle multiple buffered requests
  io_uring: add support for IORING_OP_POLL
  io_uring: add io_kiocb ref count
  io_uring: add submission polling
  io_uring: add file set registration
  net: split out functions related to registering inflight socket files
  io_uring: add support for pre-mapped user IO buffers
  block: implement bio helper to add iter bvec pages to bio
  io_uring: batch io_kiocb allocation
  io_uring: use fget/fput_many() for file references
  fs: add fget_many() and fput_many()
  io_uring: support for IO polling
  io_uring: add fsync support
  Add io_uring IO interface
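For readers new to the interface, the flow described above (set up an instance, place an SQE on the submission ring, notify the kernel, then reap the matching CQE) looks roughly like the sketch below when the liburing helpers are used. This is a hedged illustration rather than code from this merge: the helper names (io_uring_queue_init(), io_uring_get_sqe(), io_uring_prep_readv(), io_uring_submit(), io_uring_wait_cqe(), io_uring_cqe_seen(), io_uring_queue_exit()) are assumed from liburing and early versions of the library may spell them differently.

/*
 * Illustrative only -- not part of this merge. A minimal liburing-style
 * read: set up a ring, queue one readv SQE, submit it, and reap the CQE.
 */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/uio.h>
#include "liburing.h"

int main(int argc, char **argv)
{
	struct io_uring ring;
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;
	struct iovec iov;
	char buf[4096];
	int fd, ret;

	if (argc < 2) {
		fprintf(stderr, "usage: %s <file>\n", argv[0]);
		return 1;
	}
	fd = open(argv[1], O_RDONLY);
	if (fd < 0) {
		perror("open");
		return 1;
	}

	/* io_uring_setup(2): create SQ and CQ rings with 8 entries */
	ret = io_uring_queue_init(8, &ring, 0);
	if (ret < 0) {
		fprintf(stderr, "queue_init: %s\n", strerror(-ret));
		return 1;
	}

	/* fill one submission queue entry: readv 4KB from offset 0 */
	sqe = io_uring_get_sqe(&ring);
	if (!sqe)
		return 1;
	iov.iov_base = buf;
	iov.iov_len = sizeof(buf);
	io_uring_prep_readv(sqe, fd, &iov, 1, 0);

	/* io_uring_enter(2): tell the kernel about the updated SQ tail */
	io_uring_submit(&ring);

	/* wait for the matching completion event in the CQ ring */
	ret = io_uring_wait_cqe(&ring, &cqe);
	if (ret < 0) {
		fprintf(stderr, "wait_cqe: %s\n", strerror(-ret));
		return 1;
	}
	printf("read returned %d\n", cqe->res);
	io_uring_cqe_seen(&ring, cqe);

	io_uring_queue_exit(&ring);
	close(fd);
	return 0;
}

Raw users can do the same with the three system calls directly by mmap()ing the SQ/CQ rings and SQE array set up by io_uring_setup(2), which is what the raw-interface test program added under tools/io_uring exercises.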
-rw-r--r--	arch/x86/entry/syscalls/syscall_32.tbl	3
-rw-r--r--	arch/x86/entry/syscalls/syscall_64.tbl	3
-rw-r--r--	block/bio.c	62
-rw-r--r--	fs/Makefile	1
-rw-r--r--	fs/file.c	15
-rw-r--r--	fs/file_table.c	9
-rw-r--r--	fs/io_uring.c	2971
-rw-r--r--	include/linux/file.h	2
-rw-r--r--	include/linux/fs.h	13
-rw-r--r--	include/linux/sched/user.h	2
-rw-r--r--	include/linux/syscalls.h	8
-rw-r--r--	include/net/af_unix.h	1
-rw-r--r--	include/uapi/asm-generic/unistd.h	9
-rw-r--r--	include/uapi/linux/io_uring.h	137
-rw-r--r--	init/Kconfig	9
-rw-r--r--	kernel/sys_ni.c	3
-rw-r--r--	net/Makefile	2
-rw-r--r--	net/unix/Kconfig	5
-rw-r--r--	net/unix/Makefile	2
-rw-r--r--	net/unix/af_unix.c	63
-rw-r--r--	net/unix/garbage.c	68
-rw-r--r--	net/unix/scm.c	151
-rw-r--r--	net/unix/scm.h	10
-rw-r--r--	tools/io_uring/Makefile	18
-rw-r--r--	tools/io_uring/README	29
-rw-r--r--	tools/io_uring/barrier.h	16
-rw-r--r--	tools/io_uring/io_uring-bench.c	616
-rw-r--r--	tools/io_uring/io_uring-cp.c	251
-rw-r--r--	tools/io_uring/liburing.h	143
-rw-r--r--	tools/io_uring/queue.c	164
-rw-r--r--	tools/io_uring/setup.c	103
-rw-r--r--	tools/io_uring/syscall.c	40
32 files changed, 4783 insertions, 146 deletions
diff --git a/arch/x86/entry/syscalls/syscall_32.tbl b/arch/x86/entry/syscalls/syscall_32.tbl
index 955ab6a3b61f..8da78595d69d 100644
--- a/arch/x86/entry/syscalls/syscall_32.tbl
+++ b/arch/x86/entry/syscalls/syscall_32.tbl
@@ -429,3 +429,6 @@
 421	i386	rt_sigtimedwait_time64	sys_rt_sigtimedwait	__ia32_compat_sys_rt_sigtimedwait_time64
 422	i386	futex_time64		sys_futex		__ia32_sys_futex
 423	i386	sched_rr_get_interval_time64	sys_sched_rr_get_interval	__ia32_sys_sched_rr_get_interval
+425	i386	io_uring_setup		sys_io_uring_setup		__ia32_sys_io_uring_setup
+426	i386	io_uring_enter		sys_io_uring_enter		__ia32_sys_io_uring_enter
+427	i386	io_uring_register	sys_io_uring_register		__ia32_sys_io_uring_register
diff --git a/arch/x86/entry/syscalls/syscall_64.tbl b/arch/x86/entry/syscalls/syscall_64.tbl
index 2ae92fddb6d5..c768447f97ec 100644
--- a/arch/x86/entry/syscalls/syscall_64.tbl
+++ b/arch/x86/entry/syscalls/syscall_64.tbl
@@ -345,6 +345,9 @@
 334	common	rseq			__x64_sys_rseq
 # don't use numbers 387 through 423, add new calls after the last
 # 'common' entry
+425	common	io_uring_setup		__x64_sys_io_uring_setup
+426	common	io_uring_enter		__x64_sys_io_uring_enter
+427	common	io_uring_register	__x64_sys_io_uring_register
 
 #
 # x32-specific system call numbers start at 512 to avoid cache impact
diff --git a/block/bio.c b/block/bio.c
index 83a2dfa417ca..71a78d9fb8b7 100644
--- a/block/bio.c
+++ b/block/bio.c
@@ -836,6 +836,40 @@ int bio_add_page(struct bio *bio, struct page *page,
 }
 EXPORT_SYMBOL(bio_add_page);
 
+static int __bio_iov_bvec_add_pages(struct bio *bio, struct iov_iter *iter)
+{
+	const struct bio_vec *bv = iter->bvec;
+	unsigned int len;
+	size_t size;
+
+	if (WARN_ON_ONCE(iter->iov_offset > bv->bv_len))
+		return -EINVAL;
+
+	len = min_t(size_t, bv->bv_len - iter->iov_offset, iter->count);
+	size = bio_add_page(bio, bv->bv_page, len,
+				bv->bv_offset + iter->iov_offset);
+	if (size == len) {
+		struct page *page;
+		int i;
+
+		/*
+		 * For the normal O_DIRECT case, we could skip grabbing this
+		 * reference and then not have to put them again when IO
+		 * completes. But this breaks some in-kernel users, like
+		 * splicing to/from a loop device, where we release the pipe
+		 * pages unconditionally. If we can fix that case, we can
+		 * get rid of the get here and the need to call
+		 * bio_release_pages() at IO completion time.
+		 */
+		mp_bvec_for_each_page(page, bv, i)
+			get_page(page);
+		iov_iter_advance(iter, size);
+		return 0;
+	}
+
+	return -EINVAL;
+}
+
 #define PAGE_PTRS_PER_BVEC	(sizeof(struct bio_vec) / sizeof(struct page *))
 
 /**
@@ -884,23 +918,35 @@ static int __bio_iov_iter_get_pages(struct bio *bio, struct iov_iter *iter)
 }
 
 /**
- * bio_iov_iter_get_pages - pin user or kernel pages and add them to a bio
+ * bio_iov_iter_get_pages - add user or kernel pages to a bio
  * @bio: bio to add pages to
- * @iter: iov iterator describing the region to be mapped
+ * @iter: iov iterator describing the region to be added
+ *
+ * This takes either an iterator pointing to user memory, or one pointing to
+ * kernel pages (BVEC iterator). If we're adding user pages, we pin them and
+ * map them into the kernel. On IO completion, the caller should put those
+ * pages. For now, when adding kernel pages, we still grab a reference to the
+ * page. This isn't strictly needed for the common case, but some call paths
+ * end up releasing pages from eg a pipe and we can't easily control these.
+ * See comment in __bio_iov_bvec_add_pages().
  *
- * Pins pages from *iter and appends them to @bio's bvec array. The
- * pages will have to be released using put_page() when done.
  * The function tries, but does not guarantee, to pin as many pages as
- * fit into the bio, or are requested in *iter, whatever is smaller.
- * If MM encounters an error pinning the requested pages, it stops.
- * Error is returned only if 0 pages could be pinned.
+ * fit into the bio, or are requested in *iter, whatever is smaller. If
+ * MM encounters an error pinning the requested pages, it stops. Error
+ * is returned only if 0 pages could be pinned.
  */
 int bio_iov_iter_get_pages(struct bio *bio, struct iov_iter *iter)
 {
+	const bool is_bvec = iov_iter_is_bvec(iter);
 	unsigned short orig_vcnt = bio->bi_vcnt;
 
 	do {
-		int ret = __bio_iov_iter_get_pages(bio, iter);
+		int ret;
+
+		if (is_bvec)
+			ret = __bio_iov_bvec_add_pages(bio, iter);
+		else
+			ret = __bio_iov_iter_get_pages(bio, iter);
 
 		if (unlikely(ret))
 			return bio->bi_vcnt > orig_vcnt ? 0 : ret;
diff --git a/fs/Makefile b/fs/Makefile
index 23fcd8c164a3..ffeaa6632ab4 100644
--- a/fs/Makefile
+++ b/fs/Makefile
@@ -31,6 +31,7 @@ obj-$(CONFIG_TIMERFD) += timerfd.o
 obj-$(CONFIG_EVENTFD)		+= eventfd.o
 obj-$(CONFIG_USERFAULTFD)	+= userfaultfd.o
 obj-$(CONFIG_AIO)		+= aio.o
+obj-$(CONFIG_IO_URING)		+= io_uring.o
 obj-$(CONFIG_FS_DAX)		+= dax.o
 obj-$(CONFIG_FS_ENCRYPTION)	+= crypto/
 obj-$(CONFIG_FILE_LOCKING)	+= locks.o
diff --git a/fs/file.c b/fs/file.c
index a10487aa0a84..3da91a112bab 100644
--- a/fs/file.c
+++ b/fs/file.c
@@ -706,7 +706,7 @@ void do_close_on_exec(struct files_struct *files)
 	spin_unlock(&files->file_lock);
 }
 
-static struct file *__fget(unsigned int fd, fmode_t mask)
+static struct file *__fget(unsigned int fd, fmode_t mask, unsigned int refs)
 {
 	struct files_struct *files = current->files;
 	struct file *file;
@@ -721,7 +721,7 @@ loop:
 		 */
 		if (file->f_mode & mask)
 			file = NULL;
-		else if (!get_file_rcu(file))
+		else if (!get_file_rcu_many(file, refs))
 			goto loop;
 	}
 	rcu_read_unlock();
@@ -729,15 +729,20 @@ loop:
 	return file;
 }
 
+struct file *fget_many(unsigned int fd, unsigned int refs)
+{
+	return __fget(fd, FMODE_PATH, refs);
+}
+
 struct file *fget(unsigned int fd)
 {
-	return __fget(fd, FMODE_PATH);
+	return __fget(fd, FMODE_PATH, 1);
 }
 EXPORT_SYMBOL(fget);
 
 struct file *fget_raw(unsigned int fd)
 {
-	return __fget(fd, 0);
+	return __fget(fd, 0, 1);
 }
 EXPORT_SYMBOL(fget_raw);
 
@@ -768,7 +773,7 @@ static unsigned long __fget_light(unsigned int fd, fmode_t mask)
 			return 0;
 		return (unsigned long)file;
 	} else {
-		file = __fget(fd, mask);
+		file = __fget(fd, mask, 1);
 		if (!file)
 			return 0;
 		return FDPUT_FPUT | (unsigned long)file;
diff --git a/fs/file_table.c b/fs/file_table.c
index 5679e7fcb6b0..155d7514a094 100644
--- a/fs/file_table.c
+++ b/fs/file_table.c
@@ -326,9 +326,9 @@ void flush_delayed_fput(void)
 
 static DECLARE_DELAYED_WORK(delayed_fput_work, delayed_fput);
 
-void fput(struct file *file)
+void fput_many(struct file *file, unsigned int refs)
 {
-	if (atomic_long_dec_and_test(&file->f_count)) {
+	if (atomic_long_sub_and_test(refs, &file->f_count)) {
 		struct task_struct *task = current;
 
 		if (likely(!in_interrupt() && !(task->flags & PF_KTHREAD))) {
@@ -347,6 +347,11 @@ void fput(struct file *file)
 	}
 }
 
+void fput(struct file *file)
+{
+	fput_many(file, 1);
+}
+
 /*
  * synchronous analog of fput(); for kernel threads that might be needed
  * in some umount() (and thus can't use flush_delayed_fput() without
diff --git a/fs/io_uring.c b/fs/io_uring.c
new file mode 100644
index 000000000000..5d99376d2369
--- /dev/null
+++ b/fs/io_uring.c
@@ -0,0 +1,2971 @@
1// SPDX-License-Identifier: GPL-2.0
2/*
3 * Shared application/kernel submission and completion ring pairs, for
4 * supporting fast/efficient IO.
5 *
6 * A note on the read/write ordering memory barriers that are matched between
7 * the application and kernel side. When the application reads the CQ ring
8 * tail, it must use an appropriate smp_rmb() to order with the smp_wmb()
9 * the kernel uses after writing the tail. Failure to do so could cause a
10 * delay in when the application notices that completion events are available.
11 * This isn't a fatal condition. Likewise, the application must use an
12 * appropriate smp_wmb() both before writing the SQ tail, and after writing
13 * the SQ tail. The first one orders the sqe writes with the tail write, and
14 * the latter is paired with the smp_rmb() the kernel will issue before
15 * reading the SQ tail on submission.
16 *
17 * Also see the examples in the liburing library:
18 *
19 * git://git.kernel.dk/liburing
20 *
21 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
22 * from data shared between the kernel and application. This is done both
23 * for ordering purposes, but also to ensure that once a value is loaded from
24 * data that the application could potentially modify, it remains stable.
25 *
26 * Copyright (C) 2018-2019 Jens Axboe
27 * Copyright (c) 2018-2019 Christoph Hellwig
28 */
29#include <linux/kernel.h>
30#include <linux/init.h>
31#include <linux/errno.h>
32#include <linux/syscalls.h>
33#include <linux/compat.h>
34#include <linux/refcount.h>
35#include <linux/uio.h>
36
37#include <linux/sched/signal.h>
38#include <linux/fs.h>
39#include <linux/file.h>
40#include <linux/fdtable.h>
41#include <linux/mm.h>
42#include <linux/mman.h>
43#include <linux/mmu_context.h>
44#include <linux/percpu.h>
45#include <linux/slab.h>
46#include <linux/workqueue.h>
47#include <linux/kthread.h>
48#include <linux/blkdev.h>
49#include <linux/bvec.h>
50#include <linux/net.h>
51#include <net/sock.h>
52#include <net/af_unix.h>
53#include <net/scm.h>
54#include <linux/anon_inodes.h>
55#include <linux/sched/mm.h>
56#include <linux/uaccess.h>
57#include <linux/nospec.h>
58#include <linux/sizes.h>
59#include <linux/hugetlb.h>
60
61#include <uapi/linux/io_uring.h>
62
63#include "internal.h"
64
65#define IORING_MAX_ENTRIES 4096
66#define IORING_MAX_FIXED_FILES 1024
67
68struct io_uring {
69 u32 head ____cacheline_aligned_in_smp;
70 u32 tail ____cacheline_aligned_in_smp;
71};
72
73struct io_sq_ring {
74 struct io_uring r;
75 u32 ring_mask;
76 u32 ring_entries;
77 u32 dropped;
78 u32 flags;
79 u32 array[];
80};
81
82struct io_cq_ring {
83 struct io_uring r;
84 u32 ring_mask;
85 u32 ring_entries;
86 u32 overflow;
87 struct io_uring_cqe cqes[];
88};
89
90struct io_mapped_ubuf {
91 u64 ubuf;
92 size_t len;
93 struct bio_vec *bvec;
94 unsigned int nr_bvecs;
95};
96
97struct async_list {
98 spinlock_t lock;
99 atomic_t cnt;
100 struct list_head list;
101
102 struct file *file;
103 off_t io_end;
104 size_t io_pages;
105};
106
107struct io_ring_ctx {
108 struct {
109 struct percpu_ref refs;
110 } ____cacheline_aligned_in_smp;
111
112 struct {
113 unsigned int flags;
114 bool compat;
115 bool account_mem;
116
117 /* SQ ring */
118 struct io_sq_ring *sq_ring;
119 unsigned cached_sq_head;
120 unsigned sq_entries;
121 unsigned sq_mask;
122 unsigned sq_thread_idle;
123 struct io_uring_sqe *sq_sqes;
124 } ____cacheline_aligned_in_smp;
125
126 /* IO offload */
127 struct workqueue_struct *sqo_wq;
128 struct task_struct *sqo_thread; /* if using sq thread polling */
129 struct mm_struct *sqo_mm;
130 wait_queue_head_t sqo_wait;
131 unsigned sqo_stop;
132
133 struct {
134 /* CQ ring */
135 struct io_cq_ring *cq_ring;
136 unsigned cached_cq_tail;
137 unsigned cq_entries;
138 unsigned cq_mask;
139 struct wait_queue_head cq_wait;
140 struct fasync_struct *cq_fasync;
141 } ____cacheline_aligned_in_smp;
142
143 /*
144 * If used, fixed file set. Writers must ensure that ->refs is dead,
145 * readers must ensure that ->refs is alive as long as the file* is
146 * used. Only updated through io_uring_register(2).
147 */
148 struct file **user_files;
149 unsigned nr_user_files;
150
151 /* if used, fixed mapped user buffers */
152 unsigned nr_user_bufs;
153 struct io_mapped_ubuf *user_bufs;
154
155 struct user_struct *user;
156
157 struct completion ctx_done;
158
159 struct {
160 struct mutex uring_lock;
161 wait_queue_head_t wait;
162 } ____cacheline_aligned_in_smp;
163
164 struct {
165 spinlock_t completion_lock;
166 bool poll_multi_file;
167 /*
168 * ->poll_list is protected by the ctx->uring_lock for
169 * io_uring instances that don't use IORING_SETUP_SQPOLL.
170 * For SQPOLL, only the single threaded io_sq_thread() will
171 * manipulate the list, hence no extra locking is needed there.
172 */
173 struct list_head poll_list;
174 struct list_head cancel_list;
175 } ____cacheline_aligned_in_smp;
176
177 struct async_list pending_async[2];
178
179#if defined(CONFIG_UNIX)
180 struct socket *ring_sock;
181#endif
182};
183
184struct sqe_submit {
185 const struct io_uring_sqe *sqe;
186 unsigned short index;
187 bool has_user;
188 bool needs_lock;
189 bool needs_fixed_file;
190};
191
192struct io_poll_iocb {
193 struct file *file;
194 struct wait_queue_head *head;
195 __poll_t events;
196 bool woken;
197 bool canceled;
198 struct wait_queue_entry wait;
199};
200
201struct io_kiocb {
202 union {
203 struct kiocb rw;
204 struct io_poll_iocb poll;
205 };
206
207 struct sqe_submit submit;
208
209 struct io_ring_ctx *ctx;
210 struct list_head list;
211 unsigned int flags;
212 refcount_t refs;
213#define REQ_F_FORCE_NONBLOCK 1 /* inline submission attempt */
214#define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */
215#define REQ_F_FIXED_FILE 4 /* ctx owns file */
216#define REQ_F_SEQ_PREV 8 /* sequential with previous */
217 u64 user_data;
218 u64 error;
219
220 struct work_struct work;
221};
222
223#define IO_PLUG_THRESHOLD 2
224#define IO_IOPOLL_BATCH 8
225
226struct io_submit_state {
227 struct blk_plug plug;
228
229 /*
230 * io_kiocb alloc cache
231 */
232 void *reqs[IO_IOPOLL_BATCH];
233 unsigned int free_reqs;
234 unsigned int cur_req;
235
236 /*
237 * File reference cache
238 */
239 struct file *file;
240 unsigned int fd;
241 unsigned int has_refs;
242 unsigned int used_refs;
243 unsigned int ios_left;
244};
245
246static struct kmem_cache *req_cachep;
247
248static const struct file_operations io_uring_fops;
249
250struct sock *io_uring_get_socket(struct file *file)
251{
252#if defined(CONFIG_UNIX)
253 if (file->f_op == &io_uring_fops) {
254 struct io_ring_ctx *ctx = file->private_data;
255
256 return ctx->ring_sock->sk;
257 }
258#endif
259 return NULL;
260}
261EXPORT_SYMBOL(io_uring_get_socket);
262
263static void io_ring_ctx_ref_free(struct percpu_ref *ref)
264{
265 struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
266
267 complete(&ctx->ctx_done);
268}
269
270static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
271{
272 struct io_ring_ctx *ctx;
273 int i;
274
275 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
276 if (!ctx)
277 return NULL;
278
279 if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
280 kfree(ctx);
281 return NULL;
282 }
283
284 ctx->flags = p->flags;
285 init_waitqueue_head(&ctx->cq_wait);
286 init_completion(&ctx->ctx_done);
287 mutex_init(&ctx->uring_lock);
288 init_waitqueue_head(&ctx->wait);
289 for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
290 spin_lock_init(&ctx->pending_async[i].lock);
291 INIT_LIST_HEAD(&ctx->pending_async[i].list);
292 atomic_set(&ctx->pending_async[i].cnt, 0);
293 }
294 spin_lock_init(&ctx->completion_lock);
295 INIT_LIST_HEAD(&ctx->poll_list);
296 INIT_LIST_HEAD(&ctx->cancel_list);
297 return ctx;
298}
299
300static void io_commit_cqring(struct io_ring_ctx *ctx)
301{
302 struct io_cq_ring *ring = ctx->cq_ring;
303
304 if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
305 /* order cqe stores with ring update */
306 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
307
308 /*
309 * Write side barrier of tail update, app has read side. See
310 * comment at the top of this file.
311 */
312 smp_wmb();
313
314 if (wq_has_sleeper(&ctx->cq_wait)) {
315 wake_up_interruptible(&ctx->cq_wait);
316 kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
317 }
318 }
319}
320
321static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
322{
323 struct io_cq_ring *ring = ctx->cq_ring;
324 unsigned tail;
325
326 tail = ctx->cached_cq_tail;
327 /* See comment at the top of the file */
328 smp_rmb();
329 if (tail + 1 == READ_ONCE(ring->r.head))
330 return NULL;
331
332 ctx->cached_cq_tail++;
333 return &ring->cqes[tail & ctx->cq_mask];
334}
335
336static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
337 long res, unsigned ev_flags)
338{
339 struct io_uring_cqe *cqe;
340
341 /*
342 * If we can't get a cq entry, userspace overflowed the
343 * submission (by quite a lot). Increment the overflow count in
344 * the ring.
345 */
346 cqe = io_get_cqring(ctx);
347 if (cqe) {
348 WRITE_ONCE(cqe->user_data, ki_user_data);
349 WRITE_ONCE(cqe->res, res);
350 WRITE_ONCE(cqe->flags, ev_flags);
351 } else {
352 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
353
354 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
355 }
356}
357
358static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
359 long res, unsigned ev_flags)
360{
361 unsigned long flags;
362
363 spin_lock_irqsave(&ctx->completion_lock, flags);
364 io_cqring_fill_event(ctx, ki_user_data, res, ev_flags);
365 io_commit_cqring(ctx);
366 spin_unlock_irqrestore(&ctx->completion_lock, flags);
367
368 if (waitqueue_active(&ctx->wait))
369 wake_up(&ctx->wait);
370 if (waitqueue_active(&ctx->sqo_wait))
371 wake_up(&ctx->sqo_wait);
372}
373
374static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
375{
376 percpu_ref_put_many(&ctx->refs, refs);
377
378 if (waitqueue_active(&ctx->wait))
379 wake_up(&ctx->wait);
380}
381
382static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
383 struct io_submit_state *state)
384{
385 struct io_kiocb *req;
386
387 if (!percpu_ref_tryget(&ctx->refs))
388 return NULL;
389
390 if (!state) {
391 req = kmem_cache_alloc(req_cachep, __GFP_NOWARN);
392 if (unlikely(!req))
393 goto out;
394 } else if (!state->free_reqs) {
395 size_t sz;
396 int ret;
397
398 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
399 ret = kmem_cache_alloc_bulk(req_cachep, __GFP_NOWARN, sz,
400 state->reqs);
401 if (unlikely(ret <= 0))
402 goto out;
403 state->free_reqs = ret - 1;
404 state->cur_req = 1;
405 req = state->reqs[0];
406 } else {
407 req = state->reqs[state->cur_req];
408 state->free_reqs--;
409 state->cur_req++;
410 }
411
412 req->ctx = ctx;
413 req->flags = 0;
414 refcount_set(&req->refs, 0);
415 return req;
416out:
417 io_ring_drop_ctx_refs(ctx, 1);
418 return NULL;
419}
420
421static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
422{
423 if (*nr) {
424 kmem_cache_free_bulk(req_cachep, *nr, reqs);
425 io_ring_drop_ctx_refs(ctx, *nr);
426 *nr = 0;
427 }
428}
429
430static void io_free_req(struct io_kiocb *req)
431{
432 if (!refcount_read(&req->refs) || refcount_dec_and_test(&req->refs)) {
433 io_ring_drop_ctx_refs(req->ctx, 1);
434 kmem_cache_free(req_cachep, req);
435 }
436}
437
438/*
439 * Find and free completed poll iocbs
440 */
441static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
442 struct list_head *done)
443{
444 void *reqs[IO_IOPOLL_BATCH];
445 int file_count, to_free;
446 struct file *file = NULL;
447 struct io_kiocb *req;
448
449 file_count = to_free = 0;
450 while (!list_empty(done)) {
451 req = list_first_entry(done, struct io_kiocb, list);
452 list_del(&req->list);
453
454 io_cqring_fill_event(ctx, req->user_data, req->error, 0);
455
456 reqs[to_free++] = req;
457 (*nr_events)++;
458
459 /*
460 * Batched puts of the same file, to avoid dirtying the
461 * file usage count multiple times, if avoidable.
462 */
463 if (!(req->flags & REQ_F_FIXED_FILE)) {
464 if (!file) {
465 file = req->rw.ki_filp;
466 file_count = 1;
467 } else if (file == req->rw.ki_filp) {
468 file_count++;
469 } else {
470 fput_many(file, file_count);
471 file = req->rw.ki_filp;
472 file_count = 1;
473 }
474 }
475
476 if (to_free == ARRAY_SIZE(reqs))
477 io_free_req_many(ctx, reqs, &to_free);
478 }
479 io_commit_cqring(ctx);
480
481 if (file)
482 fput_many(file, file_count);
483 io_free_req_many(ctx, reqs, &to_free);
484}
485
486static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
487 long min)
488{
489 struct io_kiocb *req, *tmp;
490 LIST_HEAD(done);
491 bool spin;
492 int ret;
493
494 /*
495 * Only spin for completions if we don't have multiple devices hanging
496 * off our complete list, and we're under the requested amount.
497 */
498 spin = !ctx->poll_multi_file && *nr_events < min;
499
500 ret = 0;
501 list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
502 struct kiocb *kiocb = &req->rw;
503
504 /*
505 * Move completed entries to our local list. If we find a
506 * request that requires polling, break out and complete
507 * the done list first, if we have entries there.
508 */
509 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
510 list_move_tail(&req->list, &done);
511 continue;
512 }
513 if (!list_empty(&done))
514 break;
515
516 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
517 if (ret < 0)
518 break;
519
520 if (ret && spin)
521 spin = false;
522 ret = 0;
523 }
524
525 if (!list_empty(&done))
526 io_iopoll_complete(ctx, nr_events, &done);
527
528 return ret;
529}
530
531/*
532 * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
533 * non-spinning poll check - we'll still enter the driver poll loop, but only
534 * as a non-spinning completion check.
535 */
536static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
537 long min)
538{
539 while (!list_empty(&ctx->poll_list)) {
540 int ret;
541
542 ret = io_do_iopoll(ctx, nr_events, min);
543 if (ret < 0)
544 return ret;
545 if (!min || *nr_events >= min)
546 return 0;
547 }
548
549 return 1;
550}
551
552/*
553 * We can't just wait for polled events to come to us, we have to actively
554 * find and complete them.
555 */
556static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
557{
558 if (!(ctx->flags & IORING_SETUP_IOPOLL))
559 return;
560
561 mutex_lock(&ctx->uring_lock);
562 while (!list_empty(&ctx->poll_list)) {
563 unsigned int nr_events = 0;
564
565 io_iopoll_getevents(ctx, &nr_events, 1);
566 }
567 mutex_unlock(&ctx->uring_lock);
568}
569
570static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
571 long min)
572{
573 int ret = 0;
574
575 do {
576 int tmin = 0;
577
578 if (*nr_events < min)
579 tmin = min - *nr_events;
580
581 ret = io_iopoll_getevents(ctx, nr_events, tmin);
582 if (ret <= 0)
583 break;
584 ret = 0;
585 } while (min && !*nr_events && !need_resched());
586
587 return ret;
588}
589
590static void kiocb_end_write(struct kiocb *kiocb)
591{
592 if (kiocb->ki_flags & IOCB_WRITE) {
593 struct inode *inode = file_inode(kiocb->ki_filp);
594
595 /*
596 * Tell lockdep we inherited freeze protection from submission
597 * thread.
598 */
599 if (S_ISREG(inode->i_mode))
600 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
601 file_end_write(kiocb->ki_filp);
602 }
603}
604
605static void io_fput(struct io_kiocb *req)
606{
607 if (!(req->flags & REQ_F_FIXED_FILE))
608 fput(req->rw.ki_filp);
609}
610
611static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
612{
613 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
614
615 kiocb_end_write(kiocb);
616
617 io_fput(req);
618 io_cqring_add_event(req->ctx, req->user_data, res, 0);
619 io_free_req(req);
620}
621
622static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
623{
624 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
625
626 kiocb_end_write(kiocb);
627
628 req->error = res;
629 if (res != -EAGAIN)
630 req->flags |= REQ_F_IOPOLL_COMPLETED;
631}
632
633/*
634 * After the iocb has been issued, it's safe to be found on the poll list.
635 * Adding the kiocb to the list AFTER submission ensures that we don't
636 * find it from an io_iopoll_getevents() thread before the issuer is done
637 * accessing the kiocb cookie.
638 */
639static void io_iopoll_req_issued(struct io_kiocb *req)
640{
641 struct io_ring_ctx *ctx = req->ctx;
642
643 /*
644 * Track whether we have multiple files in our lists. This will impact
645 * how we do polling eventually, not spinning if we're on potentially
646 * different devices.
647 */
648 if (list_empty(&ctx->poll_list)) {
649 ctx->poll_multi_file = false;
650 } else if (!ctx->poll_multi_file) {
651 struct io_kiocb *list_req;
652
653 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
654 list);
655 if (list_req->rw.ki_filp != req->rw.ki_filp)
656 ctx->poll_multi_file = true;
657 }
658
659 /*
660 * For fast devices, IO may have already completed. If it has, add
661 * it to the front so we find it first.
662 */
663 if (req->flags & REQ_F_IOPOLL_COMPLETED)
664 list_add(&req->list, &ctx->poll_list);
665 else
666 list_add_tail(&req->list, &ctx->poll_list);
667}
668
669static void io_file_put(struct io_submit_state *state, struct file *file)
670{
671 if (!state) {
672 fput(file);
673 } else if (state->file) {
674 int diff = state->has_refs - state->used_refs;
675
676 if (diff)
677 fput_many(state->file, diff);
678 state->file = NULL;
679 }
680}
681
682/*
683 * Get as many references to a file as we have IOs left in this submission,
684 * assuming most submissions are for one file, or at least that each file
685 * has more than one submission.
686 */
687static struct file *io_file_get(struct io_submit_state *state, int fd)
688{
689 if (!state)
690 return fget(fd);
691
692 if (state->file) {
693 if (state->fd == fd) {
694 state->used_refs++;
695 state->ios_left--;
696 return state->file;
697 }
698 io_file_put(state, NULL);
699 }
700 state->file = fget_many(fd, state->ios_left);
701 if (!state->file)
702 return NULL;
703
704 state->fd = fd;
705 state->has_refs = state->ios_left;
706 state->used_refs = 1;
707 state->ios_left--;
708 return state->file;
709}
710
711/*
712 * If we tracked the file through the SCM inflight mechanism, we could support
713 * any file. For now, just ensure that anything potentially problematic is done
714 * inline.
715 */
716static bool io_file_supports_async(struct file *file)
717{
718 umode_t mode = file_inode(file)->i_mode;
719
720 if (S_ISBLK(mode) || S_ISCHR(mode))
721 return true;
722 if (S_ISREG(mode) && file->f_op != &io_uring_fops)
723 return true;
724
725 return false;
726}
727
728static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
729 bool force_nonblock, struct io_submit_state *state)
730{
731 const struct io_uring_sqe *sqe = s->sqe;
732 struct io_ring_ctx *ctx = req->ctx;
733 struct kiocb *kiocb = &req->rw;
734 unsigned ioprio, flags;
735 int fd, ret;
736
737 /* For -EAGAIN retry, everything is already prepped */
738 if (kiocb->ki_filp)
739 return 0;
740
741 flags = READ_ONCE(sqe->flags);
742 fd = READ_ONCE(sqe->fd);
743
744 if (flags & IOSQE_FIXED_FILE) {
745 if (unlikely(!ctx->user_files ||
746 (unsigned) fd >= ctx->nr_user_files))
747 return -EBADF;
748 kiocb->ki_filp = ctx->user_files[fd];
749 req->flags |= REQ_F_FIXED_FILE;
750 } else {
751 if (s->needs_fixed_file)
752 return -EBADF;
753 kiocb->ki_filp = io_file_get(state, fd);
754 if (unlikely(!kiocb->ki_filp))
755 return -EBADF;
756 if (force_nonblock && !io_file_supports_async(kiocb->ki_filp))
757 force_nonblock = false;
758 }
759 kiocb->ki_pos = READ_ONCE(sqe->off);
760 kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
761 kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
762
763 ioprio = READ_ONCE(sqe->ioprio);
764 if (ioprio) {
765 ret = ioprio_check_cap(ioprio);
766 if (ret)
767 goto out_fput;
768
769 kiocb->ki_ioprio = ioprio;
770 } else
771 kiocb->ki_ioprio = get_current_ioprio();
772
773 ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
774 if (unlikely(ret))
775 goto out_fput;
776 if (force_nonblock) {
777 kiocb->ki_flags |= IOCB_NOWAIT;
778 req->flags |= REQ_F_FORCE_NONBLOCK;
779 }
780 if (ctx->flags & IORING_SETUP_IOPOLL) {
781 ret = -EOPNOTSUPP;
782 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
783 !kiocb->ki_filp->f_op->iopoll)
784 goto out_fput;
785
786 req->error = 0;
787 kiocb->ki_flags |= IOCB_HIPRI;
788 kiocb->ki_complete = io_complete_rw_iopoll;
789 } else {
790 if (kiocb->ki_flags & IOCB_HIPRI) {
791 ret = -EINVAL;
792 goto out_fput;
793 }
794 kiocb->ki_complete = io_complete_rw;
795 }
796 return 0;
797out_fput:
798 if (!(flags & IOSQE_FIXED_FILE)) {
799 /*
800 * in case of error, we didn't use this file reference. drop it.
801 */
802 if (state)
803 state->used_refs--;
804 io_file_put(state, kiocb->ki_filp);
805 }
806 return ret;
807}
808
809static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
810{
811 switch (ret) {
812 case -EIOCBQUEUED:
813 break;
814 case -ERESTARTSYS:
815 case -ERESTARTNOINTR:
816 case -ERESTARTNOHAND:
817 case -ERESTART_RESTARTBLOCK:
818 /*
819 * We can't just restart the syscall, since previously
820 * submitted sqes may already be in progress. Just fail this
821 * IO with EINTR.
822 */
823 ret = -EINTR;
824 /* fall through */
825 default:
826 kiocb->ki_complete(kiocb, ret, 0);
827 }
828}
829
830static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
831 const struct io_uring_sqe *sqe,
832 struct iov_iter *iter)
833{
834 size_t len = READ_ONCE(sqe->len);
835 struct io_mapped_ubuf *imu;
836 unsigned index, buf_index;
837 size_t offset;
838 u64 buf_addr;
839
840 /* attempt to use fixed buffers without having provided iovecs */
841 if (unlikely(!ctx->user_bufs))
842 return -EFAULT;
843
844 buf_index = READ_ONCE(sqe->buf_index);
845 if (unlikely(buf_index >= ctx->nr_user_bufs))
846 return -EFAULT;
847
848 index = array_index_nospec(buf_index, ctx->nr_user_bufs);
849 imu = &ctx->user_bufs[index];
850 buf_addr = READ_ONCE(sqe->addr);
851
852 /* overflow */
853 if (buf_addr + len < buf_addr)
854 return -EFAULT;
855 /* not inside the mapped region */
856 if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
857 return -EFAULT;
858
859 /*
860 * May not be a start of buffer, set size appropriately
861 * and advance us to the beginning.
862 */
863 offset = buf_addr - imu->ubuf;
864 iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
865 if (offset)
866 iov_iter_advance(iter, offset);
867 return 0;
868}
869
870static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
871 const struct sqe_submit *s, struct iovec **iovec,
872 struct iov_iter *iter)
873{
874 const struct io_uring_sqe *sqe = s->sqe;
875 void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
876 size_t sqe_len = READ_ONCE(sqe->len);
877 u8 opcode;
878
879 /*
880 * We're reading ->opcode for the second time, but the first read
881 * doesn't care whether it's _FIXED or not, so it doesn't matter
882 * whether ->opcode changes concurrently. The first read does care
883 * about whether it is a READ or a WRITE, so we don't trust this read
884 * for that purpose and instead let the caller pass in the read/write
885 * flag.
886 */
887 opcode = READ_ONCE(sqe->opcode);
888 if (opcode == IORING_OP_READ_FIXED ||
889 opcode == IORING_OP_WRITE_FIXED) {
890 ssize_t ret = io_import_fixed(ctx, rw, sqe, iter);
891 *iovec = NULL;
892 return ret;
893 }
894
895 if (!s->has_user)
896 return -EFAULT;
897
898#ifdef CONFIG_COMPAT
899 if (ctx->compat)
900 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
901 iovec, iter);
902#endif
903
904 return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
905}
906
907/*
908 * Make a note of the last file/offset/direction we punted to async
909 * context. We'll use this information to see if we can piggy back a
910 * sequential request onto the previous one, if it still hasn't been
911 * completed by the async worker.
912 */
913static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
914{
915 struct async_list *async_list = &req->ctx->pending_async[rw];
916 struct kiocb *kiocb = &req->rw;
917 struct file *filp = kiocb->ki_filp;
918 off_t io_end = kiocb->ki_pos + len;
919
920 if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
921 unsigned long max_pages;
922
923 /* Use 8x RA size as a decent limiter for both reads/writes */
924 max_pages = filp->f_ra.ra_pages;
925 if (!max_pages)
926 max_pages = VM_MAX_READAHEAD >> (PAGE_SHIFT - 10);
927 max_pages *= 8;
928
929 /* If max pages are exceeded, reset the state */
930 len >>= PAGE_SHIFT;
931 if (async_list->io_pages + len <= max_pages) {
932 req->flags |= REQ_F_SEQ_PREV;
933 async_list->io_pages += len;
934 } else {
935 io_end = 0;
936 async_list->io_pages = 0;
937 }
938 }
939
940 /* New file? Reset state. */
941 if (async_list->file != filp) {
942 async_list->io_pages = 0;
943 async_list->file = filp;
944 }
945 async_list->io_end = io_end;
946}
947
948static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
949 bool force_nonblock, struct io_submit_state *state)
950{
951 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
952 struct kiocb *kiocb = &req->rw;
953 struct iov_iter iter;
954 struct file *file;
955 size_t iov_count;
956 ssize_t ret;
957
958 ret = io_prep_rw(req, s, force_nonblock, state);
959 if (ret)
960 return ret;
961 file = kiocb->ki_filp;
962
963 ret = -EBADF;
964 if (unlikely(!(file->f_mode & FMODE_READ)))
965 goto out_fput;
966 ret = -EINVAL;
967 if (unlikely(!file->f_op->read_iter))
968 goto out_fput;
969
970 ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
971 if (ret)
972 goto out_fput;
973
974 iov_count = iov_iter_count(&iter);
975 ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
976 if (!ret) {
977 ssize_t ret2;
978
979 /* Catch -EAGAIN return for forced non-blocking submission */
980 ret2 = call_read_iter(file, kiocb, &iter);
981 if (!force_nonblock || ret2 != -EAGAIN) {
982 io_rw_done(kiocb, ret2);
983 } else {
984 /*
985 * If ->needs_lock is true, we're already in async
986 * context.
987 */
988 if (!s->needs_lock)
989 io_async_list_note(READ, req, iov_count);
990 ret = -EAGAIN;
991 }
992 }
993 kfree(iovec);
994out_fput:
995 /* Hold on to the file for -EAGAIN */
996 if (unlikely(ret && ret != -EAGAIN))
997 io_fput(req);
998 return ret;
999}
1000
1001static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
1002 bool force_nonblock, struct io_submit_state *state)
1003{
1004 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1005 struct kiocb *kiocb = &req->rw;
1006 struct iov_iter iter;
1007 struct file *file;
1008 size_t iov_count;
1009 ssize_t ret;
1010
1011 ret = io_prep_rw(req, s, force_nonblock, state);
1012 if (ret)
1013 return ret;
1014
1015 ret = -EBADF;
1016 file = kiocb->ki_filp;
1017 if (unlikely(!(file->f_mode & FMODE_WRITE)))
1018 goto out_fput;
1019 ret = -EINVAL;
1020 if (unlikely(!file->f_op->write_iter))
1021 goto out_fput;
1022
1023 ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1024 if (ret)
1025 goto out_fput;
1026
1027 iov_count = iov_iter_count(&iter);
1028
1029 ret = -EAGAIN;
1030 if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1031 /* If ->needs_lock is true, we're already in async context. */
1032 if (!s->needs_lock)
1033 io_async_list_note(WRITE, req, iov_count);
1034 goto out_free;
1035 }
1036
1037 ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1038 if (!ret) {
1039 /*
1040 * Open-code file_start_write here to grab freeze protection,
1041 * which will be released by another thread in
1042 * io_complete_rw(). Fool lockdep by telling it the lock got
1043 * released so that it doesn't complain about the held lock when
1044 * we return to userspace.
1045 */
1046 if (S_ISREG(file_inode(file)->i_mode)) {
1047 __sb_start_write(file_inode(file)->i_sb,
1048 SB_FREEZE_WRITE, true);
1049 __sb_writers_release(file_inode(file)->i_sb,
1050 SB_FREEZE_WRITE);
1051 }
1052 kiocb->ki_flags |= IOCB_WRITE;
1053 io_rw_done(kiocb, call_write_iter(file, kiocb, &iter));
1054 }
1055out_free:
1056 kfree(iovec);
1057out_fput:
1058 /* Hold on to the file for -EAGAIN */
1059 if (unlikely(ret && ret != -EAGAIN))
1060 io_fput(req);
1061 return ret;
1062}
1063
1064/*
1065 * IORING_OP_NOP just posts a completion event, nothing else.
1066 */
1067static int io_nop(struct io_kiocb *req, u64 user_data)
1068{
1069 struct io_ring_ctx *ctx = req->ctx;
1070 long err = 0;
1071
1072 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1073 return -EINVAL;
1074
1075 /*
1076 * Twilight zone - it's possible that someone issued an opcode that
1077 * has a file attached, then got -EAGAIN on submission, and changed
1078 * the sqe before we retried it from async context. Avoid dropping
1079 * a file reference for this malicious case, and flag the error.
1080 */
1081 if (req->rw.ki_filp) {
1082 err = -EBADF;
1083 io_fput(req);
1084 }
1085 io_cqring_add_event(ctx, user_data, err, 0);
1086 io_free_req(req);
1087 return 0;
1088}
1089
1090static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1091{
1092 struct io_ring_ctx *ctx = req->ctx;
1093 unsigned flags;
1094 int fd;
1095
1096 /* Prep already done */
1097 if (req->rw.ki_filp)
1098 return 0;
1099
1100 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1101 return -EINVAL;
1102 if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1103 return -EINVAL;
1104
1105 fd = READ_ONCE(sqe->fd);
1106 flags = READ_ONCE(sqe->flags);
1107
1108 if (flags & IOSQE_FIXED_FILE) {
1109 if (unlikely(!ctx->user_files || fd >= ctx->nr_user_files))
1110 return -EBADF;
1111 req->rw.ki_filp = ctx->user_files[fd];
1112 req->flags |= REQ_F_FIXED_FILE;
1113 } else {
1114 req->rw.ki_filp = fget(fd);
1115 if (unlikely(!req->rw.ki_filp))
1116 return -EBADF;
1117 }
1118
1119 return 0;
1120}
1121
1122static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1123 bool force_nonblock)
1124{
1125 loff_t sqe_off = READ_ONCE(sqe->off);
1126 loff_t sqe_len = READ_ONCE(sqe->len);
1127 loff_t end = sqe_off + sqe_len;
1128 unsigned fsync_flags;
1129 int ret;
1130
1131 fsync_flags = READ_ONCE(sqe->fsync_flags);
1132 if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1133 return -EINVAL;
1134
1135 ret = io_prep_fsync(req, sqe);
1136 if (ret)
1137 return ret;
1138
1139 /* fsync always requires a blocking context */
1140 if (force_nonblock)
1141 return -EAGAIN;
1142
1143 ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1144 end > 0 ? end : LLONG_MAX,
1145 fsync_flags & IORING_FSYNC_DATASYNC);
1146
1147 io_fput(req);
1148 io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1149 io_free_req(req);
1150 return 0;
1151}
1152
1153static void io_poll_remove_one(struct io_kiocb *req)
1154{
1155 struct io_poll_iocb *poll = &req->poll;
1156
1157 spin_lock(&poll->head->lock);
1158 WRITE_ONCE(poll->canceled, true);
1159 if (!list_empty(&poll->wait.entry)) {
1160 list_del_init(&poll->wait.entry);
1161 queue_work(req->ctx->sqo_wq, &req->work);
1162 }
1163 spin_unlock(&poll->head->lock);
1164
1165 list_del_init(&req->list);
1166}
1167
1168static void io_poll_remove_all(struct io_ring_ctx *ctx)
1169{
1170 struct io_kiocb *req;
1171
1172 spin_lock_irq(&ctx->completion_lock);
1173 while (!list_empty(&ctx->cancel_list)) {
1174 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1175 io_poll_remove_one(req);
1176 }
1177 spin_unlock_irq(&ctx->completion_lock);
1178}
1179
1180/*
1181 * Find a running poll command that matches one specified in sqe->addr,
1182 * and remove it if found.
1183 */
1184static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1185{
1186 struct io_ring_ctx *ctx = req->ctx;
1187 struct io_kiocb *poll_req, *next;
1188 int ret = -ENOENT;
1189
1190 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1191 return -EINVAL;
1192 if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1193 sqe->poll_events)
1194 return -EINVAL;
1195
1196 spin_lock_irq(&ctx->completion_lock);
1197 list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1198 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1199 io_poll_remove_one(poll_req);
1200 ret = 0;
1201 break;
1202 }
1203 }
1204 spin_unlock_irq(&ctx->completion_lock);
1205
1206 io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1207 io_free_req(req);
1208 return 0;
1209}
1210
1211static void io_poll_complete(struct io_kiocb *req, __poll_t mask)
1212{
1213 io_cqring_add_event(req->ctx, req->user_data, mangle_poll(mask), 0);
1214 io_fput(req);
1215 io_free_req(req);
1216}
1217
1218static void io_poll_complete_work(struct work_struct *work)
1219{
1220 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1221 struct io_poll_iocb *poll = &req->poll;
1222 struct poll_table_struct pt = { ._key = poll->events };
1223 struct io_ring_ctx *ctx = req->ctx;
1224 __poll_t mask = 0;
1225
1226 if (!READ_ONCE(poll->canceled))
1227 mask = vfs_poll(poll->file, &pt) & poll->events;
1228
1229 /*
1230 * Note that ->ki_cancel callers also delete iocb from active_reqs after
1231 * calling ->ki_cancel. We need the ctx_lock roundtrip here to
1232 * synchronize with them. In the cancellation case the list_del_init
1233 * itself is not actually needed, but harmless so we keep it in to
1234 * avoid further branches in the fast path.
1235 */
1236 spin_lock_irq(&ctx->completion_lock);
1237 if (!mask && !READ_ONCE(poll->canceled)) {
1238 add_wait_queue(poll->head, &poll->wait);
1239 spin_unlock_irq(&ctx->completion_lock);
1240 return;
1241 }
1242 list_del_init(&req->list);
1243 spin_unlock_irq(&ctx->completion_lock);
1244
1245 io_poll_complete(req, mask);
1246}
1247
1248static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1249 void *key)
1250{
1251 struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1252 wait);
1253 struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1254 struct io_ring_ctx *ctx = req->ctx;
1255 __poll_t mask = key_to_poll(key);
1256
1257 poll->woken = true;
1258
1259 /* for instances that support it check for an event match first: */
1260 if (mask) {
1261 unsigned long flags;
1262
1263 if (!(mask & poll->events))
1264 return 0;
1265
1266 /* try to complete the iocb inline if we can: */
1267 if (spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1268 list_del(&req->list);
1269 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1270
1271 list_del_init(&poll->wait.entry);
1272 io_poll_complete(req, mask);
1273 return 1;
1274 }
1275 }
1276
1277 list_del_init(&poll->wait.entry);
1278 queue_work(ctx->sqo_wq, &req->work);
1279 return 1;
1280}
1281
1282struct io_poll_table {
1283 struct poll_table_struct pt;
1284 struct io_kiocb *req;
1285 int error;
1286};
1287
1288static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1289 struct poll_table_struct *p)
1290{
1291 struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1292
1293 if (unlikely(pt->req->poll.head)) {
1294 pt->error = -EINVAL;
1295 return;
1296 }
1297
1298 pt->error = 0;
1299 pt->req->poll.head = head;
1300 add_wait_queue(head, &pt->req->poll.wait);
1301}
1302
1303static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1304{
1305 struct io_poll_iocb *poll = &req->poll;
1306 struct io_ring_ctx *ctx = req->ctx;
1307 struct io_poll_table ipt;
1308 unsigned flags;
1309 __poll_t mask;
1310 u16 events;
1311 int fd;
1312
1313 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1314 return -EINVAL;
1315 if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1316 return -EINVAL;
1317
1318 INIT_WORK(&req->work, io_poll_complete_work);
1319 events = READ_ONCE(sqe->poll_events);
1320 poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1321
1322 flags = READ_ONCE(sqe->flags);
1323 fd = READ_ONCE(sqe->fd);
1324
1325 if (flags & IOSQE_FIXED_FILE) {
1326 if (unlikely(!ctx->user_files || fd >= ctx->nr_user_files))
1327 return -EBADF;
1328 poll->file = ctx->user_files[fd];
1329 req->flags |= REQ_F_FIXED_FILE;
1330 } else {
1331 poll->file = fget(fd);
1332 }
1333 if (unlikely(!poll->file))
1334 return -EBADF;
1335
1336 poll->head = NULL;
1337 poll->woken = false;
1338 poll->canceled = false;
1339
1340 ipt.pt._qproc = io_poll_queue_proc;
1341 ipt.pt._key = poll->events;
1342 ipt.req = req;
1343 ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1344
1345 /* initialized the list so that we can do list_empty checks */
1346 INIT_LIST_HEAD(&poll->wait.entry);
1347 init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1348
1349 /* one for removal from waitqueue, one for this function */
1350 refcount_set(&req->refs, 2);
1351
1352 mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1353 if (unlikely(!poll->head)) {
1354 /* we did not manage to set up a waitqueue, done */
1355 goto out;
1356 }
1357
1358 spin_lock_irq(&ctx->completion_lock);
1359 spin_lock(&poll->head->lock);
1360 if (poll->woken) {
1361 /* wake_up context handles the rest */
1362 mask = 0;
1363 ipt.error = 0;
1364 } else if (mask || ipt.error) {
1365 /* if we get an error or a mask we are done */
1366 WARN_ON_ONCE(list_empty(&poll->wait.entry));
1367 list_del_init(&poll->wait.entry);
1368 } else {
1369 /* actually waiting for an event */
1370 list_add_tail(&req->list, &ctx->cancel_list);
1371 }
1372 spin_unlock(&poll->head->lock);
1373 spin_unlock_irq(&ctx->completion_lock);
1374
1375out:
1376 if (unlikely(ipt.error)) {
1377 if (!(flags & IOSQE_FIXED_FILE))
1378 fput(poll->file);
1379 /*
1380 * Drop one of our refs to this req, __io_submit_sqe() will
1381 * drop the other one since we're returning an error.
1382 */
1383 io_free_req(req);
1384 return ipt.error;
1385 }
1386
1387 if (mask)
1388 io_poll_complete(req, mask);
1389 io_free_req(req);
1390 return 0;
1391}
1392
1393static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1394 const struct sqe_submit *s, bool force_nonblock,
1395 struct io_submit_state *state)
1396{
1397 ssize_t ret;
1398 int opcode;
1399
1400 if (unlikely(s->index >= ctx->sq_entries))
1401 return -EINVAL;
1402 req->user_data = READ_ONCE(s->sqe->user_data);
1403
1404 opcode = READ_ONCE(s->sqe->opcode);
1405 switch (opcode) {
1406 case IORING_OP_NOP:
1407 ret = io_nop(req, req->user_data);
1408 break;
1409 case IORING_OP_READV:
1410 if (unlikely(s->sqe->buf_index))
1411 return -EINVAL;
1412 ret = io_read(req, s, force_nonblock, state);
1413 break;
1414 case IORING_OP_WRITEV:
1415 if (unlikely(s->sqe->buf_index))
1416 return -EINVAL;
1417 ret = io_write(req, s, force_nonblock, state);
1418 break;
1419 case IORING_OP_READ_FIXED:
1420 ret = io_read(req, s, force_nonblock, state);
1421 break;
1422 case IORING_OP_WRITE_FIXED:
1423 ret = io_write(req, s, force_nonblock, state);
1424 break;
1425 case IORING_OP_FSYNC:
1426 ret = io_fsync(req, s->sqe, force_nonblock);
1427 break;
1428 case IORING_OP_POLL_ADD:
1429 ret = io_poll_add(req, s->sqe);
1430 break;
1431 case IORING_OP_POLL_REMOVE:
1432 ret = io_poll_remove(req, s->sqe);
1433 break;
1434 default:
1435 ret = -EINVAL;
1436 break;
1437 }
1438
1439 if (ret)
1440 return ret;
1441
1442 if (ctx->flags & IORING_SETUP_IOPOLL) {
1443 if (req->error == -EAGAIN)
1444 return -EAGAIN;
1445
1446 /* workqueue context doesn't hold uring_lock, grab it now */
1447 if (s->needs_lock)
1448 mutex_lock(&ctx->uring_lock);
1449 io_iopoll_req_issued(req);
1450 if (s->needs_lock)
1451 mutex_unlock(&ctx->uring_lock);
1452 }
1453
1454 return 0;
1455}
1456
1457static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1458 const struct io_uring_sqe *sqe)
1459{
1460 switch (sqe->opcode) {
1461 case IORING_OP_READV:
1462 case IORING_OP_READ_FIXED:
1463 return &ctx->pending_async[READ];
1464 case IORING_OP_WRITEV:
1465 case IORING_OP_WRITE_FIXED:
1466 return &ctx->pending_async[WRITE];
1467 default:
1468 return NULL;
1469 }
1470}
1471
1472static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1473{
1474 u8 opcode = READ_ONCE(sqe->opcode);
1475
1476 return !(opcode == IORING_OP_READ_FIXED ||
1477 opcode == IORING_OP_WRITE_FIXED);
1478}
1479
1480static void io_sq_wq_submit_work(struct work_struct *work)
1481{
1482 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1483 struct io_ring_ctx *ctx = req->ctx;
1484 struct mm_struct *cur_mm = NULL;
1485 struct async_list *async_list;
1486 LIST_HEAD(req_list);
1487 mm_segment_t old_fs;
1488 int ret;
1489
1490 async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1491restart:
1492 do {
1493 struct sqe_submit *s = &req->submit;
1494 const struct io_uring_sqe *sqe = s->sqe;
1495
1496 /* Ensure we clear previously set forced non-block flag */
1497 req->flags &= ~REQ_F_FORCE_NONBLOCK;
1498 req->rw.ki_flags &= ~IOCB_NOWAIT;
1499
1500 ret = 0;
1501 if (io_sqe_needs_user(sqe) && !cur_mm) {
1502 if (!mmget_not_zero(ctx->sqo_mm)) {
1503 ret = -EFAULT;
1504 } else {
1505 cur_mm = ctx->sqo_mm;
1506 use_mm(cur_mm);
1507 old_fs = get_fs();
1508 set_fs(USER_DS);
1509 }
1510 }
1511
1512 if (!ret) {
1513 s->has_user = cur_mm != NULL;
1514 s->needs_lock = true;
1515 do {
1516 ret = __io_submit_sqe(ctx, req, s, false, NULL);
1517 /*
1518 * We can get EAGAIN for polled IO even though
1519 * we're forcing a sync submission from here,
1520 * since we can't wait for request slots on the
1521 * block side.
1522 */
1523 if (ret != -EAGAIN)
1524 break;
1525 cond_resched();
1526 } while (1);
1527 }
1528 if (ret) {
1529 io_cqring_add_event(ctx, sqe->user_data, ret, 0);
1530 io_free_req(req);
1531 }
1532
1533 /* async context always use a copy of the sqe */
1534 kfree(sqe);
1535
1536 if (!async_list)
1537 break;
1538 if (!list_empty(&req_list)) {
1539 req = list_first_entry(&req_list, struct io_kiocb,
1540 list);
1541 list_del(&req->list);
1542 continue;
1543 }
1544 if (list_empty(&async_list->list))
1545 break;
1546
1547 req = NULL;
1548 spin_lock(&async_list->lock);
1549 if (list_empty(&async_list->list)) {
1550 spin_unlock(&async_list->lock);
1551 break;
1552 }
1553 list_splice_init(&async_list->list, &req_list);
1554 spin_unlock(&async_list->lock);
1555
1556 req = list_first_entry(&req_list, struct io_kiocb, list);
1557 list_del(&req->list);
1558 } while (req);
1559
1560 /*
1561 * Rare case of racing with a submitter. If we find the count has
1562 * dropped to zero AND we have pending work items, then restart
1563 * the processing. This is a tiny race window.
1564 */
1565 if (async_list) {
1566 ret = atomic_dec_return(&async_list->cnt);
1567 while (!ret && !list_empty(&async_list->list)) {
1568 spin_lock(&async_list->lock);
1569 atomic_inc(&async_list->cnt);
1570 list_splice_init(&async_list->list, &req_list);
1571 spin_unlock(&async_list->lock);
1572
1573 if (!list_empty(&req_list)) {
1574 req = list_first_entry(&req_list,
1575 struct io_kiocb, list);
1576 list_del(&req->list);
1577 goto restart;
1578 }
1579 ret = atomic_dec_return(&async_list->cnt);
1580 }
1581 }
1582
1583 if (cur_mm) {
1584 set_fs(old_fs);
1585 unuse_mm(cur_mm);
1586 mmput(cur_mm);
1587 }
1588}
1589
1590/*
1591 * See if we can piggyback onto previously submitted work that is still
1592 * running. We currently only allow this if the new request is sequential
1593 * to the previous one we punted.
1594 */
1595static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1596{
1597 bool ret = false;
1598
1599 if (!list)
1600 return false;
1601 if (!(req->flags & REQ_F_SEQ_PREV))
1602 return false;
1603 if (!atomic_read(&list->cnt))
1604 return false;
1605
1606 ret = true;
1607 spin_lock(&list->lock);
1608 list_add_tail(&req->list, &list->list);
1609 if (!atomic_read(&list->cnt)) {
1610 list_del_init(&req->list);
1611 ret = false;
1612 }
1613 spin_unlock(&list->lock);
1614 return ret;
1615}
1616
1617static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1618 struct io_submit_state *state)
1619{
1620 struct io_kiocb *req;
1621 ssize_t ret;
1622
1623 /* enforce forwards compatibility on users */
1624 if (unlikely(s->sqe->flags & ~IOSQE_FIXED_FILE))
1625 return -EINVAL;
1626
1627 req = io_get_req(ctx, state);
1628 if (unlikely(!req))
1629 return -EAGAIN;
1630
1631 req->rw.ki_filp = NULL;
1632
1633 ret = __io_submit_sqe(ctx, req, s, true, state);
1634 if (ret == -EAGAIN) {
1635 struct io_uring_sqe *sqe_copy;
1636
1637 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1638 if (sqe_copy) {
1639 struct async_list *list;
1640
1641 memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1642 s->sqe = sqe_copy;
1643
1644 memcpy(&req->submit, s, sizeof(*s));
1645 list = io_async_list_from_sqe(ctx, s->sqe);
1646 if (!io_add_to_prev_work(list, req)) {
1647 if (list)
1648 atomic_inc(&list->cnt);
1649 INIT_WORK(&req->work, io_sq_wq_submit_work);
1650 queue_work(ctx->sqo_wq, &req->work);
1651 }
1652 ret = 0;
1653 }
1654 }
1655 if (ret)
1656 io_free_req(req);
1657
1658 return ret;
1659}
1660
1661/*
1662 * Batched submission is done, ensure local IO is flushed out.
1663 */
1664static void io_submit_state_end(struct io_submit_state *state)
1665{
1666 blk_finish_plug(&state->plug);
1667 io_file_put(state, NULL);
1668 if (state->free_reqs)
1669 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1670 &state->reqs[state->cur_req]);
1671}
1672
1673/*
1674 * Start submission side cache.
1675 */
1676static void io_submit_state_start(struct io_submit_state *state,
1677 struct io_ring_ctx *ctx, unsigned max_ios)
1678{
1679 blk_start_plug(&state->plug);
1680 state->free_reqs = 0;
1681 state->file = NULL;
1682 state->ios_left = max_ios;
1683}
1684
1685static void io_commit_sqring(struct io_ring_ctx *ctx)
1686{
1687 struct io_sq_ring *ring = ctx->sq_ring;
1688
1689 if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1690 /*
1691 * Ensure any loads from the SQEs are done at this point,
1692 * since once we write the new head, the application could
1693 * write new data to them.
1694 */
1695 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1696
1697 /*
1698 * write side barrier of head update, app has read side. See
1699 * comment at the top of this file
1700 */
1701 smp_wmb();
1702 }
1703}
1704
1705/*
1706 * Undo last io_get_sqring()
1707 */
1708static void io_drop_sqring(struct io_ring_ctx *ctx)
1709{
1710 ctx->cached_sq_head--;
1711}
1712
1713/*
1714 * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1715 * that is mapped by userspace. This means that care needs to be taken to
1716 * ensure that reads are stable, as we cannot rely on userspace always
1717 * being a good citizen. If members of the sqe are validated and then later
1718 * used, it's important that those reads are done through READ_ONCE() to
1719 * prevent a re-load down the line.
1720 */
1721static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1722{
1723 struct io_sq_ring *ring = ctx->sq_ring;
1724 unsigned head;
1725
1726 /*
1727 * The cached sq head (or cq tail) serves two purposes:
1728 *
1729 * 1) allows us to batch the cost of updating the user visible
1730 * head updates.
1731 * 2) allows the kernel side to track the head on its own, even
1732 * though the application is the one updating it.
1733 */
1734 head = ctx->cached_sq_head;
1735 /* See comment at the top of this file */
1736 smp_rmb();
1737 if (head == READ_ONCE(ring->r.tail))
1738 return false;
1739
1740 head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1741 if (head < ctx->sq_entries) {
1742 s->index = head;
1743 s->sqe = &ctx->sq_sqes[head];
1744 ctx->cached_sq_head++;
1745 return true;
1746 }
1747
1748 /* drop invalid entries */
1749 ctx->cached_sq_head++;
1750 ring->dropped++;
1751 /* See comment at the top of this file */
1752 smp_wmb();
1753 return false;
1754}
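/*
 * Illustrative userspace-side sketch (not part of this patch): the mirror
 * image of io_get_sqring()/io_commit_sqring(). The application fills an
 * sqe, publishes its index in the sq array, and only then store-releases
 * the new tail, so the kernel's read of r.tail never sees a half-written
 * entry. Everything prefixed app_ is hypothetical; the pointers are
 * assumed to come from mmap()ing the regions described further down, and
 * <string.h>, <sys/uio.h> and <linux/io_uring.h> are assumed included.
 */
struct app_sq {
	unsigned *head, *tail, *ring_mask, *ring_entries, *array;
	struct io_uring_sqe *sqes;
};

static int app_queue_readv(struct app_sq *sq, int fd, struct iovec *iov,
			   unsigned nr_vecs, __u64 user_data)
{
	unsigned tail = *sq->tail;
	unsigned index = tail & *sq->ring_mask;
	struct io_uring_sqe *sqe = &sq->sqes[index];

	/* ring is full if the kernel hasn't advanced head far enough */
	if (tail - __atomic_load_n(sq->head, __ATOMIC_ACQUIRE) == *sq->ring_entries)
		return -1;

	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_READV;
	sqe->fd = fd;
	sqe->addr = (unsigned long) iov;
	sqe->len = nr_vecs;
	sqe->user_data = user_data;

	sq->array[index] = index;
	/* publish: the sqe and array entry must be visible before the new tail */
	__atomic_store_n(sq->tail, tail + 1, __ATOMIC_RELEASE);
	return 0;
}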
1755
1756static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1757 unsigned int nr, bool has_user, bool mm_fault)
1758{
1759 struct io_submit_state state, *statep = NULL;
1760 int ret, i, submitted = 0;
1761
1762 if (nr > IO_PLUG_THRESHOLD) {
1763 io_submit_state_start(&state, ctx, nr);
1764 statep = &state;
1765 }
1766
1767 for (i = 0; i < nr; i++) {
1768 if (unlikely(mm_fault)) {
1769 ret = -EFAULT;
1770 } else {
1771 sqes[i].has_user = has_user;
1772 sqes[i].needs_lock = true;
1773 sqes[i].needs_fixed_file = true;
1774 ret = io_submit_sqe(ctx, &sqes[i], statep);
1775 }
1776 if (!ret) {
1777 submitted++;
1778 continue;
1779 }
1780
1781 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
1782 }
1783
1784 if (statep)
1785 io_submit_state_end(&state);
1786
1787 return submitted;
1788}
1789
1790static int io_sq_thread(void *data)
1791{
1792 struct sqe_submit sqes[IO_IOPOLL_BATCH];
1793 struct io_ring_ctx *ctx = data;
1794 struct mm_struct *cur_mm = NULL;
1795 mm_segment_t old_fs;
1796 DEFINE_WAIT(wait);
1797 unsigned inflight;
1798 unsigned long timeout;
1799
1800 old_fs = get_fs();
1801 set_fs(USER_DS);
1802
1803 timeout = inflight = 0;
1804 while (!kthread_should_stop() && !ctx->sqo_stop) {
1805 bool all_fixed, mm_fault = false;
1806 int i;
1807
1808 if (inflight) {
1809 unsigned nr_events = 0;
1810
1811 if (ctx->flags & IORING_SETUP_IOPOLL) {
1812 /*
1813 * We disallow the app entering submit/complete
1814 * with polling, but we still need to lock the
1815 * ring to prevent racing with polled issue
1816 * that got punted to a workqueue.
1817 */
1818 mutex_lock(&ctx->uring_lock);
1819 io_iopoll_check(ctx, &nr_events, 0);
1820 mutex_unlock(&ctx->uring_lock);
1821 } else {
1822 /*
1823 * Normal IO, just pretend everything completed.
1824 * We don't have to poll completions for that.
1825 */
1826 nr_events = inflight;
1827 }
1828
1829 inflight -= nr_events;
1830 if (!inflight)
1831 timeout = jiffies + ctx->sq_thread_idle;
1832 }
1833
1834 if (!io_get_sqring(ctx, &sqes[0])) {
1835 /*
1836 * We're polling. If we're within the defined idle
1837 * period, then let us spin without work before going
1838 * to sleep.
1839 */
1840 if (inflight || !time_after(jiffies, timeout)) {
1841 cpu_relax();
1842 continue;
1843 }
1844
1845 /*
1846 * Drop cur_mm before scheduling, we can't hold it for
1847 * long periods (or over schedule()). Do this before
1848 * adding ourselves to the waitqueue, as the unuse/drop
1849 * may sleep.
1850 */
1851 if (cur_mm) {
1852 unuse_mm(cur_mm);
1853 mmput(cur_mm);
1854 cur_mm = NULL;
1855 }
1856
1857 prepare_to_wait(&ctx->sqo_wait, &wait,
1858 TASK_INTERRUPTIBLE);
1859
1860 /* Tell userspace we may need a wakeup call */
1861 ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
1862 smp_wmb();
1863
1864 if (!io_get_sqring(ctx, &sqes[0])) {
1865 if (kthread_should_stop()) {
1866 finish_wait(&ctx->sqo_wait, &wait);
1867 break;
1868 }
1869 if (signal_pending(current))
1870 flush_signals(current);
1871 schedule();
1872 finish_wait(&ctx->sqo_wait, &wait);
1873
1874 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
1875 smp_wmb();
1876 continue;
1877 }
1878 finish_wait(&ctx->sqo_wait, &wait);
1879
1880 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
1881 smp_wmb();
1882 }
1883
1884 i = 0;
1885 all_fixed = true;
1886 do {
1887 if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
1888 all_fixed = false;
1889
1890 i++;
1891 if (i == ARRAY_SIZE(sqes))
1892 break;
1893 } while (io_get_sqring(ctx, &sqes[i]));
1894
1895 /* Unless all new commands are FIXED regions, grab mm */
1896 if (!all_fixed && !cur_mm) {
1897 mm_fault = !mmget_not_zero(ctx->sqo_mm);
1898 if (!mm_fault) {
1899 use_mm(ctx->sqo_mm);
1900 cur_mm = ctx->sqo_mm;
1901 }
1902 }
1903
1904 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
1905 mm_fault);
1906
1907 /* Commit SQ ring head once we've consumed all SQEs */
1908 io_commit_sqring(ctx);
1909 }
1910
1911 set_fs(old_fs);
1912 if (cur_mm) {
1913 unuse_mm(cur_mm);
1914 mmput(cur_mm);
1915 }
1916 return 0;
1917}
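/*
 * Illustrative userspace-side sketch (not part of this patch): with
 * IORING_SETUP_SQPOLL the thread above consumes the SQ ring on its own, so
 * the application normally skips io_uring_enter(2) entirely. It only has
 * to watch for IORING_SQ_NEED_WAKEUP, which the thread sets before going
 * to sleep once sq_thread_idle has expired. sq_flags is assumed to point
 * at the flags field of the mmap()ed SQ ring; app_io_uring_enter() is the
 * hypothetical raw-syscall wrapper sketched after io_uring_enter() below.
 */
static void app_sqpoll_submit(int ring_fd, unsigned *sq_flags, unsigned queued)
{
	/* pairs with the smp_wmb() the thread does after setting the flag */
	if (__atomic_load_n(sq_flags, __ATOMIC_ACQUIRE) & IORING_SQ_NEED_WAKEUP)
		app_io_uring_enter(ring_fd, queued, 0,
				   IORING_ENTER_SQ_WAKEUP, NULL, 0);
}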
1918
1919static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
1920{
1921 struct io_submit_state state, *statep = NULL;
1922 int i, ret = 0, submit = 0;
1923
1924 if (to_submit > IO_PLUG_THRESHOLD) {
1925 io_submit_state_start(&state, ctx, to_submit);
1926 statep = &state;
1927 }
1928
1929 for (i = 0; i < to_submit; i++) {
1930 struct sqe_submit s;
1931
1932 if (!io_get_sqring(ctx, &s))
1933 break;
1934
1935 s.has_user = true;
1936 s.needs_lock = false;
1937 s.needs_fixed_file = false;
1938
1939 ret = io_submit_sqe(ctx, &s, statep);
1940 if (ret) {
1941 io_drop_sqring(ctx);
1942 break;
1943 }
1944
1945 submit++;
1946 }
1947 io_commit_sqring(ctx);
1948
1949 if (statep)
1950 io_submit_state_end(statep);
1951
1952 return submit ? submit : ret;
1953}
1954
1955static unsigned io_cqring_events(struct io_cq_ring *ring)
1956{
1957 return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
1958}
1959
1960/*
1961 * Wait until events become available, if we don't already have some. The
1962 * application must reap them itself, as they reside on the shared cq ring.
1963 */
1964static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
1965 const sigset_t __user *sig, size_t sigsz)
1966{
1967 struct io_cq_ring *ring = ctx->cq_ring;
1968 sigset_t ksigmask, sigsaved;
1969 DEFINE_WAIT(wait);
1970 int ret;
1971
1972 /* See comment at the top of this file */
1973 smp_rmb();
1974 if (io_cqring_events(ring) >= min_events)
1975 return 0;
1976
1977 if (sig) {
1978 ret = set_user_sigmask(sig, &ksigmask, &sigsaved, sigsz);
1979 if (ret)
1980 return ret;
1981 }
1982
1983 do {
1984 prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
1985
1986 ret = 0;
1987 /* See comment at the top of this file */
1988 smp_rmb();
1989 if (io_cqring_events(ring) >= min_events)
1990 break;
1991
1992 schedule();
1993
1994 ret = -EINTR;
1995 if (signal_pending(current))
1996 break;
1997 } while (1);
1998
1999 finish_wait(&ctx->wait, &wait);
2000
2001 if (sig)
2002 restore_user_sigmask(sig, &sigsaved);
2003
2004 return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2005}
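/*
 * Illustrative userspace-side sketch (not part of this patch): reaping is
 * the application's half of the contract described above. The kernel
 * advances the CQ tail; the application consumes cqes up to it and then
 * store-releases the new head so the slots can be reused. The app_cq
 * fields are assumed to point into the mmap()ed CQ ring and
 * app_handle_completion() is a hypothetical callback.
 */
struct app_cq {
	unsigned *head, *tail, *ring_mask;
	struct io_uring_cqe *cqes;
};

static unsigned app_reap_cqes(struct app_cq *cq)
{
	unsigned head = *cq->head;
	unsigned tail = __atomic_load_n(cq->tail, __ATOMIC_ACQUIRE);
	unsigned seen = 0;

	while (head != tail) {
		struct io_uring_cqe *cqe = &cq->cqes[head & *cq->ring_mask];

		/* user_data identifies the sqe, res carries the result */
		app_handle_completion(cqe->user_data, cqe->res);
		head++;
		seen++;
	}
	__atomic_store_n(cq->head, head, __ATOMIC_RELEASE);
	return seen;
}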
2006
2007static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2008{
2009#if defined(CONFIG_UNIX)
2010 if (ctx->ring_sock) {
2011 struct sock *sock = ctx->ring_sock->sk;
2012 struct sk_buff *skb;
2013
2014 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2015 kfree_skb(skb);
2016 }
2017#else
2018 int i;
2019
2020 for (i = 0; i < ctx->nr_user_files; i++)
2021 fput(ctx->user_files[i]);
2022#endif
2023}
2024
2025static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2026{
2027 if (!ctx->user_files)
2028 return -ENXIO;
2029
2030 __io_sqe_files_unregister(ctx);
2031 kfree(ctx->user_files);
2032 ctx->user_files = NULL;
2033 ctx->nr_user_files = 0;
2034 return 0;
2035}
2036
2037static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2038{
2039 if (ctx->sqo_thread) {
2040 ctx->sqo_stop = 1;
2041 mb();
2042 kthread_stop(ctx->sqo_thread);
2043 ctx->sqo_thread = NULL;
2044 }
2045}
2046
2047static void io_finish_async(struct io_ring_ctx *ctx)
2048{
2049 io_sq_thread_stop(ctx);
2050
2051 if (ctx->sqo_wq) {
2052 destroy_workqueue(ctx->sqo_wq);
2053 ctx->sqo_wq = NULL;
2054 }
2055}
2056
2057#if defined(CONFIG_UNIX)
2058static void io_destruct_skb(struct sk_buff *skb)
2059{
2060 struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2061
2062 io_finish_async(ctx);
2063 unix_destruct_scm(skb);
2064}
2065
2066/*
2067 * Ensure the UNIX gc is aware of our file set, so we are certain that
2068 * the io_uring can be safely unregistered on process exit, even if we have
2069 * loops in the file referencing.
2070 */
2071static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2072{
2073 struct sock *sk = ctx->ring_sock->sk;
2074 struct scm_fp_list *fpl;
2075 struct sk_buff *skb;
2076 int i;
2077
2078 if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2079 unsigned long inflight = ctx->user->unix_inflight + nr;
2080
2081 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2082 return -EMFILE;
2083 }
2084
2085 fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2086 if (!fpl)
2087 return -ENOMEM;
2088
2089 skb = alloc_skb(0, GFP_KERNEL);
2090 if (!skb) {
2091 kfree(fpl);
2092 return -ENOMEM;
2093 }
2094
2095 skb->sk = sk;
2096 skb->destructor = io_destruct_skb;
2097
2098 fpl->user = get_uid(ctx->user);
2099 for (i = 0; i < nr; i++) {
2100 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2101 unix_inflight(fpl->user, fpl->fp[i]);
2102 }
2103
2104 fpl->max = fpl->count = nr;
2105 UNIXCB(skb).fp = fpl;
2106 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2107 skb_queue_head(&sk->sk_receive_queue, skb);
2108
2109 for (i = 0; i < nr; i++)
2110 fput(fpl->fp[i]);
2111
2112 return 0;
2113}
2114
2115/*
2116 * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2117 * causes regular reference counting to break down. We rely on the UNIX
2118 * garbage collection to take care of this problem for us.
2119 */
2120static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2121{
2122 unsigned left, total;
2123 int ret = 0;
2124
2125 total = 0;
2126 left = ctx->nr_user_files;
2127 while (left) {
2128 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2129 int ret;
2130
2131 ret = __io_sqe_files_scm(ctx, this_files, total);
2132 if (ret)
2133 break;
2134 left -= this_files;
2135 total += this_files;
2136 }
2137
2138 if (!ret)
2139 return 0;
2140
2141 while (total < ctx->nr_user_files) {
2142 fput(ctx->user_files[total]);
2143 total++;
2144 }
2145
2146 return ret;
2147}
2148#else
2149static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2150{
2151 return 0;
2152}
2153#endif
2154
2155static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2156 unsigned nr_args)
2157{
2158 __s32 __user *fds = (__s32 __user *) arg;
2159 int fd, ret = 0;
2160 unsigned i;
2161
2162 if (ctx->user_files)
2163 return -EBUSY;
2164 if (!nr_args)
2165 return -EINVAL;
2166 if (nr_args > IORING_MAX_FIXED_FILES)
2167 return -EMFILE;
2168
2169 ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2170 if (!ctx->user_files)
2171 return -ENOMEM;
2172
2173 for (i = 0; i < nr_args; i++) {
2174 ret = -EFAULT;
2175 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2176 break;
2177
2178 ctx->user_files[i] = fget(fd);
2179
2180 ret = -EBADF;
2181 if (!ctx->user_files[i])
2182 break;
2183 /*
2184 * Don't allow io_uring instances to be registered. If UNIX
2185 * isn't enabled, then this causes a reference cycle and this
2186 * instance can never get freed. If UNIX is enabled we'll
2187 * handle it just fine, but there's still no point in allowing
2188 * a ring fd as it doesn't support regular read/write anyway.
2189 */
2190 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2191 fput(ctx->user_files[i]);
2192 break;
2193 }
2194 ctx->nr_user_files++;
2195 ret = 0;
2196 }
2197
2198 if (ret) {
2199 for (i = 0; i < ctx->nr_user_files; i++)
2200 fput(ctx->user_files[i]);
2201
2202 kfree(ctx->user_files);
2203 ctx->nr_user_files = 0;
2204 return ret;
2205 }
2206
2207 ret = io_sqe_files_scm(ctx);
2208 if (ret)
2209 io_sqe_files_unregister(ctx);
2210
2211 return ret;
2212}
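/*
 * Illustrative userspace-side sketch (not part of this patch): a file set
 * registered up front lets later sqes address files by index instead of
 * paying a per-request fget()/fput(). app_io_uring_register() is the
 * hypothetical raw-syscall wrapper sketched after io_uring_register()
 * below; the opcode and IOSQE_FIXED_FILE come from the uapi header added
 * by this patch.
 */
static int app_register_files(int ring_fd, const __s32 *fds, unsigned nr_fds)
{
	return app_io_uring_register(ring_fd, IORING_REGISTER_FILES,
				     (void *) fds, nr_fds);
}

/*
 * An sqe that wants one of these files then sets
 * sqe->flags |= IOSQE_FIXED_FILE and puts the array index in sqe->fd.
 */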
2213
2214static int io_sq_offload_start(struct io_ring_ctx *ctx,
2215 struct io_uring_params *p)
2216{
2217 int ret;
2218
2219 init_waitqueue_head(&ctx->sqo_wait);
2220 mmgrab(current->mm);
2221 ctx->sqo_mm = current->mm;
2222
2223 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2224 if (!ctx->sq_thread_idle)
2225 ctx->sq_thread_idle = HZ;
2226
2227 ret = -EINVAL;
2228 if (!cpu_possible(p->sq_thread_cpu))
2229 goto err;
2230
2231 if (ctx->flags & IORING_SETUP_SQPOLL) {
2232 if (p->flags & IORING_SETUP_SQ_AFF) {
2233 int cpu;
2234
2235 cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
2236 ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2237 ctx, cpu,
2238 "io_uring-sq");
2239 } else {
2240 ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2241 "io_uring-sq");
2242 }
2243 if (IS_ERR(ctx->sqo_thread)) {
2244 ret = PTR_ERR(ctx->sqo_thread);
2245 ctx->sqo_thread = NULL;
2246 goto err;
2247 }
2248 wake_up_process(ctx->sqo_thread);
2249 } else if (p->flags & IORING_SETUP_SQ_AFF) {
2250 /* Can't have SQ_AFF without SQPOLL */
2251 ret = -EINVAL;
2252 goto err;
2253 }
2254
2255	/* Do QD, or 2 * CPUS, whichever is smaller */
2256 ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2257 min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2258 if (!ctx->sqo_wq) {
2259 ret = -ENOMEM;
2260 goto err;
2261 }
2262
2263 return 0;
2264err:
2265 io_sq_thread_stop(ctx);
2266 mmdrop(ctx->sqo_mm);
2267 ctx->sqo_mm = NULL;
2268 return ret;
2269}
2270
2271static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2272{
2273 atomic_long_sub(nr_pages, &user->locked_vm);
2274}
2275
2276static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2277{
2278 unsigned long page_limit, cur_pages, new_pages;
2279
2280 /* Don't allow more pages than we can safely lock */
2281 page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2282
2283 do {
2284 cur_pages = atomic_long_read(&user->locked_vm);
2285 new_pages = cur_pages + nr_pages;
2286 if (new_pages > page_limit)
2287 return -ENOMEM;
2288 } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2289 new_pages) != cur_pages);
2290
2291 return 0;
2292}
2293
2294static void io_mem_free(void *ptr)
2295{
2296 struct page *page = virt_to_head_page(ptr);
2297
2298 if (put_page_testzero(page))
2299 free_compound_page(page);
2300}
2301
2302static void *io_mem_alloc(size_t size)
2303{
2304 gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2305 __GFP_NORETRY;
2306
2307 return (void *) __get_free_pages(gfp_flags, get_order(size));
2308}
2309
2310static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2311{
2312 struct io_sq_ring *sq_ring;
2313 struct io_cq_ring *cq_ring;
2314 size_t bytes;
2315
2316 bytes = struct_size(sq_ring, array, sq_entries);
2317 bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2318 bytes += struct_size(cq_ring, cqes, cq_entries);
2319
2320 return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2321}
2322
2323static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2324{
2325 int i, j;
2326
2327 if (!ctx->user_bufs)
2328 return -ENXIO;
2329
2330 for (i = 0; i < ctx->nr_user_bufs; i++) {
2331 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2332
2333 for (j = 0; j < imu->nr_bvecs; j++)
2334 put_page(imu->bvec[j].bv_page);
2335
2336 if (ctx->account_mem)
2337 io_unaccount_mem(ctx->user, imu->nr_bvecs);
2338 kfree(imu->bvec);
2339 imu->nr_bvecs = 0;
2340 }
2341
2342 kfree(ctx->user_bufs);
2343 ctx->user_bufs = NULL;
2344 ctx->nr_user_bufs = 0;
2345 return 0;
2346}
2347
2348static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2349 void __user *arg, unsigned index)
2350{
2351 struct iovec __user *src;
2352
2353#ifdef CONFIG_COMPAT
2354 if (ctx->compat) {
2355 struct compat_iovec __user *ciovs;
2356 struct compat_iovec ciov;
2357
2358 ciovs = (struct compat_iovec __user *) arg;
2359 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2360 return -EFAULT;
2361
2362 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2363 dst->iov_len = ciov.iov_len;
2364 return 0;
2365 }
2366#endif
2367 src = (struct iovec __user *) arg;
2368 if (copy_from_user(dst, &src[index], sizeof(*dst)))
2369 return -EFAULT;
2370 return 0;
2371}
2372
2373static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2374 unsigned nr_args)
2375{
2376 struct vm_area_struct **vmas = NULL;
2377 struct page **pages = NULL;
2378 int i, j, got_pages = 0;
2379 int ret = -EINVAL;
2380
2381 if (ctx->user_bufs)
2382 return -EBUSY;
2383 if (!nr_args || nr_args > UIO_MAXIOV)
2384 return -EINVAL;
2385
2386 ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2387 GFP_KERNEL);
2388 if (!ctx->user_bufs)
2389 return -ENOMEM;
2390
2391 for (i = 0; i < nr_args; i++) {
2392 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2393 unsigned long off, start, end, ubuf;
2394 int pret, nr_pages;
2395 struct iovec iov;
2396 size_t size;
2397
2398 ret = io_copy_iov(ctx, &iov, arg, i);
2399 if (ret)
2400 break;
2401
2402 /*
2403 * Don't impose further limits on the size and buffer
2404 * constraints here, we'll -EINVAL later when IO is
2405 * submitted if they are wrong.
2406 */
2407 ret = -EFAULT;
2408 if (!iov.iov_base || !iov.iov_len)
2409 goto err;
2410
2411 /* arbitrary limit, but we need something */
2412 if (iov.iov_len > SZ_1G)
2413 goto err;
2414
2415 ubuf = (unsigned long) iov.iov_base;
2416 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2417 start = ubuf >> PAGE_SHIFT;
2418 nr_pages = end - start;
2419
2420 if (ctx->account_mem) {
2421 ret = io_account_mem(ctx->user, nr_pages);
2422 if (ret)
2423 goto err;
2424 }
2425
2426 ret = 0;
2427 if (!pages || nr_pages > got_pages) {
2428 kfree(vmas);
2429 kfree(pages);
2430 pages = kmalloc_array(nr_pages, sizeof(struct page *),
2431 GFP_KERNEL);
2432 vmas = kmalloc_array(nr_pages,
2433 sizeof(struct vm_area_struct *),
2434 GFP_KERNEL);
2435 if (!pages || !vmas) {
2436 ret = -ENOMEM;
2437 if (ctx->account_mem)
2438 io_unaccount_mem(ctx->user, nr_pages);
2439 goto err;
2440 }
2441 got_pages = nr_pages;
2442 }
2443
2444 imu->bvec = kmalloc_array(nr_pages, sizeof(struct bio_vec),
2445 GFP_KERNEL);
2446 ret = -ENOMEM;
2447 if (!imu->bvec) {
2448 if (ctx->account_mem)
2449 io_unaccount_mem(ctx->user, nr_pages);
2450 goto err;
2451 }
2452
2453 ret = 0;
2454 down_read(&current->mm->mmap_sem);
2455 pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE,
2456 pages, vmas);
2457 if (pret == nr_pages) {
2458 /* don't support file backed memory */
2459 for (j = 0; j < nr_pages; j++) {
2460 struct vm_area_struct *vma = vmas[j];
2461
2462 if (vma->vm_file &&
2463 !is_file_hugepages(vma->vm_file)) {
2464 ret = -EOPNOTSUPP;
2465 break;
2466 }
2467 }
2468 } else {
2469 ret = pret < 0 ? pret : -EFAULT;
2470 }
2471 up_read(&current->mm->mmap_sem);
2472 if (ret) {
2473 /*
2474 * if we did partial map, or found file backed vmas,
2475 * release any pages we did get
2476 */
2477 if (pret > 0) {
2478 for (j = 0; j < pret; j++)
2479 put_page(pages[j]);
2480 }
2481 if (ctx->account_mem)
2482 io_unaccount_mem(ctx->user, nr_pages);
2483 goto err;
2484 }
2485
2486 off = ubuf & ~PAGE_MASK;
2487 size = iov.iov_len;
2488 for (j = 0; j < nr_pages; j++) {
2489 size_t vec_len;
2490
2491 vec_len = min_t(size_t, size, PAGE_SIZE - off);
2492 imu->bvec[j].bv_page = pages[j];
2493 imu->bvec[j].bv_len = vec_len;
2494 imu->bvec[j].bv_offset = off;
2495 off = 0;
2496 size -= vec_len;
2497 }
2498 /* store original address for later verification */
2499 imu->ubuf = ubuf;
2500 imu->len = iov.iov_len;
2501 imu->nr_bvecs = nr_pages;
2502
2503 ctx->nr_user_bufs++;
2504 }
2505 kfree(pages);
2506 kfree(vmas);
2507 return 0;
2508err:
2509 kfree(pages);
2510 kfree(vmas);
2511 io_sqe_buffer_unregister(ctx);
2512 return ret;
2513}
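/*
 * Illustrative userspace-side sketch (not part of this patch): fixed
 * buffers are handed to the kernel once as an array of iovecs, so the
 * pin-down above happens at registration time rather than per IO.
 * IORING_OP_READ_FIXED/WRITE_FIXED sqes then select a registered buffer
 * with buf_index and point addr/len inside it. app_io_uring_register() is
 * the hypothetical raw wrapper sketched further down; <sys/uio.h> is
 * assumed included.
 */
static int app_register_buffers(int ring_fd, struct iovec *iovs, unsigned nr)
{
	return app_io_uring_register(ring_fd, IORING_REGISTER_BUFFERS, iovs, nr);
}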
2514
2515static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2516{
2517 io_finish_async(ctx);
2518 if (ctx->sqo_mm)
2519 mmdrop(ctx->sqo_mm);
2520
2521 io_iopoll_reap_events(ctx);
2522 io_sqe_buffer_unregister(ctx);
2523 io_sqe_files_unregister(ctx);
2524
2525#if defined(CONFIG_UNIX)
2526 if (ctx->ring_sock)
2527 sock_release(ctx->ring_sock);
2528#endif
2529
2530 io_mem_free(ctx->sq_ring);
2531 io_mem_free(ctx->sq_sqes);
2532 io_mem_free(ctx->cq_ring);
2533
2534 percpu_ref_exit(&ctx->refs);
2535 if (ctx->account_mem)
2536 io_unaccount_mem(ctx->user,
2537 ring_pages(ctx->sq_entries, ctx->cq_entries));
2538 free_uid(ctx->user);
2539 kfree(ctx);
2540}
2541
2542static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2543{
2544 struct io_ring_ctx *ctx = file->private_data;
2545 __poll_t mask = 0;
2546
2547 poll_wait(file, &ctx->cq_wait, wait);
2548 /* See comment at the top of this file */
2549 smp_rmb();
2550 if (READ_ONCE(ctx->sq_ring->r.tail) + 1 != ctx->cached_sq_head)
2551 mask |= EPOLLOUT | EPOLLWRNORM;
2552 if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2553 mask |= EPOLLIN | EPOLLRDNORM;
2554
2555 return mask;
2556}
2557
2558static int io_uring_fasync(int fd, struct file *file, int on)
2559{
2560 struct io_ring_ctx *ctx = file->private_data;
2561
2562 return fasync_helper(fd, file, on, &ctx->cq_fasync);
2563}
2564
2565static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2566{
2567 mutex_lock(&ctx->uring_lock);
2568 percpu_ref_kill(&ctx->refs);
2569 mutex_unlock(&ctx->uring_lock);
2570
2571 io_poll_remove_all(ctx);
2572 io_iopoll_reap_events(ctx);
2573 wait_for_completion(&ctx->ctx_done);
2574 io_ring_ctx_free(ctx);
2575}
2576
2577static int io_uring_release(struct inode *inode, struct file *file)
2578{
2579 struct io_ring_ctx *ctx = file->private_data;
2580
2581 file->private_data = NULL;
2582 io_ring_ctx_wait_and_kill(ctx);
2583 return 0;
2584}
2585
2586static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2587{
2588 loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2589 unsigned long sz = vma->vm_end - vma->vm_start;
2590 struct io_ring_ctx *ctx = file->private_data;
2591 unsigned long pfn;
2592 struct page *page;
2593 void *ptr;
2594
2595 switch (offset) {
2596 case IORING_OFF_SQ_RING:
2597 ptr = ctx->sq_ring;
2598 break;
2599 case IORING_OFF_SQES:
2600 ptr = ctx->sq_sqes;
2601 break;
2602 case IORING_OFF_CQ_RING:
2603 ptr = ctx->cq_ring;
2604 break;
2605 default:
2606 return -EINVAL;
2607 }
2608
2609 page = virt_to_head_page(ptr);
2610 if (sz > (PAGE_SIZE << compound_order(page)))
2611 return -EINVAL;
2612
2613 pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2614 return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2615}
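/*
 * Illustrative userspace-side sketch (not part of this patch): the three
 * offsets handled above are the whole mmap interface. The sizes are
 * derived from the entry counts and offsets that io_uring_setup(2) copies
 * back in struct io_uring_params. Assumes <sys/mman.h> and the uapi
 * header are included.
 */
static int app_mmap_rings(int ring_fd, struct io_uring_params *p,
			  void **sq_ring, void **sqes, void **cq_ring)
{
	size_t sq_sz = p->sq_off.array + p->sq_entries * sizeof(__u32);
	size_t sqes_sz = p->sq_entries * sizeof(struct io_uring_sqe);
	size_t cq_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);

	*sq_ring = mmap(NULL, sq_sz, PROT_READ | PROT_WRITE,
			MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQ_RING);
	*sqes = mmap(NULL, sqes_sz, PROT_READ | PROT_WRITE,
		     MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQES);
	*cq_ring = mmap(NULL, cq_sz, PROT_READ | PROT_WRITE,
			MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_CQ_RING);
	if (*sq_ring == MAP_FAILED || *sqes == MAP_FAILED || *cq_ring == MAP_FAILED)
		return -1;

	/* ring fields then live at sq_ring + p->sq_off.head, .tail, .ring_mask, ... */
	return 0;
}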
2616
2617SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2618 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2619 size_t, sigsz)
2620{
2621 struct io_ring_ctx *ctx;
2622 long ret = -EBADF;
2623 int submitted = 0;
2624 struct fd f;
2625
2626 if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2627 return -EINVAL;
2628
2629 f = fdget(fd);
2630 if (!f.file)
2631 return -EBADF;
2632
2633 ret = -EOPNOTSUPP;
2634 if (f.file->f_op != &io_uring_fops)
2635 goto out_fput;
2636
2637 ret = -ENXIO;
2638 ctx = f.file->private_data;
2639 if (!percpu_ref_tryget(&ctx->refs))
2640 goto out_fput;
2641
2642 /*
2643 * For SQ polling, the thread will do all submissions and completions.
2644 * Just return the requested submit count, and wake the thread if
2645 * we were asked to.
2646 */
2647 if (ctx->flags & IORING_SETUP_SQPOLL) {
2648 if (flags & IORING_ENTER_SQ_WAKEUP)
2649 wake_up(&ctx->sqo_wait);
2650 submitted = to_submit;
2651 goto out_ctx;
2652 }
2653
2654 ret = 0;
2655 if (to_submit) {
2656 to_submit = min(to_submit, ctx->sq_entries);
2657
2658 mutex_lock(&ctx->uring_lock);
2659 submitted = io_ring_submit(ctx, to_submit);
2660 mutex_unlock(&ctx->uring_lock);
2661
2662 if (submitted < 0)
2663 goto out_ctx;
2664 }
2665 if (flags & IORING_ENTER_GETEVENTS) {
2666 unsigned nr_events = 0;
2667
2668 min_complete = min(min_complete, ctx->cq_entries);
2669
2670 /*
2671 * The application could have included the 'to_submit' count
2672 * in how many events it wanted to wait for. If we failed to
2673 * submit the desired count, we may need to adjust the number
2674 * of events to poll/wait for.
2675 */
2676 if (submitted < to_submit)
2677 min_complete = min_t(unsigned, submitted, min_complete);
2678
2679 if (ctx->flags & IORING_SETUP_IOPOLL) {
2680 mutex_lock(&ctx->uring_lock);
2681 ret = io_iopoll_check(ctx, &nr_events, min_complete);
2682 mutex_unlock(&ctx->uring_lock);
2683 } else {
2684 ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2685 }
2686 }
2687
2688out_ctx:
2689 io_ring_drop_ctx_refs(ctx, 1);
2690out_fput:
2691 fdput(f);
2692 return submitted ? submitted : ret;
2693}
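/*
 * Illustrative userspace-side sketch (not part of this patch): there is no
 * libc wrapper yet, so applications reach io_uring_enter(2) via
 * syscall(2). __NR_io_uring_enter (426) is added to the syscall tables
 * elsewhere in this diff; app_io_uring_enter() is the hypothetical wrapper
 * the other sketches in this file refer to. Assumes <unistd.h>,
 * <sys/syscall.h> and <signal.h>.
 */
static int app_io_uring_enter(int ring_fd, unsigned to_submit,
			      unsigned min_complete, unsigned flags,
			      const sigset_t *sig, size_t sigsz)
{
	return syscall(__NR_io_uring_enter, ring_fd, to_submit, min_complete,
		       flags, sig, sigsz);
}

/*
 * Typical blocking usage submits everything queued and waits for at least
 * one completion in the same call:
 *
 *	app_io_uring_enter(ring_fd, queued, 1, IORING_ENTER_GETEVENTS, NULL, 0);
 */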
2694
2695static const struct file_operations io_uring_fops = {
2696 .release = io_uring_release,
2697 .mmap = io_uring_mmap,
2698 .poll = io_uring_poll,
2699 .fasync = io_uring_fasync,
2700};
2701
2702static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2703 struct io_uring_params *p)
2704{
2705 struct io_sq_ring *sq_ring;
2706 struct io_cq_ring *cq_ring;
2707 size_t size;
2708
2709 sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2710 if (!sq_ring)
2711 return -ENOMEM;
2712
2713 ctx->sq_ring = sq_ring;
2714 sq_ring->ring_mask = p->sq_entries - 1;
2715 sq_ring->ring_entries = p->sq_entries;
2716 ctx->sq_mask = sq_ring->ring_mask;
2717 ctx->sq_entries = sq_ring->ring_entries;
2718
2719 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2720 if (size == SIZE_MAX)
2721 return -EOVERFLOW;
2722
2723 ctx->sq_sqes = io_mem_alloc(size);
2724 if (!ctx->sq_sqes) {
2725 io_mem_free(ctx->sq_ring);
2726 return -ENOMEM;
2727 }
2728
2729 cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2730 if (!cq_ring) {
2731 io_mem_free(ctx->sq_ring);
2732 io_mem_free(ctx->sq_sqes);
2733 return -ENOMEM;
2734 }
2735
2736 ctx->cq_ring = cq_ring;
2737 cq_ring->ring_mask = p->cq_entries - 1;
2738 cq_ring->ring_entries = p->cq_entries;
2739 ctx->cq_mask = cq_ring->ring_mask;
2740 ctx->cq_entries = cq_ring->ring_entries;
2741 return 0;
2742}
2743
2744/*
2745 * Allocate an anonymous fd, this is what constitutes the application
2746 * visible backing of an io_uring instance. The application mmaps this
2747 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
2748 * we have to tie this fd to a socket for file garbage collection purposes.
2749 */
2750static int io_uring_get_fd(struct io_ring_ctx *ctx)
2751{
2752 struct file *file;
2753 int ret;
2754
2755#if defined(CONFIG_UNIX)
2756 ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
2757 &ctx->ring_sock);
2758 if (ret)
2759 return ret;
2760#endif
2761
2762 ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
2763 if (ret < 0)
2764 goto err;
2765
2766 file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
2767 O_RDWR | O_CLOEXEC);
2768 if (IS_ERR(file)) {
2769 put_unused_fd(ret);
2770 ret = PTR_ERR(file);
2771 goto err;
2772 }
2773
2774#if defined(CONFIG_UNIX)
2775 ctx->ring_sock->file = file;
2776 ctx->ring_sock->sk->sk_user_data = ctx;
2777#endif
2778 fd_install(ret, file);
2779 return ret;
2780err:
2781#if defined(CONFIG_UNIX)
2782 sock_release(ctx->ring_sock);
2783 ctx->ring_sock = NULL;
2784#endif
2785 return ret;
2786}
2787
2788static int io_uring_create(unsigned entries, struct io_uring_params *p)
2789{
2790 struct user_struct *user = NULL;
2791 struct io_ring_ctx *ctx;
2792 bool account_mem;
2793 int ret;
2794
2795 if (!entries || entries > IORING_MAX_ENTRIES)
2796 return -EINVAL;
2797
2798 /*
2799 * Use twice as many entries for the CQ ring. It's possible for the
2800 * application to drive a higher depth than the size of the SQ ring,
2801 * since the sqes are only used at submission time. This allows for
2802 * some flexibility in overcommitting a bit.
2803 */
2804 p->sq_entries = roundup_pow_of_two(entries);
2805 p->cq_entries = 2 * p->sq_entries;
2806
2807 user = get_uid(current_user());
2808 account_mem = !capable(CAP_IPC_LOCK);
2809
2810 if (account_mem) {
2811 ret = io_account_mem(user,
2812 ring_pages(p->sq_entries, p->cq_entries));
2813 if (ret) {
2814 free_uid(user);
2815 return ret;
2816 }
2817 }
2818
2819 ctx = io_ring_ctx_alloc(p);
2820 if (!ctx) {
2821 if (account_mem)
2822 io_unaccount_mem(user, ring_pages(p->sq_entries,
2823 p->cq_entries));
2824 free_uid(user);
2825 return -ENOMEM;
2826 }
2827 ctx->compat = in_compat_syscall();
2828 ctx->account_mem = account_mem;
2829 ctx->user = user;
2830
2831 ret = io_allocate_scq_urings(ctx, p);
2832 if (ret)
2833 goto err;
2834
2835 ret = io_sq_offload_start(ctx, p);
2836 if (ret)
2837 goto err;
2838
2839 ret = io_uring_get_fd(ctx);
2840 if (ret < 0)
2841 goto err;
2842
2843 memset(&p->sq_off, 0, sizeof(p->sq_off));
2844 p->sq_off.head = offsetof(struct io_sq_ring, r.head);
2845 p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
2846 p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
2847 p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
2848 p->sq_off.flags = offsetof(struct io_sq_ring, flags);
2849 p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
2850 p->sq_off.array = offsetof(struct io_sq_ring, array);
2851
2852 memset(&p->cq_off, 0, sizeof(p->cq_off));
2853 p->cq_off.head = offsetof(struct io_cq_ring, r.head);
2854 p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
2855 p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
2856 p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
2857 p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
2858 p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
2859 return ret;
2860err:
2861 io_ring_ctx_wait_and_kill(ctx);
2862 return ret;
2863}
2864
2865/*
2866 * Sets up an io_uring context and returns the fd. The application asks for a
2867 * ring size; we return the actual sq/cq ring sizes (among other things) in the
2868 * params structure passed in.
2869 */
2870static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
2871{
2872 struct io_uring_params p;
2873 long ret;
2874 int i;
2875
2876 if (copy_from_user(&p, params, sizeof(p)))
2877 return -EFAULT;
2878 for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
2879 if (p.resv[i])
2880 return -EINVAL;
2881 }
2882
2883 if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
2884 IORING_SETUP_SQ_AFF))
2885 return -EINVAL;
2886
2887 ret = io_uring_create(entries, &p);
2888 if (ret < 0)
2889 return ret;
2890
2891 if (copy_to_user(params, &p, sizeof(p)))
2892 return -EFAULT;
2893
2894 return ret;
2895}
2896
2897SYSCALL_DEFINE2(io_uring_setup, u32, entries,
2898 struct io_uring_params __user *, params)
2899{
2900 return io_uring_setup(entries, params);
2901}
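/*
 * Illustrative userspace-side sketch (not part of this patch): the setup
 * call through the raw syscall. The params structure must be zeroed by
 * the caller (the resv fields are rejected above); on success the kernel
 * fills in sq_entries, cq_entries and the mmap offsets the other sketches
 * rely on. __NR_io_uring_setup (425) is added elsewhere in this diff;
 * assumes <string.h>, <unistd.h> and <sys/syscall.h>.
 */
static int app_io_uring_setup(unsigned entries, struct io_uring_params *p)
{
	return syscall(__NR_io_uring_setup, entries, p);
}

/*
 *	struct io_uring_params p;
 *
 *	memset(&p, 0, sizeof(p));
 *	ring_fd = app_io_uring_setup(256, &p);
 *	// p.sq_entries is now 256, p.cq_entries 512, and the offsets are filled
 */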
2902
2903static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
2904 void __user *arg, unsigned nr_args)
2905{
2906 int ret;
2907
2908 percpu_ref_kill(&ctx->refs);
2909 wait_for_completion(&ctx->ctx_done);
2910
2911 switch (opcode) {
2912 case IORING_REGISTER_BUFFERS:
2913 ret = io_sqe_buffer_register(ctx, arg, nr_args);
2914 break;
2915 case IORING_UNREGISTER_BUFFERS:
2916 ret = -EINVAL;
2917 if (arg || nr_args)
2918 break;
2919 ret = io_sqe_buffer_unregister(ctx);
2920 break;
2921 case IORING_REGISTER_FILES:
2922 ret = io_sqe_files_register(ctx, arg, nr_args);
2923 break;
2924 case IORING_UNREGISTER_FILES:
2925 ret = -EINVAL;
2926 if (arg || nr_args)
2927 break;
2928 ret = io_sqe_files_unregister(ctx);
2929 break;
2930 default:
2931 ret = -EINVAL;
2932 break;
2933 }
2934
2935 /* bring the ctx back to life */
2936 reinit_completion(&ctx->ctx_done);
2937 percpu_ref_reinit(&ctx->refs);
2938 return ret;
2939}
2940
2941SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
2942 void __user *, arg, unsigned int, nr_args)
2943{
2944 struct io_ring_ctx *ctx;
2945 long ret = -EBADF;
2946 struct fd f;
2947
2948 f = fdget(fd);
2949 if (!f.file)
2950 return -EBADF;
2951
2952 ret = -EOPNOTSUPP;
2953 if (f.file->f_op != &io_uring_fops)
2954 goto out_fput;
2955
2956 ctx = f.file->private_data;
2957
2958 mutex_lock(&ctx->uring_lock);
2959 ret = __io_uring_register(ctx, opcode, arg, nr_args);
2960 mutex_unlock(&ctx->uring_lock);
2961out_fput:
2962 fdput(f);
2963 return ret;
2964}
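/*
 * Illustrative userspace-side sketch (not part of this patch): the
 * hypothetical raw wrapper used by the registration sketches earlier in
 * this file. __NR_io_uring_register (427) is added elsewhere in this
 * diff; assumes <unistd.h> and <sys/syscall.h>.
 */
static int app_io_uring_register(int ring_fd, unsigned opcode, void *arg,
				 unsigned nr_args)
{
	return syscall(__NR_io_uring_register, ring_fd, opcode, arg, nr_args);
}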
2965
2966static int __init io_uring_init(void)
2967{
2968 req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
2969 return 0;
2970};
2971__initcall(io_uring_init);
diff --git a/include/linux/file.h b/include/linux/file.h
index 6b2fb032416c..3fcddff56bc4 100644
--- a/include/linux/file.h
+++ b/include/linux/file.h
@@ -13,6 +13,7 @@
13 13struct file;
14 14
15 15extern void fput(struct file *);
16extern void fput_many(struct file *, unsigned int);
16 17
17 18struct file_operations;
18 19struct vfsmount;
@@ -44,6 +45,7 @@ static inline void fdput(struct fd fd)
44 45}
45 46
46 47extern struct file *fget(unsigned int fd);
48extern struct file *fget_many(unsigned int fd, unsigned int refs);
47 49extern struct file *fget_raw(unsigned int fd);
48 50extern unsigned long __fdget(unsigned int fd);
49 51extern unsigned long __fdget_raw(unsigned int fd);
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 7442329a0011..0a257d89208e 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -961,7 +961,9 @@ static inline struct file *get_file(struct file *f)
961 961 atomic_long_inc(&f->f_count);
962 962 return f;
963 963}
964#define get_file_rcu(x) atomic_long_inc_not_zero(&(x)->f_count)
964#define get_file_rcu_many(x, cnt) \
965 atomic_long_add_unless(&(x)->f_count, (cnt), 0)
966#define get_file_rcu(x) get_file_rcu_many((x), 1)
965 967#define fput_atomic(x) atomic_long_add_unless(&(x)->f_count, -1, 1)
966 968#define file_count(x) atomic_long_read(&(x)->f_count)
967 969
@@ -3511,4 +3513,13 @@ extern void inode_nohighmem(struct inode *inode);
3511 3513extern int vfs_fadvise(struct file *file, loff_t offset, loff_t len,
3512 3514 int advice);
3513 3515
3516#if defined(CONFIG_IO_URING)
3517extern struct sock *io_uring_get_socket(struct file *file);
3518#else
3519static inline struct sock *io_uring_get_socket(struct file *file)
3520{
3521 return NULL;
3522}
3523#endif
3524
3514 3525#endif /* _LINUX_FS_H */
diff --git a/include/linux/sched/user.h b/include/linux/sched/user.h
index 39ad98c09c58..c7b5f86b91a1 100644
--- a/include/linux/sched/user.h
+++ b/include/linux/sched/user.h
@@ -40,7 +40,7 @@ struct user_struct {
40 40 kuid_t uid;
41 41
42 42#if defined(CONFIG_PERF_EVENTS) || defined(CONFIG_BPF_SYSCALL) || \
43 defined(CONFIG_NET)
43 defined(CONFIG_NET) || defined(CONFIG_IO_URING)
44 44 atomic_long_t locked_vm;
45 45#endif
46 46
diff --git a/include/linux/syscalls.h b/include/linux/syscalls.h
index 94369f5bd8e5..c2962953bf11 100644
--- a/include/linux/syscalls.h
+++ b/include/linux/syscalls.h
@@ -69,6 +69,7 @@ struct file_handle;
69 69struct sigaltstack;
70 70struct rseq;
71 71union bpf_attr;
72struct io_uring_params;
72 73
73 74#include <linux/types.h>
74 75#include <linux/aio_abi.h>
@@ -314,6 +315,13 @@ asmlinkage long sys_io_pgetevents_time32(aio_context_t ctx_id,
314 315 struct io_event __user *events,
315 316 struct old_timespec32 __user *timeout,
316 317 const struct __aio_sigset *sig);
318asmlinkage long sys_io_uring_setup(u32 entries,
319 struct io_uring_params __user *p);
320asmlinkage long sys_io_uring_enter(unsigned int fd, u32 to_submit,
321 u32 min_complete, u32 flags,
322 const sigset_t __user *sig, size_t sigsz);
323asmlinkage long sys_io_uring_register(unsigned int fd, unsigned int op,
324 void __user *arg, unsigned int nr_args);
317 325
318 326/* fs/xattr.c */
319 327asmlinkage long sys_setxattr(const char __user *path, const char __user *name,
diff --git a/include/net/af_unix.h b/include/net/af_unix.h
index ddbba838d048..3426d6dacc45 100644
--- a/include/net/af_unix.h
+++ b/include/net/af_unix.h
@@ -10,6 +10,7 @@
10 10
11 11void unix_inflight(struct user_struct *user, struct file *fp);
12 12void unix_notinflight(struct user_struct *user, struct file *fp);
13void unix_destruct_scm(struct sk_buff *skb);
13 14void unix_gc(void);
14 15void wait_for_unix_gc(void);
15 16struct sock *unix_get_socket(struct file *filp);
diff --git a/include/uapi/asm-generic/unistd.h b/include/uapi/asm-generic/unistd.h
index 12cdf611d217..bf4624efe5e6 100644
--- a/include/uapi/asm-generic/unistd.h
+++ b/include/uapi/asm-generic/unistd.h
@@ -824,8 +824,15 @@ __SYSCALL(__NR_futex_time64, sys_futex)
824 824__SYSCALL(__NR_sched_rr_get_interval_time64, sys_sched_rr_get_interval)
825 825#endif
826 826
827#define __NR_io_uring_setup 425
828__SYSCALL(__NR_io_uring_setup, sys_io_uring_setup)
829#define __NR_io_uring_enter 426
830__SYSCALL(__NR_io_uring_enter, sys_io_uring_enter)
831#define __NR_io_uring_register 427
832__SYSCALL(__NR_io_uring_register, sys_io_uring_register)
833
827 834#undef __NR_syscalls
828#define __NR_syscalls 424
835#define __NR_syscalls 428
829 836
830 837/*
831 838 * 32 bit systems traditionally used different
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
new file mode 100644
index 000000000000..e23408692118
--- /dev/null
+++ b/include/uapi/linux/io_uring.h
@@ -0,0 +1,137 @@
1/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
2/*
3 * Header file for the io_uring interface.
4 *
5 * Copyright (C) 2019 Jens Axboe
6 * Copyright (C) 2019 Christoph Hellwig
7 */
8#ifndef LINUX_IO_URING_H
9#define LINUX_IO_URING_H
10
11#include <linux/fs.h>
12#include <linux/types.h>
13
14/*
15 * IO submission data structure (Submission Queue Entry)
16 */
17struct io_uring_sqe {
18 __u8 opcode; /* type of operation for this sqe */
19 __u8 flags; /* IOSQE_ flags */
20 __u16 ioprio; /* ioprio for the request */
21 __s32 fd; /* file descriptor to do IO on */
22 __u64 off; /* offset into file */
23 __u64 addr; /* pointer to buffer or iovecs */
24 __u32 len; /* buffer size or number of iovecs */
25 union {
26 __kernel_rwf_t rw_flags;
27 __u32 fsync_flags;
28 __u16 poll_events;
29 };
30 __u64 user_data; /* data to be passed back at completion time */
31 union {
32 __u16 buf_index; /* index into fixed buffers, if used */
33 __u64 __pad2[3];
34 };
35};
36
37/*
38 * sqe->flags
39 */
40#define IOSQE_FIXED_FILE (1U << 0) /* use fixed fileset */
41
42/*
43 * io_uring_setup() flags
44 */
45#define IORING_SETUP_IOPOLL (1U << 0) /* io_context is polled */
46#define IORING_SETUP_SQPOLL (1U << 1) /* SQ poll thread */
47#define IORING_SETUP_SQ_AFF (1U << 2) /* sq_thread_cpu is valid */
48
49#define IORING_OP_NOP 0
50#define IORING_OP_READV 1
51#define IORING_OP_WRITEV 2
52#define IORING_OP_FSYNC 3
53#define IORING_OP_READ_FIXED 4
54#define IORING_OP_WRITE_FIXED 5
55#define IORING_OP_POLL_ADD 6
56#define IORING_OP_POLL_REMOVE 7
57
58/*
59 * sqe->fsync_flags
60 */
61#define IORING_FSYNC_DATASYNC (1U << 0)
62
63/*
64 * IO completion data structure (Completion Queue Entry)
65 */
66struct io_uring_cqe {
67 __u64 user_data; /* sqe->data submission passed back */
68 __s32 res; /* result code for this event */
69 __u32 flags;
70};
71
72/*
73 * Magic offsets for the application to mmap the data it needs
74 */
75#define IORING_OFF_SQ_RING 0ULL
76#define IORING_OFF_CQ_RING 0x8000000ULL
77#define IORING_OFF_SQES 0x10000000ULL
78
79/*
80 * Filled with the offset for mmap(2)
81 */
82struct io_sqring_offsets {
83 __u32 head;
84 __u32 tail;
85 __u32 ring_mask;
86 __u32 ring_entries;
87 __u32 flags;
88 __u32 dropped;
89 __u32 array;
90 __u32 resv1;
91 __u64 resv2;
92};
93
94/*
95 * sq_ring->flags
96 */
97#define IORING_SQ_NEED_WAKEUP (1U << 0) /* needs io_uring_enter wakeup */
98
99struct io_cqring_offsets {
100 __u32 head;
101 __u32 tail;
102 __u32 ring_mask;
103 __u32 ring_entries;
104 __u32 overflow;
105 __u32 cqes;
106 __u64 resv[2];
107};
108
109/*
110 * io_uring_enter(2) flags
111 */
112#define IORING_ENTER_GETEVENTS (1U << 0)
113#define IORING_ENTER_SQ_WAKEUP (1U << 1)
114
115/*
116 * Passed in for io_uring_setup(2). Copied back with updated info on success
117 */
118struct io_uring_params {
119 __u32 sq_entries;
120 __u32 cq_entries;
121 __u32 flags;
122 __u32 sq_thread_cpu;
123 __u32 sq_thread_idle;
124 __u32 resv[5];
125 struct io_sqring_offsets sq_off;
126 struct io_cqring_offsets cq_off;
127};
128
129/*
130 * io_uring_register(2) opcodes and arguments
131 */
132#define IORING_REGISTER_BUFFERS 0
133#define IORING_UNREGISTER_BUFFERS 1
134#define IORING_REGISTER_FILES 2
135#define IORING_UNREGISTER_FILES 3
136
137#endif
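As a quick illustration of how the unions in struct io_uring_sqe above are meant to be used (a sketch, not part of this patch): an fsync request selects the fsync_flags member, a poll request would use poll_events, and the read/write opcodes use rw_flags. app_prep_fsync() is a hypothetical helper; <string.h> and this header are assumed included.

static void app_prep_fsync(struct io_uring_sqe *sqe, int fd, __u64 user_data)
{
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_FSYNC;
	sqe->fd = fd;
	sqe->fsync_flags = IORING_FSYNC_DATASYNC;	/* fdatasync(2) semantics */
	sqe->user_data = user_data;
}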
diff --git a/init/Kconfig b/init/Kconfig
index c9386a365eea..53b54214a36e 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -1414,6 +1414,15 @@ config AIO
1414 1414 by some high performance threaded applications. Disabling
1415 1415 this option saves about 7k.
1416 1416
1417config IO_URING
1418 bool "Enable IO uring support" if EXPERT
1419 select ANON_INODES
1420 default y
1421 help
1422 This option enables support for the io_uring interface, enabling
1423 applications to submit and complete IO through submission and
1424 completion rings that are shared between the kernel and application.
1425
1417 1426config ADVISE_SYSCALLS
1418 1427 bool "Enable madvise/fadvise syscalls" if EXPERT
1419 1428 default y
diff --git a/kernel/sys_ni.c b/kernel/sys_ni.c
index 62a6c8707799..51d7c6794bf1 100644
--- a/kernel/sys_ni.c
+++ b/kernel/sys_ni.c
@@ -48,6 +48,9 @@ COND_SYSCALL(io_pgetevents_time32);
48 48COND_SYSCALL(io_pgetevents);
49 49COND_SYSCALL_COMPAT(io_pgetevents_time32);
50 50COND_SYSCALL_COMPAT(io_pgetevents);
51COND_SYSCALL(io_uring_setup);
52COND_SYSCALL(io_uring_enter);
53COND_SYSCALL(io_uring_register);
51 54
52 55/* fs/xattr.c */
53 56
diff --git a/net/Makefile b/net/Makefile
index bdaf53925acd..449fc0b221f8 100644
--- a/net/Makefile
+++ b/net/Makefile
@@ -18,7 +18,7 @@ obj-$(CONFIG_NETFILTER) += netfilter/
18 18obj-$(CONFIG_INET) += ipv4/
19 19obj-$(CONFIG_TLS) += tls/
20 20obj-$(CONFIG_XFRM) += xfrm/
21obj-$(CONFIG_UNIX) += unix/
21obj-$(CONFIG_UNIX_SCM) += unix/
22 22obj-$(CONFIG_NET) += ipv6/
23 23obj-$(CONFIG_BPFILTER) += bpfilter/
24 24obj-$(CONFIG_PACKET) += packet/
diff --git a/net/unix/Kconfig b/net/unix/Kconfig
index 8b31ab85d050..3b9e450656a4 100644
--- a/net/unix/Kconfig
+++ b/net/unix/Kconfig
@@ -19,6 +19,11 @@ config UNIX
19 19
20 20 Say Y unless you know what you are doing.
21 21
22config UNIX_SCM
23 bool
24 depends on UNIX
25 default y
26
22 27config UNIX_DIAG
23 28 tristate "UNIX: socket monitoring interface"
24 29 depends on UNIX
diff --git a/net/unix/Makefile b/net/unix/Makefile
index ffd0a275c3a7..54e58cc4f945 100644
--- a/net/unix/Makefile
+++ b/net/unix/Makefile
@@ -10,3 +10,5 @@ unix-$(CONFIG_SYSCTL) += sysctl_net_unix.o
10 10
11 11obj-$(CONFIG_UNIX_DIAG) += unix_diag.o
12 12unix_diag-y := diag.o
13
14obj-$(CONFIG_UNIX_SCM) += scm.o
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index a95d479caeea..ddb838a1b74c 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -119,6 +119,8 @@
119 119#include <linux/freezer.h>
120 120#include <linux/file.h>
121 121
122#include "scm.h"
123
122 124struct hlist_head unix_socket_table[2 * UNIX_HASH_SIZE];
123 125EXPORT_SYMBOL_GPL(unix_socket_table);
124 126DEFINE_SPINLOCK(unix_table_lock);
@@ -1496,67 +1498,6 @@ out:
1496 1498 return err;
1497 1499}
1498 1500
1499static void unix_detach_fds(struct scm_cookie *scm, struct sk_buff *skb)
1500{
1501 int i;
1502
1503 scm->fp = UNIXCB(skb).fp;
1504 UNIXCB(skb).fp = NULL;
1505
1506 for (i = scm->fp->count-1; i >= 0; i--)
1507 unix_notinflight(scm->fp->user, scm->fp->fp[i]);
1508}
1509
1510static void unix_destruct_scm(struct sk_buff *skb)
1511{
1512 struct scm_cookie scm;
1513 memset(&scm, 0, sizeof(scm));
1514 scm.pid = UNIXCB(skb).pid;
1515 if (UNIXCB(skb).fp)
1516 unix_detach_fds(&scm, skb);
1517
1518 /* Alas, it calls VFS */
1519 /* So fscking what? fput() had been SMP-safe since the last Summer */
1520 scm_destroy(&scm);
1521 sock_wfree(skb);
1522}
1523
1524/*
1525 * The "user->unix_inflight" variable is protected by the garbage
1526 * collection lock, and we just read it locklessly here. If you go
1527 * over the limit, there might be a tiny race in actually noticing
1528 * it across threads. Tough.
1529 */
1530static inline bool too_many_unix_fds(struct task_struct *p)
1531{
1532 struct user_struct *user = current_user();
1533
1534 if (unlikely(user->unix_inflight > task_rlimit(p, RLIMIT_NOFILE)))
1535 return !capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN);
1536 return false;
1537}
1538
1539static int unix_attach_fds(struct scm_cookie *scm, struct sk_buff *skb)
1540{
1541 int i;
1542
1543 if (too_many_unix_fds(current))
1544 return -ETOOMANYREFS;
1545
1546 /*
1547 * Need to duplicate file references for the sake of garbage
1548 * collection. Otherwise a socket in the fps might become a
1549 * candidate for GC while the skb is not yet queued.
1550 */
1551 UNIXCB(skb).fp = scm_fp_dup(scm->fp);
1552 if (!UNIXCB(skb).fp)
1553 return -ENOMEM;
1554
1555 for (i = scm->fp->count - 1; i >= 0; i--)
1556 unix_inflight(scm->fp->user, scm->fp->fp[i]);
1557 return 0;
1558}
1559
1560 1501static int unix_scm_to_skb(struct scm_cookie *scm, struct sk_buff *skb, bool send_fds)
1561 1502{
1562 1503 int err = 0;
diff --git a/net/unix/garbage.c b/net/unix/garbage.c
index c36757e72844..8bbe1b8e4ff7 100644
--- a/net/unix/garbage.c
+++ b/net/unix/garbage.c
@@ -86,77 +86,13 @@
86 86#include <net/scm.h>
87 87#include <net/tcp_states.h>
88 88
89#include "scm.h"
90
89 91/* Internal data structures and random procedures: */
90 92
91static LIST_HEAD(gc_inflight_list);
92 93static LIST_HEAD(gc_candidates);
93static DEFINE_SPINLOCK(unix_gc_lock);
94 94static DECLARE_WAIT_QUEUE_HEAD(unix_gc_wait);
95 95
96unsigned int unix_tot_inflight;
97
98struct sock *unix_get_socket(struct file *filp)
99{
100 struct sock *u_sock = NULL;
101 struct inode *inode = file_inode(filp);
102
103 /* Socket ? */
104 if (S_ISSOCK(inode->i_mode) && !(filp->f_mode & FMODE_PATH)) {
105 struct socket *sock = SOCKET_I(inode);
106 struct sock *s = sock->sk;
107
108 /* PF_UNIX ? */
109 if (s && sock->ops && sock->ops->family == PF_UNIX)
110 u_sock = s;
111 }
112 return u_sock;
113}
114
115/* Keep the number of times in flight count for the file
116 * descriptor if it is for an AF_UNIX socket.
117 */
118
119void unix_inflight(struct user_struct *user, struct file *fp)
120{
121 struct sock *s = unix_get_socket(fp);
122
123 spin_lock(&unix_gc_lock);
124
125 if (s) {
126 struct unix_sock *u = unix_sk(s);
127
128 if (atomic_long_inc_return(&u->inflight) == 1) {
129 BUG_ON(!list_empty(&u->link));
130 list_add_tail(&u->link, &gc_inflight_list);
131 } else {
132 BUG_ON(list_empty(&u->link));
133 }
134 unix_tot_inflight++;
135 }
136 user->unix_inflight++;
137 spin_unlock(&unix_gc_lock);
138}
139
140void unix_notinflight(struct user_struct *user, struct file *fp)
141{
142 struct sock *s = unix_get_socket(fp);
143
144 spin_lock(&unix_gc_lock);
145
146 if (s) {
147 struct unix_sock *u = unix_sk(s);
148
149 BUG_ON(!atomic_long_read(&u->inflight));
150 BUG_ON(list_empty(&u->link));
151
152 if (atomic_long_dec_and_test(&u->inflight))
153 list_del_init(&u->link);
154 unix_tot_inflight--;
155 }
156 user->unix_inflight--;
157 spin_unlock(&unix_gc_lock);
158}
159
160 96static void scan_inflight(struct sock *x, void (*func)(struct unix_sock *),
161 97 struct sk_buff_head *hitlist)
162 98{
diff --git a/net/unix/scm.c b/net/unix/scm.c
new file mode 100644
index 000000000000..8c40f2b32392
--- /dev/null
+++ b/net/unix/scm.c
@@ -0,0 +1,151 @@
1// SPDX-License-Identifier: GPL-2.0
2#include <linux/module.h>
3#include <linux/kernel.h>
4#include <linux/string.h>
5#include <linux/socket.h>
6#include <linux/net.h>
7#include <linux/fs.h>
8#include <net/af_unix.h>
9#include <net/scm.h>
10#include <linux/init.h>
11
12#include "scm.h"
13
14unsigned int unix_tot_inflight;
15EXPORT_SYMBOL(unix_tot_inflight);
16
17LIST_HEAD(gc_inflight_list);
18EXPORT_SYMBOL(gc_inflight_list);
19
20DEFINE_SPINLOCK(unix_gc_lock);
21EXPORT_SYMBOL(unix_gc_lock);
22
23struct sock *unix_get_socket(struct file *filp)
24{
25 struct sock *u_sock = NULL;
26 struct inode *inode = file_inode(filp);
27
28 /* Socket ? */
29 if (S_ISSOCK(inode->i_mode) && !(filp->f_mode & FMODE_PATH)) {
30 struct socket *sock = SOCKET_I(inode);
31 struct sock *s = sock->sk;
32
33 /* PF_UNIX ? */
34 if (s && sock->ops && sock->ops->family == PF_UNIX)
35 u_sock = s;
36 } else {
37 /* Could be an io_uring instance */
38 u_sock = io_uring_get_socket(filp);
39 }
40 return u_sock;
41}
42EXPORT_SYMBOL(unix_get_socket);
43
44/* Keep the number of times in flight count for the file
45 * descriptor if it is for an AF_UNIX socket.
46 */
47void unix_inflight(struct user_struct *user, struct file *fp)
48{
49 struct sock *s = unix_get_socket(fp);
50
51 spin_lock(&unix_gc_lock);
52
53 if (s) {
54 struct unix_sock *u = unix_sk(s);
55
56 if (atomic_long_inc_return(&u->inflight) == 1) {
57 BUG_ON(!list_empty(&u->link));
58 list_add_tail(&u->link, &gc_inflight_list);
59 } else {
60 BUG_ON(list_empty(&u->link));
61 }
62 unix_tot_inflight++;
63 }
64 user->unix_inflight++;
65 spin_unlock(&unix_gc_lock);
66}
67
68void unix_notinflight(struct user_struct *user, struct file *fp)
69{
70 struct sock *s = unix_get_socket(fp);
71
72 spin_lock(&unix_gc_lock);
73
74 if (s) {
75 struct unix_sock *u = unix_sk(s);
76
77 BUG_ON(!atomic_long_read(&u->inflight));
78 BUG_ON(list_empty(&u->link));
79
80 if (atomic_long_dec_and_test(&u->inflight))
81 list_del_init(&u->link);
82 unix_tot_inflight--;
83 }
84 user->unix_inflight--;
85 spin_unlock(&unix_gc_lock);
86}
87
88/*
89 * The "user->unix_inflight" variable is protected by the garbage
90 * collection lock, and we just read it locklessly here. If you go
91 * over the limit, there might be a tiny race in actually noticing
92 * it across threads. Tough.
93 */
94static inline bool too_many_unix_fds(struct task_struct *p)
95{
96 struct user_struct *user = current_user();
97
98 if (unlikely(user->unix_inflight > task_rlimit(p, RLIMIT_NOFILE)))
99 return !capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN);
100 return false;
101}
102
103int unix_attach_fds(struct scm_cookie *scm, struct sk_buff *skb)
104{
105 int i;
106
107 if (too_many_unix_fds(current))
108 return -ETOOMANYREFS;
109
110 /*
111 * Need to duplicate file references for the sake of garbage
112 * collection. Otherwise a socket in the fps might become a
113 * candidate for GC while the skb is not yet queued.
114 */
115 UNIXCB(skb).fp = scm_fp_dup(scm->fp);
116 if (!UNIXCB(skb).fp)
117 return -ENOMEM;
118
119 for (i = scm->fp->count - 1; i >= 0; i--)
120 unix_inflight(scm->fp->user, scm->fp->fp[i]);
121 return 0;
122}
123EXPORT_SYMBOL(unix_attach_fds);
124
125void unix_detach_fds(struct scm_cookie *scm, struct sk_buff *skb)
126{
127 int i;
128
129 scm->fp = UNIXCB(skb).fp;
130 UNIXCB(skb).fp = NULL;
131
132 for (i = scm->fp->count-1; i >= 0; i--)
133 unix_notinflight(scm->fp->user, scm->fp->fp[i]);
134}
135EXPORT_SYMBOL(unix_detach_fds);
136
137void unix_destruct_scm(struct sk_buff *skb)
138{
139 struct scm_cookie scm;
140
141 memset(&scm, 0, sizeof(scm));
142 scm.pid = UNIXCB(skb).pid;
143 if (UNIXCB(skb).fp)
144 unix_detach_fds(&scm, skb);
145
146 /* Alas, it calls VFS */
147 /* So fscking what? fput() had been SMP-safe since the last Summer */
148 scm_destroy(&scm);
149 sock_wfree(skb);
150}
151EXPORT_SYMBOL(unix_destruct_scm);
diff --git a/net/unix/scm.h b/net/unix/scm.h
new file mode 100644
index 000000000000..5a255a477f16
--- /dev/null
+++ b/net/unix/scm.h
@@ -0,0 +1,10 @@
1#ifndef NET_UNIX_SCM_H
2#define NET_UNIX_SCM_H
3
4extern struct list_head gc_inflight_list;
5extern spinlock_t unix_gc_lock;
6
7int unix_attach_fds(struct scm_cookie *scm, struct sk_buff *skb);
8void unix_detach_fds(struct scm_cookie *scm, struct sk_buff *skb);
9
10#endif
diff --git a/tools/io_uring/Makefile b/tools/io_uring/Makefile
new file mode 100644
index 000000000000..f79522fc37b5
--- /dev/null
+++ b/tools/io_uring/Makefile
@@ -0,0 +1,18 @@
1# SPDX-License-Identifier: GPL-2.0
2# Makefile for io_uring test tools
3CFLAGS += -Wall -Wextra -g -D_GNU_SOURCE
4LDLIBS += -lpthread
5
6all: io_uring-cp io_uring-bench
7%: %.c
8 $(CC) $(CFLAGS) -o $@ $^
9
10io_uring-bench: syscall.o io_uring-bench.o
11 $(CC) $(CFLAGS) $(LDLIBS) -o $@ $^
12
13io_uring-cp: setup.o syscall.o queue.o
14
15clean:
16 $(RM) io_uring-cp io_uring-bench *.o
17
18.PHONY: all clean
diff --git a/tools/io_uring/README b/tools/io_uring/README
new file mode 100644
index 000000000000..67fd70115cff
--- /dev/null
+++ b/tools/io_uring/README
@@ -0,0 +1,29 @@
1This directory includes a few programs that demonstrate how to use io_uring
2in an application. The examples are:
3
4io_uring-cp
5 A very basic io_uring implementation of cp(1). It takes two
6	arguments and copies the first to the second. This example
7 is part of liburing, and hence uses the simplified liburing API
8 for setting up an io_uring instance, submitting IO, completing IO,
9 etc. The support functions in queue.c and setup.c are straight
10 out of liburing.
11
12io_uring-bench
13 Benchmark program that does random reads on a number of files. This
14 app demonstrates the various features of io_uring, like fixed files,
15 fixed buffers, and polled IO. There are options in the program to
16	control which features to use. The arguments are the file (or files) that
17 io_uring-bench should operate on. This uses the raw io_uring
18 interface.
19
20liburing can be cloned with git here:
21
22 git://git.kernel.dk/liburing
23
24and contains a number of unit tests for io_uring as well. It also
25comes with man pages for the three system calls.
26
27Fio includes an io_uring engine; you can clone fio here:
28
29 git://git.kernel.dk/fio
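
For orientation, the liburing flow that io_uring-cp builds on can be sketched
as below. This is an editorial illustration, not part of the patch; it only
chains the calls declared in tools/io_uring/liburing.h and keeps error
handling to a minimum.

/* Hedged sketch: read the first 4096 bytes of a file through liburing. */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include "liburing.h"

int main(int argc, char *argv[])
{
	struct io_uring ring;
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;
	struct iovec iov;
	char buf[4096];
	int fd;

	if (argc < 2)
		return 1;
	fd = open(argv[1], O_RDONLY);
	if (fd < 0)
		return 1;

	/* One call sets up the SQ/CQ rings and mmap()s them for us */
	if (io_uring_queue_init(4, &ring, 0) < 0)
		return 1;

	iov.iov_base = buf;
	iov.iov_len = sizeof(buf);

	/* Grab a free sqe, describe the read, and hand it to the kernel */
	sqe = io_uring_get_sqe(&ring);
	if (!sqe)
		return 1;
	io_uring_prep_readv(sqe, fd, &iov, 1, 0);
	io_uring_sqe_set_data(sqe, buf);
	if (io_uring_submit(&ring) < 0)
		return 1;

	/* Block until the completion shows up in the CQ ring */
	if (!io_uring_wait_completion(&ring, &cqe) && cqe)
		printf("read returned %d\n", cqe->res);

	io_uring_queue_exit(&ring);
	close(fd);
	return 0;
}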
diff --git a/tools/io_uring/barrier.h b/tools/io_uring/barrier.h
new file mode 100644
index 000000000000..ef00f6722ba9
--- /dev/null
+++ b/tools/io_uring/barrier.h
@@ -0,0 +1,16 @@
1#ifndef LIBURING_BARRIER_H
2#define LIBURING_BARRIER_H
3
4#if defined(__x86_64) || defined(__i386__)
5#define read_barrier() __asm__ __volatile__("":::"memory")
6#define write_barrier() __asm__ __volatile__("":::"memory")
7#else
8/*
9 * Add arch appropriate definitions. Be safe and use full barriers for
10 * archs we don't have support for.
11 */
12#define read_barrier() __sync_synchronize()
13#define write_barrier() __sync_synchronize()
14#endif
15
16#endif
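
The pattern these two barriers exist for shows up in prep_more_ios() and
reap_events() further down. As a rough, editorial illustration only (not part
of the patch), a single-producer/single-consumer ring would use them like
this:

/* Illustrative sketch mirroring prep_more_ios()/reap_events() below. */
#include "barrier.h"

struct demo_ring {
	unsigned head;			/* advanced by the consumer */
	unsigned tail;			/* advanced by the producer */
	unsigned mask;
	unsigned long entries[16];
};

static void demo_publish(struct demo_ring *r, unsigned long val)
{
	r->entries[r->tail & r->mask] = val;
	/* make the entry visible before the tail store that announces it */
	write_barrier();
	r->tail++;
}

static int demo_consume(struct demo_ring *r, unsigned long *val)
{
	/* matches reap_events(): barrier before the tail load */
	read_barrier();
	if (r->head == r->tail)
		return 0;
	*val = r->entries[r->head & r->mask];
	r->head++;
	return 1;
}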
diff --git a/tools/io_uring/io_uring-bench.c b/tools/io_uring/io_uring-bench.c
new file mode 100644
index 000000000000..512306a37531
--- /dev/null
+++ b/tools/io_uring/io_uring-bench.c
@@ -0,0 +1,616 @@
1// SPDX-License-Identifier: GPL-2.0
2/*
3 * Simple benchmark program that uses the various features of io_uring
4 * to provide fast random access to a device/file. It has various
5 * options that control how we use io_uring; see the OPTIONS section
6 * below. This uses the raw io_uring interface.
7 *
8 * Copyright (C) 2018-2019 Jens Axboe
9 */
10#include <stdio.h>
11#include <errno.h>
12#include <assert.h>
13#include <stdlib.h>
14#include <stddef.h>
15#include <signal.h>
16#include <inttypes.h>
17
18#include <sys/types.h>
19#include <sys/stat.h>
20#include <sys/ioctl.h>
21#include <sys/syscall.h>
22#include <sys/resource.h>
23#include <sys/mman.h>
24#include <sys/uio.h>
25#include <linux/fs.h>
26#include <fcntl.h>
27#include <unistd.h>
28#include <string.h>
29#include <pthread.h>
30#include <sched.h>
31
32#include "liburing.h"
33#include "barrier.h"
34
35#ifndef IOCQE_FLAG_CACHEHIT
36#define IOCQE_FLAG_CACHEHIT (1U << 0)
37#endif
38
39#define min(a, b)		(((a) < (b)) ? (a) : (b))
40
41struct io_sq_ring {
42 unsigned *head;
43 unsigned *tail;
44 unsigned *ring_mask;
45 unsigned *ring_entries;
46 unsigned *flags;
47 unsigned *array;
48};
49
50struct io_cq_ring {
51 unsigned *head;
52 unsigned *tail;
53 unsigned *ring_mask;
54 unsigned *ring_entries;
55 struct io_uring_cqe *cqes;
56};
57
58#define DEPTH 128
59
60#define BATCH_SUBMIT 32
61#define BATCH_COMPLETE 32
62
63#define BS 4096
64
65#define MAX_FDS 16
66
67static unsigned sq_ring_mask, cq_ring_mask;
68
69struct file {
70 unsigned long max_blocks;
71 unsigned pending_ios;
72 int real_fd;
73 int fixed_fd;
74};
75
76struct submitter {
77 pthread_t thread;
78 int ring_fd;
79 struct drand48_data rand;
80 struct io_sq_ring sq_ring;
81 struct io_uring_sqe *sqes;
82 struct iovec iovecs[DEPTH];
83 struct io_cq_ring cq_ring;
84 int inflight;
85 unsigned long reaps;
86 unsigned long done;
87 unsigned long calls;
88 unsigned long cachehit, cachemiss;
89 volatile int finish;
90
91 __s32 *fds;
92
93 struct file files[MAX_FDS];
94 unsigned nr_files;
95 unsigned cur_file;
96};
97
98static struct submitter submitters[1];
99static volatile int finish;
100
101/*
102 * OPTIONS: Set these to test the various features of io_uring.
103 */
104static int polled = 1; /* use IO polling */
105static int fixedbufs = 1; /* use fixed user buffers */
106static int register_files = 1; /* use fixed files */
107static int buffered = 0; /* use buffered IO, not O_DIRECT */
108static int sq_thread_poll = 0; /* use kernel submission/poller thread */
109static int sq_thread_cpu = -1; /* pin above thread to this CPU */
110static int do_nop = 0; /* no-op SQ ring commands */
111
112static int io_uring_register_buffers(struct submitter *s)
113{
114 if (do_nop)
115 return 0;
116
117 return io_uring_register(s->ring_fd, IORING_REGISTER_BUFFERS, s->iovecs,
118 DEPTH);
119}
120
121static int io_uring_register_files(struct submitter *s)
122{
123 unsigned i;
124
125 if (do_nop)
126 return 0;
127
128 s->fds = calloc(s->nr_files, sizeof(__s32));
129 for (i = 0; i < s->nr_files; i++) {
130 s->fds[i] = s->files[i].real_fd;
131 s->files[i].fixed_fd = i;
132 }
133
134 return io_uring_register(s->ring_fd, IORING_REGISTER_FILES, s->fds,
135 s->nr_files);
136}
137
138static int gettid(void)
139{
140 return syscall(__NR_gettid);
141}
142
143static unsigned file_depth(struct submitter *s)
144{
145 return (DEPTH + s->nr_files - 1) / s->nr_files;
146}
147
148static void init_io(struct submitter *s, unsigned index)
149{
150 struct io_uring_sqe *sqe = &s->sqes[index];
151 unsigned long offset;
152 struct file *f;
153 long r;
154
155 if (do_nop) {
156 sqe->opcode = IORING_OP_NOP;
157 return;
158 }
159
160 if (s->nr_files == 1) {
161 f = &s->files[0];
162 } else {
163 f = &s->files[s->cur_file];
164 if (f->pending_ios >= file_depth(s)) {
165 s->cur_file++;
166 if (s->cur_file == s->nr_files)
167 s->cur_file = 0;
168 f = &s->files[s->cur_file];
169 }
170 }
171 f->pending_ios++;
172
173 lrand48_r(&s->rand, &r);
174 offset = (r % (f->max_blocks - 1)) * BS;
175
176 if (register_files) {
177 sqe->flags = IOSQE_FIXED_FILE;
178 sqe->fd = f->fixed_fd;
179 } else {
180 sqe->flags = 0;
181 sqe->fd = f->real_fd;
182 }
183 if (fixedbufs) {
184 sqe->opcode = IORING_OP_READ_FIXED;
185 sqe->addr = (unsigned long) s->iovecs[index].iov_base;
186 sqe->len = BS;
187 sqe->buf_index = index;
188 } else {
189 sqe->opcode = IORING_OP_READV;
190 sqe->addr = (unsigned long) &s->iovecs[index];
191 sqe->len = 1;
192 sqe->buf_index = 0;
193 }
194 sqe->ioprio = 0;
195 sqe->off = offset;
196 sqe->user_data = (unsigned long) f;
197}
198
199static int prep_more_ios(struct submitter *s, unsigned max_ios)
200{
201 struct io_sq_ring *ring = &s->sq_ring;
202 unsigned index, tail, next_tail, prepped = 0;
203
204 next_tail = tail = *ring->tail;
205 do {
206 next_tail++;
207 read_barrier();
208 if (next_tail == *ring->head)
209 break;
210
211 index = tail & sq_ring_mask;
212 init_io(s, index);
213 ring->array[index] = index;
214 prepped++;
215 tail = next_tail;
216 } while (prepped < max_ios);
217
218 if (*ring->tail != tail) {
219 /* order tail store with writes to sqes above */
220 write_barrier();
221 *ring->tail = tail;
222 write_barrier();
223 }
224 return prepped;
225}
226
227static int get_file_size(struct file *f)
228{
229 struct stat st;
230
231 if (fstat(f->real_fd, &st) < 0)
232 return -1;
233 if (S_ISBLK(st.st_mode)) {
234 unsigned long long bytes;
235
236 if (ioctl(f->real_fd, BLKGETSIZE64, &bytes) != 0)
237 return -1;
238
239 f->max_blocks = bytes / BS;
240 return 0;
241 } else if (S_ISREG(st.st_mode)) {
242 f->max_blocks = st.st_size / BS;
243 return 0;
244 }
245
246 return -1;
247}
248
249static int reap_events(struct submitter *s)
250{
251 struct io_cq_ring *ring = &s->cq_ring;
252 struct io_uring_cqe *cqe;
253 unsigned head, reaped = 0;
254
255 head = *ring->head;
256 do {
257 struct file *f;
258
259 read_barrier();
260 if (head == *ring->tail)
261 break;
262 cqe = &ring->cqes[head & cq_ring_mask];
263 if (!do_nop) {
264 f = (struct file *) (uintptr_t) cqe->user_data;
265 f->pending_ios--;
266 if (cqe->res != BS) {
267 printf("io: unexpected ret=%d\n", cqe->res);
268 if (polled && cqe->res == -EOPNOTSUPP)
269 printf("Your filesystem doesn't support poll\n");
270 return -1;
271 }
272 }
273 if (cqe->flags & IOCQE_FLAG_CACHEHIT)
274 s->cachehit++;
275 else
276 s->cachemiss++;
277 reaped++;
278 head++;
279 } while (1);
280
281 s->inflight -= reaped;
282 *ring->head = head;
283 write_barrier();
284 return reaped;
285}
286
287static void *submitter_fn(void *data)
288{
289 struct submitter *s = data;
290 struct io_sq_ring *ring = &s->sq_ring;
291 int ret, prepped;
292
293 printf("submitter=%d\n", gettid());
294
295 srand48_r(pthread_self(), &s->rand);
296
297 prepped = 0;
298 do {
299 int to_wait, to_submit, this_reap, to_prep;
300
301 if (!prepped && s->inflight < DEPTH) {
302 to_prep = min(DEPTH - s->inflight, BATCH_SUBMIT);
303 prepped = prep_more_ios(s, to_prep);
304 }
305 s->inflight += prepped;
306submit_more:
307 to_submit = prepped;
308submit:
309 if (to_submit && (s->inflight + to_submit <= DEPTH))
310 to_wait = 0;
311 else
312 to_wait = min(s->inflight + to_submit, BATCH_COMPLETE);
313
314 /*
315 * Only need to call io_uring_enter if we're not using SQ thread
316 * poll, or if IORING_SQ_NEED_WAKEUP is set.
317 */
318 if (!sq_thread_poll || (*ring->flags & IORING_SQ_NEED_WAKEUP)) {
319 unsigned flags = 0;
320
321 if (to_wait)
322 flags = IORING_ENTER_GETEVENTS;
323 if ((*ring->flags & IORING_SQ_NEED_WAKEUP))
324 flags |= IORING_ENTER_SQ_WAKEUP;
325 ret = io_uring_enter(s->ring_fd, to_submit, to_wait,
326 flags, NULL);
327 s->calls++;
328 }
329
330 /*
331 * For non SQ thread poll, we already got the events we needed
332 * through the io_uring_enter() above. For SQ thread poll, we
333 * need to loop here until we find enough events.
334 */
335 this_reap = 0;
336 do {
337 int r;
338 r = reap_events(s);
339 if (r == -1) {
340 s->finish = 1;
341 break;
342 } else if (r > 0)
343 this_reap += r;
344 } while (sq_thread_poll && this_reap < to_wait);
345 s->reaps += this_reap;
346
347 if (ret >= 0) {
348 if (!ret) {
349 to_submit = 0;
350 if (s->inflight)
351 goto submit;
352 continue;
353 } else if (ret < to_submit) {
354 int diff = to_submit - ret;
355
356 s->done += ret;
357 prepped -= diff;
358 goto submit_more;
359 }
360 s->done += ret;
361 prepped = 0;
362 continue;
363 } else if (ret < 0) {
364 if (errno == EAGAIN) {
365 if (s->finish)
366 break;
367 if (this_reap)
368 goto submit;
369 to_submit = 0;
370 goto submit;
371 }
372 printf("io_submit: %s\n", strerror(errno));
373 break;
374 }
375 } while (!s->finish);
376
377 finish = 1;
378 return NULL;
379}
380
381static void sig_int(int sig)
382{
383 printf("Exiting on signal %d\n", sig);
384 submitters[0].finish = 1;
385 finish = 1;
386}
387
388static void arm_sig_int(void)
389{
390 struct sigaction act;
391
392 memset(&act, 0, sizeof(act));
393 act.sa_handler = sig_int;
394 act.sa_flags = SA_RESTART;
395 sigaction(SIGINT, &act, NULL);
396}
397
398static int setup_ring(struct submitter *s)
399{
400 struct io_sq_ring *sring = &s->sq_ring;
401 struct io_cq_ring *cring = &s->cq_ring;
402 struct io_uring_params p;
403 int ret, fd;
404 void *ptr;
405
406 memset(&p, 0, sizeof(p));
407
408 if (polled && !do_nop)
409 p.flags |= IORING_SETUP_IOPOLL;
410 if (sq_thread_poll) {
411 p.flags |= IORING_SETUP_SQPOLL;
412 if (sq_thread_cpu != -1) {
413 p.flags |= IORING_SETUP_SQ_AFF;
414 p.sq_thread_cpu = sq_thread_cpu;
415 }
416 }
417
418 fd = io_uring_setup(DEPTH, &p);
419 if (fd < 0) {
420 perror("io_uring_setup");
421 return 1;
422 }
423 s->ring_fd = fd;
424
425 if (fixedbufs) {
426 ret = io_uring_register_buffers(s);
427 if (ret < 0) {
428 perror("io_uring_register_buffers");
429 return 1;
430 }
431 }
432
433 if (register_files) {
434 ret = io_uring_register_files(s);
435 if (ret < 0) {
436 perror("io_uring_register_files");
437 return 1;
438 }
439 }
440
441 ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32),
442 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
443 IORING_OFF_SQ_RING);
444 printf("sq_ring ptr = 0x%p\n", ptr);
445 sring->head = ptr + p.sq_off.head;
446 sring->tail = ptr + p.sq_off.tail;
447 sring->ring_mask = ptr + p.sq_off.ring_mask;
448 sring->ring_entries = ptr + p.sq_off.ring_entries;
449 sring->flags = ptr + p.sq_off.flags;
450 sring->array = ptr + p.sq_off.array;
451 sq_ring_mask = *sring->ring_mask;
452
453 s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
454 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
455 IORING_OFF_SQES);
456 printf("sqes ptr = 0x%p\n", s->sqes);
457
458 ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
459 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
460 IORING_OFF_CQ_RING);
461 printf("cq_ring ptr = 0x%p\n", ptr);
462 cring->head = ptr + p.cq_off.head;
463 cring->tail = ptr + p.cq_off.tail;
464 cring->ring_mask = ptr + p.cq_off.ring_mask;
465 cring->ring_entries = ptr + p.cq_off.ring_entries;
466 cring->cqes = ptr + p.cq_off.cqes;
467 cq_ring_mask = *cring->ring_mask;
468 return 0;
469}
470
471static void file_depths(char *buf)
472{
473 struct submitter *s = &submitters[0];
474 unsigned i;
475 char *p;
476
477 buf[0] = '\0';
478 p = buf;
479 for (i = 0; i < s->nr_files; i++) {
480 struct file *f = &s->files[i];
481
482 if (i + 1 == s->nr_files)
483 p += sprintf(p, "%d", f->pending_ios);
484 else
485 p += sprintf(p, "%d, ", f->pending_ios);
486 }
487}
488
489int main(int argc, char *argv[])
490{
491 struct submitter *s = &submitters[0];
492 unsigned long done, calls, reap, cache_hit, cache_miss;
493 int err, i, flags, fd;
494 char *fdepths;
495 void *ret;
496
497 if (!do_nop && argc < 2) {
498 printf("%s: filename\n", argv[0]);
499 return 1;
500 }
501
502 flags = O_RDONLY | O_NOATIME;
503 if (!buffered)
504 flags |= O_DIRECT;
505
506 i = 1;
507 while (!do_nop && i < argc) {
508 struct file *f;
509
510 if (s->nr_files == MAX_FDS) {
511 printf("Max number of files (%d) reached\n", MAX_FDS);
512 break;
513 }
514 fd = open(argv[i], flags);
515 if (fd < 0) {
516 perror("open");
517 return 1;
518 }
519
520 f = &s->files[s->nr_files];
521 f->real_fd = fd;
522 if (get_file_size(f)) {
523 printf("failed getting size of device/file\n");
524 return 1;
525 }
526 if (f->max_blocks <= 1) {
527 printf("Zero file/device size?\n");
528 return 1;
529 }
530 f->max_blocks--;
531
532 printf("Added file %s\n", argv[i]);
533 s->nr_files++;
534 i++;
535 }
536
537 if (fixedbufs) {
538 struct rlimit rlim;
539
540 rlim.rlim_cur = RLIM_INFINITY;
541 rlim.rlim_max = RLIM_INFINITY;
542 if (setrlimit(RLIMIT_MEMLOCK, &rlim) < 0) {
543 perror("setrlimit");
544 return 1;
545 }
546 }
547
548 arm_sig_int();
549
550 for (i = 0; i < DEPTH; i++) {
551 void *buf;
552
553 if (posix_memalign(&buf, BS, BS)) {
554 printf("failed alloc\n");
555 return 1;
556 }
557 s->iovecs[i].iov_base = buf;
558 s->iovecs[i].iov_len = BS;
559 }
560
561 err = setup_ring(s);
562 if (err) {
563 printf("ring setup failed: %s, %d\n", strerror(errno), err);
564 return 1;
565 }
566 printf("polled=%d, fixedbufs=%d, buffered=%d", polled, fixedbufs, buffered);
567 printf(" QD=%d, sq_ring=%d, cq_ring=%d\n", DEPTH, *s->sq_ring.ring_entries, *s->cq_ring.ring_entries);
568
569 pthread_create(&s->thread, NULL, submitter_fn, s);
570
571 fdepths = malloc(8 * s->nr_files);
572 cache_hit = cache_miss = reap = calls = done = 0;
573 do {
574 unsigned long this_done = 0;
575 unsigned long this_reap = 0;
576 unsigned long this_call = 0;
577 unsigned long this_cache_hit = 0;
578 unsigned long this_cache_miss = 0;
579 unsigned long rpc = 0, ipc = 0;
580 double hit = 0.0;
581
582 sleep(1);
583 this_done += s->done;
584 this_call += s->calls;
585 this_reap += s->reaps;
586 this_cache_hit += s->cachehit;
587 this_cache_miss += s->cachemiss;
588 if (this_cache_hit && this_cache_miss) {
589 unsigned long hits, total;
590
591 hits = this_cache_hit - cache_hit;
592 total = hits + this_cache_miss - cache_miss;
593 hit = (double) hits / (double) total;
594 hit *= 100.0;
595 }
596 if (this_call - calls) {
597 rpc = (this_done - done) / (this_call - calls);
598 ipc = (this_reap - reap) / (this_call - calls);
599 } else
600 rpc = ipc = -1;
601 file_depths(fdepths);
602 printf("IOPS=%lu, IOS/call=%ld/%ld, inflight=%u (%s), Cachehit=%0.2f%%\n",
603 this_done - done, rpc, ipc, s->inflight,
604 fdepths, hit);
605 done = this_done;
606 calls = this_call;
607 reap = this_reap;
608 cache_hit = s->cachehit;
609 cache_miss = s->cachemiss;
610 } while (!finish);
611
612 pthread_join(s->thread, &ret);
613 close(s->ring_fd);
614 free(fdepths);
615 return 0;
616}
diff --git a/tools/io_uring/io_uring-cp.c b/tools/io_uring/io_uring-cp.c
new file mode 100644
index 000000000000..633f65bb43a7
--- /dev/null
+++ b/tools/io_uring/io_uring-cp.c
@@ -0,0 +1,251 @@
1// SPDX-License-Identifier: GPL-2.0
2/*
3 * Simple test program that demonstrates a file copy through io_uring. This
4 * uses the API exposed by liburing.
5 *
6 * Copyright (C) 2018-2019 Jens Axboe
7 */
8#include <stdio.h>
9#include <fcntl.h>
10#include <string.h>
11#include <stdlib.h>
12#include <unistd.h>
13#include <assert.h>
14#include <errno.h>
15#include <inttypes.h>
16#include <sys/stat.h>
17#include <sys/ioctl.h>
18
19#include "liburing.h"
20
21#define QD 64
22#define BS (32*1024)
23
24static int infd, outfd;
25
26struct io_data {
27 int read;
28 off_t first_offset, offset;
29 size_t first_len;
30 struct iovec iov;
31};
32
33static int setup_context(unsigned entries, struct io_uring *ring)
34{
35 int ret;
36
37 ret = io_uring_queue_init(entries, ring, 0);
38 if (ret < 0) {
39 fprintf(stderr, "queue_init: %s\n", strerror(-ret));
40 return -1;
41 }
42
43 return 0;
44}
45
46static int get_file_size(int fd, off_t *size)
47{
48 struct stat st;
49
50 if (fstat(fd, &st) < 0)
51 return -1;
52 if (S_ISREG(st.st_mode)) {
53 *size = st.st_size;
54 return 0;
55 } else if (S_ISBLK(st.st_mode)) {
56 unsigned long long bytes;
57
58 if (ioctl(fd, BLKGETSIZE64, &bytes) != 0)
59 return -1;
60
61 *size = bytes;
62 return 0;
63 }
64
65 return -1;
66}
67
68static void queue_prepped(struct io_uring *ring, struct io_data *data)
69{
70 struct io_uring_sqe *sqe;
71
72 sqe = io_uring_get_sqe(ring);
73 assert(sqe);
74
75 if (data->read)
76 io_uring_prep_readv(sqe, infd, &data->iov, 1, data->offset);
77 else
78 io_uring_prep_writev(sqe, outfd, &data->iov, 1, data->offset);
79
80 io_uring_sqe_set_data(sqe, data);
81}
82
83static int queue_read(struct io_uring *ring, off_t size, off_t offset)
84{
85 struct io_uring_sqe *sqe;
86 struct io_data *data;
87
88 sqe = io_uring_get_sqe(ring);
89 if (!sqe)
90 return 1;
91
92 data = malloc(size + sizeof(*data));
93 data->read = 1;
94 data->offset = data->first_offset = offset;
95
96 data->iov.iov_base = data + 1;
97 data->iov.iov_len = size;
98 data->first_len = size;
99
100 io_uring_prep_readv(sqe, infd, &data->iov, 1, offset);
101 io_uring_sqe_set_data(sqe, data);
102 return 0;
103}
104
105static void queue_write(struct io_uring *ring, struct io_data *data)
106{
107 data->read = 0;
108 data->offset = data->first_offset;
109
110 data->iov.iov_base = data + 1;
111 data->iov.iov_len = data->first_len;
112
113 queue_prepped(ring, data);
114 io_uring_submit(ring);
115}
116
117static int copy_file(struct io_uring *ring, off_t insize)
118{
119 unsigned long reads, writes;
120 struct io_uring_cqe *cqe;
121 off_t write_left, offset;
122 int ret;
123
124 write_left = insize;
125 writes = reads = offset = 0;
126
127 while (insize || write_left) {
128 unsigned long had_reads;
129 int got_comp;
130
131 /*
132 * Queue up as many reads as we can
133 */
134 had_reads = reads;
135 while (insize) {
136 off_t this_size = insize;
137
138 if (reads + writes >= QD)
139 break;
140 if (this_size > BS)
141 this_size = BS;
142 else if (!this_size)
143 break;
144
145 if (queue_read(ring, this_size, offset))
146 break;
147
148 insize -= this_size;
149 offset += this_size;
150 reads++;
151 }
152
153 if (had_reads != reads) {
154 ret = io_uring_submit(ring);
155 if (ret < 0) {
156 fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
157 break;
158 }
159 }
160
161 /*
162 * Queue is full at this point. Find at least one completion.
163 */
164 got_comp = 0;
165 while (write_left) {
166 struct io_data *data;
167
168 if (!got_comp) {
169 ret = io_uring_wait_completion(ring, &cqe);
170 got_comp = 1;
171 } else
172 ret = io_uring_get_completion(ring, &cqe);
173 if (ret < 0) {
174 fprintf(stderr, "io_uring_get_completion: %s\n",
175 strerror(-ret));
176 return 1;
177 }
178 if (!cqe)
179 break;
180
181 data = (struct io_data *) (uintptr_t) cqe->user_data;
182 if (cqe->res < 0) {
183 if (cqe->res == -EAGAIN) {
184 queue_prepped(ring, data);
185 continue;
186 }
187 fprintf(stderr, "cqe failed: %s\n",
188 strerror(-cqe->res));
189 return 1;
190 } else if ((size_t) cqe->res != data->iov.iov_len) {
191 /* Short read/write, adjust and requeue */
192 data->iov.iov_base += cqe->res;
193 data->iov.iov_len -= cqe->res;
194 data->offset += cqe->res;
195 queue_prepped(ring, data);
196 continue;
197 }
198
199 /*
200 * All done. if write, nothing else to do. if read,
201 * queue up corresponding write.
202 */
203 if (data->read) {
204 queue_write(ring, data);
205 write_left -= data->first_len;
206 reads--;
207 writes++;
208 } else {
209 free(data);
210 writes--;
211 }
212 }
213 }
214
215 return 0;
216}
217
218int main(int argc, char *argv[])
219{
220 struct io_uring ring;
221 off_t insize;
222 int ret;
223
224 if (argc < 3) {
225 printf("%s: infile outfile\n", argv[0]);
226 return 1;
227 }
228
229 infd = open(argv[1], O_RDONLY);
230 if (infd < 0) {
231 perror("open infile");
232 return 1;
233 }
234 outfd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
235 if (outfd < 0) {
236 perror("open outfile");
237 return 1;
238 }
239
240 if (setup_context(QD, &ring))
241 return 1;
242 if (get_file_size(infd, &insize))
243 return 1;
244
245 ret = copy_file(&ring, insize);
246
247 close(infd);
248 close(outfd);
249 io_uring_queue_exit(&ring);
250 return ret;
251}
diff --git a/tools/io_uring/liburing.h b/tools/io_uring/liburing.h
new file mode 100644
index 000000000000..cab0f50257ba
--- /dev/null
+++ b/tools/io_uring/liburing.h
@@ -0,0 +1,143 @@
1#ifndef LIB_URING_H
2#define LIB_URING_H
3
4#include <sys/uio.h>
5#include <signal.h>
6#include <string.h>
7#include "../../include/uapi/linux/io_uring.h"
8
9/*
10 * Library interface to io_uring
11 */
12struct io_uring_sq {
13 unsigned *khead;
14 unsigned *ktail;
15 unsigned *kring_mask;
16 unsigned *kring_entries;
17 unsigned *kflags;
18 unsigned *kdropped;
19 unsigned *array;
20 struct io_uring_sqe *sqes;
21
22 unsigned sqe_head;
23 unsigned sqe_tail;
24
25 size_t ring_sz;
26};
27
28struct io_uring_cq {
29 unsigned *khead;
30 unsigned *ktail;
31 unsigned *kring_mask;
32 unsigned *kring_entries;
33 unsigned *koverflow;
34 struct io_uring_cqe *cqes;
35
36 size_t ring_sz;
37};
38
39struct io_uring {
40 struct io_uring_sq sq;
41 struct io_uring_cq cq;
42 int ring_fd;
43};
44
45/*
46 * System calls
47 */
48extern int io_uring_setup(unsigned entries, struct io_uring_params *p);
49extern int io_uring_enter(unsigned fd, unsigned to_submit,
50 unsigned min_complete, unsigned flags, sigset_t *sig);
51extern int io_uring_register(int fd, unsigned int opcode, void *arg,
52 unsigned int nr_args);
53
54/*
55 * Library interface
56 */
57extern int io_uring_queue_init(unsigned entries, struct io_uring *ring,
58 unsigned flags);
59extern int io_uring_queue_mmap(int fd, struct io_uring_params *p,
60 struct io_uring *ring);
61extern void io_uring_queue_exit(struct io_uring *ring);
62extern int io_uring_get_completion(struct io_uring *ring,
63 struct io_uring_cqe **cqe_ptr);
64extern int io_uring_wait_completion(struct io_uring *ring,
65 struct io_uring_cqe **cqe_ptr);
66extern int io_uring_submit(struct io_uring *ring);
67extern struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);
68
69/*
70 * Command prep helpers
71 */
72static inline void io_uring_sqe_set_data(struct io_uring_sqe *sqe, void *data)
73{
74 sqe->user_data = (unsigned long) data;
75}
76
77static inline void io_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd,
78 void *addr, unsigned len, off_t offset)
79{
80 memset(sqe, 0, sizeof(*sqe));
81 sqe->opcode = op;
82 sqe->fd = fd;
83 sqe->off = offset;
84 sqe->addr = (unsigned long) addr;
85 sqe->len = len;
86}
87
88static inline void io_uring_prep_readv(struct io_uring_sqe *sqe, int fd,
89 struct iovec *iovecs, unsigned nr_vecs,
90 off_t offset)
91{
92 io_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset);
93}
94
95static inline void io_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd,
96 void *buf, unsigned nbytes,
97 off_t offset)
98{
99 io_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset);
100}
101
102static inline void io_uring_prep_writev(struct io_uring_sqe *sqe, int fd,
103 struct iovec *iovecs, unsigned nr_vecs,
104 off_t offset)
105{
106 io_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset);
107}
108
109static inline void io_uring_prep_write_fixed(struct io_uring_sqe *sqe, int fd,
110 void *buf, unsigned nbytes,
111 off_t offset)
112{
113 io_uring_prep_rw(IORING_OP_WRITE_FIXED, sqe, fd, buf, nbytes, offset);
114}
115
116static inline void io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd,
117 short poll_mask)
118{
119 memset(sqe, 0, sizeof(*sqe));
120 sqe->opcode = IORING_OP_POLL_ADD;
121 sqe->fd = fd;
122 sqe->poll_events = poll_mask;
123}
124
125static inline void io_uring_prep_poll_remove(struct io_uring_sqe *sqe,
126 void *user_data)
127{
128 memset(sqe, 0, sizeof(*sqe));
129 sqe->opcode = IORING_OP_POLL_REMOVE;
130 sqe->addr = (unsigned long) user_data;
131}
132
133static inline void io_uring_prep_fsync(struct io_uring_sqe *sqe, int fd,
134 int datasync)
135{
136 memset(sqe, 0, sizeof(*sqe));
137 sqe->opcode = IORING_OP_FSYNC;
138 sqe->fd = fd;
139 if (datasync)
140 sqe->fsync_flags = IORING_FSYNC_DATASYNC;
141}
142
143#endif
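
Neither test tool exercises the fsync or poll prep helpers above. As a hedged
illustration only (not part of the patch, and assuming a ring already
initialized with io_uring_queue_init()), an fsync through the ring could look
like:

/* Sketch: flush a file with IORING_OP_FSYNC via the prep helper. */
#include "liburing.h"

static int ring_fsync(struct io_uring *ring, int fd)
{
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;

	sqe = io_uring_get_sqe(ring);
	if (!sqe)
		return -1;

	/* datasync=0 requests full fsync rather than fdatasync semantics */
	io_uring_prep_fsync(sqe, fd, 0);
	if (io_uring_submit(ring) < 0)
		return -1;

	if (io_uring_wait_completion(ring, &cqe) < 0 || !cqe)
		return -1;
	return cqe->res;
}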
diff --git a/tools/io_uring/queue.c b/tools/io_uring/queue.c
new file mode 100644
index 000000000000..88505e873ad9
--- /dev/null
+++ b/tools/io_uring/queue.c
@@ -0,0 +1,164 @@
1#include <sys/types.h>
2#include <sys/stat.h>
3#include <sys/mman.h>
4#include <unistd.h>
5#include <errno.h>
6#include <string.h>
7
8#include "liburing.h"
9#include "barrier.h"
10
11static int __io_uring_get_completion(struct io_uring *ring,
12 struct io_uring_cqe **cqe_ptr, int wait)
13{
14 struct io_uring_cq *cq = &ring->cq;
15 const unsigned mask = *cq->kring_mask;
16 unsigned head;
17 int ret;
18
19 *cqe_ptr = NULL;
20 head = *cq->khead;
21 do {
22 /*
23 * It's necessary to use a read_barrier() before reading
24 * the CQ tail, since the kernel updates it locklessly. The
25 * kernel has the matching store barrier for the update. The
26 * kernel also ensures that previous stores to CQEs are ordered
27 * with the tail update.
28 */
29 read_barrier();
30 if (head != *cq->ktail) {
31 *cqe_ptr = &cq->cqes[head & mask];
32 break;
33 }
34 if (!wait)
35 break;
36 ret = io_uring_enter(ring->ring_fd, 0, 1,
37 IORING_ENTER_GETEVENTS, NULL);
38 if (ret < 0)
39 return -errno;
40 } while (1);
41
42 if (*cqe_ptr) {
43 *cq->khead = head + 1;
44 /*
45 * Ensure that the kernel sees our new head, the kernel has
46 * the matching read barrier.
47 */
48 write_barrier();
49 }
50
51 return 0;
52}
53
54/*
55 * Return an IO completion, if one is readily available
56 */
57int io_uring_get_completion(struct io_uring *ring,
58 struct io_uring_cqe **cqe_ptr)
59{
60 return __io_uring_get_completion(ring, cqe_ptr, 0);
61}
62
63/*
64 * Return an IO completion, waiting for it if necessary
65 */
66int io_uring_wait_completion(struct io_uring *ring,
67 struct io_uring_cqe **cqe_ptr)
68{
69 return __io_uring_get_completion(ring, cqe_ptr, 1);
70}
71
72/*
73 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
74 *
75 * Returns number of sqes submitted
76 */
77int io_uring_submit(struct io_uring *ring)
78{
79 struct io_uring_sq *sq = &ring->sq;
80 const unsigned mask = *sq->kring_mask;
81 unsigned ktail, ktail_next, submitted;
82 int ret;
83
84 /*
85 * If we have pending IO in the kring, submit it first. We need a
86	 * read barrier here to match the kernel's store barrier when updating
87 * the SQ head.
88 */
89 read_barrier();
90 if (*sq->khead != *sq->ktail) {
91 submitted = *sq->kring_entries;
92 goto submit;
93 }
94
95 if (sq->sqe_head == sq->sqe_tail)
96 return 0;
97
98 /*
99 * Fill in sqes that we have queued up, adding them to the kernel ring
100 */
101 submitted = 0;
102 ktail = ktail_next = *sq->ktail;
103 while (sq->sqe_head < sq->sqe_tail) {
104 ktail_next++;
105 read_barrier();
106
107 sq->array[ktail & mask] = sq->sqe_head & mask;
108 ktail = ktail_next;
109
110 sq->sqe_head++;
111 submitted++;
112 }
113
114 if (!submitted)
115 return 0;
116
117 if (*sq->ktail != ktail) {
118 /*
119		 * First write barrier ensures that the SQE stores are ordered
120		 * with the tail update. This is needed so that the kernel
121		 * will never see a tail update without the preceding SQE
122 * stores being done.
123 */
124 write_barrier();
125 *sq->ktail = ktail;
126 /*
127 * The kernel has the matching read barrier for reading the
128 * SQ tail.
129 */
130 write_barrier();
131 }
132
133submit:
134 ret = io_uring_enter(ring->ring_fd, submitted, 0,
135 IORING_ENTER_GETEVENTS, NULL);
136 if (ret < 0)
137 return -errno;
138
139 return 0;
140}
141
142/*
143 * Return an sqe to fill. Application must later call io_uring_submit()
144 * when it's ready to tell the kernel about it. The caller may call this
145 * function multiple times before calling io_uring_submit().
146 *
147 * Returns a vacant sqe, or NULL if we're full.
148 */
149struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
150{
151 struct io_uring_sq *sq = &ring->sq;
152 unsigned next = sq->sqe_tail + 1;
153 struct io_uring_sqe *sqe;
154
155 /*
156 * All sqes are used
157 */
158 if (next - sq->sqe_head > *sq->kring_entries)
159 return NULL;
160
161 sqe = &sq->sqes[sq->sqe_tail & *sq->kring_mask];
162 sq->sqe_tail = next;
163 return sqe;
164}
diff --git a/tools/io_uring/setup.c b/tools/io_uring/setup.c
new file mode 100644
index 000000000000..4da19a77132c
--- /dev/null
+++ b/tools/io_uring/setup.c
@@ -0,0 +1,103 @@
1#include <sys/types.h>
2#include <sys/stat.h>
3#include <sys/mman.h>
4#include <unistd.h>
5#include <errno.h>
6#include <string.h>
7
8#include "liburing.h"
9
10static int io_uring_mmap(int fd, struct io_uring_params *p,
11 struct io_uring_sq *sq, struct io_uring_cq *cq)
12{
13 size_t size;
14 void *ptr;
15 int ret;
16
17 sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
18 ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE,
19 MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
20 if (ptr == MAP_FAILED)
21 return -errno;
22 sq->khead = ptr + p->sq_off.head;
23 sq->ktail = ptr + p->sq_off.tail;
24 sq->kring_mask = ptr + p->sq_off.ring_mask;
25 sq->kring_entries = ptr + p->sq_off.ring_entries;
26 sq->kflags = ptr + p->sq_off.flags;
27 sq->kdropped = ptr + p->sq_off.dropped;
28 sq->array = ptr + p->sq_off.array;
29
30 size = p->sq_entries * sizeof(struct io_uring_sqe),
31 sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE,
32 MAP_SHARED | MAP_POPULATE, fd,
33 IORING_OFF_SQES);
34 if (sq->sqes == MAP_FAILED) {
35 ret = -errno;
36err:
37 munmap(sq->khead, sq->ring_sz);
38 return ret;
39 }
40
41 cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
42 ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE,
43 MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
44 if (ptr == MAP_FAILED) {
45 ret = -errno;
46 munmap(sq->sqes, p->sq_entries * sizeof(struct io_uring_sqe));
47 goto err;
48 }
49 cq->khead = ptr + p->cq_off.head;
50 cq->ktail = ptr + p->cq_off.tail;
51 cq->kring_mask = ptr + p->cq_off.ring_mask;
52 cq->kring_entries = ptr + p->cq_off.ring_entries;
53 cq->koverflow = ptr + p->cq_off.overflow;
54 cq->cqes = ptr + p->cq_off.cqes;
55 return 0;
56}
57
58/*
59 * For users that want to specify sq_thread_cpu or sq_thread_idle, this
60 * interface is a convenient helper for mmap()ing the rings.
61 * Returns -1 on error, or zero on success. On success, 'ring'
62 * contains the necessary information to read/write to the rings.
63 */
64int io_uring_queue_mmap(int fd, struct io_uring_params *p, struct io_uring *ring)
65{
66 int ret;
67
68 memset(ring, 0, sizeof(*ring));
69 ret = io_uring_mmap(fd, p, &ring->sq, &ring->cq);
70 if (!ret)
71 ring->ring_fd = fd;
72 return ret;
73}
74
75/*
76 * Returns -1 on error, or zero on success. On success, 'ring'
77 * contains the necessary information to read/write to the rings.
78 */
79int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags)
80{
81 struct io_uring_params p;
82 int fd;
83
84 memset(&p, 0, sizeof(p));
85 p.flags = flags;
86
87 fd = io_uring_setup(entries, &p);
88 if (fd < 0)
89 return fd;
90
91 return io_uring_queue_mmap(fd, &p, ring);
92}
93
94void io_uring_queue_exit(struct io_uring *ring)
95{
96 struct io_uring_sq *sq = &ring->sq;
97 struct io_uring_cq *cq = &ring->cq;
98
99 munmap(sq->sqes, *sq->kring_entries * sizeof(struct io_uring_sqe));
100 munmap(sq->khead, sq->ring_sz);
101 munmap(cq->khead, cq->ring_sz);
102 close(ring->ring_fd);
103}
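
io_uring_queue_mmap() is exposed separately from io_uring_queue_init() so that
callers who need to fill in io_uring_params themselves, e.g. to enable the SQ
polling thread, can call io_uring_setup() directly and still reuse the mmap
helper. A rough, editorial sketch (not part of the patch; the flag usage
mirrors setup_ring() in io_uring-bench.c):

/* Sketch: set up SQPOLL pinned to CPU 0, then map the rings. */
#include <stdio.h>
#include <string.h>
#include "liburing.h"

static int setup_sqpoll_ring(struct io_uring *ring)
{
	struct io_uring_params p;
	int fd;

	memset(&p, 0, sizeof(p));
	p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF;
	p.sq_thread_cpu = 0;

	fd = io_uring_setup(128, &p);
	if (fd < 0) {
		perror("io_uring_setup");
		return fd;
	}

	/* Rings exist now; map them into the io_uring structure */
	return io_uring_queue_mmap(fd, &p, ring);
}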
diff --git a/tools/io_uring/syscall.c b/tools/io_uring/syscall.c
new file mode 100644
index 000000000000..6b835e5c6a5b
--- /dev/null
+++ b/tools/io_uring/syscall.c
@@ -0,0 +1,40 @@
1/*
2 * Will go away once libc support is there
3 */
4#include <unistd.h>
5#include <sys/syscall.h>
6#include <sys/uio.h>
7#include <signal.h>
8#include "liburing.h"
9
10#if defined(__x86_64) || defined(__i386__)
11#ifndef __NR_sys_io_uring_setup
12#define __NR_sys_io_uring_setup 425
13#endif
14#ifndef __NR_sys_io_uring_enter
15#define __NR_sys_io_uring_enter 426
16#endif
17#ifndef __NR_sys_io_uring_register
18#define __NR_sys_io_uring_register 427
19#endif
20#else
21#error "Arch not supported yet"
22#endif
23
24int io_uring_register(int fd, unsigned int opcode, void *arg,
25 unsigned int nr_args)
26{
27 return syscall(__NR_sys_io_uring_register, fd, opcode, arg, nr_args);
28}
29
30int io_uring_setup(unsigned entries, struct io_uring_params *p)
31{
32 return syscall(__NR_sys_io_uring_setup, entries, p);
33}
34
35int io_uring_enter(unsigned fd, unsigned to_submit, unsigned min_complete,
36 unsigned flags, sigset_t *sig)
37{
38 return syscall(__NR_sys_io_uring_enter, fd, to_submit, min_complete,
39 flags, sig, _NSIG / 8);
40}