diff --git a/src/libextra/arc.rs b/src/libextra/arc.rs index 9479e47ed8c..d4bf1d480ed 100644 --- a/src/libextra/arc.rs +++ b/src/libextra/arc.rs @@ -136,7 +136,7 @@ impl Arc { */ pub fn unwrap(self) -> T { let Arc { x: x } = self; - unsafe { x.unwrap() } + x.unwrap() } } @@ -250,7 +250,7 @@ impl MutexArc { */ pub fn unwrap(self) -> T { let MutexArc { x: x } = self; - let inner = unsafe { x.unwrap() }; + let inner = x.unwrap(); let MutexArcInner { failed: failed, data: data, _ } = inner; if failed { fail!(~"Can't unwrap poisoned MutexArc - another task failed inside!"); @@ -469,7 +469,7 @@ impl RWArc { */ pub fn unwrap(self) -> T { let RWArc { x: x, _ } = self; - let inner = unsafe { x.unwrap() }; + let inner = x.unwrap(); let RWArcInner { failed: failed, data: data, _ } = inner; if failed { fail!(~"Can't unwrap poisoned RWArc - another task failed inside!") diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 743c4347a4b..47fd4cccb9f 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -130,11 +130,9 @@ impl Sem { impl Sem<()> { pub fn access(&self, blk: &fn() -> U) -> U { let mut release = None; - unsafe { - do task::unkillable { - self.acquire(); - release = Some(SemRelease(self)); - } + do task::unkillable { + self.acquire(); + release = Some(SemRelease(self)); } blk() } @@ -153,11 +151,9 @@ impl Sem<~[WaitQueue]> { pub fn access_waitqueue(&self, blk: &fn() -> U) -> U { let mut release = None; - unsafe { - do task::unkillable { - self.acquire(); - release = Some(SemAndSignalRelease(self)); - } + do task::unkillable { + self.acquire(); + release = Some(SemAndSignalRelease(self)); } blk() } @@ -294,17 +290,15 @@ impl<'self> Condvar<'self> { #[unsafe_destructor] impl<'self> Drop for CondvarReacquire<'self> { fn drop(&self) { - unsafe { - // Needs to succeed, instead of itself dying. - do task::unkillable { - match self.order { - Just(lock) => do lock.access { - self.sem.acquire(); - }, - Nothing => { - self.sem.acquire(); - }, - } + // Needs to succeed, instead of itself dying. + do task::unkillable { + match self.order { + Just(lock) => do lock.access { + self.sem.acquire(); + }, + Nothing => { + self.sem.acquire(); + }, } } } @@ -644,14 +638,12 @@ impl RWLock { // Implementation slightly different from the slicker 'write's above. // The exit path is conditional on whether the caller downgrades. let mut _release = None; - unsafe { - do task::unkillable { - (&self.order_lock).acquire(); - (&self.access_lock).acquire(); - (&self.order_lock).release(); - } - _release = Some(RWLockReleaseDowngrade(self)); + do task::unkillable { + (&self.order_lock).acquire(); + (&self.access_lock).acquire(); + (&self.order_lock).release(); } + _release = Some(RWLockReleaseDowngrade(self)); blk(RWLockWriteMode { lock: self }) } diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index c0effdaa94c..6528835c52c 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -12,13 +12,13 @@ use option::*; use cast; -use util; use ops::Drop; use rt::kill::BlockedTask; use kinds::Send; use rt::sched::Scheduler; use rt::local::Local; -use unstable::atomics::{AtomicUint, AtomicOption, SeqCst}; +use rt::select::{Select, SelectPort}; +use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Release, SeqCst}; use unstable::sync::UnsafeAtomicRcBox; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; @@ -45,23 +45,12 @@ struct Packet { /// A one-shot channel. pub struct ChanOne { - // XXX: Hack extra allocation to make by-val self work - inner: ~ChanOneHack -} - - -/// A one-shot port. -pub struct PortOne { - // XXX: Hack extra allocation to make by-val self work - inner: ~PortOneHack -} - -pub struct ChanOneHack { void_packet: *mut Void, suppress_finalize: bool } -pub struct PortOneHack { +/// A one-shot port. +pub struct PortOne { void_packet: *mut Void, suppress_finalize: bool } @@ -75,22 +64,26 @@ pub fn oneshot() -> (PortOne, ChanOne) { unsafe { let packet: *mut Void = cast::transmute(packet); let port = PortOne { - inner: ~PortOneHack { - void_packet: packet, - suppress_finalize: false - } + void_packet: packet, + suppress_finalize: false }; let chan = ChanOne { - inner: ~ChanOneHack { - void_packet: packet, - suppress_finalize: false - } + void_packet: packet, + suppress_finalize: false }; return (port, chan); } } impl ChanOne { + #[inline] + fn packet(&self) -> *mut Packet { + unsafe { + let p: *mut ~Packet = cast::transmute(&self.void_packet); + let p: *mut Packet = &mut **p; + return p; + } + } pub fn send(self, val: T) { self.try_send(val); @@ -99,7 +92,7 @@ impl ChanOne { pub fn try_send(self, val: T) -> bool { let mut this = self; let mut recvr_active = true; - let packet = this.inner.packet(); + let packet = this.packet(); unsafe { @@ -127,7 +120,7 @@ impl ChanOne { sched.metrics.rendezvous_sends += 1; } // Port has closed. Need to clean up. - let _packet: ~Packet = cast::transmute(this.inner.void_packet); + let _packet: ~Packet = cast::transmute(this.void_packet); recvr_active = false; } task_as_state => { @@ -144,13 +137,20 @@ impl ChanOne { } // Suppress the synchronizing actions in the finalizer. We're done with the packet. - this.inner.suppress_finalize = true; + this.suppress_finalize = true; return recvr_active; } } - impl PortOne { + fn packet(&self) -> *mut Packet { + unsafe { + let p: *mut ~Packet = cast::transmute(&self.void_packet); + let p: *mut Packet = &mut **p; + return p; + } + } + pub fn recv(self) -> T { match self.try_recv() { Some(val) => val, @@ -162,43 +162,129 @@ impl PortOne { pub fn try_recv(self) -> Option { let mut this = self; - let packet = this.inner.packet(); - // XXX: Optimize this to not require the two context switches when data is available - - // Switch to the scheduler to put the ~Task into the Packet state. - let sched = Local::take::(); - do sched.deschedule_running_task_and_then |sched, task| { - unsafe { - // Atomically swap the task pointer into the Packet state, issuing - // an acquire barrier to prevent reordering of the subsequent read - // of the payload. Also issues a release barrier to prevent reordering - // of any previous writes to the task structure. - let task_as_state = task.cast_to_uint(); - let oldstate = (*packet).state.swap(task_as_state, SeqCst); - match oldstate { - STATE_BOTH => { - // Data has not been sent. Now we're blocked. - rtdebug!("non-rendezvous recv"); - sched.metrics.non_rendezvous_recvs += 1; - } - STATE_ONE => { - rtdebug!("rendezvous recv"); - sched.metrics.rendezvous_recvs += 1; - - // Channel is closed. Switch back and check the data. - // NB: We have to drop back into the scheduler event loop here - // instead of switching immediately back or we could end up - // triggering infinite recursion on the scheduler's stack. - let recvr = BlockedTask::cast_from_uint(task_as_state); - sched.enqueue_blocked_task(recvr); - } - _ => util::unreachable() - } + // Optimistic check. If data was sent already, we don't even need to block. + // No release barrier needed here; we're not handing off our task pointer yet. + if !this.optimistic_check() { + // No data available yet. + // Switch to the scheduler to put the ~Task into the Packet state. + let sched = Local::take::(); + do sched.deschedule_running_task_and_then |sched, task| { + this.block_on(sched, task); } } // Task resumes. + this.recv_ready() + } +} + +impl Select for PortOne { + #[inline] #[cfg(not(test))] + fn optimistic_check(&mut self) -> bool { + unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } + } + + #[inline] #[cfg(test)] + fn optimistic_check(&mut self) -> bool { + // The optimistic check is never necessary for correctness. For testing + // purposes, making it randomly return false simulates a racing sender. + use rand::{Rand, rng}; + let mut rng = rng(); + let actually_check = Rand::rand(&mut rng); + if actually_check { + unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } + } else { + false + } + } + + fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { + unsafe { + // Atomically swap the task pointer into the Packet state, issuing + // an acquire barrier to prevent reordering of the subsequent read + // of the payload. Also issues a release barrier to prevent + // reordering of any previous writes to the task structure. + let task_as_state = task.cast_to_uint(); + let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst); + match oldstate { + STATE_BOTH => { + // Data has not been sent. Now we're blocked. + rtdebug!("non-rendezvous recv"); + sched.metrics.non_rendezvous_recvs += 1; + false + } + STATE_ONE => { + // Re-record that we are the only owner of the packet. + // Release barrier needed in case the task gets reawoken + // on a different core (this is analogous to writing a + // payload; a barrier in enqueueing the task protects it). + // NB(#8132). This *must* occur before the enqueue below. + // FIXME(#6842, #8130) This is usually only needed for the + // assertion in recv_ready, except in the case of select(). + // This won't actually ever have cacheline contention, but + // maybe should be optimized out with a cfg(test) anyway? + (*self.packet()).state.store(STATE_ONE, Release); + + rtdebug!("rendezvous recv"); + sched.metrics.rendezvous_recvs += 1; + + // Channel is closed. Switch back and check the data. + // NB: We have to drop back into the scheduler event loop here + // instead of switching immediately back or we could end up + // triggering infinite recursion on the scheduler's stack. + let recvr = BlockedTask::cast_from_uint(task_as_state); + sched.enqueue_blocked_task(recvr); + true + } + _ => rtabort!("can't block_on; a task is already blocked") + } + } + } + + // This is the only select trait function that's not also used in recv. + fn unblock_from(&mut self) -> bool { + let packet = self.packet(); + unsafe { + // In case the data is available, the acquire barrier here matches + // the release barrier the sender used to release the payload. + match (*packet).state.load(Acquire) { + // Impossible. We removed STATE_BOTH when blocking on it, and + // no self-respecting sender would put it back. + STATE_BOTH => rtabort!("refcount already 2 in unblock_from"), + // Here, a sender already tried to wake us up. Perhaps they + // even succeeded! Data is available. + STATE_ONE => true, + // Still registered as blocked. Need to "unblock" the pointer. + task_as_state => { + // In the window between the load and the CAS, a sender + // might take the pointer and set the refcount to ONE. If + // that happens, we shouldn't clobber that with BOTH! + // Acquire barrier again for the same reason as above. + match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH, + Acquire) { + STATE_BOTH => rtabort!("refcount became 2 in unblock_from"), + STATE_ONE => true, // Lost the race. Data available. + same_ptr => { + // We successfully unblocked our task pointer. + assert!(task_as_state == same_ptr); + let handle = BlockedTask::cast_from_uint(task_as_state); + // Because we are already awake, the handle we + // gave to this port shall already be empty. + handle.assert_already_awake(); + false + } + } + } + } + } + } +} + +impl SelectPort for PortOne { + fn recv_ready(self) -> Option { + let mut this = self; + let packet = this.packet(); // No further memory barrier is needed here to access the // payload. Some scenarios: @@ -210,14 +296,17 @@ impl PortOne { // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task) // is pinned to some other scheduler, so the sending task had to give us to // a different scheduler for resuming. That send synchronized memory. - unsafe { - let payload = util::replace(&mut (*packet).payload, None); + // See corresponding store() above in block_on for rationale. + // FIXME(#8130) This can happen only in test builds. + assert!((*packet).state.load(Acquire) == STATE_ONE); + + let payload = (*packet).payload.take(); // The sender has closed up shop. Drop the packet. - let _packet: ~Packet = cast::transmute(this.inner.void_packet); + let _packet: ~Packet = cast::transmute(this.void_packet); // Suppress the synchronizing actions in the finalizer. We're done with the packet. - this.inner.suppress_finalize = true; + this.suppress_finalize = true; return payload; } } @@ -226,19 +315,19 @@ impl PortOne { impl Peekable for PortOne { fn peek(&self) -> bool { unsafe { - let packet: *mut Packet = self.inner.packet(); + let packet: *mut Packet = self.packet(); let oldstate = (*packet).state.load(SeqCst); match oldstate { STATE_BOTH => false, STATE_ONE => (*packet).payload.is_some(), - _ => util::unreachable() + _ => rtabort!("peeked on a blocked task") } } } } #[unsafe_destructor] -impl Drop for ChanOneHack { +impl Drop for ChanOne { fn drop(&self) { if self.suppress_finalize { return } @@ -267,7 +356,7 @@ impl Drop for ChanOneHack { } #[unsafe_destructor] -impl Drop for PortOneHack { +impl Drop for PortOne { fn drop(&self) { if self.suppress_finalize { return } @@ -295,26 +384,6 @@ impl Drop for PortOneHack { } } -impl ChanOneHack { - fn packet(&self) -> *mut Packet { - unsafe { - let p: *mut ~Packet = cast::transmute(&self.void_packet); - let p: *mut Packet = &mut **p; - return p; - } - } -} - -impl PortOneHack { - fn packet(&self) -> *mut Packet { - unsafe { - let p: *mut ~Packet = cast::transmute(&self.void_packet); - let p: *mut Packet = &mut **p; - return p; - } - } -} - struct StreamPayload { val: T, next: PortOne> @@ -385,6 +454,36 @@ impl Peekable for Port { } } +impl Select for Port { + #[inline] + fn optimistic_check(&mut self) -> bool { + do self.next.with_mut_ref |pone| { pone.optimistic_check() } + } + + #[inline] + fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { + let task = Cell::new(task); + do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) } + } + + #[inline] + fn unblock_from(&mut self) -> bool { + do self.next.with_mut_ref |pone| { pone.unblock_from() } + } +} + +impl SelectPort<(T, Port)> for Port { + fn recv_ready(self) -> Option<(T, Port)> { + match self.next.take().recv_ready() { + Some(StreamPayload { val, next }) => { + self.next.put_back(next); + Some((val, self)) + } + None => None + } + } +} + pub struct SharedChan { // Just like Chan, but a shared AtomicOption instead of Cell priv next: UnsafeAtomicRcBox>> diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 2bf4543df50..e691bf51ea5 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -106,8 +106,14 @@ impl Drop for KillFlag { // blocked task handle. So unblocking a task must restore that spare. unsafe fn revive_task_ptr(task_ptr: uint, spare_flag: Option) -> ~Task { let mut task: ~Task = cast::transmute(task_ptr); - rtassert!(task.death.spare_kill_flag.is_none()); - task.death.spare_kill_flag = spare_flag; + if task.death.spare_kill_flag.is_none() { + task.death.spare_kill_flag = spare_flag; + } else { + // A task's spare kill flag is not used for blocking in one case: + // when an unkillable task blocks on select. In this case, a separate + // one was created, which we now discard. + rtassert!(task.death.unkillable > 0); + } task } @@ -119,7 +125,7 @@ impl BlockedTask { Killable(flag_arc) => { let flag = unsafe { &mut **flag_arc.get() }; match flag.swap(KILL_RUNNING, SeqCst) { - KILL_RUNNING => rtabort!("tried to wake an already-running task"), + KILL_RUNNING => None, // woken from select(), perhaps KILL_KILLED => None, // a killer stole it already task_ptr => Some(unsafe { revive_task_ptr(task_ptr, Some(flag_arc)) }) @@ -162,6 +168,27 @@ impl BlockedTask { } } + /// Converts one blocked task handle to a list of many handles to the same. + pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] { + let handles = match self { + Unkillable(task) => { + let flag = unsafe { KillFlag(AtomicUint::new(cast::transmute(task))) }; + UnsafeAtomicRcBox::newN(flag, num_handles) + } + Killable(flag_arc) => flag_arc.cloneN(num_handles), + }; + // Even if the task was unkillable before, we use 'Killable' because + // multiple pipes will have handles. It does not really mean killable. + handles.consume_iter().transform(|x| Killable(x)).collect() + } + + // This assertion has two flavours because the wake involves an atomic op. + // In the faster version, destructors will fail dramatically instead. + #[inline] #[cfg(not(test))] + pub fn assert_already_awake(self) { } + #[inline] #[cfg(test)] + pub fn assert_already_awake(self) { assert!(self.wake().is_none()); } + /// Convert to an unsafe uint value. Useful for storing in a pipe's state flag. #[inline] pub unsafe fn cast_to_uint(self) -> uint { @@ -301,7 +328,7 @@ impl KillHandle { } // Try to see if all our children are gone already. - match unsafe { self.try_unwrap() } { + match self.try_unwrap() { // Couldn't unwrap; children still alive. Reparent entire handle as // our own tombstone, to be unwrapped later. Left(this) => { @@ -313,7 +340,7 @@ impl KillHandle { // Prefer to check tombstones that were there first, // being "more fair" at the expense of tail-recursion. others.take().map_consume_default(true, |f| f()) && { - let mut inner = unsafe { this.take().unwrap() }; + let mut inner = this.take().unwrap(); (!inner.any_child_failed) && inner.child_tombstones.take_map_default(true, |f| f()) } @@ -402,7 +429,7 @@ impl Death { do self.on_exit.take_map |on_exit| { if success { // We succeeded, but our children might not. Need to wait for them. - let mut inner = unsafe { self.kill_handle.take_unwrap().unwrap() }; + let mut inner = self.kill_handle.take_unwrap().unwrap(); if inner.any_child_failed { success = false; } else { @@ -528,7 +555,7 @@ mod test { // Without another handle to child, the try unwrap should succeed. child.reparent_children_to(&mut parent); - let mut parent_inner = unsafe { parent.unwrap() }; + let mut parent_inner = parent.unwrap(); assert!(parent_inner.child_tombstones.is_none()); assert!(parent_inner.any_child_failed == false); } @@ -543,7 +570,7 @@ mod test { child.notify_immediate_failure(); // Without another handle to child, the try unwrap should succeed. child.reparent_children_to(&mut parent); - let mut parent_inner = unsafe { parent.unwrap() }; + let mut parent_inner = parent.unwrap(); assert!(parent_inner.child_tombstones.is_none()); // Immediate failure should have been propagated. assert!(parent_inner.any_child_failed); @@ -565,7 +592,7 @@ mod test { // Otherwise, due to 'link', it would try to tombstone. child2.reparent_children_to(&mut parent); // Should successfully unwrap even though 'link' is still alive. - let mut parent_inner = unsafe { parent.unwrap() }; + let mut parent_inner = parent.unwrap(); assert!(parent_inner.child_tombstones.is_none()); // Immediate failure should have been propagated by first child. assert!(parent_inner.any_child_failed); @@ -584,7 +611,7 @@ mod test { // Let parent collect tombstones. util::ignore(link); // Must have created a tombstone - let mut parent_inner = unsafe { parent.unwrap() }; + let mut parent_inner = parent.unwrap(); assert!(parent_inner.child_tombstones.take_unwrap()()); assert!(parent_inner.any_child_failed == false); } @@ -603,7 +630,7 @@ mod test { // Let parent collect tombstones. util::ignore(link); // Must have created a tombstone - let mut parent_inner = unsafe { parent.unwrap() }; + let mut parent_inner = parent.unwrap(); // Failure must be seen in the tombstone. assert!(parent_inner.child_tombstones.take_unwrap()() == false); assert!(parent_inner.any_child_failed == false); @@ -623,7 +650,7 @@ mod test { // Let parent collect tombstones. util::ignore(link); // Must have created a tombstone - let mut parent_inner = unsafe { parent.unwrap() }; + let mut parent_inner = parent.unwrap(); assert!(parent_inner.child_tombstones.take_unwrap()()); assert!(parent_inner.any_child_failed == false); } @@ -644,7 +671,7 @@ mod test { // Let parent collect tombstones. util::ignore(link); // Must have created a tombstone - let mut parent_inner = unsafe { parent.unwrap() }; + let mut parent_inner = parent.unwrap(); // Failure must be seen in the tombstone. assert!(parent_inner.child_tombstones.take_unwrap()() == false); assert!(parent_inner.any_child_failed == false); diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 808d07ce77d..2ca7d01da49 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -142,6 +142,9 @@ pub mod tube; /// Simple reimplementation of core::comm pub mod comm; +/// Routines for select()ing on pipes. +pub mod select; + // FIXME #5248 shouldn't be pub /// The runtime needs to be able to put a pointer into thread-local storage. pub mod local_ptr; diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs new file mode 100644 index 00000000000..bc9e265c8d9 --- /dev/null +++ b/src/libstd/rt/select.rs @@ -0,0 +1,328 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use option::*; +// use either::{Either, Left, Right}; +use rt::kill::BlockedTask; +use rt::sched::Scheduler; +use rt::local::Local; + +/// Trait for message-passing primitives that can be select()ed on. +pub trait Select { + // Returns true if data was available. + fn optimistic_check(&mut self) -> bool; + // Returns true if data was available. If so, shall also wake() the task. + fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool; + // Returns true if data was available. + fn unblock_from(&mut self) -> bool; +} + +/// Trait for message-passing primitives that can use the select2() convenience wrapper. +// (This is separate from the above trait to enable heterogeneous lists of ports +// that implement Select on different types to use select().) +pub trait SelectPort : Select { + fn recv_ready(self) -> Option; +} + +/// Receive a message from any one of many ports at once. +pub fn select(ports: &mut [A]) -> uint { + if ports.is_empty() { + fail!("can't select on an empty list"); + } + + for ports.mut_iter().enumerate().advance |(index, port)| { + if port.optimistic_check() { + return index; + } + } + + // If one of the ports already contains data when we go to block on it, we + // don't bother enqueueing on the rest of them, so we shouldn't bother + // unblocking from it either. This is just for efficiency, not correctness. + // (If not, we need to unblock from all of them. Length is a placeholder.) + let mut ready_index = ports.len(); + + let sched = Local::take::(); + do sched.deschedule_running_task_and_then |sched, task| { + let task_handles = task.make_selectable(ports.len()); + + for ports.mut_iter().zip(task_handles.consume_iter()).enumerate().advance + |(index, (port, task_handle))| { + // If one of the ports has data by now, it will wake the handle. + if port.block_on(sched, task_handle) { + ready_index = index; + break; + } + } + } + + // Task resumes. Now unblock ourselves from all the ports we blocked on. + // If the success index wasn't reset, 'take' will just take all of them. + // Iterate in reverse so the 'earliest' index that's ready gets returned. + for ports.mut_slice(0, ready_index).mut_rev_iter().enumerate().advance |(index, port)| { + if port.unblock_from() { + ready_index = index; + } + } + + assert!(ready_index < ports.len()); + return ready_index; +} + +/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet. + +impl <'self> Select for &'self mut Select { + fn optimistic_check(&mut self) -> bool { self.optimistic_check() } + fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { + self.block_on(sched, task) + } + fn unblock_from(&mut self) -> bool { self.unblock_from() } +} + +pub fn select2, TB, B: SelectPort>(mut a: A, mut b: B) + -> Either<(Option, B), (A, Option)> { + let result = { + let mut ports = [&mut a as &mut Select, &mut b as &mut Select]; + select(ports) + }; + match result { + 0 => Left ((a.recv_ready(), b)), + 1 => Right((a, b.recv_ready())), + x => fail!("impossible case in select2: %?", x) + } +} + +*/ + +#[cfg(test)] +mod test { + use super::*; + use option::*; + use rt::comm::*; + use rt::test::*; + use vec::*; + use comm::GenericChan; + use task; + use cell::Cell; + + #[test] #[ignore(cfg(windows))] #[should_fail] + fn select_doesnt_get_trolled() { + select::>([]); + } + + /* non-blocking select tests */ + + #[cfg(test)] + fn select_helper(num_ports: uint, send_on_chans: &[uint]) { + // Unfortunately this does not actually test the block_on early-break + // codepath in select -- racing between the sender and the receiver in + // separate tasks is necessary to get around the optimistic check. + let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>())); + let mut dead_chans = ~[]; + let mut ports = ports; + for chans.consume_iter().enumerate().advance |(i, chan)| { + if send_on_chans.contains(&i) { + chan.send(()); + } else { + dead_chans.push(chan); + } + } + let ready_index = select(ports); + assert!(send_on_chans.contains(&ready_index)); + assert!(ports.swap_remove(ready_index).recv_ready().is_some()); + let _ = dead_chans; + + // Same thing with streams instead. + // FIXME(#7971): This should be in a macro but borrowck isn't smart enough. + let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>())); + let mut dead_chans = ~[]; + let mut ports = ports; + for chans.consume_iter().enumerate().advance |(i, chan)| { + if send_on_chans.contains(&i) { + chan.send(()); + } else { + dead_chans.push(chan); + } + } + let ready_index = select(ports); + assert!(send_on_chans.contains(&ready_index)); + assert!(ports.swap_remove(ready_index).recv_ready().is_some()); + let _ = dead_chans; + } + + #[test] + fn select_one() { + do run_in_newsched_task { select_helper(1, [0]) } + } + + #[test] + fn select_two() { + // NB. I would like to have a test that tests the first one that is + // ready is the one that's returned, but that can't be reliably tested + // with the randomized behaviour of optimistic_check. + do run_in_newsched_task { select_helper(2, [1]) } + do run_in_newsched_task { select_helper(2, [0]) } + do run_in_newsched_task { select_helper(2, [1,0]) } + } + + #[test] + fn select_a_lot() { + do run_in_newsched_task { select_helper(12, [7,8,9]) } + } + + #[test] + fn select_stream() { + use util; + use comm::GenericChan; + + // Sends 10 buffered packets, and uses select to retrieve them all. + // Puts the port in a different spot in the vector each time. + do run_in_newsched_task { + let (ports, _) = unzip(from_fn(10, |_| stream())); + let (port, chan) = stream(); + for 10.times { chan.send(31337); } + let mut ports = ports; + let mut port = Some(port); + let order = [5u,0,4,3,2,6,9,8,7,1]; + for order.iter().advance |&index| { + // put the port in the vector at any index + util::swap(port.get_mut_ref(), &mut ports[index]); + assert!(select(ports) == index); + // get it back out + util::swap(port.get_mut_ref(), &mut ports[index]); + // NB. Not recv(), because optimistic_check randomly fails. + let (data, new_port) = port.take_unwrap().recv_ready().unwrap(); + assert!(data == 31337); + port = Some(new_port); + } + } + } + + #[test] + fn select_unkillable() { + do run_in_newsched_task { + do task::unkillable { select_helper(2, [1]) } + } + } + + /* blocking select tests */ + + #[test] + fn select_blocking() { + select_blocking_helper(true); + select_blocking_helper(false); + + fn select_blocking_helper(killable: bool) { + do run_in_newsched_task { + let (p1,_c) = oneshot(); + let (p2,c2) = oneshot(); + let mut ports = [p1,p2]; + + let (p3,c3) = oneshot(); + let (p4,c4) = oneshot(); + + let x = Cell::new((c2, p3, c4)); + do task::spawn { + let (c2, p3, c4) = x.take(); + p3.recv(); // handshake parent + c4.send(()); // normal receive + task::yield(); + c2.send(()); // select receive + } + + // Try to block before child sends on c2. + c3.send(()); + p4.recv(); + if killable { + assert!(select(ports) == 1); + } else { + do task::unkillable { assert!(select(ports) == 1); } + } + } + } + } + + #[test] + fn select_racing_senders() { + static NUM_CHANS: uint = 10; + + select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]); + select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]); + select_racing_senders_helper(true, ~[0,1,2]); + select_racing_senders_helper(false, ~[0,1,2]); + select_racing_senders_helper(true, ~[3,4,5,6]); + select_racing_senders_helper(false, ~[3,4,5,6]); + select_racing_senders_helper(true, ~[7,8,9]); + select_racing_senders_helper(false, ~[7,8,9]); + + fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) { + use uint; + use rt::test::spawntask_random; + + do run_in_newsched_task { + // A bit of stress, since ordinarily this is just smoke and mirrors. + for 4.times { + let send_on_chans = send_on_chans.clone(); + do task::spawn { + let mut ports = ~[]; + for uint::range(0, NUM_CHANS) |i| { + let (p,c) = oneshot(); + ports.push(p); + if send_on_chans.contains(&i) { + let c = Cell::new(c); + do spawntask_random { + task::yield(); + c.take().send(()); + } + } + } + // nondeterministic result, but should succeed + if killable { + select(ports); + } else { + do task::unkillable { select(ports); } + } + } + } + } + } + } + + #[test] #[ignore(cfg(windows))] + fn select_killed() { + do run_in_newsched_task { + let (success_p, success_c) = oneshot::(); + let success_c = Cell::new(success_c); + do task::try { + let success_c = Cell::new(success_c.take()); + do task::unkillable { + let (p,c) = oneshot(); + let c = Cell::new(c); + do task::spawn { + let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>())); + let mut ports = dead_ps; + select(ports); // should get killed; nothing should leak + c.take().send(()); // must not happen + // Make sure dead_cs doesn't get closed until after select. + let _ = dead_cs; + } + do task::spawn { + fail!(); // should kill sibling awake + } + + // wait for killed selector to close (NOT send on) its c. + // hope to send 'true'. + success_c.take().send(p.try_recv().is_none()); + } + }; + assert!(success_p.recv()); + } + } +} diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index df927cb6a7a..c26349b220d 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -618,32 +618,34 @@ pub fn get_scheduler() -> Scheduler { * } * ~~~ */ -pub unsafe fn unkillable(f: &fn() -> U) -> U { +pub fn unkillable(f: &fn() -> U) -> U { use rt::task::Task; - match context() { - OldTaskContext => { - let t = rt::rust_get_task(); - do (|| { - rt::rust_task_inhibit_kill(t); - f() - }).finally { - rt::rust_task_allow_kill(t); + unsafe { + match context() { + OldTaskContext => { + let t = rt::rust_get_task(); + do (|| { + rt::rust_task_inhibit_kill(t); + f() + }).finally { + rt::rust_task_allow_kill(t); + } } - } - TaskContext => { - // The inhibits/allows might fail and need to borrow the task. - let t = Local::unsafe_borrow::(); - do (|| { - (*t).death.inhibit_kill((*t).unwinder.unwinding); - f() - }).finally { - (*t).death.allow_kill((*t).unwinder.unwinding); + TaskContext => { + // The inhibits/allows might fail and need to borrow the task. + let t = Local::unsafe_borrow::(); + do (|| { + (*t).death.inhibit_kill((*t).unwinder.unwinding); + f() + }).finally { + (*t).death.allow_kill((*t).unwinder.unwinding); + } } + // FIXME(#3095): This should be an rtabort as soon as the scheduler + // no longer uses a workqueue implemented with an Exclusive. + _ => f() } - // FIXME(#3095): This should be an rtabort as soon as the scheduler - // no longer uses a workqueue implemented with an Exclusive. - _ => f() } } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 61dcc33c629..749db307012 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -512,7 +512,9 @@ impl RuntimeGlue { unsafe fn kill_all_tasks(task: &TaskHandle) { match *task { OldTask(ptr) => rt::rust_task_kill_all(ptr), - NewTask(ref _handle) => rtabort!("unimplemented"), // FIXME(#7544) + // FIXME(#7544): Remove the kill_all feature entirely once the + // oldsched goes away. + NewTask(ref _handle) => rtabort!("can't kill_all in newsched"), } } @@ -573,7 +575,10 @@ impl RuntimeGlue { members: members, descendants: TaskSet::new(), })); - let group = Taskgroup(tasks, AncestorList(None), true, None); + // FIXME(#7544): Remove the is_main flag entirely once + // the newsched goes away. The main taskgroup has no special + // behaviour. + let group = Taskgroup(tasks, AncestorList(None), false, None); (*me).taskgroup = Some(group); (*me).taskgroup.get_ref() } @@ -689,7 +694,7 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { // Should be run after the local-borrowed task is returned. if enlist_success { if indestructible { - unsafe { do unkillable { f() } } + do unkillable { f() } } else { f() } diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index b6fc5b1f662..4c52d897a72 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -22,6 +22,7 @@ use unstable::finally::Finally; use ops::Drop; use clone::Clone; use kinds::Send; +use vec; /// An atomically reference counted pointer. /// @@ -41,138 +42,172 @@ struct AtomicRcBoxData { data: Option, } +unsafe fn new_inner(data: T, refcount: uint) -> *mut libc::c_void { + let data = ~AtomicRcBoxData { count: AtomicUint::new(refcount), + unwrapper: AtomicOption::empty(), + data: Some(data) }; + cast::transmute(data) +} + impl UnsafeAtomicRcBox { pub fn new(data: T) -> UnsafeAtomicRcBox { - unsafe { - let data = ~AtomicRcBoxData { count: AtomicUint::new(1), - unwrapper: AtomicOption::empty(), - data: Some(data) }; - let ptr = cast::transmute(data); - return UnsafeAtomicRcBox { data: ptr }; - } + unsafe { UnsafeAtomicRcBox { data: new_inner(data, 1) } } } /// As new(), but returns an extra pre-cloned handle. pub fn new2(data: T) -> (UnsafeAtomicRcBox, UnsafeAtomicRcBox) { unsafe { - let data = ~AtomicRcBoxData { count: AtomicUint::new(2), - unwrapper: AtomicOption::empty(), - data: Some(data) }; - let ptr = cast::transmute(data); - return (UnsafeAtomicRcBox { data: ptr }, - UnsafeAtomicRcBox { data: ptr }); + let ptr = new_inner(data, 2); + (UnsafeAtomicRcBox { data: ptr }, UnsafeAtomicRcBox { data: ptr }) + } + } + + /// As new(), but returns a vector of as many pre-cloned handles as requested. + pub fn newN(data: T, num_handles: uint) -> ~[UnsafeAtomicRcBox] { + unsafe { + if num_handles == 0 { + ~[] // need to free data here + } else { + let ptr = new_inner(data, num_handles); + vec::from_fn(num_handles, |_| UnsafeAtomicRcBox { data: ptr }) + } + } + } + + /// As newN(), but from an already-existing handle. Uses one xadd. + pub fn cloneN(self, num_handles: uint) -> ~[UnsafeAtomicRcBox] { + if num_handles == 0 { + ~[] // The "num_handles - 1" trick (below) fails in the 0 case. + } else { + unsafe { + let mut data: ~AtomicRcBoxData = cast::transmute(self.data); + // Minus one because we are recycling the given handle's refcount. + let old_count = data.count.fetch_add(num_handles - 1, Acquire); + // let old_count = data.count.fetch_add(num_handles, Acquire); + assert!(old_count >= 1); + let ptr = cast::transmute(data); + cast::forget(self); // Don't run the destructor on this handle. + vec::from_fn(num_handles, |_| UnsafeAtomicRcBox { data: ptr }) + } } } #[inline] - pub unsafe fn get(&self) -> *mut T - { - let mut data: ~AtomicRcBoxData = cast::transmute(self.data); - assert!(data.count.load(Acquire) > 0); // no barrier is really needed - let r: *mut T = data.data.get_mut_ref(); - cast::forget(data); - return r; + pub fn get(&self) -> *mut T { + unsafe { + let mut data: ~AtomicRcBoxData = cast::transmute(self.data); + // FIXME(#6598) Change Acquire to Relaxed. + assert!(data.count.load(Acquire) > 0); + let r: *mut T = data.data.get_mut_ref(); + cast::forget(data); + return r; + } } #[inline] - pub unsafe fn get_immut(&self) -> *T - { - let data: ~AtomicRcBoxData = cast::transmute(self.data); - assert!(data.count.load(Acquire) > 0); // no barrier is really needed - let r: *T = data.data.get_ref(); - cast::forget(data); - return r; + pub fn get_immut(&self) -> *T { + unsafe { + let data: ~AtomicRcBoxData = cast::transmute(self.data); + assert!(data.count.load(Acquire) > 0); // no barrier is really needed + let r: *T = data.data.get_ref(); + cast::forget(data); + return r; + } } /// Wait until all other handles are dropped, then retrieve the enclosed /// data. See extra::arc::Arc for specific semantics documentation. /// If called when the task is already unkillable, unwrap will unkillably /// block; otherwise, an unwrapping task can be killed by linked failure. - pub unsafe fn unwrap(self) -> T { + pub fn unwrap(self) -> T { let this = Cell::new(self); // argh do task::unkillable { - let mut this = this.take(); - let mut data: ~AtomicRcBoxData = cast::transmute(this.data); - // Set up the unwrap protocol. - let (p1,c1) = comm::oneshot(); // () - let (p2,c2) = comm::oneshot(); // bool - // Try to put our server end in the unwrapper slot. - // This needs no barrier -- it's protected by the release barrier on - // the xadd, and the acquire+release barrier in the destructor's xadd. - // FIXME(#6598) Change Acquire to Relaxed. - if data.unwrapper.fill(~(c1,p2), Acquire).is_none() { - // Got in. Tell this handle's destructor not to run (we are now it). - this.data = ptr::mut_null(); - // Drop our own reference. - let old_count = data.count.fetch_sub(1, Release); - assert!(old_count >= 1); - if old_count == 1 { - // We were the last owner. Can unwrap immediately. - // AtomicOption's destructor will free the server endpoint. - // FIXME(#3224): it should be like this - // let ~AtomicRcBoxData { data: user_data, _ } = data; - // user_data - data.data.take_unwrap() - } else { - // The *next* person who sees the refcount hit 0 will wake us. - let p1 = Cell::new(p1); // argh - // Unlike the above one, this cell is necessary. It will get - // taken either in the do block or in the finally block. - let c2_and_data = Cell::new((c2,data)); - do (|| { - do task::rekillable { p1.take().recv(); } - // Got here. Back in the 'unkillable' without getting killed. - let (c2, data) = c2_and_data.take(); - c2.send(true); + unsafe { + let mut this = this.take(); + let mut data: ~AtomicRcBoxData = cast::transmute(this.data); + // Set up the unwrap protocol. + let (p1,c1) = comm::oneshot(); // () + let (p2,c2) = comm::oneshot(); // bool + // Try to put our server end in the unwrapper slot. + // This needs no barrier -- it's protected by the release barrier on + // the xadd, and the acquire+release barrier in the destructor's xadd. + // FIXME(#6598) Change Acquire to Relaxed. + if data.unwrapper.fill(~(c1,p2), Acquire).is_none() { + // Got in. Tell this handle's destructor not to run (we are now it). + this.data = ptr::mut_null(); + // Drop our own reference. + let old_count = data.count.fetch_sub(1, Release); + assert!(old_count >= 1); + if old_count == 1 { + // We were the last owner. Can unwrap immediately. + // AtomicOption's destructor will free the server endpoint. // FIXME(#3224): it should be like this // let ~AtomicRcBoxData { data: user_data, _ } = data; // user_data - let mut data = data; data.data.take_unwrap() - }).finally { - if task::failing() { - // Killed during wait. Because this might happen while - // someone else still holds a reference, we can't free - // the data now; the "other" last refcount will free it. + } else { + // The *next* person who sees the refcount hit 0 will wake us. + let p1 = Cell::new(p1); // argh + // Unlike the above one, this cell is necessary. It will get + // taken either in the do block or in the finally block. + let c2_and_data = Cell::new((c2,data)); + do (|| { + do task::rekillable { p1.take().recv(); } + // Got here. Back in the 'unkillable' without getting killed. let (c2, data) = c2_and_data.take(); - c2.send(false); - cast::forget(data); - } else { - assert!(c2_and_data.is_empty()); + c2.send(true); + // FIXME(#3224): it should be like this + // let ~AtomicRcBoxData { data: user_data, _ } = data; + // user_data + let mut data = data; + data.data.take_unwrap() + }).finally { + if task::failing() { + // Killed during wait. Because this might happen while + // someone else still holds a reference, we can't free + // the data now; the "other" last refcount will free it. + let (c2, data) = c2_and_data.take(); + c2.send(false); + cast::forget(data); + } else { + assert!(c2_and_data.is_empty()); + } } } + } else { + // If 'put' returns the server end back to us, we were rejected; + // someone else was trying to unwrap. Avoid guaranteed deadlock. + cast::forget(data); + fail!("Another task is already unwrapping this Arc!"); } - } else { - // If 'put' returns the server end back to us, we were rejected; - // someone else was trying to unwrap. Avoid guaranteed deadlock. - cast::forget(data); - fail!("Another task is already unwrapping this Arc!"); } } } /// As unwrap above, but without blocking. Returns 'Left(self)' if this is /// not the last reference; 'Right(unwrapped_data)' if so. - pub unsafe fn try_unwrap(self) -> Either, T> { - let mut this = self; // FIXME(#4330) mutable self - let mut data: ~AtomicRcBoxData = cast::transmute(this.data); - // This can of course race with anybody else who has a handle, but in - // such a case, the returned count will always be at least 2. If we - // see 1, no race was possible. All that matters is 1 or not-1. - let count = data.count.load(Acquire); - assert!(count >= 1); - // The more interesting race is one with an unwrapper. They may have - // already dropped their count -- but if so, the unwrapper pointer - // will have been set first, which the barriers ensure we will see. - // (Note: using is_empty(), not take(), to not free the unwrapper.) - if count == 1 && data.unwrapper.is_empty(Acquire) { - // Tell this handle's destructor not to run (we are now it). - this.data = ptr::mut_null(); - // FIXME(#3224) as above - Right(data.data.take_unwrap()) - } else { - cast::forget(data); - Left(this) + pub fn try_unwrap(self) -> Either, T> { + unsafe { + let mut this = self; // FIXME(#4330) mutable self + let mut data: ~AtomicRcBoxData = cast::transmute(this.data); + // This can of course race with anybody else who has a handle, but in + // such a case, the returned count will always be at least 2. If we + // see 1, no race was possible. All that matters is 1 or not-1. + let count = data.count.load(Acquire); + assert!(count >= 1); + // The more interesting race is one with an unwrapper. They may have + // already dropped their count -- but if so, the unwrapper pointer + // will have been set first, which the barriers ensure we will see. + // (Note: using is_empty(), not take(), to not free the unwrapper.) + if count == 1 && data.unwrapper.is_empty(Acquire) { + // Tell this handle's destructor not to run (we are now it). + this.data = ptr::mut_null(); + // FIXME(#3224) as above + Right(data.data.take_unwrap()) + } else { + cast::forget(data); + Left(this) + } } } } @@ -342,7 +377,7 @@ impl Exclusive { pub fn unwrap(self) -> T { let Exclusive { x: x } = self; // Someday we might need to unkillably unwrap an Exclusive, but not today. - let inner = unsafe { x.unwrap() }; + let inner = x.unwrap(); let ExData { data: user_data, _ } = inner; // will destroy the LittleLock user_data } @@ -416,53 +451,71 @@ mod tests { } } + #[test] + fn arclike_newN() { + // Tests that the many-refcounts-at-once constructors don't leak. + let _ = UnsafeAtomicRcBox::new2(~~"hello"); + let x = UnsafeAtomicRcBox::newN(~~"hello", 0); + assert_eq!(x.len(), 0) + let x = UnsafeAtomicRcBox::newN(~~"hello", 1); + assert_eq!(x.len(), 1) + let x = UnsafeAtomicRcBox::newN(~~"hello", 10); + assert_eq!(x.len(), 10) + } + + #[test] + fn arclike_cloneN() { + // Tests that the many-refcounts-at-once special-clone doesn't leak. + let x = UnsafeAtomicRcBox::new(~~"hello"); + let x = x.cloneN(0); + assert_eq!(x.len(), 0); + let x = UnsafeAtomicRcBox::new(~~"hello"); + let x = x.cloneN(1); + assert_eq!(x.len(), 1); + let x = UnsafeAtomicRcBox::new(~~"hello"); + let x = x.cloneN(10); + assert_eq!(x.len(), 10); + } + #[test] fn arclike_unwrap_basic() { - unsafe { - let x = UnsafeAtomicRcBox::new(~~"hello"); - assert!(x.unwrap() == ~~"hello"); - } + let x = UnsafeAtomicRcBox::new(~~"hello"); + assert!(x.unwrap() == ~~"hello"); } #[test] fn arclike_try_unwrap() { - unsafe { - let x = UnsafeAtomicRcBox::new(~~"hello"); - assert!(x.try_unwrap().expect_right("try_unwrap failed") == ~~"hello"); - } + let x = UnsafeAtomicRcBox::new(~~"hello"); + assert!(x.try_unwrap().expect_right("try_unwrap failed") == ~~"hello"); } #[test] fn arclike_try_unwrap_fail() { - unsafe { - let x = UnsafeAtomicRcBox::new(~~"hello"); - let x2 = x.clone(); - let left_x = x.try_unwrap(); - assert!(left_x.is_left()); - util::ignore(left_x); - assert!(x2.try_unwrap().expect_right("try_unwrap none") == ~~"hello"); - } + let x = UnsafeAtomicRcBox::new(~~"hello"); + let x2 = x.clone(); + let left_x = x.try_unwrap(); + assert!(left_x.is_left()); + util::ignore(left_x); + assert!(x2.try_unwrap().expect_right("try_unwrap none") == ~~"hello"); } #[test] fn arclike_try_unwrap_unwrap_race() { // When an unwrap and a try_unwrap race, the unwrapper should always win. - unsafe { - let x = UnsafeAtomicRcBox::new(~~"hello"); - let x2 = Cell::new(x.clone()); - let (p,c) = comm::stream(); - do task::spawn { - c.send(()); - assert!(x2.take().unwrap() == ~~"hello"); - c.send(()); - } - p.recv(); - task::yield(); // Try to make the unwrapper get blocked first. - let left_x = x.try_unwrap(); - assert!(left_x.is_left()); - util::ignore(left_x); - p.recv(); + let x = UnsafeAtomicRcBox::new(~~"hello"); + let x2 = Cell::new(x.clone()); + let (p,c) = comm::stream(); + do task::spawn { + c.send(()); + assert!(x2.take().unwrap() == ~~"hello"); + c.send(()); } + p.recv(); + task::yield(); // Try to make the unwrapper get blocked first. + let left_x = x.try_unwrap(); + assert!(left_x.is_left()); + util::ignore(left_x); + p.recv(); } #[test]