auto merge of #8008 : bblum/rust/select, r=brson

The main logic is in the commit ```Implement select() for new runtime pipes```. The guts of the ```PortOne::try_recv()``` implementation are now split across several functions: ```optimistic_check```, ```block_on```, and ```recv_ready```.

There is one odd FIXME left open here, in the "implement select" commit: an assertion in the receive path that I couldn't get to hold, on an invariant that for some reason doesn't hold with ```SharedPort```. Still investigating this.
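
For the user-facing picture: ```select()``` takes a mutable slice of ports and returns the index of one that has data available, and ```recv_ready()``` (from the new ```SelectPort``` trait) then extracts the payload without blocking again. Below is a minimal sketch adapted from the ```select_helper``` test added in this PR; the module paths and the ```run_in_newsched_task``` test helper are assumptions on my part, and this only works on the new runtime:

```
use std::rt::test::run_in_newsched_task;
use std::rt::comm::oneshot;
use std::rt::select::{select, SelectPort};

do run_in_newsched_task {
    let (p0, _c0) = oneshot::<int>();   // no data will arrive on this one
    let (p1, c1) = oneshot::<int>();
    c1.send(31337);                      // make the second port ready
    let mut ports = ~[p0, p1];
    let index = select(ports);           // index of a port with data available
    assert!(index == 1);
    // recv_ready() consumes the port and returns the payload, if any.
    assert!(ports.swap_remove(index).recv_ready().unwrap() == 31337);
    // _c0 stays alive until here, so port 0 never looks ready due to closure.
}
```

The same shape works for ```Port<T>``` streams, whose ```recv_ready()``` hands back the received value together with the port so further receives are possible.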
bors 2013-07-30 18:58:17 -07:00
commit 5633a5363b
9 changed files with 792 additions and 283 deletions

View file

@ -136,7 +136,7 @@ impl<T:Freeze+Send> Arc<T> {
*/
pub fn unwrap(self) -> T {
let Arc { x: x } = self;
unsafe { x.unwrap() }
x.unwrap()
}
}
@ -250,7 +250,7 @@ impl<T:Send> MutexArc<T> {
*/
pub fn unwrap(self) -> T {
let MutexArc { x: x } = self;
let inner = unsafe { x.unwrap() };
let inner = x.unwrap();
let MutexArcInner { failed: failed, data: data, _ } = inner;
if failed {
fail!(~"Can't unwrap poisoned MutexArc - another task failed inside!");
@ -469,7 +469,7 @@ impl<T:Freeze + Send> RWArc<T> {
*/
pub fn unwrap(self) -> T {
let RWArc { x: x, _ } = self;
let inner = unsafe { x.unwrap() };
let inner = x.unwrap();
let RWArcInner { failed: failed, data: data, _ } = inner;
if failed {
fail!(~"Can't unwrap poisoned RWArc - another task failed inside!")

View file

@ -130,11 +130,9 @@ impl<Q:Send> Sem<Q> {
impl Sem<()> {
pub fn access<U>(&self, blk: &fn() -> U) -> U {
let mut release = None;
unsafe {
do task::unkillable {
self.acquire();
release = Some(SemRelease(self));
}
do task::unkillable {
self.acquire();
release = Some(SemRelease(self));
}
blk()
}
@ -153,11 +151,9 @@ impl Sem<~[WaitQueue]> {
pub fn access_waitqueue<U>(&self, blk: &fn() -> U) -> U {
let mut release = None;
unsafe {
do task::unkillable {
self.acquire();
release = Some(SemAndSignalRelease(self));
}
do task::unkillable {
self.acquire();
release = Some(SemAndSignalRelease(self));
}
blk()
}
@ -294,17 +290,15 @@ impl<'self> Condvar<'self> {
#[unsafe_destructor]
impl<'self> Drop for CondvarReacquire<'self> {
fn drop(&self) {
unsafe {
// Needs to succeed, instead of itself dying.
do task::unkillable {
match self.order {
Just(lock) => do lock.access {
self.sem.acquire();
},
Nothing => {
self.sem.acquire();
},
}
// Needs to succeed, instead of itself dying.
do task::unkillable {
match self.order {
Just(lock) => do lock.access {
self.sem.acquire();
},
Nothing => {
self.sem.acquire();
},
}
}
}
@ -644,14 +638,12 @@ impl RWLock {
// Implementation slightly different from the slicker 'write's above.
// The exit path is conditional on whether the caller downgrades.
let mut _release = None;
unsafe {
do task::unkillable {
(&self.order_lock).acquire();
(&self.access_lock).acquire();
(&self.order_lock).release();
}
_release = Some(RWLockReleaseDowngrade(self));
do task::unkillable {
(&self.order_lock).acquire();
(&self.access_lock).acquire();
(&self.order_lock).release();
}
_release = Some(RWLockReleaseDowngrade(self));
blk(RWLockWriteMode { lock: self })
}

View file

@ -12,13 +12,13 @@
use option::*;
use cast;
use util;
use ops::Drop;
use rt::kill::BlockedTask;
use kinds::Send;
use rt::sched::Scheduler;
use rt::local::Local;
use unstable::atomics::{AtomicUint, AtomicOption, SeqCst};
use rt::select::{Select, SelectPort};
use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Release, SeqCst};
use unstable::sync::UnsafeAtomicRcBox;
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@ -45,23 +45,12 @@ struct Packet<T> {
/// A one-shot channel.
pub struct ChanOne<T> {
// XXX: Hack extra allocation to make by-val self work
inner: ~ChanOneHack<T>
}
/// A one-shot port.
pub struct PortOne<T> {
// XXX: Hack extra allocation to make by-val self work
inner: ~PortOneHack<T>
}
pub struct ChanOneHack<T> {
void_packet: *mut Void,
suppress_finalize: bool
}
pub struct PortOneHack<T> {
/// A one-shot port.
pub struct PortOne<T> {
void_packet: *mut Void,
suppress_finalize: bool
}
@ -75,22 +64,26 @@ pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
unsafe {
let packet: *mut Void = cast::transmute(packet);
let port = PortOne {
inner: ~PortOneHack {
void_packet: packet,
suppress_finalize: false
}
void_packet: packet,
suppress_finalize: false
};
let chan = ChanOne {
inner: ~ChanOneHack {
void_packet: packet,
suppress_finalize: false
}
void_packet: packet,
suppress_finalize: false
};
return (port, chan);
}
}
impl<T> ChanOne<T> {
#[inline]
fn packet(&self) -> *mut Packet<T> {
unsafe {
let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
let p: *mut Packet<T> = &mut **p;
return p;
}
}
pub fn send(self, val: T) {
self.try_send(val);
@ -99,7 +92,7 @@ impl<T> ChanOne<T> {
pub fn try_send(self, val: T) -> bool {
let mut this = self;
let mut recvr_active = true;
let packet = this.inner.packet();
let packet = this.packet();
unsafe {
@ -127,7 +120,7 @@ impl<T> ChanOne<T> {
sched.metrics.rendezvous_sends += 1;
}
// Port has closed. Need to clean up.
let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
recvr_active = false;
}
task_as_state => {
@ -144,13 +137,20 @@ impl<T> ChanOne<T> {
}
// Suppress the synchronizing actions in the finalizer. We're done with the packet.
this.inner.suppress_finalize = true;
this.suppress_finalize = true;
return recvr_active;
}
}
impl<T> PortOne<T> {
fn packet(&self) -> *mut Packet<T> {
unsafe {
let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
let p: *mut Packet<T> = &mut **p;
return p;
}
}
pub fn recv(self) -> T {
match self.try_recv() {
Some(val) => val,
@ -162,43 +162,129 @@ impl<T> PortOne<T> {
pub fn try_recv(self) -> Option<T> {
let mut this = self;
let packet = this.inner.packet();
// XXX: Optimize this to not require the two context switches when data is available
// Switch to the scheduler to put the ~Task into the Packet state.
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |sched, task| {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
// an acquire barrier to prevent reordering of the subsequent read
// of the payload. Also issues a release barrier to prevent reordering
// of any previous writes to the task structure.
let task_as_state = task.cast_to_uint();
let oldstate = (*packet).state.swap(task_as_state, SeqCst);
match oldstate {
STATE_BOTH => {
// Data has not been sent. Now we're blocked.
rtdebug!("non-rendezvous recv");
sched.metrics.non_rendezvous_recvs += 1;
}
STATE_ONE => {
rtdebug!("rendezvous recv");
sched.metrics.rendezvous_recvs += 1;
// Channel is closed. Switch back and check the data.
// NB: We have to drop back into the scheduler event loop here
// instead of switching immediately back or we could end up
// triggering infinite recursion on the scheduler's stack.
let recvr = BlockedTask::cast_from_uint(task_as_state);
sched.enqueue_blocked_task(recvr);
}
_ => util::unreachable()
}
// Optimistic check. If data was sent already, we don't even need to block.
// No release barrier needed here; we're not handing off our task pointer yet.
if !this.optimistic_check() {
// No data available yet.
// Switch to the scheduler to put the ~Task into the Packet state.
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |sched, task| {
this.block_on(sched, task);
}
}
// Task resumes.
this.recv_ready()
}
}
impl<T> Select for PortOne<T> {
#[inline] #[cfg(not(test))]
fn optimistic_check(&mut self) -> bool {
unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
}
#[inline] #[cfg(test)]
fn optimistic_check(&mut self) -> bool {
// The optimistic check is never necessary for correctness. For testing
// purposes, making it randomly return false simulates a racing sender.
use rand::{Rand, rng};
let mut rng = rng();
let actually_check = Rand::rand(&mut rng);
if actually_check {
unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
} else {
false
}
}
fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
// an acquire barrier to prevent reordering of the subsequent read
// of the payload. Also issues a release barrier to prevent
// reordering of any previous writes to the task structure.
let task_as_state = task.cast_to_uint();
let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
match oldstate {
STATE_BOTH => {
// Data has not been sent. Now we're blocked.
rtdebug!("non-rendezvous recv");
sched.metrics.non_rendezvous_recvs += 1;
false
}
STATE_ONE => {
// Re-record that we are the only owner of the packet.
// Release barrier needed in case the task gets reawoken
// on a different core (this is analogous to writing a
// payload; a barrier in enqueueing the task protects it).
// NB(#8132). This *must* occur before the enqueue below.
// FIXME(#6842, #8130) This is usually only needed for the
// assertion in recv_ready, except in the case of select().
// This won't actually ever have cacheline contention, but
// maybe should be optimized out with a cfg(test) anyway?
(*self.packet()).state.store(STATE_ONE, Release);
rtdebug!("rendezvous recv");
sched.metrics.rendezvous_recvs += 1;
// Channel is closed. Switch back and check the data.
// NB: We have to drop back into the scheduler event loop here
// instead of switching immediately back or we could end up
// triggering infinite recursion on the scheduler's stack.
let recvr = BlockedTask::cast_from_uint(task_as_state);
sched.enqueue_blocked_task(recvr);
true
}
_ => rtabort!("can't block_on; a task is already blocked")
}
}
}
// This is the only select trait function that's not also used in recv.
fn unblock_from(&mut self) -> bool {
let packet = self.packet();
unsafe {
// In case the data is available, the acquire barrier here matches
// the release barrier the sender used to release the payload.
match (*packet).state.load(Acquire) {
// Impossible. We removed STATE_BOTH when blocking on it, and
// no self-respecting sender would put it back.
STATE_BOTH => rtabort!("refcount already 2 in unblock_from"),
// Here, a sender already tried to wake us up. Perhaps they
// even succeeded! Data is available.
STATE_ONE => true,
// Still registered as blocked. Need to "unblock" the pointer.
task_as_state => {
// In the window between the load and the CAS, a sender
// might take the pointer and set the refcount to ONE. If
// that happens, we shouldn't clobber that with BOTH!
// Acquire barrier again for the same reason as above.
match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
Acquire) {
STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
STATE_ONE => true, // Lost the race. Data available.
same_ptr => {
// We successfully unblocked our task pointer.
assert!(task_as_state == same_ptr);
let handle = BlockedTask::cast_from_uint(task_as_state);
// Because we are already awake, the handle we
// gave to this port shall already be empty.
handle.assert_already_awake();
false
}
}
}
}
}
}
}
impl<T> SelectPort<T> for PortOne<T> {
fn recv_ready(self) -> Option<T> {
let mut this = self;
let packet = this.packet();
// No further memory barrier is needed here to access the
// payload. Some scenarios:
@ -210,14 +296,17 @@ impl<T> PortOne<T> {
// 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
// is pinned to some other scheduler, so the sending task had to give us to
// a different scheduler for resuming. That send synchronized memory.
unsafe {
let payload = util::replace(&mut (*packet).payload, None);
// See corresponding store() above in block_on for rationale.
// FIXME(#8130) This can happen only in test builds.
assert!((*packet).state.load(Acquire) == STATE_ONE);
let payload = (*packet).payload.take();
// The sender has closed up shop. Drop the packet.
let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
// Suppress the synchronizing actions in the finalizer. We're done with the packet.
this.inner.suppress_finalize = true;
this.suppress_finalize = true;
return payload;
}
}
@ -226,19 +315,19 @@ impl<T> PortOne<T> {
impl<T> Peekable<T> for PortOne<T> {
fn peek(&self) -> bool {
unsafe {
let packet: *mut Packet<T> = self.inner.packet();
let packet: *mut Packet<T> = self.packet();
let oldstate = (*packet).state.load(SeqCst);
match oldstate {
STATE_BOTH => false,
STATE_ONE => (*packet).payload.is_some(),
_ => util::unreachable()
_ => rtabort!("peeked on a blocked task")
}
}
}
}
#[unsafe_destructor]
impl<T> Drop for ChanOneHack<T> {
impl<T> Drop for ChanOne<T> {
fn drop(&self) {
if self.suppress_finalize { return }
@ -267,7 +356,7 @@ impl<T> Drop for ChanOneHack<T> {
}
#[unsafe_destructor]
impl<T> Drop for PortOneHack<T> {
impl<T> Drop for PortOne<T> {
fn drop(&self) {
if self.suppress_finalize { return }
@ -295,26 +384,6 @@ impl<T> Drop for PortOneHack<T> {
}
}
impl<T> ChanOneHack<T> {
fn packet(&self) -> *mut Packet<T> {
unsafe {
let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
let p: *mut Packet<T> = &mut **p;
return p;
}
}
}
impl<T> PortOneHack<T> {
fn packet(&self) -> *mut Packet<T> {
unsafe {
let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
let p: *mut Packet<T> = &mut **p;
return p;
}
}
}
struct StreamPayload<T> {
val: T,
next: PortOne<StreamPayload<T>>
@ -385,6 +454,36 @@ impl<T> Peekable<T> for Port<T> {
}
}
impl<T> Select for Port<T> {
#[inline]
fn optimistic_check(&mut self) -> bool {
do self.next.with_mut_ref |pone| { pone.optimistic_check() }
}
#[inline]
fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
let task = Cell::new(task);
do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
}
#[inline]
fn unblock_from(&mut self) -> bool {
do self.next.with_mut_ref |pone| { pone.unblock_from() }
}
}
impl<T> SelectPort<(T, Port<T>)> for Port<T> {
fn recv_ready(self) -> Option<(T, Port<T>)> {
match self.next.take().recv_ready() {
Some(StreamPayload { val, next }) => {
self.next.put_back(next);
Some((val, self))
}
None => None
}
}
}
pub struct SharedChan<T> {
// Just like Chan, but a shared AtomicOption instead of Cell
priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>

View file

@ -106,8 +106,14 @@ impl Drop for KillFlag {
// blocked task handle. So unblocking a task must restore that spare.
unsafe fn revive_task_ptr(task_ptr: uint, spare_flag: Option<KillFlagHandle>) -> ~Task {
let mut task: ~Task = cast::transmute(task_ptr);
rtassert!(task.death.spare_kill_flag.is_none());
task.death.spare_kill_flag = spare_flag;
if task.death.spare_kill_flag.is_none() {
task.death.spare_kill_flag = spare_flag;
} else {
// A task's spare kill flag is not used for blocking in one case:
// when an unkillable task blocks on select. In this case, a separate
// one was created, which we now discard.
rtassert!(task.death.unkillable > 0);
}
task
}
@ -119,7 +125,7 @@ impl BlockedTask {
Killable(flag_arc) => {
let flag = unsafe { &mut **flag_arc.get() };
match flag.swap(KILL_RUNNING, SeqCst) {
KILL_RUNNING => rtabort!("tried to wake an already-running task"),
KILL_RUNNING => None, // woken from select(), perhaps
KILL_KILLED => None, // a killer stole it already
task_ptr =>
Some(unsafe { revive_task_ptr(task_ptr, Some(flag_arc)) })
@ -162,6 +168,27 @@ impl BlockedTask {
}
}
/// Converts one blocked task handle to a list of many handles to the same.
pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] {
let handles = match self {
Unkillable(task) => {
let flag = unsafe { KillFlag(AtomicUint::new(cast::transmute(task))) };
UnsafeAtomicRcBox::newN(flag, num_handles)
}
Killable(flag_arc) => flag_arc.cloneN(num_handles),
};
// Even if the task was unkillable before, we use 'Killable' because
// multiple pipes will have handles. It does not really mean killable.
handles.consume_iter().transform(|x| Killable(x)).collect()
}
// This assertion has two flavours because the wake involves an atomic op.
// In the faster version, destructors will fail dramatically instead.
#[inline] #[cfg(not(test))]
pub fn assert_already_awake(self) { }
#[inline] #[cfg(test)]
pub fn assert_already_awake(self) { assert!(self.wake().is_none()); }
/// Convert to an unsafe uint value. Useful for storing in a pipe's state flag.
#[inline]
pub unsafe fn cast_to_uint(self) -> uint {
@ -301,7 +328,7 @@ impl KillHandle {
}
// Try to see if all our children are gone already.
match unsafe { self.try_unwrap() } {
match self.try_unwrap() {
// Couldn't unwrap; children still alive. Reparent entire handle as
// our own tombstone, to be unwrapped later.
Left(this) => {
@ -313,7 +340,7 @@ impl KillHandle {
// Prefer to check tombstones that were there first,
// being "more fair" at the expense of tail-recursion.
others.take().map_consume_default(true, |f| f()) && {
let mut inner = unsafe { this.take().unwrap() };
let mut inner = this.take().unwrap();
(!inner.any_child_failed) &&
inner.child_tombstones.take_map_default(true, |f| f())
}
@ -402,7 +429,7 @@ impl Death {
do self.on_exit.take_map |on_exit| {
if success {
// We succeeded, but our children might not. Need to wait for them.
let mut inner = unsafe { self.kill_handle.take_unwrap().unwrap() };
let mut inner = self.kill_handle.take_unwrap().unwrap();
if inner.any_child_failed {
success = false;
} else {
@ -528,7 +555,7 @@ mod test {
// Without another handle to child, the try unwrap should succeed.
child.reparent_children_to(&mut parent);
let mut parent_inner = unsafe { parent.unwrap() };
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.is_none());
assert!(parent_inner.any_child_failed == false);
}
@ -543,7 +570,7 @@ mod test {
child.notify_immediate_failure();
// Without another handle to child, the try unwrap should succeed.
child.reparent_children_to(&mut parent);
let mut parent_inner = unsafe { parent.unwrap() };
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.is_none());
// Immediate failure should have been propagated.
assert!(parent_inner.any_child_failed);
@ -565,7 +592,7 @@ mod test {
// Otherwise, due to 'link', it would try to tombstone.
child2.reparent_children_to(&mut parent);
// Should successfully unwrap even though 'link' is still alive.
let mut parent_inner = unsafe { parent.unwrap() };
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.is_none());
// Immediate failure should have been propagated by first child.
assert!(parent_inner.any_child_failed);
@ -584,7 +611,7 @@ mod test {
// Let parent collect tombstones.
util::ignore(link);
// Must have created a tombstone
let mut parent_inner = unsafe { parent.unwrap() };
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.take_unwrap()());
assert!(parent_inner.any_child_failed == false);
}
@ -603,7 +630,7 @@ mod test {
// Let parent collect tombstones.
util::ignore(link);
// Must have created a tombstone
let mut parent_inner = unsafe { parent.unwrap() };
let mut parent_inner = parent.unwrap();
// Failure must be seen in the tombstone.
assert!(parent_inner.child_tombstones.take_unwrap()() == false);
assert!(parent_inner.any_child_failed == false);
@ -623,7 +650,7 @@ mod test {
// Let parent collect tombstones.
util::ignore(link);
// Must have created a tombstone
let mut parent_inner = unsafe { parent.unwrap() };
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.take_unwrap()());
assert!(parent_inner.any_child_failed == false);
}
@ -644,7 +671,7 @@ mod test {
// Let parent collect tombstones.
util::ignore(link);
// Must have created a tombstone
let mut parent_inner = unsafe { parent.unwrap() };
let mut parent_inner = parent.unwrap();
// Failure must be seen in the tombstone.
assert!(parent_inner.child_tombstones.take_unwrap()() == false);
assert!(parent_inner.any_child_failed == false);

View file

@ -142,6 +142,9 @@ pub mod tube;
/// Simple reimplementation of core::comm
pub mod comm;
/// Routines for select()ing on pipes.
pub mod select;
// FIXME #5248 shouldn't be pub
/// The runtime needs to be able to put a pointer into thread-local storage.
pub mod local_ptr;

src/libstd/rt/select.rs (new file, 328 lines)
View file

@ -0,0 +1,328 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
// use either::{Either, Left, Right};
use rt::kill::BlockedTask;
use rt::sched::Scheduler;
use rt::local::Local;
/// Trait for message-passing primitives that can be select()ed on.
pub trait Select {
// Returns true if data was available.
fn optimistic_check(&mut self) -> bool;
// Returns true if data was available. If so, shall also wake() the task.
fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool;
// Returns true if data was available.
fn unblock_from(&mut self) -> bool;
}
/// Trait for message-passing primitives that can use the select2() convenience wrapper.
// (This is separate from the above trait to enable heterogeneous lists of ports
// that implement Select on different types to use select().)
pub trait SelectPort<T> : Select {
fn recv_ready(self) -> Option<T>;
}
/// Receive a message from any one of many ports at once.
pub fn select<A: Select>(ports: &mut [A]) -> uint {
if ports.is_empty() {
fail!("can't select on an empty list");
}
for ports.mut_iter().enumerate().advance |(index, port)| {
if port.optimistic_check() {
return index;
}
}
// If one of the ports already contains data when we go to block on it, we
// don't bother enqueueing on the rest of them, so we shouldn't bother
// unblocking from it either. This is just for efficiency, not correctness.
// (If not, we need to unblock from all of them. Length is a placeholder.)
let mut ready_index = ports.len();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |sched, task| {
let task_handles = task.make_selectable(ports.len());
for ports.mut_iter().zip(task_handles.consume_iter()).enumerate().advance
|(index, (port, task_handle))| {
// If one of the ports has data by now, it will wake the handle.
if port.block_on(sched, task_handle) {
ready_index = index;
break;
}
}
}
// Task resumes. Now unblock ourselves from all the ports we blocked on.
// If the success index wasn't reset, 'take' will just take all of them.
// Iterate in reverse so the 'earliest' index that's ready gets returned.
for ports.mut_slice(0, ready_index).mut_rev_iter().enumerate().advance |(index, port)| {
if port.unblock_from() {
ready_index = index;
}
}
assert!(ready_index < ports.len());
return ready_index;
}
/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
impl <'self> Select for &'self mut Select {
fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
self.block_on(sched, task)
}
fn unblock_from(&mut self) -> bool { self.unblock_from() }
}
pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
-> Either<(Option<TA>, B), (A, Option<TB>)> {
let result = {
let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
select(ports)
};
match result {
0 => Left ((a.recv_ready(), b)),
1 => Right((a, b.recv_ready())),
x => fail!("impossible case in select2: %?", x)
}
}
*/
#[cfg(test)]
mod test {
use super::*;
use option::*;
use rt::comm::*;
use rt::test::*;
use vec::*;
use comm::GenericChan;
use task;
use cell::Cell;
#[test] #[ignore(cfg(windows))] #[should_fail]
fn select_doesnt_get_trolled() {
select::<PortOne<()>>([]);
}
/* non-blocking select tests */
#[cfg(test)]
fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
// Unfortunately this does not actually test the block_on early-break
// codepath in select -- racing between the sender and the receiver in
// separate tasks is necessary to get around the optimistic check.
let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>()));
let mut dead_chans = ~[];
let mut ports = ports;
for chans.consume_iter().enumerate().advance |(i, chan)| {
if send_on_chans.contains(&i) {
chan.send(());
} else {
dead_chans.push(chan);
}
}
let ready_index = select(ports);
assert!(send_on_chans.contains(&ready_index));
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
let _ = dead_chans;
// Same thing with streams instead.
// FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>()));
let mut dead_chans = ~[];
let mut ports = ports;
for chans.consume_iter().enumerate().advance |(i, chan)| {
if send_on_chans.contains(&i) {
chan.send(());
} else {
dead_chans.push(chan);
}
}
let ready_index = select(ports);
assert!(send_on_chans.contains(&ready_index));
assert!(ports.swap_remove(ready_index).recv_ready().is_some());
let _ = dead_chans;
}
#[test]
fn select_one() {
do run_in_newsched_task { select_helper(1, [0]) }
}
#[test]
fn select_two() {
// NB. I would like to have a test that tests the first one that is
// ready is the one that's returned, but that can't be reliably tested
// with the randomized behaviour of optimistic_check.
do run_in_newsched_task { select_helper(2, [1]) }
do run_in_newsched_task { select_helper(2, [0]) }
do run_in_newsched_task { select_helper(2, [1,0]) }
}
#[test]
fn select_a_lot() {
do run_in_newsched_task { select_helper(12, [7,8,9]) }
}
#[test]
fn select_stream() {
use util;
use comm::GenericChan;
// Sends 10 buffered packets, and uses select to retrieve them all.
// Puts the port in a different spot in the vector each time.
do run_in_newsched_task {
let (ports, _) = unzip(from_fn(10, |_| stream()));
let (port, chan) = stream();
for 10.times { chan.send(31337); }
let mut ports = ports;
let mut port = Some(port);
let order = [5u,0,4,3,2,6,9,8,7,1];
for order.iter().advance |&index| {
// put the port in the vector at any index
util::swap(port.get_mut_ref(), &mut ports[index]);
assert!(select(ports) == index);
// get it back out
util::swap(port.get_mut_ref(), &mut ports[index]);
// NB. Not recv(), because optimistic_check randomly fails.
let (data, new_port) = port.take_unwrap().recv_ready().unwrap();
assert!(data == 31337);
port = Some(new_port);
}
}
}
#[test]
fn select_unkillable() {
do run_in_newsched_task {
do task::unkillable { select_helper(2, [1]) }
}
}
/* blocking select tests */
#[test]
fn select_blocking() {
select_blocking_helper(true);
select_blocking_helper(false);
fn select_blocking_helper(killable: bool) {
do run_in_newsched_task {
let (p1,_c) = oneshot();
let (p2,c2) = oneshot();
let mut ports = [p1,p2];
let (p3,c3) = oneshot();
let (p4,c4) = oneshot();
let x = Cell::new((c2, p3, c4));
do task::spawn {
let (c2, p3, c4) = x.take();
p3.recv(); // handshake parent
c4.send(()); // normal receive
task::yield();
c2.send(()); // select receive
}
// Try to block before child sends on c2.
c3.send(());
p4.recv();
if killable {
assert!(select(ports) == 1);
} else {
do task::unkillable { assert!(select(ports) == 1); }
}
}
}
}
#[test]
fn select_racing_senders() {
static NUM_CHANS: uint = 10;
select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]);
select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
select_racing_senders_helper(true, ~[0,1,2]);
select_racing_senders_helper(false, ~[0,1,2]);
select_racing_senders_helper(true, ~[3,4,5,6]);
select_racing_senders_helper(false, ~[3,4,5,6]);
select_racing_senders_helper(true, ~[7,8,9]);
select_racing_senders_helper(false, ~[7,8,9]);
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
use uint;
use rt::test::spawntask_random;
do run_in_newsched_task {
// A bit of stress, since ordinarily this is just smoke and mirrors.
for 4.times {
let send_on_chans = send_on_chans.clone();
do task::spawn {
let mut ports = ~[];
for uint::range(0, NUM_CHANS) |i| {
let (p,c) = oneshot();
ports.push(p);
if send_on_chans.contains(&i) {
let c = Cell::new(c);
do spawntask_random {
task::yield();
c.take().send(());
}
}
}
// nondeterministic result, but should succeed
if killable {
select(ports);
} else {
do task::unkillable { select(ports); }
}
}
}
}
}
}
#[test] #[ignore(cfg(windows))]
fn select_killed() {
do run_in_newsched_task {
let (success_p, success_c) = oneshot::<bool>();
let success_c = Cell::new(success_c);
do task::try {
let success_c = Cell::new(success_c.take());
do task::unkillable {
let (p,c) = oneshot();
let c = Cell::new(c);
do task::spawn {
let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>()));
let mut ports = dead_ps;
select(ports); // should get killed; nothing should leak
c.take().send(()); // must not happen
// Make sure dead_cs doesn't get closed until after select.
let _ = dead_cs;
}
do task::spawn {
fail!(); // should kill sibling awake
}
// wait for killed selector to close (NOT send on) its c.
// hope to send 'true'.
success_c.take().send(p.try_recv().is_none());
}
};
assert!(success_p.recv());
}
}
}

View file

@ -618,32 +618,34 @@ pub fn get_scheduler() -> Scheduler {
* }
* ~~~
*/
pub unsafe fn unkillable<U>(f: &fn() -> U) -> U {
pub fn unkillable<U>(f: &fn() -> U) -> U {
use rt::task::Task;
match context() {
OldTaskContext => {
let t = rt::rust_get_task();
do (|| {
rt::rust_task_inhibit_kill(t);
f()
}).finally {
rt::rust_task_allow_kill(t);
unsafe {
match context() {
OldTaskContext => {
let t = rt::rust_get_task();
do (|| {
rt::rust_task_inhibit_kill(t);
f()
}).finally {
rt::rust_task_allow_kill(t);
}
}
}
TaskContext => {
// The inhibits/allows might fail and need to borrow the task.
let t = Local::unsafe_borrow::<Task>();
do (|| {
(*t).death.inhibit_kill((*t).unwinder.unwinding);
f()
}).finally {
(*t).death.allow_kill((*t).unwinder.unwinding);
TaskContext => {
// The inhibits/allows might fail and need to borrow the task.
let t = Local::unsafe_borrow::<Task>();
do (|| {
(*t).death.inhibit_kill((*t).unwinder.unwinding);
f()
}).finally {
(*t).death.allow_kill((*t).unwinder.unwinding);
}
}
// FIXME(#3095): This should be an rtabort as soon as the scheduler
// no longer uses a workqueue implemented with an Exclusive.
_ => f()
}
// FIXME(#3095): This should be an rtabort as soon as the scheduler
// no longer uses a workqueue implemented with an Exclusive.
_ => f()
}
}

View file

@ -512,7 +512,9 @@ impl RuntimeGlue {
unsafe fn kill_all_tasks(task: &TaskHandle) {
match *task {
OldTask(ptr) => rt::rust_task_kill_all(ptr),
NewTask(ref _handle) => rtabort!("unimplemented"), // FIXME(#7544)
// FIXME(#7544): Remove the kill_all feature entirely once the
// oldsched goes away.
NewTask(ref _handle) => rtabort!("can't kill_all in newsched"),
}
}
@ -573,7 +575,10 @@ impl RuntimeGlue {
members: members,
descendants: TaskSet::new(),
}));
let group = Taskgroup(tasks, AncestorList(None), true, None);
// FIXME(#7544): Remove the is_main flag entirely once
// the newsched goes away. The main taskgroup has no special
// behaviour.
let group = Taskgroup(tasks, AncestorList(None), false, None);
(*me).taskgroup = Some(group);
(*me).taskgroup.get_ref()
}
@ -689,7 +694,7 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
// Should be run after the local-borrowed task is returned.
if enlist_success {
if indestructible {
unsafe { do unkillable { f() } }
do unkillable { f() }
} else {
f()
}

View file

@ -22,6 +22,7 @@ use unstable::finally::Finally;
use ops::Drop;
use clone::Clone;
use kinds::Send;
use vec;
/// An atomically reference counted pointer.
///
@ -41,138 +42,172 @@ struct AtomicRcBoxData<T> {
data: Option<T>,
}
unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut libc::c_void {
let data = ~AtomicRcBoxData { count: AtomicUint::new(refcount),
unwrapper: AtomicOption::empty(),
data: Some(data) };
cast::transmute(data)
}
impl<T: Send> UnsafeAtomicRcBox<T> {
pub fn new(data: T) -> UnsafeAtomicRcBox<T> {
unsafe {
let data = ~AtomicRcBoxData { count: AtomicUint::new(1),
unwrapper: AtomicOption::empty(),
data: Some(data) };
let ptr = cast::transmute(data);
return UnsafeAtomicRcBox { data: ptr };
}
unsafe { UnsafeAtomicRcBox { data: new_inner(data, 1) } }
}
/// As new(), but returns an extra pre-cloned handle.
pub fn new2(data: T) -> (UnsafeAtomicRcBox<T>, UnsafeAtomicRcBox<T>) {
unsafe {
let data = ~AtomicRcBoxData { count: AtomicUint::new(2),
unwrapper: AtomicOption::empty(),
data: Some(data) };
let ptr = cast::transmute(data);
return (UnsafeAtomicRcBox { data: ptr },
UnsafeAtomicRcBox { data: ptr });
let ptr = new_inner(data, 2);
(UnsafeAtomicRcBox { data: ptr }, UnsafeAtomicRcBox { data: ptr })
}
}
/// As new(), but returns a vector of as many pre-cloned handles as requested.
pub fn newN(data: T, num_handles: uint) -> ~[UnsafeAtomicRcBox<T>] {
unsafe {
if num_handles == 0 {
~[] // need to free data here
} else {
let ptr = new_inner(data, num_handles);
vec::from_fn(num_handles, |_| UnsafeAtomicRcBox { data: ptr })
}
}
}
/// As newN(), but from an already-existing handle. Uses one xadd.
pub fn cloneN(self, num_handles: uint) -> ~[UnsafeAtomicRcBox<T>] {
if num_handles == 0 {
~[] // The "num_handles - 1" trick (below) fails in the 0 case.
} else {
unsafe {
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
// Minus one because we are recycling the given handle's refcount.
let old_count = data.count.fetch_add(num_handles - 1, Acquire);
// let old_count = data.count.fetch_add(num_handles, Acquire);
assert!(old_count >= 1);
let ptr = cast::transmute(data);
cast::forget(self); // Don't run the destructor on this handle.
vec::from_fn(num_handles, |_| UnsafeAtomicRcBox { data: ptr })
}
}
}
#[inline]
pub unsafe fn get(&self) -> *mut T
{
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
assert!(data.count.load(Acquire) > 0); // no barrier is really needed
let r: *mut T = data.data.get_mut_ref();
cast::forget(data);
return r;
pub fn get(&self) -> *mut T {
unsafe {
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
// FIXME(#6598) Change Acquire to Relaxed.
assert!(data.count.load(Acquire) > 0);
let r: *mut T = data.data.get_mut_ref();
cast::forget(data);
return r;
}
}
#[inline]
pub unsafe fn get_immut(&self) -> *T
{
let data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
assert!(data.count.load(Acquire) > 0); // no barrier is really needed
let r: *T = data.data.get_ref();
cast::forget(data);
return r;
pub fn get_immut(&self) -> *T {
unsafe {
let data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
assert!(data.count.load(Acquire) > 0); // no barrier is really needed
let r: *T = data.data.get_ref();
cast::forget(data);
return r;
}
}
/// Wait until all other handles are dropped, then retrieve the enclosed
/// data. See extra::arc::Arc for specific semantics documentation.
/// If called when the task is already unkillable, unwrap will unkillably
/// block; otherwise, an unwrapping task can be killed by linked failure.
pub unsafe fn unwrap(self) -> T {
pub fn unwrap(self) -> T {
let this = Cell::new(self); // argh
do task::unkillable {
let mut this = this.take();
let mut data: ~AtomicRcBoxData<T> = cast::transmute(this.data);
// Set up the unwrap protocol.
let (p1,c1) = comm::oneshot(); // ()
let (p2,c2) = comm::oneshot(); // bool
// Try to put our server end in the unwrapper slot.
// This needs no barrier -- it's protected by the release barrier on
// the xadd, and the acquire+release barrier in the destructor's xadd.
// FIXME(#6598) Change Acquire to Relaxed.
if data.unwrapper.fill(~(c1,p2), Acquire).is_none() {
// Got in. Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// Drop our own reference.
let old_count = data.count.fetch_sub(1, Release);
assert!(old_count >= 1);
if old_count == 1 {
// We were the last owner. Can unwrap immediately.
// AtomicOption's destructor will free the server endpoint.
// FIXME(#3224): it should be like this
// let ~AtomicRcBoxData { data: user_data, _ } = data;
// user_data
data.data.take_unwrap()
} else {
// The *next* person who sees the refcount hit 0 will wake us.
let p1 = Cell::new(p1); // argh
// Unlike the above one, this cell is necessary. It will get
// taken either in the do block or in the finally block.
let c2_and_data = Cell::new((c2,data));
do (|| {
do task::rekillable { p1.take().recv(); }
// Got here. Back in the 'unkillable' without getting killed.
let (c2, data) = c2_and_data.take();
c2.send(true);
unsafe {
let mut this = this.take();
let mut data: ~AtomicRcBoxData<T> = cast::transmute(this.data);
// Set up the unwrap protocol.
let (p1,c1) = comm::oneshot(); // ()
let (p2,c2) = comm::oneshot(); // bool
// Try to put our server end in the unwrapper slot.
// This needs no barrier -- it's protected by the release barrier on
// the xadd, and the acquire+release barrier in the destructor's xadd.
// FIXME(#6598) Change Acquire to Relaxed.
if data.unwrapper.fill(~(c1,p2), Acquire).is_none() {
// Got in. Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// Drop our own reference.
let old_count = data.count.fetch_sub(1, Release);
assert!(old_count >= 1);
if old_count == 1 {
// We were the last owner. Can unwrap immediately.
// AtomicOption's destructor will free the server endpoint.
// FIXME(#3224): it should be like this
// let ~AtomicRcBoxData { data: user_data, _ } = data;
// user_data
let mut data = data;
data.data.take_unwrap()
}).finally {
if task::failing() {
// Killed during wait. Because this might happen while
// someone else still holds a reference, we can't free
// the data now; the "other" last refcount will free it.
} else {
// The *next* person who sees the refcount hit 0 will wake us.
let p1 = Cell::new(p1); // argh
// Unlike the above one, this cell is necessary. It will get
// taken either in the do block or in the finally block.
let c2_and_data = Cell::new((c2,data));
do (|| {
do task::rekillable { p1.take().recv(); }
// Got here. Back in the 'unkillable' without getting killed.
let (c2, data) = c2_and_data.take();
c2.send(false);
cast::forget(data);
} else {
assert!(c2_and_data.is_empty());
c2.send(true);
// FIXME(#3224): it should be like this
// let ~AtomicRcBoxData { data: user_data, _ } = data;
// user_data
let mut data = data;
data.data.take_unwrap()
}).finally {
if task::failing() {
// Killed during wait. Because this might happen while
// someone else still holds a reference, we can't free
// the data now; the "other" last refcount will free it.
let (c2, data) = c2_and_data.take();
c2.send(false);
cast::forget(data);
} else {
assert!(c2_and_data.is_empty());
}
}
}
} else {
// If 'put' returns the server end back to us, we were rejected;
// someone else was trying to unwrap. Avoid guaranteed deadlock.
cast::forget(data);
fail!("Another task is already unwrapping this Arc!");
}
} else {
// If 'put' returns the server end back to us, we were rejected;
// someone else was trying to unwrap. Avoid guaranteed deadlock.
cast::forget(data);
fail!("Another task is already unwrapping this Arc!");
}
}
}
/// As unwrap above, but without blocking. Returns 'Left(self)' if this is
/// not the last reference; 'Right(unwrapped_data)' if so.
pub unsafe fn try_unwrap(self) -> Either<UnsafeAtomicRcBox<T>, T> {
let mut this = self; // FIXME(#4330) mutable self
let mut data: ~AtomicRcBoxData<T> = cast::transmute(this.data);
// This can of course race with anybody else who has a handle, but in
// such a case, the returned count will always be at least 2. If we
// see 1, no race was possible. All that matters is 1 or not-1.
let count = data.count.load(Acquire);
assert!(count >= 1);
// The more interesting race is one with an unwrapper. They may have
// already dropped their count -- but if so, the unwrapper pointer
// will have been set first, which the barriers ensure we will see.
// (Note: using is_empty(), not take(), to not free the unwrapper.)
if count == 1 && data.unwrapper.is_empty(Acquire) {
// Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// FIXME(#3224) as above
Right(data.data.take_unwrap())
} else {
cast::forget(data);
Left(this)
pub fn try_unwrap(self) -> Either<UnsafeAtomicRcBox<T>, T> {
unsafe {
let mut this = self; // FIXME(#4330) mutable self
let mut data: ~AtomicRcBoxData<T> = cast::transmute(this.data);
// This can of course race with anybody else who has a handle, but in
// such a case, the returned count will always be at least 2. If we
// see 1, no race was possible. All that matters is 1 or not-1.
let count = data.count.load(Acquire);
assert!(count >= 1);
// The more interesting race is one with an unwrapper. They may have
// already dropped their count -- but if so, the unwrapper pointer
// will have been set first, which the barriers ensure we will see.
// (Note: using is_empty(), not take(), to not free the unwrapper.)
if count == 1 && data.unwrapper.is_empty(Acquire) {
// Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// FIXME(#3224) as above
Right(data.data.take_unwrap())
} else {
cast::forget(data);
Left(this)
}
}
}
}
@ -342,7 +377,7 @@ impl<T:Send> Exclusive<T> {
pub fn unwrap(self) -> T {
let Exclusive { x: x } = self;
// Someday we might need to unkillably unwrap an Exclusive, but not today.
let inner = unsafe { x.unwrap() };
let inner = x.unwrap();
let ExData { data: user_data, _ } = inner; // will destroy the LittleLock
user_data
}
@ -416,53 +451,71 @@ mod tests {
}
}
#[test]
fn arclike_newN() {
// Tests that the many-refcounts-at-once constructors don't leak.
let _ = UnsafeAtomicRcBox::new2(~~"hello");
let x = UnsafeAtomicRcBox::newN(~~"hello", 0);
assert_eq!(x.len(), 0)
let x = UnsafeAtomicRcBox::newN(~~"hello", 1);
assert_eq!(x.len(), 1)
let x = UnsafeAtomicRcBox::newN(~~"hello", 10);
assert_eq!(x.len(), 10)
}
#[test]
fn arclike_cloneN() {
// Tests that the many-refcounts-at-once special-clone doesn't leak.
let x = UnsafeAtomicRcBox::new(~~"hello");
let x = x.cloneN(0);
assert_eq!(x.len(), 0);
let x = UnsafeAtomicRcBox::new(~~"hello");
let x = x.cloneN(1);
assert_eq!(x.len(), 1);
let x = UnsafeAtomicRcBox::new(~~"hello");
let x = x.cloneN(10);
assert_eq!(x.len(), 10);
}
#[test]
fn arclike_unwrap_basic() {
unsafe {
let x = UnsafeAtomicRcBox::new(~~"hello");
assert!(x.unwrap() == ~~"hello");
}
let x = UnsafeAtomicRcBox::new(~~"hello");
assert!(x.unwrap() == ~~"hello");
}
#[test]
fn arclike_try_unwrap() {
unsafe {
let x = UnsafeAtomicRcBox::new(~~"hello");
assert!(x.try_unwrap().expect_right("try_unwrap failed") == ~~"hello");
}
let x = UnsafeAtomicRcBox::new(~~"hello");
assert!(x.try_unwrap().expect_right("try_unwrap failed") == ~~"hello");
}
#[test]
fn arclike_try_unwrap_fail() {
unsafe {
let x = UnsafeAtomicRcBox::new(~~"hello");
let x2 = x.clone();
let left_x = x.try_unwrap();
assert!(left_x.is_left());
util::ignore(left_x);
assert!(x2.try_unwrap().expect_right("try_unwrap none") == ~~"hello");
}
let x = UnsafeAtomicRcBox::new(~~"hello");
let x2 = x.clone();
let left_x = x.try_unwrap();
assert!(left_x.is_left());
util::ignore(left_x);
assert!(x2.try_unwrap().expect_right("try_unwrap none") == ~~"hello");
}
#[test]
fn arclike_try_unwrap_unwrap_race() {
// When an unwrap and a try_unwrap race, the unwrapper should always win.
unsafe {
let x = UnsafeAtomicRcBox::new(~~"hello");
let x2 = Cell::new(x.clone());
let (p,c) = comm::stream();
do task::spawn {
c.send(());
assert!(x2.take().unwrap() == ~~"hello");
c.send(());
}
p.recv();
task::yield(); // Try to make the unwrapper get blocked first.
let left_x = x.try_unwrap();
assert!(left_x.is_left());
util::ignore(left_x);
p.recv();
let x = UnsafeAtomicRcBox::new(~~"hello");
let x2 = Cell::new(x.clone());
let (p,c) = comm::stream();
do task::spawn {
c.send(());
assert!(x2.take().unwrap() == ~~"hello");
c.send(());
}
p.recv();
task::yield(); // Try to make the unwrapper get blocked first.
let left_x = x.try_unwrap();
assert!(left_x.is_left());
util::ignore(left_x);
p.recv();
}
#[test]