From d843634a95a66237f88fa6affa9203c9ee503244 Mon Sep 17 00:00:00 2001 From: Olivier Langlois Date: Mon, 29 Jul 2024 18:38:33 -0400 Subject: [PATCH 01/35] io_uring: add napi busy settings to the fdinfo output This info may be useful when attempting to debug a problem involving a ring using the NAPI feature. Here is an example of the output: ip-172-31-39-89 /proc/772/fdinfo # cat 14 pos: 0 flags: 02000002 mnt_id: 16 ino: 10243 SqMask: 0xff SqHead: 633 SqTail: 633 CachedSqHead: 633 CqMask: 0x3fff CqHead: 430250 CqTail: 430250 CachedCqTail: 430250 SQEs: 0 CQEs: 0 SqThread: 885 SqThreadCpu: 0 SqTotalTime: 52793826 SqWorkTime: 3590465 UserFiles: 0 UserBufs: 0 PollList: op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=6, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=6, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 op=10, task_works=0 CqOverflowList: NAPI: enabled napi_busy_poll_to: 1 napi_prefer_busy_poll: true Signed-off-by: Olivier Langlois Reviewed-by: Pavel Begunkov Link: https://lore.kernel.org/r/bb184f8b62703ddd3e6e19eae7ab6c67b97e1e10.1722293317.git.olivier@trillion01.com Signed-off-by: Jens Axboe --- io_uring/fdinfo.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/io_uring/fdinfo.c b/io_uring/fdinfo.c index b1e0e0d85349..d43e1b5fcb36 100644 --- a/io_uring/fdinfo.c +++ b/io_uring/fdinfo.c @@ -221,7 +221,19 @@ __cold void io_uring_show_fdinfo(struct seq_file *m, struct file *file) cqe->user_data, cqe->res, cqe->flags); } - spin_unlock(&ctx->completion_lock); + +#ifdef CONFIG_NET_RX_BUSY_POLL + if (ctx->napi_enabled) { + seq_puts(m, "NAPI:\tenabled\n"); + seq_printf(m, "napi_busy_poll_dt:\t%llu\n", ctx->napi_busy_poll_dt); + if (ctx->napi_prefer_busy_poll) + seq_puts(m, "napi_prefer_busy_poll:\ttrue\n"); + else + seq_puts(m, "napi_prefer_busy_poll:\tfalse\n"); + } else { + seq_puts(m, "NAPI:\tdisabled\n"); + } +#endif } #endif From 3d6106aee4732a01a275c59ec94d05302d931c4b Mon Sep 17 00:00:00 2001 From: Chenliang Li Date: Wed, 31 Jul 2024 17:01:32 +0800 Subject: [PATCH 02/35] io_uring/rsrc: store folio shift and mask into imu Store the folio shift and folio mask into imu struct and use it in iov_iter adjust, as we will have non PAGE_SIZE'd chunks if a multi-hugepage buffer get coalesced. Signed-off-by: Chenliang Li Reviewed-by: Anuj Gupta Reviewed-by: Pavel Begunkov Link: https://lore.kernel.org/r/20240731090133.4106-2-cliang01.li@samsung.com Signed-off-by: Jens Axboe --- io_uring/rsrc.c | 15 ++++++--------- io_uring/rsrc.h | 2 ++ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index a860516bf448..64152dc6f293 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -915,6 +915,8 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov, imu->ubuf = (unsigned long) iov->iov_base; imu->ubuf_end = imu->ubuf + iov->iov_len; imu->nr_bvecs = nr_pages; + imu->folio_shift = PAGE_SHIFT; + imu->folio_mask = PAGE_MASK; *pimu = imu; ret = 0; @@ -1031,23 +1033,18 @@ int io_import_fixed(int ddir, struct iov_iter *iter, * we know that: * * 1) it's a BVEC iter, we set it up - * 2) all bvecs are PAGE_SIZE in size, except potentially the + * 2) all bvecs are the same in size, except potentially the * first and last bvec * * So just find our index, and adjust the iterator afterwards. * If the offset is within the first bvec (or the whole first * bvec, just use iov_iter_advance(). This makes it easier * since we can just skip the first segment, which may not - * be PAGE_SIZE aligned. + * be folio_size aligned. */ const struct bio_vec *bvec = imu->bvec; if (offset < bvec->bv_len) { - /* - * Note, huge pages buffers consists of one large - * bvec entry and should always go this way. The other - * branch doesn't expect non PAGE_SIZE'd chunks. - */ iter->bvec = bvec; iter->count -= offset; iter->iov_offset = offset; @@ -1056,12 +1053,12 @@ int io_import_fixed(int ddir, struct iov_iter *iter, /* skip first vec */ offset -= bvec->bv_len; - seg_skip = 1 + (offset >> PAGE_SHIFT); + seg_skip = 1 + (offset >> imu->folio_shift); iter->bvec = bvec + seg_skip; iter->nr_segs -= seg_skip; iter->count -= bvec->bv_len + offset; - iter->iov_offset = offset & ~PAGE_MASK; + iter->iov_offset = offset & ~imu->folio_mask; } } diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index c032ca3436ca..ee77e53328bf 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -46,7 +46,9 @@ struct io_mapped_ubuf { u64 ubuf; u64 ubuf_end; unsigned int nr_bvecs; + unsigned int folio_shift; unsigned long acct_pages; + unsigned long folio_mask; struct bio_vec bvec[] __counted_by(nr_bvecs); }; From a8edbb424b1391b077407c75d8f5d2ede77aa70d Mon Sep 17 00:00:00 2001 From: Chenliang Li Date: Wed, 31 Jul 2024 17:01:33 +0800 Subject: [PATCH 03/35] io_uring/rsrc: enable multi-hugepage buffer coalescing Add support for checking and coalescing multi-hugepage-backed fixed buffers. The coalescing optimizes both time and space consumption caused by mapping and storing multi-hugepage fixed buffers. A coalescable multi-hugepage buffer should fully cover its folios (except potentially the first and last one), and these folios should have the same size. These requirements are for easier processing later, also we need same size'd chunks in io_import_fixed for fast iov_iter adjust. Signed-off-by: Chenliang Li Reviewed-by: Pavel Begunkov Link: https://lore.kernel.org/r/20240731090133.4106-3-cliang01.li@samsung.com Signed-off-by: Jens Axboe --- io_uring/rsrc.c | 134 ++++++++++++++++++++++++++++++++++++------------ io_uring/rsrc.h | 8 +++ 2 files changed, 110 insertions(+), 32 deletions(-) diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index 64152dc6f293..7d639a996f28 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -849,6 +849,98 @@ static int io_buffer_account_pin(struct io_ring_ctx *ctx, struct page **pages, return ret; } +static bool io_do_coalesce_buffer(struct page ***pages, int *nr_pages, + struct io_imu_folio_data *data, int nr_folios) +{ + struct page **page_array = *pages, **new_array = NULL; + int nr_pages_left = *nr_pages, i, j; + + /* Store head pages only*/ + new_array = kvmalloc_array(nr_folios, sizeof(struct page *), + GFP_KERNEL); + if (!new_array) + return false; + + new_array[0] = compound_head(page_array[0]); + /* + * The pages are bound to the folio, it doesn't + * actually unpin them but drops all but one reference, + * which is usually put down by io_buffer_unmap(). + * Note, needs a better helper. + */ + if (data->nr_pages_head > 1) + unpin_user_pages(&page_array[1], data->nr_pages_head - 1); + + j = data->nr_pages_head; + nr_pages_left -= data->nr_pages_head; + for (i = 1; i < nr_folios; i++) { + unsigned int nr_unpin; + + new_array[i] = page_array[j]; + nr_unpin = min_t(unsigned int, nr_pages_left - 1, + data->nr_pages_mid - 1); + if (nr_unpin) + unpin_user_pages(&page_array[j+1], nr_unpin); + j += data->nr_pages_mid; + nr_pages_left -= data->nr_pages_mid; + } + kvfree(page_array); + *pages = new_array; + *nr_pages = nr_folios; + return true; +} + +static bool io_try_coalesce_buffer(struct page ***pages, int *nr_pages, + struct io_imu_folio_data *data) +{ + struct page **page_array = *pages; + struct folio *folio = page_folio(page_array[0]); + unsigned int count = 1, nr_folios = 1; + int i; + + if (*nr_pages <= 1) + return false; + + data->nr_pages_mid = folio_nr_pages(folio); + if (data->nr_pages_mid == 1) + return false; + + data->folio_shift = folio_shift(folio); + /* + * Check if pages are contiguous inside a folio, and all folios have + * the same page count except for the head and tail. + */ + for (i = 1; i < *nr_pages; i++) { + if (page_folio(page_array[i]) == folio && + page_array[i] == page_array[i-1] + 1) { + count++; + continue; + } + + if (nr_folios == 1) { + if (folio_page_idx(folio, page_array[i-1]) != + data->nr_pages_mid - 1) + return false; + + data->nr_pages_head = count; + } else if (count != data->nr_pages_mid) { + return false; + } + + folio = page_folio(page_array[i]); + if (folio_size(folio) != (1UL << data->folio_shift) || + folio_page_idx(folio, page_array[i]) != 0) + return false; + + count = 1; + nr_folios++; + } + if (nr_folios == 1) + data->nr_pages_head = count; + + return io_do_coalesce_buffer(pages, nr_pages, data, nr_folios); +} + static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov, struct io_mapped_ubuf **pimu, struct page **last_hpage) @@ -858,7 +950,8 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov, unsigned long off; size_t size; int ret, nr_pages, i; - struct folio *folio = NULL; + struct io_imu_folio_data data; + bool coalesced; *pimu = (struct io_mapped_ubuf *)&dummy_ubuf; if (!iov->iov_base) @@ -873,31 +966,8 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov, goto done; } - /* If it's a huge page, try to coalesce them into a single bvec entry */ - if (nr_pages > 1) { - folio = page_folio(pages[0]); - for (i = 1; i < nr_pages; i++) { - /* - * Pages must be consecutive and on the same folio for - * this to work - */ - if (page_folio(pages[i]) != folio || - pages[i] != pages[i - 1] + 1) { - folio = NULL; - break; - } - } - if (folio) { - /* - * The pages are bound to the folio, it doesn't - * actually unpin them but drops all but one reference, - * which is usually put down by io_buffer_unmap(). - * Note, needs a better helper. - */ - unpin_user_pages(&pages[1], nr_pages - 1); - nr_pages = 1; - } - } + /* If it's huge page(s), try to coalesce them into fewer bvec entries */ + coalesced = io_try_coalesce_buffer(&pages, &nr_pages, &data); imu = kvmalloc(struct_size(imu, bvec, nr_pages), GFP_KERNEL); if (!imu) @@ -909,7 +979,6 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov, goto done; } - off = (unsigned long) iov->iov_base & ~PAGE_MASK; size = iov->iov_len; /* store original address for later verification */ imu->ubuf = (unsigned long) iov->iov_base; @@ -917,17 +986,18 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov, imu->nr_bvecs = nr_pages; imu->folio_shift = PAGE_SHIFT; imu->folio_mask = PAGE_MASK; + if (coalesced) { + imu->folio_shift = data.folio_shift; + imu->folio_mask = ~((1UL << data.folio_shift) - 1); + } + off = (unsigned long) iov->iov_base & ~imu->folio_mask; *pimu = imu; ret = 0; - if (folio) { - bvec_set_page(&imu->bvec[0], pages[0], size, off); - goto done; - } for (i = 0; i < nr_pages; i++) { size_t vec_len; - vec_len = min_t(size_t, size, PAGE_SIZE - off); + vec_len = min_t(size_t, size, (1UL << imu->folio_shift) - off); bvec_set_page(&imu->bvec[i], pages[i], vec_len, off); off = 0; size -= vec_len; diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index ee77e53328bf..18242b2e9da4 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -52,6 +52,14 @@ struct io_mapped_ubuf { struct bio_vec bvec[] __counted_by(nr_bvecs); }; +struct io_imu_folio_data { + /* Head folio can be partially included in the fixed buf */ + unsigned int nr_pages_head; + /* For non-head/tail folios, has to be fully included */ + unsigned int nr_pages_mid; + unsigned int folio_shift; +}; + void io_rsrc_node_ref_zero(struct io_rsrc_node *node); void io_rsrc_node_destroy(struct io_ring_ctx *ctx, struct io_rsrc_node *ref_node); struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx); From 7255cd894539a96fefab9180185d268647c7341b Mon Sep 17 00:00:00 2001 From: Olivier Langlois Date: Tue, 30 Jul 2024 16:56:06 -0400 Subject: [PATCH 04/35] io_uring: micro optimization of __io_sq_thread() condition reverse the order of the element evaluation in an if statement. for many users that are not using iopoll, the iopoll_list will always evaluate to false after having made a memory access whereas to_submit is very likely already loaded in a register. Signed-off-by: Olivier Langlois Reviewed-by: Pavel Begunkov Link: https://lore.kernel.org/r/052ca60b5c49e7439e4b8bd33bfab4a09d36d3d6.1722374371.git.olivier@trillion01.com Signed-off-by: Jens Axboe --- io_uring/sqpoll.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c index 3b50dc9586d1..e545bf240d35 100644 --- a/io_uring/sqpoll.c +++ b/io_uring/sqpoll.c @@ -176,7 +176,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE) to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE; - if (!wq_list_empty(&ctx->iopoll_list) || to_submit) { + if (to_submit || !wq_list_empty(&ctx->iopoll_list)) { const struct cred *creds = NULL; if (ctx->sq_creds != current_cred()) From 03e02e8f95fee0f45124976993ed2121e2369a12 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 8 Aug 2024 10:33:16 -0600 Subject: [PATCH 05/35] io_uring/kbuf: use 'bl' directly rather than req->buf_list req->buf_list is assigned higher up and is safe to use as we remain within a locked region, as is the 'bl' variable itself from which it was assigned. To improve readability, use 'bl' directly rather than get it from the io_kiocb, if we need to increment the head directly in the buffer selection path. This makes it readily apparent that it's the same io_buffer_list being used. Signed-off-by: Jens Axboe --- io_uring/kbuf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index 1af2bd56af44..c61dd60113f7 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -298,7 +298,7 @@ int io_buffers_select(struct io_kiocb *req, struct buf_sel_arg *arg, */ if (ret > 0) { req->flags |= REQ_F_BL_NO_RECYCLE; - req->buf_list->head += ret; + bl->head += ret; } } else { ret = io_provided_buffers_select(req, &arg->out_len, bl, arg->iovs); From 566a424212d79b90e3a8fe6b5c7bd8f69174105c Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 8 Aug 2024 10:42:18 -0600 Subject: [PATCH 06/35] io_uring/net: use ITER_UBUF for single segment send maps Just like what is being done on the recv side, if we only map a single segment, then use ITER_UBUF for mapping it. That's more efficient than using an ITER_IOVEC. Signed-off-by: Jens Axboe --- io_uring/net.c | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/io_uring/net.c b/io_uring/net.c index d08abcca89cc..dc83a35b8af4 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -618,14 +618,23 @@ retry_bundle: if (unlikely(ret < 0)) return ret; - sr->len = arg.out_len; - iov_iter_init(&kmsg->msg.msg_iter, ITER_SOURCE, arg.iovs, ret, - arg.out_len); if (arg.iovs != &kmsg->fast_iov && arg.iovs != kmsg->free_iov) { kmsg->free_iov_nr = ret; kmsg->free_iov = arg.iovs; req->flags |= REQ_F_NEED_CLEANUP; } + sr->len = arg.out_len; + + if (ret == 1) { + sr->buf = arg.iovs[0].iov_base; + ret = import_ubuf(ITER_SOURCE, sr->buf, sr->len, + &kmsg->msg.msg_iter); + if (unlikely(ret)) + return ret; + } else { + iov_iter_init(&kmsg->msg.msg_iter, ITER_SOURCE, + arg.iovs, ret, arg.out_len); + } } /* From a69307a55454060b5795e68d249157f2961049c2 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 9 Aug 2024 10:39:38 -0600 Subject: [PATCH 07/35] io_uring/kbuf: turn io_buffer_list booleans into flags We could just move these two and save some space, but in preparation for adding another flag, turn them into flags first. This saves 8 bytes in struct io_buffer_list, making it exactly half a cacheline on 64-bit archs now rather than 40 bytes. Signed-off-by: Jens Axboe --- io_uring/kbuf.c | 35 +++++++++++++++++------------------ io_uring/kbuf.h | 14 +++++++++----- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index c61dd60113f7..a4bde998f50d 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -189,7 +189,7 @@ void __user *io_buffer_select(struct io_kiocb *req, size_t *len, bl = io_buffer_get_list(ctx, req->buf_index); if (likely(bl)) { - if (bl->is_buf_ring) + if (bl->flags & IOBL_BUF_RING) ret = io_ring_buffer_select(req, len, bl, issue_flags); else ret = io_provided_buffer_select(req, len, bl); @@ -287,7 +287,7 @@ int io_buffers_select(struct io_kiocb *req, struct buf_sel_arg *arg, if (unlikely(!bl)) goto out_unlock; - if (bl->is_buf_ring) { + if (bl->flags & IOBL_BUF_RING) { ret = io_ring_buffers_peek(req, arg, bl); /* * Don't recycle these buffers if we need to go through poll. @@ -320,7 +320,7 @@ int io_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg) if (unlikely(!bl)) return -ENOENT; - if (bl->is_buf_ring) { + if (bl->flags & IOBL_BUF_RING) { ret = io_ring_buffers_peek(req, arg, bl); if (ret > 0) req->flags |= REQ_F_BUFFERS_COMMIT; @@ -340,22 +340,22 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx, if (!nbufs) return 0; - if (bl->is_buf_ring) { + if (bl->flags & IOBL_BUF_RING) { i = bl->buf_ring->tail - bl->head; if (bl->buf_nr_pages) { int j; - if (!bl->is_mmap) { + if (!(bl->flags & IOBL_MMAP)) { for (j = 0; j < bl->buf_nr_pages; j++) unpin_user_page(bl->buf_pages[j]); } io_pages_unmap(bl->buf_ring, &bl->buf_pages, - &bl->buf_nr_pages, bl->is_mmap); - bl->is_mmap = 0; + &bl->buf_nr_pages, bl->flags & IOBL_MMAP); + bl->flags &= ~IOBL_MMAP; } /* make sure it's seen as empty */ INIT_LIST_HEAD(&bl->buf_list); - bl->is_buf_ring = 0; + bl->flags &= ~IOBL_BUF_RING; return i; } @@ -442,7 +442,7 @@ int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags) if (bl) { ret = -EINVAL; /* can't use provide/remove buffers command on mapped buffers */ - if (!bl->is_buf_ring) + if (!(bl->flags & IOBL_BUF_RING)) ret = __io_remove_buffers(ctx, bl, p->nbufs); } io_ring_submit_unlock(ctx, issue_flags); @@ -589,7 +589,7 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags) } } /* can't add buffers via this command for a mapped buffer ring */ - if (bl->is_buf_ring) { + if (bl->flags & IOBL_BUF_RING) { ret = -EINVAL; goto err; } @@ -641,8 +641,8 @@ static int io_pin_pbuf_ring(struct io_uring_buf_reg *reg, bl->buf_pages = pages; bl->buf_nr_pages = nr_pages; bl->buf_ring = br; - bl->is_buf_ring = 1; - bl->is_mmap = 0; + bl->flags |= IOBL_BUF_RING; + bl->flags &= ~IOBL_MMAP; return 0; error_unpin: unpin_user_pages(pages, nr_pages); @@ -665,8 +665,7 @@ static int io_alloc_pbuf_ring(struct io_ring_ctx *ctx, return -ENOMEM; } - bl->is_buf_ring = 1; - bl->is_mmap = 1; + bl->flags |= (IOBL_BUF_RING | IOBL_MMAP); return 0; } @@ -705,7 +704,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) bl = io_buffer_get_list(ctx, reg.bgid); if (bl) { /* if mapped buffer ring OR classic exists, don't allow */ - if (bl->is_buf_ring || !list_empty(&bl->buf_list)) + if (bl->flags & IOBL_BUF_RING || !list_empty(&bl->buf_list)) return -EEXIST; } else { free_bl = bl = kzalloc(sizeof(*bl), GFP_KERNEL); @@ -747,7 +746,7 @@ int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) bl = io_buffer_get_list(ctx, reg.bgid); if (!bl) return -ENOENT; - if (!bl->is_buf_ring) + if (!(bl->flags & IOBL_BUF_RING)) return -EINVAL; xa_erase(&ctx->io_bl_xa, bl->bgid); @@ -771,7 +770,7 @@ int io_register_pbuf_status(struct io_ring_ctx *ctx, void __user *arg) bl = io_buffer_get_list(ctx, buf_status.buf_group); if (!bl) return -ENOENT; - if (!bl->is_buf_ring) + if (!(bl->flags & IOBL_BUF_RING)) return -EINVAL; buf_status.head = bl->head; @@ -802,7 +801,7 @@ struct io_buffer_list *io_pbuf_get_bl(struct io_ring_ctx *ctx, bl = xa_load(&ctx->io_bl_xa, bgid); /* must be a mmap'able buffer ring and have pages */ ret = false; - if (bl && bl->is_mmap) + if (bl && bl->flags & IOBL_MMAP) ret = atomic_inc_not_zero(&bl->refs); rcu_read_unlock(); diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h index b90aca3a57fa..2ed141d7662e 100644 --- a/io_uring/kbuf.h +++ b/io_uring/kbuf.h @@ -4,6 +4,13 @@ #include +enum { + /* ring mapped provided buffers */ + IOBL_BUF_RING = 1, + /* ring mapped provided buffers, but mmap'ed by application */ + IOBL_MMAP = 2, +}; + struct io_buffer_list { /* * If ->buf_nr_pages is set, then buf_pages/buf_ring are used. If not, @@ -25,12 +32,9 @@ struct io_buffer_list { __u16 head; __u16 mask; - atomic_t refs; + __u16 flags; - /* ring mapped provided buffers */ - __u8 is_buf_ring; - /* ring mapped provided buffers, but mmap'ed by application */ - __u8 is_mmap; + atomic_t refs; }; struct io_buffer { From 489b80060cf645e958c4755c4b5032f234409f85 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 7 Aug 2024 15:18:11 +0100 Subject: [PATCH 08/35] io_uring/napi: refactor __io_napi_busy_loop() we don't need to set ->napi_prefer_busy_poll if we're not going to poll, do the checks first and all polling preparation after. Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/2ad7ede8cc7905328fc62e8c3805fdb11635ae0b.1723039801.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- io_uring/napi.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/io_uring/napi.c b/io_uring/napi.c index 1de1d4d62925..64fbbceba980 100644 --- a/io_uring/napi.c +++ b/io_uring/napi.c @@ -299,10 +299,11 @@ void __io_napi_adjust_timeout(struct io_ring_ctx *ctx, struct io_wait_queue *iow */ void __io_napi_busy_loop(struct io_ring_ctx *ctx, struct io_wait_queue *iowq) { - iowq->napi_prefer_busy_poll = READ_ONCE(ctx->napi_prefer_busy_poll); + if (ctx->flags & IORING_SETUP_SQPOLL) + return; - if (!(ctx->flags & IORING_SETUP_SQPOLL)) - io_napi_blocking_busy_loop(ctx, iowq); + iowq->napi_prefer_busy_poll = READ_ONCE(ctx->napi_prefer_busy_poll); + io_napi_blocking_busy_loop(ctx, iowq); } /* From d5cce407e4f59b2e08d03e29d2b3c55deacc1d48 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 7 Aug 2024 15:18:12 +0100 Subject: [PATCH 09/35] io_uring/napi: postpone napi timeout adjustment Remove io_napi_adjust_timeout() and move the adjustments out of the common path into __io_napi_busy_loop(). Now the limit it's calculated based on struct io_wait_queue::timeout, for which we query current time another time. The overhead shouldn't be a problem, it's a polling path, however that can be optimised later by additionally saving the delta time value in io_cqring_wait(). Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/88e14686e245b3b42ff90a3c4d70895d48676206.1723039801.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- io_uring/io_uring.c | 1 - io_uring/napi.c | 28 +++++++--------------------- io_uring/napi.h | 16 ---------------- 3 files changed, 7 insertions(+), 38 deletions(-) diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 3942db160f18..9ec07f76ad19 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2423,7 +2423,6 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, dt = timespec64_to_ktime(ts); iowq.timeout = ktime_add(dt, ktime_get()); - io_napi_adjust_timeout(ctx, &iowq, dt); } if (sig) { diff --git a/io_uring/napi.c b/io_uring/napi.c index 64fbbceba980..d78fcbecdd27 100644 --- a/io_uring/napi.c +++ b/io_uring/napi.c @@ -269,27 +269,6 @@ int io_unregister_napi(struct io_ring_ctx *ctx, void __user *arg) return 0; } -/* - * __io_napi_adjust_timeout() - adjust busy loop timeout - * @ctx: pointer to io-uring context structure - * @iowq: pointer to io wait queue - * @ts: pointer to timespec or NULL - * - * Adjust the busy loop timeout according to timespec and busy poll timeout. - * If the specified NAPI timeout is bigger than the wait timeout, then adjust - * the NAPI timeout accordingly. - */ -void __io_napi_adjust_timeout(struct io_ring_ctx *ctx, struct io_wait_queue *iowq, - ktime_t to_wait) -{ - ktime_t poll_dt = READ_ONCE(ctx->napi_busy_poll_dt); - - if (to_wait) - poll_dt = min(poll_dt, to_wait); - - iowq->napi_busy_poll_dt = poll_dt; -} - /* * __io_napi_busy_loop() - execute busy poll loop * @ctx: pointer to io-uring context structure @@ -302,6 +281,13 @@ void __io_napi_busy_loop(struct io_ring_ctx *ctx, struct io_wait_queue *iowq) if (ctx->flags & IORING_SETUP_SQPOLL) return; + iowq->napi_busy_poll_dt = READ_ONCE(ctx->napi_busy_poll_dt); + if (iowq->timeout != KTIME_MAX) { + ktime_t dt = ktime_sub(iowq->timeout, ktime_get()); + + iowq->napi_busy_poll_dt = min_t(u64, iowq->napi_busy_poll_dt, dt); + } + iowq->napi_prefer_busy_poll = READ_ONCE(ctx->napi_prefer_busy_poll); io_napi_blocking_busy_loop(ctx, iowq); } diff --git a/io_uring/napi.h b/io_uring/napi.h index 27b88c3eb428..fd275ef0456d 100644 --- a/io_uring/napi.h +++ b/io_uring/napi.h @@ -17,8 +17,6 @@ int io_unregister_napi(struct io_ring_ctx *ctx, void __user *arg); void __io_napi_add(struct io_ring_ctx *ctx, struct socket *sock); -void __io_napi_adjust_timeout(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq, ktime_t to_wait); void __io_napi_busy_loop(struct io_ring_ctx *ctx, struct io_wait_queue *iowq); int io_napi_sqpoll_busy_poll(struct io_ring_ctx *ctx); @@ -27,15 +25,6 @@ static inline bool io_napi(struct io_ring_ctx *ctx) return !list_empty(&ctx->napi_list); } -static inline void io_napi_adjust_timeout(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq, - ktime_t to_wait) -{ - if (!io_napi(ctx)) - return; - __io_napi_adjust_timeout(ctx, iowq, to_wait); -} - static inline void io_napi_busy_loop(struct io_ring_ctx *ctx, struct io_wait_queue *iowq) { @@ -86,11 +75,6 @@ static inline bool io_napi(struct io_ring_ctx *ctx) static inline void io_napi_add(struct io_kiocb *req) { } -static inline void io_napi_adjust_timeout(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq, - ktime_t to_wait) -{ -} static inline void io_napi_busy_loop(struct io_ring_ctx *ctx, struct io_wait_queue *iowq) { From d29cb3726f03cdac7889f0109a7cb84f79e168a8 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 7 Aug 2024 15:18:13 +0100 Subject: [PATCH 10/35] io_uring: add absolute mode wait timeouts In addition to current relative timeouts for the waiting loop, where the timespec argument specifies the maximum time it can wait for, add support for the absolute mode, with the value carrying a CLOCK_MONOTONIC absolute time until which we should return control back to the user. Suggested-by: Lewis Baker Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/4d5b74d67ada882590b2e42aa3aa7117bbf6b55f.1723039801.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 1 + io_uring/io_uring.c | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index adc2524fd8e3..6a81f55fcd0d 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -507,6 +507,7 @@ struct io_cqring_offsets { #define IORING_ENTER_SQ_WAIT (1U << 2) #define IORING_ENTER_EXT_ARG (1U << 3) #define IORING_ENTER_REGISTERED_RING (1U << 4) +#define IORING_ENTER_ABS_TIMER (1U << 5) /* * Passed in for io_uring_setup(2). Copied back with updated info on success diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 9ec07f76ad19..5282f9887440 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2387,7 +2387,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, * Wait until events become available, if we don't already have some. The * application must reap them itself, as they reside on the shared cq ring. */ -static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, +static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, const sigset_t __user *sig, size_t sigsz, struct __kernel_timespec __user *uts) { @@ -2416,13 +2416,13 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, if (uts) { struct timespec64 ts; - ktime_t dt; if (get_timespec64(&ts, uts)) return -EFAULT; - dt = timespec64_to_ktime(ts); - iowq.timeout = ktime_add(dt, ktime_get()); + iowq.timeout = timespec64_to_ktime(ts); + if (!(flags & IORING_ENTER_ABS_TIMER)) + iowq.timeout = ktime_add(iowq.timeout, ktime_get()); } if (sig) { @@ -3153,7 +3153,8 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP | IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG | - IORING_ENTER_REGISTERED_RING))) + IORING_ENTER_REGISTERED_RING | + IORING_ENTER_ABS_TIMER))) return -EINVAL; /* @@ -3251,8 +3252,8 @@ iopoll_locked: if (likely(!ret2)) { min_complete = min(min_complete, ctx->cq_entries); - ret2 = io_cqring_wait(ctx, min_complete, sig, - argsz, ts); + ret2 = io_cqring_wait(ctx, min_complete, flags, + sig, argsz, ts); } } From 2b8e976b984278edbeab3251d370e76d237699f9 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 7 Aug 2024 15:18:14 +0100 Subject: [PATCH 11/35] io_uring: user registered clockid for wait timeouts Add a new registration opcode IORING_REGISTER_CLOCK, which allows the user to select which clock id it wants to use with CQ waiting timeouts. It only allows a subset of all posix clocks and currently supports CLOCK_MONOTONIC and CLOCK_BOOTTIME. Suggested-by: Lewis Baker Signed-off-by: Pavel Begunkov Link: https://lore.kernel.org/r/98f2bc8a3c36cdf8f0e6a275245e81e903459703.1723039801.git.asml.silence@gmail.com Signed-off-by: Jens Axboe --- include/linux/io_uring_types.h | 3 +++ include/uapi/linux/io_uring.h | 7 +++++++ io_uring/io_uring.c | 8 ++++++-- io_uring/io_uring.h | 8 ++++++++ io_uring/napi.c | 2 +- io_uring/register.c | 31 +++++++++++++++++++++++++++++++ 6 files changed, 56 insertions(+), 3 deletions(-) diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 3315005df117..4b9ba523978d 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -239,6 +239,9 @@ struct io_ring_ctx { struct io_rings *rings; struct percpu_ref refs; + clockid_t clockid; + enum tk_offsets clock_offset; + enum task_work_notify_mode notify_method; unsigned sq_thread_idle; } ____cacheline_aligned_in_smp; diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 6a81f55fcd0d..7af716136df9 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -596,6 +596,8 @@ enum io_uring_register_op { IORING_REGISTER_NAPI = 27, IORING_UNREGISTER_NAPI = 28, + IORING_REGISTER_CLOCK = 29, + /* this goes last */ IORING_REGISTER_LAST, @@ -676,6 +678,11 @@ struct io_uring_restriction { __u32 resv2[3]; }; +struct io_uring_clock_register { + __u32 clockid; + __u32 __resv[3]; +}; + struct io_uring_buf { __u64 addr; __u32 len; diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 5282f9887440..20229e72b65c 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2377,7 +2377,8 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, ret = 0; if (iowq->timeout == KTIME_MAX) schedule(); - else if (!schedule_hrtimeout(&iowq->timeout, HRTIMER_MODE_ABS)) + else if (!schedule_hrtimeout_range_clock(&iowq->timeout, 0, + HRTIMER_MODE_ABS, ctx->clockid)) ret = -ETIME; current->in_iowait = 0; return ret; @@ -2422,7 +2423,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, iowq.timeout = timespec64_to_ktime(ts); if (!(flags & IORING_ENTER_ABS_TIMER)) - iowq.timeout = ktime_add(iowq.timeout, ktime_get()); + iowq.timeout = ktime_add(iowq.timeout, io_get_time(ctx)); } if (sig) { @@ -3424,6 +3425,9 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, if (!ctx) return -ENOMEM; + ctx->clockid = CLOCK_MONOTONIC; + ctx->clock_offset = 0; + if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && !(ctx->flags & IORING_SETUP_IOPOLL) && !(ctx->flags & IORING_SETUP_SQPOLL)) diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index c2acf6180845..9935819f12b7 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -437,6 +437,14 @@ static inline bool io_file_can_poll(struct io_kiocb *req) return false; } +static inline ktime_t io_get_time(struct io_ring_ctx *ctx) +{ + if (ctx->clockid == CLOCK_MONOTONIC) + return ktime_get(); + + return ktime_get_with_offset(ctx->clock_offset); +} + enum { IO_CHECK_CQ_OVERFLOW_BIT, IO_CHECK_CQ_DROPPED_BIT, diff --git a/io_uring/napi.c b/io_uring/napi.c index d78fcbecdd27..d0cf694d0172 100644 --- a/io_uring/napi.c +++ b/io_uring/napi.c @@ -283,7 +283,7 @@ void __io_napi_busy_loop(struct io_ring_ctx *ctx, struct io_wait_queue *iowq) iowq->napi_busy_poll_dt = READ_ONCE(ctx->napi_busy_poll_dt); if (iowq->timeout != KTIME_MAX) { - ktime_t dt = ktime_sub(iowq->timeout, ktime_get()); + ktime_t dt = ktime_sub(iowq->timeout, io_get_time(ctx)); iowq->napi_busy_poll_dt = min_t(u64, iowq->napi_busy_poll_dt, dt); } diff --git a/io_uring/register.c b/io_uring/register.c index e3c20be5a198..57cb85c42526 100644 --- a/io_uring/register.c +++ b/io_uring/register.c @@ -335,6 +335,31 @@ err: return ret; } +static int io_register_clock(struct io_ring_ctx *ctx, + struct io_uring_clock_register __user *arg) +{ + struct io_uring_clock_register reg; + + if (copy_from_user(®, arg, sizeof(reg))) + return -EFAULT; + if (memchr_inv(®.__resv, 0, sizeof(reg.__resv))) + return -EINVAL; + + switch (reg.clockid) { + case CLOCK_MONOTONIC: + ctx->clock_offset = 0; + break; + case CLOCK_BOOTTIME: + ctx->clock_offset = TK_OFFS_BOOT; + break; + default: + return -EINVAL; + } + + ctx->clockid = reg.clockid; + return 0; +} + static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, void __user *arg, unsigned nr_args) __releases(ctx->uring_lock) @@ -511,6 +536,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, break; ret = io_unregister_napi(ctx, arg); break; + case IORING_REGISTER_CLOCK: + ret = -EINVAL; + if (!arg || nr_args) + break; + ret = io_register_clock(ctx, arg); + break; default: ret = -EINVAL; break; From f42b58e44802b0280a452a33fbeb37fee5b6318f Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 15 Aug 2024 16:13:32 -0600 Subject: [PATCH 12/35] io_uring: encapsulate extraneous wait flags into a separate struct Rather than need to pass in 2 or 3 separate arguments, add a struct to encapsulate the timeout and sigset_t parts of waiting. In preparation for adding another argument for waiting. Signed-off-by: Jens Axboe --- io_uring/io_uring.c | 45 ++++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 20229e72b65c..37053d32c668 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2384,13 +2384,18 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, return ret; } +struct ext_arg { + size_t argsz; + struct __kernel_timespec __user *ts; + const sigset_t __user *sig; +}; + /* * Wait until events become available, if we don't already have some. The * application must reap them itself, as they reside on the shared cq ring. */ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, - const sigset_t __user *sig, size_t sigsz, - struct __kernel_timespec __user *uts) + struct ext_arg *ext_arg) { struct io_wait_queue iowq; struct io_rings *rings = ctx->rings; @@ -2415,10 +2420,10 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events; iowq.timeout = KTIME_MAX; - if (uts) { + if (ext_arg->ts) { struct timespec64 ts; - if (get_timespec64(&ts, uts)) + if (get_timespec64(&ts, ext_arg->ts)) return -EFAULT; iowq.timeout = timespec64_to_ktime(ts); @@ -2426,14 +2431,14 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, iowq.timeout = ktime_add(iowq.timeout, io_get_time(ctx)); } - if (sig) { + if (ext_arg->sig) { #ifdef CONFIG_COMPAT if (in_compat_syscall()) - ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig, - sigsz); + ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig, + ext_arg->argsz); else #endif - ret = set_user_sigmask(sig, sigsz); + ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz); if (ret) return ret; @@ -3112,9 +3117,8 @@ static int io_validate_ext_arg(unsigned flags, const void __user *argp, size_t a return 0; } -static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz, - struct __kernel_timespec __user **ts, - const sigset_t __user **sig) +static int io_get_ext_arg(unsigned flags, const void __user *argp, + struct ext_arg *ext_arg) { struct io_uring_getevents_arg arg; @@ -3123,8 +3127,8 @@ static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz * is just a pointer to the sigset_t. */ if (!(flags & IORING_ENTER_EXT_ARG)) { - *sig = (const sigset_t __user *) argp; - *ts = NULL; + ext_arg->sig = (const sigset_t __user *) argp; + ext_arg->ts = NULL; return 0; } @@ -3132,15 +3136,15 @@ static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz * EXT_ARG is set - ensure we agree on the size of it and copy in our * timespec and sigset_t pointers if good. */ - if (*argsz != sizeof(arg)) + if (ext_arg->argsz != sizeof(arg)) return -EINVAL; if (copy_from_user(&arg, argp, sizeof(arg))) return -EFAULT; if (arg.pad) return -EINVAL; - *sig = u64_to_user_ptr(arg.sigmask); - *argsz = arg.sigmask_sz; - *ts = u64_to_user_ptr(arg.ts); + ext_arg->sig = u64_to_user_ptr(arg.sigmask); + ext_arg->argsz = arg.sigmask_sz; + ext_arg->ts = u64_to_user_ptr(arg.ts); return 0; } @@ -3246,15 +3250,14 @@ iopoll_locked: } mutex_unlock(&ctx->uring_lock); } else { - const sigset_t __user *sig; - struct __kernel_timespec __user *ts; + struct ext_arg ext_arg = { .argsz = argsz }; - ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig); + ret2 = io_get_ext_arg(flags, argp, &ext_arg); if (likely(!ret2)) { min_complete = min(min_complete, ctx->cq_entries); ret2 = io_cqring_wait(ctx, min_complete, flags, - sig, argsz, ts); + &ext_arg); } } From 45a41e74b8f472254c64b42713bad0686350b0c6 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 4 Jan 2024 08:02:59 -0700 Subject: [PATCH 13/35] io_uring: move schedule wait logic into helper In preparation for expanding how we handle waits, move the actual schedule and schedule_timeout() handling into a helper. Signed-off-by: Jens Axboe --- io_uring/io_uring.c | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 37053d32c668..9e2b8d4c05db 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2350,12 +2350,31 @@ static bool current_pending_io(void) return percpu_counter_read_positive(&tctx->inflight); } -/* when returns >0, the caller should retry */ +static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, + struct io_wait_queue *iowq) +{ + int ret = 0; + + /* + * Mark us as being in io_wait if we have pending requests, so cpufreq + * can take into account that the task is waiting for IO - turns out + * to be important for low QD IO. + */ + if (current_pending_io()) + current->in_iowait = 1; + if (iowq->timeout == KTIME_MAX) + schedule(); + else if (!schedule_hrtimeout_range_clock(&iowq->timeout, 0, + HRTIMER_MODE_ABS, ctx->clockid)) + ret = -ETIME; + current->in_iowait = 0; + return ret; +} + +/* If this returns > 0, the caller should retry */ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, struct io_wait_queue *iowq) { - int ret; - if (unlikely(READ_ONCE(ctx->check_cq))) return 1; if (unlikely(!llist_empty(&ctx->work_llist))) @@ -2367,21 +2386,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, if (unlikely(io_should_wake(iowq))) return 0; - /* - * Mark us as being in io_wait if we have pending requests, so cpufreq - * can take into account that the task is waiting for IO - turns out - * to be important for low QD IO. - */ - if (current_pending_io()) - current->in_iowait = 1; - ret = 0; - if (iowq->timeout == KTIME_MAX) - schedule(); - else if (!schedule_hrtimeout_range_clock(&iowq->timeout, 0, - HRTIMER_MODE_ABS, ctx->clockid)) - ret = -ETIME; - current->in_iowait = 0; - return ret; + return __io_cqring_wait_schedule(ctx, iowq); } struct ext_arg { From cebf123c634ab78d39af94caf0fc9cd2c60d82c3 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 4 Jan 2024 08:46:23 -0700 Subject: [PATCH 14/35] io_uring: implement our own schedule timeout handling In preparation for having two distinct timeouts and avoid waking the task if we don't need to. Signed-off-by: Jens Axboe --- io_uring/io_uring.c | 35 +++++++++++++++++++++++++++++++---- io_uring/io_uring.h | 2 ++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 9e2b8d4c05db..c443bac8bad8 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2350,6 +2350,34 @@ static bool current_pending_io(void) return percpu_counter_read_positive(&tctx->inflight); } +static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer) +{ + struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); + + WRITE_ONCE(iowq->hit_timeout, 1); + wake_up_process(iowq->wq.private); + return HRTIMER_NORESTART; +} + +static int io_cqring_schedule_timeout(struct io_wait_queue *iowq, + clockid_t clock_id) +{ + iowq->hit_timeout = 0; + hrtimer_init_on_stack(&iowq->t, clock_id, HRTIMER_MODE_ABS); + iowq->t.function = io_cqring_timer_wakeup; + hrtimer_set_expires_range_ns(&iowq->t, iowq->timeout, 0); + hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS); + + if (!READ_ONCE(iowq->hit_timeout)) + schedule(); + + hrtimer_cancel(&iowq->t); + destroy_hrtimer_on_stack(&iowq->t); + __set_current_state(TASK_RUNNING); + + return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0; +} + static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, struct io_wait_queue *iowq) { @@ -2362,11 +2390,10 @@ static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, */ if (current_pending_io()) current->in_iowait = 1; - if (iowq->timeout == KTIME_MAX) + if (iowq->timeout != KTIME_MAX) + ret = io_cqring_schedule_timeout(iowq, ctx->clockid); + else schedule(); - else if (!schedule_hrtimeout_range_clock(&iowq->timeout, 0, - HRTIMER_MODE_ABS, ctx->clockid)) - ret = -ETIME; current->in_iowait = 0; return ret; } diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 9935819f12b7..f95c1b080f4b 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -40,7 +40,9 @@ struct io_wait_queue { struct io_ring_ctx *ctx; unsigned cq_tail; unsigned nr_timeouts; + int hit_timeout; ktime_t timeout; + struct hrtimer t; #ifdef CONFIG_NET_RX_BUSY_POLL ktime_t napi_busy_poll_dt; From 1100c4a2656d444785024cd9b585298729eff136 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 4 Jan 2024 10:17:54 -0700 Subject: [PATCH 15/35] io_uring: add support for batch wait timeout Waiting for events with io_uring has two knobs that can be set: 1) The number of events to wake for 2) The timeout associated with the event Waiting will abort when either of those conditions are met, as expected. This adds support for a third event, which is associated with the number of events to wait for. Applications generally like to handle batches of completions, and right now they'd set a number of events to wait for and the timeout for that. If no events have been received but the timeout triggers, control is returned to the application and it can wait again. However, if the application doesn't have anything to do until events are reaped, then it's possible to make this waiting more efficient. For example, the application may have a latency time of 50 usecs and wanting to handle a batch of 8 requests at the time. If it uses 50 usecs as the timeout, then it'll be doing 20K context switches per second even if nothing is happening. This introduces the notion of min batch wait time. If the min batch wait time expires, then we'll return to userspace if we have any events at all. If none are available, the general wait time is applied. Any request arriving after the min batch wait time will cause waiting to stop and return control to the application. Signed-off-by: Jens Axboe --- io_uring/io_uring.c | 95 ++++++++++++++++++++++++++++++++++++++------- io_uring/io_uring.h | 2 + 2 files changed, 83 insertions(+), 14 deletions(-) diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index c443bac8bad8..968cd5fb3f79 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2355,17 +2355,70 @@ static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer) struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); WRITE_ONCE(iowq->hit_timeout, 1); + iowq->min_timeout = 0; wake_up_process(iowq->wq.private); return HRTIMER_NORESTART; } -static int io_cqring_schedule_timeout(struct io_wait_queue *iowq, - clockid_t clock_id) +/* + * Doing min_timeout portion. If we saw any timeouts, events, or have work, + * wake up. If not, and we have a normal timeout, switch to that and keep + * sleeping. + */ +static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer) { - iowq->hit_timeout = 0; - hrtimer_init_on_stack(&iowq->t, clock_id, HRTIMER_MODE_ABS); + struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); + struct io_ring_ctx *ctx = iowq->ctx; + + /* no general timeout, or shorter (or equal), we are done */ + if (iowq->timeout == KTIME_MAX || + ktime_compare(iowq->min_timeout, iowq->timeout) >= 0) + goto out_wake; + /* work we may need to run, wake function will see if we need to wake */ + if (io_has_work(ctx)) + goto out_wake; + /* got events since we started waiting, min timeout is done */ + if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail)) + goto out_wake; + /* if we have any events and min timeout expired, we're done */ + if (io_cqring_events(ctx)) + goto out_wake; + + /* + * If using deferred task_work running and application is waiting on + * more than one request, ensure we reset it now where we are switching + * to normal sleeps. Any request completion post min_wait should wake + * the task and return. + */ + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { + atomic_set(&ctx->cq_wait_nr, 1); + smp_mb(); + if (!llist_empty(&ctx->work_llist)) + goto out_wake; + } + iowq->t.function = io_cqring_timer_wakeup; - hrtimer_set_expires_range_ns(&iowq->t, iowq->timeout, 0); + hrtimer_set_expires(timer, iowq->timeout); + return HRTIMER_RESTART; +out_wake: + return io_cqring_timer_wakeup(timer); +} + +static int io_cqring_schedule_timeout(struct io_wait_queue *iowq, + clockid_t clock_id, ktime_t start_time) +{ + ktime_t timeout; + + hrtimer_init_on_stack(&iowq->t, clock_id, HRTIMER_MODE_ABS); + if (iowq->min_timeout) { + timeout = ktime_add_ns(iowq->min_timeout, start_time); + iowq->t.function = io_cqring_min_timer_wakeup; + } else { + timeout = iowq->timeout; + iowq->t.function = io_cqring_timer_wakeup; + } + + hrtimer_set_expires_range_ns(&iowq->t, timeout, 0); hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS); if (!READ_ONCE(iowq->hit_timeout)) @@ -2379,7 +2432,8 @@ static int io_cqring_schedule_timeout(struct io_wait_queue *iowq, } static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq) + struct io_wait_queue *iowq, + ktime_t start_time) { int ret = 0; @@ -2390,8 +2444,8 @@ static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, */ if (current_pending_io()) current->in_iowait = 1; - if (iowq->timeout != KTIME_MAX) - ret = io_cqring_schedule_timeout(iowq, ctx->clockid); + if (iowq->timeout != KTIME_MAX || iowq->min_timeout) + ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time); else schedule(); current->in_iowait = 0; @@ -2400,7 +2454,8 @@ static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, /* If this returns > 0, the caller should retry */ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq) + struct io_wait_queue *iowq, + ktime_t start_time) { if (unlikely(READ_ONCE(ctx->check_cq))) return 1; @@ -2413,7 +2468,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, if (unlikely(io_should_wake(iowq))) return 0; - return __io_cqring_wait_schedule(ctx, iowq); + return __io_cqring_wait_schedule(ctx, iowq, start_time); } struct ext_arg { @@ -2431,6 +2486,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, { struct io_wait_queue iowq; struct io_rings *rings = ctx->rings; + ktime_t start_time; int ret; if (!io_allowed_run_tw(ctx)) @@ -2448,9 +2504,13 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, iowq.wq.private = current; INIT_LIST_HEAD(&iowq.wq.entry); iowq.ctx = ctx; - iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events; + iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail); + iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); + iowq.hit_timeout = 0; + iowq.min_timeout = 0; iowq.timeout = KTIME_MAX; + start_time = io_get_time(ctx); if (ext_arg->ts) { struct timespec64 ts; @@ -2460,7 +2520,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, iowq.timeout = timespec64_to_ktime(ts); if (!(flags & IORING_ENTER_ABS_TIMER)) - iowq.timeout = ktime_add(iowq.timeout, io_get_time(ctx)); + iowq.timeout = ktime_add(iowq.timeout, start_time); } if (ext_arg->sig) { @@ -2480,8 +2540,15 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, trace_io_uring_cqring_wait(ctx, min_events); do { - int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail); unsigned long check_cq; + int nr_wait; + + /* if min timeout has been hit, don't reset wait count */ + if (!iowq.hit_timeout) + nr_wait = (int) iowq.cq_tail - + READ_ONCE(ctx->rings->cq.tail); + else + nr_wait = 1; if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { atomic_set(&ctx->cq_wait_nr, nr_wait); @@ -2491,7 +2558,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, TASK_INTERRUPTIBLE); } - ret = io_cqring_wait_schedule(ctx, &iowq); + ret = io_cqring_wait_schedule(ctx, &iowq, start_time); __set_current_state(TASK_RUNNING); atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT); diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index f95c1b080f4b..65078e641390 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -39,8 +39,10 @@ struct io_wait_queue { struct wait_queue_entry wq; struct io_ring_ctx *ctx; unsigned cq_tail; + unsigned cq_min_tail; unsigned nr_timeouts; int hit_timeout; + ktime_t min_timeout; ktime_t timeout; struct hrtimer t; From 7ed9e09e2d13d5d43385153bba4734cb0eafd7fd Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 4 Jan 2024 10:46:30 -0700 Subject: [PATCH 16/35] io_uring: wire up min batch wake timeout Expose min_wait_usec in io_uring_getevents_arg, replacing the pad member that is currently in there. The value is in usecs, which is explained in the name as well. Note that if min_wait_usec and a normal timeout is used in conjunction, the normal timeout is still relative to the base time. For example, if min_wait_usec is set to 100 and the normal timeout is 1000, the max total time waited is still 1000. This also means that if the normal timeout is shorter than min_wait_usec, then only the min_wait_usec will take effect. See previous commit for an explanation of how this works. IORING_FEAT_MIN_TIMEOUT is added as a feature flag for this, as applications doing submit_and_wait_timeout() style operations will generally not see the -EINVAL from the wait side as they return the number of IOs submitted. Only if no IOs are submitted will the -EINVAL bubble back up to the application. Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 3 ++- io_uring/io_uring.c | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 7af716136df9..042eab793e26 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -543,6 +543,7 @@ struct io_uring_params { #define IORING_FEAT_LINKED_FILE (1U << 12) #define IORING_FEAT_REG_REG_RING (1U << 13) #define IORING_FEAT_RECVSEND_BUNDLE (1U << 14) +#define IORING_FEAT_MIN_TIMEOUT (1U << 15) /* * io_uring_register(2) opcodes and arguments @@ -766,7 +767,7 @@ enum io_uring_register_restriction_op { struct io_uring_getevents_arg { __u64 sigmask; __u32 sigmask_sz; - __u32 pad; + __u32 min_wait_usec; __u64 ts; }; diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 968cd5fb3f79..80bb6e2374e9 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -2475,6 +2475,7 @@ struct ext_arg { size_t argsz; struct __kernel_timespec __user *ts; const sigset_t __user *sig; + ktime_t min_time; }; /* @@ -2508,7 +2509,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail); iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); iowq.hit_timeout = 0; - iowq.min_timeout = 0; + iowq.min_timeout = ext_arg->min_time; iowq.timeout = KTIME_MAX; start_time = io_get_time(ctx); @@ -3239,8 +3240,7 @@ static int io_get_ext_arg(unsigned flags, const void __user *argp, return -EINVAL; if (copy_from_user(&arg, argp, sizeof(arg))) return -EFAULT; - if (arg.pad) - return -EINVAL; + ext_arg->min_time = arg.min_wait_usec * NSEC_PER_USEC; ext_arg->sig = u64_to_user_ptr(arg.sigmask); ext_arg->argsz = arg.sigmask_sz; ext_arg->ts = u64_to_user_ptr(arg.ts); @@ -3641,7 +3641,7 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS | IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP | IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING | - IORING_FEAT_RECVSEND_BUNDLE; + IORING_FEAT_RECVSEND_BUNDLE | IORING_FEAT_MIN_TIMEOUT; if (copy_to_user(params, p, sizeof(*p))) { ret = -EFAULT; From 120443321dfaaab8eb9290af617abcc37734c1e2 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 8 Aug 2024 12:54:55 -0600 Subject: [PATCH 17/35] io_uring/kbuf: shrink nr_iovs/mode in struct buf_sel_arg nr_iovs is capped at 1024, and mode only has a few low values. We can safely make them u16, in preparation for adding a few more members. Signed-off-by: Jens Axboe --- io_uring/kbuf.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h index 2ed141d7662e..ab30aa13fb5e 100644 --- a/io_uring/kbuf.h +++ b/io_uring/kbuf.h @@ -56,8 +56,8 @@ struct buf_sel_arg { struct iovec *iovs; size_t out_len; size_t max_len; - int nr_iovs; - int mode; + unsigned short nr_iovs; + unsigned short mode; }; void __user *io_buffer_select(struct io_kiocb *req, size_t *len, From ecd5c9b29643f383d39320e30d21b8615bd893da Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Mon, 12 Aug 2024 09:18:35 -0600 Subject: [PATCH 18/35] io_uring/kbuf: add io_kbuf_commit() helper Committing the selected ring buffer is currently done in three different spots, combine it into a helper and just call that. Signed-off-by: Jens Axboe --- io_uring/kbuf.c | 7 +++---- io_uring/kbuf.h | 14 ++++++++++---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index a4bde998f50d..c69f69807885 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -171,9 +171,8 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len, * the transfer completes (or if we get -EAGAIN and must poll of * retry). */ - req->flags &= ~REQ_F_BUFFERS_COMMIT; + io_kbuf_commit(req, bl, 1); req->buf_list = NULL; - bl->head++; } return u64_to_user_ptr(buf->addr); } @@ -297,8 +296,8 @@ int io_buffers_select(struct io_kiocb *req, struct buf_sel_arg *arg, * committed them, they cannot be put back in the queue. */ if (ret > 0) { - req->flags |= REQ_F_BL_NO_RECYCLE; - bl->head += ret; + req->flags |= REQ_F_BUFFERS_COMMIT | REQ_F_BL_NO_RECYCLE; + io_kbuf_commit(req, bl, ret); } } else { ret = io_provided_buffers_select(req, &arg->out_len, bl, arg->iovs); diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h index ab30aa13fb5e..43c7b18244b3 100644 --- a/io_uring/kbuf.h +++ b/io_uring/kbuf.h @@ -121,15 +121,21 @@ static inline bool io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags) return false; } +static inline void io_kbuf_commit(struct io_kiocb *req, + struct io_buffer_list *bl, int nr) +{ + if (unlikely(!(req->flags & REQ_F_BUFFERS_COMMIT))) + return; + bl->head += nr; + req->flags &= ~REQ_F_BUFFERS_COMMIT; +} + static inline void __io_put_kbuf_ring(struct io_kiocb *req, int nr) { struct io_buffer_list *bl = req->buf_list; if (bl) { - if (req->flags & REQ_F_BUFFERS_COMMIT) { - bl->head += nr; - req->flags &= ~REQ_F_BUFFERS_COMMIT; - } + io_kbuf_commit(req, bl, nr); req->buf_index = bl->bgid; } req->flags &= ~REQ_F_BUFFER_RING; From 2c8fa70bf3e981193ecda0eedf2100f933ef7085 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Mon, 12 Aug 2024 09:25:36 -0600 Subject: [PATCH 19/35] io_uring/kbuf: move io_ring_head_to_buf() to kbuf.h In preparation for using this helper in kbuf.h as well, move it there and turn it into a macro. Signed-off-by: Jens Axboe --- io_uring/kbuf.c | 6 ------ io_uring/kbuf.h | 3 +++ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index c69f69807885..297c1d2c3c27 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -132,12 +132,6 @@ static int io_provided_buffers_select(struct io_kiocb *req, size_t *len, return 0; } -static struct io_uring_buf *io_ring_head_to_buf(struct io_uring_buf_ring *br, - __u16 head, __u16 mask) -{ - return &br->bufs[head & mask]; -} - static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len, struct io_buffer_list *bl, unsigned int issue_flags) diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h index 43c7b18244b3..4c34ff3144b9 100644 --- a/io_uring/kbuf.h +++ b/io_uring/kbuf.h @@ -121,6 +121,9 @@ static inline bool io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags) return false; } +/* Mapped buffer ring, return io_uring_buf from head */ +#define io_ring_head_to_buf(br, head, mask) &(br)->bufs[(head) & (mask)] + static inline void io_kbuf_commit(struct io_kiocb *req, struct io_buffer_list *bl, int nr) { From 641a6816795b208aa7ccac751acaae580897db10 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 20 Aug 2024 07:22:53 -0600 Subject: [PATCH 20/35] Revert "io_uring: Require zeroed sqe->len on provided-buffers send" This reverts commit 79996b45f7b28c0e3e08a95bab80119e95317e28. Revert the change that restricts a send provided buffer to be zero, so it will always consume the whole buffer. This is strictly needed for partial consumption, as the send may very well be a subset of the current buffer. In fact, that's the intended use case. For non-incremental provided buffer rings, an application should set sqe->len carefully to avoid the potential issue described in the reverted commit. It is recommended that '0' still be set for len for that case, if the application is set on maintaining more than 1 send inflight for the same socket. This is somewhat of a nonsensical thing to do. Signed-off-by: Jens Axboe --- io_uring/net.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/io_uring/net.c b/io_uring/net.c index dc83a35b8af4..cc81bcacdc1b 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -434,8 +434,6 @@ int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) sr->buf_group = req->buf_index; req->buf_list = NULL; } - if (req->flags & REQ_F_BUFFER_SELECT && sr->len) - return -EINVAL; #ifdef CONFIG_COMPAT if (req->ctx->compat) @@ -599,7 +597,7 @@ retry_bundle: if (io_do_buffer_select(req)) { struct buf_sel_arg arg = { .iovs = &kmsg->fast_iov, - .max_len = INT_MAX, + .max_len = min_not_zero(sr->len, INT_MAX), .nr_iovs = 1, }; From 6733e678ba1226ad0df94f0bb095df121c54d701 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 27 Aug 2024 08:26:07 -0600 Subject: [PATCH 21/35] io_uring/kbuf: pass in 'len' argument for buffer commit In preparation for needing the consumed length, pass in the length being completed. Unused right now, but will be used when it is possible to partially consume a buffer. Signed-off-by: Jens Axboe --- io_uring/io_uring.c | 2 +- io_uring/kbuf.c | 10 +++++----- io_uring/kbuf.h | 33 +++++++++++++++++---------------- io_uring/net.c | 8 ++++---- io_uring/rw.c | 8 ++++---- 5 files changed, 31 insertions(+), 30 deletions(-) diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 80bb6e2374e9..1aca501efaf6 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -904,7 +904,7 @@ void io_req_defer_failed(struct io_kiocb *req, s32 res) lockdep_assert_held(&req->ctx->uring_lock); req_set_fail(req); - io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED)); + io_req_set_res(req, res, io_put_kbuf(req, res, IO_URING_F_UNLOCKED)); if (def->fail) def->fail(req); io_req_complete_defer(req); diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index 297c1d2c3c27..55d01861d8c5 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -70,7 +70,7 @@ bool io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags) return true; } -void __io_put_kbuf(struct io_kiocb *req, unsigned issue_flags) +void __io_put_kbuf(struct io_kiocb *req, int len, unsigned issue_flags) { /* * We can add this buffer back to two lists: @@ -88,12 +88,12 @@ void __io_put_kbuf(struct io_kiocb *req, unsigned issue_flags) struct io_ring_ctx *ctx = req->ctx; spin_lock(&ctx->completion_lock); - __io_put_kbuf_list(req, &ctx->io_buffers_comp); + __io_put_kbuf_list(req, len, &ctx->io_buffers_comp); spin_unlock(&ctx->completion_lock); } else { lockdep_assert_held(&req->ctx->uring_lock); - __io_put_kbuf_list(req, &req->ctx->io_buffers_cache); + __io_put_kbuf_list(req, len, &req->ctx->io_buffers_cache); } } @@ -165,7 +165,7 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len, * the transfer completes (or if we get -EAGAIN and must poll of * retry). */ - io_kbuf_commit(req, bl, 1); + io_kbuf_commit(req, bl, *len, 1); req->buf_list = NULL; } return u64_to_user_ptr(buf->addr); @@ -291,7 +291,7 @@ int io_buffers_select(struct io_kiocb *req, struct buf_sel_arg *arg, */ if (ret > 0) { req->flags |= REQ_F_BUFFERS_COMMIT | REQ_F_BL_NO_RECYCLE; - io_kbuf_commit(req, bl, ret); + io_kbuf_commit(req, bl, arg->out_len, ret); } } else { ret = io_provided_buffers_select(req, &arg->out_len, bl, arg->iovs); diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h index 4c34ff3144b9..b41e2a0a0505 100644 --- a/io_uring/kbuf.h +++ b/io_uring/kbuf.h @@ -77,7 +77,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg); int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg); int io_register_pbuf_status(struct io_ring_ctx *ctx, void __user *arg); -void __io_put_kbuf(struct io_kiocb *req, unsigned issue_flags); +void __io_put_kbuf(struct io_kiocb *req, int len, unsigned issue_flags); bool io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags); @@ -125,7 +125,7 @@ static inline bool io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags) #define io_ring_head_to_buf(br, head, mask) &(br)->bufs[(head) & (mask)] static inline void io_kbuf_commit(struct io_kiocb *req, - struct io_buffer_list *bl, int nr) + struct io_buffer_list *bl, int len, int nr) { if (unlikely(!(req->flags & REQ_F_BUFFERS_COMMIT))) return; @@ -133,22 +133,22 @@ static inline void io_kbuf_commit(struct io_kiocb *req, req->flags &= ~REQ_F_BUFFERS_COMMIT; } -static inline void __io_put_kbuf_ring(struct io_kiocb *req, int nr) +static inline void __io_put_kbuf_ring(struct io_kiocb *req, int len, int nr) { struct io_buffer_list *bl = req->buf_list; if (bl) { - io_kbuf_commit(req, bl, nr); + io_kbuf_commit(req, bl, len, nr); req->buf_index = bl->bgid; } req->flags &= ~REQ_F_BUFFER_RING; } -static inline void __io_put_kbuf_list(struct io_kiocb *req, +static inline void __io_put_kbuf_list(struct io_kiocb *req, int len, struct list_head *list) { if (req->flags & REQ_F_BUFFER_RING) { - __io_put_kbuf_ring(req, 1); + __io_put_kbuf_ring(req, len, 1); } else { req->buf_index = req->kbuf->bgid; list_add(&req->kbuf->list, list); @@ -163,11 +163,12 @@ static inline void io_kbuf_drop(struct io_kiocb *req) if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING))) return; - __io_put_kbuf_list(req, &req->ctx->io_buffers_comp); + /* len == 0 is fine here, non-ring will always drop all of it */ + __io_put_kbuf_list(req, 0, &req->ctx->io_buffers_comp); } -static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int nbufs, - unsigned issue_flags) +static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int len, + int nbufs, unsigned issue_flags) { unsigned int ret; @@ -176,21 +177,21 @@ static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int nbufs, ret = IORING_CQE_F_BUFFER | (req->buf_index << IORING_CQE_BUFFER_SHIFT); if (req->flags & REQ_F_BUFFER_RING) - __io_put_kbuf_ring(req, nbufs); + __io_put_kbuf_ring(req, len, nbufs); else - __io_put_kbuf(req, issue_flags); + __io_put_kbuf(req, len, issue_flags); return ret; } -static inline unsigned int io_put_kbuf(struct io_kiocb *req, +static inline unsigned int io_put_kbuf(struct io_kiocb *req, int len, unsigned issue_flags) { - return __io_put_kbufs(req, 1, issue_flags); + return __io_put_kbufs(req, len, 1, issue_flags); } -static inline unsigned int io_put_kbufs(struct io_kiocb *req, int nbufs, - unsigned issue_flags) +static inline unsigned int io_put_kbufs(struct io_kiocb *req, int len, + int nbufs, unsigned issue_flags) { - return __io_put_kbufs(req, nbufs, issue_flags); + return __io_put_kbufs(req, len, nbufs, issue_flags); } #endif diff --git a/io_uring/net.c b/io_uring/net.c index cc81bcacdc1b..f10f5a22d66a 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -497,11 +497,11 @@ static inline bool io_send_finish(struct io_kiocb *req, int *ret, unsigned int cflags; if (!(sr->flags & IORING_RECVSEND_BUNDLE)) { - cflags = io_put_kbuf(req, issue_flags); + cflags = io_put_kbuf(req, *ret, issue_flags); goto finish; } - cflags = io_put_kbufs(req, io_bundle_nbufs(kmsg, *ret), issue_flags); + cflags = io_put_kbufs(req, *ret, io_bundle_nbufs(kmsg, *ret), issue_flags); if (bundle_finished || req->flags & REQ_F_BL_EMPTY) goto finish; @@ -842,13 +842,13 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret, cflags |= IORING_CQE_F_SOCK_NONEMPTY; if (sr->flags & IORING_RECVSEND_BUNDLE) { - cflags |= io_put_kbufs(req, io_bundle_nbufs(kmsg, *ret), + cflags |= io_put_kbufs(req, *ret, io_bundle_nbufs(kmsg, *ret), issue_flags); /* bundle with no more immediate buffers, we're done */ if (req->flags & REQ_F_BL_EMPTY) goto finish; } else { - cflags |= io_put_kbuf(req, issue_flags); + cflags |= io_put_kbuf(req, *ret, issue_flags); } /* diff --git a/io_uring/rw.c b/io_uring/rw.c index c004d21e2f12..f5e0694538b9 100644 --- a/io_uring/rw.c +++ b/io_uring/rw.c @@ -511,7 +511,7 @@ void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts) io_req_io_end(req); if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) - req->cqe.flags |= io_put_kbuf(req, 0); + req->cqe.flags |= io_put_kbuf(req, req->cqe.res, 0); io_req_rw_cleanup(req, 0); io_req_task_complete(req, ts); @@ -593,7 +593,7 @@ static int kiocb_done(struct io_kiocb *req, ssize_t ret, */ io_req_io_end(req); io_req_set_res(req, final_ret, - io_put_kbuf(req, issue_flags)); + io_put_kbuf(req, ret, issue_flags)); io_req_rw_cleanup(req, issue_flags); return IOU_OK; } @@ -975,7 +975,7 @@ int io_read_mshot(struct io_kiocb *req, unsigned int issue_flags) * Put our buffer and post a CQE. If we fail to post a CQE, then * jump to the termination path. This request is then done. */ - cflags = io_put_kbuf(req, issue_flags); + cflags = io_put_kbuf(req, ret, issue_flags); rw->len = 0; /* similarly to above, reset len to 0 */ if (io_req_post_cqe(req, ret, cflags | IORING_CQE_F_MORE)) { @@ -1167,7 +1167,7 @@ int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin) if (!smp_load_acquire(&req->iopoll_completed)) break; nr_events++; - req->cqe.flags = io_put_kbuf(req, 0); + req->cqe.flags = io_put_kbuf(req, req->cqe.res, 0); if (req->opcode != IORING_OP_URING_CMD) io_req_rw_cleanup(req, 0); } From ae98dbf43d755b4e111fcd086e53939bef3e9a1a Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 9 Aug 2024 11:20:45 -0600 Subject: [PATCH 22/35] io_uring/kbuf: add support for incremental buffer consumption By default, any recv/read operation that uses provided buffers will consume at least 1 buffer fully (and maybe more, in case of bundles). This adds support for incremental consumption, meaning that an application may add large buffers, and each read/recv will just consume the part of the buffer that it needs. For example, let's say an application registers 1MB buffers in a provided buffer ring, for streaming receives. If it gets a short recv, then the full 1MB buffer will be consumed and passed back to the application. With incremental consumption, only the part that was actually used is consumed, and the buffer remains the current one. This means that both the application and the kernel needs to keep track of what the current receive point is. Each recv will still pass back a buffer ID and the size consumed, the only difference is that before the next receive would always be the next buffer in the ring. Now the same buffer ID may return multiple receives, each at an offset into that buffer from where the previous receive left off. Example: Application registers a provided buffer ring, and adds two 32K buffers to the ring. Buffer1 address: 0x1000000 (buffer ID 0) Buffer2 address: 0x2000000 (buffer ID 1) A recv completion is received with the following values: cqe->res 0x1000 (4k bytes received) cqe->flags 0x11 (CQE_F_BUFFER|CQE_F_BUF_MORE set, buffer ID 0) and the application now knows that 4096b of data is available at 0x1000000, the start of that buffer, and that more data from this buffer will be coming. Now the next receive comes in: cqe->res 0x2010 (8k bytes received) cqe->flags 0x11 (CQE_F_BUFFER|CQE_F_BUF_MORE set, buffer ID 0) which tells the application that 8k is available where the last completion left off, at 0x1001000. Next completion is: cqe->res 0x5000 (20k bytes received) cqe->flags 0x1 (CQE_F_BUFFER set, buffer ID 0) and the application now knows that 20k of data is available at 0x1003000, which is where the previous receive ended. CQE_F_BUF_MORE isn't set, as no more data is available in this buffer ID. The next completion is then: cqe->res 0x1000 (4k bytes received) cqe->flags 0x10001 (CQE_F_BUFFER|CQE_F_BUF_MORE set, buffer ID 1) which tells the application that buffer ID 1 is now the current one, hence there's 4k of valid data at 0x2000000. 0x2001000 will be the next receive point for this buffer ID. When a buffer will be reused by future CQE completions, IORING_CQE_BUF_MORE will be set in cqe->flags. This tells the application that the kernel isn't done with the buffer yet, and that it should expect more completions for this buffer ID. Will only be set by provided buffer rings setup with IOU_PBUF_RING INC, as that's the only type of buffer that will see multiple consecutive completions for the same buffer ID. For any other provided buffer type, any completion that passes back a buffer to the application is final. Once a buffer has been fully consumed, the buffer ring head is incremented and the next receive will indicate the next buffer ID in the CQE cflags. On the send side, the application can manage how much data is sent from an existing buffer by setting sqe->len to the desired send length. An application can request incremental consumption by setting IOU_PBUF_RING_INC in the provided buffer ring registration. Outside of that, any provided buffer ring setup and buffer additions is done like before, no changes there. The only change is in how an application may see multiple completions for the same buffer ID, hence needing to know where the next receive will happen. Note that like existing provided buffer rings, this should not be used with IOSQE_ASYNC, as both really require the ring to remain locked over the duration of the buffer selection and the operation completion. It will consume a buffer otherwise regardless of the size of the IO done. Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 18 +++++++++++++++ io_uring/kbuf.c | 42 +++++++++++++++++++++++++---------- io_uring/kbuf.h | 42 ++++++++++++++++++++++++++++------- 3 files changed, 82 insertions(+), 20 deletions(-) diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 042eab793e26..a275f91d2ac0 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -440,11 +440,21 @@ struct io_uring_cqe { * IORING_CQE_F_SOCK_NONEMPTY If set, more data to read after socket recv * IORING_CQE_F_NOTIF Set for notification CQEs. Can be used to distinct * them from sends. + * IORING_CQE_F_BUF_MORE If set, the buffer ID set in the completion will get + * more completions. In other words, the buffer is being + * partially consumed, and will be used by the kernel for + * more completions. This is only set for buffers used via + * the incremental buffer consumption, as provided by + * a ring buffer setup with IOU_PBUF_RING_INC. For any + * other provided buffer type, all completions with a + * buffer passed back is automatically returned to the + * application. */ #define IORING_CQE_F_BUFFER (1U << 0) #define IORING_CQE_F_MORE (1U << 1) #define IORING_CQE_F_SOCK_NONEMPTY (1U << 2) #define IORING_CQE_F_NOTIF (1U << 3) +#define IORING_CQE_F_BUF_MORE (1U << 4) #define IORING_CQE_BUFFER_SHIFT 16 @@ -716,9 +726,17 @@ struct io_uring_buf_ring { * mmap(2) with the offset set as: * IORING_OFF_PBUF_RING | (bgid << IORING_OFF_PBUF_SHIFT) * to get a virtual mapping for the ring. + * IOU_PBUF_RING_INC: If set, buffers consumed from this buffer ring can be + * consumed incrementally. Normally one (or more) buffers + * are fully consumed. With incremental consumptions, it's + * feasible to register big ranges of buffers, and each + * use of it will consume only as much as it needs. This + * requires that both the kernel and application keep + * track of where the current read/recv index is at. */ enum io_uring_register_pbuf_ring_flags { IOU_PBUF_RING_MMAP = 1, + IOU_PBUF_RING_INC = 2, }; /* argument for IORING_(UN)REGISTER_PBUF_RING */ diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index 55d01861d8c5..1f503bcc9c9f 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -212,14 +212,25 @@ static int io_ring_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg, buf = io_ring_head_to_buf(br, head, bl->mask); if (arg->max_len) { u32 len = READ_ONCE(buf->len); - size_t needed; if (unlikely(!len)) return -ENOBUFS; - needed = (arg->max_len + len - 1) / len; - needed = min_not_zero(needed, (size_t) PEEK_MAX_IMPORT); - if (nr_avail > needed) - nr_avail = needed; + /* + * Limit incremental buffers to 1 segment. No point trying + * to peek ahead and map more than we need, when the buffers + * themselves should be large when setup with + * IOU_PBUF_RING_INC. + */ + if (bl->flags & IOBL_INC) { + nr_avail = 1; + } else { + size_t needed; + + needed = (arg->max_len + len - 1) / len; + needed = min_not_zero(needed, (size_t) PEEK_MAX_IMPORT); + if (nr_avail > needed) + nr_avail = needed; + } } /* @@ -244,16 +255,21 @@ static int io_ring_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg, req->buf_index = buf->bid; do { - /* truncate end piece, if needed */ - if (buf->len > arg->max_len) - buf->len = arg->max_len; + u32 len = buf->len; + + /* truncate end piece, if needed, for non partial buffers */ + if (len > arg->max_len) { + len = arg->max_len; + if (!(bl->flags & IOBL_INC)) + buf->len = len; + } iov->iov_base = u64_to_user_ptr(buf->addr); - iov->iov_len = buf->len; + iov->iov_len = len; iov++; - arg->out_len += buf->len; - arg->max_len -= buf->len; + arg->out_len += len; + arg->max_len -= len; if (!arg->max_len) break; @@ -675,7 +691,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) if (reg.resv[0] || reg.resv[1] || reg.resv[2]) return -EINVAL; - if (reg.flags & ~IOU_PBUF_RING_MMAP) + if (reg.flags & ~(IOU_PBUF_RING_MMAP | IOU_PBUF_RING_INC)) return -EINVAL; if (!(reg.flags & IOU_PBUF_RING_MMAP)) { if (!reg.ring_addr) @@ -713,6 +729,8 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) if (!ret) { bl->nr_entries = reg.ring_entries; bl->mask = reg.ring_entries - 1; + if (reg.flags & IOU_PBUF_RING_INC) + bl->flags |= IOBL_INC; io_buffer_add_list(ctx, bl, reg.bgid); return 0; diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h index b41e2a0a0505..36aadfe5ac00 100644 --- a/io_uring/kbuf.h +++ b/io_uring/kbuf.h @@ -9,6 +9,9 @@ enum { IOBL_BUF_RING = 1, /* ring mapped provided buffers, but mmap'ed by application */ IOBL_MMAP = 2, + /* buffers are consumed incrementally rather than always fully */ + IOBL_INC = 4, + }; struct io_buffer_list { @@ -124,24 +127,45 @@ static inline bool io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags) /* Mapped buffer ring, return io_uring_buf from head */ #define io_ring_head_to_buf(br, head, mask) &(br)->bufs[(head) & (mask)] -static inline void io_kbuf_commit(struct io_kiocb *req, +static inline bool io_kbuf_commit(struct io_kiocb *req, struct io_buffer_list *bl, int len, int nr) { if (unlikely(!(req->flags & REQ_F_BUFFERS_COMMIT))) - return; - bl->head += nr; + return true; + req->flags &= ~REQ_F_BUFFERS_COMMIT; + + if (unlikely(len < 0)) + return true; + + if (bl->flags & IOBL_INC) { + struct io_uring_buf *buf; + + buf = io_ring_head_to_buf(bl->buf_ring, bl->head, bl->mask); + if (WARN_ON_ONCE(len > buf->len)) + len = buf->len; + buf->len -= len; + if (buf->len) { + buf->addr += len; + return false; + } + } + + bl->head += nr; + return true; } -static inline void __io_put_kbuf_ring(struct io_kiocb *req, int len, int nr) +static inline bool __io_put_kbuf_ring(struct io_kiocb *req, int len, int nr) { struct io_buffer_list *bl = req->buf_list; + bool ret = true; if (bl) { - io_kbuf_commit(req, bl, len, nr); + ret = io_kbuf_commit(req, bl, len, nr); req->buf_index = bl->bgid; } req->flags &= ~REQ_F_BUFFER_RING; + return ret; } static inline void __io_put_kbuf_list(struct io_kiocb *req, int len, @@ -176,10 +200,12 @@ static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int len, return 0; ret = IORING_CQE_F_BUFFER | (req->buf_index << IORING_CQE_BUFFER_SHIFT); - if (req->flags & REQ_F_BUFFER_RING) - __io_put_kbuf_ring(req, len, nbufs); - else + if (req->flags & REQ_F_BUFFER_RING) { + if (!__io_put_kbuf_ring(req, len, nbufs)) + ret |= IORING_CQE_F_BUF_MORE; + } else { __io_put_kbuf(req, len, issue_flags); + } return ret; } From 1802656ef8906cc949f58b64cb6d8d400326e163 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 30 Aug 2024 10:52:02 -0600 Subject: [PATCH 23/35] io_uring: add GCOV_PROFILE_URING Kconfig option If GCOV is enabled and this option is set, it enables code coverage profiling of the io_uring subsystem. Only use this for test purposes, as it will impact the runtime performance. Signed-off-by: Jens Axboe --- init/Kconfig | 13 +++++++++++++ io_uring/Makefile | 4 ++++ 2 files changed, 17 insertions(+) diff --git a/init/Kconfig b/init/Kconfig index 5783a0b87517..3b6ca7cce03b 100644 --- a/init/Kconfig +++ b/init/Kconfig @@ -1687,6 +1687,19 @@ config IO_URING applications to submit and complete IO through submission and completion rings that are shared between the kernel and application. +config GCOV_PROFILE_URING + bool "Enable GCOV profiling on the io_uring subsystem" + depends on GCOV_KERNEL + help + Enable GCOV profiling on the io_uring subsystem, to facilitate + code coverage testing. + + If unsure, say N. + + Note that this will have a negative impact on the performance of + the io_uring subsystem, hence this should only be enabled for + specific test purposes. + config ADVISE_SYSCALLS bool "Enable madvise/fadvise syscalls" if EXPERT default y diff --git a/io_uring/Makefile b/io_uring/Makefile index 61923e11c767..53167bef37d7 100644 --- a/io_uring/Makefile +++ b/io_uring/Makefile @@ -2,6 +2,10 @@ # # Makefile for io_uring +ifdef CONFIG_GCOV_PROFILE_URING +GCOV_PROFILE := y +endif + obj-$(CONFIG_IO_URING) += io_uring.o opdef.o kbuf.o rsrc.o notif.o \ tctx.o filetable.o rw.o net.o poll.o \ eventfd.o uring_cmd.o openclose.o \ From 6cf52b42c4efa4d064d19064fd2313ca4aaf9569 Mon Sep 17 00:00:00 2001 From: Anuj Gupta Date: Mon, 2 Sep 2024 11:51:33 +0530 Subject: [PATCH 24/35] io_uring: add new line after variable declaration Fixes checkpatch warning Signed-off-by: Anuj Gupta Link: https://lore.kernel.org/r/20240902062134.136387-2-anuj20.g@samsung.com Signed-off-by: Jens Axboe --- io_uring/eventfd.c | 1 + 1 file changed, 1 insertion(+) diff --git a/io_uring/eventfd.c b/io_uring/eventfd.c index b9384503a2b7..d9836d43725f 100644 --- a/io_uring/eventfd.c +++ b/io_uring/eventfd.c @@ -126,6 +126,7 @@ int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg, ev_fd->cq_ev_fd = eventfd_ctx_fdget(fd); if (IS_ERR(ev_fd->cq_ev_fd)) { int ret = PTR_ERR(ev_fd->cq_ev_fd); + kfree(ev_fd); return ret; } From c9f9ce65c2436879779d39c6e65b95c74a206e49 Mon Sep 17 00:00:00 2001 From: Anuj Gupta Date: Mon, 2 Sep 2024 11:51:34 +0530 Subject: [PATCH 25/35] io_uring: remove unused rsrc_put_fn rsrc_put_fn is declared but never used, remove it. Signed-off-by: Anuj Gupta Link: https://lore.kernel.org/r/20240902062134.136387-3-anuj20.g@samsung.com Signed-off-by: Jens Axboe --- io_uring/rsrc.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index 18242b2e9da4..3d0dda3556e6 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -22,8 +22,6 @@ struct io_rsrc_put { }; }; -typedef void (rsrc_put_fn)(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc); - struct io_rsrc_data { struct io_ring_ctx *ctx; From 0e0bcf07ec5b305ce7612385b06580dcbe5bc6a5 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Sun, 8 Sep 2024 16:34:55 -0600 Subject: [PATCH 26/35] io_uring/eventfd: move refs to refcount_t atomic_t for the struct io_ev_fd references and there are no issues with it. While the ref getting and putting for the eventfd code is somewhat performance critical for cases where eventfd signaling is used (news flash, you should not...), it probably doesn't warrant using an atomic_t for this. Let's just move to it to refcount_t to get the added protection of over/underflows. Link: https://lore.kernel.org/lkml/202409082039.hnsaIJ3X-lkp@intel.com/ Reported-by: kernel test robot Closes: https://lore.kernel.org/oe-kbuild-all/202409082039.hnsaIJ3X-lkp@intel.com/ Signed-off-by: Jens Axboe --- io_uring/eventfd.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/io_uring/eventfd.c b/io_uring/eventfd.c index d9836d43725f..e37fddd5d9ce 100644 --- a/io_uring/eventfd.c +++ b/io_uring/eventfd.c @@ -15,7 +15,7 @@ struct io_ev_fd { struct eventfd_ctx *cq_ev_fd; unsigned int eventfd_async: 1; struct rcu_head rcu; - atomic_t refs; + refcount_t refs; atomic_t ops; }; @@ -37,7 +37,7 @@ static void io_eventfd_do_signal(struct rcu_head *rcu) eventfd_signal_mask(ev_fd->cq_ev_fd, EPOLL_URING_WAKE); - if (atomic_dec_and_test(&ev_fd->refs)) + if (refcount_dec_and_test(&ev_fd->refs)) io_eventfd_free(rcu); } @@ -63,7 +63,7 @@ void io_eventfd_signal(struct io_ring_ctx *ctx) */ if (unlikely(!ev_fd)) return; - if (!atomic_inc_not_zero(&ev_fd->refs)) + if (!refcount_inc_not_zero(&ev_fd->refs)) return; if (ev_fd->eventfd_async && !io_wq_current_is_worker()) goto out; @@ -77,7 +77,7 @@ void io_eventfd_signal(struct io_ring_ctx *ctx) } } out: - if (atomic_dec_and_test(&ev_fd->refs)) + if (refcount_dec_and_test(&ev_fd->refs)) call_rcu(&ev_fd->rcu, io_eventfd_free); } @@ -137,7 +137,7 @@ int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg, ev_fd->eventfd_async = eventfd_async; ctx->has_evfd = true; - atomic_set(&ev_fd->refs, 1); + refcount_set(&ev_fd->refs, 1); atomic_set(&ev_fd->ops, 0); rcu_assign_pointer(ctx->io_ev_fd, ev_fd); return 0; @@ -152,7 +152,7 @@ int io_eventfd_unregister(struct io_ring_ctx *ctx) if (ev_fd) { ctx->has_evfd = false; rcu_assign_pointer(ctx->io_ev_fd, NULL); - if (atomic_dec_and_test(&ev_fd->refs)) + if (refcount_dec_and_test(&ev_fd->refs)) call_rcu(&ev_fd->rcu, io_eventfd_free); return 0; } From f011c9cf04c06f16b24f583d313d3c012e589e50 Mon Sep 17 00:00:00 2001 From: Felix Moessbauer Date: Mon, 9 Sep 2024 17:00:36 +0200 Subject: [PATCH 27/35] io_uring/sqpoll: do not allow pinning outside of cpuset The submit queue polling threads are userland threads that just never exit to the userland. When creating the thread with IORING_SETUP_SQ_AFF, the affinity of the poller thread is set to the cpu specified in sq_thread_cpu. However, this CPU can be outside of the cpuset defined by the cgroup cpuset controller. This violates the rules defined by the cpuset controller and is a potential issue for realtime applications. In b7ed6d8ffd6 we fixed the default affinity of the poller thread, in case no explicit pinning is required by inheriting the one of the creating task. In case of explicit pinning, the check is more complicated, as also a cpu outside of the parent cpumask is allowed. We implemented this by using cpuset_cpus_allowed (that has support for cgroup cpusets) and testing if the requested cpu is in the set. Fixes: 37d1e2e3642e ("io_uring: move SQPOLL thread io-wq forked worker") Cc: stable@vger.kernel.org # 6.1+ Signed-off-by: Felix Moessbauer Link: https://lore.kernel.org/r/20240909150036.55921-1-felix.moessbauer@siemens.com Signed-off-by: Jens Axboe --- io_uring/sqpoll.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c index e545bf240d35..272df9d00f45 100644 --- a/io_uring/sqpoll.c +++ b/io_uring/sqpoll.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -460,10 +461,12 @@ __cold int io_sq_offload_create(struct io_ring_ctx *ctx, return 0; if (p->flags & IORING_SETUP_SQ_AFF) { + struct cpumask allowed_mask; int cpu = p->sq_thread_cpu; ret = -EINVAL; - if (cpu >= nr_cpu_ids || !cpu_online(cpu)) + cpuset_cpus_allowed(current, &allowed_mask); + if (!cpumask_test_cpu(cpu, &allowed_mask)) goto err_sqpoll; sqd->sq_cpu = cpu; } else { From c0a9d496e0fece67db777bd48550376cf2960c47 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 10 Sep 2024 08:30:57 -0600 Subject: [PATCH 28/35] io_uring/rw: treat -EOPNOTSUPP for IOCB_NOWAIT like -EAGAIN Some file systems, ocfs2 in this case, will return -EOPNOTSUPP for an IOCB_NOWAIT read/write attempt. While this can be argued to be correct, the usual return value for something that requires blocking issue is -EAGAIN. A refactoring io_uring commit dropped calling kiocb_done() for negative return values, which is otherwise where we already do that transformation. To ensure we catch it in both spots, check it in __io_read() itself as well. Reported-by: Robert Sander Link: https://fosstodon.org/@gurubert@mastodon.gurubert.de/113112431889638440 Cc: stable@vger.kernel.org Fixes: a08d195b586a ("io_uring/rw: split io_read() into a helper") Signed-off-by: Jens Axboe --- io_uring/rw.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/io_uring/rw.c b/io_uring/rw.c index f5e0694538b9..f4d885b7c2d2 100644 --- a/io_uring/rw.c +++ b/io_uring/rw.c @@ -855,6 +855,14 @@ static int __io_read(struct io_kiocb *req, unsigned int issue_flags) ret = io_iter_do_read(rw, &io->iter); + /* + * Some file systems like to return -EOPNOTSUPP for an IOCB_NOWAIT + * issue, even though they should be returning -EAGAIN. To be safe, + * retry from blocking context for either. + */ + if (ret == -EOPNOTSUPP && force_nonblock) + ret = -EAGAIN; + if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) { req->flags &= ~REQ_F_REISSUE; /* If we can poll, just do that. */ From 90bfb28d5fa8127a113a140c9791ea0b40ab156a Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 10 Sep 2024 08:57:04 -0600 Subject: [PATCH 29/35] io_uring/rw: drop -EOPNOTSUPP check in __io_complete_rw_common() A recent change ensured that the necessary -EOPNOTSUPP -> -EAGAIN transformation happens inline on both the reader and writer side, and hence there's no need to check for both of these anymore on the completion handler side. Signed-off-by: Jens Axboe --- io_uring/rw.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/io_uring/rw.c b/io_uring/rw.c index f4d885b7c2d2..f023ff49c688 100644 --- a/io_uring/rw.c +++ b/io_uring/rw.c @@ -467,8 +467,7 @@ static void io_req_io_end(struct io_kiocb *req) static bool __io_complete_rw_common(struct io_kiocb *req, long res) { if (unlikely(res != req->cqe.res)) { - if ((res == -EAGAIN || res == -EOPNOTSUPP) && - io_rw_should_reissue(req)) { + if (res == -EAGAIN && io_rw_should_reissue(req)) { /* * Reissue will start accounting again, finish the * current cycle. From 0997aa5497c714edbb349ca366d28bd550ba3408 Mon Sep 17 00:00:00 2001 From: Felix Moessbauer Date: Tue, 10 Sep 2024 19:11:56 +0200 Subject: [PATCH 30/35] io_uring/io-wq: do not allow pinning outside of cpuset The io worker threads are userland threads that just never exit to the userland. By that, they are also assigned to a cgroup (the group of the creating task). When changing the affinity of the io_wq thread via syscall, we must only allow cpumasks within the limits defined by the cpuset controller of the cgroup (if enabled). Fixes: da64d6db3bd3 ("io_uring: One wqe per wq") Signed-off-by: Felix Moessbauer Link: https://lore.kernel.org/r/20240910171157.166423-2-felix.moessbauer@siemens.com Signed-off-by: Jens Axboe --- io_uring/io-wq.c | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index f1e7c670add8..c7055a8895d7 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -1322,17 +1323,29 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node) int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask) { + cpumask_var_t allowed_mask; + int ret = 0; + if (!tctx || !tctx->io_wq) return -EINVAL; + if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL)) + return -ENOMEM; + rcu_read_lock(); - if (mask) - cpumask_copy(tctx->io_wq->cpu_mask, mask); - else - cpumask_copy(tctx->io_wq->cpu_mask, cpu_possible_mask); + cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask); + if (mask) { + if (cpumask_subset(mask, allowed_mask)) + cpumask_copy(tctx->io_wq->cpu_mask, mask); + else + ret = -EINVAL; + } else { + cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask); + } rcu_read_unlock(); - return 0; + free_cpumask_var(allowed_mask); + return ret; } /* From 84eacf177faa605853c58e5b1c0d9544b88c16fd Mon Sep 17 00:00:00 2001 From: Felix Moessbauer Date: Tue, 10 Sep 2024 19:11:57 +0200 Subject: [PATCH 31/35] io_uring/io-wq: inherit cpuset of cgroup in io worker The io worker threads are userland threads that just never exit to the userland. By that, they are also assigned to a cgroup (the group of the creating task). When creating a new io worker, this worker should inherit the cpuset of the cgroup. Fixes: da64d6db3bd3 ("io_uring: One wqe per wq") Signed-off-by: Felix Moessbauer Link: https://lore.kernel.org/r/20240910171157.166423-3-felix.moessbauer@siemens.com Signed-off-by: Jens Axboe --- io_uring/io-wq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index c7055a8895d7..a38f36b68060 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c @@ -1168,7 +1168,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL)) goto err; - cpumask_copy(wq->cpu_mask, cpu_possible_mask); + cpuset_cpus_allowed(data->task, wq->cpu_mask); wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; wq->acct[IO_WQ_ACCT_UNBOUND].max_workers = task_rlimit(current, RLIMIT_NPROC); From 021b153f7d4115d99efa0d57ae2da6de1228295d Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 11 Sep 2024 13:52:17 -0600 Subject: [PATCH 32/35] io_uring/rsrc: clear 'slot' entry upfront No functional changes in this patch, but clearing the slot pointer earlier will be required by a later change. Signed-off-by: Jens Axboe --- io_uring/rsrc.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index 7d639a996f28..d42114845fac 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -114,6 +114,7 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_mapped_ubuf **slo struct io_mapped_ubuf *imu = *slot; unsigned int i; + *slot = NULL; if (imu != &dummy_ubuf) { for (i = 0; i < imu->nr_bvecs; i++) unpin_user_page(imu->bvec[i].bv_page); @@ -121,7 +122,6 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_mapped_ubuf **slo io_unaccount_mem(ctx, imu->acct_pages); kvfree(imu); } - *slot = NULL; } static void io_rsrc_put_work(struct io_rsrc_node *node) From bfc0aa7a512f9a4462a88ca7352b00b83f8d68fd Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 11 Sep 2024 13:54:32 -0600 Subject: [PATCH 33/35] io_uring/rsrc: add reference count to struct io_mapped_ubuf Currently there's a single ring owner of a mapped buffer, and hence the reference count will always be 1 when it's torn down and freed. However, in preparation for being able to link io_mapped_ubuf to different spots, add a reference count to manage the lifetime of it. Signed-off-by: Jens Axboe --- io_uring/rsrc.c | 3 +++ io_uring/rsrc.h | 1 + 2 files changed, 4 insertions(+) diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index d42114845fac..28f98de3c304 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -116,6 +116,8 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_mapped_ubuf **slo *slot = NULL; if (imu != &dummy_ubuf) { + if (!refcount_dec_and_test(&imu->refs)) + return; for (i = 0; i < imu->nr_bvecs; i++) unpin_user_page(imu->bvec[i].bv_page); if (imu->acct_pages) @@ -990,6 +992,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov, imu->folio_shift = data.folio_shift; imu->folio_mask = ~((1UL << data.folio_shift) - 1); } + refcount_set(&imu->refs, 1); off = (unsigned long) iov->iov_base & ~imu->folio_mask; *pimu = imu; ret = 0; diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index 3d0dda3556e6..98a253172c27 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -47,6 +47,7 @@ struct io_mapped_ubuf { unsigned int folio_shift; unsigned long acct_pages; unsigned long folio_mask; + refcount_t refs; struct bio_vec bvec[] __counted_by(nr_bvecs); }; From 0b6d253e084a97a05f4970dee06d9a75d29a7bda Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 12 Sep 2024 09:29:29 -0600 Subject: [PATCH 34/35] io_uring/register: provide helper to get io_ring_ctx from 'fd' Can be done in one of two ways: 1) Regular file descriptor, just fget() 2) Registered ring, index our own table for that In preparation for adding another register use of needing to get a ctx from a file descriptor, abstract out this helper and use it in the main register syscall as well. Signed-off-by: Jens Axboe --- io_uring/register.c | 58 +++++++++++++++++++++++++++------------------ io_uring/register.h | 1 + 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/io_uring/register.c b/io_uring/register.c index 57cb85c42526..d90159478045 100644 --- a/io_uring/register.c +++ b/io_uring/register.c @@ -550,6 +550,38 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, return ret; } +/* + * Given an 'fd' value, return the ctx associated with if. If 'registered' is + * true, then the registered index is used. Otherwise, the normal fd table. + * Caller must call fput() on the returned file, unless it's an ERR_PTR. + */ +struct file *io_uring_register_get_file(int fd, bool registered) +{ + struct file *file; + + if (registered) { + /* + * Ring fd has been registered via IORING_REGISTER_RING_FDS, we + * need only dereference our task private array to find it. + */ + struct io_uring_task *tctx = current->io_uring; + + if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX)) + return ERR_PTR(-EINVAL); + fd = array_index_nospec(fd, IO_RINGFD_REG_MAX); + file = tctx->registered_rings[fd]; + } else { + file = fget(fd); + } + + if (unlikely(!file)) + return ERR_PTR(-EBADF); + if (io_is_uring_fops(file)) + return file; + fput(file); + return ERR_PTR(-EOPNOTSUPP); +} + SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, void __user *, arg, unsigned int, nr_args) { @@ -564,35 +596,15 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, if (opcode >= IORING_REGISTER_LAST) return -EINVAL; - if (use_registered_ring) { - /* - * Ring fd has been registered via IORING_REGISTER_RING_FDS, we - * need only dereference our task private array to find it. - */ - struct io_uring_task *tctx = current->io_uring; - - if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX)) - return -EINVAL; - fd = array_index_nospec(fd, IO_RINGFD_REG_MAX); - file = tctx->registered_rings[fd]; - if (unlikely(!file)) - return -EBADF; - } else { - file = fget(fd); - if (unlikely(!file)) - return -EBADF; - ret = -EOPNOTSUPP; - if (!io_is_uring_fops(file)) - goto out_fput; - } - + file = io_uring_register_get_file(fd, use_registered_ring); + if (IS_ERR(file)) + return PTR_ERR(file); ctx = file->private_data; mutex_lock(&ctx->uring_lock); ret = __io_uring_register(ctx, opcode, arg, nr_args); mutex_unlock(&ctx->uring_lock); trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret); -out_fput: if (!use_registered_ring) fput(file); return ret; diff --git a/io_uring/register.h b/io_uring/register.h index c9da997d503c..cc69b88338fe 100644 --- a/io_uring/register.h +++ b/io_uring/register.h @@ -4,5 +4,6 @@ int io_eventfd_unregister(struct io_ring_ctx *ctx); int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id); +struct file *io_uring_register_get_file(int fd, bool registered); #endif From 7cc2a6eadcd7a5aa36ac63e6659f5c6138c7f4d2 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Wed, 11 Sep 2024 13:56:08 -0600 Subject: [PATCH 35/35] io_uring: add IORING_REGISTER_COPY_BUFFERS method Buffers can get registered with io_uring, which allows to skip the repeated pin_pages, unpin/unref pages for each O_DIRECT operation. This reduces the overhead of O_DIRECT IO. However, registrering buffers can take some time. Normally this isn't an issue as it's done at initialization time (and hence less critical), but for cases where rings can be created and destroyed as part of an IO thread pool, registering the same buffers for multiple rings become a more time sensitive proposition. As an example, let's say an application has an IO memory pool of 500G. Initial registration takes: Got 500 huge pages (each 1024MB) Registered 500 pages in 409 msec or about 0.4 seconds. If we go higher to 900 1GB huge pages being registered: Registered 900 pages in 738 msec which is, as expected, a fully linear scaling. Rather than have each ring pin/map/register the same buffer pool, provide an io_uring_register(2) opcode to simply duplicate the buffers that are registered with another ring. Adding the same 900GB of registered buffers to the target ring can then be accomplished in: Copied 900 pages in 17 usec While timing differs a bit, this provides around a 25,000-40,000x speedup for this use case. Signed-off-by: Jens Axboe --- include/uapi/linux/io_uring.h | 13 +++++ io_uring/register.c | 6 +++ io_uring/rsrc.c | 91 +++++++++++++++++++++++++++++++++++ io_uring/rsrc.h | 1 + 4 files changed, 111 insertions(+) diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index a275f91d2ac0..9dc5bb428c8a 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -609,6 +609,9 @@ enum io_uring_register_op { IORING_REGISTER_CLOCK = 29, + /* copy registered buffers from source ring to current ring */ + IORING_REGISTER_COPY_BUFFERS = 30, + /* this goes last */ IORING_REGISTER_LAST, @@ -694,6 +697,16 @@ struct io_uring_clock_register { __u32 __resv[3]; }; +enum { + IORING_REGISTER_SRC_REGISTERED = 1, +}; + +struct io_uring_copy_buffers { + __u32 src_fd; + __u32 flags; + __u32 pad[6]; +}; + struct io_uring_buf { __u64 addr; __u32 len; diff --git a/io_uring/register.c b/io_uring/register.c index d90159478045..dab0f8024ddf 100644 --- a/io_uring/register.c +++ b/io_uring/register.c @@ -542,6 +542,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, break; ret = io_register_clock(ctx, arg); break; + case IORING_REGISTER_COPY_BUFFERS: + ret = -EINVAL; + if (!arg || nr_args != 1) + break; + ret = io_register_copy_buffers(ctx, arg); + break; default: ret = -EINVAL; break; diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index 28f98de3c304..40696a395f0a 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -17,6 +17,7 @@ #include "openclose.h" #include "rsrc.h" #include "memmap.h" +#include "register.h" struct io_rsrc_update { struct file *file; @@ -1137,3 +1138,93 @@ int io_import_fixed(int ddir, struct iov_iter *iter, return 0; } + +static int io_copy_buffers(struct io_ring_ctx *ctx, struct io_ring_ctx *src_ctx) +{ + struct io_mapped_ubuf **user_bufs; + struct io_rsrc_data *data; + int i, ret, nbufs; + + /* + * Drop our own lock here. We'll setup the data we need and reference + * the source buffers, then re-grab, check, and assign at the end. + */ + mutex_unlock(&ctx->uring_lock); + + mutex_lock(&src_ctx->uring_lock); + ret = -ENXIO; + nbufs = src_ctx->nr_user_bufs; + if (!nbufs) + goto out_unlock; + ret = io_rsrc_data_alloc(ctx, IORING_RSRC_BUFFER, NULL, nbufs, &data); + if (ret) + goto out_unlock; + + ret = -ENOMEM; + user_bufs = kcalloc(nbufs, sizeof(*ctx->user_bufs), GFP_KERNEL); + if (!user_bufs) + goto out_free_data; + + for (i = 0; i < nbufs; i++) { + struct io_mapped_ubuf *src = src_ctx->user_bufs[i]; + + refcount_inc(&src->refs); + user_bufs[i] = src; + } + + /* Have a ref on the bufs now, drop src lock and re-grab our own lock */ + mutex_unlock(&src_ctx->uring_lock); + mutex_lock(&ctx->uring_lock); + if (!ctx->user_bufs) { + ctx->user_bufs = user_bufs; + ctx->buf_data = data; + ctx->nr_user_bufs = nbufs; + return 0; + } + + /* someone raced setting up buffers, dump ours */ + for (i = 0; i < nbufs; i++) + io_buffer_unmap(ctx, &user_bufs[i]); + io_rsrc_data_free(data); + kfree(user_bufs); + return -EBUSY; +out_free_data: + io_rsrc_data_free(data); +out_unlock: + mutex_unlock(&src_ctx->uring_lock); + mutex_lock(&ctx->uring_lock); + return ret; +} + +/* + * Copy the registered buffers from the source ring whose file descriptor + * is given in the src_fd to the current ring. This is identical to registering + * the buffers with ctx, except faster as mappings already exist. + * + * Since the memory is already accounted once, don't account it again. + */ +int io_register_copy_buffers(struct io_ring_ctx *ctx, void __user *arg) +{ + struct io_uring_copy_buffers buf; + bool registered_src; + struct file *file; + int ret; + + if (ctx->user_bufs || ctx->nr_user_bufs) + return -EBUSY; + if (copy_from_user(&buf, arg, sizeof(buf))) + return -EFAULT; + if (buf.flags & ~IORING_REGISTER_SRC_REGISTERED) + return -EINVAL; + if (memchr_inv(buf.pad, 0, sizeof(buf.pad))) + return -EINVAL; + + registered_src = (buf.flags & IORING_REGISTER_SRC_REGISTERED) != 0; + file = io_uring_register_get_file(buf.src_fd, registered_src); + if (IS_ERR(file)) + return PTR_ERR(file); + ret = io_copy_buffers(ctx, file->private_data); + if (!registered_src) + fput(file); + return ret; +} diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index 98a253172c27..93546ab337a6 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -68,6 +68,7 @@ int io_import_fixed(int ddir, struct iov_iter *iter, struct io_mapped_ubuf *imu, u64 buf_addr, size_t len); +int io_register_copy_buffers(struct io_ring_ctx *ctx, void __user *arg); void __io_sqe_buffers_unregister(struct io_ring_ctx *ctx); int io_sqe_buffers_unregister(struct io_ring_ctx *ctx); int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,