mirror of
https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git
synced 2024-12-29 09:16:33 +00:00
io_uring: One wqe per wq
Right now io_wq allocates one io_wqe per NUMA node. As io_wq is now bound to a task, the task basically uses only the NUMA local io_wqe, and almost never changes NUMA nodes, thus, the other wqes are mostly unused. Allocate just one io_wqe embedded into io_wq, and uses all possible cpus (cpu_possible_mask) in the io_wqe->cpumask. Signed-off-by: Breno Leitao <leitao@debian.org> Link: https://lore.kernel.org/r/20230310201107.4020580-1-leitao@debian.org Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
parent
c56e022c0a
commit
da64d6db3b
176
io_uring/io-wq.c
176
io_uring/io-wq.c
@ -15,6 +15,7 @@
|
|||||||
#include <linux/cpu.h>
|
#include <linux/cpu.h>
|
||||||
#include <linux/task_work.h>
|
#include <linux/task_work.h>
|
||||||
#include <linux/audit.h>
|
#include <linux/audit.h>
|
||||||
|
#include <linux/mmu_context.h>
|
||||||
#include <uapi/linux/io_uring.h>
|
#include <uapi/linux/io_uring.h>
|
||||||
|
|
||||||
#include "io-wq.h"
|
#include "io-wq.h"
|
||||||
@ -96,8 +97,6 @@ struct io_wqe {
|
|||||||
raw_spinlock_t lock;
|
raw_spinlock_t lock;
|
||||||
struct io_wqe_acct acct[IO_WQ_ACCT_NR];
|
struct io_wqe_acct acct[IO_WQ_ACCT_NR];
|
||||||
|
|
||||||
int node;
|
|
||||||
|
|
||||||
struct hlist_nulls_head free_list;
|
struct hlist_nulls_head free_list;
|
||||||
struct list_head all_list;
|
struct list_head all_list;
|
||||||
|
|
||||||
@ -127,7 +126,7 @@ struct io_wq {
|
|||||||
|
|
||||||
struct task_struct *task;
|
struct task_struct *task;
|
||||||
|
|
||||||
struct io_wqe *wqes[];
|
struct io_wqe wqe;
|
||||||
};
|
};
|
||||||
|
|
||||||
static enum cpuhp_state io_wq_online;
|
static enum cpuhp_state io_wq_online;
|
||||||
@ -754,7 +753,7 @@ static void create_worker_cont(struct callback_head *cb)
|
|||||||
worker = container_of(cb, struct io_worker, create_work);
|
worker = container_of(cb, struct io_worker, create_work);
|
||||||
clear_bit_unlock(0, &worker->create_state);
|
clear_bit_unlock(0, &worker->create_state);
|
||||||
wqe = worker->wqe;
|
wqe = worker->wqe;
|
||||||
tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
|
tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
|
||||||
if (!IS_ERR(tsk)) {
|
if (!IS_ERR(tsk)) {
|
||||||
io_init_new_worker(wqe, worker, tsk);
|
io_init_new_worker(wqe, worker, tsk);
|
||||||
io_worker_release(worker);
|
io_worker_release(worker);
|
||||||
@ -804,7 +803,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
|
|||||||
|
|
||||||
__set_current_state(TASK_RUNNING);
|
__set_current_state(TASK_RUNNING);
|
||||||
|
|
||||||
worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
|
worker = kzalloc(sizeof(*worker), GFP_KERNEL);
|
||||||
if (!worker) {
|
if (!worker) {
|
||||||
fail:
|
fail:
|
||||||
atomic_dec(&acct->nr_running);
|
atomic_dec(&acct->nr_running);
|
||||||
@ -823,7 +822,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
|
|||||||
if (index == IO_WQ_ACCT_BOUND)
|
if (index == IO_WQ_ACCT_BOUND)
|
||||||
worker->flags |= IO_WORKER_F_BOUND;
|
worker->flags |= IO_WORKER_F_BOUND;
|
||||||
|
|
||||||
tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
|
tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
|
||||||
if (!IS_ERR(tsk)) {
|
if (!IS_ERR(tsk)) {
|
||||||
io_init_new_worker(wqe, worker, tsk);
|
io_init_new_worker(wqe, worker, tsk);
|
||||||
} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
|
} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
|
||||||
@ -961,7 +960,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
|
|||||||
|
|
||||||
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
|
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
|
||||||
{
|
{
|
||||||
struct io_wqe *wqe = wq->wqes[numa_node_id()];
|
struct io_wqe *wqe = &wq->wqe;
|
||||||
|
|
||||||
io_wqe_enqueue(wqe, work);
|
io_wqe_enqueue(wqe, work);
|
||||||
}
|
}
|
||||||
@ -1083,7 +1082,7 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
|
|||||||
.data = data,
|
.data = data,
|
||||||
.cancel_all = cancel_all,
|
.cancel_all = cancel_all,
|
||||||
};
|
};
|
||||||
int node;
|
struct io_wqe *wqe = &wq->wqe;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* First check pending list, if we're lucky we can just remove it
|
* First check pending list, if we're lucky we can just remove it
|
||||||
@ -1098,19 +1097,15 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
|
|||||||
* Do both of these while holding the wqe->lock, to ensure that
|
* Do both of these while holding the wqe->lock, to ensure that
|
||||||
* we'll find a work item regardless of state.
|
* we'll find a work item regardless of state.
|
||||||
*/
|
*/
|
||||||
for_each_node(node) {
|
io_wqe_cancel_pending_work(wqe, &match);
|
||||||
struct io_wqe *wqe = wq->wqes[node];
|
if (match.nr_pending && !match.cancel_all)
|
||||||
|
return IO_WQ_CANCEL_OK;
|
||||||
|
|
||||||
io_wqe_cancel_pending_work(wqe, &match);
|
raw_spin_lock(&wqe->lock);
|
||||||
if (match.nr_pending && !match.cancel_all)
|
io_wqe_cancel_running_work(wqe, &match);
|
||||||
return IO_WQ_CANCEL_OK;
|
raw_spin_unlock(&wqe->lock);
|
||||||
|
if (match.nr_running && !match.cancel_all)
|
||||||
raw_spin_lock(&wqe->lock);
|
return IO_WQ_CANCEL_RUNNING;
|
||||||
io_wqe_cancel_running_work(wqe, &match);
|
|
||||||
raw_spin_unlock(&wqe->lock);
|
|
||||||
if (match.nr_running && !match.cancel_all)
|
|
||||||
return IO_WQ_CANCEL_RUNNING;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (match.nr_running)
|
if (match.nr_running)
|
||||||
return IO_WQ_CANCEL_RUNNING;
|
return IO_WQ_CANCEL_RUNNING;
|
||||||
@ -1140,15 +1135,16 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
|
|||||||
|
|
||||||
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
|
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
|
||||||
{
|
{
|
||||||
int ret, node, i;
|
int ret, i;
|
||||||
struct io_wq *wq;
|
struct io_wq *wq;
|
||||||
|
struct io_wqe *wqe;
|
||||||
|
|
||||||
if (WARN_ON_ONCE(!data->free_work || !data->do_work))
|
if (WARN_ON_ONCE(!data->free_work || !data->do_work))
|
||||||
return ERR_PTR(-EINVAL);
|
return ERR_PTR(-EINVAL);
|
||||||
if (WARN_ON_ONCE(!bounded))
|
if (WARN_ON_ONCE(!bounded))
|
||||||
return ERR_PTR(-EINVAL);
|
return ERR_PTR(-EINVAL);
|
||||||
|
|
||||||
wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
|
wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
|
||||||
if (!wq)
|
if (!wq)
|
||||||
return ERR_PTR(-ENOMEM);
|
return ERR_PTR(-ENOMEM);
|
||||||
ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
|
ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
|
||||||
@ -1159,40 +1155,30 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
|
|||||||
wq->hash = data->hash;
|
wq->hash = data->hash;
|
||||||
wq->free_work = data->free_work;
|
wq->free_work = data->free_work;
|
||||||
wq->do_work = data->do_work;
|
wq->do_work = data->do_work;
|
||||||
|
wqe = &wq->wqe;
|
||||||
|
|
||||||
ret = -ENOMEM;
|
ret = -ENOMEM;
|
||||||
for_each_node(node) {
|
|
||||||
struct io_wqe *wqe;
|
|
||||||
int alloc_node = node;
|
|
||||||
|
|
||||||
if (!node_online(alloc_node))
|
if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
|
||||||
alloc_node = NUMA_NO_NODE;
|
goto err;
|
||||||
wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
|
cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
|
||||||
if (!wqe)
|
wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
|
||||||
goto err;
|
wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
|
||||||
wq->wqes[node] = wqe;
|
task_rlimit(current, RLIMIT_NPROC);
|
||||||
if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
|
INIT_LIST_HEAD(&wqe->wait.entry);
|
||||||
goto err;
|
wqe->wait.func = io_wqe_hash_wake;
|
||||||
cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
|
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
|
||||||
wqe->node = alloc_node;
|
struct io_wqe_acct *acct = &wqe->acct[i];
|
||||||
wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
|
|
||||||
wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
|
|
||||||
task_rlimit(current, RLIMIT_NPROC);
|
|
||||||
INIT_LIST_HEAD(&wqe->wait.entry);
|
|
||||||
wqe->wait.func = io_wqe_hash_wake;
|
|
||||||
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
|
|
||||||
struct io_wqe_acct *acct = &wqe->acct[i];
|
|
||||||
|
|
||||||
acct->index = i;
|
acct->index = i;
|
||||||
atomic_set(&acct->nr_running, 0);
|
atomic_set(&acct->nr_running, 0);
|
||||||
INIT_WQ_LIST(&acct->work_list);
|
INIT_WQ_LIST(&acct->work_list);
|
||||||
raw_spin_lock_init(&acct->lock);
|
raw_spin_lock_init(&acct->lock);
|
||||||
}
|
|
||||||
wqe->wq = wq;
|
|
||||||
raw_spin_lock_init(&wqe->lock);
|
|
||||||
INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
|
|
||||||
INIT_LIST_HEAD(&wqe->all_list);
|
|
||||||
}
|
}
|
||||||
|
wqe->wq = wq;
|
||||||
|
raw_spin_lock_init(&wqe->lock);
|
||||||
|
INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
|
||||||
|
INIT_LIST_HEAD(&wqe->all_list);
|
||||||
|
|
||||||
wq->task = get_task_struct(data->task);
|
wq->task = get_task_struct(data->task);
|
||||||
atomic_set(&wq->worker_refs, 1);
|
atomic_set(&wq->worker_refs, 1);
|
||||||
@ -1201,12 +1187,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
|
|||||||
err:
|
err:
|
||||||
io_wq_put_hash(data->hash);
|
io_wq_put_hash(data->hash);
|
||||||
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
|
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
|
||||||
for_each_node(node) {
|
|
||||||
if (!wq->wqes[node])
|
free_cpumask_var(wq->wqe.cpu_mask);
|
||||||
continue;
|
|
||||||
free_cpumask_var(wq->wqes[node]->cpu_mask);
|
|
||||||
kfree(wq->wqes[node]);
|
|
||||||
}
|
|
||||||
err_wq:
|
err_wq:
|
||||||
kfree(wq);
|
kfree(wq);
|
||||||
return ERR_PTR(ret);
|
return ERR_PTR(ret);
|
||||||
@ -1247,48 +1229,36 @@ static void io_wq_cancel_tw_create(struct io_wq *wq)
|
|||||||
|
|
||||||
static void io_wq_exit_workers(struct io_wq *wq)
|
static void io_wq_exit_workers(struct io_wq *wq)
|
||||||
{
|
{
|
||||||
int node;
|
|
||||||
|
|
||||||
if (!wq->task)
|
if (!wq->task)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
io_wq_cancel_tw_create(wq);
|
io_wq_cancel_tw_create(wq);
|
||||||
|
|
||||||
rcu_read_lock();
|
rcu_read_lock();
|
||||||
for_each_node(node) {
|
io_wq_for_each_worker(&wq->wqe, io_wq_worker_wake, NULL);
|
||||||
struct io_wqe *wqe = wq->wqes[node];
|
|
||||||
|
|
||||||
io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
|
|
||||||
}
|
|
||||||
rcu_read_unlock();
|
rcu_read_unlock();
|
||||||
io_worker_ref_put(wq);
|
io_worker_ref_put(wq);
|
||||||
wait_for_completion(&wq->worker_done);
|
wait_for_completion(&wq->worker_done);
|
||||||
|
|
||||||
for_each_node(node) {
|
spin_lock_irq(&wq->hash->wait.lock);
|
||||||
spin_lock_irq(&wq->hash->wait.lock);
|
list_del_init(&wq->wqe.wait.entry);
|
||||||
list_del_init(&wq->wqes[node]->wait.entry);
|
spin_unlock_irq(&wq->hash->wait.lock);
|
||||||
spin_unlock_irq(&wq->hash->wait.lock);
|
|
||||||
}
|
|
||||||
put_task_struct(wq->task);
|
put_task_struct(wq->task);
|
||||||
wq->task = NULL;
|
wq->task = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void io_wq_destroy(struct io_wq *wq)
|
static void io_wq_destroy(struct io_wq *wq)
|
||||||
{
|
{
|
||||||
int node;
|
struct io_cb_cancel_data match = {
|
||||||
|
.fn = io_wq_work_match_all,
|
||||||
|
.cancel_all = true,
|
||||||
|
};
|
||||||
|
struct io_wqe *wqe = &wq->wqe;
|
||||||
|
|
||||||
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
|
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
|
||||||
|
io_wqe_cancel_pending_work(wqe, &match);
|
||||||
for_each_node(node) {
|
free_cpumask_var(wqe->cpu_mask);
|
||||||
struct io_wqe *wqe = wq->wqes[node];
|
|
||||||
struct io_cb_cancel_data match = {
|
|
||||||
.fn = io_wq_work_match_all,
|
|
||||||
.cancel_all = true,
|
|
||||||
};
|
|
||||||
io_wqe_cancel_pending_work(wqe, &match);
|
|
||||||
free_cpumask_var(wqe->cpu_mask);
|
|
||||||
kfree(wqe);
|
|
||||||
}
|
|
||||||
io_wq_put_hash(wq->hash);
|
io_wq_put_hash(wq->hash);
|
||||||
kfree(wq);
|
kfree(wq);
|
||||||
}
|
}
|
||||||
@ -1323,11 +1293,9 @@ static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
|
|||||||
.cpu = cpu,
|
.cpu = cpu,
|
||||||
.online = online
|
.online = online
|
||||||
};
|
};
|
||||||
int i;
|
|
||||||
|
|
||||||
rcu_read_lock();
|
rcu_read_lock();
|
||||||
for_each_node(i)
|
io_wq_for_each_worker(&wq->wqe, io_wq_worker_affinity, &od);
|
||||||
io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
|
|
||||||
rcu_read_unlock();
|
rcu_read_unlock();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -1348,18 +1316,15 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
|
|||||||
|
|
||||||
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
|
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
|
||||||
{
|
{
|
||||||
int i;
|
struct io_wqe *wqe = &wq->wqe;
|
||||||
|
|
||||||
rcu_read_lock();
|
rcu_read_lock();
|
||||||
for_each_node(i) {
|
if (mask)
|
||||||
struct io_wqe *wqe = wq->wqes[i];
|
cpumask_copy(wqe->cpu_mask, mask);
|
||||||
|
else
|
||||||
if (mask)
|
cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
|
||||||
cpumask_copy(wqe->cpu_mask, mask);
|
|
||||||
else
|
|
||||||
cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
|
|
||||||
}
|
|
||||||
rcu_read_unlock();
|
rcu_read_unlock();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1369,9 +1334,10 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
|
|||||||
*/
|
*/
|
||||||
int io_wq_max_workers(struct io_wq *wq, int *new_count)
|
int io_wq_max_workers(struct io_wq *wq, int *new_count)
|
||||||
{
|
{
|
||||||
|
struct io_wqe *wqe = &wq->wqe;
|
||||||
|
struct io_wqe_acct *acct;
|
||||||
int prev[IO_WQ_ACCT_NR];
|
int prev[IO_WQ_ACCT_NR];
|
||||||
bool first_node = true;
|
int i;
|
||||||
int i, node;
|
|
||||||
|
|
||||||
BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
|
BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
|
||||||
BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
|
BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
|
||||||
@ -1386,21 +1352,15 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
|
|||||||
prev[i] = 0;
|
prev[i] = 0;
|
||||||
|
|
||||||
rcu_read_lock();
|
rcu_read_lock();
|
||||||
for_each_node(node) {
|
|
||||||
struct io_wqe *wqe = wq->wqes[node];
|
|
||||||
struct io_wqe_acct *acct;
|
|
||||||
|
|
||||||
raw_spin_lock(&wqe->lock);
|
raw_spin_lock(&wqe->lock);
|
||||||
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
|
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
|
||||||
acct = &wqe->acct[i];
|
acct = &wqe->acct[i];
|
||||||
if (first_node)
|
prev[i] = max_t(int, acct->max_workers, prev[i]);
|
||||||
prev[i] = max_t(int, acct->max_workers, prev[i]);
|
if (new_count[i])
|
||||||
if (new_count[i])
|
acct->max_workers = new_count[i];
|
||||||
acct->max_workers = new_count[i];
|
|
||||||
}
|
|
||||||
raw_spin_unlock(&wqe->lock);
|
|
||||||
first_node = false;
|
|
||||||
}
|
}
|
||||||
|
raw_spin_unlock(&wqe->lock);
|
||||||
rcu_read_unlock();
|
rcu_read_unlock();
|
||||||
|
|
||||||
for (i = 0; i < IO_WQ_ACCT_NR; i++)
|
for (i = 0; i < IO_WQ_ACCT_NR; i++)
|
||||||
|
Loading…
Reference in New Issue
Block a user