io_uring: calculate CQEs from the user visible value

io_cqring_wait (and it's wake function io_has_work) used cached_cq_tail in
order to calculate the number of CQEs. cached_cq_tail is set strictly
before the user visible rings->cq.tail

However as far as userspace is concerned,  if io_uring_enter(2) is called
with a minimum number of events, they will verify by checking
rings->cq.tail.

It is therefore possible for io_uring_enter(2) to return early with fewer
events visible to the user.

Instead make the wait functions read from the user visible value, so there
will be no discrepency.

This is triggered eventually by the following reproducer:

struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;
unsigned int cqe_ready;
struct io_uring ring;
int ret, i;

ret = io_uring_queue_init(N, &ring, 0);
assert(!ret);
while(true) {
	for (i = 0; i < N; i++) {
		sqe = io_uring_get_sqe(&ring);
		io_uring_prep_nop(sqe);
		sqe->flags |= IOSQE_ASYNC;
	}
	ret = io_uring_submit(&ring);
	assert(ret == N);

	do {
		ret = io_uring_wait_cqes(&ring, &cqe, N, NULL, NULL);
	} while(ret == -EINTR);
	cqe_ready = io_uring_cq_ready(&ring);
	assert(!ret);
	assert(cqe_ready == N);
	io_uring_cq_advance(&ring, N);
}

Fixes: ad3eb2c89fb2 ("io_uring: split overflow state into SQ and CQ side")
Signed-off-by: Dylan Yudaken <dylany@meta.com>
Link: https://lore.kernel.org/r/20221108153016.1854297-1-dylany@meta.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
Dylan Yudaken 2022-11-08 07:30:16 -08:00 committed by Jens Axboe
parent 6dcabcd398
commit 0fc8c2acbf

View File

@ -176,6 +176,11 @@ static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
}
static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
{
return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
}
static bool io_match_linked(struct io_kiocb *head)
{
struct io_kiocb *req;
@ -2315,7 +2320,7 @@ static inline bool io_has_work(struct io_ring_ctx *ctx)
static inline bool io_should_wake(struct io_wait_queue *iowq)
{
struct io_ring_ctx *ctx = iowq->ctx;
int dist = ctx->cached_cq_tail - (int) iowq->cq_tail;
int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;
/*
* Wake up if we have enough events, or if a timeout occurred since we
@ -2399,7 +2404,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
return ret;
io_cqring_overflow_flush(ctx);
if (io_cqring_events(ctx) >= min_events)
/* if user messes with these they will just get an early return */
if (__io_cqring_events_user(ctx) >= min_events)
return 0;
} while (ret > 0);