auto merge of #5303 : brson/rust/newsched4, r=brson

r?

Followup to #5022. This is the same, but everything is in `core::rt` now. `std::uv_ll` is moved to `core::unstable::uvll`, with the intent that it eventually move into its own crate (blocked on #5192 at least). I've had to disable the uv tests because of #2064. All of `core::rt` is disabled on platforms that aren't mac or linux until I complete the windows thread local storage bindings and ARM context switching.

My immediate next priorities will be to fix #2064 and clean up the uv bindings, get everything building on all platforms.
This commit is contained in:
bors 2013-03-11 20:21:45 -07:00
commit 48cb9a8ac0
26 changed files with 2784 additions and 248 deletions

View file

@ -243,7 +243,8 @@ pub mod unicode;
#[path = "num/cmath.rs"]
pub mod cmath;
pub mod stackwalk;
#[path = "rt/mod.rs"]
pub mod rt;
// A curious inner-module that's not exported that contains the binding
// 'core' so that macro-expanded references to core::error and such

View file

@ -130,6 +130,27 @@ pub pure fn get_ref<T>(opt: &r/Option<T>) -> &r/T {
}
}
pub pure fn get_mut_ref<T>(opt: &r/mut Option<T>) -> &r/mut T {
/*!
Gets a mutable reference to the value inside an option.
# Failure
Fails if the value equals `None`
# Safety note
In general, because this function may fail, its use is discouraged
(calling `get` on `None` is akin to dereferencing a null pointer).
Instead, prefer to use pattern matching and handle the `None`
case explicitly.
*/
match *opt {
Some(ref mut x) => x,
None => fail!(~"option::get_mut_ref none")
}
}
#[inline(always)]
pub pure fn map<T, U>(opt: &r/Option<T>, f: &fn(x: &r/T) -> U) -> Option<U> {
//! Maps a `some` value by reference from one type to another
@ -364,6 +385,23 @@ pub impl<T> Option<T> {
#[inline(always)]
pure fn get_ref(&self) -> &self/T { get_ref(self) }
/**
Gets a mutable reference to the value inside an option.
# Failure
Fails if the value equals `None`
# Safety note
In general, because this function may fail, its use is discouraged
(calling `get` on `None` is akin to dereferencing a null pointer).
Instead, prefer to use pattern matching and handle the `None`
case explicitly.
*/
#[inline(always)]
pure fn get_mut_ref(&mut self) -> &self/mut T { get_mut_ref(self) }
/**
* Gets the value out of an option without copying.
*

156
src/libcore/rt/context.rs Normal file
View file

@ -0,0 +1,156 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use super::stack::StackSegment;
use libc::c_void;
use cast::{transmute, transmute_mut_unsafe,
transmute_region, transmute_mut_region};
// XXX: Registers is boxed so that it is 16-byte aligned, for storing
// SSE regs. It would be marginally better not to do this. In C++ we
// use an attribute on a struct.
pub struct Context(~Registers);
pub impl Context {
static fn empty() -> Context {
Context(new_regs())
}
/// Create a new context that will resume execution by running ~fn()
/// # Safety Note
/// The `start` closure must remain valid for the life of the Task
static fn new(start: &~fn(), stack: &mut StackSegment) -> Context {
// The C-ABI function that is the task entry point
extern fn task_start_wrapper(f: &~fn()) { (*f)() }
let fp: *c_void = task_start_wrapper as *c_void;
let argp: *c_void = unsafe { transmute::<&~fn(), *c_void>(&*start) };
let sp: *uint = stack.end();
let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) };
// Save and then immediately load the current context,
// which we will then modify to call the given function when restored
let mut regs = new_regs();
unsafe {
swap_registers(transmute_mut_region(&mut *regs),
transmute_region(&*regs))
};
initialize_call_frame(&mut *regs, fp, argp, sp);
return Context(regs);
}
static fn swap(out_context: &mut Context, in_context: &Context) {
let out_regs: &mut Registers = match out_context {
&Context(~ref mut r) => r
};
let in_regs: &Registers = match in_context {
&Context(~ref r) => r
};
unsafe { swap_registers(out_regs, in_regs) };
}
}
extern {
fn swap_registers(out_regs: *mut Registers, in_regs: *Registers);
}
// Definitions of these registers are in rt/arch/x86_64/regs.h
#[cfg(target_arch = "x86_64")]
type Registers = [uint * 22];
#[cfg(target_arch = "x86_64")]
fn new_regs() -> ~Registers { ~[0, .. 22] }
#[cfg(target_arch = "x86_64")]
fn initialize_call_frame(regs: &mut Registers,
fptr: *c_void, arg: *c_void, sp: *mut uint) {
// Redefinitions from regs.h
const RUSTRT_ARG0: uint = 3;
const RUSTRT_RSP: uint = 1;
const RUSTRT_IP: uint = 8;
const RUSTRT_RBP: uint = 2;
let sp = align_down(sp);
let sp = mut_offset(sp, -1);
// The final return address. 0 indicates the bottom of the stack
unsafe { *sp = 0; }
rtdebug!("creating call frame");
rtdebug!("fptr %x", fptr as uint);
rtdebug!("arg %x", arg as uint);
rtdebug!("sp %x", sp as uint);
regs[RUSTRT_ARG0] = arg as uint;
regs[RUSTRT_RSP] = sp as uint;
regs[RUSTRT_IP] = fptr as uint;
// Last base pointer on the stack should be 0
regs[RUSTRT_RBP] = 0;
}
#[cfg(target_arch = "x86")]
struct Registers {
eax: u32, ebx: u32, ecx: u32, edx: u32,
ebp: u32, esi: u32, edi: u32, esp: u32,
cs: u16, ds: u16, ss: u16, es: u16, fs: u16, gs: u16,
eflags: u32, eip: u32
}
#[cfg(target_arch = "x86")]
fn new_regs() -> ~Registers {
~Registers {
eax: 0, ebx: 0, ecx: 0, edx: 0,
ebp: 0, esi: 0, edi: 0, esp: 0,
cs: 0, ds: 0, ss: 0, es: 0, fs: 0, gs: 0,
eflags: 0, eip: 0
}
}
#[cfg(target_arch = "x86")]
fn initialize_call_frame(regs: &mut Registers,
fptr: *c_void, arg: *c_void, sp: *mut uint) {
let sp = align_down(sp);
let sp = mut_offset(sp, -4); // XXX: -4 words? Needs this be done at all?
unsafe { *sp = arg as uint; }
let sp = mut_offset(sp, -1);
unsafe { *sp = 0; } // The final return address
regs.esp = sp as u32;
regs.eip = fptr as u32;
// Last base pointer on the stack is 0
regs.ebp = 0;
}
fn align_down(sp: *mut uint) -> *mut uint {
unsafe {
let sp = transmute::<*mut uint, uint>(sp);
let sp = sp & !(16 - 1);
transmute::<uint, *mut uint>(sp)
}
}
// XXX: ptr::offset is positive ints only
#[inline(always)]
pub pure fn mut_offset<T>(ptr: *mut T, count: int) -> *mut T {
use core::sys::size_of;
unsafe {
(ptr as int + count * (size_of::<T>() as int)) as *mut T
}
}

45
src/libcore/rt/io.rs Normal file
View file

@ -0,0 +1,45 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
use result::*;
// XXX: ~object doesn't work currently so these are some placeholder
// types to use instead
pub type EventLoopObject = super::uvio::UvEventLoop;
pub type IoFactoryObject = super::uvio::UvIoFactory;
pub type StreamObject = super::uvio::UvStream;
pub type TcpListenerObject = super::uvio::UvTcpListener;
pub trait EventLoop {
fn run(&mut self);
fn callback(&mut self, ~fn());
/// The asynchronous I/O services. Not all event loops may provide one
fn io(&mut self) -> Option<&self/mut IoFactoryObject>;
}
pub trait IoFactory {
fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>;
fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>;
}
pub trait TcpListener {
fn listen(&mut self) -> Option<~StreamObject>;
}
pub trait Stream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()>;
fn write(&mut self, buf: &[u8]) -> Result<(), ()>;
}
pub enum IpAddr {
Ipv4(u8, u8, u8, u8, u16),
Ipv6
}

51
src/libcore/rt/mod.rs Normal file
View file

@ -0,0 +1,51 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// XXX: Missing some implementation for other architectures
#[cfg(target_os = "linux")];
#[cfg(target_os = "mac")];
#[cfg(target_os = "win32")];
// Some basic logging
macro_rules! rtdebug (
($( $arg:expr),+) => ( {
dumb_println(fmt!( $($arg),+ ));
fn dumb_println(s: &str) {
use str::as_c_str;
use libc::c_char;
extern {
fn printf(s: *c_char);
}
do as_c_str(s.to_str() + "\n") |s| {
unsafe { printf(s); }
}
}
} )
)
// An alternate version with no output, for turning off logging
macro_rules! rtdebug_ (
($( $arg:expr),+) => ( $(let _ = $arg)*; )
)
mod sched;
mod io;
mod uvio;
mod uv;
// FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
pub mod thread_local_storage;
mod work_queue;
mod stack;
mod context;
mod thread;

564
src/libcore/rt/sched.rs Normal file
View file

@ -0,0 +1,564 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
use sys;
use cast::transmute;
use libc::c_void;
use ptr::mut_null;
use super::work_queue::WorkQueue;
use super::stack::{StackPool, StackSegment};
use super::io::{EventLoop, EventLoopObject};
use super::context::Context;
use tls = super::thread_local_storage;
#[cfg(test)] use super::uvio::UvEventLoop;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use int;
/// The Scheduler is responsible for coordinating execution of Tasks
/// on a single thread. When the scheduler is running it is owned by
/// thread local storage and the running task is owned by the
/// scheduler.
pub struct Scheduler {
task_queue: WorkQueue<~Task>,
stack_pool: StackPool,
/// The event loop used to drive the scheduler and perform I/O
event_loop: ~EventLoopObject,
/// The scheduler's saved context.
/// Always valid when a task is executing, otherwise not
priv saved_context: Context,
/// The currently executing task
priv current_task: Option<~Task>,
/// A queue of jobs to perform immediately upon return from task
/// context to scheduler context.
/// XXX: This probably should be a single cleanup action and it
/// should run after a context switch, not on return from the
/// scheduler
priv cleanup_jobs: ~[CleanupJob]
}
// XXX: Some hacks to put a &fn in Scheduler without borrowck
// complaining
type UnsafeTaskReceiver = sys::Closure;
trait HackAroundBorrowCk {
static fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
}
impl HackAroundBorrowCk for UnsafeTaskReceiver {
static fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
unsafe { transmute(f) }
}
fn to_fn(self) -> &fn(&mut Scheduler, ~Task) {
unsafe { transmute(self) }
}
}
enum CleanupJob {
RescheduleTask(~Task),
RecycleTask(~Task),
GiveTask(~Task, UnsafeTaskReceiver)
}
pub impl Scheduler {
static fn new(event_loop: ~EventLoopObject) -> Scheduler {
Scheduler {
event_loop: event_loop,
task_queue: WorkQueue::new(),
stack_pool: StackPool::new(),
saved_context: Context::empty(),
current_task: None,
cleanup_jobs: ~[]
}
}
// XXX: This may eventually need to be refactored so that
// the scheduler itself doesn't have to call event_loop.run.
// That will be important for embedding the runtime into external
// event loops.
fn run(~self) -> ~Scheduler {
fail_unless!(!self.in_task_context());
// Give ownership of the scheduler (self) to the thread
do self.install |scheduler| {
fn run_scheduler_once() {
do Scheduler::local |scheduler| {
if scheduler.resume_task_from_queue() {
// Ok, a task ran. Nice! We'll do it again later
scheduler.event_loop.callback(run_scheduler_once);
}
}
}
scheduler.event_loop.callback(run_scheduler_once);
scheduler.event_loop.run();
}
}
fn install(~self, f: &fn(&mut Scheduler)) -> ~Scheduler {
let mut tlsched = ThreadLocalScheduler::new();
tlsched.put_scheduler(self);
{
let sched = tlsched.get_scheduler();
f(sched);
}
return tlsched.take_scheduler();
}
static fn local(f: &fn(&mut Scheduler)) {
let mut tlsched = ThreadLocalScheduler::new();
f(tlsched.get_scheduler());
}
// * Scheduler-context operations
fn resume_task_from_queue(&mut self) -> bool {
fail_unless!(!self.in_task_context());
let mut self = self;
match self.task_queue.pop_front() {
Some(task) => {
self.resume_task_immediately(task);
return true;
}
None => {
rtdebug!("no tasks in queue");
return false;
}
}
}
fn resume_task_immediately(&mut self, task: ~Task) {
fail_unless!(!self.in_task_context());
rtdebug!("scheduling a task");
// Store the task in the scheduler so it can be grabbed later
self.current_task = Some(task);
self.swap_in_task();
// The running task should have passed ownership elsewhere
fail_unless!(self.current_task.is_none());
// Running tasks may have asked us to do some cleanup
self.run_cleanup_jobs();
}
// * Task-context operations
/// Called by a running task to end execution, after which it will
/// be recycled by the scheduler for reuse in a new task.
fn terminate_current_task(&mut self) {
fail_unless!(self.in_task_context());
rtdebug!("ending running task");
let dead_task = self.current_task.swap_unwrap();
self.enqueue_cleanup_job(RecycleTask(dead_task));
let dead_task = self.task_from_last_cleanup_job();
self.swap_out_task(dead_task);
}
/// Block a running task, context switch to the scheduler, then pass the
/// blocked task to a closure.
///
/// # Safety note
///
/// The closure here is a *stack* closure that lives in the
/// running task. It gets transmuted to the scheduler's lifetime
/// and called while the task is blocked.
fn block_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) {
fail_unless!(self.in_task_context());
rtdebug!("blocking task");
let blocked_task = self.current_task.swap_unwrap();
let f_fake_region = unsafe {
transmute::<&fn(&mut Scheduler, ~Task),
&fn(&mut Scheduler, ~Task)>(f)
};
let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region);
self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
let blocked_task = self.task_from_last_cleanup_job();
self.swap_out_task(blocked_task);
}
/// Switch directly to another task, without going through the scheduler.
/// You would want to think hard about doing this, e.g. if there are
/// pending I/O events it would be a bad idea.
fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) {
fail_unless!(self.in_task_context());
rtdebug!("switching tasks");
let old_running_task = self.current_task.swap_unwrap();
self.enqueue_cleanup_job(RescheduleTask(old_running_task));
let old_running_task = self.task_from_last_cleanup_job();
self.current_task = Some(next_task);
self.swap_in_task_from_running_task(old_running_task);
}
// * Context switching
// NB: When switching to a task callers are expected to first set
// self.running_task. When switching away from a task likewise move
// out of the self.running_task
priv fn swap_in_task(&mut self) {
// Take pointers to both the task and scheduler's saved registers.
let running_task: &~Task = self.current_task.get_ref();
let task_context = &running_task.saved_context;
let scheduler_context = &mut self.saved_context;
// Context switch to the task, restoring it's registers
// and saving the scheduler's
Context::swap(scheduler_context, task_context);
}
priv fn swap_out_task(&mut self, running_task: &mut Task) {
let task_context = &mut running_task.saved_context;
let scheduler_context = &self.saved_context;
Context::swap(task_context, scheduler_context);
}
priv fn swap_in_task_from_running_task(&mut self,
running_task: &mut Task) {
let running_task_context = &mut running_task.saved_context;
let next_context = &self.current_task.get_ref().saved_context;
Context::swap(running_task_context, next_context);
}
// * Other stuff
fn in_task_context(&self) -> bool { self.current_task.is_some() }
fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
self.cleanup_jobs.unshift(job);
}
fn run_cleanup_jobs(&mut self) {
fail_unless!(!self.in_task_context());
rtdebug!("running cleanup jobs");
while !self.cleanup_jobs.is_empty() {
match self.cleanup_jobs.pop() {
RescheduleTask(task) => {
// NB: Pushing to the *front* of the queue
self.task_queue.push_front(task);
}
RecycleTask(task) => task.recycle(&mut self.stack_pool),
GiveTask(task, f) => (f.to_fn())(self, task)
}
}
}
// XXX: Hack. This should return &self/mut but I don't know how to
// make the borrowcheck happy
fn task_from_last_cleanup_job(&mut self) -> &mut Task {
fail_unless!(!self.cleanup_jobs.is_empty());
let last_job: &self/mut CleanupJob = &mut self.cleanup_jobs[0];
let last_task: &self/Task = match last_job {
&RescheduleTask(~ref task) => task,
&RecycleTask(~ref task) => task,
&GiveTask(~ref task, _) => task,
};
// XXX: Pattern matching mutable pointers above doesn't work
// because borrowck thinks the three patterns are conflicting
// borrows
return unsafe { transmute::<&Task, &mut Task>(last_task) };
}
}
const TASK_MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
pub struct Task {
/// The task entry point, saved here for later destruction
priv start: ~~fn(),
/// The segment of stack on which the task is currently running or,
/// if the task is blocked, on which the task will resume execution
priv current_stack_segment: StackSegment,
/// These are always valid when the task is not running, unless
/// the task is dead
priv saved_context: Context,
}
impl Task {
static fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task {
// XXX: Putting main into a ~ so it's a thin pointer and can
// be passed to the spawn function. Another unfortunate
// allocation
let start = ~Task::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE);
// NB: Context holds a pointer to that ~fn
let initial_context = Context::new(&*start, &mut stack);
return Task {
start: start,
current_stack_segment: stack,
saved_context: initial_context,
};
}
static priv fn build_start_wrapper(start: ~fn()) -> ~fn() {
// XXX: The old code didn't have this extra allocation
let wrapper: ~fn() = || {
start();
let mut sched = ThreadLocalScheduler::new();
let sched = sched.get_scheduler();
sched.terminate_current_task();
};
return wrapper;
}
/// Destroy the task and try to reuse its components
fn recycle(~self, stack_pool: &mut StackPool) {
match self {
~Task {current_stack_segment, _} => {
stack_pool.give_segment(current_stack_segment);
}
}
}
}
// NB: This is a type so we can use make use of the &self region.
struct ThreadLocalScheduler(tls::Key);
impl ThreadLocalScheduler {
static fn new() -> ThreadLocalScheduler {
unsafe {
// NB: This assumes that the TLS key has been created prior.
// Currently done in rust_start.
let key: *mut c_void = rust_get_sched_tls_key();
let key: &mut tls::Key = transmute(key);
ThreadLocalScheduler(*key)
}
}
fn put_scheduler(&mut self, scheduler: ~Scheduler) {
unsafe {
let key = match self { &ThreadLocalScheduler(key) => key };
let value: *mut c_void =
transmute::<~Scheduler, *mut c_void>(scheduler);
tls::set(key, value);
}
}
fn get_scheduler(&mut self) -> &self/mut Scheduler {
unsafe {
let key = match self { &ThreadLocalScheduler(key) => key };
let mut value: *mut c_void = tls::get(key);
fail_unless!(value.is_not_null());
{
let value_ptr = &mut value;
let sched: &mut ~Scheduler =
transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr);
let sched: &mut Scheduler = &mut **sched;
return sched;
}
}
}
fn take_scheduler(&mut self) -> ~Scheduler {
unsafe {
let key = match self { &ThreadLocalScheduler(key) => key };
let value: *mut c_void = tls::get(key);
fail_unless!(value.is_not_null());
let sched = transmute(value);
tls::set(key, mut_null());
return sched;
}
}
}
extern {
fn rust_get_sched_tls_key() -> *mut c_void;
}
#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
let mut tls_scheduler = ThreadLocalScheduler::new();
tls_scheduler.put_scheduler(scheduler);
{
let _scheduler = tls_scheduler.get_scheduler();
}
let _scheduler = tls_scheduler.take_scheduler();
}
#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
let mut tls_scheduler = ThreadLocalScheduler::new();
tls_scheduler.put_scheduler(scheduler);
{
let _scheduler = tls_scheduler.get_scheduler();
}
{
let scheduler = tls_scheduler.take_scheduler();
tls_scheduler.put_scheduler(scheduler);
}
let mut tls_scheduler = ThreadLocalScheduler::new();
{
let _scheduler = tls_scheduler.get_scheduler();
}
let _scheduler = tls_scheduler.take_scheduler();
}
#[test]
fn test_simple_scheduling() {
do run_in_bare_thread {
let mut task_ran = false;
let task_ran_ptr: *mut bool = &mut task_ran;
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe { *task_ran_ptr = true; }
};
sched.task_queue.push_back(task);
sched.run();
fail_unless!(task_ran);
}
}
#[test]
fn test_several_tasks() {
do run_in_bare_thread {
let total = 10;
let mut task_count = 0;
let task_count_ptr: *mut int = &mut task_count;
let mut sched = ~UvEventLoop::new_scheduler();
for int::range(0, total) |_| {
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe { *task_count_ptr = *task_count_ptr + 1; }
};
sched.task_queue.push_back(task);
}
sched.run();
fail_unless!(task_count == total);
}
}
#[test]
fn test_swap_tasks() {
do run_in_bare_thread {
let mut count = 0;
let count_ptr: *mut int = &mut count;
let mut sched = ~UvEventLoop::new_scheduler();
let task1 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
do Scheduler::local |sched| {
let task2 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
};
// Context switch directly to the new task
sched.resume_task_from_running_task_direct(task2);
}
unsafe { *count_ptr = *count_ptr + 1; }
};
sched.task_queue.push_back(task1);
sched.run();
fail_unless!(count == 3);
}
}
#[bench] #[test] #[ignore(reason = "long test")]
fn test_run_a_lot_of_tasks_queued() {
do run_in_bare_thread {
const MAX: int = 1000000;
let mut count = 0;
let count_ptr: *mut int = &mut count;
let mut sched = ~UvEventLoop::new_scheduler();
let start_task = ~do Task::new(&mut sched.stack_pool) {
run_task(count_ptr);
};
sched.task_queue.push_back(start_task);
sched.run();
fail_unless!(count == MAX);
fn run_task(count_ptr: *mut int) {
do Scheduler::local |sched| {
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
if *count_ptr != MAX {
run_task(count_ptr);
}
}
};
sched.task_queue.push_back(task);
}
};
}
}
#[bench] #[test] #[ignore(reason = "too much stack allocation")]
fn test_run_a_lot_of_tasks_direct() {
do run_in_bare_thread {
const MAX: int = 100000;
let mut count = 0;
let count_ptr: *mut int = &mut count;
let mut sched = ~UvEventLoop::new_scheduler();
let start_task = ~do Task::new(&mut sched.stack_pool) {
run_task(count_ptr);
};
sched.task_queue.push_back(start_task);
sched.run();
fail_unless!(count == MAX);
fn run_task(count_ptr: *mut int) {
do Scheduler::local |sched| {
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
if *count_ptr != MAX {
run_task(count_ptr);
}
}
};
// Context switch directly to the new task
sched.resume_task_from_running_task_direct(task);
}
};
}
}
#[test]
fn test_block_task() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::local |sched| {
fail_unless!(sched.in_task_context());
do sched.block_running_task_and_then() |sched, task| {
fail_unless!(!sched.in_task_context());
sched.task_queue.push_back(task);
}
}
};
sched.task_queue.push_back(task);
sched.run();
}
}

49
src/libcore/rt/stack.rs Normal file
View file

@ -0,0 +1,49 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use vec;
pub struct StackSegment {
buf: ~[u8]
}
pub impl StackSegment {
static fn new(size: uint) -> StackSegment {
// Crate a block of uninitialized values
let mut stack = vec::with_capacity(size);
unsafe {
vec::raw::set_len(&mut stack, size);
}
StackSegment {
buf: stack
}
}
fn end(&self) -> *uint {
unsafe {
vec::raw::to_ptr(self.buf).offset(self.buf.len()) as *uint
}
}
}
pub struct StackPool(());
impl StackPool {
static fn new() -> StackPool { StackPool(()) }
fn take_segment(&self, min_size: uint) -> StackSegment {
StackSegment::new(min_size)
}
fn give_segment(&self, _stack: StackSegment) {
}
}

44
src/libcore/rt/thread.rs Normal file
View file

@ -0,0 +1,44 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc;
use ops::Drop;
#[allow(non_camel_case_types)] // runtime type
type raw_thread = libc::c_void;
struct Thread {
main: ~fn(),
raw_thread: *raw_thread
}
impl Thread {
static fn start(main: ~fn()) -> Thread {
fn substart(main: &fn()) -> *raw_thread {
unsafe { rust_raw_thread_start(main) }
}
let raw = substart(main);
Thread {
main: main,
raw_thread: raw
}
}
}
impl Drop for Thread {
fn finalize(&self) {
unsafe { rust_raw_thread_join_delete(self.raw_thread) }
}
}
extern {
pub unsafe fn rust_raw_thread_start(f: &fn()) -> *raw_thread;
pub unsafe fn rust_raw_thread_join_delete(thread: *raw_thread);
}

View file

@ -0,0 +1,91 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::{c_void};
#[cfg(unix)]
use libc::{c_uint, c_int};
#[cfg(unix)]
use ptr::null;
#[cfg(windows)]
use libc::types::os::arch::extra::{DWORD, LPVOID, BOOL};
#[cfg(unix)]
pub type Key = pthread_key_t;
#[cfg(unix)]
pub unsafe fn create(key: &mut Key) {
unsafe { fail_unless!(0 == pthread_key_create(key, null())); }
}
#[cfg(unix)]
pub unsafe fn set(key: Key, value: *mut c_void) {
unsafe { fail_unless!(0 == pthread_setspecific(key, value)); }
}
#[cfg(unix)]
pub unsafe fn get(key: Key) -> *mut c_void {
unsafe { pthread_getspecific(key) }
}
#[cfg(unix)]
#[allow(non_camel_case_types)] // foreign type
type pthread_key_t = c_uint;
#[cfg(unix)]
extern {
fn pthread_key_create(key: *mut pthread_key_t, dtor: *u8) -> c_int;
fn pthread_setspecific(key: pthread_key_t, value: *mut c_void) -> c_int;
fn pthread_getspecific(key: pthread_key_t) -> *mut c_void;
}
#[cfg(windows)]
pub type Key = DWORD;
#[cfg(windows)]
pub unsafe fn create(key: &mut Key) {
const TLS_OUT_OF_INDEXES: DWORD = 0xFFFFFFFF;
*key = unsafe { TlsAlloc() };
fail_unless!(*key != TLS_OUT_OF_INDEXES);
}
#[cfg(windows)]
pub unsafe fn set(key: Key, value: *mut c_void) {
unsafe { fail_unless!(0 != TlsSetValue(key, value)) }
}
#[cfg(windows)]
pub unsafe fn get(key: Key) -> *mut c_void {
TlsGetValue(key)
}
#[cfg(windows)]
#[abi = "stdcall"]
extern {
fn TlsAlloc() -> DWORD;
fn TlsSetValue(dwTlsIndex: DWORD, lpTlsvalue: LPVOID) -> BOOL;
fn TlsGetValue(dwTlsIndex: DWORD) -> LPVOID;
}
#[test]
fn tls_smoke_test() {
use cast::transmute;
unsafe {
let mut key = 0;
let value = ~20;
create(&mut key);
set(key, transmute(value));
let value: ~int = transmute(get(key));
fail_unless!(value == ~20);
let value = ~30;
set(key, transmute(value));
let value: ~int = transmute(get(key));
fail_unless!(value == ~30);
}
}

919
src/libcore/rt/uv.rs Normal file
View file

@ -0,0 +1,919 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
Bindings to libuv.
UV types consist of the event loop (Loop), Watchers, Requests and
Callbacks.
Watchers and Requests encapsulate pointers to uv *handles*, which have
subtyping relationships with each other. This subtyping is reflected
in the bindings with explicit or implicit coercions. For example, an
upcast from TcpWatcher to StreamWatcher is done with
`tcp_watcher.as_stream()`. In other cases a callback on a specific
type of watcher will be passed a watcher of a supertype.
Currently all use of Request types (connect/write requests) are
encapsulated in the bindings and don't need to be dealt with by the
caller.
# Safety note
Due to the complex lifecycle of uv handles, as well as compiler bugs,
this module is not memory safe and requires explicit memory management,
via `close` and `delete` methods.
*/
use option::*;
use str::raw::from_c_str;
use to_str::ToStr;
use vec;
use ptr;
use libc::{c_void, c_int, size_t, malloc, free, ssize_t};
use cast::{transmute, transmute_mut_region};
use ptr::null;
use sys::size_of;
use unstable::uvll;
use super::io::{IpAddr, Ipv4, Ipv6};
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::thread::Thread;
#[cfg(test)] use cell::Cell;
fn ip4_to_uv_ip4(addr: IpAddr) -> uvll::sockaddr_in {
match addr {
Ipv4(a, b, c, d, p) => {
unsafe {
uvll::ip4_addr(fmt!("%u.%u.%u.%u",
a as uint,
b as uint,
c as uint,
d as uint), p as int)
}
}
Ipv6 => fail!()
}
}
/// A trait for callbacks to implement. Provides a little extra type safety
/// for generic, unsafe interop functions like `set_watcher_callback`.
trait Callback { }
type NullCallback = ~fn();
impl Callback for NullCallback { }
/// A type that wraps a native handle
trait NativeHandle<T> {
static pub fn from_native_handle(T) -> Self;
pub fn native_handle(&self) -> T;
}
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
/// but the results are not correct.
pub struct Loop {
handle: *uvll::uv_loop_t
}
pub impl Loop {
static fn new() -> Loop {
let handle = unsafe { uvll::loop_new() };
fail_unless!(handle.is_not_null());
NativeHandle::from_native_handle(handle)
}
fn run(&mut self) {
unsafe { uvll::run(self.native_handle()) };
}
fn close(&mut self) {
unsafe { uvll::loop_delete(self.native_handle()) };
}
}
impl NativeHandle<*uvll::uv_loop_t> for Loop {
static fn from_native_handle(handle: *uvll::uv_loop_t) -> Loop {
Loop { handle: handle }
}
fn native_handle(&self) -> *uvll::uv_loop_t {
self.handle
}
}
/// The trait implemented by uv 'watchers' (handles). Watchers are
/// non-owning wrappers around the uv handles and are not completely
/// safe - there may be multiple instances for a single underlying
/// handle. Watchers are generally created, then `start`ed, `stop`ed
/// and `close`ed, but due to their complex life cycle may not be
/// entirely memory safe if used in unanticipated patterns.
trait Watcher {
fn event_loop(&self) -> Loop;
}
pub struct IdleWatcher(*uvll::uv_idle_t);
impl Watcher for IdleWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
impl Callback for IdleCallback { }
pub impl IdleWatcher {
static fn new(loop_: &mut Loop) -> IdleWatcher {
unsafe {
let handle = uvll::idle_new();
fail_unless!(handle.is_not_null());
fail_unless!(0 == uvll::idle_init(loop_.native_handle(), handle));
uvll::set_data_for_uv_handle(handle, null::<()>());
NativeHandle::from_native_handle(handle)
}
}
fn start(&mut self, cb: IdleCallback) {
set_watcher_callback(self, cb);
unsafe {
fail_unless!(0 == uvll::idle_start(self.native_handle(), idle_cb))
};
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
let idle_watcher: IdleWatcher =
NativeHandle::from_native_handle(handle);
let cb: &IdleCallback =
borrow_callback_from_watcher(&idle_watcher);
let status = status_to_maybe_uv_error(handle, status);
(*cb)(idle_watcher, status);
}
}
fn stop(&mut self) {
unsafe { fail_unless!(0 == uvll::idle_stop(self.native_handle())); }
}
fn close(self) {
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
let mut idle_watcher = NativeHandle::from_native_handle(handle);
drop_watcher_callback::<uvll::uv_idle_t,
IdleWatcher,
IdleCallback>(&mut idle_watcher);
unsafe { uvll::idle_delete(handle) };
}
}
}
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
static fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
IdleWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_idle_t {
match self { &IdleWatcher(ptr) => ptr }
}
}
// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
impl Watcher for StreamWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
impl Callback for ReadCallback { }
// XXX: The uv alloc callback also has a *uv_handle_t arg
pub type AllocCallback = ~fn(uint) -> Buf;
impl Callback for AllocCallback { }
pub impl StreamWatcher {
fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
// XXX: Borrowchk problems
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
extern fn alloc_cb(stream: *uvll::uv_stream_t,
suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher =
NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let alloc_cb = data.alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
extern fn read_cb(stream: *uvll::uv_stream_t,
nread: ssize_t, ++buf: Buf) {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher =
NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let cb = data.read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
}
}
fn read_stop(&mut self) {
// It would be nice to drop the alloc and read callbacks here,
// but read_stop may be called from inside one of them and we
// would end up freeing the in-use environment
let handle = self.native_handle();
unsafe { uvll::read_stop(handle); }
}
// XXX: Needs to take &[u8], not ~[u8]
fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
fail_unless!(data.write_cb.is_none());
data.write_cb = Some(cb);
let req = WriteRequest::new();
let buf = vec_to_uv_buf(msg);
// XXX: Allocation
let bufs = ~[buf];
unsafe {
fail_unless!(0 == uvll::write(req.native_handle(),
self.native_handle(),
&bufs, write_cb));
}
// XXX: Freeing immediately after write. Is this ok?
let _v = vec_from_uv_buf(buf);
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let write_request: WriteRequest =
NativeHandle::from_native_handle(req);
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = get_watcher_data(&mut stream_watcher)
.write_cb.swap_unwrap();
let status = status_to_maybe_uv_error(
stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
}
fn accept(&mut self, stream: StreamWatcher) {
let self_handle = self.native_handle() as *c_void;
let stream_handle = stream.native_handle() as *c_void;
unsafe {
fail_unless!(0 == uvll::accept(self_handle, stream_handle));
}
}
fn close(self, cb: NullCallback) {
{
let mut self = self;
let data = get_watcher_data(&mut self);
fail_unless!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher =
NativeHandle::from_native_handle(handle);
{
let mut data = get_watcher_data(&mut stream_watcher);
data.close_cb.swap_unwrap()();
}
drop_watcher_data(&mut stream_watcher);
unsafe { free(handle as *c_void) }
}
}
}
impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
static fn from_native_handle(
handle: *uvll::uv_stream_t) -> StreamWatcher {
StreamWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_stream_t {
match self { &StreamWatcher(ptr) => ptr }
}
}
pub struct TcpWatcher(*uvll::uv_tcp_t);
impl Watcher for TcpWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
impl Callback for ConnectionCallback { }
pub impl TcpWatcher {
static fn new(loop_: &mut Loop) -> TcpWatcher {
unsafe {
let size = size_of::<uvll::uv_tcp_t>() as size_t;
let handle = malloc(size) as *uvll::uv_tcp_t;
fail_unless!(handle.is_not_null());
fail_unless!(0 == uvll::tcp_init(loop_.native_handle(), handle));
let mut watcher = NativeHandle::from_native_handle(handle);
install_watcher_data(&mut watcher);
return watcher;
}
}
fn bind(&mut self, address: IpAddr) {
match address {
Ipv4(*) => {
let addr = ip4_to_uv_ip4(address);
let result = unsafe {
uvll::tcp_bind(self.native_handle(), &addr)
};
// XXX: bind is likely to fail. need real error handling
fail_unless!(result == 0);
}
_ => fail!()
}
}
fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) {
unsafe {
fail_unless!(get_watcher_data(self).connect_cb.is_none());
get_watcher_data(self).connect_cb = Some(cb);
let mut connect_watcher = ConnectRequest::new();
let connect_handle = connect_watcher.native_handle();
match address {
Ipv4(*) => {
let addr = ip4_to_uv_ip4(address);
rtdebug!("connect_t: %x", connect_handle as uint);
fail_unless!(0 == uvll::tcp_connect(connect_handle,
self.native_handle(),
&addr, connect_cb));
}
_ => fail!()
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
rtdebug!("connect_t: %x", req as uint);
let connect_request: ConnectRequest =
NativeHandle::from_native_handle(req);
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb: ConnectionCallback = {
let data = get_watcher_data(&mut stream_watcher);
data.connect_cb.swap_unwrap()
};
let status = status_to_maybe_uv_error(
stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
}
}
fn listen(&mut self, cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
fail_unless!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
unsafe {
const BACKLOG: c_int = 128; // XXX should be configurable
// XXX: This can probably fail
fail_unless!(0 == uvll::listen(self.native_handle(),
BACKLOG, connection_cb));
}
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher =
NativeHandle::from_native_handle(handle);
let cb = get_watcher_data(&mut stream_watcher)
.connect_cb.swap_unwrap();
let status = status_to_maybe_uv_error(
stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
}
fn as_stream(&self) -> StreamWatcher {
NativeHandle::from_native_handle(
self.native_handle() as *uvll::uv_stream_t)
}
}
impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
static fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
TcpWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_tcp_t {
match self { &TcpWatcher(ptr) => ptr }
}
}
trait Request { }
type ConnectCallback = ~fn(ConnectRequest, Option<UvError>);
impl Callback for ConnectCallback { }
// uv_connect_t is a subclass of uv_req_t
struct ConnectRequest(*uvll::uv_connect_t);
impl Request for ConnectRequest { }
impl ConnectRequest {
static fn new() -> ConnectRequest {
let connect_handle = unsafe {
malloc(size_of::<uvll::uv_connect_t>() as size_t)
};
fail_unless!(connect_handle.is_not_null());
let connect_handle = connect_handle as *uvll::uv_connect_t;
ConnectRequest(connect_handle)
}
fn stream(&self) -> StreamWatcher {
unsafe {
let stream_handle =
uvll::get_stream_handle_from_connect_req(
self.native_handle());
NativeHandle::from_native_handle(stream_handle)
}
}
fn delete(self) {
unsafe { free(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
static fn from_native_handle(
handle: *uvll:: uv_connect_t) -> ConnectRequest {
ConnectRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_connect_t {
match self { &ConnectRequest(ptr) => ptr }
}
}
pub struct WriteRequest(*uvll::uv_write_t);
impl Request for WriteRequest { }
impl WriteRequest {
static fn new() -> WriteRequest {
let write_handle = unsafe {
malloc(size_of::<uvll::uv_write_t>() as size_t)
};
fail_unless!(write_handle.is_not_null());
let write_handle = write_handle as *uvll::uv_write_t;
WriteRequest(write_handle)
}
fn stream(&self) -> StreamWatcher {
unsafe {
let stream_handle =
uvll::get_stream_handle_from_write_req(self.native_handle());
NativeHandle::from_native_handle(stream_handle)
}
}
fn delete(self) {
unsafe { free(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
static fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
WriteRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_write_t {
match self { &WriteRequest(ptr) => ptr }
}
}
// XXX: Need to define the error constants like EOF so they can be
// compared to the UvError type
struct UvError(uvll::uv_err_t);
impl UvError {
pure fn name(&self) -> ~str {
unsafe {
let inner = match self { &UvError(ref a) => a };
let name_str = uvll::err_name(inner);
fail_unless!(name_str.is_not_null());
from_c_str(name_str)
}
}
pure fn desc(&self) -> ~str {
unsafe {
let inner = match self { &UvError(ref a) => a };
let desc_str = uvll::strerror(inner);
fail_unless!(desc_str.is_not_null());
from_c_str(desc_str)
}
}
}
impl ToStr for UvError {
pure fn to_str(&self) -> ~str {
fmt!("%s: %s", self.name(), self.desc())
}
}
#[test]
fn error_smoke_test() {
let err = uvll::uv_err_t { code: 1, sys_errno_: 1 };
let err: UvError = UvError(err);
fail_unless!(err.to_str() == ~"EOF: end of file");
}
/// Given a uv handle, convert a callback status to a UvError
// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError> {
if status != -1 {
None
} else {
unsafe {
rtdebug!("handle: %x", handle as uint);
let loop_ = uvll::get_loop_for_uv_handle(handle);
rtdebug!("loop: %x", loop_ as uint);
let err = uvll::last_error(loop_);
Some(UvError(err))
}
}
}
/// Get the uv event loop from a Watcher
pub fn loop_from_watcher<H, W: Watcher + NativeHandle<*H>>(
watcher: &W) -> Loop {
let handle = watcher.native_handle();
let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) };
NativeHandle::from_native_handle(loop_)
}
/// Set the custom data on a handle to a callback Note: This is only
/// suitable for watchers that make just one type of callback. For
/// others use WatcherData
fn set_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W, cb: CB) {
drop_watcher_callback::<H, W, CB>(watcher);
// XXX: Boxing the callback so it fits into a
// pointer. Unfortunate extra allocation
let boxed_cb = ~cb;
let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) };
unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) };
}
/// Delete a callback from a handle's custom data
fn drop_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
if handle_data.is_not_null() {
// Take ownership of the callback and drop it
let _cb = transmute::<*c_void, ~CB>(handle_data);
// Make sure the pointer is zeroed
uvll::set_data_for_uv_handle(
watcher.native_handle(), null::<()>());
}
}
}
/// Take a pointer to the callback installed as custom data
fn borrow_callback_from_watcher<H, W: Watcher + NativeHandle<*H>,
CB: Callback>(watcher: &W) -> &CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
fail_unless!(handle_data.is_not_null());
let cb = transmute::<&*c_void, &~CB>(&handle_data);
return &**cb;
}
}
/// Take ownership of the callback installed as custom data
fn take_callback_from_watcher<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) -> CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
fail_unless!(handle_data.is_not_null());
uvll::set_data_for_uv_handle(handle, null::<()>());
let cb: ~CB = transmute::<*c_void, ~CB>(handle_data);
let cb = match cb { ~cb => cb };
return cb;
}
}
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
struct WatcherData {
read_cb: Option<ReadCallback>,
write_cb: Option<ConnectionCallback>,
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>
}
fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), data);
}
}
fn get_watcher_data<H, W: Watcher + NativeHandle<*H>>(
watcher: &r/mut W) -> &r/mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
}
}
fn drop_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
}
}
#[test]
fn test_slice_to_uv_buf() {
let slice = [0, .. 20];
let buf = slice_to_uv_buf(slice);
fail_unless!(buf.len == 20);
unsafe {
let base = transmute::<*u8, *mut u8>(buf.base);
(*base) = 1;
(*ptr::mut_offset(base, 1)) = 2;
}
fail_unless!(slice[0] == 1);
fail_unless!(slice[1] == 2);
}
/// The uv buffer type
pub type Buf = uvll::uv_buf_t;
/// Borrow a slice to a Buf
pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
let data = unsafe { vec::raw::to_ptr(v) };
unsafe { uvll::buf_init(data, v.len()) }
}
// XXX: Do these conversions without copying
/// Transmute an owned vector to a Buf
fn vec_to_uv_buf(v: ~[u8]) -> Buf {
let data = unsafe { malloc(v.len() as size_t) } as *u8;
fail_unless!(data.is_not_null());
do vec::as_imm_buf(v) |b, l| {
let data = data as *mut u8;
unsafe { ptr::copy_memory(data, b, l) }
}
let buf = unsafe { uvll::buf_init(data, v.len()) };
return buf;
}
/// Transmute a Buf that was once a ~[u8] back to ~[u8]
fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
if !(buf.len == 0 && buf.base.is_null()) {
let v = unsafe { vec::from_buf(buf.base, buf.len as uint) };
unsafe { free(buf.base as *c_void) };
return Some(v);
} else {
// No buffer
return None;
}
}
#[test]
fn loop_smoke_test() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "valgrind - loop destroyed before watcher?")]
fn idle_new_then_close() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
idle_watcher.close();
}
}
#[test]
fn idle_smoke_test() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
let mut count = 10;
let count_ptr: *mut int = &mut count;
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
fail_unless!(status.is_none());
if unsafe { *count_ptr == 10 } {
idle_watcher.stop();
idle_watcher.close();
} else {
unsafe { *count_ptr = *count_ptr + 1; }
}
}
loop_.run();
loop_.close();
fail_unless!(count == 10);
}
}
#[test]
fn idle_start_stop_start() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
fail_unless!(status.is_none());
idle_watcher.stop();
do idle_watcher.start |idle_watcher, status| {
fail_unless!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
}
}
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn connect_close() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
// Connect to a port where nobody is listening
let addr = Ipv4(127, 0, 0, 1, 2923);
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("tcp_watcher.connect!");
fail_unless!(status.is_some());
fail_unless!(status.get().name() == ~"ECONNREFUSED");
stream_watcher.close(||());
}
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "need a server to connect to")]
fn connect_read() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = Ipv4(127, 0, 0, 1, 2924);
do tcp_watcher.connect(addr) |stream_watcher, status| {
let mut stream_watcher = stream_watcher;
rtdebug!("tcp_watcher.connect!");
fail_unless!(status.is_none());
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do stream_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
let buf = vec_from_uv_buf(buf);
rtdebug!("read cb!");
if status.is_none() {
let bytes = buf.unwrap();
rtdebug!("%s", bytes.slice(0, nread as uint).to_str());
} else {
rtdebug!("status after read: %s", status.get().to_str());
rtdebug!("closing");
stream_watcher.close(||());
}
}
}
loop_.run();
loop_.close();
}
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn listen() {
do run_in_bare_thread() {
const MAX: int = 10;
let mut loop_ = Loop::new();
let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = Ipv4(127, 0, 0, 1, 2925);
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
do server_tcp_watcher.listen |server_stream_watcher, status| {
rtdebug!("listened!");
fail_unless!(status.is_none());
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_;
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
server_stream_watcher.accept(client_tcp_watcher);
let count_cell = Cell(0);
let server_stream_watcher = server_stream_watcher;
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do client_tcp_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
rtdebug!("i'm reading!");
let buf = vec_from_uv_buf(buf);
let mut count = count_cell.take();
if status.is_none() {
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.view(0, nread as uint).each |byte| {
fail_unless!(*byte == count as u8);
rtdebug!("%u", *byte as uint);
count += 1;
}
} else {
fail_unless!(count == MAX);
do stream_watcher.close {
server_stream_watcher.close(||());
}
}
count_cell.put_back(count);
}
}
let _client_thread = do Thread::start {
rtdebug!("starting client thread");
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connecting");
fail_unless!(status.is_none());
let mut stream_watcher = stream_watcher;
let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
do stream_watcher.write(msg) |stream_watcher, status| {
rtdebug!("writing");
fail_unless!(status.is_none());
stream_watcher.close(||());
}
}
loop_.run();
loop_.close();
};
let mut loop_ = loop_;
loop_.run();
loop_.close();
}
}

475
src/libcore/rt/uvio.rs Normal file
View file

@ -0,0 +1,475 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
use result::*;
use super::uv::*;
use super::io::*;
use ops::Drop;
use cell::{Cell, empty_cell};
use cast::transmute;
use super::StreamObject;
use super::sched::Scheduler;
use super::IoFactoryObject;
#[cfg(test)] use super::sched::Task;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use uint;
pub struct UvEventLoop {
uvio: UvIoFactory
}
pub impl UvEventLoop {
static fn new() -> UvEventLoop {
UvEventLoop {
uvio: UvIoFactory(Loop::new())
}
}
/// A convenience constructor
static fn new_scheduler() -> Scheduler {
Scheduler::new(~UvEventLoop::new())
}
}
impl Drop for UvEventLoop {
fn finalize(&self) {
// XXX: Need mutable finalizer
let self = unsafe {
transmute::<&UvEventLoop, &mut UvEventLoop>(self)
};
let mut uv_loop = self.uvio.uv_loop();
uv_loop.close();
}
}
impl EventLoop for UvEventLoop {
fn run(&mut self) {
self.uvio.uv_loop().run();
}
fn callback(&mut self, f: ~fn()) {
let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
do idle_watcher.start |idle_watcher, status| {
fail_unless!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
f();
}
}
fn io(&mut self) -> Option<&self/mut IoFactoryObject> {
Some(&mut self.uvio)
}
}
#[test]
fn test_callback_run_once() {
do run_in_bare_thread {
let mut event_loop = UvEventLoop::new();
let mut count = 0;
let count_ptr: *mut int = &mut count;
do event_loop.callback {
unsafe { *count_ptr += 1 }
}
event_loop.run();
fail_unless!(count == 1);
}
}
pub struct UvIoFactory(Loop);
pub impl UvIoFactory {
fn uv_loop(&mut self) -> &self/mut Loop {
match self { &UvIoFactory(ref mut ptr) => ptr }
}
}
impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> {
// Create a cell in the task to hold the result. We will fill
// the cell before resuming the task.
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
do Scheduler::local |scheduler| {
fail_unless!(scheduler.in_task_context());
// Block this task and take ownership, switch to scheduler context
do scheduler.block_running_task_and_then |scheduler, task| {
rtdebug!("connect: entered scheduler context");
fail_unless!(!scheduler.in_task_context());
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
let task_cell = Cell(task);
// Wait for a connection
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connect: in connect callback");
let maybe_stream = if status.is_none() {
rtdebug!("status is none");
Some(~UvStream(stream_watcher))
} else {
rtdebug!("status is some");
stream_watcher.close(||());
None
};
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
// Context switch
do Scheduler::local |scheduler| {
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}
fail_unless!(!result_cell.is_empty());
return result_cell.take();
}
fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> {
let mut watcher = TcpWatcher::new(self.uv_loop());
watcher.bind(addr);
return Some(~UvTcpListener(watcher));
}
}
pub struct UvTcpListener(TcpWatcher);
impl UvTcpListener {
fn watcher(&self) -> TcpWatcher {
match self { &UvTcpListener(w) => w }
}
fn close(&self) {
// XXX: Need to wait until close finishes before returning
self.watcher().as_stream().close(||());
}
}
impl Drop for UvTcpListener {
fn finalize(&self) {
// XXX: Again, this never gets called. Use .close() instead
//self.watcher().as_stream().close(||());
}
}
impl TcpListener for UvTcpListener {
fn listen(&mut self) -> Option<~StreamObject> {
rtdebug!("entering listen");
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let server_tcp_watcher = self.watcher();
do Scheduler::local |scheduler| {
fail_unless!(scheduler.in_task_context());
do scheduler.block_running_task_and_then |_, task| {
let task_cell = Cell(task);
let mut server_tcp_watcher = server_tcp_watcher;
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ =
loop_from_watcher(&server_stream_watcher);
let mut client_tcp_watcher =
TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher =
client_tcp_watcher.as_stream();
// XXX: Need's to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Some(~UvStream::new(client_tcp_watcher))
} else {
None
};
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
rtdebug!("resuming task from listen");
// Context switch
do Scheduler::local |scheduler| {
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}
fail_unless!(!result_cell.is_empty());
return result_cell.take();
}
}
pub struct UvStream(StreamWatcher);
impl UvStream {
static fn new(watcher: StreamWatcher) -> UvStream {
UvStream(watcher)
}
fn watcher(&self) -> StreamWatcher {
match self { &UvStream(w) => w }
}
// XXX: finalize isn't working for ~UvStream???
fn close(&self) {
// XXX: Need to wait until this finishes before returning
self.watcher().close(||());
}
}
impl Drop for UvStream {
fn finalize(&self) {
rtdebug!("closing stream");
//self.watcher().close(||());
}
}
impl Stream for UvStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
do Scheduler::local |scheduler| {
fail_unless!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.block_running_task_and_then |scheduler, task| {
rtdebug!("read: entered scheduler context");
fail_unless!(!scheduler.in_task_context());
let mut watcher = watcher;
let task_cell = Cell(task);
// XXX: We shouldn't reallocate these callbacks every
// call to read
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
};
do watcher.read_start(alloc) |watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
// triggered before the user calls `read` again.
// XXX: Is there a performance impact to calling
// stop here?
let mut watcher = watcher;
watcher.read_stop();
let result = if status.is_none() {
fail_unless!(nread >= 0);
Ok(nread as uint)
} else {
Err(())
};
unsafe { (*result_cell_ptr).put_back(result); }
do Scheduler::local |scheduler| {
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}
fail_unless!(!result_cell.is_empty());
return result_cell.take();
}
fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
do Scheduler::local |scheduler| {
fail_unless!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf;
do scheduler.block_running_task_and_then |_, task| {
let mut watcher = watcher;
let task_cell = Cell(task);
let buf = unsafe { &*buf_ptr };
// XXX: OMGCOPIES
let buf = buf.to_vec();
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
} else {
Err(())
};
unsafe { (*result_cell_ptr).put_back(result); }
do Scheduler::local |scheduler| {
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}
fail_unless!(!result_cell.is_empty());
return result_cell.take();
}
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn test_simple_io_no_connect() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::local |sched| {
let io = sched.event_loop.io().unwrap();
let addr = Ipv4(127, 0, 0, 1, 2926);
let maybe_chan = io.connect(addr);
fail_unless!(maybe_chan.is_none());
}
};
sched.task_queue.push_back(task);
sched.run();
}
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn test_simple_tcp_server_and_client() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let addr = Ipv4(127, 0, 0, 1, 2929);
let client_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::local |sched| {
let io = sched.event_loop.io().unwrap();
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
};
let server_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::local |sched| {
let io = sched.event_loop.io().unwrap();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
fail_unless!(nread == 8);
for uint::range(0, nread) |i| {
rtdebug!("%u", buf[i] as uint);
fail_unless!(buf[i] == i as u8);
}
stream.close();
listener.close();
}
};
// Start the server first so it listens before the client connects
sched.task_queue.push_back(server_task);
sched.task_queue.push_back(client_task);
sched.run();
}
}
#[test] #[ignore(reason = "busted")]
fn test_read_and_block() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let addr = Ipv4(127, 0, 0, 1, 2930);
let client_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::local |sched| {
let io = sched.event_loop.io().unwrap();
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
};
let server_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::local |sched| {
let io = sched.event_loop.io().unwrap();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
let mut current = 0;
let mut reads = 0;
while current < expected {
let nread = stream.read(buf).unwrap();
for uint::range(0, nread) |i| {
let val = buf[i] as uint;
fail_unless!(val == current % 8);
current += 1;
}
reads += 1;
do Scheduler::local |scheduler| {
// Yield to the other task in hopes that it
// will trigger a read callback while we are
// not ready for it
do scheduler.block_running_task_and_then
|scheduler, task| {
scheduler.task_queue.push_back(task);
}
}
}
// Make sure we had multiple reads
fail_unless!(reads > 1);
stream.close();
listener.close();
}
};
// Start the server first so it listens before the client connects
sched.task_queue.push_back(server_task);
sched.task_queue.push_back(client_task);
sched.run();
}
}
#[test] #[ignore(reason = "needs server")]
fn test_read_read_read() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let addr = Ipv4(127, 0, 0, 1, 2931);
let client_task = ~do Task::new(&mut sched.stack_pool) {
do Scheduler::local |sched| {
let io = sched.event_loop.io().unwrap();
let mut stream = io.connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < 500000000 {
let nread = stream.read(buf).unwrap();
rtdebug!("read %u bytes", nread as uint);
total_bytes_read += nread;
}
rtdebug_!("read %u bytes total", total_bytes_read as uint);
stream.close();
}
};
sched.task_queue.push_back(client_task);
sched.run();
}
}

View file

@ -0,0 +1,47 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
pub struct WorkQueue<T> {
priv queue: ~[T]
}
pub impl<T> WorkQueue<T> {
static fn new() -> WorkQueue<T> {
WorkQueue {
queue: ~[]
}
}
fn push_back(&mut self, value: T) {
self.queue.push(value)
}
fn pop_back(&mut self) -> Option<T> {
if !self.queue.is_empty() {
Some(self.queue.pop())
} else {
None
}
}
fn push_front(&mut self, value: T) {
self.queue.unshift(value)
}
fn pop_front(&mut self) -> Option<T> {
if !self.queue.is_empty() {
Some(self.queue.shift())
} else {
None
}
}
}

View file

@ -35,6 +35,8 @@ pub mod extfmt;
#[path = "unstable/lang.rs"]
#[cfg(notest)]
pub mod lang;
#[path = "unstable/uvll.rs"]
pub mod uvll;
mod rustrt {
use unstable::{raw_thread, rust_little_lock};
@ -61,7 +63,7 @@ for it to terminate.
The executing thread has no access to a task pointer and will be using
a normal large stack.
*/
pub unsafe fn run_in_bare_thread(f: ~fn()) {
pub fn run_in_bare_thread(f: ~fn()) {
let (port, chan) = comm::stream();
// FIXME #4525: Unfortunate that this creates an extra scheduler but it's
// necessary since rust_raw_thread_join_delete is blocking
@ -80,22 +82,18 @@ pub unsafe fn run_in_bare_thread(f: ~fn()) {
#[test]
fn test_run_in_bare_thread() {
unsafe {
let i = 100;
do run_in_bare_thread {
fail_unless!(i == 100);
}
let i = 100;
do run_in_bare_thread {
fail_unless!(i == 100);
}
}
#[test]
fn test_run_in_bare_thread_exchange() {
unsafe {
// Does the exchange heap work without the runtime?
let i = ~100;
do run_in_bare_thread {
fail_unless!(i == ~100);
}
// Does the exchange heap work without the runtime?
let i = ~100;
do run_in_bare_thread {
fail_unless!(i == ~100);
}
}

View file

@ -32,14 +32,15 @@
#[allow(non_camel_case_types)]; // C types
use core::libc::size_t;
use core::libc;
use core::prelude::*;
use core::ptr::to_unsafe_ptr;
use core::ptr;
use core::str;
use core::vec;
use core::comm::{stream, Chan, SharedChan, Port};
use libc::size_t;
use libc::c_void;
use prelude::*;
use ptr::to_unsafe_ptr;
pub type uv_handle_t = c_void;
pub type uv_loop_t = c_void;
pub type uv_idle_t = c_void;
pub type uv_idle_cb = *u8;
// libuv struct mappings
pub struct uv_ip4_addr {
@ -355,7 +356,10 @@ pub struct uv_getaddrinfo_t {
}
pub mod uv_ll_struct_stubgen {
use uv_ll::{
use ptr;
use super::{
uv_async_t,
uv_connect_t,
uv_getaddrinfo_t,
@ -369,15 +373,13 @@ pub mod uv_ll_struct_stubgen {
#[cfg(target_os = "android")]
#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
use uv_ll::{
use super::{
uv_async_t_32bit_unix_riders,
uv_tcp_t_32bit_unix_riders,
uv_timer_t_32bit_unix_riders,
uv_write_t_32bit_unix_riders,
};
use core::ptr;
pub fn gen_stub_uv_tcp_t() -> uv_tcp_t {
return gen_stub_os();
#[cfg(target_os = "linux")]
@ -724,157 +726,157 @@ pub mod uv_ll_struct_stubgen {
}
}
pub mod rustrt {
use super::{addrinfo, sockaddr_in, sockaddr_in6, uv_async_t, uv_buf_t};
use super::{uv_connect_t, uv_err_t, uv_getaddrinfo_t, uv_stream_t};
use super::{uv_tcp_t, uv_timer_t, uv_write_t};
#[nolink]
extern mod rustrt {
use core::libc;
// libuv public API
unsafe fn rust_uv_loop_new() -> *libc::c_void;
unsafe fn rust_uv_loop_delete(lp: *libc::c_void);
unsafe fn rust_uv_run(loop_handle: *libc::c_void);
unsafe fn rust_uv_close(handle: *libc::c_void, cb: *u8);
unsafe fn rust_uv_walk(loop_handle: *libc::c_void, cb: *u8,
arg: *libc::c_void);
#[nolink]
pub extern {
// libuv public API
unsafe fn rust_uv_loop_new() -> *libc::c_void;
unsafe fn rust_uv_loop_delete(lp: *libc::c_void);
unsafe fn rust_uv_run(loop_handle: *libc::c_void);
unsafe fn rust_uv_close(handle: *libc::c_void, cb: *u8);
unsafe fn rust_uv_walk(loop_handle: *libc::c_void, cb: *u8,
arg: *libc::c_void);
unsafe fn rust_uv_async_send(handle: *uv_async_t);
unsafe fn rust_uv_async_init(loop_handle: *libc::c_void,
async_handle: *uv_async_t,
cb: *u8) -> libc::c_int;
unsafe fn rust_uv_tcp_init(
loop_handle: *libc::c_void,
handle_ptr: *uv_tcp_t) -> libc::c_int;
// FIXME ref #2604 .. ?
unsafe fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8,
len: libc::size_t);
unsafe fn rust_uv_last_error(loop_handle: *libc::c_void) -> uv_err_t;
// FIXME ref #2064
unsafe fn rust_uv_strerror(err: *uv_err_t) -> *libc::c_char;
// FIXME ref #2064
unsafe fn rust_uv_err_name(err: *uv_err_t) -> *libc::c_char;
unsafe fn rust_uv_ip4_addr(ip: *u8, port: libc::c_int)
-> sockaddr_in;
unsafe fn rust_uv_ip6_addr(ip: *u8, port: libc::c_int)
-> sockaddr_in6;
unsafe fn rust_uv_ip4_name(src: *sockaddr_in,
dst: *u8,
size: libc::size_t)
-> libc::c_int;
unsafe fn rust_uv_ip6_name(src: *sockaddr_in6,
dst: *u8,
size: libc::size_t)
-> libc::c_int;
unsafe fn rust_uv_ip4_port(src: *sockaddr_in) -> libc::c_uint;
unsafe fn rust_uv_ip6_port(src: *sockaddr_in6) -> libc::c_uint;
// FIXME ref #2064
unsafe fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
++after_cb: *u8,
++addr: *sockaddr_in) -> libc::c_int;
// FIXME ref #2064
unsafe fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t,
++addr: *sockaddr_in) -> libc::c_int;
// FIXME ref #2064
unsafe fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
++after_cb: *u8,
++addr: *sockaddr_in6) -> libc::c_int;
// FIXME ref #2064
unsafe fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t,
++addr: *sockaddr_in6) -> libc::c_int;
unsafe fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t,
++name: *sockaddr_in)
-> libc::c_int;
unsafe fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t,
++name: *sockaddr_in6)
-> libc::c_int;
unsafe fn rust_uv_listen(stream: *libc::c_void,
backlog: libc::c_int,
cb: *u8) -> libc::c_int;
unsafe fn rust_uv_accept(server: *libc::c_void, client: *libc::c_void)
-> libc::c_int;
unsafe fn rust_uv_write(req: *libc::c_void,
stream: *libc::c_void,
++buf_in: *uv_buf_t,
buf_cnt: libc::c_int,
cb: *u8)
-> libc::c_int;
unsafe fn rust_uv_read_start(stream: *libc::c_void,
on_alloc: *u8,
on_read: *u8)
-> libc::c_int;
unsafe fn rust_uv_read_stop(stream: *libc::c_void) -> libc::c_int;
unsafe fn rust_uv_timer_init(loop_handle: *libc::c_void,
timer_handle: *uv_timer_t)
-> libc::c_int;
unsafe fn rust_uv_timer_start(
timer_handle: *uv_timer_t,
cb: *u8,
timeout: libc::c_uint,
repeat: libc::c_uint) -> libc::c_int;
unsafe fn rust_uv_timer_stop(handle: *uv_timer_t) -> libc::c_int;
unsafe fn rust_uv_idle_new() -> *uv_idle_t;
unsafe fn rust_uv_idle_delete(handle: *uv_idle_t);
unsafe fn rust_uv_idle_init(loop_handle: *uv_loop_t,
handle: *uv_idle_t) -> libc::c_int;
unsafe fn rust_uv_idle_start(handle: *uv_idle_t,
cb: uv_idle_cb) -> libc::c_int;
unsafe fn rust_uv_idle_stop(handle: *uv_idle_t) -> libc::c_int;
unsafe fn rust_uv_getaddrinfo(loop_ptr: *libc::c_void,
handle: *uv_getaddrinfo_t,
cb: *u8,
node_name_ptr: *u8,
service_name_ptr: *u8,
// should probably only pass ptr::null()
hints: *addrinfo)
-> libc::c_int;
unsafe fn rust_uv_freeaddrinfo(res: *addrinfo);
unsafe fn rust_uv_async_send(handle: *uv_async_t);
unsafe fn rust_uv_async_init(loop_handle: *libc::c_void,
async_handle: *uv_async_t,
cb: *u8) -> libc::c_int;
unsafe fn rust_uv_tcp_init(
loop_handle: *libc::c_void,
handle_ptr: *uv_tcp_t) -> libc::c_int;
// FIXME ref #2604 .. ?
unsafe fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8,
len: libc::size_t);
unsafe fn rust_uv_last_error(loop_handle: *libc::c_void) -> uv_err_t;
// FIXME ref #2064
unsafe fn rust_uv_strerror(err: *uv_err_t) -> *libc::c_char;
// FIXME ref #2064
unsafe fn rust_uv_err_name(err: *uv_err_t) -> *libc::c_char;
unsafe fn rust_uv_ip4_addr(ip: *u8, port: libc::c_int)
-> sockaddr_in;
unsafe fn rust_uv_ip6_addr(ip: *u8, port: libc::c_int)
-> sockaddr_in6;
unsafe fn rust_uv_ip4_name(src: *sockaddr_in,
dst: *u8,
size: libc::size_t)
-> libc::c_int;
unsafe fn rust_uv_ip6_name(src: *sockaddr_in6,
dst: *u8,
size: libc::size_t)
-> libc::c_int;
unsafe fn rust_uv_ip4_port(src: *sockaddr_in) -> libc::c_uint;
unsafe fn rust_uv_ip6_port(src: *sockaddr_in6) -> libc::c_uint;
// FIXME ref #2064
unsafe fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
++after_cb: *u8,
++addr: *sockaddr_in) -> libc::c_int;
// FIXME ref #2064
unsafe fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t,
++addr: *sockaddr_in) -> libc::c_int;
// FIXME ref #2064
unsafe fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
++after_cb: *u8,
++addr: *sockaddr_in6) -> libc::c_int;
// FIXME ref #2064
unsafe fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t,
++addr: *sockaddr_in6) -> libc::c_int;
unsafe fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t,
++name: *sockaddr_in) -> libc::c_int;
unsafe fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t,
++name: *sockaddr_in6) ->libc::c_int;
unsafe fn rust_uv_listen(stream: *libc::c_void,
backlog: libc::c_int,
cb: *u8) -> libc::c_int;
unsafe fn rust_uv_accept(server: *libc::c_void, client: *libc::c_void)
-> libc::c_int;
unsafe fn rust_uv_write(req: *libc::c_void,
stream: *libc::c_void,
++buf_in: *uv_buf_t,
buf_cnt: libc::c_int,
cb: *u8)
-> libc::c_int;
unsafe fn rust_uv_read_start(stream: *libc::c_void,
on_alloc: *u8,
on_read: *u8)
-> libc::c_int;
unsafe fn rust_uv_read_stop(stream: *libc::c_void) -> libc::c_int;
unsafe fn rust_uv_timer_init(loop_handle: *libc::c_void,
timer_handle: *uv_timer_t)
-> libc::c_int;
unsafe fn rust_uv_timer_start(
timer_handle: *uv_timer_t,
cb: *u8,
timeout: libc::c_uint,
repeat: libc::c_uint) -> libc::c_int;
unsafe fn rust_uv_timer_stop(handle: *uv_timer_t) -> libc::c_int;
// data accessors/helpers for rust-mapped uv structs
unsafe fn rust_uv_helper_get_INADDR_NONE() -> u32;
unsafe fn rust_uv_is_ipv4_addrinfo(input: *addrinfo) -> bool;
unsafe fn rust_uv_is_ipv6_addrinfo(input: *addrinfo) -> bool;
unsafe fn rust_uv_get_next_addrinfo(input: *addrinfo) -> *addrinfo;
unsafe fn rust_uv_addrinfo_as_sockaddr_in(input: *addrinfo)
-> *sockaddr_in;
unsafe fn rust_uv_addrinfo_as_sockaddr_in6(input: *addrinfo)
-> *sockaddr_in6;
unsafe fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8;
unsafe fn rust_uv_free_base_of_buf(++buf: uv_buf_t);
unsafe fn rust_uv_get_stream_handle_from_connect_req(
connect_req: *uv_connect_t)
-> *uv_stream_t;
unsafe fn rust_uv_get_stream_handle_from_write_req(
write_req: *uv_write_t)
-> *uv_stream_t;
unsafe fn rust_uv_get_loop_for_uv_handle(handle: *libc::c_void)
-> *libc::c_void;
unsafe fn rust_uv_get_data_for_uv_loop(loop_ptr: *libc::c_void)
-> *libc::c_void;
unsafe fn rust_uv_set_data_for_uv_loop(loop_ptr: *libc::c_void,
data: *libc::c_void);
unsafe fn rust_uv_get_data_for_uv_handle(handle: *libc::c_void)
-> *libc::c_void;
unsafe fn rust_uv_set_data_for_uv_handle(handle: *libc::c_void,
data: *libc::c_void);
unsafe fn rust_uv_get_data_for_req(req: *libc::c_void)
-> *libc::c_void;
unsafe fn rust_uv_set_data_for_req(req: *libc::c_void,
unsafe fn rust_uv_getaddrinfo(loop_ptr: *libc::c_void,
handle: *uv_getaddrinfo_t,
cb: *u8,
node_name_ptr: *u8,
service_name_ptr: *u8,
// should probably only pass ptr::null()
hints: *addrinfo)
-> libc::c_int;
unsafe fn rust_uv_freeaddrinfo(res: *addrinfo);
// data accessors/helpers for rust-mapped uv structs
unsafe fn rust_uv_helper_get_INADDR_NONE() -> u32;
unsafe fn rust_uv_is_ipv4_addrinfo(input: *addrinfo) -> bool;
unsafe fn rust_uv_is_ipv6_addrinfo(input: *addrinfo) -> bool;
unsafe fn rust_uv_get_next_addrinfo(input: *addrinfo) -> *addrinfo;
unsafe fn rust_uv_addrinfo_as_sockaddr_in(input: *addrinfo)
-> *sockaddr_in;
unsafe fn rust_uv_addrinfo_as_sockaddr_in6(input: *addrinfo)
-> *sockaddr_in6;
unsafe fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8;
unsafe fn rust_uv_free_base_of_buf(++buf: uv_buf_t);
unsafe fn rust_uv_get_stream_handle_from_connect_req(
connect_req: *uv_connect_t)
-> *uv_stream_t;
unsafe fn rust_uv_get_stream_handle_from_write_req(
write_req: *uv_write_t)
-> *uv_stream_t;
unsafe fn rust_uv_get_loop_for_uv_handle(handle: *libc::c_void)
-> *libc::c_void;
unsafe fn rust_uv_get_data_for_uv_loop(loop_ptr: *libc::c_void)
-> *libc::c_void;
unsafe fn rust_uv_set_data_for_uv_loop(loop_ptr: *libc::c_void,
data: *libc::c_void);
unsafe fn rust_uv_get_base_from_buf(++buf: uv_buf_t) -> *u8;
unsafe fn rust_uv_get_len_from_buf(++buf: uv_buf_t) -> libc::size_t;
unsafe fn rust_uv_get_data_for_uv_handle(handle: *libc::c_void)
-> *libc::c_void;
unsafe fn rust_uv_set_data_for_uv_handle(handle: *libc::c_void,
data: *libc::c_void);
unsafe fn rust_uv_get_data_for_req(req: *libc::c_void)
-> *libc::c_void;
unsafe fn rust_uv_set_data_for_req(req: *libc::c_void,
data: *libc::c_void);
unsafe fn rust_uv_get_base_from_buf(++buf: uv_buf_t) -> *u8;
unsafe fn rust_uv_get_len_from_buf(++buf: uv_buf_t) -> libc::size_t;
// sizeof testing helpers
unsafe fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_connect_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_buf_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_write_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_err_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_sockaddr_in_size() -> libc::c_uint;
unsafe fn rust_uv_helper_sockaddr_in6_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_async_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_timer_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_getaddrinfo_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_addrinfo_size() -> libc::c_uint;
unsafe fn rust_uv_helper_addr_in_size() -> libc::c_uint;
}
// sizeof testing helpers
unsafe fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_connect_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_buf_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_write_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_err_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_sockaddr_in_size() -> libc::c_uint;
unsafe fn rust_uv_helper_sockaddr_in6_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_async_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_timer_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_uv_getaddrinfo_t_size() -> libc::c_uint;
unsafe fn rust_uv_helper_addrinfo_size() -> libc::c_uint;
unsafe fn rust_uv_helper_addr_in_size() -> libc::c_uint;
}
pub unsafe fn loop_new() -> *libc::c_void {
@ -897,6 +899,27 @@ pub unsafe fn walk(loop_handle: *libc::c_void, cb: *u8, arg: *libc::c_void) {
rustrt::rust_uv_walk(loop_handle, cb, arg);
}
pub unsafe fn idle_new() -> *uv_idle_t {
rustrt::rust_uv_idle_new()
}
pub unsafe fn idle_delete(handle: *uv_idle_t) {
rustrt::rust_uv_idle_delete(handle)
}
pub unsafe fn idle_init(loop_handle: *uv_loop_t,
handle: *uv_idle_t) -> libc::c_int {
rustrt::rust_uv_idle_init(loop_handle, handle)
}
pub unsafe fn idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> libc::c_int {
rustrt::rust_uv_idle_start(handle, cb)
}
pub unsafe fn idle_stop(handle: *uv_idle_t) -> libc::c_int {
rustrt::rust_uv_idle_stop(handle)
}
pub unsafe fn tcp_init(loop_handle: *libc::c_void, handle: *uv_tcp_t)
-> libc::c_int {
return rustrt::rust_uv_tcp_init(loop_handle, handle);
@ -1215,19 +1238,11 @@ pub unsafe fn addrinfo_as_sockaddr_in6(input: *addrinfo) -> *sockaddr_in6 {
rustrt::rust_uv_addrinfo_as_sockaddr_in6(input)
}
//#[cfg(test)]
#[cfg(test)]
pub mod test {
use core::prelude::*;
use uv_ll::*;
use core::comm::{SharedChan, stream};
use core::libc;
use core::ptr;
use core::str;
use core::sys;
use core::task;
use core::vec;
use prelude::*;
use super::*;
use comm::{SharedChan, stream, GenericChan, GenericPort};
enum tcp_read_data {
tcp_read_eof,
@ -1473,7 +1488,7 @@ pub mod test {
let client_data = get_data_for_uv_handle(
client_stream_ptr as *libc::c_void) as *tcp_server_data;
let server_kill_msg = (*client_data).server_kill_msg;
let server_kill_msg = copy (*client_data).server_kill_msg;
let write_req = (*client_data).server_write_req;
if str::contains(request_str, server_kill_msg) {
log(debug, ~"SERVER: client req contains kill_msg!");
@ -1606,8 +1621,8 @@ pub mod test {
fn impl_uv_tcp_server(server_ip: &str,
server_port: int,
+kill_server_msg: ~str,
+server_resp_msg: ~str,
kill_server_msg: ~str,
server_resp_msg: ~str,
server_chan: SharedChan<~str>,
continue_chan: SharedChan<bool>) {
unsafe {
@ -1725,10 +1740,12 @@ pub mod test {
let (continue_port, continue_chan) = stream::<bool>();
let continue_chan = SharedChan(continue_chan);
let kill_server_msg_copy = copy kill_server_msg;
let server_resp_msg_copy = copy server_resp_msg;
do task::spawn_sched(task::ManualThreads(1)) {
impl_uv_tcp_server(bind_ip, port,
kill_server_msg,
server_resp_msg,
copy kill_server_msg_copy,
copy server_resp_msg_copy,
server_chan.clone(),
continue_chan.clone());
};
@ -1738,9 +1755,10 @@ pub mod test {
continue_port.recv();
log(debug, ~"received on continue port, set up tcp client");
let kill_server_msg_copy = copy kill_server_msg;
do task::spawn_sched(task::ManualThreads(1u)) {
impl_uv_tcp_request(request_ip, port,
kill_server_msg,
kill_server_msg_copy,
client_chan.clone());
};
@ -1760,11 +1778,10 @@ pub mod test {
pub mod tcp_and_server_client_test {
#[cfg(target_arch="x86_64")]
pub mod impl64 {
use uv_ll::test::*;
#[test]
pub fn test_uv_ll_tcp_server_and_request() {
unsafe {
impl_uv_tcp_server_and_request();
super::super::impl_uv_tcp_server_and_request();
}
}
}
@ -1772,12 +1789,11 @@ pub mod test {
#[cfg(target_arch="arm")]
#[cfg(target_arch="mips")]
pub mod impl32 {
use uv_ll::test::*;
#[test]
#[ignore(cfg(target_os = "linux"))]
pub fn test_uv_ll_tcp_server_and_request() {
unsafe {
impl_uv_tcp_server_and_request();
super::super::impl_uv_tcp_server_and_request();
}
}
}
@ -1804,7 +1820,7 @@ pub mod test {
unsafe {
struct_size_check_common::<uv_tcp_t>(
~"uv_tcp_t",
::uv_ll::rustrt::rust_uv_helper_uv_tcp_t_size()
super::rustrt::rust_uv_helper_uv_tcp_t_size()
);
}
}
@ -1813,7 +1829,7 @@ pub mod test {
unsafe {
struct_size_check_common::<uv_connect_t>(
~"uv_connect_t",
::uv_ll::rustrt::rust_uv_helper_uv_connect_t_size()
super::rustrt::rust_uv_helper_uv_connect_t_size()
);
}
}
@ -1822,7 +1838,7 @@ pub mod test {
unsafe {
struct_size_check_common::<uv_buf_t>(
~"uv_buf_t",
::uv_ll::rustrt::rust_uv_helper_uv_buf_t_size()
super::rustrt::rust_uv_helper_uv_buf_t_size()
);
}
}
@ -1831,7 +1847,7 @@ pub mod test {
unsafe {
struct_size_check_common::<uv_write_t>(
~"uv_write_t",
::uv_ll::rustrt::rust_uv_helper_uv_write_t_size()
super::rustrt::rust_uv_helper_uv_write_t_size()
);
}
}
@ -1841,7 +1857,7 @@ pub mod test {
unsafe {
struct_size_check_common::<sockaddr_in>(
~"sockaddr_in",
::uv_ll::rustrt::rust_uv_helper_sockaddr_in_size()
super::rustrt::rust_uv_helper_sockaddr_in_size()
);
}
}
@ -1849,7 +1865,7 @@ pub mod test {
fn test_uv_ll_struct_size_sockaddr_in6() {
unsafe {
let foreign_handle_size =
::uv_ll::rustrt::rust_uv_helper_sockaddr_in6_size();
super::rustrt::rust_uv_helper_sockaddr_in6_size();
let rust_handle_size = sys::size_of::<sockaddr_in6>();
let output = fmt!("sockaddr_in6 -- foreign: %u rust: %u",
foreign_handle_size as uint, rust_handle_size);
@ -1868,7 +1884,7 @@ pub mod test {
fn test_uv_ll_struct_size_addr_in() {
unsafe {
let foreign_handle_size =
::uv_ll::rustrt::rust_uv_helper_addr_in_size();
super::rustrt::rust_uv_helper_addr_in_size();
let rust_handle_size = sys::size_of::<addr_in>();
let output = fmt!("addr_in -- foreign: %u rust: %u",
foreign_handle_size as uint, rust_handle_size);
@ -1884,7 +1900,7 @@ pub mod test {
unsafe {
struct_size_check_common::<uv_async_t>(
~"uv_async_t",
::uv_ll::rustrt::rust_uv_helper_uv_async_t_size()
super::rustrt::rust_uv_helper_uv_async_t_size()
);
}
}
@ -1894,7 +1910,7 @@ pub mod test {
unsafe {
struct_size_check_common::<uv_timer_t>(
~"uv_timer_t",
::uv_ll::rustrt::rust_uv_helper_uv_timer_t_size()
super::rustrt::rust_uv_helper_uv_timer_t_size()
);
}
}
@ -1905,7 +1921,7 @@ pub mod test {
unsafe {
struct_size_check_common::<uv_getaddrinfo_t>(
~"uv_getaddrinfo_t",
::uv_ll::rustrt::rust_uv_helper_uv_getaddrinfo_t_size()
super::rustrt::rust_uv_helper_uv_getaddrinfo_t_size()
);
}
}
@ -1916,7 +1932,7 @@ pub mod test {
unsafe {
struct_size_check_common::<uv_timer_t>(
~"addrinfo",
::uv_ll::rustrt::rust_uv_helper_uv_timer_t_size()
super::rustrt::rust_uv_helper_uv_timer_t_size()
);
}
}

View file

@ -21,20 +21,20 @@ use core::vec;
use iotask = uv::iotask::IoTask;
use interact = uv::iotask::interact;
use sockaddr_in = uv::ll::sockaddr_in;
use sockaddr_in6 = uv::ll::sockaddr_in6;
use addrinfo = uv::ll::addrinfo;
use uv_getaddrinfo_t = uv::ll::uv_getaddrinfo_t;
use uv_ip4_name = uv::ll::ip4_name;
use uv_ip4_port = uv::ll::ip4_port;
use uv_ip6_name = uv::ll::ip6_name;
use uv_ip6_port = uv::ll::ip6_port;
use uv_getaddrinfo = uv::ll::getaddrinfo;
use uv_freeaddrinfo = uv::ll::freeaddrinfo;
use create_uv_getaddrinfo_t = uv::ll::getaddrinfo_t;
use set_data_for_req = uv::ll::set_data_for_req;
use get_data_for_req = uv::ll::get_data_for_req;
use ll = uv::ll;
use sockaddr_in = core::unstable::uvll::sockaddr_in;
use sockaddr_in6 = core::unstable::uvll::sockaddr_in6;
use addrinfo = core::unstable::uvll::addrinfo;
use uv_getaddrinfo_t = core::unstable::uvll::uv_getaddrinfo_t;
use uv_ip4_name = core::unstable::uvll::ip4_name;
use uv_ip4_port = core::unstable::uvll::ip4_port;
use uv_ip6_name = core::unstable::uvll::ip6_name;
use uv_ip6_port = core::unstable::uvll::ip6_port;
use uv_getaddrinfo = core::unstable::uvll::getaddrinfo;
use uv_freeaddrinfo = core::unstable::uvll::freeaddrinfo;
use create_uv_getaddrinfo_t = core::unstable::uvll::getaddrinfo_t;
use set_data_for_req = core::unstable::uvll::set_data_for_req;
use get_data_for_req = core::unstable::uvll::get_data_for_req;
use ll = core::unstable::uvll;
/// An IP address
pub enum IpAddr {

View file

@ -36,6 +36,8 @@ not required in or otherwise suitable for the core library.
extern mod core(vers = "0.6");
use core::*;
pub use uv_ll = core::unstable::uvll;
// General io and system-services modules
pub mod net;
@ -45,7 +47,6 @@ pub mod net_url;
// libuv modules
pub mod uv;
pub mod uv_ll;
pub mod uv_iotask;
pub mod uv_global_loop;

View file

@ -33,6 +33,6 @@
* facilities.
*/
pub use ll = uv_ll;
pub use ll = core::unstable::uvll;
pub use iotask = uv_iotask;
pub use global_loop = uv_global_loop;

@ -1 +1 @@
Subproject commit 218ab86721eefd7b7e97fa6d9f95a80a1fa8686c
Subproject commit 576ab1db8ea03889eb7b2274654afe7c5c867230

View file

@ -15,9 +15,15 @@ getcontext.
The registers_t variable is in (%esp)
*/
#if defined(__APPLE__) || defined(_WIN32)
#define SWAP_REGISTERS _swap_registers
#else
#define SWAP_REGISTERS swap_registers
#endif
// swap_registers(registers_t *oregs, registers_t *regs)
.globl swap_registers
swap_registers:
.globl SWAP_REGISTERS
SWAP_REGISTERS:
// save the old context
movl 4(%esp), %eax
movl %ebx, 4(%eax)

View file

@ -13,8 +13,7 @@
#include "../../rust_globals.h"
extern "C" uint32_t CDECL swap_registers(registers_t *oregs,
registers_t *regs)
asm ("swap_registers");
registers_t *regs);
context::context()
{

View file

@ -49,9 +49,15 @@ First four arguments:
anyhow.
*/
#if defined(__APPLE__) || defined(_WIN32)
#define SWAP_REGISTERS _swap_registers
#else
#define SWAP_REGISTERS swap_registers
#endif
// swap_registers(registers_t *oregs, registers_t *regs)
.globl swap_registers
swap_registers:
.globl SWAP_REGISTERS
SWAP_REGISTERS:
// n.b. when we enter, the return address is at the top of
// the stack (i.e., 0(%RSP)) and the argument is in
// RUSTRT_ARG0_S. We

View file

@ -13,8 +13,7 @@
#include "../../rust_globals.h"
extern "C" void CDECL swap_registers(registers_t *oregs,
registers_t *regs)
asm ("swap_registers");
registers_t *regs);
context::context()
{

View file

@ -21,6 +21,17 @@
void* global_crate_map = NULL;
#ifndef _WIN32
pthread_key_t sched_key;
#else
DWORD sched_key;
#endif
extern "C" void*
rust_get_sched_tls_key() {
return &sched_key;
}
/**
The runtime entrypoint. The (C ABI) main function generated by rustc calls
`rust_start`, providing the address of the Rust ABI main function, the
@ -30,6 +41,10 @@ void* global_crate_map = NULL;
extern "C" CDECL int
rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
#ifndef _WIN32
pthread_key_create(&sched_key, NULL);
#endif
// Load runtime configuration options from the environment.
// FIXME #1497: Should provide a way to get these from the command
// line as well.

View file

@ -769,20 +769,20 @@ extern "C" CDECL void record_sp_limit(void *limit);
class raw_thread: public rust_thread {
public:
fn_env_pair *fn;
fn_env_pair fn;
raw_thread(fn_env_pair *fn) : fn(fn) { }
raw_thread(fn_env_pair fn) : fn(fn) { }
virtual void run() {
record_sp_limit(0);
fn->f(NULL, fn->env, NULL);
fn.f(NULL, fn.env, NULL);
}
};
extern "C" raw_thread*
rust_raw_thread_start(fn_env_pair *fn) {
assert(fn);
raw_thread *thread = new raw_thread(fn);
raw_thread *thread = new raw_thread(*fn);
thread->start();
return thread;
}

View file

@ -376,16 +376,7 @@ current_kernel_malloc_alloc_cb(uv_handle_t* handle,
extern "C" void
rust_uv_buf_init(uv_buf_t* out_buf, char* base, size_t len) {
rust_task* task = rust_get_current_task();
LOG(task, stdlib,"rust_uv_buf_init: base: %lu" \
"len: %lu",
(unsigned long int)base,
(unsigned long int)len);
*out_buf = uv_buf_init(base, len);
LOG(task, stdlib, "rust_uv_buf_init: after: "
"result->base: %" PRIxPTR " len: %" PRIxPTR,
(unsigned long int)(*out_buf).base,
(unsigned long int)(*out_buf).len);
}
extern "C" uv_loop_t*
@ -481,18 +472,11 @@ rust_uv_free_base_of_buf(uv_buf_t buf) {
extern "C" struct sockaddr_in
rust_uv_ip4_addr(const char* ip, int port) {
rust_task* task = rust_get_current_task();
LOG(task, stdlib, "before creating addr_ptr.. ip %s" \
" port %d\n", ip, port);
struct sockaddr_in addr = uv_ip4_addr(ip, port);
LOG(task, stdlib, "after creating .. port: %d", addr.sin_port);
return addr;
}
extern "C" struct sockaddr_in6
rust_uv_ip6_addr(const char* ip, int port) {
rust_task* task = rust_get_current_task();
LOG(task, stdlib, "before creating addr_ptr.. ip %s" \
" port %d\n", ip, port);
return uv_ip6_addr(ip, port);
}
extern "C" int
@ -554,3 +538,28 @@ extern "C" sockaddr_in6*
rust_uv_addrinfo_as_sockaddr_in6(addrinfo* input) {
return (sockaddr_in6*)input->ai_addr;
}
extern "C" uv_idle_t*
rust_uv_idle_new() {
return new uv_idle_t;
}
extern "C" void
rust_uv_idle_delete(uv_idle_t* handle) {
delete handle;
}
extern "C" int
rust_uv_idle_init(uv_loop_t* loop, uv_idle_t* idle) {
return uv_idle_init(loop, idle);
}
extern "C" int
rust_uv_idle_start(uv_idle_t* idle, uv_idle_cb cb) {
return uv_idle_start(idle, cb);
}
extern "C" int
rust_uv_idle_stop(uv_idle_t* idle) {
return uv_idle_stop(idle);
}

View file

@ -140,6 +140,11 @@ rust_uv_current_kernel_malloc
rust_uv_current_kernel_free
rust_uv_getaddrinfo
rust_uv_freeaddrinfo
rust_uv_idle_new
rust_uv_idle_delete
rust_uv_idle_init
rust_uv_idle_start
rust_uv_idle_stop
rust_dbg_lock_create
rust_dbg_lock_destroy
rust_dbg_lock_lock
@ -187,3 +192,5 @@ rust_get_global_data_ptr
rust_inc_kernel_live_count
rust_dec_kernel_live_count
rust_get_exchange_count_ptr
rust_get_sched_tls_key
swap_registers