io_uring: move poll handling into its own file

Add a io_poll_issue() rather than export the general task_work locking
and io_issue_sqe(), and put the io_op_defs definition and structure into
a separate header file so that poll can use it.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
Jens Axboe 2022-05-25 20:31:09 -06:00
parent cfd22e6b33
commit 329061d3e2
6 changed files with 879 additions and 827 deletions

View File

@ -6,5 +6,5 @@ obj-$(CONFIG_IO_URING) += io_uring.o xattr.o nop.o fs.o splice.o \
sync.o advise.o filetable.o \ sync.o advise.o filetable.o \
openclose.o uring_cmd.o epoll.o \ openclose.o uring_cmd.o epoll.o \
statx.o net.o msg_ring.o timeout.o \ statx.o net.o msg_ring.o timeout.o \
sqpoll.o fdinfo.o tctx.o sqpoll.o fdinfo.o tctx.o poll.o
obj-$(CONFIG_IO_WQ) += io-wq.o obj-$(CONFIG_IO_WQ) += io-wq.o

View File

@ -88,6 +88,7 @@
#include "io_uring_types.h" #include "io_uring_types.h"
#include "io_uring.h" #include "io_uring.h"
#include "opdef.h"
#include "refs.h" #include "refs.h"
#include "tctx.h" #include "tctx.h"
#include "sqpoll.h" #include "sqpoll.h"
@ -106,6 +107,7 @@
#include "net.h" #include "net.h"
#include "msg_ring.h" #include "msg_ring.h"
#include "timeout.h" #include "timeout.h"
#include "poll.h"
#define IORING_MAX_ENTRIES 32768 #define IORING_MAX_ENTRIES 32768
#define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES) #define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES)
@ -208,22 +210,6 @@ struct io_buffer {
* First field must be the file pointer in all the * First field must be the file pointer in all the
* iocb unions! See also 'struct kiocb' in <linux/fs.h> * iocb unions! See also 'struct kiocb' in <linux/fs.h>
*/ */
struct io_poll {
struct file *file;
struct wait_queue_head *head;
__poll_t events;
struct wait_queue_entry wait;
};
struct io_poll_update {
struct file *file;
u64 old_user_data;
u64 new_user_data;
__poll_t events;
bool update_events;
bool update_user_data;
};
struct io_cancel { struct io_cancel {
struct file *file; struct file *file;
u64 addr; u64 addr;
@ -268,11 +254,6 @@ struct io_async_rw {
struct wait_page_queue wpq; struct wait_page_queue wpq;
}; };
struct async_poll {
struct io_poll poll;
struct io_poll *double_poll;
};
enum { enum {
IORING_RSRC_FILE = 0, IORING_RSRC_FILE = 0,
IORING_RSRC_BUFFER = 1, IORING_RSRC_BUFFER = 1,
@ -289,42 +270,6 @@ struct io_defer_entry {
u32 seq; u32 seq;
}; };
struct io_op_def {
/* needs req->file assigned */
unsigned needs_file : 1;
/* should block plug */
unsigned plug : 1;
/* hash wq insertion if file is a regular file */
unsigned hash_reg_file : 1;
/* unbound wq insertion if file is a non-regular file */
unsigned unbound_nonreg_file : 1;
/* set if opcode supports polled "wait" */
unsigned pollin : 1;
unsigned pollout : 1;
unsigned poll_exclusive : 1;
/* op supports buffer selection */
unsigned buffer_select : 1;
/* opcode is not supported by this kernel */
unsigned not_supported : 1;
/* skip auditing */
unsigned audit_skip : 1;
/* supports ioprio */
unsigned ioprio : 1;
/* supports iopoll */
unsigned iopoll : 1;
/* size of async data needed, if any */
unsigned short async_size;
const char *name;
int (*prep)(struct io_kiocb *, const struct io_uring_sqe *);
int (*issue)(struct io_kiocb *, unsigned int);
int (*prep_async)(struct io_kiocb *);
void (*cleanup)(struct io_kiocb *);
};
static const struct io_op_def io_op_defs[];
/* requests with any of those set should undergo io_disarm_next() */ /* requests with any of those set should undergo io_disarm_next() */
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL) #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK) #define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
@ -529,32 +474,12 @@ static struct io_buffer_list *io_buffer_get_list(struct io_ring_ctx *ctx,
return xa_load(&ctx->io_bl_xa, bgid); return xa_load(&ctx->io_bl_xa, bgid);
} }
static void io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags) void __io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags)
{ {
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
struct io_buffer_list *bl; struct io_buffer_list *bl;
struct io_buffer *buf; struct io_buffer *buf;
if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
return;
/*
* For legacy provided buffer mode, don't recycle if we already did
* IO to this buffer. For ring-mapped provided buffer mode, we should
* increment ring->head to explicitly monopolize the buffer to avoid
* multiple use.
*/
if ((req->flags & REQ_F_BUFFER_SELECTED) &&
(req->flags & REQ_F_PARTIAL_IO))
return;
/*
* READV uses fields in `struct io_rw` (len/addr) to stash the selected
* buffer data. However if that buffer is recycled the original request
* data stored in addr is lost. Therefore forbid recycling for now.
*/
if (req->opcode == IORING_OP_READV)
return;
/* /*
* We don't need to recycle for REQ_F_BUFFER_RING, we can just clear * We don't need to recycle for REQ_F_BUFFER_RING, we can just clear
* the flag and hence ensure that bl->head doesn't get incremented. * the flag and hence ensure that bl->head doesn't get incremented.
@ -599,8 +524,8 @@ static bool io_match_linked(struct io_kiocb *head)
* As io_match_task() but protected against racing with linked timeouts. * As io_match_task() but protected against racing with linked timeouts.
* User must not hold timeout_lock. * User must not hold timeout_lock.
*/ */
static bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task, bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
bool cancel_all) bool cancel_all)
{ {
bool matched; bool matched;
@ -1310,7 +1235,7 @@ inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags)
io_req_complete_post(req); io_req_complete_post(req);
} }
static void io_req_complete_failed(struct io_kiocb *req, s32 res) void io_req_complete_failed(struct io_kiocb *req, s32 res)
{ {
req_set_fail(req); req_set_fail(req);
io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED)); io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
@ -1656,7 +1581,7 @@ static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
io_req_complete_failed(req, req->cqe.res); io_req_complete_failed(req, req->cqe.res);
} }
static void io_req_task_submit(struct io_kiocb *req, bool *locked) void io_req_task_submit(struct io_kiocb *req, bool *locked)
{ {
io_tw_lock(req->ctx, locked); io_tw_lock(req->ctx, locked);
/* req->task == current here, checking PF_EXITING is safe */ /* req->task == current here, checking PF_EXITING is safe */
@ -3437,749 +3362,6 @@ static __maybe_unused int io_eopnotsupp_prep(struct io_kiocb *kiocb,
return -EOPNOTSUPP; return -EOPNOTSUPP;
} }
struct io_poll_table {
struct poll_table_struct pt;
struct io_kiocb *req;
int nr_entries;
int error;
};
#define IO_POLL_CANCEL_FLAG BIT(31)
#define IO_POLL_REF_MASK GENMASK(30, 0)
/*
* If refs part of ->poll_refs (see IO_POLL_REF_MASK) is 0, it's free. We can
* bump it and acquire ownership. It's disallowed to modify requests while not
* owning it, that prevents from races for enqueueing task_work's and b/w
* arming poll and wakeups.
*/
static inline bool io_poll_get_ownership(struct io_kiocb *req)
{
return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
}
static void io_poll_mark_cancelled(struct io_kiocb *req)
{
atomic_or(IO_POLL_CANCEL_FLAG, &req->poll_refs);
}
static struct io_poll *io_poll_get_double(struct io_kiocb *req)
{
/* pure poll stashes this in ->async_data, poll driven retry elsewhere */
if (req->opcode == IORING_OP_POLL_ADD)
return req->async_data;
return req->apoll->double_poll;
}
static struct io_poll *io_poll_get_single(struct io_kiocb *req)
{
if (req->opcode == IORING_OP_POLL_ADD)
return io_kiocb_to_cmd(req);
return &req->apoll->poll;
}
static void io_poll_req_insert(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
struct hlist_head *list;
list = &ctx->cancel_hash[hash_long(req->cqe.user_data, ctx->cancel_hash_bits)];
hlist_add_head(&req->hash_node, list);
}
static void io_init_poll_iocb(struct io_poll *poll, __poll_t events,
wait_queue_func_t wake_func)
{
poll->head = NULL;
#define IO_POLL_UNMASK (EPOLLERR|EPOLLHUP|EPOLLNVAL|EPOLLRDHUP)
/* mask in events that we always want/need */
poll->events = events | IO_POLL_UNMASK;
INIT_LIST_HEAD(&poll->wait.entry);
init_waitqueue_func_entry(&poll->wait, wake_func);
}
static inline void io_poll_remove_entry(struct io_poll *poll)
{
struct wait_queue_head *head = smp_load_acquire(&poll->head);
if (head) {
spin_lock_irq(&head->lock);
list_del_init(&poll->wait.entry);
poll->head = NULL;
spin_unlock_irq(&head->lock);
}
}
static void io_poll_remove_entries(struct io_kiocb *req)
{
/*
* Nothing to do if neither of those flags are set. Avoid dipping
* into the poll/apoll/double cachelines if we can.
*/
if (!(req->flags & (REQ_F_SINGLE_POLL | REQ_F_DOUBLE_POLL)))
return;
/*
* While we hold the waitqueue lock and the waitqueue is nonempty,
* wake_up_pollfree() will wait for us. However, taking the waitqueue
* lock in the first place can race with the waitqueue being freed.
*
* We solve this as eventpoll does: by taking advantage of the fact that
* all users of wake_up_pollfree() will RCU-delay the actual free. If
* we enter rcu_read_lock() and see that the pointer to the queue is
* non-NULL, we can then lock it without the memory being freed out from
* under us.
*
* Keep holding rcu_read_lock() as long as we hold the queue lock, in
* case the caller deletes the entry from the queue, leaving it empty.
* In that case, only RCU prevents the queue memory from being freed.
*/
rcu_read_lock();
if (req->flags & REQ_F_SINGLE_POLL)
io_poll_remove_entry(io_poll_get_single(req));
if (req->flags & REQ_F_DOUBLE_POLL)
io_poll_remove_entry(io_poll_get_double(req));
rcu_read_unlock();
}
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags);
/*
* All poll tw should go through this. Checks for poll events, manages
* references, does rewait, etc.
*
* Returns a negative error on failure. >0 when no action require, which is
* either spurious wakeup or multishot CQE is served. 0 when it's done with
* the request, then the mask is stored in req->cqe.res.
*/
static int io_poll_check_events(struct io_kiocb *req, bool *locked)
{
struct io_ring_ctx *ctx = req->ctx;
int v, ret;
/* req->task == current here, checking PF_EXITING is safe */
if (unlikely(req->task->flags & PF_EXITING))
return -ECANCELED;
do {
v = atomic_read(&req->poll_refs);
/* tw handler should be the owner, and so have some references */
if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
return 0;
if (v & IO_POLL_CANCEL_FLAG)
return -ECANCELED;
if (!req->cqe.res) {
struct poll_table_struct pt = { ._key = req->apoll_events };
req->cqe.res = vfs_poll(req->file, &pt) & req->apoll_events;
}
if ((unlikely(!req->cqe.res)))
continue;
if (req->apoll_events & EPOLLONESHOT)
return 0;
/* multishot, just fill a CQE and proceed */
if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
__poll_t mask = mangle_poll(req->cqe.res &
req->apoll_events);
bool filled;
spin_lock(&ctx->completion_lock);
filled = io_fill_cqe_aux(ctx, req->cqe.user_data,
mask, IORING_CQE_F_MORE);
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
if (filled) {
io_cqring_ev_posted(ctx);
continue;
}
return -ECANCELED;
}
io_tw_lock(req->ctx, locked);
if (unlikely(req->task->flags & PF_EXITING))
return -EFAULT;
ret = io_issue_sqe(req,
IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
if (ret)
return ret;
/*
* Release all references, retry if someone tried to restart
* task_work while we were executing it.
*/
} while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs));
return 1;
}
static void io_poll_task_func(struct io_kiocb *req, bool *locked)
{
struct io_ring_ctx *ctx = req->ctx;
int ret;
ret = io_poll_check_events(req, locked);
if (ret > 0)
return;
if (!ret) {
struct io_poll *poll = io_kiocb_to_cmd(req);
req->cqe.res = mangle_poll(req->cqe.res & poll->events);
} else {
req->cqe.res = ret;
req_set_fail(req);
}
io_poll_remove_entries(req);
spin_lock(&ctx->completion_lock);
hash_del(&req->hash_node);
req->cqe.flags = 0;
__io_req_complete_post(req);
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
}
static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
{
struct io_ring_ctx *ctx = req->ctx;
int ret;
ret = io_poll_check_events(req, locked);
if (ret > 0)
return;
io_poll_remove_entries(req);
spin_lock(&ctx->completion_lock);
hash_del(&req->hash_node);
spin_unlock(&ctx->completion_lock);
if (!ret)
io_req_task_submit(req, locked);
else
io_req_complete_failed(req, ret);
}
static void __io_poll_execute(struct io_kiocb *req, int mask,
__poll_t __maybe_unused events)
{
io_req_set_res(req, mask, 0);
/*
* This is useful for poll that is armed on behalf of another
* request, and where the wakeup path could be on a different
* CPU. We want to avoid pulling in req->apoll->events for that
* case.
*/
if (req->opcode == IORING_OP_POLL_ADD)
req->io_task_work.func = io_poll_task_func;
else
req->io_task_work.func = io_apoll_task_func;
trace_io_uring_task_add(req->ctx, req, req->cqe.user_data, req->opcode, mask);
io_req_task_work_add(req);
}
static inline void io_poll_execute(struct io_kiocb *req, int res,
__poll_t events)
{
if (io_poll_get_ownership(req))
__io_poll_execute(req, res, events);
}
static void io_poll_cancel_req(struct io_kiocb *req)
{
io_poll_mark_cancelled(req);
/* kick tw, which should complete the request */
io_poll_execute(req, 0, 0);
}
#define wqe_to_req(wait) ((void *)((unsigned long) (wait)->private & ~1))
#define wqe_is_double(wait) ((unsigned long) (wait)->private & 1)
#define IO_ASYNC_POLL_COMMON (EPOLLONESHOT | EPOLLPRI)
static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
void *key)
{
struct io_kiocb *req = wqe_to_req(wait);
struct io_poll *poll = container_of(wait, struct io_poll, wait);
__poll_t mask = key_to_poll(key);
if (unlikely(mask & POLLFREE)) {
io_poll_mark_cancelled(req);
/* we have to kick tw in case it's not already */
io_poll_execute(req, 0, poll->events);
/*
* If the waitqueue is being freed early but someone is already
* holds ownership over it, we have to tear down the request as
* best we can. That means immediately removing the request from
* its waitqueue and preventing all further accesses to the
* waitqueue via the request.
*/
list_del_init(&poll->wait.entry);
/*
* Careful: this *must* be the last step, since as soon
* as req->head is NULL'ed out, the request can be
* completed and freed, since aio_poll_complete_work()
* will no longer need to take the waitqueue lock.
*/
smp_store_release(&poll->head, NULL);
return 1;
}
/* for instances that support it check for an event match first */
if (mask && !(mask & (poll->events & ~IO_ASYNC_POLL_COMMON)))
return 0;
if (io_poll_get_ownership(req)) {
/* optional, saves extra locking for removal in tw handler */
if (mask && poll->events & EPOLLONESHOT) {
list_del_init(&poll->wait.entry);
poll->head = NULL;
if (wqe_is_double(wait))
req->flags &= ~REQ_F_DOUBLE_POLL;
else
req->flags &= ~REQ_F_SINGLE_POLL;
}
__io_poll_execute(req, mask, poll->events);
}
return 1;
}
static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
struct wait_queue_head *head,
struct io_poll **poll_ptr)
{
struct io_kiocb *req = pt->req;
unsigned long wqe_private = (unsigned long) req;
/*
* The file being polled uses multiple waitqueues for poll handling
* (e.g. one for read, one for write). Setup a separate io_poll
* if this happens.
*/
if (unlikely(pt->nr_entries)) {
struct io_poll *first = poll;
/* double add on the same waitqueue head, ignore */
if (first->head == head)
return;
/* already have a 2nd entry, fail a third attempt */
if (*poll_ptr) {
if ((*poll_ptr)->head == head)
return;
pt->error = -EINVAL;
return;
}
poll = kmalloc(sizeof(*poll), GFP_ATOMIC);
if (!poll) {
pt->error = -ENOMEM;
return;
}
/* mark as double wq entry */
wqe_private |= 1;
req->flags |= REQ_F_DOUBLE_POLL;
io_init_poll_iocb(poll, first->events, first->wait.func);
*poll_ptr = poll;
if (req->opcode == IORING_OP_POLL_ADD)
req->flags |= REQ_F_ASYNC_DATA;
}
req->flags |= REQ_F_SINGLE_POLL;
pt->nr_entries++;
poll->head = head;
poll->wait.private = (void *) wqe_private;
if (poll->events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(head, &poll->wait);
else
add_wait_queue(head, &poll->wait);
}
static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
struct poll_table_struct *p)
{
struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
struct io_poll *poll = io_kiocb_to_cmd(pt->req);
__io_queue_proc(poll, pt, head,
(struct io_poll **) &pt->req->async_data);
}
static int __io_arm_poll_handler(struct io_kiocb *req,
struct io_poll *poll,
struct io_poll_table *ipt, __poll_t mask)
{
struct io_ring_ctx *ctx = req->ctx;
int v;
INIT_HLIST_NODE(&req->hash_node);
req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
io_init_poll_iocb(poll, mask, io_poll_wake);
poll->file = req->file;
req->apoll_events = poll->events;
ipt->pt._key = mask;
ipt->req = req;
ipt->error = 0;
ipt->nr_entries = 0;
/*
* Take the ownership to delay any tw execution up until we're done
* with poll arming. see io_poll_get_ownership().
*/
atomic_set(&req->poll_refs, 1);
mask = vfs_poll(req->file, &ipt->pt) & poll->events;
if (mask && (poll->events & EPOLLONESHOT)) {
io_poll_remove_entries(req);
/* no one else has access to the req, forget about the ref */
return mask;
}
if (!mask && unlikely(ipt->error || !ipt->nr_entries)) {
io_poll_remove_entries(req);
if (!ipt->error)
ipt->error = -EINVAL;
return 0;
}
spin_lock(&ctx->completion_lock);
io_poll_req_insert(req);
spin_unlock(&ctx->completion_lock);
if (mask) {
/* can't multishot if failed, just queue the event we've got */
if (unlikely(ipt->error || !ipt->nr_entries)) {
poll->events |= EPOLLONESHOT;
req->apoll_events |= EPOLLONESHOT;
ipt->error = 0;
}
__io_poll_execute(req, mask, poll->events);
return 0;
}
/*
* Release ownership. If someone tried to queue a tw while it was
* locked, kick it off for them.
*/
v = atomic_dec_return(&req->poll_refs);
if (unlikely(v & IO_POLL_REF_MASK))
__io_poll_execute(req, 0, poll->events);
return 0;
}
static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
struct poll_table_struct *p)
{
struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
struct async_poll *apoll = pt->req->apoll;
__io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
}
enum {
IO_APOLL_OK,
IO_APOLL_ABORTED,
IO_APOLL_READY
};
static int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags)
{
const struct io_op_def *def = &io_op_defs[req->opcode];
struct io_ring_ctx *ctx = req->ctx;
struct async_poll *apoll;
struct io_poll_table ipt;
__poll_t mask = POLLPRI | POLLERR;
int ret;
if (!def->pollin && !def->pollout)
return IO_APOLL_ABORTED;
if (!file_can_poll(req->file))
return IO_APOLL_ABORTED;
if ((req->flags & (REQ_F_POLLED|REQ_F_PARTIAL_IO)) == REQ_F_POLLED)
return IO_APOLL_ABORTED;
if (!(req->flags & REQ_F_APOLL_MULTISHOT))
mask |= EPOLLONESHOT;
if (def->pollin) {
mask |= EPOLLIN | EPOLLRDNORM;
/* If reading from MSG_ERRQUEUE using recvmsg, ignore POLLIN */
if (req->flags & REQ_F_CLEAR_POLLIN)
mask &= ~EPOLLIN;
} else {
mask |= EPOLLOUT | EPOLLWRNORM;
}
if (def->poll_exclusive)
mask |= EPOLLEXCLUSIVE;
if (req->flags & REQ_F_POLLED) {
apoll = req->apoll;
kfree(apoll->double_poll);
} else if (!(issue_flags & IO_URING_F_UNLOCKED) &&
!list_empty(&ctx->apoll_cache)) {
apoll = list_first_entry(&ctx->apoll_cache, struct async_poll,
poll.wait.entry);
list_del_init(&apoll->poll.wait.entry);
} else {
apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
if (unlikely(!apoll))
return IO_APOLL_ABORTED;
}
apoll->double_poll = NULL;
req->apoll = apoll;
req->flags |= REQ_F_POLLED;
ipt.pt._qproc = io_async_queue_proc;
io_kbuf_recycle(req, issue_flags);
ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask);
if (ret || ipt.error)
return ret ? IO_APOLL_READY : IO_APOLL_ABORTED;
trace_io_uring_poll_arm(ctx, req, req->cqe.user_data, req->opcode,
mask, apoll->poll.events);
return IO_APOLL_OK;
}
/*
* Returns true if we found and killed one or more poll requests
*/
static __cold bool io_poll_remove_all(struct io_ring_ctx *ctx,
struct task_struct *tsk, bool cancel_all)
{
struct hlist_node *tmp;
struct io_kiocb *req;
bool found = false;
int i;
spin_lock(&ctx->completion_lock);
for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
struct hlist_head *list;
list = &ctx->cancel_hash[i];
hlist_for_each_entry_safe(req, tmp, list, hash_node) {
if (io_match_task_safe(req, tsk, cancel_all)) {
hlist_del_init(&req->hash_node);
io_poll_cancel_req(req);
found = true;
}
}
}
spin_unlock(&ctx->completion_lock);
return found;
}
static struct io_kiocb *io_poll_find(struct io_ring_ctx *ctx, bool poll_only,
struct io_cancel_data *cd)
__must_hold(&ctx->completion_lock)
{
struct hlist_head *list;
struct io_kiocb *req;
list = &ctx->cancel_hash[hash_long(cd->data, ctx->cancel_hash_bits)];
hlist_for_each_entry(req, list, hash_node) {
if (cd->data != req->cqe.user_data)
continue;
if (poll_only && req->opcode != IORING_OP_POLL_ADD)
continue;
if (cd->flags & IORING_ASYNC_CANCEL_ALL) {
if (cd->seq == req->work.cancel_seq)
continue;
req->work.cancel_seq = cd->seq;
}
return req;
}
return NULL;
}
static struct io_kiocb *io_poll_file_find(struct io_ring_ctx *ctx,
struct io_cancel_data *cd)
__must_hold(&ctx->completion_lock)
{
struct io_kiocb *req;
int i;
for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
struct hlist_head *list;
list = &ctx->cancel_hash[i];
hlist_for_each_entry(req, list, hash_node) {
if (!(cd->flags & IORING_ASYNC_CANCEL_ANY) &&
req->file != cd->file)
continue;
if (cd->seq == req->work.cancel_seq)
continue;
req->work.cancel_seq = cd->seq;
return req;
}
}
return NULL;
}
static bool io_poll_disarm(struct io_kiocb *req)
__must_hold(&ctx->completion_lock)
{
if (!io_poll_get_ownership(req))
return false;
io_poll_remove_entries(req);
hash_del(&req->hash_node);
return true;
}
static int io_poll_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd)
__must_hold(&ctx->completion_lock)
{
struct io_kiocb *req;
if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_ANY))
req = io_poll_file_find(ctx, cd);
else
req = io_poll_find(ctx, false, cd);
if (!req)
return -ENOENT;
io_poll_cancel_req(req);
return 0;
}
static __poll_t io_poll_parse_events(const struct io_uring_sqe *sqe,
unsigned int flags)
{
u32 events;
events = READ_ONCE(sqe->poll32_events);
#ifdef __BIG_ENDIAN
events = swahw32(events);
#endif
if (!(flags & IORING_POLL_ADD_MULTI))
events |= EPOLLONESHOT;
return demangle_poll(events) | (events & (EPOLLEXCLUSIVE|EPOLLONESHOT));
}
static int io_poll_remove_prep(struct io_kiocb *req,
const struct io_uring_sqe *sqe)
{
struct io_poll_update *upd = io_kiocb_to_cmd(req);
u32 flags;
if (sqe->buf_index || sqe->splice_fd_in)
return -EINVAL;
flags = READ_ONCE(sqe->len);
if (flags & ~(IORING_POLL_UPDATE_EVENTS | IORING_POLL_UPDATE_USER_DATA |
IORING_POLL_ADD_MULTI))
return -EINVAL;
/* meaningless without update */
if (flags == IORING_POLL_ADD_MULTI)
return -EINVAL;
upd->old_user_data = READ_ONCE(sqe->addr);
upd->update_events = flags & IORING_POLL_UPDATE_EVENTS;
upd->update_user_data = flags & IORING_POLL_UPDATE_USER_DATA;
upd->new_user_data = READ_ONCE(sqe->off);
if (!upd->update_user_data && upd->new_user_data)
return -EINVAL;
if (upd->update_events)
upd->events = io_poll_parse_events(sqe, flags);
else if (sqe->poll32_events)
return -EINVAL;
return 0;
}
static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_poll *poll = io_kiocb_to_cmd(req);
u32 flags;
if (sqe->buf_index || sqe->off || sqe->addr)
return -EINVAL;
flags = READ_ONCE(sqe->len);
if (flags & ~IORING_POLL_ADD_MULTI)
return -EINVAL;
if ((flags & IORING_POLL_ADD_MULTI) && (req->flags & REQ_F_CQE_SKIP))
return -EINVAL;
io_req_set_refcount(req);
poll->events = io_poll_parse_events(sqe, flags);
return 0;
}
static int io_poll_add(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_poll *poll = io_kiocb_to_cmd(req);
struct io_poll_table ipt;
int ret;
ipt.pt._qproc = io_poll_queue_proc;
ret = __io_arm_poll_handler(req, poll, &ipt, poll->events);
if (ret) {
io_req_set_res(req, ret, 0);
return IOU_OK;
}
if (ipt.error) {
req_set_fail(req);
return ipt.error;
}
return IOU_ISSUE_SKIP_COMPLETE;
}
static int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_poll_update *poll_update = io_kiocb_to_cmd(req);
struct io_cancel_data cd = { .data = poll_update->old_user_data, };
struct io_ring_ctx *ctx = req->ctx;
struct io_kiocb *preq;
int ret2, ret = 0;
bool locked;
spin_lock(&ctx->completion_lock);
preq = io_poll_find(ctx, true, &cd);
if (!preq || !io_poll_disarm(preq)) {
spin_unlock(&ctx->completion_lock);
ret = preq ? -EALREADY : -ENOENT;
goto out;
}
spin_unlock(&ctx->completion_lock);
if (poll_update->update_events || poll_update->update_user_data) {
/* only mask one event flags, keep behavior flags */
if (poll_update->update_events) {
struct io_poll *poll = io_kiocb_to_cmd(preq);
poll->events &= ~0xffff;
poll->events |= poll_update->events & 0xffff;
poll->events |= IO_POLL_UNMASK;
}
if (poll_update->update_user_data)
preq->cqe.user_data = poll_update->new_user_data;
ret2 = io_poll_add(preq, issue_flags);
/* successfully updated, don't complete poll request */
if (!ret2 || ret2 == -EIOCBQUEUED)
goto out;
}
req_set_fail(preq);
io_req_set_res(preq, -ECANCELED, 0);
locked = !(issue_flags & IO_URING_F_UNLOCKED);
io_req_task_complete(preq, &locked);
out:
if (ret < 0) {
req_set_fail(req);
return ret;
}
/* complete update request, we're done with it */
io_req_set_res(req, ret, 0);
return IOU_OK;
}
static bool io_cancel_cb(struct io_wq_work *work, void *data) static bool io_cancel_cb(struct io_wq_work *work, void *data)
{ {
struct io_kiocb *req = container_of(work, struct io_kiocb, work); struct io_kiocb *req = container_of(work, struct io_kiocb, work);
@ -4589,6 +3771,14 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
return 0; return 0;
} }
int io_poll_issue(struct io_kiocb *req, bool *locked)
{
io_tw_lock(req->ctx, locked);
if (unlikely(req->task->flags & PF_EXITING))
return -EFAULT;
return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
}
struct io_wq_work *io_wq_free_work(struct io_wq_work *work) struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
{ {
struct io_kiocb *req = container_of(work, struct io_kiocb, work); struct io_kiocb *req = container_of(work, struct io_kiocb, work);
@ -8209,7 +7399,7 @@ static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
return -ECANCELED; return -ECANCELED;
} }
static const struct io_op_def io_op_defs[] = { const struct io_op_def io_op_defs[] = {
[IORING_OP_NOP] = { [IORING_OP_NOP] = {
.audit_skip = 1, .audit_skip = 1,
.iopoll = 1, .iopoll = 1,

View File

@ -92,6 +92,7 @@ static inline bool io_run_task_work(void)
return false; return false;
} }
void io_req_complete_failed(struct io_kiocb *req, s32 res);
void __io_req_complete(struct io_kiocb *req, unsigned issue_flags); void __io_req_complete(struct io_kiocb *req, unsigned issue_flags);
void io_req_complete_post(struct io_kiocb *req); void io_req_complete_post(struct io_kiocb *req);
void __io_req_complete_post(struct io_kiocb *req); void __io_req_complete_post(struct io_kiocb *req);
@ -109,6 +110,32 @@ static inline bool io_do_buffer_select(struct io_kiocb *req)
return !(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)); return !(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING));
} }
void __io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags);
static inline void io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags)
{
if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
return;
/*
* For legacy provided buffer mode, don't recycle if we already did
* IO to this buffer. For ring-mapped provided buffer mode, we should
* increment ring->head to explicitly monopolize the buffer to avoid
* multiple use.
*/
if ((req->flags & REQ_F_BUFFER_SELECTED) &&
(req->flags & REQ_F_PARTIAL_IO))
return;
/*
* READV uses fields in `struct io_rw` (len/addr) to stash the selected
* buffer data. However if that buffer is recycled the original request
* data stored in addr is lost. Therefore forbid recycling for now.
*/
if (req->opcode == IORING_OP_READV)
return;
__io_kbuf_recycle(req, issue_flags);
}
struct file *io_file_get_normal(struct io_kiocb *req, int fd); struct file *io_file_get_normal(struct io_kiocb *req, int fd);
struct file *io_file_get_fixed(struct io_kiocb *req, int fd, struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
unsigned issue_flags); unsigned issue_flags);
@ -128,12 +155,14 @@ void io_req_task_work_add(struct io_kiocb *req);
void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags); void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags);
void io_req_task_complete(struct io_kiocb *req, bool *locked); void io_req_task_complete(struct io_kiocb *req, bool *locked);
void io_req_task_queue_fail(struct io_kiocb *req, int ret); void io_req_task_queue_fail(struct io_kiocb *req, int ret);
void io_req_task_submit(struct io_kiocb *req, bool *locked);
void tctx_task_work(struct callback_head *cb); void tctx_task_work(struct callback_head *cb);
int io_try_cancel(struct io_kiocb *req, struct io_cancel_data *cd); int io_try_cancel(struct io_kiocb *req, struct io_cancel_data *cd);
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd); __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
int io_uring_alloc_task_context(struct task_struct *task, int io_uring_alloc_task_context(struct task_struct *task,
struct io_ring_ctx *ctx); struct io_ring_ctx *ctx);
int io_poll_issue(struct io_kiocb *req, bool *locked);
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr); int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin); int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
@ -143,6 +172,9 @@ void io_wq_submit_work(struct io_wq_work *work);
void io_free_req(struct io_kiocb *req); void io_free_req(struct io_kiocb *req);
void io_queue_next(struct io_kiocb *req); void io_queue_next(struct io_kiocb *req);
bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
bool cancel_all);
#define io_for_each_link(pos, head) \ #define io_for_each_link(pos, head) \
for (pos = (head); pos; pos = pos->link) for (pos = (head); pos; pos = pos->link)

40
io_uring/opdef.h Normal file
View File

@ -0,0 +1,40 @@
// SPDX-License-Identifier: GPL-2.0
#ifndef IOU_OP_DEF_H
#define IOU_OP_DEF_H
struct io_op_def {
/* needs req->file assigned */
unsigned needs_file : 1;
/* should block plug */
unsigned plug : 1;
/* hash wq insertion if file is a regular file */
unsigned hash_reg_file : 1;
/* unbound wq insertion if file is a non-regular file */
unsigned unbound_nonreg_file : 1;
/* set if opcode supports polled "wait" */
unsigned pollin : 1;
unsigned pollout : 1;
unsigned poll_exclusive : 1;
/* op supports buffer selection */
unsigned buffer_select : 1;
/* opcode is not supported by this kernel */
unsigned not_supported : 1;
/* skip auditing */
unsigned audit_skip : 1;
/* supports ioprio */
unsigned ioprio : 1;
/* supports iopoll */
unsigned iopoll : 1;
/* size of async data needed, if any */
unsigned short async_size;
const char *name;
int (*prep)(struct io_kiocb *, const struct io_uring_sqe *);
int (*issue)(struct io_kiocb *, unsigned int);
int (*prep_async)(struct io_kiocb *);
void (*cleanup)(struct io_kiocb *);
};
extern const struct io_op_def io_op_defs[];
#endif

760
io_uring/poll.c Normal file
View File

@ -0,0 +1,760 @@
// SPDX-License-Identifier: GPL-2.0
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/poll.h>
#include <linux/hashtable.h>
#include <linux/io_uring.h>
#include <trace/events/io_uring.h>
#include <uapi/linux/io_uring.h>
#include "io_uring_types.h"
#include "io_uring.h"
#include "refs.h"
#include "opdef.h"
#include "poll.h"
struct io_poll_update {
struct file *file;
u64 old_user_data;
u64 new_user_data;
__poll_t events;
bool update_events;
bool update_user_data;
};
struct io_poll_table {
struct poll_table_struct pt;
struct io_kiocb *req;
int nr_entries;
int error;
};
#define IO_POLL_CANCEL_FLAG BIT(31)
#define IO_POLL_REF_MASK GENMASK(30, 0)
/*
* If refs part of ->poll_refs (see IO_POLL_REF_MASK) is 0, it's free. We can
* bump it and acquire ownership. It's disallowed to modify requests while not
* owning it, that prevents from races for enqueueing task_work's and b/w
* arming poll and wakeups.
*/
static inline bool io_poll_get_ownership(struct io_kiocb *req)
{
return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
}
static void io_poll_mark_cancelled(struct io_kiocb *req)
{
atomic_or(IO_POLL_CANCEL_FLAG, &req->poll_refs);
}
static struct io_poll *io_poll_get_double(struct io_kiocb *req)
{
/* pure poll stashes this in ->async_data, poll driven retry elsewhere */
if (req->opcode == IORING_OP_POLL_ADD)
return req->async_data;
return req->apoll->double_poll;
}
static struct io_poll *io_poll_get_single(struct io_kiocb *req)
{
if (req->opcode == IORING_OP_POLL_ADD)
return io_kiocb_to_cmd(req);
return &req->apoll->poll;
}
static void io_poll_req_insert(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
struct hlist_head *list;
list = &ctx->cancel_hash[hash_long(req->cqe.user_data, ctx->cancel_hash_bits)];
hlist_add_head(&req->hash_node, list);
}
static void io_init_poll_iocb(struct io_poll *poll, __poll_t events,
wait_queue_func_t wake_func)
{
poll->head = NULL;
#define IO_POLL_UNMASK (EPOLLERR|EPOLLHUP|EPOLLNVAL|EPOLLRDHUP)
/* mask in events that we always want/need */
poll->events = events | IO_POLL_UNMASK;
INIT_LIST_HEAD(&poll->wait.entry);
init_waitqueue_func_entry(&poll->wait, wake_func);
}
static inline void io_poll_remove_entry(struct io_poll *poll)
{
struct wait_queue_head *head = smp_load_acquire(&poll->head);
if (head) {
spin_lock_irq(&head->lock);
list_del_init(&poll->wait.entry);
poll->head = NULL;
spin_unlock_irq(&head->lock);
}
}
static void io_poll_remove_entries(struct io_kiocb *req)
{
/*
* Nothing to do if neither of those flags are set. Avoid dipping
* into the poll/apoll/double cachelines if we can.
*/
if (!(req->flags & (REQ_F_SINGLE_POLL | REQ_F_DOUBLE_POLL)))
return;
/*
* While we hold the waitqueue lock and the waitqueue is nonempty,
* wake_up_pollfree() will wait for us. However, taking the waitqueue
* lock in the first place can race with the waitqueue being freed.
*
* We solve this as eventpoll does: by taking advantage of the fact that
* all users of wake_up_pollfree() will RCU-delay the actual free. If
* we enter rcu_read_lock() and see that the pointer to the queue is
* non-NULL, we can then lock it without the memory being freed out from
* under us.
*
* Keep holding rcu_read_lock() as long as we hold the queue lock, in
* case the caller deletes the entry from the queue, leaving it empty.
* In that case, only RCU prevents the queue memory from being freed.
*/
rcu_read_lock();
if (req->flags & REQ_F_SINGLE_POLL)
io_poll_remove_entry(io_poll_get_single(req));
if (req->flags & REQ_F_DOUBLE_POLL)
io_poll_remove_entry(io_poll_get_double(req));
rcu_read_unlock();
}
/*
* All poll tw should go through this. Checks for poll events, manages
* references, does rewait, etc.
*
* Returns a negative error on failure. >0 when no action require, which is
* either spurious wakeup or multishot CQE is served. 0 when it's done with
* the request, then the mask is stored in req->cqe.res.
*/
static int io_poll_check_events(struct io_kiocb *req, bool *locked)
{
struct io_ring_ctx *ctx = req->ctx;
int v, ret;
/* req->task == current here, checking PF_EXITING is safe */
if (unlikely(req->task->flags & PF_EXITING))
return -ECANCELED;
do {
v = atomic_read(&req->poll_refs);
/* tw handler should be the owner, and so have some references */
if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
return 0;
if (v & IO_POLL_CANCEL_FLAG)
return -ECANCELED;
if (!req->cqe.res) {
struct poll_table_struct pt = { ._key = req->apoll_events };
req->cqe.res = vfs_poll(req->file, &pt) & req->apoll_events;
}
if ((unlikely(!req->cqe.res)))
continue;
if (req->apoll_events & EPOLLONESHOT)
return 0;
/* multishot, just fill a CQE and proceed */
if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
__poll_t mask = mangle_poll(req->cqe.res &
req->apoll_events);
bool filled;
spin_lock(&ctx->completion_lock);
filled = io_fill_cqe_aux(ctx, req->cqe.user_data,
mask, IORING_CQE_F_MORE);
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
if (filled) {
io_cqring_ev_posted(ctx);
continue;
}
return -ECANCELED;
}
ret = io_poll_issue(req, locked);
if (ret)
return ret;
/*
* Release all references, retry if someone tried to restart
* task_work while we were executing it.
*/
} while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs));
return 1;
}
static void io_poll_task_func(struct io_kiocb *req, bool *locked)
{
struct io_ring_ctx *ctx = req->ctx;
int ret;
ret = io_poll_check_events(req, locked);
if (ret > 0)
return;
if (!ret) {
struct io_poll *poll = io_kiocb_to_cmd(req);
req->cqe.res = mangle_poll(req->cqe.res & poll->events);
} else {
req->cqe.res = ret;
req_set_fail(req);
}
io_poll_remove_entries(req);
spin_lock(&ctx->completion_lock);
hash_del(&req->hash_node);
req->cqe.flags = 0;
__io_req_complete_post(req);
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
}
static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
{
struct io_ring_ctx *ctx = req->ctx;
int ret;
ret = io_poll_check_events(req, locked);
if (ret > 0)
return;
io_poll_remove_entries(req);
spin_lock(&ctx->completion_lock);
hash_del(&req->hash_node);
spin_unlock(&ctx->completion_lock);
if (!ret)
io_req_task_submit(req, locked);
else
io_req_complete_failed(req, ret);
}
static void __io_poll_execute(struct io_kiocb *req, int mask,
__poll_t __maybe_unused events)
{
io_req_set_res(req, mask, 0);
/*
* This is useful for poll that is armed on behalf of another
* request, and where the wakeup path could be on a different
* CPU. We want to avoid pulling in req->apoll->events for that
* case.
*/
if (req->opcode == IORING_OP_POLL_ADD)
req->io_task_work.func = io_poll_task_func;
else
req->io_task_work.func = io_apoll_task_func;
trace_io_uring_task_add(req->ctx, req, req->cqe.user_data, req->opcode, mask);
io_req_task_work_add(req);
}
static inline void io_poll_execute(struct io_kiocb *req, int res,
__poll_t events)
{
if (io_poll_get_ownership(req))
__io_poll_execute(req, res, events);
}
static void io_poll_cancel_req(struct io_kiocb *req)
{
io_poll_mark_cancelled(req);
/* kick tw, which should complete the request */
io_poll_execute(req, 0, 0);
}
#define wqe_to_req(wait) ((void *)((unsigned long) (wait)->private & ~1))
#define wqe_is_double(wait) ((unsigned long) (wait)->private & 1)
#define IO_ASYNC_POLL_COMMON (EPOLLONESHOT | EPOLLPRI)
static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
void *key)
{
struct io_kiocb *req = wqe_to_req(wait);
struct io_poll *poll = container_of(wait, struct io_poll, wait);
__poll_t mask = key_to_poll(key);
if (unlikely(mask & POLLFREE)) {
io_poll_mark_cancelled(req);
/* we have to kick tw in case it's not already */
io_poll_execute(req, 0, poll->events);
/*
* If the waitqueue is being freed early but someone is already
* holds ownership over it, we have to tear down the request as
* best we can. That means immediately removing the request from
* its waitqueue and preventing all further accesses to the
* waitqueue via the request.
*/
list_del_init(&poll->wait.entry);
/*
* Careful: this *must* be the last step, since as soon
* as req->head is NULL'ed out, the request can be
* completed and freed, since aio_poll_complete_work()
* will no longer need to take the waitqueue lock.
*/
smp_store_release(&poll->head, NULL);
return 1;
}
/* for instances that support it check for an event match first */
if (mask && !(mask & (poll->events & ~IO_ASYNC_POLL_COMMON)))
return 0;
if (io_poll_get_ownership(req)) {
/* optional, saves extra locking for removal in tw handler */
if (mask && poll->events & EPOLLONESHOT) {
list_del_init(&poll->wait.entry);
poll->head = NULL;
if (wqe_is_double(wait))
req->flags &= ~REQ_F_DOUBLE_POLL;
else
req->flags &= ~REQ_F_SINGLE_POLL;
}
__io_poll_execute(req, mask, poll->events);
}
return 1;
}
static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
struct wait_queue_head *head,
struct io_poll **poll_ptr)
{
struct io_kiocb *req = pt->req;
unsigned long wqe_private = (unsigned long) req;
/*
* The file being polled uses multiple waitqueues for poll handling
* (e.g. one for read, one for write). Setup a separate io_poll
* if this happens.
*/
if (unlikely(pt->nr_entries)) {
struct io_poll *first = poll;
/* double add on the same waitqueue head, ignore */
if (first->head == head)
return;
/* already have a 2nd entry, fail a third attempt */
if (*poll_ptr) {
if ((*poll_ptr)->head == head)
return;
pt->error = -EINVAL;
return;
}
poll = kmalloc(sizeof(*poll), GFP_ATOMIC);
if (!poll) {
pt->error = -ENOMEM;
return;
}
/* mark as double wq entry */
wqe_private |= 1;
req->flags |= REQ_F_DOUBLE_POLL;
io_init_poll_iocb(poll, first->events, first->wait.func);
*poll_ptr = poll;
if (req->opcode == IORING_OP_POLL_ADD)
req->flags |= REQ_F_ASYNC_DATA;
}
req->flags |= REQ_F_SINGLE_POLL;
pt->nr_entries++;
poll->head = head;
poll->wait.private = (void *) wqe_private;
if (poll->events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(head, &poll->wait);
else
add_wait_queue(head, &poll->wait);
}
static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
struct poll_table_struct *p)
{
struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
struct io_poll *poll = io_kiocb_to_cmd(pt->req);
__io_queue_proc(poll, pt, head,
(struct io_poll **) &pt->req->async_data);
}
static int __io_arm_poll_handler(struct io_kiocb *req,
struct io_poll *poll,
struct io_poll_table *ipt, __poll_t mask)
{
struct io_ring_ctx *ctx = req->ctx;
int v;
INIT_HLIST_NODE(&req->hash_node);
req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
io_init_poll_iocb(poll, mask, io_poll_wake);
poll->file = req->file;
req->apoll_events = poll->events;
ipt->pt._key = mask;
ipt->req = req;
ipt->error = 0;
ipt->nr_entries = 0;
/*
* Take the ownership to delay any tw execution up until we're done
* with poll arming. see io_poll_get_ownership().
*/
atomic_set(&req->poll_refs, 1);
mask = vfs_poll(req->file, &ipt->pt) & poll->events;
if (mask && (poll->events & EPOLLONESHOT)) {
io_poll_remove_entries(req);
/* no one else has access to the req, forget about the ref */
return mask;
}
if (!mask && unlikely(ipt->error || !ipt->nr_entries)) {
io_poll_remove_entries(req);
if (!ipt->error)
ipt->error = -EINVAL;
return 0;
}
spin_lock(&ctx->completion_lock);
io_poll_req_insert(req);
spin_unlock(&ctx->completion_lock);
if (mask) {
/* can't multishot if failed, just queue the event we've got */
if (unlikely(ipt->error || !ipt->nr_entries)) {
poll->events |= EPOLLONESHOT;
req->apoll_events |= EPOLLONESHOT;
ipt->error = 0;
}
__io_poll_execute(req, mask, poll->events);
return 0;
}
/*
* Release ownership. If someone tried to queue a tw while it was
* locked, kick it off for them.
*/
v = atomic_dec_return(&req->poll_refs);
if (unlikely(v & IO_POLL_REF_MASK))
__io_poll_execute(req, 0, poll->events);
return 0;
}
static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
struct poll_table_struct *p)
{
struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
struct async_poll *apoll = pt->req->apoll;
__io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
}
int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags)
{
const struct io_op_def *def = &io_op_defs[req->opcode];
struct io_ring_ctx *ctx = req->ctx;
struct async_poll *apoll;
struct io_poll_table ipt;
__poll_t mask = POLLPRI | POLLERR;
int ret;
if (!def->pollin && !def->pollout)
return IO_APOLL_ABORTED;
if (!file_can_poll(req->file))
return IO_APOLL_ABORTED;
if ((req->flags & (REQ_F_POLLED|REQ_F_PARTIAL_IO)) == REQ_F_POLLED)
return IO_APOLL_ABORTED;
if (!(req->flags & REQ_F_APOLL_MULTISHOT))
mask |= EPOLLONESHOT;
if (def->pollin) {
mask |= EPOLLIN | EPOLLRDNORM;
/* If reading from MSG_ERRQUEUE using recvmsg, ignore POLLIN */
if (req->flags & REQ_F_CLEAR_POLLIN)
mask &= ~EPOLLIN;
} else {
mask |= EPOLLOUT | EPOLLWRNORM;
}
if (def->poll_exclusive)
mask |= EPOLLEXCLUSIVE;
if (req->flags & REQ_F_POLLED) {
apoll = req->apoll;
kfree(apoll->double_poll);
} else if (!(issue_flags & IO_URING_F_UNLOCKED) &&
!list_empty(&ctx->apoll_cache)) {
apoll = list_first_entry(&ctx->apoll_cache, struct async_poll,
poll.wait.entry);
list_del_init(&apoll->poll.wait.entry);
} else {
apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
if (unlikely(!apoll))
return IO_APOLL_ABORTED;
}
apoll->double_poll = NULL;
req->apoll = apoll;
req->flags |= REQ_F_POLLED;
ipt.pt._qproc = io_async_queue_proc;
io_kbuf_recycle(req, issue_flags);
ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask);
if (ret || ipt.error)
return ret ? IO_APOLL_READY : IO_APOLL_ABORTED;
trace_io_uring_poll_arm(ctx, req, req->cqe.user_data, req->opcode,
mask, apoll->poll.events);
return IO_APOLL_OK;
}
/*
* Returns true if we found and killed one or more poll requests
*/
__cold bool io_poll_remove_all(struct io_ring_ctx *ctx, struct task_struct *tsk,
bool cancel_all)
{
struct hlist_node *tmp;
struct io_kiocb *req;
bool found = false;
int i;
spin_lock(&ctx->completion_lock);
for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
struct hlist_head *list;
list = &ctx->cancel_hash[i];
hlist_for_each_entry_safe(req, tmp, list, hash_node) {
if (io_match_task_safe(req, tsk, cancel_all)) {
hlist_del_init(&req->hash_node);
io_poll_cancel_req(req);
found = true;
}
}
}
spin_unlock(&ctx->completion_lock);
return found;
}
static struct io_kiocb *io_poll_find(struct io_ring_ctx *ctx, bool poll_only,
struct io_cancel_data *cd)
__must_hold(&ctx->completion_lock)
{
struct hlist_head *list;
struct io_kiocb *req;
list = &ctx->cancel_hash[hash_long(cd->data, ctx->cancel_hash_bits)];
hlist_for_each_entry(req, list, hash_node) {
if (cd->data != req->cqe.user_data)
continue;
if (poll_only && req->opcode != IORING_OP_POLL_ADD)
continue;
if (cd->flags & IORING_ASYNC_CANCEL_ALL) {
if (cd->seq == req->work.cancel_seq)
continue;
req->work.cancel_seq = cd->seq;
}
return req;
}
return NULL;
}
static struct io_kiocb *io_poll_file_find(struct io_ring_ctx *ctx,
struct io_cancel_data *cd)
__must_hold(&ctx->completion_lock)
{
struct io_kiocb *req;
int i;
for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
struct hlist_head *list;
list = &ctx->cancel_hash[i];
hlist_for_each_entry(req, list, hash_node) {
if (!(cd->flags & IORING_ASYNC_CANCEL_ANY) &&
req->file != cd->file)
continue;
if (cd->seq == req->work.cancel_seq)
continue;
req->work.cancel_seq = cd->seq;
return req;
}
}
return NULL;
}
static bool io_poll_disarm(struct io_kiocb *req)
__must_hold(&ctx->completion_lock)
{
if (!io_poll_get_ownership(req))
return false;
io_poll_remove_entries(req);
hash_del(&req->hash_node);
return true;
}
int io_poll_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd)
__must_hold(&ctx->completion_lock)
{
struct io_kiocb *req;
if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_ANY))
req = io_poll_file_find(ctx, cd);
else
req = io_poll_find(ctx, false, cd);
if (!req)
return -ENOENT;
io_poll_cancel_req(req);
return 0;
}
static __poll_t io_poll_parse_events(const struct io_uring_sqe *sqe,
unsigned int flags)
{
u32 events;
events = READ_ONCE(sqe->poll32_events);
#ifdef __BIG_ENDIAN
events = swahw32(events);
#endif
if (!(flags & IORING_POLL_ADD_MULTI))
events |= EPOLLONESHOT;
return demangle_poll(events) | (events & (EPOLLEXCLUSIVE|EPOLLONESHOT));
}
int io_poll_remove_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_poll_update *upd = io_kiocb_to_cmd(req);
u32 flags;
if (sqe->buf_index || sqe->splice_fd_in)
return -EINVAL;
flags = READ_ONCE(sqe->len);
if (flags & ~(IORING_POLL_UPDATE_EVENTS | IORING_POLL_UPDATE_USER_DATA |
IORING_POLL_ADD_MULTI))
return -EINVAL;
/* meaningless without update */
if (flags == IORING_POLL_ADD_MULTI)
return -EINVAL;
upd->old_user_data = READ_ONCE(sqe->addr);
upd->update_events = flags & IORING_POLL_UPDATE_EVENTS;
upd->update_user_data = flags & IORING_POLL_UPDATE_USER_DATA;
upd->new_user_data = READ_ONCE(sqe->off);
if (!upd->update_user_data && upd->new_user_data)
return -EINVAL;
if (upd->update_events)
upd->events = io_poll_parse_events(sqe, flags);
else if (sqe->poll32_events)
return -EINVAL;
return 0;
}
int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_poll *poll = io_kiocb_to_cmd(req);
u32 flags;
if (sqe->buf_index || sqe->off || sqe->addr)
return -EINVAL;
flags = READ_ONCE(sqe->len);
if (flags & ~IORING_POLL_ADD_MULTI)
return -EINVAL;
if ((flags & IORING_POLL_ADD_MULTI) && (req->flags & REQ_F_CQE_SKIP))
return -EINVAL;
io_req_set_refcount(req);
poll->events = io_poll_parse_events(sqe, flags);
return 0;
}
int io_poll_add(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_poll *poll = io_kiocb_to_cmd(req);
struct io_poll_table ipt;
int ret;
ipt.pt._qproc = io_poll_queue_proc;
ret = __io_arm_poll_handler(req, poll, &ipt, poll->events);
if (ret) {
io_req_set_res(req, ret, 0);
return IOU_OK;
}
if (ipt.error) {
req_set_fail(req);
return ipt.error;
}
return IOU_ISSUE_SKIP_COMPLETE;
}
int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_poll_update *poll_update = io_kiocb_to_cmd(req);
struct io_cancel_data cd = { .data = poll_update->old_user_data, };
struct io_ring_ctx *ctx = req->ctx;
struct io_kiocb *preq;
int ret2, ret = 0;
bool locked;
spin_lock(&ctx->completion_lock);
preq = io_poll_find(ctx, true, &cd);
if (!preq || !io_poll_disarm(preq)) {
spin_unlock(&ctx->completion_lock);
ret = preq ? -EALREADY : -ENOENT;
goto out;
}
spin_unlock(&ctx->completion_lock);
if (poll_update->update_events || poll_update->update_user_data) {
/* only mask one event flags, keep behavior flags */
if (poll_update->update_events) {
struct io_poll *poll = io_kiocb_to_cmd(preq);
poll->events &= ~0xffff;
poll->events |= poll_update->events & 0xffff;
poll->events |= IO_POLL_UNMASK;
}
if (poll_update->update_user_data)
preq->cqe.user_data = poll_update->new_user_data;
ret2 = io_poll_add(preq, issue_flags);
/* successfully updated, don't complete poll request */
if (!ret2 || ret2 == -EIOCBQUEUED)
goto out;
}
req_set_fail(preq);
io_req_set_res(preq, -ECANCELED, 0);
locked = !(issue_flags & IO_URING_F_UNLOCKED);
io_req_task_complete(preq, &locked);
out:
if (ret < 0) {
req_set_fail(req);
return ret;
}
/* complete update request, we're done with it */
io_req_set_res(req, ret, 0);
return IOU_OK;
}

30
io_uring/poll.h Normal file
View File

@ -0,0 +1,30 @@
// SPDX-License-Identifier: GPL-2.0
enum {
IO_APOLL_OK,
IO_APOLL_ABORTED,
IO_APOLL_READY
};
struct io_poll {
struct file *file;
struct wait_queue_head *head;
__poll_t events;
struct wait_queue_entry wait;
};
struct async_poll {
struct io_poll poll;
struct io_poll *double_poll;
};
int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_poll_add(struct io_kiocb *req, unsigned int issue_flags);
int io_poll_remove_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags);
int io_poll_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd);
int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags);
bool io_poll_remove_all(struct io_ring_ctx *ctx, struct task_struct *tsk,
bool cancel_all);