auto merge of #8221 : brson/rust/single-threaded, r=graydon

This is the last major runtime feature needed for the transition to the new scheduler.
This commit is contained in:
bors 2013-08-02 10:52:50 -07:00
commit f1c1f92d0c
3 changed files with 107 additions and 16 deletions

View file

@ -39,14 +39,14 @@ use cell::Cell;
pub struct Scheduler {
/// A queue of available work. Under a work-stealing policy there
/// is one per Scheduler.
priv work_queue: WorkQueue<~Task>,
work_queue: WorkQueue<~Task>,
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
priv message_queue: MessageQueue<SchedMessage>,
/// A shared list of sleeping schedulers. We'll use this to wake
/// up schedulers when pushing work onto the work queue.
priv sleeper_list: SleeperList,
sleeper_list: SleeperList,
/// Indicates that we have previously pushed a handle onto the
/// SleeperList but have not yet received the Wake message.
/// Being `true` does not necessarily mean that the scheduler is
@ -158,6 +158,9 @@ impl Scheduler {
// scheduler. Grab it out of TLS - performing the scheduler
// action will have given it away.
let sched = Local::take::<Scheduler>();
rtdebug!("starting scheduler %u", sched.sched_id());
sched.run();
// Now that we are done with the scheduler, clean up the
@ -166,6 +169,9 @@ impl Scheduler {
// task.run() on the scheduler task we never get through all
// the cleanup code it runs.
let mut stask = Local::take::<Task>();
rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id());
stask.destroyed = true;
}
@ -484,7 +490,7 @@ impl Scheduler {
return None;
} else if !homed && !this.run_anything {
// the task isn't homed, but it can't be run here
this.enqueue_task(task);
this.send_to_friend(task);
return Some(this);
} else {
// task isn't home, so don't run it here, send it home

View file

@ -971,16 +971,29 @@ fn test_try_fail() {
}
}
#[cfg(test)]
fn get_sched_id() -> int {
if context() == OldTaskContext {
unsafe {
rt::rust_get_sched_id() as int
}
} else {
do Local::borrow::<::rt::sched::Scheduler, int> |sched| {
sched.sched_id() as int
}
}
}
#[test]
fn test_spawn_sched() {
let (po, ch) = stream::<()>();
let ch = SharedChan::new(ch);
fn f(i: int, ch: SharedChan<()>) {
let parent_sched_id = unsafe { rt::rust_get_sched_id() };
let parent_sched_id = get_sched_id();
do spawn_sched(SingleThreaded) {
let child_sched_id = unsafe { rt::rust_get_sched_id() };
let child_sched_id = get_sched_id();
assert!(parent_sched_id != child_sched_id);
if (i == 0) {
@ -1000,15 +1013,15 @@ fn test_spawn_sched_childs_on_default_sched() {
let (po, ch) = stream();
// Assuming tests run on the default scheduler
let default_id = unsafe { rt::rust_get_sched_id() };
let default_id = get_sched_id();
let ch = Cell::new(ch);
do spawn_sched(SingleThreaded) {
let parent_sched_id = unsafe { rt::rust_get_sched_id() };
let parent_sched_id = get_sched_id();
let ch = Cell::new(ch.take());
do spawn {
let ch = ch.take();
let child_sched_id = unsafe { rt::rust_get_sched_id() };
let child_sched_id = get_sched_id();
assert!(parent_sched_id != child_sched_id);
assert_eq!(child_sched_id, default_id);
ch.send(());

View file

@ -78,13 +78,13 @@ use cast::transmute;
use cast;
use cell::Cell;
use container::MutableMap;
use comm::{Chan, GenericChan};
use comm::{Chan, GenericChan, oneshot};
use hashmap::{HashSet, HashSetConsumeIterator};
use local_data;
use task::local_data_priv::{local_get, local_set, OldHandle};
use task::rt::rust_task;
use task::rt;
use task::{Failure};
use task::{Failure, SingleThreaded};
use task::{Success, TaskOpts, TaskResult};
use task::unkillable;
use to_bytes::IterBytes;
@ -93,9 +93,11 @@ use util;
use unstable::sync::Exclusive;
use rt::{OldTaskContext, TaskContext, SchedulerContext, GlobalContext, context};
use rt::local::Local;
use rt::task::Task;
use rt::task::{Task, Sched};
use rt::kill::KillHandle;
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use rt::thread::Thread;
#[cfg(test)] use task::default_task_opts;
#[cfg(test)] use comm;
@ -694,11 +696,81 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
}
};
let mut task = if opts.watched {
Task::build_child(child_wrapper)
} else {
// An unwatched task is a new root in the exit-code propagation tree
Task::build_root(child_wrapper)
let mut task = unsafe {
if opts.sched.mode != SingleThreaded {
if opts.watched {
Task::build_child(child_wrapper)
} else {
Task::build_root(child_wrapper)
}
} else {
// Creating a 1:1 task:thread ...
let sched = Local::unsafe_borrow::<Scheduler>();
let sched_handle = (*sched).make_handle();
// Create a new scheduler to hold the new task
let new_loop = ~UvEventLoop::new();
let mut new_sched = ~Scheduler::new_special(new_loop,
(*sched).work_queue.clone(),
(*sched).sleeper_list.clone(),
false,
Some(sched_handle));
let mut new_sched_handle = new_sched.make_handle();
// Allow the scheduler to exit when the pinned task exits
new_sched_handle.send(Shutdown);
// Pin the new task to the new scheduler
let new_task = if opts.watched {
Task::build_homed_child(child_wrapper, Sched(new_sched_handle))
} else {
Task::build_homed_root(child_wrapper, Sched(new_sched_handle))
};
// Create a task that will later be used to join with the new scheduler
// thread when it is ready to terminate
let (thread_port, thread_chan) = oneshot();
let thread_port_cell = Cell::new(thread_port);
let join_task = do Task::build_child() {
rtdebug!("running join task");
let thread_port = thread_port_cell.take();
let thread: Thread = thread_port.recv();
thread.join();
};
// Put the scheduler into another thread
let new_sched_cell = Cell::new(new_sched);
let orig_sched_handle_cell = Cell::new((*sched).make_handle());
let join_task_cell = Cell::new(join_task);
let thread = do Thread::start {
let mut new_sched = new_sched_cell.take();
let mut orig_sched_handle = orig_sched_handle_cell.take();
let join_task = join_task_cell.take();
let bootstrap_task = ~do Task::new_root(&mut new_sched.stack_pool) || {
rtdebug!("boostraping a 1:1 scheduler");
};
new_sched.bootstrap(bootstrap_task);
rtdebug!("enqueing join_task");
// Now tell the original scheduler to join with this thread
// by scheduling a thread-joining task on the original scheduler
orig_sched_handle.send(TaskFromFriend(join_task));
// NB: We can't simply send a message from here to another task
// because this code isn't running in a task and message passing doesn't
// work outside of tasks. Hence we're sending a scheduler message
// to execute a new task directly to a scheduler.
};
// Give the thread handle to the join task
thread_chan.send(thread);
// When this task is enqueued on the current scheduler it will then get
// forwarded to the scheduler to which it is pinned
new_task
}
};
if opts.notify_chan.is_some() {