rustuv: Deal with the rtio changes

This commit is contained in:
Alex Crichton 2014-06-04 00:00:59 -07:00
parent 51348b068b
commit 550c347d7b
14 changed files with 318 additions and 545 deletions

View file

@ -8,12 +8,12 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use ai = std::io::net::addrinfo;
use libc::c_int;
use libc;
use std::mem;
use std::ptr::null;
use std::rt::task::BlockedTask;
use std::rt::rtio;
use net;
use super::{Loop, UvError, Request, wait_until_woken_after, wakeup};
@ -33,7 +33,9 @@ pub struct GetAddrInfoRequest;
impl GetAddrInfoRequest {
pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
hints: Option<ai::Hint>) -> Result<Vec<ai::Info>, UvError> {
hints: Option<rtio::AddrinfoHint>)
-> Result<Vec<rtio::AddrinfoInfo>, UvError>
{
assert!(node.is_some() || service.is_some());
let (_c_node, c_node_ptr) = match node {
Some(n) => {
@ -54,20 +56,11 @@ impl GetAddrInfoRequest {
};
let hint = hints.map(|hint| {
let mut flags = 0;
each_ai_flag(|cval, aival| {
if hint.flags & (aival as uint) != 0 {
flags |= cval as i32;
}
});
let socktype = 0;
let protocol = 0;
libc::addrinfo {
ai_flags: flags,
ai_flags: 0,
ai_family: hint.family as c_int,
ai_socktype: socktype,
ai_protocol: protocol,
ai_socktype: 0,
ai_protocol: 0,
ai_addrlen: 0,
ai_canonname: null(),
ai_addr: null(),
@ -119,22 +112,8 @@ impl Drop for Addrinfo {
}
}
fn each_ai_flag(_f: |c_int, ai::Flag|) {
/* FIXME: do we really want to support these?
unsafe {
f(uvll::rust_AI_ADDRCONFIG(), ai::AddrConfig);
f(uvll::rust_AI_ALL(), ai::All);
f(uvll::rust_AI_CANONNAME(), ai::CanonName);
f(uvll::rust_AI_NUMERICHOST(), ai::NumericHost);
f(uvll::rust_AI_NUMERICSERV(), ai::NumericServ);
f(uvll::rust_AI_PASSIVE(), ai::Passive);
f(uvll::rust_AI_V4MAPPED(), ai::V4Mapped);
}
*/
}
// Traverse the addrinfo linked list, producing a vector of Rust socket addresses
pub fn accum_addrinfo(addr: &Addrinfo) -> Vec<ai::Info> {
pub fn accum_addrinfo(addr: &Addrinfo) -> Vec<rtio::AddrinfoInfo> {
unsafe {
let mut addr = addr.handle;
@ -143,35 +122,12 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> Vec<ai::Info> {
let rustaddr = net::sockaddr_to_addr(mem::transmute((*addr).ai_addr),
(*addr).ai_addrlen as uint);
let mut flags = 0;
each_ai_flag(|cval, aival| {
if (*addr).ai_flags & cval != 0 {
flags |= aival as uint;
}
});
/* FIXME: do we really want to support these
let protocol = match (*addr).ai_protocol {
p if p == uvll::rust_IPPROTO_UDP() => Some(ai::UDP),
p if p == uvll::rust_IPPROTO_TCP() => Some(ai::TCP),
_ => None,
};
let socktype = match (*addr).ai_socktype {
p if p == uvll::rust_SOCK_STREAM() => Some(ai::Stream),
p if p == uvll::rust_SOCK_DGRAM() => Some(ai::Datagram),
p if p == uvll::rust_SOCK_RAW() => Some(ai::Raw),
_ => None,
};
*/
let protocol = None;
let socktype = None;
addrs.push(ai::Info {
addrs.push(rtio::AddrinfoInfo {
address: rustaddr,
family: (*addr).ai_family as uint,
socktype: socktype,
protocol: protocol,
flags: flags,
socktype: 0,
protocol: 0,
flags: 0,
});
if (*addr).ai_next.is_not_null() {
addr = (*addr).ai_next;

View file

@ -8,9 +8,10 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use alloc::arc::Arc;
use std::mem;
use std::rt::exclusive::Exclusive;
use std::rt::rtio::{Callback, RemoteCallback};
use std::unstable::sync::Exclusive;
use uvll;
use super::{Loop, UvHandle};
@ -22,12 +23,12 @@ pub struct AsyncWatcher {
// A flag to tell the callback to exit, set from the dtor. This is
// almost never contested - only in rare races with the dtor.
exit_flag: Exclusive<bool>
exit_flag: Arc<Exclusive<bool>>,
}
struct Payload {
callback: Box<Callback:Send>,
exit_flag: Exclusive<bool>,
exit_flag: Arc<Exclusive<bool>>,
}
impl AsyncWatcher {
@ -36,7 +37,7 @@ impl AsyncWatcher {
assert_eq!(unsafe {
uvll::uv_async_init(loop_.handle, handle, async_cb)
}, 0);
let flag = Exclusive::new(false);
let flag = Arc::new(Exclusive::new(false));
let payload = box Payload { callback: cb, exit_flag: flag.clone() };
unsafe {
let payload: *u8 = mem::transmute(payload);
@ -80,9 +81,7 @@ extern fn async_cb(handle: *uvll::uv_async_t) {
// could be called in the other thread, missing the final
// callback while still destroying the handle.
let should_exit = unsafe {
payload.exit_flag.with_imm(|&should_exit| should_exit)
};
let should_exit = unsafe { *payload.exit_flag.lock() };
payload.callback.call();
@ -108,16 +107,13 @@ impl RemoteCallback for AsyncWatcher {
impl Drop for AsyncWatcher {
fn drop(&mut self) {
unsafe {
self.exit_flag.with(|should_exit| {
// NB: These two things need to happen atomically. Otherwise
// the event handler could wake up due to a *previous*
// signal and see the exit flag, destroying the handle
// before the final send.
*should_exit = true;
uvll::uv_async_send(self.handle)
})
}
let mut should_exit = unsafe { self.exit_flag.lock() };
// NB: These two things need to happen atomically. Otherwise
// the event handler could wake up due to a *previous*
// signal and see the exit flag, destroying the handle
// before the final send.
*should_exit = true;
unsafe { uvll::uv_async_send(self.handle) }
}
}

View file

@ -12,9 +12,9 @@ use libc::{c_int, c_char, c_void, ssize_t};
use libc;
use std::c_str::CString;
use std::c_str;
use std::io::{FileStat, IoError};
use std::io;
use std::mem;
use std::os;
use std::rt::rtio::{IoResult, IoError};
use std::rt::rtio;
use std::rt::task::BlockedTask;
@ -56,21 +56,23 @@ impl FsRequest {
})
}
pub fn lstat(loop_: &Loop, path: &CString) -> Result<FileStat, UvError> {
pub fn lstat(loop_: &Loop, path: &CString)
-> Result<rtio::FileStat, UvError>
{
execute(|req, cb| unsafe {
uvll::uv_fs_lstat(loop_.handle, req, path.with_ref(|p| p),
cb)
}).map(|req| req.mkstat())
}
pub fn stat(loop_: &Loop, path: &CString) -> Result<FileStat, UvError> {
pub fn stat(loop_: &Loop, path: &CString) -> Result<rtio::FileStat, UvError> {
execute(|req, cb| unsafe {
uvll::uv_fs_stat(loop_.handle, req, path.with_ref(|p| p),
cb)
}).map(|req| req.mkstat())
}
pub fn fstat(loop_: &Loop, fd: c_int) -> Result<FileStat, UvError> {
pub fn fstat(loop_: &Loop, fd: c_int) -> Result<rtio::FileStat, UvError> {
execute(|req, cb| unsafe {
uvll::uv_fs_fstat(loop_.handle, req, fd, cb)
}).map(|req| req.mkstat())
@ -269,40 +271,30 @@ impl FsRequest {
unsafe { uvll::get_ptr_from_fs_req(self.req) }
}
pub fn mkstat(&self) -> FileStat {
pub fn mkstat(&self) -> rtio::FileStat {
let stat = self.get_stat();
fn to_msec(stat: uvll::uv_timespec_t) -> u64 {
// Be sure to cast to u64 first to prevent overflowing if the tv_sec
// field is a 32-bit integer.
(stat.tv_sec as u64) * 1000 + (stat.tv_nsec as u64) / 1000000
}
let kind = match (stat.st_mode as c_int) & libc::S_IFMT {
libc::S_IFREG => io::TypeFile,
libc::S_IFDIR => io::TypeDirectory,
libc::S_IFIFO => io::TypeNamedPipe,
libc::S_IFBLK => io::TypeBlockSpecial,
libc::S_IFLNK => io::TypeSymlink,
_ => io::TypeUnknown,
};
FileStat {
rtio::FileStat {
size: stat.st_size as u64,
kind: kind,
perm: io::FilePermission::from_bits_truncate(stat.st_mode as u32),
kind: stat.st_mode as u64,
perm: stat.st_mode as u64,
created: to_msec(stat.st_birthtim),
modified: to_msec(stat.st_mtim),
accessed: to_msec(stat.st_atim),
unstable: io::UnstableFileStat {
device: stat.st_dev as u64,
inode: stat.st_ino as u64,
rdev: stat.st_rdev as u64,
nlink: stat.st_nlink as u64,
uid: stat.st_uid as u64,
gid: stat.st_gid as u64,
blksize: stat.st_blksize as u64,
blocks: stat.st_blocks as u64,
flags: stat.st_flags as u64,
gen: stat.st_gen as u64,
}
device: stat.st_dev as u64,
inode: stat.st_ino as u64,
rdev: stat.st_rdev as u64,
nlink: stat.st_nlink as u64,
uid: stat.st_uid as u64,
gid: stat.st_gid as u64,
blksize: stat.st_blksize as u64,
blocks: stat.st_blocks as u64,
flags: stat.st_flags as u64,
gen: stat.st_gen as u64,
}
}
}
@ -369,29 +361,26 @@ impl FileWatcher {
}
}
fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
fn base_read(&mut self, buf: &mut [u8], offset: i64) -> IoResult<int> {
let _m = self.fire_homing_missile();
let r = FsRequest::read(&self.loop_, self.fd, buf, offset);
r.map_err(uv_error_to_io_error)
}
fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
fn base_write(&mut self, buf: &[u8], offset: i64) -> IoResult<()> {
let _m = self.fire_homing_missile();
let r = FsRequest::write(&self.loop_, self.fd, buf, offset);
r.map_err(uv_error_to_io_error)
}
fn seek_common(&self, pos: i64, whence: c_int) ->
Result<u64, IoError>{
unsafe {
match libc::lseek(self.fd, pos as libc::off_t, whence) {
-1 => {
Err(IoError {
kind: io::OtherIoError,
desc: "Failed to lseek.",
detail: None
})
},
n => Ok(n as u64)
}
fn seek_common(&self, pos: i64, whence: c_int) -> IoResult<u64>{
match unsafe { libc::lseek(self.fd, pos as libc::off_t, whence) } {
-1 => {
Err(IoError {
code: os::errno() as uint,
extra: 0,
detail: None,
})
},
n => Ok(n as u64)
}
}
}
@ -425,47 +414,47 @@ impl Drop for FileWatcher {
}
impl rtio::RtioFileStream for FileWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<int> {
self.base_read(buf, -1)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.base_write(buf, -1)
}
fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
fn pread(&mut self, buf: &mut [u8], offset: u64) -> IoResult<int> {
self.base_read(buf, offset as i64)
}
fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
fn pwrite(&mut self, buf: &[u8], offset: u64) -> IoResult<()> {
self.base_write(buf, offset as i64)
}
fn seek(&mut self, pos: i64, whence: io::SeekStyle) -> Result<u64, IoError> {
fn seek(&mut self, pos: i64, whence: rtio::SeekStyle) -> IoResult<u64> {
use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
let whence = match whence {
io::SeekSet => SEEK_SET,
io::SeekCur => SEEK_CUR,
io::SeekEnd => SEEK_END
rtio::SeekSet => SEEK_SET,
rtio::SeekCur => SEEK_CUR,
rtio::SeekEnd => SEEK_END
};
self.seek_common(pos, whence)
}
fn tell(&self) -> Result<u64, IoError> {
fn tell(&self) -> IoResult<u64> {
use libc::SEEK_CUR;
self.seek_common(0, SEEK_CUR)
}
fn fsync(&mut self) -> Result<(), IoError> {
fn fsync(&mut self) -> IoResult<()> {
let _m = self.fire_homing_missile();
FsRequest::fsync(&self.loop_, self.fd).map_err(uv_error_to_io_error)
}
fn datasync(&mut self) -> Result<(), IoError> {
fn datasync(&mut self) -> IoResult<()> {
let _m = self.fire_homing_missile();
FsRequest::datasync(&self.loop_, self.fd).map_err(uv_error_to_io_error)
}
fn truncate(&mut self, offset: i64) -> Result<(), IoError> {
fn truncate(&mut self, offset: i64) -> IoResult<()> {
let _m = self.fire_homing_missile();
let r = FsRequest::truncate(&self.loop_, self.fd, offset);
r.map_err(uv_error_to_io_error)
}
fn fstat(&mut self) -> Result<FileStat, IoError> {
fn fstat(&mut self) -> IoResult<rtio::FileStat> {
let _m = self.fire_homing_missile();
FsRequest::fstat(&self.loop_, self.fd).map_err(uv_error_to_io_error)
}
@ -475,7 +464,6 @@ impl rtio::RtioFileStream for FileWatcher {
mod test {
use libc::c_int;
use libc::{O_CREAT, O_RDWR, O_RDONLY, S_IWUSR, S_IRUSR};
use std::io;
use std::str;
use super::FsRequest;
use super::super::Loop;
@ -562,10 +550,6 @@ mod test {
let result = FsRequest::mkdir(l(), path, mode);
assert!(result.is_ok());
let result = FsRequest::stat(l(), path);
assert!(result.is_ok());
assert!(result.unwrap().kind == io::TypeDirectory);
let result = FsRequest::rmdir(l(), path);
assert!(result.is_ok());

View file

@ -153,8 +153,7 @@ mod test {
use green::sched;
use green::{SchedPool, PoolConfig};
use std::rt::rtio::RtioUdpSocket;
use std::io::test::next_test_ip4;
use std::task::TaskOpts;
use std::rt::task::TaskOpts;
use net::UdpWatcher;
use super::super::local_loop;
@ -172,7 +171,7 @@ mod test {
});
pool.spawn(TaskOpts::new(), proc() {
let listener = UdpWatcher::bind(local_loop(), next_test_ip4());
let listener = UdpWatcher::bind(local_loop(), ::next_test_ip4());
tx.send(listener.unwrap());
});
@ -193,18 +192,18 @@ mod test {
});
pool.spawn(TaskOpts::new(), proc() {
let addr1 = next_test_ip4();
let addr2 = next_test_ip4();
let addr1 = ::next_test_ip4();
let addr2 = ::next_test_ip4();
let listener = UdpWatcher::bind(local_loop(), addr2);
tx.send((listener.unwrap(), addr1));
let mut listener = UdpWatcher::bind(local_loop(), addr1).unwrap();
listener.sendto([1, 2, 3, 4], addr2).unwrap();
listener.sendto([1, 2, 3, 4], addr2).ok().unwrap();
});
let task = pool.task(TaskOpts::new(), proc() {
let (mut watcher, addr) = rx.recv();
let mut buf = [0, ..10];
assert_eq!(watcher.recvfrom(buf).unwrap(), (4, addr));
assert!(watcher.recvfrom(buf).ok().unwrap() == (4, addr));
});
pool.spawn_sched().send(sched::TaskFromFriend(task));

View file

@ -51,16 +51,14 @@ extern crate alloc;
use libc::{c_int, c_void};
use std::fmt;
use std::io::IoError;
use std::io;
use std::mem;
use std::ptr::null;
use std::ptr;
use std::rt::local::Local;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
use std::rt::task::{BlockedTask, Task};
use std::str::raw::from_c_str;
use std::str;
use std::task;
pub use self::async::AsyncWatcher;
@ -391,40 +389,39 @@ fn error_smoke_test() {
assert_eq!(err.to_str(), "EOF: end of file".to_string());
}
#[cfg(unix)]
pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
unsafe {
// Importing error constants
let UvError(errcode) = uverr;
IoError {
code: if errcode == uvll::EOF {libc::EOF as uint} else {-errcode as uint},
extra: 0,
detail: Some(uverr.desc()),
}
}
// uv error descriptions are static
let UvError(errcode) = uverr;
let c_desc = uvll::uv_strerror(errcode);
let desc = str::raw::c_str_to_static_slice(c_desc);
let kind = match errcode {
uvll::UNKNOWN => io::OtherIoError,
uvll::OK => io::OtherIoError,
uvll::EOF => io::EndOfFile,
uvll::EACCES => io::PermissionDenied,
uvll::ECONNREFUSED => io::ConnectionRefused,
uvll::ECONNRESET => io::ConnectionReset,
uvll::ENOTCONN => io::NotConnected,
uvll::ENOENT => io::FileNotFound,
uvll::EPIPE => io::BrokenPipe,
uvll::ECONNABORTED => io::ConnectionAborted,
uvll::EADDRNOTAVAIL => io::ConnectionRefused,
uvll::ECANCELED => io::TimedOut,
#[cfg(windows)]
pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
let UvError(errcode) = uverr;
IoError {
code: match errcode {
uvll::EOF => io::EOF,
uvll::EACCES => io::ERROR_ACCESS_DENIED,
uvll::ECONNREFUSED => io::WSAECONNREFUSED,
uvll::ECONNRESET => io::WSAECONNRESET,
uvll::ENOTCONN => io::WSAENOTCONN,
uvll::ENOENT => io::ERROR_NOT_FOUND,
uvll::EPIPE => io::ERROR_BROKEN_PIPE,
uvll::ECONNABORTED => io::WSAECONNABORTED,
uvll::EADDRNOTAVAIL => io::WSAEADDRNOTAVAIL,
uvll::ECANCELED => libc::ERROR_OPERATION_ABORTED,
err => {
uvdebug!("uverr.code {}", err as int);
// FIXME: Need to map remaining uv error types
io::OtherIoError
-1
}
};
IoError {
kind: kind,
desc: desc,
detail: None
}
},
extra: 0,
detail: Some(uverr.desc()),
}
}
@ -437,7 +434,7 @@ pub fn status_to_maybe_uv_error(status: c_int) -> Option<UvError> {
}
}
pub fn status_to_io_result(status: c_int) -> Result<(), IoError> {
pub fn status_to_io_result(status: c_int) -> IoResult<()> {
if status >= 0 {Ok(())} else {Err(uv_error_to_io_error(UvError(status)))}
}
@ -471,6 +468,33 @@ fn local_loop() -> &'static mut uvio::UvIoFactory {
}
}
#[cfg(test)]
fn next_test_ip4() -> std::rt::rtio::SocketAddr {
use std::io;
use std::rt::rtio;
let io::net::ip::SocketAddr { ip, port } = io::test::next_test_ip4();
let ip = match ip {
io::net::ip::Ipv4Addr(a, b, c, d) => rtio::Ipv4Addr(a, b, c, d),
_ => unreachable!(),
};
rtio::SocketAddr { ip: ip, port: port }
}
#[cfg(test)]
fn next_test_ip6() -> std::rt::rtio::SocketAddr {
use std::io;
use std::rt::rtio;
let io::net::ip::SocketAddr { ip, port } = io::test::next_test_ip6();
let ip = match ip {
io::net::ip::Ipv6Addr(a, b, c, d, e, f, g, h) =>
rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
_ => unreachable!(),
};
rtio::SocketAddr { ip: ip, port: port }
}
#[cfg(test)]
mod test {
use std::mem::transmute;

View file

@ -10,12 +10,10 @@
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
use std::io;
use std::io::IoError;
use std::io::net::ip;
use std::mem;
use std::ptr;
use std::rt::rtio;
use std::rt::rtio::IoError;
use std::rt::task::BlockedTask;
use homing::{HomingIO, HomeHandle};
@ -36,7 +34,7 @@ pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
len: uint) -> ip::SocketAddr {
len: uint) -> rtio::SocketAddr {
match storage.ss_family as c_int {
libc::AF_INET => {
assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
@ -48,8 +46,8 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
let b = (ip >> 16) as u8;
let c = (ip >> 8) as u8;
let d = (ip >> 0) as u8;
ip::SocketAddr {
ip: ip::Ipv4Addr(a, b, c, d),
rtio::SocketAddr {
ip: rtio::Ipv4Addr(a, b, c, d),
port: ntohs(storage.sin_port),
}
}
@ -66,8 +64,8 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
let f = ntohs(storage.sin6_addr.s6_addr[5]);
let g = ntohs(storage.sin6_addr.s6_addr[6]);
let h = ntohs(storage.sin6_addr.s6_addr[7]);
ip::SocketAddr {
ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
rtio::SocketAddr {
ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
port: ntohs(storage.sin6_port),
}
}
@ -77,11 +75,11 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
}
}
fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
fn addr_to_sockaddr(addr: rtio::SocketAddr) -> (libc::sockaddr_storage, uint) {
unsafe {
let mut storage: libc::sockaddr_storage = mem::zeroed();
let len = match addr.ip {
ip::Ipv4Addr(a, b, c, d) => {
rtio::Ipv4Addr(a, b, c, d) => {
let ip = (a as u32 << 24) |
(b as u32 << 16) |
(c as u32 << 8) |
@ -95,7 +93,7 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
};
mem::size_of::<libc::sockaddr_in>()
}
ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
let storage: &mut libc::sockaddr_in6 =
mem::transmute(&mut storage);
storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
@ -126,7 +124,7 @@ enum SocketNameKind {
}
fn socket_name(sk: SocketNameKind,
handle: *c_void) -> Result<ip::SocketAddr, IoError> {
handle: *c_void) -> Result<rtio::SocketAddr, IoError> {
let getsockname = match sk {
TcpPeer => uvll::uv_tcp_getpeername,
Tcp => uvll::uv_tcp_getsockname,
@ -201,7 +199,7 @@ impl TcpWatcher {
}
pub fn connect(io: &mut UvIoFactory,
address: ip::SocketAddr,
address: rtio::SocketAddr,
timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
let tcp = TcpWatcher::new(io);
let cx = ConnectCtx { status: -1, task: None, timer: None };
@ -218,7 +216,7 @@ impl HomingIO for TcpWatcher {
}
impl rtio::RtioSocket for TcpWatcher {
fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.handle)
}
@ -231,7 +229,7 @@ impl rtio::RtioTcpStream for TcpWatcher {
// see comments in close_read about this check
if guard.access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
return Err(uv_error_to_io_error(UvError(uvll::EOF)))
}
self.stream.read(buf).map_err(uv_error_to_io_error)
@ -243,7 +241,7 @@ impl rtio::RtioTcpStream for TcpWatcher {
self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
}
fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
fn peer_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(TcpPeer, self.handle)
}
@ -350,7 +348,7 @@ impl Drop for TcpWatcher {
// TCP listeners (unbound servers)
impl TcpListener {
pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
-> Result<Box<TcpListener>, UvError> {
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
assert_eq!(unsafe {
@ -385,7 +383,7 @@ impl UvHandle<uvll::uv_tcp_t> for TcpListener {
}
impl rtio::RtioSocket for TcpListener {
fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.handle)
}
@ -439,7 +437,7 @@ impl HomingIO for TcpAcceptor {
}
impl rtio::RtioSocket for TcpAcceptor {
fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.listener.handle)
}
@ -492,7 +490,7 @@ pub struct UdpWatcher {
struct UdpRecvCtx {
task: Option<BlockedTask>,
buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
result: Option<(ssize_t, Option<rtio::SocketAddr>)>,
}
struct UdpSendCtx {
@ -502,7 +500,7 @@ struct UdpSendCtx {
}
impl UdpWatcher {
pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
pub fn bind(io: &mut UvIoFactory, address: rtio::SocketAddr)
-> Result<UdpWatcher, UvError> {
let udp = UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
@ -536,7 +534,7 @@ impl HomingIO for UdpWatcher {
}
impl rtio::RtioSocket for UdpWatcher {
fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Udp, self.handle)
}
@ -544,7 +542,7 @@ impl rtio::RtioSocket for UdpWatcher {
impl rtio::RtioUdpSocket for UdpWatcher {
fn recvfrom(&mut self, buf: &mut [u8])
-> Result<(uint, ip::SocketAddr), IoError>
-> Result<(uint, rtio::SocketAddr), IoError>
{
let loop_ = self.uv_loop();
let m = self.fire_homing_missile();
@ -609,7 +607,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
}
}
fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
fn sendto(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> Result<(), IoError> {
let m = self.fire_homing_missile();
let loop_ = self.uv_loop();
let guard = try!(self.write_access.grant(m));
@ -675,7 +673,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
}
}
fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
fn join_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
multi.to_str().with_c_str(|m_addr| {
@ -686,7 +684,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
})
}
fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
fn leave_multicast(&mut self, multi: rtio::IpAddr) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
multi.to_str().with_c_str(|m_addr| {
@ -843,14 +841,13 @@ pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError>
mod test {
use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
RtioUdpSocket};
use std::io::test::{next_test_ip4, next_test_ip6};
use super::{UdpWatcher, TcpWatcher, TcpListener};
use super::super::local_loop;
#[test]
fn connect_close_ip4() {
match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
match TcpWatcher::connect(local_loop(), ::next_test_ip4(), None) {
Ok(..) => fail!(),
Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
}
@ -858,7 +855,7 @@ mod test {
#[test]
fn connect_close_ip6() {
match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
match TcpWatcher::connect(local_loop(), ::next_test_ip6(), None) {
Ok(..) => fail!(),
Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_string()),
}
@ -866,7 +863,7 @@ mod test {
#[test]
fn udp_bind_close_ip4() {
match UdpWatcher::bind(local_loop(), next_test_ip4()) {
match UdpWatcher::bind(local_loop(), ::next_test_ip4()) {
Ok(..) => {}
Err(..) => fail!()
}
@ -874,7 +871,7 @@ mod test {
#[test]
fn udp_bind_close_ip6() {
match UdpWatcher::bind(local_loop(), next_test_ip6()) {
match UdpWatcher::bind(local_loop(), ::next_test_ip6()) {
Ok(..) => {}
Err(..) => fail!()
}
@ -883,7 +880,7 @@ mod test {
#[test]
fn listen_ip4() {
let (tx, rx) = channel();
let addr = next_test_ip4();
let addr = ::next_test_ip4();
spawn(proc() {
let w = match TcpListener::bind(local_loop(), addr) {
@ -919,7 +916,7 @@ mod test {
#[test]
fn listen_ip6() {
let (tx, rx) = channel();
let addr = next_test_ip6();
let addr = ::next_test_ip6();
spawn(proc() {
let w = match TcpListener::bind(local_loop(), addr) {
@ -955,8 +952,8 @@ mod test {
#[test]
fn udp_recv_ip4() {
let (tx, rx) = channel();
let client = next_test_ip4();
let server = next_test_ip4();
let client = ::next_test_ip4();
let server = ::next_test_ip4();
spawn(proc() {
match UdpWatcher::bind(local_loop(), server) {
@ -964,7 +961,7 @@ mod test {
tx.send(());
let mut buf = [0u8, ..10];
match w.recvfrom(buf) {
Ok((10, addr)) => assert_eq!(addr, client),
Ok((10, addr)) => assert!(addr == client),
e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
@ -987,8 +984,8 @@ mod test {
#[test]
fn udp_recv_ip6() {
let (tx, rx) = channel();
let client = next_test_ip6();
let server = next_test_ip6();
let client = ::next_test_ip6();
let server = ::next_test_ip6();
spawn(proc() {
match UdpWatcher::bind(local_loop(), server) {
@ -996,7 +993,7 @@ mod test {
tx.send(());
let mut buf = [0u8, ..10];
match w.recvfrom(buf) {
Ok((10, addr)) => assert_eq!(addr, client),
Ok((10, addr)) => assert!(addr == client),
e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
@ -1018,15 +1015,15 @@ mod test {
#[test]
fn test_read_read_read() {
let addr = next_test_ip4();
let addr = ::next_test_ip4();
static MAX: uint = 5000;
let (tx, rx) = channel();
spawn(proc() {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let mut acceptor = listener.listen().ok().unwrap();
tx.send(());
let mut stream = acceptor.accept().unwrap();
let mut stream = acceptor.accept().ok().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {
@ -1041,7 +1038,7 @@ mod test {
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
let nread = stream.read(buf).unwrap();
let nread = stream.read(buf).ok().unwrap();
total_bytes_read += nread;
for i in range(0u, nread) {
assert_eq!(buf[i], 1);
@ -1053,8 +1050,8 @@ mod test {
#[test]
#[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
fn test_udp_twice() {
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
let server_addr = ::next_test_ip4();
let client_addr = ::next_test_ip4();
let (tx, rx) = channel();
spawn(proc() {
@ -1068,22 +1065,22 @@ mod test {
tx.send(());
let mut buf1 = [0];
let mut buf2 = [0];
let (nread1, src1) = server.recvfrom(buf1).unwrap();
let (nread2, src2) = server.recvfrom(buf2).unwrap();
let (nread1, src1) = server.recvfrom(buf1).ok().unwrap();
let (nread2, src2) = server.recvfrom(buf2).ok().unwrap();
assert_eq!(nread1, 1);
assert_eq!(nread2, 1);
assert_eq!(src1, client_addr);
assert_eq!(src2, client_addr);
assert!(src1 == client_addr);
assert!(src2 == client_addr);
assert_eq!(buf1[0], 1);
assert_eq!(buf2[0], 2);
}
#[test]
fn test_udp_many_read() {
let server_out_addr = next_test_ip4();
let server_in_addr = next_test_ip4();
let client_out_addr = next_test_ip4();
let client_in_addr = next_test_ip4();
let server_out_addr = ::next_test_ip4();
let server_in_addr = ::next_test_ip4();
let client_out_addr = ::next_test_ip4();
let client_in_addr = ::next_test_ip4();
static MAX: uint = 500_000;
let (tx1, rx1) = channel::<()>();
@ -1106,9 +1103,9 @@ mod test {
// check if the client has received enough
let res = server_in.recvfrom(buf);
assert!(res.is_ok());
let (nread, src) = res.unwrap();
let (nread, src) = res.ok().unwrap();
assert_eq!(nread, 1);
assert_eq!(src, client_out_addr);
assert!(src == client_out_addr);
}
assert!(total_bytes_sent >= MAX);
});
@ -1127,8 +1124,8 @@ mod test {
// wait for data
let res = client_in.recvfrom(buf);
assert!(res.is_ok());
let (nread, src) = res.unwrap();
assert_eq!(src, server_out_addr);
let (nread, src) = res.ok().unwrap();
assert!(src == server_out_addr);
total_bytes_recv += nread;
for i in range(0u, nread) {
assert_eq!(buf[i], 1);
@ -1140,25 +1137,25 @@ mod test {
#[test]
fn test_read_and_block() {
let addr = next_test_ip4();
let addr = ::next_test_ip4();
let (tx, rx) = channel::<Receiver<()>>();
spawn(proc() {
let rx = rx.recv();
let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
rx.recv();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
rx.recv();
});
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let mut acceptor = listener.listen().ok().unwrap();
let (tx2, rx2) = channel();
tx.send(rx2);
let mut stream = acceptor.accept().unwrap();
let mut stream = acceptor.accept().ok().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
@ -1166,7 +1163,7 @@ mod test {
let mut reads = 0;
while current < expected {
let nread = stream.read(buf).unwrap();
let nread = stream.read(buf).ok().unwrap();
for i in range(0u, nread) {
let val = buf[i] as uint;
assert_eq!(val, current % 8);
@ -1183,14 +1180,14 @@ mod test {
#[test]
fn test_simple_tcp_server_and_client_on_diff_threads() {
let addr = next_test_ip4();
let addr = ::next_test_ip4();
spawn(proc() {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let mut acceptor = listener.listen().ok().unwrap();
let mut stream = acceptor.accept().ok().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
let nread = stream.read(buf).ok().unwrap();
assert_eq!(nread, 8);
for i in range(0u, nread) {
assert_eq!(buf[i], i as u8);
@ -1201,27 +1198,27 @@ mod test {
while stream.is_err() {
stream = TcpWatcher::connect(local_loop(), addr, None);
}
stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).ok().unwrap();
}
#[should_fail] #[test]
fn tcp_listener_fail_cleanup() {
let addr = next_test_ip4();
let addr = ::next_test_ip4();
let w = TcpListener::bind(local_loop(), addr).unwrap();
let _w = w.listen().unwrap();
let _w = w.listen().ok().unwrap();
fail!();
}
#[should_fail] #[test]
fn tcp_stream_fail_cleanup() {
let (tx, rx) = channel();
let addr = next_test_ip4();
let addr = ::next_test_ip4();
spawn(proc() {
let w = TcpListener::bind(local_loop(), addr).unwrap();
let mut w = w.listen().unwrap();
let mut w = w.listen().ok().unwrap();
tx.send(());
drop(w.accept().unwrap());
drop(w.accept().ok().unwrap());
});
rx.recv();
let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
@ -1230,14 +1227,14 @@ mod test {
#[should_fail] #[test]
fn udp_listener_fail_cleanup() {
let addr = next_test_ip4();
let addr = ::next_test_ip4();
let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
fail!();
}
#[should_fail] #[test]
fn udp_fail_other_task() {
let addr = next_test_ip4();
let addr = ::next_test_ip4();
let (tx, rx) = channel();
// force the handle to be created on a different scheduler, failure in

View file

@ -10,10 +10,9 @@
use libc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
use std::mem;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
use std::rt::rtio;
use std::rt::rtio::IoResult;
use std::rt::task::BlockedTask;
use homing::{HomingIO, HomeHandle};
@ -39,8 +38,8 @@ pub struct PipeWatcher {
pub struct PipeListener {
home: HomeHandle,
pipe: *uvll::uv_pipe_t,
outgoing: Sender<Result<Box<RtioPipe:Send>, IoError>>,
incoming: Receiver<Result<Box<RtioPipe:Send>, IoError>>,
outgoing: Sender<IoResult<Box<rtio::RtioPipe:Send>>>,
incoming: Receiver<IoResult<Box<rtio::RtioPipe:Send>>>,
}
pub struct PipeAcceptor {
@ -111,26 +110,26 @@ impl PipeWatcher {
}
}
impl RtioPipe for PipeWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
impl rtio::RtioPipe for PipeWatcher {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let m = self.fire_homing_missile();
let guard = try!(self.read_access.grant(m));
// see comments in close_read about this check
if guard.access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
return Err(uv_error_to_io_error(UvError(uvll::EOF)))
}
self.stream.read(buf).map_err(uv_error_to_io_error)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let m = self.fire_homing_missile();
let guard = try!(self.write_access.grant(m));
self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
}
fn clone(&self) -> Box<RtioPipe:Send> {
fn clone(&self) -> Box<rtio::RtioPipe:Send> {
box PipeWatcher {
stream: StreamWatcher::new(self.stream.handle),
defused: false,
@ -138,10 +137,10 @@ impl RtioPipe for PipeWatcher {
refcount: self.refcount.clone(),
read_access: self.read_access.clone(),
write_access: self.write_access.clone(),
} as Box<RtioPipe:Send>
} as Box<rtio::RtioPipe:Send>
}
fn close_read(&mut self) -> Result<(), IoError> {
fn close_read(&mut self) -> IoResult<()> {
// The current uv_shutdown method only shuts the writing half of the
// connection, and no method is provided to shut down the reading half
// of the connection. With a lack of method, we emulate shutting down
@ -168,7 +167,7 @@ impl RtioPipe for PipeWatcher {
Ok(())
}
fn close_write(&mut self) -> Result<(), IoError> {
fn close_write(&mut self) -> IoResult<()> {
let _m = self.fire_homing_missile();
net::shutdown(self.stream.handle, &self.uv_loop())
}
@ -248,8 +247,8 @@ impl PipeListener {
}
}
impl RtioUnixListener for PipeListener {
fn listen(~self) -> Result<Box<RtioUnixAcceptor:Send>, IoError> {
impl rtio::RtioUnixListener for PipeListener {
fn listen(~self) -> IoResult<Box<rtio::RtioUnixAcceptor:Send>> {
// create the acceptor object from ourselves
let mut acceptor = box PipeAcceptor {
listener: self,
@ -259,7 +258,7 @@ impl RtioUnixListener for PipeListener {
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
0 => Ok(acceptor as Box<RtioUnixAcceptor:Send>),
0 => Ok(acceptor as Box<rtio::RtioUnixAcceptor:Send>),
n => Err(uv_error_to_io_error(UvError(n))),
}
}
@ -284,7 +283,7 @@ extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
});
let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
Ok(box client as Box<RtioPipe:Send>)
Ok(box client as Box<rtio::RtioPipe:Send>)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
@ -300,8 +299,8 @@ impl Drop for PipeListener {
// PipeAcceptor implementation and traits
impl RtioUnixAcceptor for PipeAcceptor {
fn accept(&mut self) -> Result<Box<RtioPipe:Send>, IoError> {
impl rtio::RtioUnixAcceptor for PipeAcceptor {
fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe:Send>> {
self.timeout.accept(&self.listener.incoming)
}
@ -366,11 +365,11 @@ mod tests {
spawn(proc() {
let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
let mut p = p.listen().unwrap();
let mut p = p.listen().ok().unwrap();
tx.send(());
let mut client = p.accept().unwrap();
let mut client = p.accept().ok().unwrap();
let mut buf = [0];
assert!(client.read(buf).unwrap() == 1);
assert!(client.read(buf).ok().unwrap() == 1);
assert_eq!(buf[0], 1);
assert!(client.write([2]).is_ok());
});
@ -378,7 +377,7 @@ mod tests {
let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
assert!(c.write([1]).is_ok());
let mut buf = [0];
assert!(c.read(buf).unwrap() == 1);
assert!(c.read(buf).ok().unwrap() == 1);
assert_eq!(buf[0], 2);
}
@ -390,9 +389,9 @@ mod tests {
spawn(proc() {
let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
let mut p = p.listen().unwrap();
let mut p = p.listen().ok().unwrap();
tx.send(());
drop(p.accept().unwrap());
drop(p.accept().ok().unwrap());
});
rx.recv();
let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();

View file

@ -10,11 +10,10 @@
use libc::c_int;
use libc;
use std::io::IoError;
use std::io::process;
use std::ptr;
use std::c_str::CString;
use std::rt::rtio::{ProcessConfig, RtioProcess};
use std::rt::rtio;
use std::rt::rtio::IoResult;
use std::rt::task::BlockedTask;
use homing::{HomingIO, HomeHandle};
@ -33,7 +32,7 @@ pub struct Process {
to_wake: Option<BlockedTask>,
/// Collected from the exit_cb
exit_status: Option<process::ProcessExit>,
exit_status: Option<rtio::ProcessExit>,
/// Lazily initialized timeout timer
timer: Option<Box<TimerWatcher>>,
@ -51,7 +50,7 @@ impl Process {
///
/// Returns either the corresponding process object or an error which
/// occurred.
pub fn spawn(io_loop: &mut UvIoFactory, cfg: ProcessConfig)
pub fn spawn(io_loop: &mut UvIoFactory, cfg: rtio::ProcessConfig)
-> Result<(Box<Process>, Vec<Option<PipeWatcher>>), UvError> {
let mut io = vec![cfg.stdin, cfg.stdout, cfg.stderr];
for slot in cfg.extra_io.iter() {
@ -137,8 +136,8 @@ extern fn on_exit(handle: *uvll::uv_process_t,
assert!(p.exit_status.is_none());
p.exit_status = Some(match term_signal {
0 => process::ExitStatus(exit_status as int),
n => process::ExitSignal(n as int),
0 => rtio::ExitStatus(exit_status as int),
n => rtio::ExitSignal(n as int),
});
if p.to_wake.is_none() { return }
@ -146,19 +145,19 @@ extern fn on_exit(handle: *uvll::uv_process_t,
}
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
io: &process::StdioContainer,
io: &rtio::StdioContainer,
io_loop: &mut UvIoFactory) -> Option<PipeWatcher> {
match *io {
process::Ignored => {
rtio::Ignored => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
None
}
process::InheritFd(fd) => {
rtio::InheritFd(fd) => {
uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD);
uvll::set_stdio_container_fd(dst, fd);
None
}
process::CreatePipe(readable, writable) => {
rtio::CreatePipe(readable, writable) => {
let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int;
if readable {
flags |= uvll::STDIO_READABLE_PIPE as libc::c_int;
@ -231,12 +230,12 @@ impl UvHandle<uvll::uv_process_t> for Process {
fn uv_handle(&self) -> *uvll::uv_process_t { self.handle }
}
impl RtioProcess for Process {
impl rtio::RtioProcess for Process {
fn id(&self) -> libc::pid_t {
unsafe { uvll::process_pid(self.handle) as libc::pid_t }
}
fn kill(&mut self, signal: int) -> Result<(), IoError> {
fn kill(&mut self, signal: int) -> IoResult<()> {
let _m = self.fire_homing_missile();
match unsafe {
uvll::uv_process_kill(self.handle, signal as libc::c_int)
@ -246,7 +245,7 @@ impl RtioProcess for Process {
}
}
fn wait(&mut self) -> Result<process::ProcessExit, IoError> {
fn wait(&mut self) -> IoResult<rtio::ProcessExit> {
// Make sure (on the home scheduler) that we have an exit status listed
let _m = self.fire_homing_missile();
match self.exit_status {

View file

@ -23,8 +23,8 @@
use alloc::arc::Arc;
use libc::c_void;
use std::mem;
use std::rt::mutex::NativeMutex;
use std::rt::task::BlockedTask;
use std::unstable::mutex::NativeMutex;
use mpsc = std::sync::mpsc_queue;
use async::AsyncWatcher;

View file

@ -9,8 +9,7 @@
// except according to those terms.
use libc::c_int;
use std::io::signal::Signum;
use std::rt::rtio::RtioSignal;
use std::rt::rtio::{RtioSignal, Callback};
use homing::{HomingIO, HomeHandle};
use super::{UvError, UvHandle};
@ -21,18 +20,16 @@ pub struct SignalWatcher {
handle: *uvll::uv_signal_t,
home: HomeHandle,
channel: Sender<Signum>,
signal: Signum,
cb: Box<Callback:Send>,
}
impl SignalWatcher {
pub fn new(io: &mut UvIoFactory, signum: Signum, channel: Sender<Signum>)
pub fn new(io: &mut UvIoFactory, signum: int, cb: Box<Callback:Send>)
-> Result<Box<SignalWatcher>, UvError> {
let s = box SignalWatcher {
handle: UvHandle::alloc(None::<SignalWatcher>, uvll::UV_SIGNAL),
home: io.make_handle(),
channel: channel,
signal: signum,
cb: cb,
};
assert_eq!(unsafe {
uvll::uv_signal_init(io.uv_loop(), s.handle)
@ -48,10 +45,9 @@ impl SignalWatcher {
}
}
extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) {
extern fn signal_cb(handle: *uvll::uv_signal_t, _signum: c_int) {
let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
assert_eq!(signum as int, s.signal as int);
let _ = s.channel.send_opt(s.signal);
let _ = s.cb.call();
}
impl HomingIO for SignalWatcher {
@ -70,25 +66,3 @@ impl Drop for SignalWatcher {
self.close();
}
}
#[cfg(test)]
mod test {
use super::super::local_loop;
use std::io::signal;
use super::SignalWatcher;
#[test]
fn closing_channel_during_drop_doesnt_kill_everything() {
// see issue #10375, relates to timers as well.
let (tx, rx) = channel();
let _signal = SignalWatcher::new(local_loop(), signal::Interrupt,
tx);
spawn(proc() {
let _ = rx.recv_opt();
});
// when we drop the SignalWatcher we're going to destroy the channel,
// which must wake up the task on the other end
}
}

View file

@ -9,9 +9,9 @@
// except according to those terms.
use libc::c_int;
use std::io::IoResult;
use std::mem;
use std::rt::task::BlockedTask;
use std::rt::rtio::IoResult;
use access;
use homing::{HomeHandle, HomingMissile, HomingIO};

View file

@ -9,7 +9,7 @@
// except according to those terms.
use std::mem;
use std::rt::rtio::RtioTimer;
use std::rt::rtio::{RtioTimer, Callback};
use std::rt::task::BlockedTask;
use homing::{HomeHandle, HomingIO};
@ -27,8 +27,8 @@ pub struct TimerWatcher {
pub enum NextAction {
WakeTask,
SendOnce(Sender<()>),
SendMany(Sender<()>, uint),
CallOnce(Box<Callback:Send>),
CallMany(Box<Callback:Send>, uint),
}
impl TimerWatcher {
@ -103,9 +103,7 @@ impl RtioTimer for TimerWatcher {
self.stop();
}
fn oneshot(&mut self, msecs: u64) -> Receiver<()> {
let (tx, rx) = channel();
fn oneshot(&mut self, msecs: u64, cb: Box<Callback:Send>) {
// similarly to the destructor, we must drop the previous action outside
// of the homing missile
let _prev_action = {
@ -113,15 +111,11 @@ impl RtioTimer for TimerWatcher {
self.id += 1;
self.stop();
self.start(timer_cb, msecs, 0);
mem::replace(&mut self.action, Some(SendOnce(tx)))
mem::replace(&mut self.action, Some(CallOnce(cb)))
};
return rx;
}
fn period(&mut self, msecs: u64) -> Receiver<()> {
let (tx, rx) = channel();
fn period(&mut self, msecs: u64, cb: Box<Callback:Send>) {
// similarly to the destructor, we must drop the previous action outside
// of the homing missile
let _prev_action = {
@ -129,10 +123,8 @@ impl RtioTimer for TimerWatcher {
self.id += 1;
self.stop();
self.start(timer_cb, msecs, msecs);
mem::replace(&mut self.action, Some(SendMany(tx, self.id)))
mem::replace(&mut self.action, Some(CallMany(cb, self.id)))
};
return rx;
}
}
@ -145,9 +137,9 @@ extern fn timer_cb(handle: *uvll::uv_timer_t) {
let task = timer.blocker.take_unwrap();
let _ = task.wake().map(|t| t.reawaken());
}
SendOnce(chan) => { let _ = chan.send_opt(()); }
SendMany(chan, id) => {
let _ = chan.send_opt(());
CallOnce(mut cb) => { cb.call() }
CallMany(mut cb, id) => {
cb.call();
// Note that the above operation could have performed some form of
// scheduling. This means that the timer may have decided to insert
@ -158,7 +150,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t) {
// for you. We're guaranteed to all be running on the same thread,
// so there's no need for any synchronization here.
if timer.id == id {
timer.action = Some(SendMany(chan, id));
timer.action = Some(CallMany(cb, id));
}
}
}
@ -179,145 +171,3 @@ impl Drop for TimerWatcher {
};
}
}
#[cfg(test)]
mod test {
use std::rt::rtio::RtioTimer;
use super::super::local_loop;
use super::TimerWatcher;
#[test]
fn oneshot() {
let mut timer = TimerWatcher::new(local_loop());
let port = timer.oneshot(1);
port.recv();
let port = timer.oneshot(1);
port.recv();
}
#[test]
fn override() {
let mut timer = TimerWatcher::new(local_loop());
let oport = timer.oneshot(1);
let pport = timer.period(1);
timer.sleep(1);
assert_eq!(oport.recv_opt(), Err(()));
assert_eq!(pport.recv_opt(), Err(()));
timer.oneshot(1).recv();
}
#[test]
fn period() {
let mut timer = TimerWatcher::new(local_loop());
let port = timer.period(1);
port.recv();
port.recv();
let port2 = timer.period(1);
port2.recv();
port2.recv();
}
#[test]
fn sleep() {
let mut timer = TimerWatcher::new(local_loop());
timer.sleep(1);
timer.sleep(1);
}
#[test] #[should_fail]
fn oneshot_fail() {
let mut timer = TimerWatcher::new(local_loop());
let _port = timer.oneshot(1);
fail!();
}
#[test] #[should_fail]
fn period_fail() {
let mut timer = TimerWatcher::new(local_loop());
let _port = timer.period(1);
fail!();
}
#[test] #[should_fail]
fn normal_fail() {
let _timer = TimerWatcher::new(local_loop());
fail!();
}
#[test]
fn closing_channel_during_drop_doesnt_kill_everything() {
// see issue #10375
let mut timer = TimerWatcher::new(local_loop());
let timer_port = timer.period(1000);
spawn(proc() {
let _ = timer_port.recv_opt();
});
// when we drop the TimerWatcher we're going to destroy the channel,
// which must wake up the task on the other end
}
#[test]
fn reset_doesnt_switch_tasks() {
// similar test to the one above.
let mut timer = TimerWatcher::new(local_loop());
let timer_port = timer.period(1000);
spawn(proc() {
let _ = timer_port.recv_opt();
});
drop(timer.oneshot(1));
}
#[test]
fn reset_doesnt_switch_tasks2() {
// similar test to the one above.
let mut timer = TimerWatcher::new(local_loop());
let timer_port = timer.period(1000);
spawn(proc() {
let _ = timer_port.recv_opt();
});
timer.sleep(1);
}
#[test]
fn sender_goes_away_oneshot() {
let port = {
let mut timer = TimerWatcher::new(local_loop());
timer.oneshot(1000)
};
assert_eq!(port.recv_opt(), Err(()));
}
#[test]
fn sender_goes_away_period() {
let port = {
let mut timer = TimerWatcher::new(local_loop());
timer.period(1000)
};
assert_eq!(port.recv_opt(), Err(()));
}
#[test]
fn receiver_goes_away_oneshot() {
let mut timer1 = TimerWatcher::new(local_loop());
drop(timer1.oneshot(1));
let mut timer2 = TimerWatcher::new(local_loop());
// while sleeping, the prevous timer should fire and not have its
// callback do something terrible.
timer2.sleep(2);
}
#[test]
fn receiver_goes_away_period() {
let mut timer1 = TimerWatcher::new(local_loop());
drop(timer1.period(1));
let mut timer2 = TimerWatcher::new(local_loop());
// while sleeping, the prevous timer should fire and not have its
// callback do something terrible.
timer2.sleep(2);
}
}

View file

@ -9,9 +9,8 @@
// except according to those terms.
use libc;
use std::io::IoError;
use std::ptr;
use std::rt::rtio::RtioTTY;
use std::rt::rtio::{RtioTTY, IoResult};
use homing::{HomingIO, HomeHandle};
use stream::StreamWatcher;
@ -80,17 +79,17 @@ impl TtyWatcher {
}
impl RtioTTY for TtyWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let _m = self.fire_homing_missile();
self.stream.read(buf).map_err(uv_error_to_io_error)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let _m = self.fire_homing_missile();
self.stream.write(buf, false).map_err(uv_error_to_io_error)
}
fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
fn set_raw(&mut self, raw: bool) -> IoResult<()> {
let raw = raw as libc::c_int;
let _m = self.fire_homing_missile();
match unsafe { uvll::uv_tty_set_mode(self.tty, raw) } {
@ -100,7 +99,7 @@ impl RtioTTY for TtyWatcher {
}
#[allow(unused_mut)]
fn get_winsize(&mut self) -> Result<(int, int), IoError> {
fn get_winsize(&mut self) -> IoResult<(int, int)> {
let mut width: libc::c_int = 0;
let mut height: libc::c_int = 0;
let widthptr: *libc::c_int = &width;

View file

@ -11,20 +11,13 @@
//! The implementation of `rtio` for libuv
use std::c_str::CString;
use std::io::IoError;
use std::io::net::ip::SocketAddr;
use std::io::signal::Signum;
use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
ReadWrite, FileStat};
use std::io;
use std::mem;
use libc::c_int;
use libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
S_IWUSR};
use libc;
use std::rt::rtio;
use std::rt::rtio::{ProcessConfig, IoFactory, EventLoop};
use ai = std::io::net::addrinfo;
use std::rt::rtio::{ProcessConfig, IoFactory, EventLoop, IoResult};
#[cfg(test)] use std::rt::thread::Thread;
@ -147,36 +140,38 @@ impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn tcp_connect(&mut self, addr: SocketAddr, timeout: Option<u64>)
-> Result<Box<rtio::RtioTcpStream:Send>, IoError> {
fn tcp_connect(&mut self, addr: rtio::SocketAddr, timeout: Option<u64>)
-> IoResult<Box<rtio::RtioTcpStream:Send>> {
match TcpWatcher::connect(self, addr, timeout) {
Ok(t) => Ok(box t as Box<rtio::RtioTcpStream:Send>),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
fn tcp_bind(&mut self, addr: SocketAddr)
-> Result<Box<rtio::RtioTcpListener:Send>, IoError> {
fn tcp_bind(&mut self, addr: rtio::SocketAddr)
-> IoResult<Box<rtio::RtioTcpListener:Send>> {
match TcpListener::bind(self, addr) {
Ok(t) => Ok(t as Box<rtio::RtioTcpListener:Send>),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
fn udp_bind(&mut self, addr: SocketAddr)
-> Result<Box<rtio::RtioUdpSocket:Send>, IoError> {
fn udp_bind(&mut self, addr: rtio::SocketAddr)
-> IoResult<Box<rtio::RtioUdpSocket:Send>> {
match UdpWatcher::bind(self, addr) {
Ok(u) => Ok(box u as Box<rtio::RtioUdpSocket:Send>),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
fn timer_init(&mut self) -> Result<Box<rtio::RtioTimer:Send>, IoError> {
fn timer_init(&mut self) -> IoResult<Box<rtio::RtioTimer:Send>> {
Ok(TimerWatcher::new(self) as Box<rtio::RtioTimer:Send>)
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<ai::Hint>) -> Result<Vec<ai::Info>, IoError> {
hint: Option<rtio::AddrinfoHint>)
-> IoResult<Vec<rtio::AddrinfoInfo>>
{
let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
r.map_err(uv_error_to_io_error)
}
@ -187,20 +182,22 @@ impl IoFactory for UvIoFactory {
Box<rtio::RtioFileStream:Send>
}
fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess)
-> Result<Box<rtio::RtioFileStream:Send>, IoError> {
fn fs_open(&mut self, path: &CString, fm: rtio::FileMode,
fa: rtio::FileAccess)
-> IoResult<Box<rtio::RtioFileStream:Send>>
{
let flags = match fm {
io::Open => 0,
io::Append => libc::O_APPEND,
io::Truncate => libc::O_TRUNC,
rtio::Open => 0,
rtio::Append => libc::O_APPEND,
rtio::Truncate => libc::O_TRUNC,
};
// Opening with a write permission must silently create the file.
let (flags, mode) = match fa {
io::Read => (flags | libc::O_RDONLY, 0),
io::Write => (flags | libc::O_WRONLY | libc::O_CREAT,
libc::S_IRUSR | libc::S_IWUSR),
io::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT,
libc::S_IRUSR | libc::S_IWUSR),
rtio::Read => (flags | libc::O_RDONLY, 0),
rtio::Write => (flags | libc::O_WRONLY | libc::O_CREAT,
libc::S_IRUSR | libc::S_IWUSR),
rtio::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT,
libc::S_IRUSR | libc::S_IWUSR),
};
match FsRequest::open(self, path, flags as int, mode as int) {
@ -209,69 +206,66 @@ impl IoFactory for UvIoFactory {
}
}
fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> {
fn fs_unlink(&mut self, path: &CString) -> IoResult<()> {
let r = FsRequest::unlink(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_lstat(&mut self, path: &CString) -> Result<FileStat, IoError> {
fn fs_lstat(&mut self, path: &CString) -> IoResult<rtio::FileStat> {
let r = FsRequest::lstat(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> {
fn fs_stat(&mut self, path: &CString) -> IoResult<rtio::FileStat> {
let r = FsRequest::stat(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_mkdir(&mut self, path: &CString,
perm: io::FilePermission) -> Result<(), IoError> {
let r = FsRequest::mkdir(&self.loop_, path, perm.bits() as c_int);
fn fs_mkdir(&mut self, path: &CString, perm: uint) -> IoResult<()> {
let r = FsRequest::mkdir(&self.loop_, path, perm as c_int);
r.map_err(uv_error_to_io_error)
}
fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> {
fn fs_rmdir(&mut self, path: &CString) -> IoResult<()> {
let r = FsRequest::rmdir(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> {
fn fs_rename(&mut self, path: &CString, to: &CString) -> IoResult<()> {
let r = FsRequest::rename(&self.loop_, path, to);
r.map_err(uv_error_to_io_error)
}
fn fs_chmod(&mut self, path: &CString,
perm: io::FilePermission) -> Result<(), IoError> {
let r = FsRequest::chmod(&self.loop_, path, perm.bits() as c_int);
fn fs_chmod(&mut self, path: &CString, perm: uint) -> IoResult<()> {
let r = FsRequest::chmod(&self.loop_, path, perm as c_int);
r.map_err(uv_error_to_io_error)
}
fn fs_readdir(&mut self, path: &CString, flags: c_int)
-> Result<Vec<CString>, IoError>
-> IoResult<Vec<CString>>
{
let r = FsRequest::readdir(&self.loop_, path, flags);
r.map_err(uv_error_to_io_error)
}
fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
fn fs_link(&mut self, src: &CString, dst: &CString) -> IoResult<()> {
let r = FsRequest::link(&self.loop_, src, dst);
r.map_err(uv_error_to_io_error)
}
fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
fn fs_symlink(&mut self, src: &CString, dst: &CString) -> IoResult<()> {
let r = FsRequest::symlink(&self.loop_, src, dst);
r.map_err(uv_error_to_io_error)
}
fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> {
fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> IoResult<()> {
let r = FsRequest::chown(&self.loop_, path, uid, gid);
r.map_err(uv_error_to_io_error)
}
fn fs_readlink(&mut self, path: &CString) -> Result<CString, IoError> {
fn fs_readlink(&mut self, path: &CString) -> IoResult<CString> {
let r = FsRequest::readlink(&self.loop_, path);
r.map_err(uv_error_to_io_error)
}
fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64)
-> Result<(), IoError>
-> IoResult<()>
{
let r = FsRequest::utime(&self.loop_, path, atime, mtime);
r.map_err(uv_error_to_io_error)
}
fn spawn(&mut self, cfg: ProcessConfig)
-> Result<(Box<rtio::RtioProcess:Send>,
Vec<Option<Box<rtio::RtioPipe:Send>>>),
IoError>
-> IoResult<(Box<rtio::RtioProcess:Send>,
Vec<Option<Box<rtio::RtioPipe:Send>>>)>
{
match Process::spawn(self, cfg) {
Ok((p, io)) => {
@ -284,12 +278,12 @@ impl IoFactory for UvIoFactory {
}
}
fn kill(&mut self, pid: libc::pid_t, signum: int) -> Result<(), IoError> {
fn kill(&mut self, pid: libc::pid_t, signum: int) -> IoResult<()> {
Process::kill(pid, signum).map_err(uv_error_to_io_error)
}
fn unix_bind(&mut self, path: &CString)
-> Result<Box<rtio::RtioUnixListener:Send>, IoError> {
-> IoResult<Box<rtio::RtioUnixListener:Send>> {
match PipeListener::bind(self, path) {
Ok(p) => Ok(p as Box<rtio::RtioUnixListener:Send>),
Err(e) => Err(uv_error_to_io_error(e)),
@ -297,7 +291,7 @@ impl IoFactory for UvIoFactory {
}
fn unix_connect(&mut self, path: &CString, timeout: Option<u64>)
-> Result<Box<rtio::RtioPipe:Send>, IoError> {
-> IoResult<Box<rtio::RtioPipe:Send>> {
match PipeWatcher::connect(self, path, timeout) {
Ok(p) => Ok(box p as Box<rtio::RtioPipe:Send>),
Err(e) => Err(uv_error_to_io_error(e)),
@ -305,7 +299,7 @@ impl IoFactory for UvIoFactory {
}
fn tty_open(&mut self, fd: c_int, readable: bool)
-> Result<Box<rtio::RtioTTY:Send>, IoError> {
-> IoResult<Box<rtio::RtioTTY:Send>> {
match TtyWatcher::new(self, fd, readable) {
Ok(tty) => Ok(box tty as Box<rtio::RtioTTY:Send>),
Err(e) => Err(uv_error_to_io_error(e))
@ -313,16 +307,18 @@ impl IoFactory for UvIoFactory {
}
fn pipe_open(&mut self, fd: c_int)
-> Result<Box<rtio::RtioPipe:Send>, IoError> {
-> IoResult<Box<rtio::RtioPipe:Send>>
{
match PipeWatcher::open(self, fd) {
Ok(s) => Ok(box s as Box<rtio::RtioPipe:Send>),
Err(e) => Err(uv_error_to_io_error(e))
}
}
fn signal(&mut self, signum: Signum, channel: Sender<Signum>)
-> Result<Box<rtio::RtioSignal:Send>, IoError> {
match SignalWatcher::new(self, signum, channel) {
fn signal(&mut self, signum: int, cb: Box<rtio::Callback:Send>)
-> IoResult<Box<rtio::RtioSignal:Send>>
{
match SignalWatcher::new(self, signum, cb) {
Ok(s) => Ok(s as Box<rtio::RtioSignal:Send>),
Err(e) => Err(uv_error_to_io_error(e)),
}