linux-stable/kernel/workqueue.c
Stuart Hayes dad11ff11f usb: Fix deadlock in hid_reset when Dell iDRAC is reset
This was fixed upstream by commit e22bee782b
('workqueue: implement concurrency managed dynamic worker pool'), but
that is far too large a change for stable.

When Dell iDRAC is reset, the iDRAC's USB keyboard/mouse device stops
responding but is not actually disconnected.  This causes usbhid to
hid hid_io_error(), and you get a chain of calls like...

hid_reset()
 usb_reset_device()
  usb_reset_and_verify_device()
   usb_ep0_reinit()
    usb_disble_endpoint()
     usb_hcd_disable_endpoint()
      ehci_endpoint_disable()

Along the way, as a result of an error/timeout with a USB transaction,
ehci_clear_tt_buffer() calls usb_hub_clear_tt_buffer() (to clear a failed
transaction out of the transaction translator in the hub), which schedules
hub_tt_work() to be run (using keventd), and then sets qh->clearing_tt=1 so
that nobody will mess with that qh until the TT is cleared.

But run_workqueue() never happens for the keventd workqueue on that CPU, so
hub_tt_work() never gets run.  And qh->clearing_tt never gets changed back to
0.

This causes ehci_endpoint_disable() to get stuck in a loop waiting for
qh->clearing_tt to go to 0.

Part of the problem is hid_reset() is itself running on keventd.  So
when that thread gets a timeout trying to talk to the HID device, it
schedules clear_work (to run hub_tt_work) to run, and then gets stuck
in ehci_endpoint_disable waiting for it to run.

However, clear_work never gets run because the workqueue for that CPU
is still waiting for hid_reset to finish.

A much less invasive patch for earlier kernels is to just schedule
clear_work on khubd if the usb code needs to clear the TT and it sees
that it is already running on keventd.  Khubd isn't used by default
because it can get blocked by device enumeration sometimes, but I
think it should be ok for a backup for unusual cases like this just to
prevent deadlock.

Signed-off-by: Stuart Hayes <stuart_hayes@dell.com>
Signed-off-by: Shyam Iyer <shyam_iyer@dell.com>
[bwh: Use current_is_keventd() rather than checking current->{flags,comm}]
Signed-off-by: Ben Hutchings <ben@decadent.org.uk>
Signed-off-by: Willy Tarreau <w@1wt.eu>
2012-10-07 23:37:12 +02:00

1056 lines
26 KiB
C

/*
* linux/kernel/workqueue.c
*
* Generic mechanism for defining kernel helper threads for running
* arbitrary tasks in process context.
*
* Started by Ingo Molnar, Copyright (C) 2002
*
* Derived from the taskqueue/keventd code by:
*
* David Woodhouse <dwmw2@infradead.org>
* Andrew Morton
* Kai Petzke <wpp@marie.physik.tu-berlin.de>
* Theodore Ts'o <tytso@mit.edu>
*
* Made to use alloc_percpu by Christoph Lameter.
*/
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>
/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu).
*/
struct cpu_workqueue_struct {
spinlock_t lock;
struct list_head worklist;
wait_queue_head_t more_work;
struct work_struct *current_work;
struct workqueue_struct *wq;
struct task_struct *thread;
} ____cacheline_aligned;
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq;
struct list_head list;
const char *name;
int singlethread;
int freezeable; /* Freeze threads during suspend */
int rt;
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
static int singlethread_cpu __read_mostly;
static const struct cpumask *cpu_singlethread_map __read_mostly;
/*
* _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
* flushes cwq->worklist. This means that flush_workqueue/wait_on_work
* which comes in between can't use for_each_online_cpu(). We could
* use cpu_possible_map, the cpumask below is more a documentation
* than optimization.
*/
static cpumask_var_t cpu_populated_map __read_mostly;
/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_wq_single_threaded(struct workqueue_struct *wq)
{
return wq->singlethread;
}
static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
{
return is_wq_single_threaded(wq)
? cpu_singlethread_map : cpu_populated_map;
}
static
struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
if (unlikely(is_wq_single_threaded(wq)))
cpu = singlethread_cpu;
return per_cpu_ptr(wq->cpu_wq, cpu);
}
/*
* Set the workqueue on which a work item is to be run
* - Must *only* be called if the pending flag is set
*/
static inline void set_wq_data(struct work_struct *work,
struct cpu_workqueue_struct *cwq)
{
unsigned long new;
BUG_ON(!work_pending(work));
new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
atomic_long_set(&work->data, new);
}
static inline
struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}
static void insert_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work, struct list_head *head)
{
trace_workqueue_insertion(cwq->thread, work);
set_wq_data(work, cwq);
/*
* Ensure that we get the right work->data if we see the
* result of list_add() below, see try_to_grab_pending().
*/
smp_wmb();
list_add_tail(&work->entry, head);
wake_up(&cwq->more_work);
}
static void __queue_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags);
insert_work(cwq, work, &cwq->worklist);
spin_unlock_irqrestore(&cwq->lock, flags);
}
/**
* queue_work - queue work on a workqueue
* @wq: workqueue to use
* @work: work to queue
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*
* We queue the work to the CPU on which it was submitted, but if the CPU dies
* it can be processed by another CPU.
*/
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
int ret;
ret = queue_work_on(get_cpu(), wq, work);
put_cpu();
return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
/**
* queue_work_on - queue work on specific cpu
* @cpu: CPU number to execute work on
* @wq: workqueue to use
* @work: work to queue
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*
* We queue the work to a specific CPU, the caller must ensure it
* can't go away.
*/
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
int ret = 0;
if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
BUG_ON(!list_empty(&work->entry));
__queue_work(wq_per_cpu(wq, cpu), work);
ret = 1;
}
return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);
static void delayed_work_timer_fn(unsigned long __data)
{
struct delayed_work *dwork = (struct delayed_work *)__data;
struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
struct workqueue_struct *wq = cwq->wq;
__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
}
/**
* queue_delayed_work - queue work on a workqueue after delay
* @wq: workqueue to use
* @dwork: delayable work to queue
* @delay: number of jiffies to wait before queueing
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*/
int queue_delayed_work(struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay)
{
if (delay == 0)
return queue_work(wq, &dwork->work);
return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
/**
* queue_delayed_work_on - queue work on specific CPU after delay
* @cpu: CPU number to execute work on
* @wq: workqueue to use
* @dwork: work to queue
* @delay: number of jiffies to wait before queueing
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*/
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay)
{
int ret = 0;
struct timer_list *timer = &dwork->timer;
struct work_struct *work = &dwork->work;
if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));
timer_stats_timer_set_start_info(&dwork->timer);
/* This stores cwq for the moment, for the timer_fn */
set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
if (unlikely(cpu >= 0))
add_timer_on(timer, cpu);
else
add_timer(timer);
ret = 1;
}
return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
/*
* It is permissible to free the struct work_struct
* from inside the function that is called from it,
* this we need to take into account for lockdep too.
* To avoid bogus "held lock freed" warnings as well
* as problems when looking into work->lockdep_map,
* make a copy and use that here.
*/
struct lockdep_map lockdep_map = work->lockdep_map;
#endif
trace_workqueue_execution(cwq->thread, work);
cwq->current_work = work;
list_del_init(cwq->worklist.next);
spin_unlock_irq(&cwq->lock);
BUG_ON(get_wq_data(work) != cwq);
work_clear_pending(work);
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
f(work);
lock_map_release(&lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
"%s/0x%08x/%d\n",
current->comm, preempt_count(),
task_pid_nr(current));
printk(KERN_ERR " last function: ");
print_symbol("%s\n", (unsigned long)f);
debug_show_held_locks(current);
dump_stack();
}
spin_lock_irq(&cwq->lock);
cwq->current_work = NULL;
}
spin_unlock_irq(&cwq->lock);
}
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);
if (cwq->wq->freezeable)
set_freezable();
for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
if (!freezing(current) &&
!kthread_should_stop() &&
list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);
try_to_freeze();
if (kthread_should_stop())
break;
run_workqueue(cwq);
}
return 0;
}
struct wq_barrier {
struct work_struct work;
struct completion done;
};
static void wq_barrier_func(struct work_struct *work)
{
struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
complete(&barr->done);
}
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
struct wq_barrier *barr, struct list_head *head)
{
INIT_WORK(&barr->work, wq_barrier_func);
__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
init_completion(&barr->done);
insert_work(cwq, &barr->work, head);
}
static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
int active = 0;
struct wq_barrier barr;
WARN_ON(cwq->thread == current);
spin_lock_irq(&cwq->lock);
if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
insert_wq_barrier(cwq, &barr, &cwq->worklist);
active = 1;
}
spin_unlock_irq(&cwq->lock);
if (active)
wait_for_completion(&barr.done);
return active;
}
/**
* flush_workqueue - ensure that any scheduled work has run to completion.
* @wq: workqueue to flush
*
* Forces execution of the workqueue and blocks until its completion.
* This is typically used in driver shutdown handlers.
*
* We sleep until all works which were queued on entry have been handled,
* but we are not livelocked by new incoming ones.
*
* This function used to run the workqueues itself. Now we just wait for the
* helper threads to do it.
*/
void flush_workqueue(struct workqueue_struct *wq)
{
const struct cpumask *cpu_map = wq_cpu_map(wq);
int cpu;
might_sleep();
lock_map_acquire(&wq->lockdep_map);
lock_map_release(&wq->lockdep_map);
for_each_cpu(cpu, cpu_map)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);
/**
* flush_work - block until a work_struct's callback has terminated
* @work: the work which is to be flushed
*
* Returns false if @work has already terminated.
*
* It is expected that, prior to calling flush_work(), the caller has
* arranged for the work to not be requeued, otherwise it doesn't make
* sense to use this function.
*/
int flush_work(struct work_struct *work)
{
struct cpu_workqueue_struct *cwq;
struct list_head *prev;
struct wq_barrier barr;
might_sleep();
cwq = get_wq_data(work);
if (!cwq)
return 0;
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);
prev = NULL;
spin_lock_irq(&cwq->lock);
if (!list_empty(&work->entry)) {
/*
* See the comment near try_to_grab_pending()->smp_rmb().
* If it was re-queued under us we are not going to wait.
*/
smp_rmb();
if (unlikely(cwq != get_wq_data(work)))
goto out;
prev = &work->entry;
} else {
if (cwq->current_work != work)
goto out;
prev = &cwq->worklist;
}
insert_wq_barrier(cwq, &barr, prev->next);
out:
spin_unlock_irq(&cwq->lock);
if (!prev)
return 0;
wait_for_completion(&barr.done);
return 1;
}
EXPORT_SYMBOL_GPL(flush_work);
/*
* Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
* so this work can't be re-armed in any way.
*/
static int try_to_grab_pending(struct work_struct *work)
{
struct cpu_workqueue_struct *cwq;
int ret = -1;
if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
return 0;
/*
* The queueing is in progress, or it is already queued. Try to
* steal it from ->worklist without clearing WORK_STRUCT_PENDING.
*/
cwq = get_wq_data(work);
if (!cwq)
return ret;
spin_lock_irq(&cwq->lock);
if (!list_empty(&work->entry)) {
/*
* This work is queued, but perhaps we locked the wrong cwq.
* In that case we must see the new value after rmb(), see
* insert_work()->wmb().
*/
smp_rmb();
if (cwq == get_wq_data(work)) {
list_del_init(&work->entry);
ret = 1;
}
}
spin_unlock_irq(&cwq->lock);
return ret;
}
static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
struct wq_barrier barr;
int running = 0;
spin_lock_irq(&cwq->lock);
if (unlikely(cwq->current_work == work)) {
insert_wq_barrier(cwq, &barr, cwq->worklist.next);
running = 1;
}
spin_unlock_irq(&cwq->lock);
if (unlikely(running))
wait_for_completion(&barr.done);
}
static void wait_on_work(struct work_struct *work)
{
struct cpu_workqueue_struct *cwq;
struct workqueue_struct *wq;
const struct cpumask *cpu_map;
int cpu;
might_sleep();
lock_map_acquire(&work->lockdep_map);
lock_map_release(&work->lockdep_map);
cwq = get_wq_data(work);
if (!cwq)
return;
wq = cwq->wq;
cpu_map = wq_cpu_map(wq);
for_each_cpu(cpu, cpu_map)
wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}
static int __cancel_work_timer(struct work_struct *work,
struct timer_list* timer)
{
int ret;
do {
ret = (timer && likely(del_timer(timer)));
if (!ret)
ret = try_to_grab_pending(work);
wait_on_work(work);
} while (unlikely(ret < 0));
work_clear_pending(work);
return ret;
}
/**
* cancel_work_sync - block until a work_struct's callback has terminated
* @work: the work which is to be flushed
*
* Returns true if @work was pending.
*
* cancel_work_sync() will cancel the work if it is queued. If the work's
* callback appears to be running, cancel_work_sync() will block until it
* has completed.
*
* It is possible to use this function if the work re-queues itself. It can
* cancel the work even if it migrates to another workqueue, however in that
* case it only guarantees that work->func() has completed on the last queued
* workqueue.
*
* cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
* pending, otherwise it goes into a busy-wait loop until the timer expires.
*
* The caller must ensure that workqueue_struct on which this work was last
* queued can't be destroyed before this function returns.
*/
int cancel_work_sync(struct work_struct *work)
{
return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);
/**
* cancel_delayed_work_sync - reliably kill off a delayed work.
* @dwork: the delayed work struct
*
* Returns true if @dwork was pending.
*
* It is possible to use this function if @dwork rearms itself via queue_work()
* or queue_delayed_work(). See also the comment for cancel_work_sync().
*/
int cancel_delayed_work_sync(struct delayed_work *dwork)
{
return __cancel_work_timer(&dwork->work, &dwork->timer);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
static struct workqueue_struct *keventd_wq __read_mostly;
/**
* schedule_work - put work task in global workqueue
* @work: job to be done
*
* Returns zero if @work was already on the kernel-global workqueue and
* non-zero otherwise.
*
* This puts a job in the kernel-global workqueue if it was not already
* queued and leaves it in the same position on the kernel-global
* workqueue otherwise.
*/
int schedule_work(struct work_struct *work)
{
return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
/*
* schedule_work_on - put work task on a specific cpu
* @cpu: cpu to put the work task on
* @work: job to be done
*
* This puts a job on a specific cpu
*/
int schedule_work_on(int cpu, struct work_struct *work)
{
return queue_work_on(cpu, keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);
/**
* schedule_delayed_work - put work task in global workqueue after delay
* @dwork: job to be done
* @delay: number of jiffies to wait or 0 for immediate execution
*
* After waiting for a given time this puts a job in the kernel-global
* workqueue.
*/
int schedule_delayed_work(struct delayed_work *dwork,
unsigned long delay)
{
return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);
/**
* flush_delayed_work - block until a dwork_struct's callback has terminated
* @dwork: the delayed work which is to be flushed
*
* Any timeout is cancelled, and any pending work is run immediately.
*/
void flush_delayed_work(struct delayed_work *dwork)
{
if (del_timer_sync(&dwork->timer)) {
struct cpu_workqueue_struct *cwq;
cwq = wq_per_cpu(keventd_wq, get_cpu());
__queue_work(cwq, &dwork->work);
put_cpu();
}
flush_work(&dwork->work);
}
EXPORT_SYMBOL(flush_delayed_work);
/**
* schedule_delayed_work_on - queue work in global workqueue on CPU after delay
* @cpu: cpu to use
* @dwork: job to be done
* @delay: number of jiffies to wait
*
* After waiting for a given time this puts a job in the kernel-global
* workqueue on the specified CPU.
*/
int schedule_delayed_work_on(int cpu,
struct delayed_work *dwork, unsigned long delay)
{
return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);
/**
* schedule_on_each_cpu - call a function on each online CPU from keventd
* @func: the function to call
*
* Returns zero on success.
* Returns -ve errno on failure.
*
* schedule_on_each_cpu() is very slow.
*/
int schedule_on_each_cpu(work_func_t func)
{
int cpu;
int orig = -1;
struct work_struct *works;
works = alloc_percpu(struct work_struct);
if (!works)
return -ENOMEM;
get_online_cpus();
/*
* When running in keventd don't schedule a work item on
* itself. Can just call directly because the work queue is
* already bound. This also is faster.
*/
if (current_is_keventd())
orig = raw_smp_processor_id();
for_each_online_cpu(cpu) {
struct work_struct *work = per_cpu_ptr(works, cpu);
INIT_WORK(work, func);
if (cpu != orig)
schedule_work_on(cpu, work);
}
if (orig >= 0)
func(per_cpu_ptr(works, orig));
for_each_online_cpu(cpu)
flush_work(per_cpu_ptr(works, cpu));
put_online_cpus();
free_percpu(works);
return 0;
}
void flush_scheduled_work(void)
{
flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);
/**
* execute_in_process_context - reliably execute the routine with user context
* @fn: the function to execute
* @ew: guaranteed storage for the execute work structure (must
* be available when the work executes)
*
* Executes the function immediately if process context is available,
* otherwise schedules the function for delayed execution.
*
* Returns: 0 - function was executed
* 1 - function was scheduled for execution
*/
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
if (!in_interrupt()) {
fn(&ew->work);
return 0;
}
INIT_WORK(&ew->work, fn);
schedule_work(&ew->work);
return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
int keventd_up(void)
{
return keventd_wq != NULL;
}
int current_is_keventd(void)
{
struct cpu_workqueue_struct *cwq;
int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
int ret = 0;
BUG_ON(!keventd_wq);
cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
if (current == cwq->thread)
ret = 1;
return ret;
}
EXPORT_SYMBOL_GPL(current_is_keventd);
static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
cwq->wq = wq;
spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
return cwq;
}
static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
struct workqueue_struct *wq = cwq->wq;
const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
struct task_struct *p;
p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
/*
* Nobody can add the work_struct to this cwq,
* if (caller is __create_workqueue)
* nobody should see this wq
* else // caller is CPU_UP_PREPARE
* cpu is not on cpu_online_map
* so we can abort safely.
*/
if (IS_ERR(p))
return PTR_ERR(p);
if (cwq->wq->rt)
sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
cwq->thread = p;
trace_workqueue_creation(cwq->thread, cpu);
return 0;
}
static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct task_struct *p = cwq->thread;
if (p != NULL) {
if (cpu >= 0)
kthread_bind(p, cpu);
wake_up_process(p);
}
}
struct workqueue_struct *__create_workqueue_key(const char *name,
int singlethread,
int freezeable,
int rt,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
struct cpu_workqueue_struct *cwq;
int err = 0, cpu;
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
if (!wq->cpu_wq) {
kfree(wq);
return NULL;
}
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
wq->singlethread = singlethread;
wq->freezeable = freezeable;
wq->rt = rt;
INIT_LIST_HEAD(&wq->list);
if (singlethread) {
cwq = init_cpu_workqueue(wq, singlethread_cpu);
err = create_workqueue_thread(cwq, singlethread_cpu);
start_workqueue_thread(cwq, -1);
} else {
cpu_maps_update_begin();
/*
* We must place this wq on list even if the code below fails.
* cpu_down(cpu) can remove cpu from cpu_populated_map before
* destroy_workqueue() takes the lock, in that case we leak
* cwq[cpu]->thread.
*/
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
spin_unlock(&workqueue_lock);
/*
* We must initialize cwqs for each possible cpu even if we
* are going to call destroy_workqueue() finally. Otherwise
* cpu_up() can hit the uninitialized cwq once we drop the
* lock.
*/
for_each_possible_cpu(cpu) {
cwq = init_cpu_workqueue(wq, cpu);
if (err || !cpu_online(cpu))
continue;
err = create_workqueue_thread(cwq, cpu);
start_workqueue_thread(cwq, cpu);
}
cpu_maps_update_done();
}
if (err) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);
static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
/*
* Our caller is either destroy_workqueue() or CPU_POST_DEAD,
* cpu_add_remove_lock protects cwq->thread.
*/
if (cwq->thread == NULL)
return;
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);
flush_cpu_workqueue(cwq);
/*
* If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
* a concurrent flush_workqueue() can insert a barrier after us.
* However, in that case run_workqueue() won't return and check
* kthread_should_stop() until it flushes all work_struct's.
* When ->worklist becomes empty it is safe to exit because no
* more work_structs can be queued on this cwq: flush_workqueue
* checks list_empty(), and a "normal" queue_work() can't use
* a dead CPU.
*/
trace_workqueue_destruction(cwq->thread);
kthread_stop(cwq->thread);
cwq->thread = NULL;
}
/**
* destroy_workqueue - safely terminate a workqueue
* @wq: target workqueue
*
* Safely destroy a workqueue. All work currently pending will be done first.
*/
void destroy_workqueue(struct workqueue_struct *wq)
{
const struct cpumask *cpu_map = wq_cpu_map(wq);
int cpu;
cpu_maps_update_begin();
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock(&workqueue_lock);
for_each_cpu(cpu, cpu_map)
cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
cpu_maps_update_done();
free_percpu(wq->cpu_wq);
kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
unsigned int cpu = (unsigned long)hcpu;
struct cpu_workqueue_struct *cwq;
struct workqueue_struct *wq;
int ret = NOTIFY_OK;
action &= ~CPU_TASKS_FROZEN;
switch (action) {
case CPU_UP_PREPARE:
cpumask_set_cpu(cpu, cpu_populated_map);
}
undo:
list_for_each_entry(wq, &workqueues, list) {
cwq = per_cpu_ptr(wq->cpu_wq, cpu);
switch (action) {
case CPU_UP_PREPARE:
if (!create_workqueue_thread(cwq, cpu))
break;
printk(KERN_ERR "workqueue [%s] for %i failed\n",
wq->name, cpu);
action = CPU_UP_CANCELED;
ret = NOTIFY_BAD;
goto undo;
case CPU_ONLINE:
start_workqueue_thread(cwq, cpu);
break;
case CPU_UP_CANCELED:
start_workqueue_thread(cwq, -1);
case CPU_POST_DEAD:
cleanup_workqueue_thread(cwq);
break;
}
}
switch (action) {
case CPU_UP_CANCELED:
case CPU_POST_DEAD:
cpumask_clear_cpu(cpu, cpu_populated_map);
}
return ret;
}
#ifdef CONFIG_SMP
struct work_for_cpu {
struct completion completion;
long (*fn)(void *);
void *arg;
long ret;
};
static int do_work_for_cpu(void *_wfc)
{
struct work_for_cpu *wfc = _wfc;
wfc->ret = wfc->fn(wfc->arg);
complete(&wfc->completion);
return 0;
}
/**
* work_on_cpu - run a function in user context on a particular cpu
* @cpu: the cpu to run on
* @fn: the function to run
* @arg: the function arg
*
* This will return the value @fn returns.
* It is up to the caller to ensure that the cpu doesn't go offline.
* The caller must not hold any locks which would prevent @fn from completing.
*/
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
{
struct task_struct *sub_thread;
struct work_for_cpu wfc = {
.completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
.fn = fn,
.arg = arg,
};
sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
if (IS_ERR(sub_thread))
return PTR_ERR(sub_thread);
kthread_bind(sub_thread, cpu);
wake_up_process(sub_thread);
wait_for_completion(&wfc.completion);
return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */
void __init init_workqueues(void)
{
alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
cpumask_copy(cpu_populated_map, cpu_online_mask);
singlethread_cpu = cpumask_first(cpu_possible_mask);
cpu_singlethread_map = cpumask_of(singlethread_cpu);
hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);
}