locking/rwbase: Take care of ordering guarantee for fastpath reader

Readers of rwbase can lock and unlock without taking any inner lock, if
that happens, we need the ordering provided by atomic operations to
satisfy the ordering semantics of lock/unlock. Without that, considering
the follow case:

	{ X = 0 initially }

	CPU 0			CPU 1
	=====			=====
				rt_write_lock();
				X = 1
				rt_write_unlock():
				  atomic_add(READER_BIAS - WRITER_BIAS, ->readers);
				  // ->readers is READER_BIAS.
	rt_read_lock():
	  if ((r = atomic_read(->readers)) < 0) // True
	    atomic_try_cmpxchg(->readers, r, r + 1); // succeed.
	  <acquire the read lock via fast path>

	r1 = X;	// r1 may be 0, because nothing prevent the reordering
	        // of "X=1" and atomic_add() on CPU 1.

Therefore audit every usage of atomic operations that may happen in a
fast path, and add necessary barriers.

Signed-off-by: Boqun Feng <boqun.feng@gmail.com>
Signed-off-by: Peter Zijlstra (Intel) <peterz@infradead.org>
Reviewed-by: Thomas Gleixner <tglx@linutronix.de>
Link: https://lkml.kernel.org/r/20210909110203.953991276@infradead.org
This commit is contained in:
Boqun Feng 2021-09-09 12:59:19 +02:00 committed by Peter Zijlstra
parent 616be87eac
commit 81121524f1

View File

@ -41,6 +41,12 @@
* The risk of writer starvation is there, but the pathological use cases
* which trigger it are not necessarily the typical RT workloads.
*
* Fast-path orderings:
* The lock/unlock of readers can run in fast paths: lock and unlock are only
* atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
* semantics of rwbase_rt. Atomic ops should thus provide _acquire()
* and _release() (or stronger).
*
* Common code shared between RT rw_semaphore and rwlock
*/
@ -53,6 +59,7 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
* set.
*/
for (r = atomic_read(&rwb->readers); r < 0;) {
/* Fully-ordered if cmpxchg() succeeds, provides ACQUIRE */
if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
return 1;
}
@ -162,6 +169,8 @@ static __always_inline void rwbase_read_unlock(struct rwbase_rt *rwb,
/*
* rwb->readers can only hit 0 when a writer is waiting for the
* active readers to leave the critical section.
*
* dec_and_test() is fully ordered, provides RELEASE.
*/
if (unlikely(atomic_dec_and_test(&rwb->readers)))
__rwbase_read_unlock(rwb, state);
@ -172,7 +181,11 @@ static inline void __rwbase_write_unlock(struct rwbase_rt *rwb, int bias,
{
struct rt_mutex_base *rtm = &rwb->rtmutex;
atomic_add(READER_BIAS - bias, &rwb->readers);
/*
* _release() is needed in case that reader is in fast path, pairing
* with atomic_try_cmpxchg() in rwbase_read_trylock(), provides RELEASE
*/
(void)atomic_add_return_release(READER_BIAS - bias, &rwb->readers);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
rwbase_rtmutex_unlock(rtm);
}
@ -201,7 +214,11 @@ static inline bool __rwbase_write_trylock(struct rwbase_rt *rwb)
/* Can do without CAS because we're serialized by wait_lock. */
lockdep_assert_held(&rwb->rtmutex.wait_lock);
if (!atomic_read(&rwb->readers)) {
/*
* _acquire is needed in case the reader is in the fast path, pairing
* with rwbase_read_unlock(), provides ACQUIRE.
*/
if (!atomic_read_acquire(&rwb->readers)) {
atomic_set(&rwb->readers, WRITER_BIAS);
return 1;
}