author	Jens Axboe <axboe@kernel.dk>	2019-01-19 00:56:34 -0500
committer	Jens Axboe <axboe@kernel.dk>	2019-03-06 15:00:16 -0500
commit	31b515106428b9717d2b6475b6f6182cf231b1e6 (patch)
tree	ef022236522e520fd3c2c4796a3c45e5b3fa3df4
parent	221c5eb2338232f7340386de1c43decc32682e58 (diff)
io_uring: allow workqueue item to handle multiple buffered requests
Right now we punt any buffered request that ends up triggering an -EAGAIN
to an async workqueue. This works fine in terms of providing async
execution of them, but it also can create quite a lot of work queue items.

For sequentially buffered IO, it's advantageous to serialize the issue of
them. For reads, the first one will trigger a read-ahead, and subsequent
requests merely end up waiting on later pages to complete. For writes,
devices usually respond better to streamed sequential writes.

Add state to track the last buffered request we punted to a work queue,
and if the next one is sequential to the previous, attempt to get the
previous work item to handle it. We limit the number of sequential
add-ons to 8x the max read-ahead size of the file. This should be a good
number for both reads and writes, as it defines the max IO size the
device can do directly.

This drastically cuts down on the number of context switches we need to
handle buffered sequential IO, and a basic test case of copying a big
file with io_uring sees a 5x speedup.

Reviewed-by: Hannes Reinecke <hare@suse.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
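As a rough illustration of the accounting described above, here is a minimal
user-space sketch (not the kernel code from the diff below): it tracks the end
offset and page count of the current sequential run and only allows
piggy-backing while the run stays within 8x the file's read-ahead size. The
struct and function names (seq_state, may_piggyback) and the fallback of 32
read-ahead pages are invented for this example only.

/*
 * Illustrative sketch of the sequential-window check, in user space.
 * Assumed names and defaults; the kernel uses f_ra.ra_pages and PAGE_SHIFT.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct seq_state {
	long long io_end;	/* end offset of the last punted request */
	size_t io_pages;	/* pages issued in the current sequential run */
};

static bool may_piggyback(struct seq_state *st, long long pos, size_t len,
			  size_t ra_pages, size_t page_size)
{
	size_t max_pages = (ra_pages ? ra_pages : 32) * 8;	/* 8x RA limit */
	size_t pages = len / page_size;

	if (pos == st->io_end && st->io_pages + pages <= max_pages) {
		/* Sequential and within the window: extend the run. */
		st->io_pages += pages;
		st->io_end = pos + len;
		return true;
	}
	/* Not sequential, or window exhausted: start a new run. */
	st->io_pages = pages;
	st->io_end = pos + len;
	return false;
}

int main(void)
{
	struct seq_state st = { .io_end = -1, .io_pages = 0 };
	long long pos = 0;

	for (int i = 0; i < 4; i++) {
		bool seq = may_piggyback(&st, pos, 128 * 1024, 128, 4096);

		printf("request %d at %lld: %s\n", i, pos,
		       seq ? "piggy-back on previous" : "new work item");
		pos += 128 * 1024;
	}
	return 0;
}

The first request always creates a new work item; later ones ride along until
the 8x read-ahead window is exhausted, which is the behavior the patch adds.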
-rw-r--r--	fs/io_uring.c	281
1 file changed, 229 insertions(+), 52 deletions(-)
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 85b914bd823c..5d99376d2369 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -94,6 +94,16 @@ struct io_mapped_ubuf {
 	unsigned int	nr_bvecs;
 };
 
+struct async_list {
+	spinlock_t		lock;
+	atomic_t		cnt;
+	struct list_head	list;
+
+	struct file		*file;
+	off_t			io_end;
+	size_t			io_pages;
+};
+
 struct io_ring_ctx {
 	struct {
 		struct percpu_ref	refs;
@@ -164,6 +174,8 @@ struct io_ring_ctx {
 		struct list_head	cancel_list;
 	} ____cacheline_aligned_in_smp;
 
+	struct async_list	pending_async[2];
+
 #if defined(CONFIG_UNIX)
 	struct socket		*ring_sock;
 #endif
@@ -201,6 +213,7 @@ struct io_kiocb {
 #define REQ_F_FORCE_NONBLOCK	1	/* inline submission attempt */
 #define REQ_F_IOPOLL_COMPLETED	2	/* polled IO has completed */
 #define REQ_F_FIXED_FILE	4	/* ctx owns file */
+#define REQ_F_SEQ_PREV		8	/* sequential with previous */
 	u64			user_data;
 	u64			error;
 
@@ -257,6 +270,7 @@ static void io_ring_ctx_ref_free(struct percpu_ref *ref)
 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 {
 	struct io_ring_ctx *ctx;
+	int i;
 
 	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
 	if (!ctx)
@@ -272,6 +286,11 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	init_completion(&ctx->ctx_done);
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->wait);
+	for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
+		spin_lock_init(&ctx->pending_async[i].lock);
+		INIT_LIST_HEAD(&ctx->pending_async[i].list);
+		atomic_set(&ctx->pending_async[i].cnt, 0);
+	}
 	spin_lock_init(&ctx->completion_lock);
 	INIT_LIST_HEAD(&ctx->poll_list);
 	INIT_LIST_HEAD(&ctx->cancel_list);
@@ -885,6 +904,47 @@ static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
 	return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
 }
 
+/*
+ * Make a note of the last file/offset/direction we punted to async
+ * context. We'll use this information to see if we can piggy back a
+ * sequential request onto the previous one, if it still hasn't been
+ * completed by the async worker.
+ */
+static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
+{
+	struct async_list *async_list = &req->ctx->pending_async[rw];
+	struct kiocb *kiocb = &req->rw;
+	struct file *filp = kiocb->ki_filp;
+	off_t io_end = kiocb->ki_pos + len;
+
+	if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
+		unsigned long max_pages;
+
+		/* Use 8x RA size as a decent limiter for both reads/writes */
+		max_pages = filp->f_ra.ra_pages;
+		if (!max_pages)
+			max_pages = VM_MAX_READAHEAD >> (PAGE_SHIFT - 10);
+		max_pages *= 8;
+
+		/* If max pages are exceeded, reset the state */
+		len >>= PAGE_SHIFT;
+		if (async_list->io_pages + len <= max_pages) {
+			req->flags |= REQ_F_SEQ_PREV;
+			async_list->io_pages += len;
+		} else {
+			io_end = 0;
+			async_list->io_pages = 0;
+		}
+	}
+
+	/* New file? Reset state. */
+	if (async_list->file != filp) {
+		async_list->io_pages = 0;
+		async_list->file = filp;
+	}
+	async_list->io_end = io_end;
+}
+
 static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
 		       bool force_nonblock, struct io_submit_state *state)
 {
@@ -892,6 +952,7 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
 	struct kiocb *kiocb = &req->rw;
 	struct iov_iter iter;
 	struct file *file;
+	size_t iov_count;
 	ssize_t ret;
 
 	ret = io_prep_rw(req, s, force_nonblock, state);
@@ -910,16 +971,24 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
 	if (ret)
 		goto out_fput;
 
-	ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_iter_count(&iter));
+	iov_count = iov_iter_count(&iter);
+	ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
 	if (!ret) {
 		ssize_t ret2;
 
 		/* Catch -EAGAIN return for forced non-blocking submission */
 		ret2 = call_read_iter(file, kiocb, &iter);
-		if (!force_nonblock || ret2 != -EAGAIN)
+		if (!force_nonblock || ret2 != -EAGAIN) {
 			io_rw_done(kiocb, ret2);
-		else
+		} else {
+			/*
+			 * If ->needs_lock is true, we're already in async
+			 * context.
+			 */
+			if (!s->needs_lock)
+				io_async_list_note(READ, req, iov_count);
 			ret = -EAGAIN;
+		}
 	}
 	kfree(iovec);
 out_fput:
@@ -936,14 +1005,12 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
 	struct kiocb *kiocb = &req->rw;
 	struct iov_iter iter;
 	struct file *file;
+	size_t iov_count;
 	ssize_t ret;
 
 	ret = io_prep_rw(req, s, force_nonblock, state);
 	if (ret)
 		return ret;
-	/* Hold on to the file for -EAGAIN */
-	if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
-		return -EAGAIN;
 
 	ret = -EBADF;
 	file = kiocb->ki_filp;
@@ -957,8 +1024,17 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
 	if (ret)
 		goto out_fput;
 
-	ret = rw_verify_area(WRITE, file, &kiocb->ki_pos,
-			iov_iter_count(&iter));
+	iov_count = iov_iter_count(&iter);
+
+	ret = -EAGAIN;
+	if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
+		/* If ->needs_lock is true, we're already in async context. */
+		if (!s->needs_lock)
+			io_async_list_note(WRITE, req, iov_count);
+		goto out_free;
+	}
+
+	ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
 	if (!ret) {
 		/*
 		 * Open-code file_start_write here to grab freeze protection,
@@ -976,9 +1052,11 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
 		kiocb->ki_flags |= IOCB_WRITE;
 		io_rw_done(kiocb, call_write_iter(file, kiocb, &iter));
 	}
+out_free:
 	kfree(iovec);
 out_fput:
-	if (unlikely(ret))
+	/* Hold on to the file for -EAGAIN */
+	if (unlikely(ret && ret != -EAGAIN))
 		io_fput(req);
 	return ret;
 }
@@ -1376,6 +1454,21 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	return 0;
 }
 
+static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
+						 const struct io_uring_sqe *sqe)
+{
+	switch (sqe->opcode) {
+	case IORING_OP_READV:
+	case IORING_OP_READ_FIXED:
+		return &ctx->pending_async[READ];
+	case IORING_OP_WRITEV:
+	case IORING_OP_WRITE_FIXED:
+		return &ctx->pending_async[WRITE];
+	default:
+		return NULL;
+	}
+}
+
 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
 {
 	u8 opcode = READ_ONCE(sqe->opcode);
@@ -1387,61 +1480,138 @@ static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
 static void io_sq_wq_submit_work(struct work_struct *work)
 {
 	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
-	struct sqe_submit *s = &req->submit;
-	const struct io_uring_sqe *sqe = s->sqe;
 	struct io_ring_ctx *ctx = req->ctx;
+	struct mm_struct *cur_mm = NULL;
+	struct async_list *async_list;
+	LIST_HEAD(req_list);
 	mm_segment_t old_fs;
-	bool needs_user;
 	int ret;
 
-	/* Ensure we clear previously set forced non-block flag */
-	req->flags &= ~REQ_F_FORCE_NONBLOCK;
-	req->rw.ki_flags &= ~IOCB_NOWAIT;
+	async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
+restart:
+	do {
+		struct sqe_submit *s = &req->submit;
+		const struct io_uring_sqe *sqe = s->sqe;
+
+		/* Ensure we clear previously set forced non-block flag */
+		req->flags &= ~REQ_F_FORCE_NONBLOCK;
+		req->rw.ki_flags &= ~IOCB_NOWAIT;
+
+		ret = 0;
+		if (io_sqe_needs_user(sqe) && !cur_mm) {
+			if (!mmget_not_zero(ctx->sqo_mm)) {
+				ret = -EFAULT;
+			} else {
+				cur_mm = ctx->sqo_mm;
+				use_mm(cur_mm);
+				old_fs = get_fs();
+				set_fs(USER_DS);
+			}
+		}
+
+		if (!ret) {
+			s->has_user = cur_mm != NULL;
+			s->needs_lock = true;
+			do {
+				ret = __io_submit_sqe(ctx, req, s, false, NULL);
+				/*
+				 * We can get EAGAIN for polled IO even though
+				 * we're forcing a sync submission from here,
+				 * since we can't wait for request slots on the
+				 * block side.
+				 */
+				if (ret != -EAGAIN)
+					break;
+				cond_resched();
+			} while (1);
+		}
+		if (ret) {
+			io_cqring_add_event(ctx, sqe->user_data, ret, 0);
+			io_free_req(req);
+		}
 
-	s->needs_lock = true;
-	s->has_user = false;
+		/* async context always use a copy of the sqe */
+		kfree(sqe);
+
+		if (!async_list)
+			break;
+		if (!list_empty(&req_list)) {
+			req = list_first_entry(&req_list, struct io_kiocb,
+						list);
+			list_del(&req->list);
+			continue;
+		}
+		if (list_empty(&async_list->list))
+			break;
+
+		req = NULL;
+		spin_lock(&async_list->lock);
+		if (list_empty(&async_list->list)) {
+			spin_unlock(&async_list->lock);
+			break;
+		}
+		list_splice_init(&async_list->list, &req_list);
+		spin_unlock(&async_list->lock);
+
+		req = list_first_entry(&req_list, struct io_kiocb, list);
+		list_del(&req->list);
+	} while (req);
 
 	/*
-	 * If we're doing IO to fixed buffers, we don't need to get/set
-	 * user context
+	 * Rare case of racing with a submitter. If we find the count has
+	 * dropped to zero AND we have pending work items, then restart
+	 * the processing. This is a tiny race window.
 	 */
-	needs_user = io_sqe_needs_user(s->sqe);
-	if (needs_user) {
-		if (!mmget_not_zero(ctx->sqo_mm)) {
-			ret = -EFAULT;
-			goto err;
+	if (async_list) {
+		ret = atomic_dec_return(&async_list->cnt);
+		while (!ret && !list_empty(&async_list->list)) {
+			spin_lock(&async_list->lock);
+			atomic_inc(&async_list->cnt);
+			list_splice_init(&async_list->list, &req_list);
+			spin_unlock(&async_list->lock);
+
+			if (!list_empty(&req_list)) {
+				req = list_first_entry(&req_list,
+							struct io_kiocb, list);
+				list_del(&req->list);
+				goto restart;
+			}
+			ret = atomic_dec_return(&async_list->cnt);
 		}
-		use_mm(ctx->sqo_mm);
-		old_fs = get_fs();
-		set_fs(USER_DS);
-		s->has_user = true;
 	}
 
-	do {
-		ret = __io_submit_sqe(ctx, req, s, false, NULL);
-		/*
-		 * We can get EAGAIN for polled IO even though we're forcing
-		 * a sync submission from here, since we can't wait for
-		 * request slots on the block side.
-		 */
-		if (ret != -EAGAIN)
-			break;
-		cond_resched();
-	} while (1);
-
-	if (needs_user) {
+	if (cur_mm) {
 		set_fs(old_fs);
-		unuse_mm(ctx->sqo_mm);
-		mmput(ctx->sqo_mm);
-	}
-err:
-	if (ret) {
-		io_cqring_add_event(ctx, sqe->user_data, ret, 0);
-		io_free_req(req);
+		unuse_mm(cur_mm);
+		mmput(cur_mm);
 	}
+}
 
-	/* async context always use a copy of the sqe */
-	kfree(sqe);
+/*
+ * See if we can piggy back onto previously submitted work, that is still
+ * running. We currently only allow this if the new request is sequential
+ * to the previous one we punted.
+ */
+static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
+{
+	bool ret = false;
+
+	if (!list)
+		return false;
+	if (!(req->flags & REQ_F_SEQ_PREV))
+		return false;
+	if (!atomic_read(&list->cnt))
+		return false;
+
+	ret = true;
+	spin_lock(&list->lock);
+	list_add_tail(&req->list, &list->list);
+	if (!atomic_read(&list->cnt)) {
+		list_del_init(&req->list);
+		ret = false;
+	}
+	spin_unlock(&list->lock);
+	return ret;
 }
 
 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
@@ -1466,12 +1636,19 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
 
 	sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
 	if (sqe_copy) {
+		struct async_list *list;
+
 		memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
 		s->sqe = sqe_copy;
 
 		memcpy(&req->submit, s, sizeof(*s));
-		INIT_WORK(&req->work, io_sq_wq_submit_work);
-		queue_work(ctx->sqo_wq, &req->work);
+		list = io_async_list_from_sqe(ctx, s->sqe);
+		if (!io_add_to_prev_work(list, req)) {
+			if (list)
+				atomic_inc(&list->cnt);
+			INIT_WORK(&req->work, io_sq_wq_submit_work);
+			queue_work(ctx->sqo_wq, &req->work);
+		}
 		ret = 0;
 	}
 }
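The trickiest part of the change is the handshake between io_add_to_prev_work()
and the drain loop at the end of io_sq_wq_submit_work(): the submitter only
queues behind a worker whose count is still non-zero and re-checks after
queueing, while the exiting worker, after dropping its count, re-checks the
list and revives itself if something slipped in. The following is a hedged,
user-space approximation of that protocol, not the kernel code; the names
(workers, queued, try_piggyback, worker_exit) are assumptions for the example.

/*
 * Single-threaded demonstration of the submitter/worker handshake.
 * 'workers' plays the role of async_list->cnt, 'queued' stands in for
 * the request list length.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static atomic_int workers;
static int queued;

/* Submitter side, mirroring the shape of io_add_to_prev_work(). */
static bool try_piggyback(void)
{
	if (!atomic_load(&workers))
		return false;
	pthread_mutex_lock(&lock);
	queued++;
	if (!atomic_load(&workers)) {
		/* Worker exited while we queued: take the request back. */
		queued--;
		pthread_mutex_unlock(&lock);
		return false;
	}
	pthread_mutex_unlock(&lock);
	return true;
}

/* Worker side, mirroring the tail of io_sq_wq_submit_work(). */
static void worker_exit(void)
{
	int remaining = atomic_fetch_sub(&workers, 1) - 1;

	while (!remaining && queued) {
		pthread_mutex_lock(&lock);
		atomic_fetch_add(&workers, 1);	/* revive ourselves */
		printf("draining %d late request(s)\n", queued);
		queued = 0;
		pthread_mutex_unlock(&lock);
		remaining = atomic_fetch_sub(&workers, 1) - 1;
	}
}

int main(void)
{
	atomic_store(&workers, 1);		/* one worker running */
	printf("piggyback while running: %d\n", try_piggyback());
	worker_exit();				/* worker drains before exiting */
	printf("piggyback after exit: %d\n", try_piggyback());
	return 0;
}

Either the submitter sees the worker's count and hands the request off, or it
sees zero and queues a new work item itself; the double checks on both sides
close the small window in between, which is exactly what the patch's "tiny
race window" comment refers to.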