(cleanup) Use more do...finally in extra::sync.

Ben Blum 2013-08-02 17:09:32 -04:00
parent d30cca46e6
commit bd35798773
2 changed files with 108 additions and 207 deletions
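The pattern being adopted: instead of a dedicated release struct per call site (SemRelease, SemAndSignalRelease, RWLockReleaseRead, RWLockReleaseDowngrade) whose destructor undoes the acquire, each acquire is now paired with a `do (|| { ... }).finally { ... }` block, so the release runs on normal exit and on task failure alike. A rough stand-in for that shape in present-day Rust, with every name here (`finally`, `access`, `COUNT`) invented for illustration rather than taken from this codebase:

```rust
use std::sync::atomic::{AtomicIsize, Ordering};

// A `finally`-style combinator: run `body`, then always run `cleanup`,
// even if `body` panics (the modern analogue of task failure). The drop
// guard plays the role the deleted release structs used to play, folded
// into one reusable helper.
fn finally<T>(body: impl FnOnce() -> T, cleanup: impl FnOnce()) -> T {
    struct Guard<C: FnOnce()>(Option<C>);
    impl<C: FnOnce()> Drop for Guard<C> {
        fn drop(&mut self) {
            if let Some(c) = self.0.take() {
                c();
            }
        }
    }
    let _guard = Guard(Some(cleanup)); // dropped on return *and* on unwind
    body()
}

// Toy stand-in for a semaphore count; purely illustrative.
static COUNT: AtomicIsize = AtomicIsize::new(1);

// Shape of Sem::access after this commit: acquire, run the block, and
// release unconditionally in the cleanup.
fn access<T>(blk: impl FnOnce() -> T) -> T {
    COUNT.fetch_sub(1, Ordering::AcqRel); // "acquire()"
    finally(blk, || {
        COUNT.fetch_add(1, Ordering::AcqRel); // "release()", runs even on panic
    })
}

fn main() {
    let v = access(|| 40 + 2);
    assert_eq!(v, 42);
    assert_eq!(COUNT.load(Ordering::Acquire), 1); // released again
}
```

Collapsing the per-site structs into blocks also removes the `#[unsafe_destructor]` annotations they carried.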

Changed file: extra::sync

@@ -22,7 +22,9 @@ use std::comm::SendDeferred;
use std::task;
use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox};
use std::unstable::atomics;
use std::unstable::finally::Finally;
use std::util;
use std::util::NonCopyable;
/****************************************************************************
* Internals
@@ -84,7 +86,6 @@ struct SemInner<Q> {
#[doc(hidden)]
struct Sem<Q>(Exclusive<SemInner<Q>>);
#[doc(hidden)]
impl<Q:Send> Sem<Q> {
fn new(count: int, q: Q) -> Sem<Q> {
@@ -125,17 +126,18 @@ impl<Q:Send> Sem<Q> {
}
}
}
}
// FIXME(#3154) move both copies of this into Sem<Q>, and unify the 2 structs
#[doc(hidden)]
impl Sem<()> {
pub fn access<U>(&self, blk: &fn() -> U) -> U {
let mut release = None;
do task::unkillable {
self.acquire();
release = Some(SemRelease(self));
do (|| {
self.acquire();
unsafe {
do task::rekillable { blk() }
}
}).finally {
self.release();
}
}
blk()
}
}
@@ -149,46 +151,6 @@ impl Sem<~[WaitQueue]> {
}
Sem::new(count, queues)
}
pub fn access_waitqueue<U>(&self, blk: &fn() -> U) -> U {
let mut release = None;
do task::unkillable {
self.acquire();
release = Some(SemAndSignalRelease(self));
}
blk()
}
}
// FIXME(#3588) should go inside of access()
#[doc(hidden)]
type SemRelease<'self> = SemReleaseGeneric<'self, ()>;
#[doc(hidden)]
type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[WaitQueue]>;
#[doc(hidden)]
struct SemReleaseGeneric<'self, Q> { sem: &'self Sem<Q> }
#[doc(hidden)]
#[unsafe_destructor]
impl<'self, Q:Send> Drop for SemReleaseGeneric<'self, Q> {
fn drop(&self) {
self.sem.release();
}
}
#[doc(hidden)]
fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> {
SemReleaseGeneric {
sem: sem
}
}
#[doc(hidden)]
fn SemAndSignalRelease<'r>(sem: &'r Sem<~[WaitQueue]>)
-> SemAndSignalRelease<'r> {
SemReleaseGeneric {
sem: sem
}
}
// FIXME(#3598): Want to use an Option down below, but we need a custom enum
@@ -211,11 +173,10 @@ pub struct Condvar<'self> {
// writer waking up from a cvar wait can't race with a reader to steal it,
// See the comment in write_cond for more detail.
priv order: ReacquireOrderLock<'self>,
// Make sure condvars are non-copyable.
priv token: util::NonCopyable,
}
#[unsafe_destructor]
impl<'self> Drop for Condvar<'self> { fn drop(&self) {} }
impl<'self> Condvar<'self> {
/**
* Atomically drop the associated lock, and block until a signal is sent.
@@ -243,11 +204,10 @@ impl<'self> Condvar<'self> {
let (WaitEnd, SignalEnd) = comm::oneshot();
let mut WaitEnd = Some(WaitEnd);
let mut SignalEnd = Some(SignalEnd);
let mut reacquire = None;
let mut out_of_bounds = None;
unsafe {
do task::unkillable {
// Release lock, 'atomically' enqueuing ourselves in so doing.
do task::unkillable {
// Release lock, 'atomically' enqueuing ourselves in so doing.
unsafe {
do (**self.sem).with |state| {
if condvar_id < state.blocked.len() {
// Drop the lock.
@@ -262,37 +222,25 @@ impl<'self> Condvar<'self> {
out_of_bounds = Some(state.blocked.len());
}
}
// If yield checks start getting inserted anywhere, we can be
// killed before or after enqueueing. Deciding whether to
// unkillably reacquire the lock needs to happen atomically
// wrt enqueuing.
if out_of_bounds.is_none() {
reacquire = Some(CondvarReacquire { sem: self.sem,
order: self.order });
}
}
}
do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
// Unconditionally "block". (Might not actually block if a
// signaller already sent -- I mean 'unconditionally' in contrast
// with acquire().)
let _ = comm::recv_one(WaitEnd.take_unwrap());
}
// This is needed for a failing condition variable to reacquire the
// mutex during unwinding. As long as the wrapper (mutex, etc) is
// bounded in when it gets released, this shouldn't hang forever.
struct CondvarReacquire<'self> {
sem: &'self Sem<~[WaitQueue]>,
order: ReacquireOrderLock<'self>,
}
#[unsafe_destructor]
impl<'self> Drop for CondvarReacquire<'self> {
fn drop(&self) {
// Needs to succeed, instead of itself dying.
do task::unkillable {
// If yield checks start getting inserted anywhere, we can be
// killed before or after enqueueing. Deciding whether to
// unkillably reacquire the lock needs to happen atomically
// wrt enqueuing.
do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
// Unconditionally "block". (Might not actually block if a
// signaller already sent -- I mean 'unconditionally' in contrast
// with acquire().)
do (|| {
unsafe {
do task::rekillable {
let _ = comm::recv_one(WaitEnd.take_unwrap());
}
}
}).finally {
// Reacquire the condvar. Note this is back in the unkillable
// section; it needs to succeed, instead of itself dying.
match self.order {
Just(lock) => do lock.access {
self.sem.acquire();
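The "might not actually block if a signaller already sent" comment above hinges on the oneshot pipe buffering its one message, so a waiter that arrives after the signal simply returns. A small stand-in using std::sync::mpsc in current Rust (not the comm::oneshot API used here) shows that behaviour:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // Stand-ins for the oneshot's two ends (names chosen to echo the code above).
    let (signal_end, wait_end) = mpsc::channel::<()>();

    // The signaller fires before anyone is waiting; the message is buffered.
    signal_end.send(()).unwrap();

    // "Unconditionally block": recv() only actually blocks if nothing has
    // been sent yet, so this late waiter returns immediately.
    let waiter = thread::spawn(move || wait_end.recv().unwrap());
    waiter.join().unwrap();
    println!("waiter returned without blocking");
}
```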
@@ -374,8 +322,8 @@ impl Sem<~[WaitQueue]> {
// The only other places that condvars get built are rwlock.write_cond()
// and rwlock_write_mode.
pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
do self.access_waitqueue {
blk(&Condvar { sem: self, order: Nothing })
do self.access {
blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() })
}
}
}
@@ -453,7 +401,7 @@ impl Mutex {
/// Run a function with ownership of the mutex.
pub fn lock<U>(&self, blk: &fn() -> U) -> U {
(&self.sem).access_waitqueue(blk)
(&self.sem).access(blk)
}
/// Run a function with ownership of the mutex and a handle to a condvar.
@@ -532,7 +480,6 @@ impl RWLock {
* tasks may run concurrently with this one.
*/
pub fn read<U>(&self, blk: &fn() -> U) -> U {
let mut release = None;
unsafe {
do task::unkillable {
do (&self.order_lock).access {
@@ -543,10 +490,24 @@ impl RWLock {
state.read_mode = true;
}
}
release = Some(RWLockReleaseRead(self));
do (|| {
do task::rekillable { blk() }
}).finally {
let state = &mut *self.state.get();
assert!(state.read_mode);
let old_count = state.read_count.fetch_sub(1, atomics::Release);
assert!(old_count > 0);
if old_count == 1 {
state.read_mode = false;
// Note: this release used to be outside of a locked access
// to exclusive-protected state. If this code is ever
// converted back to such (instead of using atomic ops),
// this access MUST NOT go inside the exclusive access.
(&self.access_lock).release();
}
}
}
}
blk()
}
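For the reader-exit bookkeeping that read() now does inside `finally`: the atomic fetch_sub returns the previous reader count, and only the task that observes 1 (i.e. was the last reader) releases the access lock. A minimal current-Rust sketch, with `ToyRwState` and `reader_exit` as made-up names standing in for the real state and exit path:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Made-up stand-in for the exclusive-protected RWLock state.
struct ToyRwState {
    read_count: AtomicUsize,
}

// The reader-exit path: decrement the count and, only if this task was the
// last reader still inside, hand the access lock back to writers.
fn reader_exit(state: &ToyRwState, release_access_lock: impl FnOnce()) {
    // fetch_sub returns the value *before* subtracting, so 1 means
    // "we were the last reader".
    let old_count = state.read_count.fetch_sub(1, Ordering::Release);
    assert!(old_count > 0);
    if old_count == 1 {
        release_access_lock();
    }
}

fn main() {
    let state = ToyRwState { read_count: AtomicUsize::new(2) };
    reader_exit(&state, || unreachable!("not the last reader"));
    reader_exit(&state, || println!("last reader releases access_lock"));
}
```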
/**
@@ -557,7 +518,7 @@ impl RWLock {
unsafe {
do task::unkillable {
(&self.order_lock).acquire();
do (&self.access_lock).access_waitqueue {
do (&self.access_lock).access {
(&self.order_lock).release();
do task::rekillable {
blk()
@@ -607,7 +568,8 @@ impl RWLock {
(&self.order_lock).release();
do task::rekillable {
let opt_lock = Just(&self.order_lock);
blk(&Condvar { order: opt_lock, ..*cond })
blk(&Condvar { sem: cond.sem, order: opt_lock,
token: NonCopyable::new() })
}
}
}
@@ -638,14 +600,43 @@ impl RWLock {
pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U {
// Implementation slightly different from the slicker 'write's above.
// The exit path is conditional on whether the caller downgrades.
let mut _release = None;
do task::unkillable {
(&self.order_lock).acquire();
(&self.access_lock).acquire();
(&self.order_lock).release();
do (|| {
unsafe {
do task::rekillable {
blk(RWLockWriteMode { lock: self, token: NonCopyable::new() })
}
}
}).finally {
let writer_or_last_reader;
// Check if we're releasing from read mode or from write mode.
let state = unsafe { &mut *self.state.get() };
if state.read_mode {
// Releasing from read mode.
let old_count = state.read_count.fetch_sub(1, atomics::Release);
assert!(old_count > 0);
// Check if other readers remain.
if old_count == 1 {
// Case 1: Writer downgraded & was the last reader
writer_or_last_reader = true;
state.read_mode = false;
} else {
// Case 2: Writer downgraded & was not the last reader
writer_or_last_reader = false;
}
} else {
// Case 3: Writer did not downgrade
writer_or_last_reader = true;
}
if writer_or_last_reader {
// Nobody left inside; release the "reader cloud" lock.
(&self.access_lock).release();
}
}
}
_release = Some(RWLockReleaseDowngrade(self));
blk(RWLockWriteMode { lock: self })
}
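write_downgrade()'s exit path, likewise moved into `finally`, has to decide whether it still holds the access lock: it does if it never downgraded (case 3) or if it downgraded and was the last reader out (case 1), but not if other readers remain (case 2). A sketch of that decision under invented names (`ToyState`, `downgrade_exit`), using current-Rust atomics:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Made-up stand-in for the RWLock state seen by the write_downgrade exit path.
struct ToyState {
    read_mode: bool,
    read_count: AtomicUsize,
}

// Returns true exactly when the caller should release the access lock.
fn downgrade_exit(state: &mut ToyState) -> bool {
    if state.read_mode {
        // The writer downgraded to a reader at some point.
        let old_count = state.read_count.fetch_sub(1, Ordering::Release);
        assert!(old_count > 0);
        if old_count == 1 {
            state.read_mode = false;
            true // Case 1: downgraded and was the last reader
        } else {
            false // Case 2: downgraded, but other readers remain
        }
    } else {
        true // Case 3: never downgraded; still the sole writer
    }
}

fn main() {
    let mut s = ToyState { read_mode: false, read_count: AtomicUsize::new(0) };
    assert!(downgrade_exit(&mut s)); // Case 3

    let mut s = ToyState { read_mode: true, read_count: AtomicUsize::new(1) };
    assert!(downgrade_exit(&mut s)); // Case 1

    let mut s = ToyState { read_mode: true, read_count: AtomicUsize::new(3) };
    assert!(!downgrade_exit(&mut s)); // Case 2
}
```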
/// To be called inside of the write_downgrade block.
@@ -674,105 +665,16 @@ impl RWLock {
}
}
}
RWLockReadMode { lock: token.lock }
}
}
// FIXME(#3588) should go inside of read()
#[doc(hidden)]
struct RWLockReleaseRead<'self> {
lock: &'self RWLock,
}
#[doc(hidden)]
#[unsafe_destructor]
impl<'self> Drop for RWLockReleaseRead<'self> {
fn drop(&self) {
unsafe {
do task::unkillable {
let state = &mut *self.lock.state.get();
assert!(state.read_mode);
let old_count = state.read_count.fetch_sub(1, atomics::Release);
assert!(old_count > 0);
if old_count == 1 {
state.read_mode = false;
// Note: this release used to be outside of a locked access
// to exclusive-protected state. If this code is ever
// converted back to such (instead of using atomic ops),
// this access MUST NOT go inside the exclusive access.
(&self.lock.access_lock).release();
}
}
}
}
}
#[doc(hidden)]
fn RWLockReleaseRead<'r>(lock: &'r RWLock) -> RWLockReleaseRead<'r> {
RWLockReleaseRead {
lock: lock
}
}
// FIXME(#3588) should go inside of downgrade()
#[doc(hidden)]
#[unsafe_destructor]
struct RWLockReleaseDowngrade<'self> {
lock: &'self RWLock,
}
#[doc(hidden)]
#[unsafe_destructor]
impl<'self> Drop for RWLockReleaseDowngrade<'self> {
fn drop(&self) {
unsafe {
do task::unkillable {
let writer_or_last_reader;
// Check if we're releasing from read mode or from write mode.
let state = &mut *self.lock.state.get();
if state.read_mode {
// Releasing from read mode.
let old_count = state.read_count.fetch_sub(1, atomics::Release);
assert!(old_count > 0);
// Check if other readers remain.
if old_count == 1 {
// Case 1: Writer downgraded & was the last reader
writer_or_last_reader = true;
state.read_mode = false;
} else {
// Case 2: Writer downgraded & was not the last reader
writer_or_last_reader = false;
}
} else {
// Case 3: Writer did not downgrade
writer_or_last_reader = true;
}
if writer_or_last_reader {
// Nobody left inside; release the "reader cloud" lock.
(&self.lock.access_lock).release();
}
}
}
}
}
#[doc(hidden)]
fn RWLockReleaseDowngrade<'r>(lock: &'r RWLock)
-> RWLockReleaseDowngrade<'r> {
RWLockReleaseDowngrade {
lock: lock
RWLockReadMode { lock: token.lock, token: NonCopyable::new() }
}
}
/// The "write permission" token used for rwlock.write_downgrade().
pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock }
#[unsafe_destructor]
impl<'self> Drop for RWLockWriteMode<'self> { fn drop(&self) {} }
pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }
/// The "read permission" token used for rwlock.write_downgrade().
pub struct RWLockReadMode<'self> { priv lock: &'self RWLock }
#[unsafe_destructor]
impl<'self> Drop for RWLockReadMode<'self> { fn drop(&self) {} }
pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
priv token: NonCopyable }
impl<'self> RWLockWriteMode<'self> {
/// Access the pre-downgrade rwlock in write mode.
@@ -782,7 +684,8 @@ impl<'self> RWLockWriteMode<'self> {
// Need to make the condvar use the order lock when reacquiring the
// access lock. See comment in RWLock::write_cond for why.
blk(&Condvar { sem: &self.lock.access_lock,
order: Just(&self.lock.order_lock), })
order: Just(&self.lock.order_lock),
token: NonCopyable::new() })
}
}
@@ -1060,6 +963,8 @@ mod tests {
}
#[test] #[ignore(cfg(windows))]
fn test_mutex_killed_broadcast() {
use std::unstable::finally::Finally;
let m = ~Mutex::new();
let m2 = ~m.clone();
let (p,c) = comm::stream();
@@ -1076,8 +981,13 @@ mod tests {
do mi.lock_cond |cond| {
let c = c.take();
c.send(()); // tell sibling to go ahead
let _z = SendOnFailure(c);
cond.wait(); // block forever
do (|| {
cond.wait(); // block forever
}).finally {
error!("task unwinding and sending");
c.send(());
error!("task unwinding and done sending");
}
}
}
}
@@ -1096,21 +1006,6 @@ mod tests {
let woken = cond.broadcast();
assert_eq!(woken, 0);
}
struct SendOnFailure {
c: comm::Chan<()>,
}
impl Drop for SendOnFailure {
fn drop(&self) {
self.c.send(());
}
}
fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure {
SendOnFailure {
c: c
}
}
}
#[test]
fn test_mutex_cond_signal_on_0() {

Changed file: std::util

@@ -79,6 +79,12 @@ pub fn replace<T>(dest: &mut T, mut src: T) -> T {
#[unsafe_no_drop_flag]
pub struct NonCopyable;
impl NonCopyable {
// FIXME(#8233) should not be necessary
/// Create a new noncopyable token.
pub fn new() -> NonCopyable { NonCopyable }
}
impl Drop for NonCopyable {
fn drop(&self) { }
}
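The new constructor exists so callers like Condvar and the RWLock mode structs can embed a NonCopyable token and thereby become move-only themselves. The same idea in present-day Rust, where any type without a Copy/Clone implementation is already non-copyable (module and struct names below are illustrative):

```rust
// Illustrative module standing in for std::util; the real API is only the
// `new()` shown in the hunk above.
mod util {
    // No Copy/Clone impls, and the field is private, so values can only be
    // created via new() and only moved, never duplicated.
    pub struct NonCopyable(());

    impl NonCopyable {
        pub fn new() -> NonCopyable {
            NonCopyable(())
        }
    }
}

// Embedding the token makes the containing struct move-only as well, the
// way Condvar and the RWLock mode structs now use it.
struct Condvarish<'a> {
    name: &'a str,
    _token: util::NonCopyable,
}

fn main() {
    let c = Condvarish { name: "cv", _token: util::NonCopyable::new() };
    let d = c; // moves; a second use of `c` would be a compile error
    println!("{}", d.name);
}
```

In the 2013 language the empty Drop impl is what kept the token from being implicitly copyable; in current Rust, simply not implementing Copy has the same effect.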