From c3fa411459cdcc70c5893e44209320762cdc26d1 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 30 Jul 2013 19:02:21 -0700 Subject: [PATCH] std: Implement SingleThreaded spawn mode for newsched --- src/libstd/rt/sched.rs | 12 ++++-- src/libstd/task/mod.rs | 23 ++++++++--- src/libstd/task/spawn.rs | 88 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 107 insertions(+), 16 deletions(-) diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 4abe69a7d13..945a784aaf9 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -39,14 +39,14 @@ use cell::Cell; pub struct Scheduler { /// A queue of available work. Under a work-stealing policy there /// is one per Scheduler. - priv work_queue: WorkQueue<~Task>, + work_queue: WorkQueue<~Task>, /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. priv message_queue: MessageQueue, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. - priv sleeper_list: SleeperList, + sleeper_list: SleeperList, /// Indicates that we have previously pushed a handle onto the /// SleeperList but have not yet received the Wake message. /// Being `true` does not necessarily mean that the scheduler is @@ -158,6 +158,9 @@ impl Scheduler { // scheduler. Grab it out of TLS - performing the scheduler // action will have given it away. let sched = Local::take::(); + + rtdebug!("starting scheduler %u", sched.sched_id()); + sched.run(); // Now that we are done with the scheduler, clean up the @@ -166,6 +169,9 @@ impl Scheduler { // task.run() on the scheduler task we never get through all // the cleanup code it runs. let mut stask = Local::take::(); + + rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id()); + stask.destroyed = true; } @@ -484,7 +490,7 @@ impl Scheduler { return None; } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here - this.enqueue_task(task); + this.send_to_friend(task); return Some(this); } else { // task isn't home, so don't run it here, send it home diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 7a864ecb867..19acedb56dd 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -971,16 +971,29 @@ fn test_try_fail() { } } +#[cfg(test)] +fn get_sched_id() -> int { + if context() == OldTaskContext { + unsafe { + rt::rust_get_sched_id() as int + } + } else { + do Local::borrow::<::rt::sched::Scheduler, int> |sched| { + sched.sched_id() as int + } + } +} + #[test] fn test_spawn_sched() { let (po, ch) = stream::<()>(); let ch = SharedChan::new(ch); fn f(i: int, ch: SharedChan<()>) { - let parent_sched_id = unsafe { rt::rust_get_sched_id() }; + let parent_sched_id = get_sched_id(); do spawn_sched(SingleThreaded) { - let child_sched_id = unsafe { rt::rust_get_sched_id() }; + let child_sched_id = get_sched_id(); assert!(parent_sched_id != child_sched_id); if (i == 0) { @@ -1000,15 +1013,15 @@ fn test_spawn_sched_childs_on_default_sched() { let (po, ch) = stream(); // Assuming tests run on the default scheduler - let default_id = unsafe { rt::rust_get_sched_id() }; + let default_id = get_sched_id(); let ch = Cell::new(ch); do spawn_sched(SingleThreaded) { - let parent_sched_id = unsafe { rt::rust_get_sched_id() }; + let parent_sched_id = get_sched_id(); let ch = Cell::new(ch.take()); do spawn { let ch = ch.take(); - let child_sched_id = unsafe { rt::rust_get_sched_id() }; + let child_sched_id = get_sched_id(); assert!(parent_sched_id != child_sched_id); assert_eq!(child_sched_id, default_id); ch.send(()); diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 88f214ef4c0..e6f115958fd 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -78,13 +78,13 @@ use cast::transmute; use cast; use cell::Cell; use container::MutableMap; -use comm::{Chan, GenericChan}; +use comm::{Chan, GenericChan, oneshot}; use hashmap::{HashSet, HashSetConsumeIterator}; use local_data; use task::local_data_priv::{local_get, local_set, OldHandle}; use task::rt::rust_task; use task::rt; -use task::{Failure}; +use task::{Failure, SingleThreaded}; use task::{Success, TaskOpts, TaskResult}; use task::unkillable; use to_bytes::IterBytes; @@ -93,9 +93,11 @@ use util; use unstable::sync::Exclusive; use rt::{OldTaskContext, TaskContext, SchedulerContext, GlobalContext, context}; use rt::local::Local; -use rt::task::Task; +use rt::task::{Task, Sched}; use rt::kill::KillHandle; use rt::sched::Scheduler; +use rt::uv::uvio::UvEventLoop; +use rt::thread::Thread; #[cfg(test)] use task::default_task_opts; #[cfg(test)] use comm; @@ -694,11 +696,81 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { } }; - let mut task = if opts.watched { - Task::build_child(child_wrapper) - } else { - // An unwatched task is a new root in the exit-code propagation tree - Task::build_root(child_wrapper) + let mut task = unsafe { + if opts.sched.mode != SingleThreaded { + if opts.watched { + Task::build_child(child_wrapper) + } else { + Task::build_root(child_wrapper) + } + } else { + // Creating a 1:1 task:thread ... + let sched = Local::unsafe_borrow::(); + let sched_handle = (*sched).make_handle(); + + // Create a new scheduler to hold the new task + let new_loop = ~UvEventLoop::new(); + let mut new_sched = ~Scheduler::new_special(new_loop, + (*sched).work_queue.clone(), + (*sched).sleeper_list.clone(), + false, + Some(sched_handle)); + let mut new_sched_handle = new_sched.make_handle(); + + // Allow the scheduler to exit when the pinned task exits + new_sched_handle.send(Shutdown); + + // Pin the new task to the new scheduler + let new_task = if opts.watched { + Task::build_homed_child(child_wrapper, Sched(new_sched_handle)) + } else { + Task::build_homed_root(child_wrapper, Sched(new_sched_handle)) + }; + + // Create a task that will later be used to join with the new scheduler + // thread when it is ready to terminate + let (thread_port, thread_chan) = oneshot(); + let thread_port_cell = Cell::new(thread_port); + let join_task = do Task::build_child() { + rtdebug!("running join task"); + let thread_port = thread_port_cell.take(); + let thread: Thread = thread_port.recv(); + thread.join(); + }; + + // Put the scheduler into another thread + let new_sched_cell = Cell::new(new_sched); + let orig_sched_handle_cell = Cell::new((*sched).make_handle()); + let join_task_cell = Cell::new(join_task); + + let thread = do Thread::start { + let mut new_sched = new_sched_cell.take(); + let mut orig_sched_handle = orig_sched_handle_cell.take(); + let join_task = join_task_cell.take(); + + let bootstrap_task = ~do Task::new_root(&mut new_sched.stack_pool) || { + rtdebug!("boostraping a 1:1 scheduler"); + }; + new_sched.bootstrap(bootstrap_task); + + rtdebug!("enqueing join_task"); + // Now tell the original scheduler to join with this thread + // by scheduling a thread-joining task on the original scheduler + orig_sched_handle.send(TaskFromFriend(join_task)); + + // NB: We can't simply send a message from here to another task + // because this code isn't running in a task and message passing doesn't + // work outside of tasks. Hence we're sending a scheduler message + // to execute a new task directly to a scheduler. + }; + + // Give the thread handle to the join task + thread_chan.send(thread); + + // When this task is enqueued on the current scheduler it will then get + // forwarded to the scheduler to which it is pinned + new_task + } }; if opts.notify_chan.is_some() {