Fallout from new thread API

This commit is contained in:
Aaron Turon 2014-12-06 18:34:37 -08:00
parent 14c1a103bc
commit 43ae4b3301
51 changed files with 323 additions and 439 deletions

View file

@ -32,7 +32,7 @@ use std::io;
use std::os;
use std::str;
use std::string::String;
use std::task;
use std::thread::Thread;
use std::time::Duration;
use test::MetricMap;
@ -445,9 +445,9 @@ fn run_debuginfo_gdb_test(config: &Config, props: &TestProps, testfile: &Path) {
loop {
//waiting 1 second for gdbserver start
timer::sleep(Duration::milliseconds(1000));
let result = task::try(move || {
let result = Thread::with_join(move || {
tcp::TcpStream::connect("127.0.0.1:5039").unwrap();
});
}).join();
if result.is_err() {
continue;
}

View file

@ -347,16 +347,16 @@ result with an `int` field (representing a successful result) or an `Err` result
(representing termination with an error).
```{rust}
# use std::task;
# use std::thread::Thread;
# fn some_condition() -> bool { false }
# fn calculate_result() -> int { 0 }
let result: Result<int, Box<std::any::Any + Send>> = task::try(move || {
let result: Result<int, Box<std::any::Any + Send>> = Thread::with_join(move || {
if some_condition() {
calculate_result()
} else {
panic!("oops!");
}
});
}).join();
assert!(result.is_err());
```

View file

@ -92,7 +92,7 @@ impl<'a, T, Sized? B> BorrowFrom<Cow<'a, T, B>> for B where B: ToOwned<T> {
/// Trait for moving into a `Cow`
pub trait IntoCow<'a, T, Sized? B> {
/// Moves `self` into `Cow`
/// Moves `serlf` into `Cow`
fn into_cow(self) -> Cow<'a, T, B>;
}

View file

@ -55,7 +55,7 @@ use rustc::DIAGNOSTICS;
use std::any::AnyRefExt;
use std::io;
use std::os;
use std::task::TaskBuilder;
use std::thread;
use rustc::session::early_error;
@ -475,18 +475,22 @@ pub fn monitor<F:FnOnce()+Send>(f: F) {
static STACK_SIZE: uint = 32000000; // 32MB
let (tx, rx) = channel();
let w = io::ChanWriter::new(tx);
let mut w = Some(io::ChanWriter::new(tx)); // option dance
let mut r = io::ChanReader::new(rx);
let mut task = TaskBuilder::new().named("rustc").stderr(box w);
let mut cfg = thread::cfg().name("rustc".to_string());
// FIXME: Hacks on hacks. If the env is trying to override the stack size
// then *don't* set it explicitly.
if os::getenv("RUST_MIN_STACK").is_none() {
task = task.stack_size(STACK_SIZE);
cfg = cfg.stack_size(STACK_SIZE);
}
match task.try(f) {
let f = proc() {
std::io::stdio::set_stderr(box w.take().unwrap());
f()
};
match cfg.with_join(f).join() {
Ok(()) => { /* fallthrough */ }
Err(value) => {
// Task panicked without emitting a fatal diagnostic
@ -540,4 +544,3 @@ pub fn main() {
let result = run(args);
std::os::set_exit_status(result);
}

View file

@ -30,7 +30,7 @@ use std::ptr;
use std::str;
use std::mem;
use std::sync::{Arc, Mutex};
use std::task::TaskBuilder;
use std::thread;
use libc::{c_uint, c_int, c_void};
#[deriving(Clone, PartialEq, PartialOrd, Ord, Eq)]
@ -896,7 +896,11 @@ fn run_work_multithreaded(sess: &Session,
let diag_emitter = diag_emitter.clone();
let remark = sess.opts.cg.remark.clone();
let future = TaskBuilder::new().named(format!("codegen-{}", i)).try_future(move |:| {
let (tx, rx) = channel();
let mut tx = Some(tx);
futures.push(rx);
thread::cfg().name(format!("codegen-{}", i)).spawn(move |:| {
let diag_handler = mk_handler(box diag_emitter);
// Must construct cgcx inside the proc because it has non-Send
@ -921,13 +925,14 @@ fn run_work_multithreaded(sess: &Session,
None => break,
}
}
tx.take().unwrap().send(());
});
futures.push(future);
}
let mut panicked = false;
for future in futures.into_iter() {
match future.into_inner() {
for rx in futures.into_iter() {
match rx.recv_opt() {
Ok(()) => {},
Err(_) => {
panicked = true;

View file

@ -342,10 +342,11 @@ fn rust_input(cratefile: &str, externs: core::Externs, matches: &getopts::Matche
let cr = Path::new(cratefile);
info!("starting to run rustc");
let (mut krate, analysis) = std::task::try(move |:| {
let (mut krate, analysis) = std::thread::Thread::with_join(move |:| {
let cr = cr;
core::run_core(libs, cfgs, externs, &cr, triple)
}).map_err(|_| "rustc failed").unwrap();
}).join().map_err(|_| "rustc failed").unwrap();
info!("finished with rustc");
let mut analysis = Some(analysis);
ANALYSISKEY.with(|s| {

View file

@ -16,6 +16,7 @@ use std::os;
use std::str;
use std::string::String;
use std::thunk::Thunk;
use std::thread::Thread;
use std::collections::{HashSet, HashMap};
use testing;
@ -143,7 +144,7 @@ fn runtest(test: &str, cratename: &str, libs: Vec<Path>, externs: core::Externs,
let w1 = io::ChanWriter::new(tx);
let w2 = w1.clone();
let old = io::stdio::set_stderr(box w1);
spawn(move |:| {
Thread::spawn(move |:| {
let mut p = io::ChanReader::new(rx);
let mut err = match old {
Some(old) => {

View file

@ -536,7 +536,7 @@ pub unsafe fn from_c_multistring<F>(buf: *const libc::c_char,
mod tests {
use prelude::*;
use ptr;
use task;
use thread::Thread;
use libc;
use super::*;
@ -637,7 +637,7 @@ mod tests {
#[test]
fn test_to_c_str_fail() {
assert!(task::try(move|| { "he\x00llo".to_c_str() }).is_err());
assert!(Thread::with_join(move|| { "he\x00llo".to_c_str() }).join().is_err());
}
#[test]

View file

@ -32,7 +32,7 @@ pub struct WaitToken {
no_send: NoSend,
}
fn token() -> (WaitToken, SignalToken) {
pub fn tokens() -> (WaitToken, SignalToken) {
let inner = Arc::new(Inner {
thread: Thread::current(),
woken: INIT_ATOMIC_BOOL,
@ -48,7 +48,7 @@ fn token() -> (WaitToken, SignalToken) {
}
impl SignalToken {
fn signal(&self) -> bool {
pub fn signal(&self) -> bool {
let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst);
if wake {
self.inner.thread.unpark();
@ -73,7 +73,7 @@ impl SignalToken {
}
impl WaitToken {
fn wait(self) {
pub fn wait(self) {
while !self.inner.woken.load(Ordering::SeqCst) {
Thread::park()
}

View file

@ -317,8 +317,10 @@ use core::kinds::marker;
use core::mem;
use core::cell::UnsafeCell;
pub use comm::select::{Select, Handle};
use comm::select::StartResult::*;
pub use self::select::{Select, Handle};
use self::select::StartResult;
use self::select::StartResult::*;
use self::blocking::SignalToken;
macro_rules! test {
{ fn $name:ident() $b:block $(#[$a:meta])*} => (
@ -330,7 +332,7 @@ macro_rules! test {
use comm::*;
use super::*;
use task;
use thread::Thread;
$(#[$a])* #[test] fn f() { $b }
}
@ -593,12 +595,12 @@ impl<T: Send> Sender<T> {
(a, ret)
}
oneshot::UpDisconnected => (a, Err(t)),
oneshot::UpWoke(task) => {
// This send cannot panic because the task is
oneshot::UpWoke(token) => {
// This send cannot panic because the thread is
// asleep (we're looking at it), so the receiver
// can't go away.
(*a.get()).send(t).ok().unwrap();
task.wake().map(|t| t.reawaken());
token.signal();
(a, Ok(()))
}
}
@ -937,7 +939,7 @@ impl<T: Send> select::Packet for Receiver<T> {
}
}
fn start_selection(&self, mut token: SignalToken) -> bool {
fn start_selection(&self, mut token: SignalToken) -> StartResult {
loop {
let (t, new_port) = match *unsafe { self.inner() } {
Oneshot(ref p) => {
@ -1240,11 +1242,11 @@ mod test {
test! { fn oneshot_single_thread_recv_chan_close() {
// Receiving on a closed chan will panic
let res = task::try(move|| {
let res = Thread::with_join(move|| {
let (tx, rx) = channel::<int>();
drop(tx);
rx.recv();
});
}).join();
// What is our res?
assert!(res.is_err());
} }
@ -1312,9 +1314,9 @@ mod test {
spawn(move|| {
drop(tx);
});
let res = task::try(move|| {
let res = Thread::with_join(move|| {
assert!(rx.recv() == box 10);
});
}).join();
assert!(res.is_err());
} }
@ -1334,19 +1336,19 @@ mod test {
spawn(move|| {
drop(rx);
});
let _ = task::try(move|| {
let _ = Thread::with_join(move|| {
tx.send(1);
});
}).join();
}
} }
test! { fn oneshot_multi_thread_recv_close_stress() {
for _ in range(0, stress_factor()) {
let (tx, rx) = channel::<int>();
spawn(move|| {
let res = task::try(move|| {
spawn(proc() {
let res = Thread::with_join(move|| {
rx.recv();
});
}).join();
assert!(res.is_err());
});
spawn(move|| {
@ -1495,7 +1497,7 @@ mod test {
tx2.send(());
});
// make sure the other task has gone to sleep
for _ in range(0u, 5000) { task::deschedule(); }
for _ in range(0u, 5000) { Thread::yield_now(); }
// upgrade to a shared chan and send a message
let t = tx.clone();
@ -1504,45 +1506,7 @@ mod test {
// wait for the child task to exit before we exit
rx2.recv();
} }
test! { fn sends_off_the_runtime() {
use rt::thread::Thread;
let (tx, rx) = channel();
let t = Thread::start(move|| {
for _ in range(0u, 1000) {
tx.send(());
}
});
for _ in range(0u, 1000) {
rx.recv();
}
t.join();
} }
test! { fn try_recvs_off_the_runtime() {
use rt::thread::Thread;
let (tx, rx) = channel();
let (cdone, pdone) = channel();
let t = Thread::start(move|| {
let mut hits = 0u;
while hits < 10 {
match rx.try_recv() {
Ok(()) => { hits += 1; }
Err(Empty) => { Thread::yield_now(); }
Err(Disconnected) => return,
}
}
cdone.send(());
});
for _ in range(0u, 10) {
tx.send(());
}
t.join();
pdone.recv();
} }
})
}
#[cfg(test)]
@ -1700,11 +1664,11 @@ mod sync_tests {
test! { fn oneshot_single_thread_recv_chan_close() {
// Receiving on a closed chan will panic
let res = task::try(move|| {
let res = Thread::with_join(move|| {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
rx.recv();
});
}).join();
// What is our res?
assert!(res.is_err());
} }
@ -1777,9 +1741,9 @@ mod sync_tests {
spawn(move|| {
drop(tx);
});
let res = task::try(move|| {
let res = Thread::with_join(move|| {
assert!(rx.recv() == box 10);
});
}).join();
assert!(res.is_err());
} }
@ -1799,19 +1763,19 @@ mod sync_tests {
spawn(move|| {
drop(rx);
});
let _ = task::try(move|| {
let _ = Thread::with_join(move || {
tx.send(1);
});
}).join();
}
} }
test! { fn oneshot_multi_thread_recv_close_stress() {
for _ in range(0, stress_factor()) {
let (tx, rx) = sync_channel::<int>(0);
spawn(move|| {
let res = task::try(move|| {
spawn(proc() {
let res = Thread::with_join(move|| {
rx.recv();
});
}).join();
assert!(res.is_err());
});
spawn(move|| {
@ -1960,7 +1924,7 @@ mod sync_tests {
tx2.send(());
});
// make sure the other task has gone to sleep
for _ in range(0u, 5000) { task::deschedule(); }
for _ in range(0u, 5000) { Thread::yield_now(); }
// upgrade to a shared chan and send a message
let t = tx.clone();
@ -1971,29 +1935,6 @@ mod sync_tests {
rx2.recv();
} }
test! { fn try_recvs_off_the_runtime() {
use rt::thread::Thread;
let (tx, rx) = sync_channel::<()>(0);
let (cdone, pdone) = channel();
let t = Thread::start(move|| {
let mut hits = 0u;
while hits < 10 {
match rx.try_recv() {
Ok(()) => { hits += 1; }
Err(Empty) => { Thread::yield_now(); }
Err(Disconnected) => return,
}
}
cdone.send(());
});
for _ in range(0u, 10) {
tx.send(());
}
t.join();
pdone.recv();
} }
test! { fn send_opt1() {
let (tx, rx) = sync_channel::<int>(0);
spawn(move|| { rx.recv(); });
@ -2052,7 +1993,7 @@ mod sync_tests {
test! { fn try_send4() {
let (tx, rx) = sync_channel::<int>(0);
spawn(move|| {
for _ in range(0u, 1000) { task::deschedule(); }
for _ in range(0u, 1000) { Thread::yield_now(); }
assert_eq!(tx.try_send(1), Ok(()));
});
assert_eq!(rx.recv(), 1);

View file

@ -39,9 +39,8 @@ use self::MyUpgrade::*;
use core::prelude::*;
use alloc::boxed::Box;
use comm::Receiver;
use comm::blocking::{mod, WaitToken, SignalToken};
use comm::blocking::{mod, SignalToken};
use core::mem;
use sync::atomic;
@ -143,7 +142,7 @@ impl<T: Send> Packet<T> {
// Attempt to not block the task (it's a little expensive). If it looks
// like we're not empty, then immediately go through to `try_recv`.
if self.state.load(atomic::SeqCst) == EMPTY {
let (wait_token, signal_token) = blocking::token();
let (wait_token, signal_token) = blocking::tokens();
let ptr = unsafe { signal_token.cast_to_uint() };
// race with senders to enter the blocking state
@ -332,7 +331,7 @@ impl<T: Send> Packet<T> {
// If we've got a blocked task, then use an atomic to gain ownership
// of it (may fail)
BLOCKED => self.state.compare_and_swap(BLOCKED, EMPTY, atomic::SeqCst)
ptr => self.state.compare_and_swap(ptr, EMPTY, atomic::SeqCst)
};
// Now that we've got ownership of our state, figure out what to do

View file

@ -54,7 +54,6 @@
use core::prelude::*;
use alloc::boxed::Box;
use core::cell::Cell;
use core::kinds::marker;
use core::mem;
@ -63,8 +62,6 @@ use core::uint;
use comm::Receiver;
use comm::blocking::{mod, SignalToken};
use self::StartResult::*;
/// The "receiver set" of the select interface. This structure is used to manage
/// a set of receivers which are being selected over.
pub struct Select {
@ -190,8 +187,8 @@ impl Select {
let (wait_token, signal_token) = blocking::tokens();
for (i, handle) in self.iter().enumerate() {
match (*handle).packet.start_selection(signal_token.clone()) {
Installed => {}
Abort => {
StartResult::Installed => {}
StartResult::Abort => {
// Go back and abort the already-begun selections
for handle in self.iter().take(i) {
(*handle).packet.abort_selection();
@ -417,10 +414,10 @@ mod test {
let (tx3, rx3) = channel::<int>();
spawn(move|| {
for _ in range(0u, 20) { task::deschedule(); }
for _ in range(0u, 20) { Thread::yield_now(); }
tx1.send(1);
rx3.recv();
for _ in range(0u, 20) { task::deschedule(); }
for _ in range(0u, 20) { Thread::yield_now(); }
});
select! {
@ -440,7 +437,7 @@ mod test {
let (tx3, rx3) = channel::<()>();
spawn(move|| {
for _ in range(0u, 20) { task::deschedule(); }
for _ in range(0u, 20) { Thread::yield_now(); }
tx1.send(1);
tx2.send(2);
rx3.recv();
@ -541,7 +538,7 @@ mod test {
tx3.send(());
});
for _ in range(0u, 1000) { task::deschedule(); }
for _ in range(0u, 1000) { Thread::yield_now(); }
drop(tx1.clone());
tx2.send(());
rx3.recv();
@ -644,7 +641,7 @@ mod test {
tx2.send(());
});
for _ in range(0u, 100) { task::deschedule() }
for _ in range(0u, 100) { Thread::yield_now() }
tx1.send(());
rx2.recv();
} }
@ -663,7 +660,7 @@ mod test {
tx2.send(());
});
for _ in range(0u, 100) { task::deschedule() }
for _ in range(0u, 100) { Thread::yield_now() }
tx1.send(());
rx2.recv();
} }
@ -681,7 +678,7 @@ mod test {
tx2.send(());
});
for _ in range(0u, 100) { task::deschedule() }
for _ in range(0u, 100) { Thread::yield_now() }
tx1.send(());
rx2.recv();
} }
@ -697,7 +694,7 @@ mod test {
test! { fn sync2() {
let (tx, rx) = sync_channel::<int>(0);
spawn(move|| {
for _ in range(0u, 100) { task::deschedule() }
for _ in range(0u, 100) { Thread::yield_now() }
tx.send(1);
});
select! {

View file

@ -22,7 +22,6 @@ pub use self::Failure::*;
use core::prelude::*;
use alloc::boxed::Box;
use core::cmp;
use core::int;
@ -31,6 +30,7 @@ use comm::mpsc_queue as mpsc;
use comm::blocking::{mod, SignalToken};
use comm::select::StartResult;
use comm::select::StartResult::*;
use thread::Thread;
const DISCONNECTED: int = int::MIN;
const FUDGE: int = 1024;

View file

@ -24,7 +24,6 @@ use self::Message::*;
use core::prelude::*;
use alloc::boxed::Box;
use core::cmp;
use core::int;
use thread::Thread;
@ -32,7 +31,7 @@ use thread::Thread;
use sync::atomic;
use comm::spsc_queue as spsc;
use comm::Receiver;
use comm::blocking::{mod, WaitToken, SignalToken};
use comm::blocking::{mod, SignalToken};
const DISCONNECTED: int = int::MIN;
#[cfg(test)]
@ -147,7 +146,7 @@ impl<T: Send> Packet<T> {
let ptr = self.to_wake.load(atomic::SeqCst);
self.to_wake.store(0, atomic::SeqCst);
assert!(ptr != 0);
unsafe { SignaToken::cast_from_uint(ptr) }
unsafe { SignalToken::cast_from_uint(ptr) }
}
// Decrements the count on the channel for a sleeper, returning the sleeper

View file

@ -38,10 +38,8 @@ use core::prelude::*;
pub use self::Failure::*;
use self::Blocker::*;
use alloc::boxed::Box;
use vec::Vec;
use core::mem;
use core::cell::UnsafeCell;
use sync::{atomic, Mutex, MutexGuard};
use comm::blocking::{mod, WaitToken, SignalToken};
@ -105,10 +103,10 @@ pub enum Failure {
/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
/// in the meantime. This re-locks the mutex upon returning.
fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>,
guard: MutexGuard<'b, State<T>>,
f: fn(BlockedTask) -> Blocker)
-> MutexGuard<'a, State<T>>
fn wait<'a, 'b, T: Send>(lock: &'a Mutex<State<T>>,
mut guard: MutexGuard<'b, State<T>>,
f: fn(SignalToken) -> Blocker)
-> MutexGuard<'a, State<T>>
{
let me: Box<Task> = Local::take();
me.deschedule(1, |task| {
@ -170,7 +168,7 @@ impl<T: Send> Packet<T> {
}
pub fn send(&self, t: T) -> Result<(), T> {
let guard = self.acquire_send_slot();
let mut guard = self.acquire_send_slot();
if guard.disconnected { return Err(t) }
guard.buf.enqueue(t);
@ -183,7 +181,7 @@ impl<T: Send> Packet<T> {
let mut canceled = false;
assert!(guard.canceled.is_none());
guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
let guard = wait(&self.lock, guard, BlockedSender);
let mut guard = wait(&self.lock, guard, BlockedSender);
if canceled {Err(guard.buf.dequeue())} else {Ok(())}
}
@ -198,7 +196,7 @@ impl<T: Send> Packet<T> {
}
pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
let guard = self.lock.lock();
let mut guard = self.lock.lock();
if guard.disconnected {
Err(super::RecvDisconnected(t))
} else if guard.buf.size() == guard.buf.cap() {
@ -235,13 +233,13 @@ impl<T: Send> Packet<T> {
// When reading this, remember that there can only ever be one receiver at
// time.
pub fn recv(&self) -> Result<T, ()> {
let guard = self.lock.lock();
let mut guard = self.lock.lock();
// Wait for the buffer to have something in it. No need for a while loop
// because we're the only receiver.
let mut waited = false;
if !guard.disconnected && guard.buf.size() == 0 {
wait(&mut guard.blocker, BlockedReceiver, &self.lock);
guard = wait(&self.lock, guard, BlockedReceiver);
waited = true;
}
if guard.disconnected && guard.buf.size() == 0 { return Err(()) }
@ -249,12 +247,12 @@ impl<T: Send> Packet<T> {
// Pick up the data, wake up our neighbors, and carry on
assert!(guard.buf.size() > 0);
let ret = guard.buf.dequeue();
self.wakeup_senders(waited, guard, state);
self.wakeup_senders(waited, guard);
return Ok(ret);
}
pub fn try_recv(&self) -> Result<T, Failure> {
let guard = self.lock();
let mut guard = self.lock.lock();
// Easy cases first
if guard.disconnected { return Err(Disconnected) }
@ -262,7 +260,7 @@ impl<T: Send> Packet<T> {
// Be sure to wake up neighbors
let ret = Ok(guard.buf.dequeue());
self.wakeup_senders(false, guard, state);
self.wakeup_senders(false, guard);
return ret;
}
@ -272,7 +270,7 @@ impl<T: Send> Packet<T> {
// * `waited` - flag if the receiver blocked to receive some data, or if it
// just picked up some data on the way out
// * `guard` - the lock guard that is held over this channel's lock
fn wakeup_senders(&self, waited: bool, guard: MutexGuard<State<T>>) {
fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<State<T>>) {
let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
// If this is a no-buffer channel (cap == 0), then if we didn't wait we
@ -311,7 +309,7 @@ impl<T: Send> Packet<T> {
}
// Not much to do other than wake up a receiver if one's there
let guard = self.lock();
let mut guard = self.lock.lock();
if guard.disconnected { return }
guard.disconnected = true;
match mem::replace(&mut guard.blocker, NoneBlocked) {
@ -322,7 +320,7 @@ impl<T: Send> Packet<T> {
}
pub fn drop_port(&self) {
let guard = self.lock();
let mut guard = self.lock.lock();
if guard.disconnected { return }
guard.disconnected = true;
@ -368,14 +366,14 @@ impl<T: Send> Packet<T> {
// If Ok, the value is whether this port has data, if Err, then the upgraded
// port needs to be checked instead of this one.
pub fn can_recv(&self) -> bool {
let guard = self.lock();
let guard = self.lock.lock();
guard.disconnected || guard.buf.size() > 0
}
// Attempts to start selection on this port. This can either succeed or fail
// because there is data waiting.
pub fn start_selection(&self, token: SignalToken) -> StartResult {
let guard = self.lock();
let mut guard = self.lock.lock();
if guard.disconnected || guard.buf.size() > 0 {
Abort
} else {
@ -393,7 +391,7 @@ impl<T: Send> Packet<T> {
//
// The return value indicates whether there's data on this port.
pub fn abort_selection(&self) -> bool {
let guard = self.lock();
let mut guard = self.lock.lock();
match mem::replace(&mut guard.blocker, NoneBlocked) {
NoneBlocked => true,
BlockedSender(token) => {
@ -409,7 +407,7 @@ impl<T: Send> Packet<T> {
impl<T: Send> Drop for Packet<T> {
fn drop(&mut self) {
assert_eq!(self.channels.load(atomic::SeqCst), 0);
let guard = self.lock();
let mut guard = self.lock.lock();
assert!(guard.queue.dequeue().is_none());
assert!(guard.canceled.is_none());
}

View file

@ -21,10 +21,10 @@ use option::Option::{Some, None};
use result::Result::Ok;
use rt::backtrace;
use rt::util::{Stderr, Stdio};
use rt::local::Local;
use rt::task::Task;
use str::Str;
use string::String;
use thread::Thread;
use sys_common::thread_info;
// Defined in this module instead of io::stdio so that the unwinding
thread_local! {
@ -52,63 +52,35 @@ pub fn on_fail(obj: &(Any+Send), file: &'static str, line: uint) {
}
};
let mut err = Stderr;
// It is assumed that all reasonable rust code will have a local task at
// all times. This means that this `exists` will return true almost all of
// the time. There are border cases, however, when the runtime has
// *almost* set up the local task, but hasn't quite gotten there yet. In
// order to get some better diagnostics, we print on panic and
// immediately abort the whole process if there is no local task
// available.
if !Local::exists(None::<Task>) {
let _ = writeln!(&mut err, "panicked at '{}', {}:{}", msg, file, line);
if backtrace::log_enabled() {
let _ = backtrace::write(&mut err);
} else {
let _ = writeln!(&mut err, "run with `RUST_BACKTRACE=1` to \
see a backtrace");
}
return
}
// Peel the name out of local task so we can print it. We've got to be sure
// that the local task is in TLS while we're printing as I/O may occur.
let (name, unwinding) = {
let mut t = Local::borrow(None::<Task>);
(t.name.take(), t.unwinder.unwinding())
};
{
let n = name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>");
let prev = LOCAL_STDERR.with(|s| s.borrow_mut().take());
match prev {
Some(mut stderr) => {
// FIXME: what to do when the task printing panics?
let _ = writeln!(stderr,
"task '{}' panicked at '{}', {}:{}\n",
n, msg, file, line);
if backtrace::log_enabled() {
let _ = backtrace::write(&mut *stderr);
}
let mut s = Some(stderr);
LOCAL_STDERR.with(|slot| {
*slot.borrow_mut() = s.take();
});
let thread = Thread::current();
let name = thread.name().unwrap_or("<unnamed>");
let prev = LOCAL_STDERR.with(|s| s.borrow_mut().take());
match prev {
Some(mut stderr) => {
// FIXME: what to do when the thread printing panics?
let _ = writeln!(stderr,
"thread '{}' panicked at '{}', {}:{}\n",
name, msg, file, line);
if backtrace::log_enabled() {
let _ = backtrace::write(&mut *stderr);
}
None => {
let _ = writeln!(&mut err, "task '{}' panicked at '{}', {}:{}",
n, msg, file, line);
if backtrace::log_enabled() {
let _ = backtrace::write(&mut err);
}
let mut s = Some(stderr);
LOCAL_STDERR.with(|slot| {
*slot.borrow_mut() = s.take();
});
}
None => {
let _ = writeln!(&mut err, "thread '{}' panicked at '{}', {}:{}",
name, msg, file, line);
if backtrace::log_enabled() {
let _ = backtrace::write(&mut err);
}
}
// If this is a double panic, make sure that we printed a backtrace
// for this panic.
if unwinding && !backtrace::log_enabled() {
let _ = backtrace::write(&mut err);
}
}
Local::borrow(None::<Task>).name = name;
// If this is a double panic, make sure that we printed a backtrace
// for this panic.
if thread_info::panicking() && !backtrace::log_enabled() {
let _ = backtrace::write(&mut err);
}
}

View file

@ -156,12 +156,12 @@ mod test {
use prelude::*;
use super::*;
use io;
use task;
use thread::Thread;
#[test]
fn test_rx_reader() {
let (tx, rx) = channel();
task::spawn(move|| {
Thread::spawn(move|| {
tx.send(vec![1u8, 2u8]);
tx.send(vec![]);
tx.send(vec![3u8, 4u8]);
@ -203,7 +203,7 @@ mod test {
#[test]
fn test_rx_buffer() {
let (tx, rx) = channel();
task::spawn(move|| {
Thread::spawn(move|| {
tx.send(b"he".to_vec());
tx.send(b"llo wo".to_vec());
tx.send(b"".to_vec());
@ -229,7 +229,11 @@ mod test {
writer.write_be_u32(42).unwrap();
let wanted = vec![0u8, 0u8, 0u8, 42u8];
<<<<<<< HEAD
let got = match task::try(move|| { rx.recv() }) {
=======
let got = match Thread::with_join(proc() { rx.recv() }).join() {
>>>>>>> Fallout from new thread API
Ok(got) => got,
Err(_) => panic!(),
};

View file

@ -549,7 +549,7 @@ mod tests {
Err(ref e) if e.kind == TimedOut => {}
Err(e) => panic!("error: {}", e),
}
::task::deschedule();
::thread::Thread::yield_now();
if i == 1000 { panic!("should have a pending connection") }
}
drop(l);

View file

@ -1155,7 +1155,7 @@ mod test {
Err(ref e) if e.kind == TimedOut => {}
Err(e) => panic!("error: {}", e),
}
::task::deschedule();
::thread::Thread::yield_now();
if i == 1000 { panic!("should have a pending connection") }
}
}
@ -1378,7 +1378,7 @@ mod test {
// Try to ensure that the reading clone is indeed reading
for _ in range(0i, 50) {
::task::deschedule();
::thread::Thread::yield_now();
}
// clone the handle again while it's reading, then let it finish the

View file

@ -30,6 +30,7 @@ use hash::Hash;
use std::hash::sip::SipState;
use io::pipe::{PipeStream, PipePair};
use path::BytesContainer;
use thread::Thread;
use sys;
use sys::fs::FileDesc;
@ -693,10 +694,12 @@ impl Process {
fn read(stream: Option<io::PipeStream>) -> Receiver<IoResult<Vec<u8>>> {
let (tx, rx) = channel();
match stream {
Some(stream) => spawn(move |:| {
let mut stream = stream;
tx.send(stream.read_to_end())
}),
Some(stream) => {
Thread::spawn(move |:| {
let mut stream = stream;
tx.send(stream.read_to_end())
});
}
None => tx.send(Ok(Vec::new()))
}
rx

View file

@ -41,9 +41,6 @@ use option::Option;
use option::Option::{Some, None};
use ops::{Deref, DerefMut, FnOnce};
use result::Result::{Ok, Err};
use rt;
use rt::local::Local;
use rt::task::Task;
use slice::SliceExt;
use str::StrPrelude;
use string::String;
@ -328,25 +325,17 @@ pub fn set_stderr(stderr: Box<Writer + Send>) -> Option<Box<Writer + Send>> {
// // io1 aliases io2
// })
// })
fn with_task_stdout<F>(f: F) where
F: FnOnce(&mut Writer) -> IoResult<()>,
{
let result = if Local::exists(None::<Task>) {
let mut my_stdout = LOCAL_STDOUT.with(|slot| {
slot.borrow_mut().take()
}).unwrap_or_else(|| {
box stdout() as Box<Writer + Send>
});
let result = f(&mut *my_stdout);
let mut var = Some(my_stdout);
LOCAL_STDOUT.with(|slot| {
*slot.borrow_mut() = var.take();
});
result
} else {
let mut io = rt::util::Stdout;
f(&mut io as &mut Writer)
};
fn with_task_stdout(f: |&mut Writer| -> IoResult<()>) {
let mut my_stdout = LOCAL_STDOUT.with(|slot| {
slot.borrow_mut().take()
}).unwrap_or_else(|| {
box stdout() as Box<Writer + Send>
});
let result = f(&mut *my_stdout);
let mut var = Some(my_stdout);
LOCAL_STDOUT.with(|slot| {
*slot.borrow_mut() = var.take();
});
match result {
Ok(()) => {}
Err(e) => panic!("failed printing to stdout: {}", e),

View file

@ -229,6 +229,8 @@ pub mod hash;
/* Threads and communication */
pub mod task;
#[allow(missing_docs)]
pub mod thread;
pub mod sync;
pub mod comm;

View file

@ -49,7 +49,7 @@ use ptr::RawPtr;
use ptr;
use result::Result;
use result::Result::{Err, Ok};
use slice::{AsSlice, SliceExt, PartialEqSliceExt};
use slice::{AsSlice, SliceExt};
use slice::CloneSliceExt;
use str::{Str, StrPrelude, StrAllocating};
use string::{String, ToString};

View file

@ -514,20 +514,20 @@ mod tests {
#[test]
fn test_null_byte() {
use task;
let result = task::try(move|| {
use thread::Thread;
let result = Thread::with_join(move|| {
Path::new(b"foo/bar\0")
});
}).join();
assert!(result.is_err());
let result = task::try(move|| {
let result = Thread::with_join(move|| {
Path::new("test").set_filename(b"f\0o")
});
}).join();
assert!(result.is_err());
let result = task::try(move|| {
let result = Thread::with_join(move|| {
Path::new("test").push(b"f\0o");
});
}).join();
assert!(result.is_err());
}

View file

@ -1298,20 +1298,20 @@ mod tests {
#[test]
fn test_null_byte() {
use task;
let result = task::try(move|| {
use thread::Thread;
let result = Thread::with_join(move|| {
Path::new(b"foo/bar\0")
});
}).join();
assert!(result.is_err());
let result = task::try(move|| {
let result = Thread::with_join(move|| {
Path::new("test").set_filename(b"f\0o")
});
}).join();
assert!(result.is_err());
let result = task::try(move|| {
let result = Thread::with_join(move|| {
Path::new("test").push(b"f\0o");
});
}).join();
assert!(result.is_err());
}

View file

@ -340,7 +340,7 @@ mod test {
use super::OsRng;
use rand::Rng;
use task;
use thread::Thread;
#[test]
fn test_os_rng() {
@ -360,25 +360,26 @@ mod test {
for _ in range(0u, 20) {
let (tx, rx) = channel();
txs.push(tx);
task::spawn(move|| {
Thread::spawn(move|| {
// wait until all the tasks are ready to go.
rx.recv();
// deschedule to attempt to interleave things as much
// as possible (XXX: is this a good test?)
let mut r = OsRng::new().unwrap();
task::deschedule();
Thread::yield_now();
let mut v = [0u8, .. 1000];
for _ in range(0u, 100) {
r.next_u32();
task::deschedule();
Thread::yield_now();
r.next_u64();
task::deschedule();
Thread::yield_now();
r.fill_bytes(&mut v);
task::deschedule();
Thread::yield_now();
}
})
});
}
// start all the tasks

View file

@ -14,7 +14,6 @@
use core::prelude::*;
use libc;
use boxed::Box;
use vec::Vec;
use sync::{Mutex, atomic, Once, ONCE_INIT};
@ -25,31 +24,30 @@ type Queue = Mutex<Vec<Thunk>>;
static INIT: Once = ONCE_INIT;
static QUEUE: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT;
static RUNNING: atomic::AtomicBool = atomic::INIT_ATOMIC_BOOL;
fn init() {
let state: Box<Queue> = box Mutex::new(Vec::new());
unsafe {
QUEUE.store(mem::transmute(state), atomic::SeqCst);
libc::atexit(run);
// FIXME: switch this to use atexit as below. Currently this
// segfaults (the queue's memory is mysteriously gone), so
// instead the cleanup is tied to the `std::rt` entry point.
//
// ::libc::atexit(cleanup);
}
}
// Note: this is private and so can only be called via atexit above,
// which guarantees initialization.
extern fn run() {
let cur = unsafe {
rtassert!(!RUNNING.load(atomic::SeqCst));
pub fn cleanup() {
unsafe {
let queue = QUEUE.swap(0, atomic::SeqCst);
rtassert!(queue != 0);
let queue: Box<Queue> = mem::transmute(queue);
let v = mem::replace(&mut *queue.lock(), Vec::new());
v
};
for to_run in cur.into_iter() {
to_run.invoke(());
if queue != 0 {
let queue: Box<Queue> = mem::transmute(queue);
let v = mem::replace(&mut *queue.lock(), Vec::new());
for to_run in v.into_iter() {
to_run.invoke();
}
}
}
}
@ -60,7 +58,6 @@ pub fn push(f: Thunk) {
// all with respect to `run`, meaning that this could theoretically be a
// use-after-free. There's not much we can do to protect against that,
// however. Let's just assume a well-behaved runtime and go from there!
rtassert!(!RUNNING.load(atomic::SeqCst));
let queue = QUEUE.load(atomic::SeqCst);
rtassert!(queue != 0);
(*(queue as *const Queue)).lock().push(f);

View file

@ -12,12 +12,8 @@
#![allow(non_camel_case_types)]
use io::{IoResult, Writer};
use iter::{Iterator, IteratorExt};
use option::Option::{Some, None};
use os;
use result::Result::{Ok, Err};
use str::{StrPrelude, from_str};
use sync::atomic;
pub use sys::backtrace::write;

View file

@ -48,14 +48,14 @@
#![allow(dead_code)]
use borrow::IntoCow;
use failure;
use os;
use thunk::Thunk;
use kinds::Send;
use thread::Thread;
use sys;
use sys_common;
use sys_common::thread::{mod, NewThread};
use sys_common::thread_info::{mod, NewThread};
// Reexport some of our utilities which are expected by other crates.
pub use self::util::{default_sched_threads, min_stack, running_on_valgrind};
@ -87,10 +87,9 @@ pub const DEFAULT_ERROR_CODE: int = 101;
/// Initializes global state, including frobbing
/// the crate's logging flags, registering GC
/// metadata, and storing the process arguments.
// FIXME: this should be unsafe
#[allow(experimental)]
pub fn init(argc: int, argv: *const *const u8) {
// FIXME: Derefing these pointers is not safe.
// Need to propagate the unsafety to `start`.
unsafe {
args::init(argc, argv);
thread::init();
@ -122,8 +121,6 @@ fn lang_start(main: *const u8, argc: int, argv: *const *const u8) -> int {
pub fn start(argc: int, argv: *const *const u8, main: Thunk) -> int {
use prelude::*;
use rt;
use rt::task::Task;
use str;
let something_around_the_top_of_the_stack = 1;
let addr = &something_around_the_top_of_the_stack as *const int;
@ -153,18 +150,19 @@ pub fn start(argc: int, argv: *const *const u8, main: Thunk) -> int {
init(argc, argv);
let mut exit_code = None;
let thread: std::Thread = NewThread::new(Some("<main>".into_string()));
let thread: Thread = NewThread::new(Some("<main>".into_string()));
thread_info::set((my_stack_bottom, my_stack_top),
unsafe { sys::thread::guard::main() },
thread);
unwind::try(|| {
unsafe {
let mut main_opt = Some(main); // option dance
unsafe {
let _ = unwind::try(|| {
sys_common::stack::record_os_managed_stack_bounds(my_stack_bottom, my_stack_top);
}
(main.take().unwrap()).invoke(());
exit_code = Some(os::get_exit_status());
});
unsafe { cleanup(); }
(main_opt.take().unwrap()).invoke();
exit_code = Some(os::get_exit_status());
});
cleanup();
}
// If the exit code wasn't set, then the task block must have panicked.
return exit_code.unwrap_or(rt::DEFAULT_ERROR_CODE);
}
@ -197,14 +195,6 @@ pub fn at_exit(f: proc():Send) {
/// undefined behavior.
pub unsafe fn cleanup() {
args::cleanup();
thread::cleanup();
}
// FIXME: these probably shouldn't be public...
#[doc(hidden)]
pub mod shouldnt_be_public {
#[cfg(not(test))]
pub use super::local_ptr::native::maybe_tls_key;
#[cfg(all(not(windows), not(target_os = "android"), not(target_os = "ios")))]
pub use super::local_ptr::compiled::RT_TLS_PTR;
sys::stack_overflow::cleanup();
at_exit_imp::cleanup();
}

View file

@ -565,7 +565,7 @@ fn begin_unwind_inner(msg: Box<Any + Send>, file_line: &(&'static str, uint)) ->
// Now that we've run all the necessary unwind callbacks, we actually
// perform the unwinding.
if thread_info::unwinding() {
if thread_info::panicking() {
// If a thread panics while it's already unwinding then we
// have limited options. Currently our preference is to
// just abort. In the future we may consider resuming

View file

@ -196,8 +196,7 @@ memory and partly incapable of presentation to others.",
}
pub unsafe fn report_overflow() {
use rt::task::Task;
use rt::local::Local;
use thread::Thread;
// See the message below for why this is not emitted to the
// ^ Where did the message below go?
@ -206,11 +205,6 @@ pub unsafe fn report_overflow() {
// call would happen to initialized it (calling out to libuv),
// and the FFI call needs 2MB of stack when we just ran out.
let task: Option<*mut Task> = Local::try_unsafe_borrow();
let name = task.and_then(|task| {
(*task).name.as_ref().map(|n| n.as_slice())
});
rterrln!("\ntask '{}' has overflowed its stack", name.unwrap_or("<unknown>"));
rterrln!("\nthread '{}' has overflowed its stack",
Thread::current().name().unwrap_or("<unknown>"));
}

View file

@ -29,8 +29,8 @@ use core::mem::replace;
use self::FutureState::*;
use comm::{Receiver, channel};
use task::spawn;
use thunk::{Thunk};
use thread::Thread;
/// A type encapsulating the result of a computation which may not be complete
pub struct Future<A> {
@ -139,7 +139,7 @@ impl<A:Send> Future<A> {
let (tx, rx) = channel();
spawn(move |:| {
Thread::spawn(move |:| {
// Don't panic if the other end has hung up
let _ = tx.send_opt(blk());
});

View file

@ -274,7 +274,7 @@ impl Drop for StaticMutexGuard {
mod test {
use prelude::*;
use task;
use thread::Thread;
use sync::{Arc, Mutex, StaticMutex, MUTEX_INIT, Condvar};
#[test]
@ -386,10 +386,10 @@ mod test {
fn test_mutex_arc_poison() {
let arc = Arc::new(Mutex::new(1i));
let arc2 = arc.clone();
let _ = task::try(move|| {
let _ = Thread::with_join(move|| {
let lock = arc2.lock();
assert_eq!(*lock, 2);
});
}).join();
let lock = arc.lock();
assert_eq!(*lock, 1);
}
@ -414,7 +414,7 @@ mod test {
fn test_mutex_arc_access_in_unwind() {
let arc = Arc::new(Mutex::new(1i));
let arc2 = arc.clone();
let _ = task::try(move|| -> () {
let _ = Thread::with_join::<()>(move|| -> () {
struct Unwinder {
i: Arc<Mutex<int>>,
}
@ -425,7 +425,7 @@ mod test {
}
let _u = Unwinder { i: arc2 };
panic!();
});
}).join();
let lock = arc.lock();
assert_eq!(*lock, 2);
}

View file

@ -121,7 +121,7 @@ impl Once {
mod test {
use prelude::*;
use task;
use thread::Thread;
use super::{ONCE_INIT, Once};
#[test]
@ -143,7 +143,7 @@ mod test {
for _ in range(0u, 10) {
let tx = tx.clone();
spawn(move|| {
for _ in range(0u, 4) { task::deschedule() }
for _ in range(0u, 4) { Thread::yield_now() }
unsafe {
O.doit(|| {
assert!(!run);

View file

@ -8,21 +8,19 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::Option::None;
use rustrt::task::Task;
use rustrt::local::Local;
use thread::Thread;
pub struct Flag { pub failed: bool }
impl Flag {
pub fn borrow(&mut self) -> Guard {
Guard { flag: &mut self.failed, failing: failing() }
Guard { flag: &mut self.failed, panicking: Thread::panicking() }
}
}
pub struct Guard<'a> {
flag: &'a mut bool,
failing: bool,
panicking: bool,
}
impl<'a> Guard<'a> {
@ -33,16 +31,8 @@ impl<'a> Guard<'a> {
}
pub fn done(&mut self) {
if !self.failing && failing() {
if !self.panicking && Thread::panicking() {
*self.flag = true;
}
}
}
fn failing() -> bool {
if Local::exists(None::<Task>) {
Local::borrow(None::<Task>).unwinder.unwinding()
} else {
false
}
}

View file

@ -356,7 +356,7 @@ mod tests {
use prelude::*;
use rand::{mod, Rng};
use task;
use thread::Thread;
use sync::{Arc, RWLock, StaticRWLock, RWLOCK_INIT};
#[test]
@ -409,10 +409,10 @@ mod tests {
fn test_rw_arc_poison_wr() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(move|| {
let _ = Thread::with_join(move|| {
let lock = arc2.write();
assert_eq!(*lock, 2);
});
}).join();
let lock = arc.read();
assert_eq!(*lock, 1);
}
@ -422,10 +422,10 @@ mod tests {
fn test_rw_arc_poison_ww() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(move|| {
let _ = Thread::with_join(move|| {
let lock = arc2.write();
assert_eq!(*lock, 2);
});
}).join();
let lock = arc.write();
assert_eq!(*lock, 1);
}
@ -434,10 +434,10 @@ mod tests {
fn test_rw_arc_no_poison_rr() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(move|| {
let _ = Thread::with_join(move|| {
let lock = arc2.read();
assert_eq!(*lock, 2);
});
}).join();
let lock = arc.read();
assert_eq!(*lock, 1);
}
@ -445,10 +445,10 @@ mod tests {
fn test_rw_arc_no_poison_rw() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(move|| {
let _ = Thread::with_join(move|| {
let lock = arc2.read();
assert_eq!(*lock, 2);
});
}).join();
let lock = arc.write();
assert_eq!(*lock, 1);
}
@ -459,12 +459,12 @@ mod tests {
let arc2 = arc.clone();
let (tx, rx) = channel();
task::spawn(move|| {
Thread::spawn(move|| {
let mut lock = arc2.write();
for _ in range(0u, 10) {
let tmp = *lock;
*lock = -1;
task::deschedule();
Thread::yield_now();
*lock = tmp + 1;
}
tx.send(());
@ -474,15 +474,15 @@ mod tests {
let mut children = Vec::new();
for _ in range(0u, 5) {
let arc3 = arc.clone();
children.push(task::try_future(move|| {
children.push(Thread::with_join(move|| {
let lock = arc3.read();
assert!(*lock >= 0);
}));
}
// Wait for children to pass their asserts
for r in children.iter_mut() {
assert!(r.get_ref().is_ok());
for r in children.into_iter() {
assert!(r.join().is_ok());
}
// Wait for writer to finish
@ -495,7 +495,11 @@ mod tests {
fn test_rw_arc_access_in_unwind() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
<<<<<<< HEAD
let _ = task::try(move|| -> () {
=======
let _ = Thread::with_join::<()>(proc() {
>>>>>>> Fallout from new thread API
struct Unwinder {
i: Arc<RWLock<int>>,
}
@ -507,7 +511,7 @@ mod tests {
}
let _u = Unwinder { i: arc2 };
panic!();
});
}).join();
let lock = arc.read();
assert_eq!(*lock, 2);
}

View file

@ -12,7 +12,7 @@
use core::prelude::*;
use task::{spawn};
use thread::Thread;
use comm::{channel, Sender, Receiver};
use sync::{Arc, Mutex};
use thunk::Thunk;
@ -105,7 +105,7 @@ impl TaskPool {
}
fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) {
spawn(move |:| {
Thread::spawn(move |:| {
// Will spawn a new task on panic unless it is cancelled.
let sentinel = Sentinel::new(&jobs);
@ -126,7 +126,7 @@ fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) {
}
sentinel.cancel();
})
});
}
#[cfg(test)]
@ -206,4 +206,3 @@ mod test {
waiter.wait();
}
}

View file

@ -9,7 +9,7 @@
// except according to those terms.
use io::{IoResult, Writer};
use iter::Iterator;
use iter::{Iterator, IteratorExt};
use option::{Some, None};
use result::{Ok, Err};
use str::{StrPrelude, from_str};

View file

@ -28,7 +28,7 @@ use sync::{StaticMutex, StaticCondvar};
use rt;
use sys::helper_signal;
use task;
use thread::Thread;
/// A structure for management of a helper thread.
///
@ -82,7 +82,11 @@ impl<M: Send> Helper<M> {
*self.signal.get() = send as uint;
let t = f();
<<<<<<< HEAD
task::spawn(move |:| {
=======
Thread::spawn(proc() {
>>>>>>> Fallout from new thread API
helper(receive, rx, t);
let _g = self.lock.lock();
*self.shutdown.get() = true;

View file

@ -27,6 +27,7 @@ pub mod net;
pub mod rwlock;
pub mod stack;
pub mod thread;
pub mod thread_info;
pub mod thread_local;
// common error constructors

View file

@ -33,11 +33,11 @@ impl ThreadInfo {
*c.borrow_mut() = Some(ThreadInfo {
stack_bounds: (0, 0),
stack_guard: 0,
unwinder: false,
thread: Thread::new(None),
unwinding: false,
thread: NewThread::new(None),
})
}
f(c.borrow_mut().as_ref().unwrap())
f(c.borrow_mut().as_mut().unwrap())
})
}
}
@ -47,28 +47,25 @@ pub fn current_thread() -> Thread {
}
pub fn panicking() -> bool {
ThreadInfo::with(|info| info.unwinder.unwinding())
ThreadInfo::with(|info| info.unwinding)
}
pub fn stack_guard() -> uint {
ThreadInfo::with(|info| info.stack_guard)
}
pub fn unwinding() -> bool {
ThreadInfo::with(|info| info.unwinder.unwinding)
}
pub fn set_unwinding(unwinding: bool) {
ThreadInfo::with(|info| info.unwinding = unwinding)
}
pub fn set(stack_bounds: (uint, uint), stack_guard: uint, thread: Thread) {
THREAD_INFO.with(|c| assert!(c.borrow().is_none()));
let mut thread_opt = Some(thread); // option dance
THREAD_INFO.with(|c| *c.borrow_mut() = Some(ThreadInfo{
stack_bounds: stack_bounds,
stack_guard: stack_guard,
unwinding: false,
thread: thread,
thread: thread_opt.take().unwrap(),
}));
}

View file

@ -58,7 +58,6 @@
use prelude::*;
use rt;
use sync::atomic::{mod, AtomicUint};
use sync::{Mutex, Once, ONCE_INIT};

View file

@ -141,7 +141,7 @@ pub fn write(w: &mut Writer) -> IoResult<()> {
struct Context<'a> {
idx: int,
writer: &'a mut Writer+'a,
writer: &'a mut (Writer+'a),
last_error: Option<IoError>,
}

View file

@ -45,8 +45,6 @@ mod imp {
use self::signal::{siginfo, sigaction, SIGBUS, SIG_DFL,
SA_SIGINFO, SA_ONSTACK, sigaltstack,
SIGSTKSZ};
use rt::local::Local;
use rt::task::Task;
use libc;
use libc::funcs::posix88::mman::{mmap, munmap};
use libc::consts::os::posix88::{SIGSEGV,
@ -56,20 +54,12 @@ mod imp {
MAP_ANON,
MAP_FAILED};
use sys_common::thread_info;
// This is initialized in init() and only read from after
static mut PAGE_SIZE: uint = 0;
// get_task_info is called from an exception / signal handler.
// It returns the guard page of the current task or 0 if that
// guard page doesn't exist. None is returned if there's currently
// no local task.
unsafe fn get_task_guard_page() -> Option<uint> {
let task: Option<*mut Task> = Local::try_unsafe_borrow();
task.map(|task| (&*task).stack_guard().unwrap_or(0))
}
#[no_stack_check]
unsafe extern fn signal_handler(signum: libc::c_int,
info: *mut siginfo,
@ -89,20 +79,16 @@ mod imp {
// We're calling into functions with stack checks
stack::record_sp_limit(0);
match get_task_guard_page() {
Some(guard) => {
let addr = (*info).si_addr as uint;
let guard = thread_info::stack_guard();
let addr = (*info).si_addr as uint;
if guard == 0 || addr < guard - PAGE_SIZE || addr >= guard {
term(signum);
}
report_overflow();
intrinsics::abort()
}
None => term(signum)
if guard == 0 || addr < guard - PAGE_SIZE || addr >= guard {
term(signum);
}
report_overflow();
intrinsics::abort()
}
static mut MAIN_ALTSTACK: *mut libc::c_void = 0 as *mut libc::c_void;

View file

@ -8,15 +8,13 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use rt::local::Local;
use rt::task::Task;
use rt::util::report_overflow;
use core::prelude::*;
use ptr;
use mem;
use libc;
use libc::types::os::arch::extra::{LPVOID, DWORD, LONG, BOOL};
use sys_common::stack;
use sys_common::{stack, thread_info};
pub struct Handler {
_data: *mut libc::c_void
@ -37,8 +35,7 @@ impl Drop for Handler {
// guard page doesn't exist. None is returned if there's currently
// no local task.
unsafe fn get_task_guard_page() -> Option<uint> {
let task: Option<*mut Task> = Local::try_unsafe_borrow();
task.map(|task| (&*task).stack_guard().unwrap_or(0))
thread_info::stack_guard()
}
// This is initialized in init() and only read from after

View file

@ -12,8 +12,11 @@
#![deprecated = "use std::thread instead"]
use any::Any;
use boxed::Box;
use thread;
use kinds::Send;
use result::Result;
/// Deprecate: use `std::thread::Cfg` instead.
#[deprecated = "use std::thread::Cfg instead"]
@ -24,3 +27,15 @@ pub type TaskBuilder = thread::Cfg;
pub fn spawn(f: proc(): Send) {
thread::Thread::spawn(f);
}
/// Deprecated: use `std::thread::Thread::with_join instead`.
#[deprecated = "use std::thread::Thread::with_join instead"]
pub fn try<T: Send>(f: proc(): Send -> T) -> Result<T, Box<Any + Send>> {
thread::Thread::with_join(f).join()
}
/// Deprecated: use `std::thread::Thread::yield_now instead`.
#[deprecated = "use std::thread::Thread::yield_now instead"]
pub fn deschedule() {
thread::Thread::yield_now()
}

View file

@ -231,7 +231,7 @@ impl Cfg {
}
thread_info::set(
(my_stack_bottom, my_stack_top),
thread::current_guard_page(),
unsafe { imp::guard::current() },
their_thread
);
@ -261,7 +261,7 @@ impl Cfg {
}
}
};
(unsafe { imp::create(stack, box main) }, my_thread)
(unsafe { imp::create(stack_size, box main) }, my_thread)
}
/// Spawn a detached thread, and return a handle to it.
@ -278,19 +278,20 @@ impl Cfg {
// We need the address of the packet to fill in to be stable so when
// `main` fills it in it's still valid, so allocate an extra box to do
// so.
let my_packet = box Err(box 0); // sentinel value
let any: Box<Any+Send> = box 0u8; // sentinel value
let my_packet = box Err(any);
let their_packet: *mut Result<T> = unsafe {
*mem::transmute::<&Box<Result<T>>, *const *mut Result<T>>(&my_packet)
};
let (native, thread) = self.core_spawn(f, proc(result) {
*their_packet = result;
unsafe { *their_packet = result; }
});
JoinGuard {
native: native,
joined: false,
packet: my_packet,
packet: Some(my_packet),
thread: thread,
}
}
@ -336,7 +337,7 @@ impl Thread {
/// Gets a handle to the thread that invokes it.
pub fn current() -> Thread {
ThreadInfo::current_thread()
thread_info::current_thread()
}
/// Cooperatively give up a timeslice to the OS scheduler.
@ -346,7 +347,7 @@ impl Thread {
/// Determines whether the current thread is panicking.
pub fn panicking() -> bool {
ThreadInfo::panicking()
thread_info::panicking()
}
// http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp
@ -355,9 +356,9 @@ impl Thread {
/// See the module doc for more detail.
pub fn park() {
let thread = Thread::current();
let guard = thread.inner.lock.lock();
let mut guard = thread.inner.lock.lock();
while !*guard {
thread.inner.cvar.wait(guard);
thread.inner.cvar.wait(&guard);
}
*guard = false;
}
@ -366,7 +367,7 @@ impl Thread {
///
/// See the module doc for more detail.
pub fn unpark(&self) {
let guard = self.inner.lock();
let mut guard = self.inner.lock.lock();
if !*guard {
*guard = true;
self.inner.cvar.notify_one();
@ -375,7 +376,7 @@ impl Thread {
/// Get the thread's name.
pub fn name(&self) -> Option<&str> {
self.inner.name.as_ref()
self.inner.name.as_ref().map(|s| s.as_slice())
}
}
@ -387,7 +388,7 @@ impl thread_info::NewThread for Thread {
/// Indicates the manner in which a thread exited.
///
/// A thread that completes without panicking is considered to exit successfully.
pub type Result<T> = result::Result<T, Box<Any + Send>>;
pub type Result<T> = ::result::Result<T, Box<Any + Send>>;
#[must_use]
/// An RAII guard that will block until thread termination when dropped.
@ -395,7 +396,7 @@ pub struct JoinGuard<T> {
native: imp::rust_thread,
thread: Thread,
joined: bool,
packet: Box<Result<T>>,
packet: Option<Box<Result<T>>>,
}
impl<T: Send> JoinGuard<T> {

View file

@ -446,7 +446,7 @@ mod tests {
use prelude::*;
use cell::UnsafeCell;
use rt::thread::Thread;
use thread::Thread;
struct Foo(Sender<()>);
@ -534,7 +534,7 @@ mod tests {
}
}
Thread::start(move|| {
Thread::with_join(move|| {
drop(S1);
}).join();
}
@ -552,7 +552,7 @@ mod tests {
}
}
Thread::start(move|| unsafe {
Thread::with_join(move|| unsafe {
K1.with(|s| *s.get() = Some(S1));
}).join();
}

View file

@ -69,7 +69,7 @@ use std::num::{Float, FloatMath, Int};
use std::os;
use std::str::FromStr;
use std::string::String;
use std::task::TaskBuilder;
use std::thread::{mod, Thread};
use std::time::Duration;
use std::thunk::{Thunk, Invoke};
@ -1121,28 +1121,27 @@ pub fn run_test(opts: &TestOpts,
monitor_ch: Sender<MonitorMsg>,
nocapture: bool,
testfn: Thunk) {
spawn(move || {
Thread::spawn(move || {
let (tx, rx) = channel();
let mut reader = ChanReader::new(rx);
let stdout = ChanWriter::new(tx.clone());
let stderr = ChanWriter::new(tx);
let mut task = TaskBuilder::new().named(match desc.name {
let mut cfg = thread::cfg().name(match desc.name {
DynTestName(ref name) => name.clone().to_string(),
StaticTestName(name) => name.to_string(),
});
if nocapture {
drop((stdout, stderr));
} else {
task = task.stdout(box stdout as Box<Writer + Send>);
task = task.stderr(box stderr as Box<Writer + Send>);
cfg = cfg.stdout(box stdout as Box<Writer + Send>);
cfg = cfg.stderr(box stderr as Box<Writer + Send>);
}
let result_future = task.try_future(move || testfn.invoke(()));
let result_guard = cfg.with_join(testfn);
let stdout = reader.read_to_end().unwrap().into_iter().collect();
let task_result = result_future.into_inner();
let test_result = calc_result(&desc, task_result);
let test_result = calc_result(&desc, result_guard.join());
monitor_ch.send((desc.clone(), test_result, stdout));
})
});
}
match testfn {

View file

@ -24,7 +24,7 @@
// It's unclear how likely such a bug is to recur, but it seems like a
// scenario worth testing.
use std::task;
use std::thread::Thread;
enum Conzabble {
Bickwick(Foo)
@ -45,5 +45,5 @@ pub fn fails() {
}
pub fn main() {
task::try(fails);
Thread::with_join(fails).join();
}

View file

@ -9,16 +9,16 @@
// except according to those terms.
use std::io::{ChanReader, ChanWriter};
use std::task::TaskBuilder;
use std::thread;
fn main() {
let (tx, rx) = channel();
let mut reader = ChanReader::new(rx);
let stderr = ChanWriter::new(tx);
let res = TaskBuilder::new().stderr(box stderr as Box<Writer + Send>).try(move|| -> () {
let res = thread::cfg().stderr(box stderr as Box<Writer + Send>).with_join(move|| -> () {
panic!("Hello, world!")
});
}).join();
assert!(res.is_err());
let output = reader.read_to_string().unwrap();