David Howells cb65537ee1 Add wait_on_atomic_t() and wake_up_atomic_t()
Add wait_on_atomic_t() and wake_up_atomic_t() to indicate became-zero events on
atomic_t types.  This uses the bit-wake waitqueue table.  The key is set to a
value outside of the number of bits in a long so that wait_on_bit() won't be
woken up accidentally.

What I'm using this for is: in a following patch I add a counter to struct
fscache_cookie to count the number of outstanding operations that need access
to netfs data.  The way this works is:

 (1) When a cookie is allocated, the counter is initialised to 1.

 (2) When an operation wants to access netfs data, it calls atomic_inc_unless()
     to increment the counter before it does so.  If it was 0, then the counter
     isn't incremented, the operation isn't permitted to access the netfs data
     (which might by this point no longer exist) and the operation aborts in
     some appropriate manner.

 (3) When an operation finishes with the netfs data, it decrements the counter
     and if it reaches 0, calls wake_up_atomic_t() on it - the assumption being
     that it was the last blocker.

 (4) When a cookie is released, the counter is decremented and the releaser
     uses wait_on_atomic_t() to wait for the counter to become 0 - which should
     indicate no one is using the netfs data any longer.  The netfs data can
     then be destroyed.

There are some alternatives that I have thought of and that have been suggested
by Tejun Heo:

 (A) Using wait_on_bit() to wait on a bit in the counter.  This doesn't work
     because if that bit happens to be 0 then the wait won't happen - even if
     the counter is non-zero.

 (B) Using wait_on_bit() to wait on a flag elsewhere which is cleared when the
     counter reaches 0.  Such a flag would be redundant and would add
     complexity.

 (C) Adding a waitqueue to fscache_cookie - this would expand that struct by
     several words for an event that happens just once in each cookie's
     lifetime.  Further, cookies are generally per-file so there are likely to
     be a lot of them.

 (D) Similar to (C), but add a pointer to a waitqueue in the cookie instead of
     a waitqueue.  This would add a single word per cookie and so would be less
     of an expansion - but still an expansion.

 (E) Adding a static waitqueue to the fscache module.  Generally this would be
     fine, but under certain circumstances many cookies will all get added at
     the same time (eg. NFS umount, cache withdrawal) thereby presenting
     scaling issues.  Note that the wait may be significant as disk I/O may be
     in progress.

So, I think reusing the wait_on_bit() waitqueue set is reasonable.  I don't
make much use of the waitqueue I need on a per-cookie basis, but sometimes I
have a huge flood of the cookies to deal with.

I also don't want to add a whole new set of global waitqueue tables
specifically for the dec-to-0 event if I can reuse the bit tables.

Signed-off-by: David Howells <dhowells@redhat.com>
Tested-By: Milosz Tanski <milosz@adfin.com>
Acked-by: Jeff Layton <jlayton@redhat.com>
2013-05-15 13:50:38 +01:00

378 lines
11 KiB
C

/*
* Generic waiting primitives.
*
* (C) 2004 Nadia Yvette Chambers, Oracle
*/
#include <linux/init.h>
#include <linux/export.h>
#include <linux/sched.h>
#include <linux/mm.h>
#include <linux/wait.h>
#include <linux/hash.h>
/*
 * Initialise a waitqueue head: start with an empty waiter list, set up the
 * queue's spinlock and attach the caller-supplied lockdep class @key and
 * @name to that lock.
 */
void __init_waitqueue_head(wait_queue_head_t *q, const char *name, struct lock_class_key *key)
{
	/* The list setup is independent of the lock setup below. */
	INIT_LIST_HEAD(&q->task_list);

	/* The lock must be initialised before its lockdep class is set. */
	spin_lock_init(&q->lock);
	lockdep_set_class_and_name(&q->lock, key, name);
}
EXPORT_SYMBOL(__init_waitqueue_head);
/*
 * Queue @wait on @q as a non-exclusive waiter.
 *
 * WQ_FLAG_EXCLUSIVE is cleared on the descriptor first; the descriptor is
 * then linked in with __add_wait_queue() while holding @q's lock with
 * interrupts saved/disabled.
 */
void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
	unsigned long irqflags;

	wait->flags &= ~WQ_FLAG_EXCLUSIVE;

	spin_lock_irqsave(&q->lock, irqflags);
	__add_wait_queue(q, wait);
	spin_unlock_irqrestore(&q->lock, irqflags);
}
EXPORT_SYMBOL(add_wait_queue);
/*
 * Queue @wait on @q as an exclusive waiter.
 *
 * WQ_FLAG_EXCLUSIVE is set on the descriptor first; the descriptor is then
 * linked in with __add_wait_queue_tail() while holding @q's lock with
 * interrupts saved/disabled.
 */
void add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait)
{
	unsigned long irqflags;

	wait->flags |= WQ_FLAG_EXCLUSIVE;

	spin_lock_irqsave(&q->lock, irqflags);
	__add_wait_queue_tail(q, wait);
	spin_unlock_irqrestore(&q->lock, irqflags);
}
EXPORT_SYMBOL(add_wait_queue_exclusive);
/*
 * Unlink @wait from @q using __remove_wait_queue(), holding @q's lock with
 * interrupts saved/disabled for the duration.
 */
void remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
	unsigned long irqflags;

	spin_lock_irqsave(&q->lock, irqflags);
	__remove_wait_queue(q, wait);
	spin_unlock_irqrestore(&q->lock, irqflags);
}
EXPORT_SYMBOL(remove_wait_queue);
/*
 * prepare_to_wait - queue a non-exclusive waiter and set the task state
 * @q: waitqueue to wait on
 * @wait: wait descriptor
 * @state: task state handed to set_current_state()
 *
 * Clears WQ_FLAG_EXCLUSIVE on @wait, adds @wait to @q if it is not already
 * on a list, and then sets the current task's state - all while holding
 * @q's lock.
 *
 * Note: we use "set_current_state()" _after_ the wait-queue add,
 * because we need a memory barrier there on SMP, so that any
 * wake-function that tests for the wait-queue being active
 * will be guaranteed to see waitqueue addition _or_ subsequent
 * tests in this thread will see the wakeup having taken place.
 *
 * The spin_unlock() itself is semi-permeable and only protects
 * one way (it only protects stuff inside the critical region and
 * stops them from bleeding out - it would still allow subsequent
 * loads to move into the critical region).
 */
void
prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
/* Callers invoke this in a loop, so only queue the descriptor once. */
if (list_empty(&wait->task_list))
__add_wait_queue(q, wait);
/* Must come after the queue addition - see the note above. */
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(prepare_to_wait);
/*
 * prepare_to_wait_exclusive - queue an exclusive waiter and set the task state
 * @q: waitqueue to wait on
 * @wait: wait descriptor
 * @state: task state handed to set_current_state()
 *
 * Sets WQ_FLAG_EXCLUSIVE on @wait, adds @wait to @q with
 * __add_wait_queue_tail() if it is not already on a list, and then sets the
 * current task's state - all while holding @q's lock.
 */
void
prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
wait->flags |= WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
/* Callers invoke this in a loop, so only queue the descriptor once. */
if (list_empty(&wait->task_list))
__add_wait_queue_tail(q, wait);
/* The state is set only after the descriptor is on the queue. */
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(prepare_to_wait_exclusive);
/**
 * finish_wait - clean up after waiting in a queue
 * @q: waitqueue waited on
 * @wait: wait descriptor
 *
 * Sets current thread back to running state and removes
 * the wait descriptor from the given waitqueue if still
 * queued.
 */
void finish_wait(wait_queue_head_t *q, wait_queue_t *wait)
{
unsigned long flags;
__set_current_state(TASK_RUNNING);
/*
 * We can check for list emptiness outside the lock
 * IFF:
 *  - we use the "careful" check that verifies both
 *    the next and prev pointers, so that there cannot
 *    be any half-pending updates in progress on other
 *    CPU's that we haven't seen yet (and that might
 *    still change the stack area).
 * and
 *  - all other users take the lock (ie we can only
 *    have _one_ other CPU that looks at or modifies
 *    the list).
 */
if (!list_empty_careful(&wait->task_list)) {
/* Still queued: unlink the descriptor under the queue lock. */
spin_lock_irqsave(&q->lock, flags);
list_del_init(&wait->task_list);
spin_unlock_irqrestore(&q->lock, flags);
}
}
EXPORT_SYMBOL(finish_wait);
/**
 * abort_exclusive_wait - abort exclusive waiting in a queue
 * @q: waitqueue waited on
 * @wait: wait descriptor
 * @mode: runstate of the waiter to be woken
 * @key: key to identify a wait bit queue or %NULL
 *
 * Sets current thread back to running state and removes
 * the wait descriptor from the given waitqueue if still
 * queued.
 *
 * Wakes up the next waiter if the caller is concurrently
 * woken up through the queue.
 *
 * This prevents waiter starvation where an exclusive waiter
 * aborts and is woken up concurrently and no one wakes up
 * the next waiter.
 */
void abort_exclusive_wait(wait_queue_head_t *q, wait_queue_t *wait,
unsigned int mode, void *key)
{
unsigned long flags;
__set_current_state(TASK_RUNNING);
spin_lock_irqsave(&q->lock, flags);
/* Still queued: nobody woke us, so simply take ourselves off the list. */
if (!list_empty(&wait->task_list))
list_del_init(&wait->task_list);
/*
 * Already unlinked: we were woken concurrently.  Hand that wakeup on to
 * another waiter, if there is one, so that it is not lost.
 */
else if (waitqueue_active(q))
__wake_up_locked_key(q, mode, key);
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(abort_exclusive_wait);
/*
 * Wake callback that runs default_wake_function() and, if that reports a
 * non-zero result, also unlinks the wait descriptor from its queue.
 *
 * Returns the value returned by default_wake_function().
 */
int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	int woken = default_wake_function(wait, mode, sync, key);

	/* Leave the descriptor queued when the wakeup did not happen. */
	if (!woken)
		return woken;

	list_del_init(&wait->task_list);
	return woken;
}
EXPORT_SYMBOL(autoremove_wake_function);
/*
 * Wake callback for bit waiters.
 *
 * @arg is the struct wait_bit_key supplied by the waker.  The waiter is only
 * woken (via autoremove_wake_function()) when it is waiting on the same word
 * and bit number as the key and the bit is currently clear; otherwise 0 is
 * returned and the waiter stays queued.
 */
int wake_bit_function(wait_queue_t *wait, unsigned mode, int sync, void *arg)
{
	struct wait_bit_queue *waiter =
		container_of(wait, struct wait_bit_queue, wait);
	struct wait_bit_key *key = arg;

	/* Different word: this wakeup is for someone else. */
	if (waiter->key.flags != key->flags)
		return 0;

	/* Same word, different bit. */
	if (waiter->key.bit_nr != key->bit_nr)
		return 0;

	/* The bit is still set, so there is nothing to wake up for yet. */
	if (test_bit(key->bit_nr, key->flags))
		return 0;

	return autoremove_wake_function(wait, mode, sync, key);
}
EXPORT_SYMBOL(wake_bit_function);
/*
 * Wait for the bit described by @q to become clear.
 *
 * To allow interruptible waiting and asynchronous (i.e. nonblocking)
 * waiting, the actions of __wait_on_bit() and __wait_on_bit_lock() are
 * permitted return codes.  Nonzero return codes halt waiting and return.
 *
 * Returns 0 once the bit is seen clear, or the non-zero value returned by
 * @action.
 */
int __sched
__wait_on_bit(wait_queue_head_t *wq, struct wait_bit_queue *q,
	      int (*action)(void *), unsigned mode)
{
	int err = 0;

	for (;;) {
		prepare_to_wait(wq, &q->wait, mode);

		/* Only invoke the action while the bit is still set. */
		if (test_bit(q->key.bit_nr, q->key.flags))
			err = (*action)(q->key.flags);

		/* Stop when the bit has cleared or the action asked us to. */
		if (!test_bit(q->key.bit_nr, q->key.flags) || err)
			break;
	}

	finish_wait(wq, &q->wait);
	return err;
}
EXPORT_SYMBOL(__wait_on_bit);
/*
 * Wait for @bit in @word to clear: declare a wait descriptor for the
 * (@word, @bit) pair, look up the matching hashed waitqueue and hand both to
 * __wait_on_bit().
 *
 * Returns whatever __wait_on_bit() returns.
 */
int __sched out_of_line_wait_on_bit(void *word, int bit,
				    int (*action)(void *), unsigned mode)
{
	DEFINE_WAIT_BIT(waiter, word, bit);

	return __wait_on_bit(bit_waitqueue(word, bit), &waiter, action, mode);
}
EXPORT_SYMBOL(out_of_line_wait_on_bit);
/*
 * Wait, as an exclusive waiter, until the bit described by @q can be taken
 * with test_and_set_bit().
 *
 * @action is called while the bit is observed set; a non-zero return from it
 * aborts the wait through abort_exclusive_wait() and is returned to the
 * caller.  Returns 0 once test_and_set_bit() finds the bit previously clear,
 * i.e. the caller now owns the bit.
 */
int __sched
__wait_on_bit_lock(wait_queue_head_t *wq, struct wait_bit_queue *q,
int (*action)(void *), unsigned mode)
{
do {
int ret;
prepare_to_wait_exclusive(wq, &q->wait, mode);
/*
 * Bit looks clear: "continue" in a do/while jumps to the loop
 * condition, so we go straight to trying test_and_set_bit().
 */
if (!test_bit(q->key.bit_nr, q->key.flags))
continue;
ret = action(q->key.flags);
/* Action returned 0: try to grab the bit via the loop condition. */
if (!ret)
continue;
/* Action asked us to stop: back out without losing a wakeup. */
abort_exclusive_wait(wq, &q->wait, mode, &q->key);
return ret;
} while (test_and_set_bit(q->key.bit_nr, q->key.flags));
finish_wait(wq, &q->wait);
return 0;
}
EXPORT_SYMBOL(__wait_on_bit_lock);
/*
 * Wait to take @bit in @word: declare a wait descriptor for the
 * (@word, @bit) pair, look up the matching hashed waitqueue and hand both to
 * __wait_on_bit_lock().
 *
 * Returns whatever __wait_on_bit_lock() returns.
 */
int __sched out_of_line_wait_on_bit_lock(void *word, int bit,
					 int (*action)(void *), unsigned mode)
{
	DEFINE_WAIT_BIT(waiter, word, bit);

	return __wait_on_bit_lock(bit_waitqueue(word, bit), &waiter,
				  action, mode);
}
EXPORT_SYMBOL(out_of_line_wait_on_bit_lock);
/*
 * Wake waiters on @wq using a wait_bit_key built from (@word, @bit).
 *
 * Nothing is done when waitqueue_active() reports no waiters; otherwise
 * __wake_up() is called with TASK_NORMAL and an exclusive-waiter count of 1,
 * passing the key on to the waiters' wake functions.
 */
void __wake_up_bit(wait_queue_head_t *wq, void *word, int bit)
{
	struct wait_bit_key wake_key = __WAIT_BIT_KEY_INITIALIZER(word, bit);

	if (!waitqueue_active(wq))
		return;

	__wake_up(wq, TASK_NORMAL, 1, &wake_key);
}
EXPORT_SYMBOL(__wake_up_bit);
/**
* wake_up_bit - wake up a waiter on a bit
* @word: the word being waited on, a kernel virtual address
* @bit: the bit of the word being waited on
*
* There is a standard hashed waitqueue table for generic use. This
* is the part of the hashtable's accessor API that wakes up waiters
* on a bit. For instance, if one were to have waiters on a bitflag,
* one would call wake_up_bit() after clearing the bit.
*
* In order for this to function properly, as it uses waitqueue_active()
* internally, some kind of memory barrier must be done prior to calling
* this. Typically, this will be smp_mb__after_clear_bit(), but in some
* cases where bitflags are manipulated non-atomically under a lock, one
* may need to use a less regular barrier, such fs/inode.c's smp_mb(),
* because spin_unlock() does not guarantee a memory barrier.
*/
void wake_up_bit(void *word, int bit)
{
__wake_up_bit(bit_waitqueue(word, bit), word, bit);
}
EXPORT_SYMBOL(wake_up_bit);
/*
 * Map a (@word, @bit) pair onto an entry of the wait table belonging to the
 * zone of the page that contains @word.
 *
 * The hash input is the address of @word shifted left by 5 (32-bit longs) or
 * 6 (64-bit longs) with @bit OR-ed into it.
 */
wait_queue_head_t *bit_waitqueue(void *word, int bit)
{
	const struct zone *z = page_zone(virt_to_page(word));
	unsigned long hash_key = (unsigned long)word;
	unsigned long idx;

	hash_key <<= (BITS_PER_LONG == 32) ? 5 : 6;
	hash_key |= bit;

	idx = hash_long(hash_key, z->wait_table_bits);
	return &z->wait_table[idx];
}
EXPORT_SYMBOL(bit_waitqueue);
/*
 * Pick the bit-waitqueue table entry used for waiting on atomic_t @p.
 *
 * Manipulate the atomic_t address to produce a better bit waitqueue table
 * hash index (we're keying off bit -1, but that would produce a horrible
 * hash value).  With 64-bit longs the lowest address bit is split off and
 * passed as the bit number; otherwise the address is hashed with bit 0.
 */
static inline wait_queue_head_t *atomic_t_waitqueue(atomic_t *p)
{
	unsigned long addr;

	if (BITS_PER_LONG != 64)
		return bit_waitqueue(p, 0);

	addr = (unsigned long)p;
	return bit_waitqueue((void *)(addr & ~1), addr & 1);
}
/*
 * Wake callback for atomic_t waiters.
 *
 * @arg is the struct wait_bit_key supplied by the waker; its flags pointer is
 * the atomic_t being waited on.  The waiter is only woken (via
 * autoremove_wake_function()) when its own key matches and the atomic_t
 * currently reads zero; otherwise 0 is returned and the waiter stays queued.
 */
static int wake_atomic_t_function(wait_queue_t *wait, unsigned mode, int sync,
				  void *arg)
{
	struct wait_bit_queue *waiter =
		container_of(wait, struct wait_bit_queue, wait);
	struct wait_bit_key *key = arg;
	atomic_t *counter = key->flags;

	/* Different object: this wakeup is for someone else. */
	if (waiter->key.flags != key->flags)
		return 0;

	/* Same address but a different kind of wait. */
	if (waiter->key.bit_nr != key->bit_nr)
		return 0;

	/* The counter has not reached zero (or has moved off it again). */
	if (atomic_read(counter) != 0)
		return 0;

	return autoremove_wake_function(wait, mode, sync, key);
}
/*
 * Wait for the atomic_t referenced by @q's key to become zero.
 *
 * @wq:     waitqueue to queue on
 * @q:      wait descriptor whose key.flags points at the atomic_t
 * @action: called each time the counter is found to be non-zero
 * @mode:   task state to wait in
 *
 * To allow interruptible waiting and asynchronous (i.e. nonblocking) waiting,
 * the actions of __wait_on_atomic_t() are permitted return codes.  Nonzero
 * return codes halt waiting and return.
 *
 * Returns 0 once the counter is seen to be zero, or the non-zero value
 * returned by @action.
 */
static __sched
int __wait_on_atomic_t(wait_queue_head_t *wq, struct wait_bit_queue *q,
		       int (*action)(atomic_t *), unsigned mode)
{
	atomic_t *val;
	int ret = 0;

	do {
		prepare_to_wait(wq, &q->wait, mode);
		val = q->key.flags;
		/*
		 * The event we are waiting for has already happened: don't
		 * call the action at all.  (Analogous to __wait_on_bit(),
		 * which only calls its action while the bit is still set.)
		 */
		if (atomic_read(val) == 0)
			break;
		/* Counter still non-zero: let the action wait for a wakeup. */
		ret = (*action)(val);
	} while (!ret && atomic_read(val) != 0);
	finish_wait(wq, &q->wait);
	return ret;
}
/*
 * Declare and initialise a struct wait_bit_queue called @name for waiting on
 * the atomic_t @p: the key comes from __WAIT_ATOMIC_T_KEY_INITIALIZER(p), the
 * waiting task is current, wakeups are filtered by wake_atomic_t_function()
 * and the list entry starts out empty (not on any queue).
 */
#define DEFINE_WAIT_ATOMIC_T(name, p) \
struct wait_bit_queue name = { \
.key = __WAIT_ATOMIC_T_KEY_INITIALIZER(p), \
.wait = { \
.private = current, \
.func = wake_atomic_t_function, \
.task_list = \
LIST_HEAD_INIT((name).wait.task_list), \
}, \
}
/*
 * Wait for atomic_t @p to become zero: declare a wait descriptor keyed on @p,
 * pick the waitqueue with atomic_t_waitqueue() and hand both to
 * __wait_on_atomic_t().
 *
 * Returns whatever __wait_on_atomic_t() returns.
 */
__sched int out_of_line_wait_on_atomic_t(atomic_t *p, int (*action)(atomic_t *),
					 unsigned mode)
{
	DEFINE_WAIT_ATOMIC_T(waiter, p);

	return __wait_on_atomic_t(atomic_t_waitqueue(p), &waiter, action, mode);
}
EXPORT_SYMBOL(out_of_line_wait_on_atomic_t);
/**
 * wake_up_atomic_t - Wake up a waiter on an atomic_t
 * @p: The atomic_t being waited on, a kernel virtual address
 *
 * Wake up anyone waiting for the atomic_t to go to zero.
 *
 * Abuse the bit-waker function and its waitqueue hash table set (the atomic_t
 * check is done by the waiter's wake function, not by the waker itself).
 */
void wake_up_atomic_t(atomic_t *p)
{
	wait_queue_head_t *wq = atomic_t_waitqueue(p);

	__wake_up_bit(wq, p, WAIT_ATOMIC_T_BIT_NR);
}
EXPORT_SYMBOL(wake_up_atomic_t);