Merge remote-tracking branch 'brson/io-upstream' into incoming

Conflicts:
	src/libcore/logging.rs
	src/libcore/rt/local_services.rs
	src/libcore/rt/uv/mod.rs
	src/libcore/rt/uv/net.rs
	src/libcore/rt/uv/uvio.rs
	src/libcore/unstable.rs
This commit is contained in:
Brian Anderson 2013-05-14 15:30:01 -07:00
commit b04fce6a90
36 changed files with 1574 additions and 682 deletions

View file

@ -205,8 +205,11 @@ mod unicode;
#[path = "num/cmath.rs"]
mod cmath;
mod stackwalk;
// XXX: This shouldn't be pub, and it should be reexported under 'unstable'
// but name resolution doesn't work without it being pub.
#[path = "rt/mod.rs"]
mod rt;
pub mod rt;
// A curious inner-module that's not exported that contains the binding
// 'core' so that macro-expanded references to core::error and such

View file

@ -10,17 +10,16 @@
//! Logging
pub mod rustrt {
use libc;
pub extern {
unsafe fn rust_log_console_on();
unsafe fn rust_log_console_off();
unsafe fn rust_log_str(level: u32,
string: *libc::c_char,
size: libc::size_t);
}
}
use option::*;
use either::*;
use rt;
use rt::logging::{Logger, StdErrLogger};
use io;
use libc;
use repr;
use vec;
use cast;
use str;
/// Turns on logging to stdout globally
pub fn console_on() {
@ -55,8 +54,46 @@ pub fn log_type<T>(level: u32, object: &T) {
let bytes = do io::with_bytes_writer |writer| {
repr::write_repr(writer, object);
};
unsafe {
let len = bytes.len() as libc::size_t;
rustrt::rust_log_str(level, transmute(vec::raw::to_ptr(bytes)), len);
match rt::context() {
rt::OldTaskContext => {
unsafe {
let len = bytes.len() as libc::size_t;
rustrt::rust_log_str(level, cast::transmute(vec::raw::to_ptr(bytes)), len);
}
}
_ => {
// XXX: Bad allocation
let msg = str::from_bytes(bytes);
newsched_log_str(msg);
}
}
}
fn newsched_log_str(msg: ~str) {
unsafe {
match rt::local_services::unsafe_try_borrow_local_services() {
Some(local) => {
// Use the available logger
(*local).logger.log(Left(msg));
}
None => {
// There is no logger anywhere, just write to stderr
let mut logger = StdErrLogger;
logger.log(Left(msg));
}
}
}
}
pub mod rustrt {
use libc;
pub extern {
unsafe fn rust_log_console_on();
unsafe fn rust_log_console_off();
unsafe fn rust_log_str(level: u32,
string: *libc::c_char,
size: libc::size_t);
}
}

View file

@ -30,10 +30,24 @@ macro_rules! rtdebug (
($( $arg:expr),+) => ( $(let _ = $arg)*; )
)
macro_rules! rtassert (
( $arg:expr ) => ( {
if !$arg {
abort!("assertion failed: %s", stringify!($arg));
}
} )
)
macro_rules! abort(
($( $msg:expr),+) => ( {
rtdebug!($($msg),+);
unsafe { ::libc::abort(); }
do_abort();
// NB: This is in a fn to avoid putting the `unsafe` block in a macro,
// which causes spurious 'unnecessary unsafe block' warnings.
fn do_abort() -> ! {
unsafe { ::libc::abort(); }
}
} )
)

View file

@ -722,7 +722,7 @@ pub fn list_dir(p: &Path) -> ~[~str] {
use os::win32::{
as_utf16_p
};
use unstable::exchange_alloc::{malloc_raw, free_raw};
use rt::global_heap::{malloc_raw, free_raw};
#[nolink]
extern {
unsafe fn rust_list_dir_wfd_size() -> libc::size_t;

View file

@ -111,9 +111,9 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp:
let sp = align_down(sp);
let sp = mut_offset(sp, -4);
unsafe { *sp = arg as uint; }
unsafe { *sp = arg as uint };
let sp = mut_offset(sp, -1);
unsafe { *sp = 0; } // The final return address
unsafe { *sp = 0 }; // The final return address
regs.esp = sp as u32;
regs.eip = fptr as u32;
@ -195,7 +195,7 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp:
fn align_down(sp: *mut uint) -> *mut uint {
unsafe {
let sp = transmute::<*mut uint, uint>(sp);
let sp: uint = transmute(sp);
let sp = sp & !(16 - 1);
transmute::<uint, *mut uint>(sp)
}

View file

@ -10,7 +10,7 @@
use prelude::*;
use super::support::PathLike;
use super::{Reader, Writer, Seek, Close};
use super::{Reader, Writer, Seek};
use super::SeekStyle;
/// # XXX
@ -69,10 +69,6 @@ impl Seek for FileStream {
fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
}
impl Close for FileStream {
fn close(&mut self) { fail!() }
}
#[test]
#[ignore]
fn super_simple_smoke_test_lets_go_read_some_files_and_have_a_good_time() {

View file

@ -238,6 +238,7 @@ Out of scope
* How does I/O relate to the Iterator trait?
* std::base64 filters
* Using conditions is a big unknown since we don't have much experience with them
* Too many uses of OtherIoError
*/
@ -252,7 +253,9 @@ pub use self::stdio::println;
pub use self::file::FileStream;
pub use self::net::ip::IpAddr;
#[cfg(not(stage0))]
pub use self::net::tcp::TcpListener;
#[cfg(not(stage0))]
pub use self::net::tcp::TcpStream;
pub use self::net::udp::UdpStream;
@ -266,6 +269,7 @@ pub mod file;
/// Synchronous, non-blocking network I/O.
pub mod net {
#[cfg(not(stage0))]
pub mod tcp;
pub mod udp;
pub mod ip;
@ -326,12 +330,14 @@ pub struct IoError {
#[deriving(Eq)]
pub enum IoErrorKind {
PreviousIoError,
OtherIoError,
EndOfFile,
FileNotFound,
FilePermission,
PermissionDenied,
ConnectionFailed,
Closed,
OtherIoError,
PreviousIoError
ConnectionRefused,
}
// XXX: Can't put doc comments on macros
@ -383,16 +389,7 @@ pub trait Writer {
fn flush(&mut self);
}
/// I/O types that may be closed
///
/// Any further operations performed on a closed resource will raise
/// on `io_error`
pub trait Close {
/// Close the I/O resource
fn close(&mut self);
}
pub trait Stream: Reader + Writer + Close { }
pub trait Stream: Reader + Writer { }
pub enum SeekStyle {
/// Seek from the beginning of the stream

View file

@ -40,10 +40,6 @@ impl Writer for FileDesc {
fn flush(&mut self) { fail!() }
}
impl Close for FileDesc {
fn close(&mut self) { fail!() }
}
impl Seek for FileDesc {
fn tell(&self) -> u64 { fail!() }
@ -72,10 +68,6 @@ impl Writer for CFile {
fn flush(&mut self) { fail!() }
}
impl Close for CFile {
fn close(&mut self) { fail!() }
}
impl Seek for CFile {
fn tell(&self) -> u64 { fail!() }
fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }

View file

@ -8,67 +8,273 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use super::super::*;
use super::ip::IpAddr;
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::sched::local_sched::unsafe_borrow_io;
use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer, Listener};
use rt::io::io_error;
use rt::rtio::{IoFactory,
RtioTcpListener, RtioTcpListenerObject,
RtioTcpStream, RtioTcpStreamObject};
pub struct TcpStream;
pub struct TcpStream {
rtstream: ~RtioTcpStreamObject
}
impl TcpStream {
pub fn connect(_addr: IpAddr) -> Option<TcpStream> {
fail!()
fn new(s: ~RtioTcpStreamObject) -> TcpStream {
TcpStream {
rtstream: s
}
}
pub fn connect(addr: IpAddr) -> Option<TcpStream> {
let stream = unsafe {
rtdebug!("borrowing io to connect");
let io = unsafe_borrow_io();
rtdebug!("about to connect");
(*io).tcp_connect(addr)
};
match stream {
Ok(s) => {
Some(TcpStream::new(s))
}
Err(ioerr) => {
rtdebug!("failed to connect: %?", ioerr);
io_error::cond.raise(ioerr);
return None;
}
}
}
}
impl Reader for TcpStream {
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
let bytes_read = self.rtstream.read(buf);
match bytes_read {
Ok(read) => Some(read),
Err(_) => {
abort!("XXX");
}
}
}
fn eof(&mut self) -> bool { fail!() }
}
impl Writer for TcpStream {
fn write(&mut self, _buf: &[u8]) { fail!() }
fn write(&mut self, buf: &[u8]) {
let res = self.rtstream.write(buf);
match res {
Ok(_) => (),
Err(_) => {
abort!("XXX");
}
}
}
fn flush(&mut self) { fail!() }
}
impl Close for TcpStream {
fn close(&mut self) { fail!() }
pub struct TcpListener {
rtlistener: ~RtioTcpListenerObject,
}
pub struct TcpListener;
impl TcpListener {
pub fn bind(_addr: IpAddr) -> Option<TcpListener> {
fail!()
pub fn bind(addr: IpAddr) -> Option<TcpListener> {
let listener = unsafe { (*unsafe_borrow_io()).tcp_bind(addr) };
match listener {
Ok(l) => {
Some(TcpListener {
rtlistener: l
})
}
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;
}
}
}
}
impl Listener<TcpStream> for TcpListener {
fn accept(&mut self) -> Option<TcpStream> { fail!() }
fn accept(&mut self) -> Option<TcpStream> {
let rtstream = self.rtlistener.accept();
match rtstream {
Ok(s) => {
Some(TcpStream::new(s))
}
Err(_) => {
abort!("XXX");
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use int;
use cell::Cell;
use rt::test::*;
use rt::io::net::ip::Ipv4;
use rt::io::*;
#[test] #[ignore]
fn bind_error() {
do run_in_newsched_task {
let mut called = false;
do io_error::cond.trap(|e| {
assert!(e.kind == PermissionDenied);
called = true;
}).in {
let addr = Ipv4(0, 0, 0, 0, 1);
let listener = TcpListener::bind(addr);
assert!(listener.is_none());
}
assert!(called);
}
}
#[test]
fn connect_error() {
do run_in_newsched_task {
let mut called = false;
do io_error::cond.trap(|e| {
assert!(e.kind == ConnectionRefused);
called = true;
}).in {
let addr = Ipv4(0, 0, 0, 0, 1);
let stream = TcpStream::connect(addr);
assert!(stream.is_none());
}
assert!(called);
}
}
#[test]
fn smoke_test() {
/*do run_in_newsched_task {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawn_immediately {
let listener = TcpListener::bind(addr);
do listener.accept() {
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
}
do spawntask_immediately {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}
#[test]
fn multiple_connect_serial() {
do run_in_newsched_task {
let addr = next_test_ip4();
let max = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for max.times {
let mut stream = listener.accept();
let mut buf = [0];
listener.read(buf);
stream.read(buf);
assert!(buf[0] == 99);
}
}
do spawn_immediately {
let stream = TcpStream::connect(addr);
stream.write([99]);
do spawntask_immediately {
for max.times {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}*/
}
}
#[test]
fn multiple_connect_interleaved_greedy_schedule() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |i| {
let stream = Cell(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_immediately {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == i as u8);
rtdebug!("read");
}
}
}
connect(0, addr);
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_immediately {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([i as u8]);
}
}
}
}
#[test]
fn multiple_connect_interleaved_lazy_schedule() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |_| {
let stream = Cell(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_later {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
rtdebug!("read");
}
}
}
connect(0, addr);
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_later {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([99]);
}
}
}
}
}

View file

@ -32,10 +32,6 @@ impl Writer for UdpStream {
fn flush(&mut self) { fail!() }
}
impl Close for UdpStream {
fn close(&mut self) { fail!() }
}
pub struct UdpListener;
impl UdpListener {

View file

@ -32,10 +32,6 @@ impl Writer for UnixStream {
fn flush(&mut self) { fail!() }
}
impl Close for UnixStream {
fn close(&mut self) { fail!() }
}
pub struct UnixListener;
impl UnixListener {

View file

@ -9,7 +9,7 @@
// except according to those terms.
use prelude::*;
use super::{Reader, Writer, Close};
use super::{Reader, Writer};
pub fn stdin() -> StdReader { fail!() }
@ -39,10 +39,6 @@ impl Reader for StdReader {
fn eof(&mut self) -> bool { fail!() }
}
impl Close for StdReader {
fn close(&mut self) { fail!() }
}
pub struct StdWriter;
impl StdWriter {
@ -55,6 +51,3 @@ impl Writer for StdWriter {
fn flush(&mut self) { fail!() }
}
impl Close for StdWriter {
fn close(&mut self) { fail!() }
}

View file

@ -13,18 +13,21 @@
use prelude::*;
use ptr::mut_null;
use libc::c_void;
use cast::transmute;
use cast;
use cell::Cell;
use super::Scheduler;
use super::super::rtio::IoFactoryObject;
use tls = super::super::thread_local_storage;
#[cfg(test)] use super::super::uvio::UvEventLoop;
use rt::sched::Scheduler;
use rt::rtio::{EventLoop, IoFactoryObject};
use tls = rt::thread_local_storage;
use unstable::finally::Finally;
#[cfg(test)] use rt::uv::uvio::UvEventLoop;
/// Give the Scheduler to thread-local storage
pub fn put(sched: ~Scheduler) {
unsafe {
let key = tls_key();
let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched);
let void_sched: *mut c_void = cast::transmute(sched);
tls::set(key, void_sched);
}
}
@ -34,8 +37,8 @@ pub fn take() -> ~Scheduler {
unsafe {
let key = tls_key();
let void_sched: *mut c_void = tls::get(key);
assert!(void_sched.is_not_null());
let sched = transmute::<*mut c_void, ~Scheduler>(void_sched);
rtassert!(void_sched.is_not_null());
let sched: ~Scheduler = cast::transmute(void_sched);
tls::set(key, mut_null());
return sched;
}
@ -55,8 +58,18 @@ pub fn exists() -> bool {
/// While the scheduler is borrowed it is not available in TLS.
pub fn borrow(f: &fn(&mut Scheduler)) {
let mut sched = take();
f(sched);
put(sched);
// XXX: Need a different abstraction from 'finally' here to avoid unsafety
unsafe {
let unsafe_sched = cast::transmute_mut_region(&mut *sched);
let sched = Cell(sched);
do (|| {
f(unsafe_sched);
}).finally {
put(sched.take());
}
}
}
/// Borrow a mutable reference to the thread-local Scheduler
@ -65,23 +78,22 @@ pub fn borrow(f: &fn(&mut Scheduler)) {
///
/// Because this leaves the Scheduler in thread-local storage it is possible
/// For the Scheduler pointer to be aliased
pub unsafe fn unsafe_borrow() -> &mut Scheduler {
pub unsafe fn unsafe_borrow() -> *mut Scheduler {
let key = tls_key();
let mut void_sched: *mut c_void = tls::get(key);
assert!(void_sched.is_not_null());
rtassert!(void_sched.is_not_null());
{
let void_sched_ptr = &mut void_sched;
let sched: &mut ~Scheduler = {
transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
};
let sched: &mut Scheduler = &mut **sched;
let sched: *mut *mut c_void = &mut void_sched;
let sched: *mut ~Scheduler = sched as *mut ~Scheduler;
let sched: *mut Scheduler = &mut **sched;
return sched;
}
}
pub unsafe fn unsafe_borrow_io() -> &mut IoFactoryObject {
pub unsafe fn unsafe_borrow_io() -> *mut IoFactoryObject {
let sched = unsafe_borrow();
return sched.event_loop.io().unwrap();
let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
return io;
}
fn tls_key() -> tls::Key {
@ -91,7 +103,7 @@ fn tls_key() -> tls::Key {
fn maybe_tls_key() -> Option<tls::Key> {
unsafe {
let key: *mut c_void = rust_get_sched_tls_key();
let key: &mut tls::Key = transmute(key);
let key: &mut tls::Key = cast::transmute(key);
let key = *key;
// Check that the key has been initialized.
@ -105,7 +117,7 @@ fn maybe_tls_key() -> Option<tls::Key> {
// another thread. I think this is fine since the only action
// they could take if it was initialized would be to check the
// thread-local value and see that it's not set.
if key != 0 {
if key != -1 {
return Some(key);
} else {
return None;

View file

@ -23,19 +23,19 @@ use libc::{c_void, uintptr_t};
use cast::transmute;
use super::sched::local_sched;
use super::local_heap::LocalHeap;
use rt::logging::StdErrLogger;
pub struct LocalServices {
heap: LocalHeap,
gc: GarbageCollector,
storage: LocalStorage,
logger: Logger,
logger: StdErrLogger,
unwinder: Option<Unwinder>,
destroyed: bool
}
pub struct GarbageCollector;
pub struct LocalStorage(*c_void, Option<~fn(*c_void)>);
pub struct Logger;
pub struct Unwinder {
unwinding: bool,
@ -47,7 +47,7 @@ impl LocalServices {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: Logger,
logger: StdErrLogger,
unwinder: Some(Unwinder { unwinding: false }),
destroyed: false
}
@ -58,7 +58,7 @@ impl LocalServices {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: Logger,
logger: StdErrLogger,
unwinder: None,
destroyed: false
}
@ -169,19 +169,27 @@ pub fn borrow_local_services(f: &fn(&mut LocalServices)) {
}
}
pub unsafe fn unsafe_borrow_local_services() -> &mut LocalServices {
use cast::transmute_mut_region;
match local_sched::unsafe_borrow().current_task {
pub unsafe fn unsafe_borrow_local_services() -> *mut LocalServices {
match (*local_sched::unsafe_borrow()).current_task {
Some(~ref mut task) => {
transmute_mut_region(&mut task.local_services)
let s: *mut LocalServices = &mut task.local_services;
return s;
}
None => {
fail!("no local services for schedulers yet")
// Don't fail. Infinite recursion
abort!("no local services for schedulers yet")
}
}
}
pub unsafe fn unsafe_try_borrow_local_services() -> Option<*mut LocalServices> {
if local_sched::exists() {
Some(unsafe_borrow_local_services())
} else {
None
}
}
#[cfg(test)]
mod test {
use rt::test::*;
@ -229,4 +237,12 @@ mod test {
let _ = r.next();
}
}
#[test]
fn logging() {
do run_in_newsched_task() {
info!("here i am. logging in a newsched task");
}
}
}

38
src/libcore/rt/logging.rs Normal file
View file

@ -0,0 +1,38 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use either::*;
pub trait Logger {
fn log(&mut self, msg: Either<~str, &'static str>);
}
pub struct StdErrLogger;
impl Logger for StdErrLogger {
fn log(&mut self, msg: Either<~str, &'static str>) {
use io::{Writer, WriterUtil};
let s: &str = match msg {
Left(ref s) => {
let s: &str = *s;
s
}
Right(ref s) => {
let s: &str = *s;
s
}
};
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
dbg.write_str(s);
dbg.write_str("\n");
dbg.flush();
}
}

View file

@ -8,40 +8,143 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*! The Rust runtime, including the scheduler and I/O interface */
/*! The Rust Runtime, including the task scheduler and I/O
The `rt` module provides the private runtime infrastructure necessary
to support core language features like the exchange and local heap,
the garbage collector, logging, local data and unwinding. It also
implements the default task scheduler and task model. Initialization
routines are provided for setting up runtime resources in common
configurations, including that used by `rustc` when generating
executables.
It is intended that the features provided by `rt` can be factored in a
way such that the core library can be built with different 'profiles'
for different use cases, e.g. excluding the task scheduler. A number
of runtime features though are critical to the functioning of the
language and an implementation must be provided regardless of the
execution environment.
Of foremost importance is the global exchange heap, in the module
`global_heap`. Very little practical Rust code can be written without
access to the global heap. Unlike most of `rt` the global heap is
truly a global resource and generally operates independently of the
rest of the runtime.
All other runtime features are 'local', either thread-local or
task-local. Those critical to the functioning of the language are
defined in the module `local_services`. Local services are those which
are expected to be available to Rust code generally but rely on
thread- or task-local state. These currently include the local heap,
the garbage collector, local storage, logging and the stack unwinder.
Local services are primarily implemented for tasks, but may also
be implemented for use outside of tasks.
The relationship between `rt` and the rest of the core library is
not entirely clear yet and some modules will be moving into or
out of `rt` as development proceeds.
Several modules in `core` are clients of `rt`:
* `core::task` - The user-facing interface to the Rust task model.
* `core::task::local_data` - The interface to local data.
* `core::gc` - The garbage collector.
* `core::unstable::lang` - Miscellaneous lang items, some of which rely on `core::rt`.
* `core::condition` - Uses local data.
* `core::cleanup` - Local heap destruction.
* `core::io` - In the future `core::io` will use an `rt` implementation.
* `core::logging`
* `core::pipes`
* `core::comm`
* `core::stackwalk`
*/
#[doc(hidden)];
use libc::c_char;
use ptr::Ptr;
#[path = "sched/mod.rs"]
/// The global (exchange) heap.
pub mod global_heap;
/// The Scheduler and Task types.
mod sched;
mod rtio;
pub mod uvll;
mod uvio;
#[path = "uv/mod.rs"]
mod uv;
/// Thread-local access to the current Scheduler.
pub mod local_sched;
/// Synchronous I/O.
#[path = "io/mod.rs"]
mod io;
// FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
pub mod thread_local_storage;
mod work_queue;
mod stack;
mod context;
mod thread;
pub mod env;
pub mod io;
/// Thread-local implementations of language-critical runtime features like @.
pub mod local_services;
/// The EventLoop and internal synchronous I/O interface.
mod rtio;
/// libuv and default rtio implementation.
#[path = "uv/mod.rs"]
pub mod uv;
// FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
/// Bindings to pthread/windows thread-local storage.
pub mod thread_local_storage;
/// A parallel work-stealing dequeue.
mod work_queue;
/// Stack segments and caching.
mod stack;
/// CPU context swapping.
mod context;
/// Bindings to system threading libraries.
mod thread;
/// The runtime configuration, read from environment variables
pub mod env;
/// The local, managed heap
mod local_heap;
/// The Logger trait and implementations
pub mod logging;
/// Tools for testing the runtime
#[cfg(test)]
pub mod test;
/// Reference counting
pub mod rc;
/// A simple single-threaded channel type for passing buffered data between
/// scheduler and task context
pub mod tube;
/// Set up a default runtime configuration, given compiler-supplied arguments.
///
/// This is invoked by the `start` _language item_ (unstable::lang) to
/// run a Rust executable.
///
/// # Arguments
///
/// * `main` - A C-abi function that takes no arguments and returns `c_void`.
/// It is a wrapper around the user-defined `main` function, and will be run
/// in a task.
/// * `argc` & `argv` - The argument vector. On Unix this information is used
/// by os::args.
/// * `crate_map` - Runtime information about the executing crate, mostly for logging
///
/// # Return value
///
/// The return value is used as the process return code. 0 on success, 101 on error.
pub fn start(main: *u8, _argc: int, _argv: **c_char, _crate_map: *u8) -> int {
use self::sched::{Scheduler, Task};
use self::uvio::UvEventLoop;
use self::uv::uvio::UvEventLoop;
use sys::Closure;
use ptr;
use cast;
@ -72,6 +175,8 @@ pub fn start(main: *u8, _argc: int, _argv: **c_char, _crate_map: *u8) -> int {
/// Possible contexts in which Rust code may be executing.
/// Different runtime services are available depending on context.
/// Mostly used for determining if we're using the new scheduler
/// or the old scheduler.
#[deriving(Eq)]
pub enum RuntimeContext {
// Only the exchange heap is available
@ -84,6 +189,7 @@ pub enum RuntimeContext {
OldTaskContext
}
/// Determine the current RuntimeContext
pub fn context() -> RuntimeContext {
use task::rt::rust_task;
@ -119,7 +225,7 @@ pub fn context() -> RuntimeContext {
fn test_context() {
use unstable::run_in_bare_thread;
use self::sched::{local_sched, Task};
use self::uvio::UvEventLoop;
use rt::uv::uvio::UvEventLoop;
use cell::Cell;
assert!(context() == OldTaskContext);

142
src/libcore/rt/rc.rs Normal file
View file

@ -0,0 +1,142 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! An owned, task-local, reference counted type
//!
//! # Safety note
//!
//! XXX There is currently no type-system mechanism for enforcing that
//! reference counted types are both allocated on the exchange heap
//! and also non-sendable
//!
//! This doesn't prevent borrowing multiple aliasable mutable pointers
use ops::Drop;
use clone::Clone;
use libc::c_void;
use cast;
pub struct RC<T> {
p: *c_void // ~(uint, T)
}
impl<T> RC<T> {
pub fn new(val: T) -> RC<T> {
unsafe {
let v = ~(1, val);
let p: *c_void = cast::transmute(v);
RC { p: p }
}
}
fn get_mut_state(&mut self) -> *mut (uint, T) {
unsafe {
let p: &mut ~(uint, T) = cast::transmute(&mut self.p);
let p: *mut (uint, T) = &mut **p;
return p;
}
}
fn get_state(&self) -> *(uint, T) {
unsafe {
let p: &~(uint, T) = cast::transmute(&self.p);
let p: *(uint, T) = &**p;
return p;
}
}
pub fn unsafe_borrow_mut(&mut self) -> *mut T {
unsafe {
match *self.get_mut_state() {
(_, ref mut p) => {
let p: *mut T = p;
return p;
}
}
}
}
pub fn refcount(&self) -> uint {
unsafe {
match *self.get_state() {
(count, _) => count
}
}
}
}
#[unsafe_destructor]
impl<T> Drop for RC<T> {
fn finalize(&self) {
assert!(self.refcount() > 0);
unsafe {
// XXX: Mutable finalizer
let this: &mut RC<T> = cast::transmute_mut(self);
match *this.get_mut_state() {
(ref mut count, _) => {
*count = *count - 1
}
}
if this.refcount() == 0 {
let _: ~(uint, T) = cast::transmute(this.p);
}
}
}
}
impl<T> Clone for RC<T> {
fn clone(&self) -> RC<T> {
unsafe {
// XXX: Mutable clone
let this: &mut RC<T> = cast::transmute_mut(self);
match *this.get_mut_state() {
(ref mut count, _) => {
*count = *count + 1;
}
}
}
RC { p: self.p }
}
}
#[cfg(test)]
mod test {
use super::RC;
#[test]
fn smoke_test() {
unsafe {
let mut v1 = RC::new(100);
assert!(*v1.unsafe_borrow_mut() == 100);
assert!(v1.refcount() == 1);
let mut v2 = v1.clone();
assert!(*v2.unsafe_borrow_mut() == 100);
assert!(v2.refcount() == 2);
*v2.unsafe_borrow_mut() = 200;
assert!(*v2.unsafe_borrow_mut() == 200);
assert!(*v1.unsafe_borrow_mut() == 200);
let v3 = v2.clone();
assert!(v3.refcount() == 3);
{
let _v1 = v1;
let _v2 = v2;
}
assert!(v3.refcount() == 1);
}
}
}

View file

@ -11,14 +11,16 @@
use option::*;
use result::*;
use rt::io::IoError;
use super::io::net::ip::IpAddr;
use rt::uv::uvio;
// XXX: ~object doesn't work currently so these are some placeholder
// types to use instead
pub type EventLoopObject = super::uvio::UvEventLoop;
pub type IoFactoryObject = super::uvio::UvIoFactory;
pub type StreamObject = super::uvio::UvStream;
pub type TcpListenerObject = super::uvio::UvTcpListener;
pub type EventLoopObject = uvio::UvEventLoop;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub trait EventLoop {
fn run(&mut self);
@ -28,15 +30,15 @@ pub trait EventLoop {
}
pub trait IoFactory {
fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>;
fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>;
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
}
pub trait TcpListener {
fn listen(&mut self) -> Option<~StreamObject>;
pub trait RtioTcpListener {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
}
pub trait Stream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()>;
fn write(&mut self, buf: &[u8]) -> Result<(), ()>;
pub trait RtioTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
}

View file

@ -19,7 +19,7 @@ use super::context::Context;
use super::local_services::LocalServices;
use cell::Cell;
#[cfg(test)] use super::uvio::UvEventLoop;
#[cfg(test)] use rt::uv::uvio::UvEventLoop;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use int;
@ -106,6 +106,7 @@ pub impl Scheduler {
}
}
let scheduler = &mut *scheduler;
scheduler.event_loop.callback(run_scheduler_once);
scheduler.event_loop.run();
}
@ -179,7 +180,7 @@ pub impl Scheduler {
// Take pointers to both the task and scheduler's saved registers.
unsafe {
let sched = local_sched::unsafe_borrow();
let (sched_context, _, next_task_context) = sched.get_contexts();
let (sched_context, _, next_task_context) = (*sched).get_contexts();
let next_task_context = next_task_context.unwrap();
// Context switch to the task, restoring it's registers
// and saving the scheduler's
@ -187,10 +188,10 @@ pub impl Scheduler {
let sched = local_sched::unsafe_borrow();
// The running task should have passed ownership elsewhere
assert!(sched.current_task.is_none());
assert!((*sched).current_task.is_none());
// Running tasks may have asked us to do some cleanup
sched.run_cleanup_job();
(*sched).run_cleanup_job();
}
}
@ -208,21 +209,25 @@ pub impl Scheduler {
rtdebug!("blocking task");
let blocked_task = this.current_task.swap_unwrap();
let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
unsafe {
let blocked_task = this.current_task.swap_unwrap();
let f_fake_region = transmute::<&fn(~Task), &fn(~Task)>(f);
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
}
local_sched::put(this);
let sched = unsafe { local_sched::unsafe_borrow() };
let (sched_context, last_task_context, _) = sched.get_contexts();
let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context);
unsafe {
let sched = local_sched::unsafe_borrow();
let (sched_context, last_task_context, _) = (*sched).get_contexts();
let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context);
// We could be executing in a different thread now
let sched = unsafe { local_sched::unsafe_borrow() };
sched.run_cleanup_job();
// We could be executing in a different thread now
let sched = local_sched::unsafe_borrow();
(*sched).run_cleanup_job();
}
}
/// Switch directly to another task, without going through the scheduler.
@ -244,14 +249,14 @@ pub impl Scheduler {
unsafe {
let sched = local_sched::unsafe_borrow();
let (_, last_task_context, next_task_context) = sched.get_contexts();
let (_, last_task_context, next_task_context) = (*sched).get_contexts();
let last_task_context = last_task_context.unwrap();
let next_task_context = next_task_context.unwrap();
Context::swap(last_task_context, next_task_context);
// We could be executing in a different thread now
let sched = local_sched::unsafe_borrow();
sched.run_cleanup_job();
(*sched).run_cleanup_job();
}
}
@ -356,10 +361,10 @@ pub impl Task {
// have asked us to do some cleanup.
unsafe {
let sched = local_sched::unsafe_borrow();
sched.run_cleanup_job();
(*sched).run_cleanup_job();
let sched = local_sched::unsafe_borrow();
let task = sched.current_task.get_mut_ref();
let task = (*sched).current_task.get_mut_ref();
// FIXME #6141: shouldn't neet to put `start()` in another closure
task.local_services.run(||start());
}

View file

@ -11,21 +11,36 @@
use container::Container;
use ptr::Ptr;
use vec;
use ops::Drop;
use libc::{c_uint, uintptr_t};
pub struct StackSegment {
buf: ~[u8]
buf: ~[u8],
valgrind_id: c_uint
}
pub impl StackSegment {
fn new(size: uint) -> StackSegment {
// Crate a block of uninitialized values
let mut stack = vec::with_capacity(size);
unsafe {
// Crate a block of uninitialized values
let mut stack = vec::with_capacity(size);
vec::raw::set_len(&mut stack, size);
}
StackSegment {
buf: stack
let mut stk = StackSegment {
buf: stack,
valgrind_id: 0
};
// XXX: Using the FFI to call a C macro. Slow
stk.valgrind_id = rust_valgrind_stack_register(stk.start(), stk.end());
return stk;
}
}
/// Point to the low end of the allocated stack
fn start(&self) -> *uint {
unsafe {
vec::raw::to_ptr(self.buf) as *uint
}
}
@ -37,6 +52,15 @@ pub impl StackSegment {
}
}
impl Drop for StackSegment {
fn finalize(&self) {
unsafe {
// XXX: Using the FFI to call a C macro. Slow
rust_valgrind_stack_deregister(self.valgrind_id);
}
}
}
pub struct StackPool(());
impl StackPool {
@ -49,3 +73,8 @@ impl StackPool {
fn give_segment(&self, _stack: StackSegment) {
}
}
extern {
fn rust_valgrind_stack_register(start: *uintptr_t, end: *uintptr_t) -> c_uint;
fn rust_valgrind_stack_deregister(id: c_uint);
}

View file

@ -19,7 +19,7 @@ use rt::local_services::LocalServices;
pub fn run_in_newsched_task(f: ~fn()) {
use unstable::run_in_bare_thread;
use super::sched::Task;
use super::uvio::UvEventLoop;
use rt::uv::uvio::UvEventLoop;
let f = Cell(f);
@ -64,6 +64,46 @@ pub fn spawntask_immediately(f: ~fn()) {
}
}
/// Create a new task and run it right now. Aborts on failure
pub fn spawntask_later(f: ~fn()) {
use super::sched::*;
let mut sched = local_sched::take();
let task = ~Task::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
sched.task_queue.push_front(task);
local_sched::put(sched);
}
/// Spawn a task and either run it immediately or run it later
pub fn spawntask_random(f: ~fn()) {
use super::sched::*;
use rand::{Rand, rng};
let mut rng = rng();
let run_now: bool = Rand::rand(&mut rng);
let mut sched = local_sched::take();
let task = ~Task::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
if run_now {
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
do local_sched::borrow |sched| {
sched.task_queue.push_front(task.take());
}
}
} else {
sched.task_queue.push_front(task);
local_sched::put(sched);
}
}
/// Spawn a task and wait for it to finish, returning whether it completed successfully or failed
pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
use cell::Cell;

184
src/libcore/rt/tube.rs Normal file
View file

@ -0,0 +1,184 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A very simple unsynchronized channel type for sending buffered data from
//! scheduler context to task context.
//!
//! XXX: This would be safer to use if split into two types like Port/Chan
use option::*;
use clone::Clone;
use super::rc::RC;
use rt::sched::Task;
use rt::{context, TaskContext, SchedulerContext};
use rt::local_sched;
use vec::OwnedVector;
use container::Container;
struct TubeState<T> {
blocked_task: Option<~Task>,
buf: ~[T]
}
pub struct Tube<T> {
p: RC<TubeState<T>>
}
impl<T> Tube<T> {
pub fn new() -> Tube<T> {
Tube {
p: RC::new(TubeState {
blocked_task: None,
buf: ~[]
})
}
}
pub fn send(&mut self, val: T) {
rtdebug!("tube send");
assert!(context() == SchedulerContext);
unsafe {
let state = self.p.unsafe_borrow_mut();
(*state).buf.push(val);
if (*state).blocked_task.is_some() {
// There's a waiting task. Wake it up
rtdebug!("waking blocked tube");
let task = (*state).blocked_task.swap_unwrap();
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
}
}
pub fn recv(&mut self) -> T {
assert!(context() == TaskContext);
unsafe {
let state = self.p.unsafe_borrow_mut();
if !(*state).buf.is_empty() {
return (*state).buf.shift();
} else {
// Block and wait for the next message
rtdebug!("blocking on tube recv");
assert!(self.p.refcount() > 1); // There better be somebody to wake us up
assert!((*state).blocked_task.is_none());
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
(*state).blocked_task = Some(task);
}
rtdebug!("waking after tube recv");
let buf = &mut (*state).buf;
assert!(!buf.is_empty());
return buf.shift();
}
}
}
}
impl<T> Clone for Tube<T> {
fn clone(&self) -> Tube<T> {
Tube { p: self.p.clone() }
}
}
#[cfg(test)]
mod test {
use int;
use cell::Cell;
use rt::local_sched;
use rt::test::*;
use rt::rtio::EventLoop;
use super::*;
#[test]
fn simple_test() {
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone_cell = Cell(tube_clone);
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
let mut tube_clone = tube_clone_cell.take();
tube_clone.send(1);
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
assert!(tube.recv() == 1);
}
}
#[test]
fn blocking_test() {
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone = Cell(Cell(Cell(tube_clone)));
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
let tube_clone = tube_clone.take();
do local_sched::borrow |sched| {
let tube_clone = tube_clone.take();
do sched.event_loop.callback {
let mut tube_clone = tube_clone.take();
// The task should be blocked on this now and
// sending will wake it up.
tube_clone.send(1);
}
}
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
assert!(tube.recv() == 1);
}
}
#[test]
fn many_blocking_test() {
static MAX: int = 100;
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone = Cell(tube_clone);
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
callback_send(tube_clone.take(), 0);
fn callback_send(tube: Tube<int>, i: int) {
if i == 100 { return; }
let tube = Cell(Cell(tube));
do local_sched::borrow |sched| {
let tube = tube.take();
do sched.event_loop.callback {
let mut tube = tube.take();
// The task should be blocked on this now and
// sending will wake it up.
tube.send(i);
callback_send(tube, i + 1);
}
}
}
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
for int::range(0, MAX) |i| {
let j = tube.recv();
assert!(j == i);
}
}
}
}

View file

@ -11,15 +11,11 @@
use prelude::*;
use ptr::null;
use libc::c_void;
use super::{UvError, Callback, Request, NativeHandle, Loop};
use super::super::uvll;
use super::super::uvll::*;
pub type FsCallback = ~fn(FsRequest, Option<UvError>);
impl Callback for FsCallback { }
use rt::uv::{Request, NativeHandle, Loop, FsCallback};
use rt::uv::uvll;
use rt::uv::uvll::*;
pub struct FsRequest(*uvll::uv_fs_t);
impl Request for FsRequest;
impl FsRequest {

91
src/libcore/rt/uv/idle.rs Normal file
View file

@ -0,0 +1,91 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::c_int;
use option::Some;
use rt::uv::uvll;
use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback};
use rt::uv::status_to_maybe_uv_error;
pub struct IdleWatcher(*uvll::uv_idle_t);
impl Watcher for IdleWatcher { }
pub impl IdleWatcher {
fn new(loop_: &mut Loop) -> IdleWatcher {
unsafe {
let handle = uvll::idle_new();
assert!(handle.is_not_null());
assert!(0 == uvll::idle_init(loop_.native_handle(), handle));
let mut watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher
}
}
fn start(&mut self, cb: IdleCallback) {
{
let data = self.get_watcher_data();
data.idle_cb = Some(cb);
}
unsafe {
assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
};
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
let data = idle_watcher.get_watcher_data();
let cb: &IdleCallback = data.idle_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
(*cb)(idle_watcher, status);
}
}
fn stop(&mut self) {
// NB: Not resetting the Rust idle_cb to None here because `stop` is likely
// called from *within* the idle callback, causing a use after free
unsafe {
assert!(0 == uvll::idle_stop(self.native_handle()));
}
}
fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
unsafe {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
{
let mut data = idle_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
idle_watcher.drop_watcher_data();
uvll::idle_delete(handle);
}
}
}
}
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
IdleWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_idle_t {
match self { &IdleWatcher(ptr) => ptr }
}
}

View file

@ -10,7 +10,7 @@
/*!
Bindings to libuv.
Bindings to libuv, along with the default implementation of `core::rt::rtio`.
UV types consist of the event loop (Loop), Watchers, Requests and
Callbacks.
@ -38,29 +38,44 @@ use container::Container;
use option::*;
use str::raw::from_c_str;
use to_str::ToStr;
use ptr::Ptr;
use libc;
use vec;
use ptr;
use ptr::Ptr;
use cast;
use str;
use option::*;
use str::raw::from_c_str;
use to_str::ToStr;
use libc::{c_void, c_int, size_t, malloc, free};
use cast::transmute;
use ptr::null;
use super::uvll;
use unstable::finally::Finally;
use rt::io::IoError;
#[cfg(test)] use unstable::run_in_bare_thread;
pub use self::file::{FsRequest, FsCallback};
pub use self::file::FsRequest;
pub use self::net::{StreamWatcher, TcpWatcher};
pub use self::net::{ReadCallback, AllocCallback, ConnectionCallback, ConnectCallback};
pub use self::idle::IdleWatcher;
/// The implementation of `rtio` for libuv
pub mod uvio;
/// C bindings to libuv
pub mod uvll;
pub mod file;
pub mod net;
pub mod idle;
/// A trait for callbacks to implement. Provides a little extra type safety
/// for generic, unsafe interop functions like `set_watcher_callback`.
pub trait Callback { }
pub trait Request { }
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
/// but the results are not correct.
pub struct Loop {
handle: *uvll::uv_loop_t
}
/// The trait implemented by uv 'watchers' (handles). Watchers are
/// non-owning wrappers around the uv handles and are not completely
@ -68,12 +83,9 @@ pub trait Request { }
/// handle. Watchers are generally created, then `start`ed, `stop`ed
/// and `close`ed, but due to their complex life cycle may not be
/// entirely memory safe if used in unanticipated patterns.
pub trait Watcher {
fn event_loop(&self) -> Loop;
}
pub trait Watcher { }
pub type NullCallback = ~fn();
impl Callback for NullCallback { }
pub trait Request { }
/// A type that wraps a native handle
pub trait NativeHandle<T> {
@ -81,13 +93,6 @@ pub trait NativeHandle<T> {
pub fn native_handle(&self) -> T;
}
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
/// but the results are not correct.
pub struct Loop {
handle: *uvll::uv_loop_t
}
pub impl Loop {
fn new() -> Loop {
let handle = unsafe { uvll::loop_new() };
@ -113,64 +118,71 @@ impl NativeHandle<*uvll::uv_loop_t> for Loop {
}
}
pub struct IdleWatcher(*uvll::uv_idle_t);
impl Watcher for IdleWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
// XXX: The uv alloc callback also has a *uv_handle_t arg
pub type AllocCallback = ~fn(uint) -> Buf;
pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
pub type NullCallback = ~fn();
pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
impl Callback for IdleCallback { }
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
pub type FsCallback = ~fn(FsRequest, Option<UvError>);
pub impl IdleWatcher {
fn new(loop_: &mut Loop) -> IdleWatcher {
unsafe {
let handle = uvll::idle_new();
assert!(handle.is_not_null());
assert!(0 == uvll::idle_init(loop_.native_handle(), handle));
uvll::set_data_for_uv_handle(handle, null::<()>());
NativeHandle::from_native_handle(handle)
}
}
fn start(&mut self, cb: IdleCallback) {
set_watcher_callback(self, cb);
unsafe {
assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
};
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
let idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
let cb: &IdleCallback = borrow_callback_from_watcher(&idle_watcher);
let status = status_to_maybe_uv_error(handle, status);
(*cb)(idle_watcher, status);
}
}
fn stop(&mut self) {
unsafe { assert!(0 == uvll::idle_stop(self.native_handle())); }
}
fn close(self) {
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
let mut idle_watcher = NativeHandle::from_native_handle(handle);
drop_watcher_callback::<uvll::uv_idle_t, IdleWatcher, IdleCallback>(&mut idle_watcher);
unsafe { uvll::idle_delete(handle) };
}
}
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
struct WatcherData {
read_cb: Option<ReadCallback>,
write_cb: Option<ConnectionCallback>,
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>,
idle_cb: Option<IdleCallback>
}
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
IdleWatcher(handle)
pub trait WatcherInterop {
fn event_loop(&self) -> Loop;
fn install_watcher_data(&mut self);
fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
fn drop_watcher_data(&mut self);
}
impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
/// Get the uv event loop from a Watcher
pub fn event_loop(&self) -> Loop {
unsafe {
let handle = self.native_handle();
let loop_ = uvll::get_loop_for_uv_handle(handle);
NativeHandle::from_native_handle(loop_)
}
}
fn native_handle(&self) -> *uvll::uv_idle_t {
match self { &IdleWatcher(ptr) => ptr }
pub fn install_watcher_data(&mut self) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None,
idle_cb: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(self.native_handle(), data);
}
}
pub fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(self.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
}
}
pub fn drop_watcher_data(&mut self) {
unsafe {
let data = uvll::get_data_for_uv_handle(self.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
}
}
}
@ -213,6 +225,55 @@ fn error_smoke_test() {
assert!(err.to_str() == ~"EOF: end of file");
}
pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError {
unsafe {
let loop_ = watcher.event_loop();
UvError(uvll::last_error(loop_.native_handle()))
}
}
pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
// XXX: Could go in str::raw
unsafe fn c_str_to_static_slice(s: *libc::c_char) -> &'static str {
let s = s as *u8;
let mut curr = s, len = 0u;
while *curr != 0u8 {
len += 1u;
curr = ptr::offset(s, len);
}
str::raw::buf_as_slice(s, len, |d| cast::transmute(d))
}
unsafe {
// Importing error constants
use rt::uv::uvll::*;
use rt::io::*;
// uv error descriptions are static
let c_desc = uvll::strerror(&*uverr);
let desc = c_str_to_static_slice(c_desc);
let kind = match uverr.code {
UNKNOWN => OtherIoError,
OK => OtherIoError,
EOF => EndOfFile,
EACCES => PermissionDenied,
ECONNREFUSED => ConnectionRefused,
e => {
abort!("unknown uv error code: %u", e as uint);
}
};
IoError {
kind: kind,
desc: desc,
detail: None
}
}
}
/// Given a uv handle, convert a callback status to a UvError
// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
@ -230,133 +291,6 @@ pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError>
}
}
/// Get the uv event loop from a Watcher
pub fn loop_from_watcher<H, W: Watcher + NativeHandle<*H>>(
watcher: &W) -> Loop {
let handle = watcher.native_handle();
let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) };
NativeHandle::from_native_handle(loop_)
}
/// Set the custom data on a handle to a callback Note: This is only
/// suitable for watchers that make just one type of callback. For
/// others use WatcherData
pub fn set_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W, cb: CB) {
drop_watcher_callback::<H, W, CB>(watcher);
// XXX: Boxing the callback so it fits into a
// pointer. Unfortunate extra allocation
let boxed_cb = ~cb;
let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) };
unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) };
}
/// Delete a callback from a handle's custom data
pub fn drop_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
if handle_data.is_not_null() {
// Take ownership of the callback and drop it
let _cb = transmute::<*c_void, ~CB>(handle_data);
// Make sure the pointer is zeroed
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
}
}
}
/// Take a pointer to the callback installed as custom data
pub fn borrow_callback_from_watcher<H, W: Watcher + NativeHandle<*H>,
CB: Callback>(watcher: &W) -> &CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
assert!(handle_data.is_not_null());
let cb = transmute::<&*c_void, &~CB>(&handle_data);
return &**cb;
}
}
/// Take ownership of the callback installed as custom data
pub fn take_callback_from_watcher<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) -> CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
assert!(handle_data.is_not_null());
uvll::set_data_for_uv_handle(handle, null::<()>());
let cb: ~CB = transmute::<*c_void, ~CB>(handle_data);
let cb = match cb { ~cb => cb };
return cb;
}
}
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
struct WatcherData {
read_cb: Option<ReadCallback>,
write_cb: Option<ConnectionCallback>,
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>,
buf: Option<Buf>
}
pub fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None,
buf: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), data);
}
}
pub fn get_watcher_data<'r, H, W: Watcher + NativeHandle<*H>>(
watcher: &'r mut W) -> &'r mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
}
}
pub fn drop_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
}
}
#[test]
fn test_slice_to_uv_buf() {
let slice = [0, .. 20];
let buf = slice_to_uv_buf(slice);
assert!(buf.len == 20);
unsafe {
let base = transmute::<*u8, *mut u8>(buf.base);
(*base) = 1;
(*ptr::mut_offset(base, 1)) = 2;
}
assert!(slice[0] == 1);
assert!(slice[1] == 2);
}
/// The uv buffer type
pub type Buf = uvll::uv_buf_t;
@ -394,6 +328,24 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
}
}
#[test]
fn test_slice_to_uv_buf() {
let slice = [0, .. 20];
let buf = slice_to_uv_buf(slice);
assert!(buf.len == 20);
unsafe {
let base = transmute::<*u8, *mut u8>(buf.base);
(*base) = 1;
(*ptr::mut_offset(base, 1)) = 2;
}
assert!(slice[0] == 1);
assert!(slice[1] == 2);
}
#[test]
fn loop_smoke_test() {
do run_in_bare_thread {
@ -409,7 +361,7 @@ fn idle_new_then_close() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let idle_watcher = { IdleWatcher::new(&mut loop_) };
idle_watcher.close();
idle_watcher.close(||());
}
}
@ -425,7 +377,7 @@ fn idle_smoke_test() {
assert!(status.is_none());
if unsafe { *count_ptr == 10 } {
idle_watcher.stop();
idle_watcher.close();
idle_watcher.close(||());
} else {
unsafe { *count_ptr = *count_ptr + 1; }
}
@ -449,7 +401,7 @@ fn idle_start_stop_start() {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
idle_watcher.close(||());
}
}
loop_.run();

View file

@ -10,21 +10,15 @@
use prelude::*;
use libc::{size_t, ssize_t, c_int, c_void};
use cast::transmute_mut_region;
use super::super::uvll;
use super::super::uvll::*;
use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCallback,
loop_from_watcher, status_to_maybe_uv_error,
install_watcher_data, get_watcher_data, drop_watcher_data,
vec_to_uv_buf, vec_from_uv_buf};
use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::uv::uvll;
use rt::uv::uvll::*;
use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback};
use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
status_to_maybe_uv_error};
use rt::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::uv::last_uv_error;
#[cfg(test)] use cell::Cell;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::super::thread::Thread;
#[cfg(test)] use super::super::test::*;
fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
match addr {
Ipv4(a, b, c, d, p) => {
unsafe {
@ -34,7 +28,7 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
c as uint,
d as uint), p as int);
do (|| {
f(addr);
f(addr)
}).finally {
free_ip4_addr(addr);
}
@ -47,34 +41,23 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
impl Watcher for StreamWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
impl Callback for ReadCallback { }
// XXX: The uv alloc callback also has a *uv_handle_t arg
pub type AllocCallback = ~fn(uint) -> Buf;
impl Callback for AllocCallback { }
impl Watcher for StreamWatcher { }
pub impl StreamWatcher {
fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
// XXX: Borrowchk problems
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
{
let data = self.get_watcher_data();
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
}
let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
let alloc_cb = data.alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
@ -83,7 +66,7 @@ pub impl StreamWatcher {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
let cb = data.read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
@ -98,22 +81,19 @@ pub impl StreamWatcher {
unsafe { uvll::read_stop(handle); }
}
// XXX: Needs to take &[u8], not ~[u8]
fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
assert!(data.write_cb.is_none());
data.write_cb = Some(cb);
fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
{
let data = self.get_watcher_data();
assert!(data.write_cb.is_none());
data.write_cb = Some(cb);
}
let req = WriteRequest::new();
let buf = vec_to_uv_buf(msg);
assert!(data.buf.is_none());
data.buf = Some(buf);
let bufs = [buf];
unsafe {
assert!(0 == uvll::write(req.native_handle(),
self.native_handle(),
bufs, write_cb));
self.native_handle(),
bufs, write_cb));
}
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
@ -121,8 +101,7 @@ pub impl StreamWatcher {
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = {
let data = get_watcher_data(&mut stream_watcher);
let _vec = vec_from_uv_buf(data.buf.swap_unwrap());
let data = stream_watcher.get_watcher_data();
let cb = data.write_cb.swap_unwrap();
cb
};
@ -142,7 +121,7 @@ pub impl StreamWatcher {
fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = get_watcher_data(&mut this);
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
@ -152,9 +131,10 @@ pub impl StreamWatcher {
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
{
get_watcher_data(&mut stream_watcher).close_cb.swap_unwrap()();
let mut data = stream_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
drop_watcher_data(&mut stream_watcher);
stream_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
}
}
@ -171,15 +151,7 @@ impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
}
pub struct TcpWatcher(*uvll::uv_tcp_t);
impl Watcher for TcpWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
impl Callback for ConnectionCallback { }
impl Watcher for TcpWatcher { }
pub impl TcpWatcher {
fn new(loop_: &mut Loop) -> TcpWatcher {
@ -187,21 +159,24 @@ pub impl TcpWatcher {
let handle = malloc_handle(UV_TCP);
assert!(handle.is_not_null());
assert!(0 == uvll::tcp_init(loop_.native_handle(), handle));
let mut watcher = NativeHandle::from_native_handle(handle);
install_watcher_data(&mut watcher);
let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher;
}
}
fn bind(&mut self, address: IpAddr) {
fn bind(&mut self, address: IpAddr) -> Result<(), UvError> {
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
let result = unsafe {
uvll::tcp_bind(self.native_handle(), addr)
};
// XXX: bind is likely to fail. need real error handling
assert!(result == 0);
if result == 0 {
Ok(())
} else {
Err(last_uv_error(self))
}
}
}
_ => fail!()
@ -210,8 +185,8 @@ pub impl TcpWatcher {
fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) {
unsafe {
assert!(get_watcher_data(self).connect_cb.is_none());
get_watcher_data(self).connect_cb = Some(cb);
assert!(self.get_watcher_data().connect_cb.is_none());
self.get_watcher_data().connect_cb = Some(cb);
let connect_handle = ConnectRequest::new().native_handle();
match address {
@ -232,7 +207,7 @@ pub impl TcpWatcher {
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb: ConnectionCallback = {
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
data.connect_cb.swap_unwrap()
};
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
@ -242,10 +217,11 @@ pub impl TcpWatcher {
}
fn listen(&mut self, cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
assert!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
{
let data = self.get_watcher_data();
assert!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
}
unsafe {
static BACKLOG: c_int = 128; // XXX should be configurable
@ -257,9 +233,10 @@ pub impl TcpWatcher {
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let cb = get_watcher_data(&mut stream_watcher).connect_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
let data = stream_watcher.get_watcher_data();
let cb = data.connect_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
(*cb)(stream_watcher, status);
}
}
@ -277,12 +254,8 @@ impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
}
}
pub type ConnectCallback = ~fn(ConnectRequest, Option<UvError>);
impl Callback for ConnectCallback { }
// uv_connect_t is a subclass of uv_req_t
struct ConnectRequest(*uvll::uv_connect_t);
impl Request for ConnectRequest { }
impl ConnectRequest {
@ -355,93 +328,109 @@ impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
}
#[test]
fn connect_close() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
// Connect to a port where nobody is listening
let addr = next_test_ip4();
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("tcp_watcher.connect!");
assert!(status.is_some());
assert!(status.get().name() == ~"ECONNREFUSED");
stream_watcher.close(||());
}
loop_.run();
loop_.close();
}
}
#[cfg(test)]
mod test {
use super::*;
use util::ignore;
use cell::Cell;
use vec;
use unstable::run_in_bare_thread;
use rt::thread::Thread;
use rt::test::*;
use rt::uv::{Loop, AllocCallback};
use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
#[test]
fn listen() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = next_test_ip4();
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
do server_tcp_watcher.listen |server_stream_watcher, status| {
rtdebug!("listened!");
assert!(status.is_none());
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_;
let client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
server_stream_watcher.accept(client_tcp_watcher);
let count_cell = Cell(0);
let server_stream_watcher = server_stream_watcher;
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do client_tcp_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
rtdebug!("i'm reading!");
let buf = vec_from_uv_buf(buf);
let mut count = count_cell.take();
if status.is_none() {
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.slice(0, nread as uint).each |byte| {
assert!(*byte == count as u8);
rtdebug!("%u", *byte as uint);
count += 1;
}
} else {
assert!(count == MAX);
do stream_watcher.close {
server_stream_watcher.close(||());
}
}
count_cell.put_back(count);
}
}
let _client_thread = do Thread::start {
rtdebug!("starting client thread");
#[test]
fn connect_close() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
// Connect to a port where nobody is listening
let addr = next_test_ip4();
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connecting");
assert!(status.is_none());
let mut stream_watcher = stream_watcher;
let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
do stream_watcher.write(msg) |stream_watcher, status| {
rtdebug!("writing");
assert!(status.is_none());
stream_watcher.close(||());
}
rtdebug!("tcp_watcher.connect!");
assert!(status.is_some());
assert!(status.get().name() == ~"ECONNREFUSED");
stream_watcher.close(||());
}
loop_.run();
loop_.close();
};
}
}
let mut loop_ = loop_;
loop_.run();
loop_.close();
#[test]
fn listen() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = next_test_ip4();
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
do server_tcp_watcher.listen |server_stream_watcher, status| {
rtdebug!("listened!");
assert!(status.is_none());
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_;
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
server_stream_watcher.accept(client_tcp_watcher);
let count_cell = Cell(0);
let server_stream_watcher = server_stream_watcher;
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do client_tcp_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
rtdebug!("i'm reading!");
let buf = vec_from_uv_buf(buf);
let mut count = count_cell.take();
if status.is_none() {
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.slice(0, nread as uint).each |byte| {
assert!(*byte == count as u8);
rtdebug!("%u", *byte as uint);
count += 1;
}
} else {
assert!(count == MAX);
do stream_watcher.close {
server_stream_watcher.close(||());
}
}
count_cell.put_back(count);
}
}
let _client_thread = do Thread::start {
rtdebug!("starting client thread");
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connecting");
assert!(status.is_none());
let mut stream_watcher = stream_watcher;
let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
let buf = slice_to_uv_buf(msg);
let msg_cell = Cell(msg);
do stream_watcher.write(buf) |stream_watcher, status| {
rtdebug!("writing");
assert!(status.is_none());
let msg_cell = Cell(msg_cell.take());
stream_watcher.close(||ignore(msg_cell.take()));
}
}
loop_.run();
loop_.close();
};
let mut loop_ = loop_;
loop_.run();
loop_.close();
}
}
}

View file

@ -10,20 +10,24 @@
use option::*;
use result::*;
use super::io::net::ip::IpAddr;
use super::uv::*;
use super::rtio::*;
use ops::Drop;
use old_iter::CopyableIter;
use cell::{Cell, empty_cell};
use cast::transmute;
use super::sched::{Scheduler, local_sched};
use clone::Clone;
use rt::io::IoError;
use rt::io::net::ip::IpAddr;
use rt::uv::*;
use rt::uv::idle::IdleWatcher;
use rt::rtio::*;
use rt::sched::{Scheduler, local_sched};
use rt::io::{standard_error, OtherIoError};
use rt::tube::Tube;
#[cfg(test)] use container::Container;
#[cfg(test)] use uint;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::test::*;
#[cfg(test)] use rt::test::*;
pub struct UvEventLoop {
uvio: UvIoFactory
@ -64,7 +68,7 @@ impl EventLoop for UvEventLoop {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
idle_watcher.close(||());
f();
}
}
@ -100,11 +104,11 @@ impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> {
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
// Create a cell in the task to hold the result. We will fill
// the cell before resuming the task.
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
@ -122,21 +126,26 @@ impl IoFactory for UvIoFactory {
// Wait for a connection
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connect: in connect callback");
let maybe_stream = if status.is_none() {
if status.is_none() {
rtdebug!("status is none");
Some(~UvStream(stream_watcher))
let res = Ok(~UvTcpStream { watcher: stream_watcher });
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(res); }
// Context switch
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
} else {
rtdebug!("status is some");
stream_watcher.close(||());
None
let task_cell = Cell(task_cell.take());
do stream_watcher.close {
let res = Err(uv_error_to_io_error(status.get()));
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
};
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
// Context switch
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -144,103 +153,124 @@ impl IoFactory for UvIoFactory {
return result_cell.take();
}
fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> {
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
let mut watcher = TcpWatcher::new(self.uv_loop());
watcher.bind(addr);
return Some(~UvTcpListener(watcher));
match watcher.bind(addr) {
Ok(_) => Ok(~UvTcpListener::new(watcher)),
Err(uverr) => {
let scheduler = local_sched::take();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.as_stream().close {
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
}
}
pub struct UvTcpListener(TcpWatcher);
// FIXME #6090: Prefer newtype structs but Drop doesn't work
pub struct UvTcpListener {
watcher: TcpWatcher,
listening: bool,
incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
}
impl UvTcpListener {
fn watcher(&self) -> TcpWatcher {
match self { &UvTcpListener(w) => w }
fn new(watcher: TcpWatcher) -> UvTcpListener {
UvTcpListener {
watcher: watcher,
listening: false,
incoming_streams: Tube::new()
}
}
fn close(&self) {
// XXX: Need to wait until close finishes before returning
self.watcher().as_stream().close(||());
}
fn watcher(&self) -> TcpWatcher { self.watcher }
}
impl Drop for UvTcpListener {
fn finalize(&self) {
// XXX: Again, this never gets called. Use .close() instead
//self.watcher().as_stream().close(||());
}
}
impl TcpListener for UvTcpListener {
fn listen(&mut self) -> Option<~StreamObject> {
rtdebug!("entering listen");
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let server_tcp_watcher = self.watcher();
let watcher = self.watcher();
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
let mut server_tcp_watcher = server_tcp_watcher;
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_from_watcher(&server_stream_watcher);
let client_tcp_watcher = TcpWatcher::new(&mut loop_).as_stream();
// XXX: Needs to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Some(~UvStream::new(client_tcp_watcher))
} else {
None
};
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
rtdebug!("resuming task from listen");
// Context switch
do watcher.as_stream().close {
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
}
pub struct UvStream(StreamWatcher);
impl RtioTcpListener for UvTcpListener {
impl UvStream {
fn new(watcher: StreamWatcher) -> UvStream {
UvStream(watcher)
}
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
rtdebug!("entering listen");
fn watcher(&self) -> StreamWatcher {
match self { &UvStream(w) => w }
}
if self.listening {
return self.incoming_streams.recv();
}
// XXX: finalize isn't working for ~UvStream???
fn close(&self) {
// XXX: Need to wait until this finishes before returning
self.watcher().close(||());
self.listening = true;
let server_tcp_watcher = self.watcher();
let incoming_streams_cell = Cell(self.incoming_streams.clone());
let incoming_streams_cell = Cell(incoming_streams_cell.take());
let mut server_tcp_watcher = server_tcp_watcher;
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = server_stream_watcher.event_loop();
let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
let client_tcp_watcher = client_tcp_watcher.as_stream();
// XXX: Need's to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Ok(~UvTcpStream { watcher: client_tcp_watcher })
} else {
Err(standard_error(OtherIoError))
};
let mut incoming_streams = incoming_streams_cell.take();
incoming_streams.send(maybe_stream);
incoming_streams_cell.put_back(incoming_streams);
}
return self.incoming_streams.recv();
}
}
impl Drop for UvStream {
// FIXME #6090: Prefer newtype structs but Drop doesn't work
pub struct UvTcpStream {
watcher: StreamWatcher
}
impl UvTcpStream {
fn watcher(&self) -> StreamWatcher { self.watcher }
}
impl Drop for UvTcpStream {
fn finalize(&self) {
rtdebug!("closing stream");
//self.watcher().close(||());
rtdebug!("closing tcp stream");
let watcher = self.watcher();
let scheduler = local_sched::take();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.close {
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}
impl Stream for UvStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()> {
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
@ -271,7 +301,7 @@ impl Stream for UvStream {
assert!(nread >= 0);
Ok(nread as uint)
} else {
Err(())
Err(standard_error(OtherIoError))
};
unsafe { (*result_cell_ptr).put_back(result); }
@ -285,9 +315,9 @@ impl Stream for UvStream {
return result_cell.take();
}
fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
@ -295,14 +325,12 @@ impl Stream for UvStream {
do scheduler.deschedule_running_task_and_then |task| {
let mut watcher = watcher;
let task_cell = Cell(task);
let buf = unsafe { &*buf_ptr };
// XXX: OMGCOPIES
let buf = buf.to_vec();
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
} else {
Err(())
Err(standard_error(OtherIoError))
};
unsafe { (*result_cell_ptr).put_back(result); }
@ -320,10 +348,12 @@ impl Stream for UvStream {
#[test]
fn test_simple_io_no_connect() {
do run_in_newsched_task {
let io = unsafe { local_sched::unsafe_borrow_io() };
let addr = next_test_ip4();
let maybe_chan = io.connect(addr);
assert!(maybe_chan.is_none());
unsafe {
let io = local_sched::unsafe_borrow_io();
let addr = next_test_ip4();
let maybe_chan = (*io).tcp_connect(addr);
assert!(maybe_chan.is_err());
}
}
}
@ -336,8 +366,8 @@ fn test_simple_tcp_server_and_client() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert!(nread == 8);
@ -345,17 +375,14 @@ fn test_simple_tcp_server_and_client() {
rtdebug!("%u", buf[i] as uint);
assert!(buf[i] == i as u8);
}
stream.close();
listener.close();
}
}
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = io.connect(addr).unwrap();
let mut stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
}
}
@ -368,8 +395,8 @@ fn test_read_and_block() {
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
@ -399,19 +426,17 @@ fn test_read_and_block() {
// Make sure we had multiple reads
assert!(reads > 1);
stream.close();
listener.close();
}
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
}
}
}
@ -426,34 +451,33 @@ fn test_read_read_read() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {
stream.write(buf);
total_bytes_written += buf.len();
}
stream.close();
listener.close();
}
}
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
let nread = stream.read(buf).unwrap();
rtdebug!("read %u bytes", nread as uint);
total_bytes_read += nread;
for uint::range(0, nread) |i| {
assert!(buf[i] == 1);
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = (*io).tcp_connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
let nread = stream.read(buf).unwrap();
rtdebug!("read %u bytes", nread as uint);
total_bytes_read += nread;
for uint::range(0, nread) |i| {
assert!(buf[i] == 1);
}
}
rtdebug!("read %u bytes total", total_bytes_read as uint);
}
rtdebug!("read %u bytes total", total_bytes_read as uint);
stream.close();
}
}
}

View file

@ -33,6 +33,13 @@ use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
use libc::{malloc, free};
use prelude::*;
pub static UNKNOWN: c_int = -1;
pub static OK: c_int = 0;
pub static EOF: c_int = 1;
pub static EADDRINFO: c_int = 2;
pub static EACCES: c_int = 3;
pub static ECONNREFUSED: c_int = 12;
pub struct uv_err_t {
code: c_int,
sys_errno_: c_int

View file

@ -202,10 +202,12 @@ impl FailWithCause for &'static str {
// FIXME #4427: Temporary until rt::rt_fail_ goes away
pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
use rt::{context, OldTaskContext};
use rt::local_services::unsafe_borrow_local_services;
use option::Option;
use rt::{context, OldTaskContext, TaskContext};
use rt::local_services::{unsafe_borrow_local_services, Unwinder};
match context() {
let context = context();
match context {
OldTaskContext => {
unsafe {
gc::cleanup_stack_for_failure();
@ -214,11 +216,26 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
}
}
_ => {
// XXX: Need to print the failure message
gc::cleanup_stack_for_failure();
unsafe {
// XXX: Bad re-allocations. fail! needs some refactoring
let msg = str::raw::from_c_str(msg);
let file = str::raw::from_c_str(file);
let outmsg = fmt!("%s at line %i of file %s", msg, line as int, file);
// XXX: Logging doesn't work correctly in non-task context because it
// invokes the local heap
if context == TaskContext {
error!(outmsg);
} else {
rtdebug!("%s", outmsg);
}
gc::cleanup_stack_for_failure();
let local_services = unsafe_borrow_local_services();
match local_services.unwinder {
let unwinder: &mut Option<Unwinder> = &mut (*local_services).unwinder;
match *unwinder {
Some(ref mut unwinder) => unwinder.begin_unwind(),
None => abort!("failure without unwinder. aborting process")
}

View file

@ -36,7 +36,7 @@ impl Handle {
}
_ => {
let local_services = unsafe_borrow_local_services();
NewHandle(&mut local_services.storage)
NewHandle(&mut (*local_services).storage)
}
}
}

View file

@ -16,12 +16,12 @@ use libc::{c_char, c_uchar, c_void, size_t, uintptr_t, c_int, STDERR_FILENO};
use managed::raw::BoxRepr;
use str;
use sys;
use unstable::exchange_alloc;
use cast::transmute;
use rt::{context, OldTaskContext};
use rt::local_services::borrow_local_services;
use option::{Option, Some, None};
use io;
use rt::global_heap;
#[allow(non_camel_case_types)]
pub type rust_task = c_void;
@ -153,7 +153,7 @@ unsafe fn fail_borrowed(box: *mut BoxRepr, file: *c_char, line: size_t) {
#[lang="exchange_malloc"]
#[inline(always)]
pub unsafe fn exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char {
transmute(exchange_alloc::malloc(transmute(td), transmute(size)))
transmute(global_heap::malloc(transmute(td), transmute(size)))
}
/// Because this code is so perf. sensitive, use a static constant so that
@ -233,7 +233,7 @@ impl DebugPrints for io::fd_t {
#[lang="exchange_free"]
#[inline(always)]
pub unsafe fn exchange_free(ptr: *c_char) {
exchange_alloc::free(transmute(ptr))
global_heap::free(transmute(ptr))
}
#[lang="malloc"]

View file

@ -19,7 +19,6 @@ pub mod at_exit;
pub mod global;
pub mod finally;
pub mod weak_task;
pub mod exchange_alloc;
pub mod intrinsics;
pub mod simd;
pub mod extfmt;

View file

@ -830,9 +830,9 @@ rust_get_rt_env() {
}
#ifndef _WIN32
pthread_key_t sched_key;
pthread_key_t sched_key = -1;
#else
DWORD sched_key;
DWORD sched_key = -1;
#endif
extern "C" void*

View file

@ -92,3 +92,14 @@ destroy_exchange_stack(rust_exchange_alloc *exchange, stk_seg *stk) {
deregister_valgrind_stack(stk);
exchange->free(stk);
}
extern "C" CDECL unsigned int
rust_valgrind_stack_register(void *start, void *end) {
return VALGRIND_STACK_REGISTER(start, end);
}
extern "C" CDECL void
rust_valgrind_stack_deregister(unsigned int id) {
VALGRIND_STACK_DEREGISTER(id);
}

View file

@ -234,3 +234,5 @@ rust_try
rust_begin_unwind
rust_take_task_borrow_list
rust_set_task_borrow_list
rust_valgrind_stack_register
rust_valgrind_stack_deregister