Runtime removal: refactor pipes and networking

This patch continues the runtime removal by moving pipe and
networking-related code into `sys`.

Because this eliminates APIs in `libnative` and `librustrt`, it is a:

[breaking-change]

This functionality is likely to be available publicly, in some form,
from `std` in the future.
This commit is contained in:
Aaron Turon 2014-10-10 10:11:49 -07:00
parent 0c1e1ff1e3
commit d34b1b0ca9
19 changed files with 1293 additions and 1399 deletions

View file

@ -35,8 +35,6 @@ pub use self::process::Process;
mod helper_thread;
// Native I/O implementations
pub mod addrinfo;
pub mod net;
pub mod process;
mod util;
@ -53,14 +51,6 @@ pub mod timer;
#[path = "timer_windows.rs"]
pub mod timer;
#[cfg(unix)]
#[path = "pipe_unix.rs"]
pub mod pipe;
#[cfg(windows)]
#[path = "pipe_windows.rs"]
pub mod pipe;
#[cfg(windows)]
#[path = "tty_windows.rs"]
mod tty;
@ -126,52 +116,11 @@ pub struct IoFactory {
impl IoFactory {
pub fn new() -> IoFactory {
net::init();
IoFactory { _cannot_construct_outside_of_this_module: () }
}
}
impl rtio::IoFactory for IoFactory {
// networking
fn tcp_connect(&mut self, addr: rtio::SocketAddr,
timeout: Option<u64>)
-> IoResult<Box<rtio::RtioTcpStream + Send>>
{
net::TcpStream::connect(addr, timeout).map(|s| {
box s as Box<rtio::RtioTcpStream + Send>
})
}
fn tcp_bind(&mut self, addr: rtio::SocketAddr)
-> IoResult<Box<rtio::RtioTcpListener + Send>> {
net::TcpListener::bind(addr).map(|s| {
box s as Box<rtio::RtioTcpListener + Send>
})
}
fn udp_bind(&mut self, addr: rtio::SocketAddr)
-> IoResult<Box<rtio::RtioUdpSocket + Send>> {
net::UdpSocket::bind(addr).map(|u| {
box u as Box<rtio::RtioUdpSocket + Send>
})
}
fn unix_bind(&mut self, path: &CString)
-> IoResult<Box<rtio::RtioUnixListener + Send>> {
pipe::UnixListener::bind(path).map(|s| {
box s as Box<rtio::RtioUnixListener + Send>
})
}
fn unix_connect(&mut self, path: &CString,
timeout: Option<u64>) -> IoResult<Box<rtio::RtioPipe + Send>> {
pipe::UnixStream::connect(path, timeout).map(|s| {
box s as Box<rtio::RtioPipe + Send>
})
}
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<rtio::AddrinfoHint>)
-> IoResult<Vec<rtio::AddrinfoInfo>>
{
addrinfo::GetAddrInfoRequest::run(host, servname, hint)
}
// misc
fn timer_init(&mut self) -> IoResult<Box<rtio::RtioTimer + Send>> {
timer::Timer::new().map(|t| box t as Box<rtio::RtioTimer + Send>)
@ -189,9 +138,6 @@ impl rtio::IoFactory for IoFactory {
fn kill(&mut self, pid: libc::pid_t, signum: int) -> IoResult<()> {
process::Process::kill(pid, signum)
}
fn pipe_open(&mut self, fd: c_int) -> IoResult<Box<rtio::RtioPipe + Send>> {
Ok(box file::FileDesc::new(fd, true) as Box<rtio::RtioPipe + Send>)
}
#[cfg(unix)]
fn tty_open(&mut self, fd: c_int, _readable: bool)
-> IoResult<Box<rtio::RtioTTY + Send>> {

View file

@ -13,13 +13,9 @@
use core::prelude::*;
use alloc::boxed::Box;
use collections::string::String;
use collections::vec::Vec;
use core::fmt;
use core::mem;
use libc::c_int;
use libc;
use c_str::CString;
use local::Local;
use task::Task;
@ -173,87 +169,15 @@ impl<'a> LocalIo<'a> {
}
pub trait IoFactory {
// networking
fn tcp_connect(&mut self, addr: SocketAddr,
timeout: Option<u64>) -> IoResult<Box<RtioTcpStream + Send>>;
fn tcp_bind(&mut self, addr: SocketAddr)
-> IoResult<Box<RtioTcpListener + Send>>;
fn udp_bind(&mut self, addr: SocketAddr)
-> IoResult<Box<RtioUdpSocket + Send>>;
fn unix_bind(&mut self, path: &CString)
-> IoResult<Box<RtioUnixListener + Send>>;
fn unix_connect(&mut self, path: &CString,
timeout: Option<u64>) -> IoResult<Box<RtioPipe + Send>>;
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
hint: Option<AddrinfoHint>)
-> IoResult<Vec<AddrinfoInfo>>;
// misc
fn timer_init(&mut self) -> IoResult<Box<RtioTimer + Send>>;
fn spawn(&mut self, cfg: ProcessConfig)
-> IoResult<(Box<RtioProcess + Send>,
Vec<Option<Box<RtioPipe + Send>>>)>;
fn kill(&mut self, pid: libc::pid_t, signal: int) -> IoResult<()>;
fn pipe_open(&mut self, fd: c_int) -> IoResult<Box<RtioPipe + Send>>;
fn tty_open(&mut self, fd: c_int, readable: bool)
-> IoResult<Box<RtioTTY + Send>>;
}
pub trait RtioTcpListener : RtioSocket {
fn listen(self: Box<Self>) -> IoResult<Box<RtioTcpAcceptor + Send>>;
}
pub trait RtioTcpAcceptor : RtioSocket {
fn accept(&mut self) -> IoResult<Box<RtioTcpStream + Send>>;
fn accept_simultaneously(&mut self) -> IoResult<()>;
fn dont_accept_simultaneously(&mut self) -> IoResult<()>;
fn set_timeout(&mut self, timeout: Option<u64>);
fn clone(&self) -> Box<RtioTcpAcceptor + Send>;
fn close_accept(&mut self) -> IoResult<()>;
}
pub trait RtioTcpStream : RtioSocket {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint>;
fn write(&mut self, buf: &[u8]) -> IoResult<()>;
fn peer_name(&mut self) -> IoResult<SocketAddr>;
fn control_congestion(&mut self) -> IoResult<()>;
fn nodelay(&mut self) -> IoResult<()>;
fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()>;
fn letdie(&mut self) -> IoResult<()>;
fn clone(&self) -> Box<RtioTcpStream + Send>;
fn close_write(&mut self) -> IoResult<()>;
fn close_read(&mut self) -> IoResult<()>;
fn set_timeout(&mut self, timeout_ms: Option<u64>);
fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
}
pub trait RtioSocket {
fn socket_name(&mut self) -> IoResult<SocketAddr>;
}
pub trait RtioUdpSocket : RtioSocket {
fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)>;
fn send_to(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()>;
fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()>;
fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()>;
fn loop_multicast_locally(&mut self) -> IoResult<()>;
fn dont_loop_multicast_locally(&mut self) -> IoResult<()>;
fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()>;
fn time_to_live(&mut self, ttl: int) -> IoResult<()>;
fn hear_broadcasts(&mut self) -> IoResult<()>;
fn ignore_broadcasts(&mut self) -> IoResult<()>;
fn clone(&self) -> Box<RtioUdpSocket + Send>;
fn set_timeout(&mut self, timeout_ms: Option<u64>);
fn set_read_timeout(&mut self, timeout_ms: Option<u64>);
fn set_write_timeout(&mut self, timeout_ms: Option<u64>);
}
pub trait RtioTimer {
fn sleep(&mut self, msecs: u64);
fn oneshot(&mut self, msecs: u64, cb: Box<Callback + Send>);
@ -313,54 +237,3 @@ pub struct IoError {
}
pub type IoResult<T> = Result<T, IoError>;
#[deriving(PartialEq, Eq)]
pub enum IpAddr {
Ipv4Addr(u8, u8, u8, u8),
Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16),
}
impl fmt::Show for IpAddr {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
Ipv4Addr(a, b, c, d) => write!(fmt, "{}.{}.{}.{}", a, b, c, d),
Ipv6Addr(a, b, c, d, e, f, g, h) => {
write!(fmt,
"{:04x}:{:04x}:{:04x}:{:04x}:{:04x}:{:04x}:{:04x}:{:04x}",
a, b, c, d, e, f, g, h)
}
}
}
}
#[deriving(PartialEq, Eq)]
pub struct SocketAddr {
pub ip: IpAddr,
pub port: u16,
}
pub enum StdioContainer {
Ignored,
InheritFd(i32),
CreatePipe(bool, bool),
}
pub enum ProcessExit {
ExitStatus(int),
ExitSignal(int),
}
pub struct AddrinfoHint {
pub family: uint,
pub socktype: uint,
pub protocol: uint,
pub flags: uint,
}
pub struct AddrinfoInfo {
pub address: SocketAddr,
pub family: uint,
pub socktype: uint,
pub protocol: uint,
pub flags: uint,
}

View file

@ -20,12 +20,10 @@ getaddrinfo()
#![allow(missing_docs)]
use iter::Iterator;
use io::{IoResult, IoError};
use io::{IoResult};
use io::net::ip::{SocketAddr, IpAddr};
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::rtio::{IoFactory, LocalIo};
use rt::rtio;
use sys;
use vec::Vec;
/// Hints to the types of sockets that are desired when looking up hosts
@ -94,31 +92,7 @@ pub fn get_host_addresses(host: &str) -> IoResult<Vec<IpAddr>> {
#[allow(unused_variables)]
fn lookup(hostname: Option<&str>, servname: Option<&str>, hint: Option<Hint>)
-> IoResult<Vec<Info>> {
let hint = hint.map(|Hint { family, socktype, protocol, flags }| {
rtio::AddrinfoHint {
family: family,
socktype: 0, // FIXME: this should use the above variable
protocol: 0, // FIXME: this should use the above variable
flags: flags,
}
});
match LocalIo::maybe_raise(|io| {
io.get_host_addresses(hostname, servname, hint)
}) {
Ok(v) => Ok(v.into_iter().map(|info| {
Info {
address: SocketAddr {
ip: super::from_rtio(info.address.ip),
port: info.address.port,
},
family: info.family,
socktype: None, // FIXME: this should use the above variable
protocol: None, // FIXME: this should use the above variable
flags: info.flags,
}
}).collect()),
Err(e) => Err(IoError::from_rtio_error(e)),
}
sys::addrinfo::get_host_addresses(hostname, servname, hint)
}
// Ignored on android since we cannot give tcp/ip

View file

@ -12,9 +12,8 @@
use io::{IoError, IoResult, InvalidInput};
use option::None;
use result::{Result, Ok, Err};
use rt::rtio;
use self::ip::{Ipv4Addr, Ipv6Addr, IpAddr, SocketAddr, ToSocketAddr};
use result::{Ok, Err};
use self::ip::{SocketAddr, ToSocketAddr};
pub use self::addrinfo::get_host_addresses;
@ -24,46 +23,6 @@ pub mod udp;
pub mod ip;
pub mod pipe;
fn to_rtio(ip: IpAddr) -> rtio::IpAddr {
match ip {
Ipv4Addr(a, b, c, d) => rtio::Ipv4Addr(a, b, c, d),
Ipv6Addr(a, b, c, d, e, f, g, h) => {
rtio::Ipv6Addr(a, b, c, d, e, f, g, h)
}
}
}
fn from_rtio(ip: rtio::IpAddr) -> IpAddr {
match ip {
rtio::Ipv4Addr(a, b, c, d) => Ipv4Addr(a, b, c, d),
rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
Ipv6Addr(a, b, c, d, e, f, g, h)
}
}
}
fn with_addresses_io<A: ToSocketAddr, T>(
addr: A,
action: |&mut rtio::IoFactory, rtio::SocketAddr| -> Result<T, rtio::IoError>
) -> Result<T, IoError> {
const DEFAULT_ERROR: IoError = IoError {
kind: InvalidInput,
desc: "no addresses found for hostname",
detail: None
};
let addresses = try!(addr.to_socket_addr_all());
let mut err = DEFAULT_ERROR;
for addr in addresses.into_iter() {
let addr = rtio::SocketAddr { ip: to_rtio(addr.ip), port: addr.port };
match rtio::LocalIo::maybe_raise(|io| action(io, addr)) {
Ok(r) => return Ok(r),
Err(e) => err = IoError::from_rtio_error(e)
}
}
Err(err)
}
fn with_addresses<A: ToSocketAddr, T>(addr: A, action: |SocketAddr| -> IoResult<T>)
-> IoResult<T> {
const DEFAULT_ERROR: IoError = IoError {

View file

@ -26,17 +26,20 @@ instances as clients.
use prelude::*;
use io::{Listener, Acceptor, IoResult, IoError, TimedOut, standard_error};
use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
use rt::rtio::{RtioUnixAcceptor, RtioPipe};
use io::{Listener, Acceptor, IoResult, TimedOut, standard_error};
use time::Duration;
use sys::pipe::UnixStream as UnixStreamImp;
use sys::pipe::UnixListener as UnixListenerImp;
use sys::pipe::UnixAcceptor as UnixAcceptorImp;
/// A stream which communicates over a named pipe.
pub struct UnixStream {
obj: Box<RtioPipe + Send>,
inner: UnixStreamImp,
}
impl UnixStream {
/// Connect to a pipe named by `path`. This will attempt to open a
/// connection to the underlying socket.
///
@ -53,9 +56,8 @@ impl UnixStream {
/// stream.write([1, 2, 3]);
/// ```
pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
LocalIo::maybe_raise(|io| {
io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
}).map_err(IoError::from_rtio_error)
UnixStreamImp::connect(&path.to_c_str(), None)
.map(|inner| UnixStream { inner: inner })
}
/// Connect to a pipe named by `path`, timing out if the specified number of
@ -73,10 +75,8 @@ impl UnixStream {
return Err(standard_error(TimedOut));
}
LocalIo::maybe_raise(|io| {
let s = io.unix_connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64));
s.map(|p| UnixStream { obj: p })
}).map_err(IoError::from_rtio_error)
UnixStreamImp::connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64))
.map(|inner| UnixStream { inner: inner })
}
@ -88,7 +88,7 @@ impl UnixStream {
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_read(&mut self) -> IoResult<()> {
self.obj.close_read().map_err(IoError::from_rtio_error)
self.inner.close_read()
}
/// Closes the writing half of this connection.
@ -99,7 +99,7 @@ impl UnixStream {
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_write(&mut self) -> IoResult<()> {
self.obj.close_write().map_err(IoError::from_rtio_error)
self.inner.close_write()
}
/// Sets the read/write timeout for this socket.
@ -107,7 +107,7 @@ impl UnixStream {
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
self.inner.set_timeout(timeout_ms)
}
/// Sets the read timeout for this socket.
@ -115,7 +115,7 @@ impl UnixStream {
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_read_timeout(timeout_ms)
self.inner.set_read_timeout(timeout_ms)
}
/// Sets the write timeout for this socket.
@ -123,36 +123,35 @@ impl UnixStream {
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_write_timeout(timeout_ms)
self.inner.set_write_timeout(timeout_ms)
}
}
impl Clone for UnixStream {
fn clone(&self) -> UnixStream {
UnixStream { obj: self.obj.clone() }
UnixStream { inner: self.inner.clone() }
}
}
impl Reader for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.obj.read(buf).map_err(IoError::from_rtio_error)
self.inner.read(buf)
}
}
impl Writer for UnixStream {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.obj.write(buf).map_err(IoError::from_rtio_error)
self.inner.write(buf)
}
}
/// A value that can listen for incoming named pipe connection requests.
pub struct UnixListener {
/// The internal, opaque runtime Unix listener.
obj: Box<RtioUnixListener + Send>,
inner: UnixListenerImp,
}
impl UnixListener {
/// Creates a new listener, ready to receive incoming connections on the
/// specified socket. The server will be named by `path`.
///
@ -175,24 +174,22 @@ impl UnixListener {
/// # }
/// ```
pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
LocalIo::maybe_raise(|io| {
io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
}).map_err(IoError::from_rtio_error)
UnixListenerImp::bind(&path.to_c_str())
.map(|inner| UnixListener { inner: inner })
}
}
impl Listener<UnixStream, UnixAcceptor> for UnixListener {
fn listen(self) -> IoResult<UnixAcceptor> {
self.obj.listen().map(|obj| {
UnixAcceptor { obj: obj }
}).map_err(IoError::from_rtio_error)
self.inner.listen()
.map(|inner| UnixAcceptor { inner: inner })
}
}
/// A value that can accept named pipe connections, returned from `listen()`.
pub struct UnixAcceptor {
/// The internal, opaque runtime Unix acceptor.
obj: Box<RtioUnixAcceptor + Send>,
inner: UnixAcceptorImp
}
impl UnixAcceptor {
@ -210,7 +207,7 @@ impl UnixAcceptor {
#[experimental = "the name and arguments to this function are likely \
to change"]
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
self.inner.set_timeout(timeout_ms)
}
/// Closes the accepting capabilities of this acceptor.
@ -219,15 +216,15 @@ impl UnixAcceptor {
/// more information can be found in that documentation.
#[experimental]
pub fn close_accept(&mut self) -> IoResult<()> {
self.obj.close_accept().map_err(IoError::from_rtio_error)
self.inner.close_accept()
}
}
impl Acceptor<UnixStream> for UnixAcceptor {
fn accept(&mut self) -> IoResult<UnixStream> {
self.obj.accept().map(|s| {
UnixStream { obj: s }
}).map_err(IoError::from_rtio_error)
self.inner.accept().map(|s| {
UnixStream { inner: s }
})
}
}
@ -246,7 +243,7 @@ impl Clone for UnixAcceptor {
/// This function is useful for creating a handle to invoke `close_accept`
/// on to wake up any other task blocked in `accept`.
fn clone(&self) -> UnixAcceptor {
UnixAcceptor { obj: self.obj.clone() }
UnixAcceptor { inner: self.inner.clone() }
}
}

View file

@ -20,19 +20,17 @@
use clone::Clone;
use io::IoResult;
use iter::Iterator;
use result::{Ok,Err};
use result::Err;
use io::net::ip::{SocketAddr, ToSocketAddr};
use io::IoError;
use io::{Reader, Writer, Listener, Acceptor};
use io::{standard_error, TimedOut};
use kinds::Send;
use option::{None, Some, Option};
use boxed::Box;
use rt::rtio::{IoFactory, RtioSocket, RtioTcpListener};
use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
use rt::rtio;
use time::Duration;
use sys::tcp::TcpStream as TcpStreamImp;
use sys::tcp::TcpListener as TcpListenerImp;
use sys::tcp::TcpAcceptor as TcpAcceptorImp;
/// A structure which represents a TCP stream between a local socket and a
/// remote socket.
///
@ -50,12 +48,12 @@ use time::Duration;
/// drop(stream); // close the connection
/// ```
pub struct TcpStream {
obj: Box<RtioTcpStream + Send>,
inner: TcpStreamImp,
}
impl TcpStream {
fn new(s: Box<RtioTcpStream + Send>) -> TcpStream {
TcpStream { obj: s }
fn new(s: TcpStreamImp) -> TcpStream {
TcpStream { inner: s }
}
/// Open a TCP connection to a remote host.
@ -64,7 +62,9 @@ impl TcpStream {
/// trait can be supplied for the address; see this trait documentation for
/// concrete examples.
pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> {
super::with_addresses_io(addr, |io, addr| io.tcp_connect(addr, None).map(TcpStream::new))
super::with_addresses(addr, |addr| {
TcpStreamImp::connect(addr, None).map(TcpStream::new)
})
}
/// Creates a TCP connection to a remote socket address, timing out after
@ -86,39 +86,26 @@ impl TcpStream {
return Err(standard_error(TimedOut));
}
super::with_addresses_io(addr, |io, addr|
io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new)
)
super::with_addresses(addr, |addr| {
TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64))
.map(TcpStream::new)
})
}
/// Returns the socket address of the remote peer of this TCP connection.
pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
match self.obj.peer_name() {
Ok(rtio::SocketAddr { ip, port }) => {
Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
}
Err(e) => Err(IoError::from_rtio_error(e)),
}
self.inner.peer_name()
}
/// Returns the socket address of the local half of this TCP connection.
pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
match self.obj.socket_name() {
Ok(rtio::SocketAddr { ip, port }) => {
Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
}
Err(e) => Err(IoError::from_rtio_error(e)),
}
self.inner.socket_name()
}
/// Sets the nodelay flag on this connection to the boolean specified
#[experimental]
pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
if nodelay {
self.obj.nodelay()
} else {
self.obj.control_congestion()
}.map_err(IoError::from_rtio_error)
self.inner.set_nodelay(nodelay)
}
/// Sets the keepalive timeout to the timeout specified.
@ -128,10 +115,7 @@ impl TcpStream {
/// specified time, in seconds.
#[experimental]
pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
match delay_in_seconds {
Some(i) => self.obj.keepalive(i),
None => self.obj.letdie(),
}.map_err(IoError::from_rtio_error)
self.inner.set_keepalive(delay_in_seconds)
}
/// Closes the reading half of this connection.
@ -165,7 +149,7 @@ impl TcpStream {
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_read(&mut self) -> IoResult<()> {
self.obj.close_read().map_err(IoError::from_rtio_error)
self.inner.close_read()
}
/// Closes the writing half of this connection.
@ -176,7 +160,7 @@ impl TcpStream {
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_write(&mut self) -> IoResult<()> {
self.obj.close_write().map_err(IoError::from_rtio_error)
self.inner.close_write()
}
/// Sets a timeout, in milliseconds, for blocking operations on this stream.
@ -198,7 +182,7 @@ impl TcpStream {
/// take a look at `set_read_timeout` and `set_write_timeout`.
#[experimental = "the timeout argument may change in type and value"]
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
self.inner.set_timeout(timeout_ms)
}
/// Sets the timeout for read operations on this stream.
@ -215,7 +199,7 @@ impl TcpStream {
/// during the timeout period.
#[experimental = "the timeout argument may change in type and value"]
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_read_timeout(timeout_ms)
self.inner.set_read_timeout(timeout_ms)
}
/// Sets the timeout for write operations on this stream.
@ -242,7 +226,7 @@ impl TcpStream {
/// asynchronous fashion after the call to write returns.
#[experimental = "the timeout argument may change in type and value"]
pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_write_timeout(timeout_ms)
self.inner.set_write_timeout(timeout_ms)
}
}
@ -256,19 +240,19 @@ impl Clone for TcpStream {
/// Instead, the first read will receive the first packet received, and the
/// second read will receive the second packet.
fn clone(&self) -> TcpStream {
TcpStream { obj: self.obj.clone() }
TcpStream { inner: self.inner.clone() }
}
}
impl Reader for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.obj.read(buf).map_err(IoError::from_rtio_error)
self.inner.read(buf)
}
}
impl Writer for TcpStream {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.obj.write(buf).map_err(IoError::from_rtio_error)
self.inner.write(buf)
}
}
@ -309,7 +293,7 @@ impl Writer for TcpStream {
/// # }
/// ```
pub struct TcpListener {
obj: Box<RtioTcpListener + Send>,
inner: TcpListenerImp,
}
impl TcpListener {
@ -324,26 +308,20 @@ impl TcpListener {
/// The address type can be any implementor of `ToSocketAddr` trait. See its
/// documentation for concrete examples.
pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> {
super::with_addresses_io(addr, |io, addr| io.tcp_bind(addr).map(|l| TcpListener { obj: l }))
super::with_addresses(addr, |addr| {
TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner })
})
}
/// Returns the local socket address of this listener.
pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
match self.obj.socket_name() {
Ok(rtio::SocketAddr { ip, port }) => {
Ok(SocketAddr { ip: super::from_rtio(ip), port: port })
}
Err(e) => Err(IoError::from_rtio_error(e)),
}
self.inner.socket_name()
}
}
impl Listener<TcpStream, TcpAcceptor> for TcpListener {
fn listen(self) -> IoResult<TcpAcceptor> {
match self.obj.listen() {
Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }),
Err(e) => Err(IoError::from_rtio_error(e)),
}
self.inner.listen(128).map(|a| TcpAcceptor { inner: a })
}
}
@ -351,7 +329,7 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener {
/// a `TcpListener`'s `listen` method, and this object can be used to accept new
/// `TcpStream` instances.
pub struct TcpAcceptor {
obj: Box<RtioTcpAcceptor + Send>,
inner: TcpAcceptorImp,
}
impl TcpAcceptor {
@ -399,7 +377,7 @@ impl TcpAcceptor {
/// ```
#[experimental = "the type of the argument and name of this function are \
subject to change"]
pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); }
/// Closes the accepting capabilities of this acceptor.
///
@ -445,16 +423,13 @@ impl TcpAcceptor {
/// ```
#[experimental]
pub fn close_accept(&mut self) -> IoResult<()> {
self.obj.close_accept().map_err(IoError::from_rtio_error)
self.inner.close_accept()
}
}
impl Acceptor<TcpStream> for TcpAcceptor {
fn accept(&mut self) -> IoResult<TcpStream> {
match self.obj.accept(){
Ok(s) => Ok(TcpStream::new(s)),
Err(e) => Err(IoError::from_rtio_error(e)),
}
self.inner.accept().map(TcpStream::new)
}
}
@ -473,7 +448,7 @@ impl Clone for TcpAcceptor {
/// This function is useful for creating a handle to invoke `close_accept`
/// on to wake up any other task blocked in `accept`.
fn clone(&self) -> TcpAcceptor {
TcpAcceptor { obj: self.obj.clone() }
TcpAcceptor { inner: self.inner.clone() }
}
}
@ -1112,8 +1087,6 @@ mod test {
#[test]
fn shutdown_smoke() {
use rt::rtio::RtioTcpStream;
let addr = next_test_ip4();
let a = TcpListener::bind(addr).unwrap().listen();
spawn(proc() {
@ -1124,7 +1097,7 @@ mod test {
});
let mut s = TcpStream::connect(addr).unwrap();
assert!(s.obj.close_write().is_ok());
assert!(s.inner.close_write().is_ok());
assert!(s.write([1]).is_err());
assert_eq!(s.read_to_end(), Ok(vec!(1)));
}

View file

@ -17,13 +17,10 @@
use clone::Clone;
use io::net::ip::{SocketAddr, IpAddr, ToSocketAddr};
use io::{Reader, Writer, IoResult, IoError};
use kinds::Send;
use boxed::Box;
use io::{Reader, Writer, IoResult};
use option::Option;
use result::{Ok, Err};
use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory};
use rt::rtio;
use sys::udp::UdpSocket as UdpSocketImp;
/// A User Datagram Protocol socket.
///
@ -60,7 +57,7 @@ use rt::rtio;
/// }
/// ```
pub struct UdpSocket {
obj: Box<RtioUdpSocket + Send>,
inner: UdpSocketImp,
}
impl UdpSocket {
@ -69,18 +66,15 @@ impl UdpSocket {
/// Address type can be any implementor of `ToSocketAddr` trait. See its
/// documentation for concrete examples.
pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<UdpSocket> {
super::with_addresses_io(addr, |io, addr| io.udp_bind(addr).map(|s| UdpSocket { obj: s }))
super::with_addresses(addr, |addr| {
UdpSocketImp::bind(addr).map(|s| UdpSocket { inner: s })
})
}
/// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came.
pub fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> {
match self.obj.recv_from(buf) {
Ok((amt, rtio::SocketAddr { ip, port })) => {
Ok((amt, SocketAddr { ip: super::from_rtio(ip), port: port }))
}
Err(e) => Err(IoError::from_rtio_error(e)),
}
self.inner.recv_from(buf)
}
/// Sends data on the socket to the given address. Returns nothing on
@ -89,10 +83,7 @@ impl UdpSocket {
/// Address type can be any implementor of `ToSocketAddr` trait. See its
/// documentation for concrete examples.
pub fn send_to<A: ToSocketAddr>(&mut self, buf: &[u8], addr: A) -> IoResult<()> {
super::with_addresses(addr, |addr| self.obj.send_to(buf, rtio::SocketAddr {
ip: super::to_rtio(addr.ip),
port: addr.port,
}).map_err(IoError::from_rtio_error))
super::with_addresses(addr, |addr| self.inner.send_to(buf, addr))
}
/// Creates a `UdpStream`, which allows use of the `Reader` and `Writer`
@ -112,24 +103,19 @@ impl UdpSocket {
/// Returns the socket address that this socket was created from.
pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
match self.obj.socket_name() {
Ok(a) => Ok(SocketAddr { ip: super::from_rtio(a.ip), port: a.port }),
Err(e) => Err(IoError::from_rtio_error(e))
}
self.inner.socket_name()
}
/// Joins a multicast IP address (becomes a member of it)
#[experimental]
pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
let e = self.obj.join_multicast(super::to_rtio(multi));
e.map_err(IoError::from_rtio_error)
self.inner.join_multicast(multi)
}
/// Leaves a multicast IP address (drops membership from it)
#[experimental]
pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
let e = self.obj.leave_multicast(super::to_rtio(multi));
e.map_err(IoError::from_rtio_error)
self.inner.leave_multicast(multi)
}
/// Set the multicast loop flag to the specified value
@ -137,33 +123,25 @@ impl UdpSocket {
/// This lets multicast packets loop back to local sockets (if enabled)
#[experimental]
pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
if on {
self.obj.loop_multicast_locally()
} else {
self.obj.dont_loop_multicast_locally()
}.map_err(IoError::from_rtio_error)
self.inner.set_multicast_loop(on)
}
/// Sets the multicast TTL
#[experimental]
pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> {
self.obj.multicast_time_to_live(ttl).map_err(IoError::from_rtio_error)
self.inner.multicast_time_to_live(ttl)
}
/// Sets this socket's TTL
#[experimental]
pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> {
self.obj.time_to_live(ttl).map_err(IoError::from_rtio_error)
self.inner.time_to_live(ttl)
}
/// Sets the broadcast flag on or off
#[experimental]
pub fn set_broadcast(&mut self, broadcast: bool) -> IoResult<()> {
if broadcast {
self.obj.hear_broadcasts()
} else {
self.obj.ignore_broadcasts()
}.map_err(IoError::from_rtio_error)
self.inner.set_broadcast(broadcast)
}
/// Sets the read/write timeout for this socket.
@ -171,7 +149,7 @@ impl UdpSocket {
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_timeout(timeout_ms)
self.inner.set_timeout(timeout_ms)
}
/// Sets the read timeout for this socket.
@ -179,7 +157,7 @@ impl UdpSocket {
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_read_timeout(timeout_ms)
self.inner.set_read_timeout(timeout_ms)
}
/// Sets the write timeout for this socket.
@ -187,7 +165,7 @@ impl UdpSocket {
/// For more information, see `TcpStream::set_timeout`
#[experimental = "the timeout argument may change in type and value"]
pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
self.obj.set_write_timeout(timeout_ms)
self.inner.set_write_timeout(timeout_ms)
}
}
@ -201,7 +179,7 @@ impl Clone for UdpSocket {
/// received, and the second read will receive the second packet.
fn clone(&self) -> UdpSocket {
UdpSocket {
obj: self.obj.clone(),
inner: self.inner.clone(),
}
}
}

View file

@ -17,15 +17,17 @@
use prelude::*;
use io::{IoResult, IoError};
use io::IoResult;
use libc;
use os;
use rt::rtio::{RtioPipe, LocalIo};
use sync::Arc;
use sys_common;
use sys;
use sys::fs::FileDesc as FileDesc;
/// A synchronous, in-memory pipe.
pub struct PipeStream {
/// The internal, opaque runtime pipe object.
obj: Box<RtioPipe + Send>,
inner: Arc<FileDesc>
}
pub struct PipePair {
@ -55,14 +57,14 @@ impl PipeStream {
/// }
/// ```
pub fn open(fd: libc::c_int) -> IoResult<PipeStream> {
LocalIo::maybe_raise(|io| {
io.pipe_open(fd).map(|obj| PipeStream { obj: obj })
}).map_err(IoError::from_rtio_error)
Ok(PipeStream::from_filedesc(FileDesc::new(fd, true)))
}
// FIXME: expose this some other way
/// Wrap a FileDesc directly, taking ownership.
#[doc(hidden)]
pub fn new(inner: Box<RtioPipe + Send>) -> PipeStream {
PipeStream { obj: inner }
pub fn from_filedesc(fd: FileDesc) -> PipeStream {
PipeStream { inner: Arc::new(fd) }
}
/// Creates a pair of in-memory OS pipes for a unidirectional communication
@ -76,43 +78,35 @@ impl PipeStream {
/// This function can fail to succeed if the underlying OS has run out of
/// available resources to allocate a new pipe.
pub fn pair() -> IoResult<PipePair> {
struct Closer { fd: libc::c_int }
let (reader, writer) = try!(unsafe { sys::os::pipe() });
Ok(PipePair {
reader: PipeStream::from_filedesc(reader),
writer: PipeStream::from_filedesc(writer),
})
}
}
let os::Pipe { reader, writer } = try!(unsafe { os::pipe() });
let mut reader = Closer { fd: reader };
let mut writer = Closer { fd: writer };
let io_reader = try!(PipeStream::open(reader.fd));
reader.fd = -1;
let io_writer = try!(PipeStream::open(writer.fd));
writer.fd = -1;
return Ok(PipePair { reader: io_reader, writer: io_writer });
impl Drop for Closer {
fn drop(&mut self) {
if self.fd != -1 {
let _ = unsafe { libc::close(self.fd) };
}
}
}
impl sys_common::AsFileDesc for PipeStream {
fn as_fd(&self) -> &sys::fs::FileDesc {
&*self.inner
}
}
impl Clone for PipeStream {
fn clone(&self) -> PipeStream {
PipeStream { obj: self.obj.clone() }
PipeStream { inner: self.inner.clone() }
}
}
impl Reader for PipeStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.obj.read(buf).map_err(IoError::from_rtio_error)
self.inner.read(buf)
}
}
impl Writer for PipeStream {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.obj.write(buf).map_err(IoError::from_rtio_error)
self.inner.write(buf)
}
}

View file

@ -43,6 +43,7 @@ use ops::Drop;
use option::{Some, None, Option};
use os;
use path::{Path, GenericPath, BytesContainer};
use sys;
use sys::os as os_imp;
use ptr::RawPtr;
use ptr;
@ -603,35 +604,11 @@ pub struct Pipe {
/// descriptors to be closed, the file descriptors will leak. For safe handling
/// of this scenario, use `std::io::PipeStream` instead.
pub unsafe fn pipe() -> IoResult<Pipe> {
return _pipe();
#[cfg(unix)]
unsafe fn _pipe() -> IoResult<Pipe> {
let mut fds = [0, ..2];
match libc::pipe(fds.as_mut_ptr()) {
0 => Ok(Pipe { reader: fds[0], writer: fds[1] }),
_ => Err(IoError::last_error()),
}
}
#[cfg(windows)]
unsafe fn _pipe() -> IoResult<Pipe> {
// Windows pipes work subtly differently than unix pipes, and their
// inheritance has to be handled in a different way that I do not
// fully understand. Here we explicitly make the pipe non-inheritable,
// which means to pass it to a subprocess they need to be duplicated
// first, as in std::run.
let mut fds = [0, ..2];
match libc::pipe(fds.as_mut_ptr(), 1024 as ::libc::c_uint,
(libc::O_BINARY | libc::O_NOINHERIT) as c_int) {
0 => {
assert!(fds[0] != -1 && fds[0] != 0);
assert!(fds[1] != -1 && fds[1] != 0);
Ok(Pipe { reader: fds[0], writer: fds[1] })
}
_ => Err(IoError::last_error()),
}
}
let (reader, writer) = try!(sys::os::pipe());
Ok(Pipe {
reader: reader.unwrap(),
writer: writer.unwrap(),
})
}
/// Returns the proper dll filename for the given basename of a file

File diff suppressed because it is too large Load diff

View file

@ -8,24 +8,51 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![allow(missing_doc)]
extern crate libc;
use num;
use prelude::*;
use io::{mod, IoResult, IoError};
use sys_common::mkerr_libc;
pub mod c;
pub mod fs;
pub mod os;
pub mod c;
pub mod tcp;
pub mod udp;
pub mod pipe;
pub type sock_t = io::file::fd_t;
pub mod addrinfo {
pub use sys_common::net::get_host_addresses;
}
// FIXME: move these to c module
pub type sock_t = self::fs::fd_t;
pub type wrlen = libc::size_t;
pub type msglen_t = libc::size_t;
pub unsafe fn close_sock(sock: sock_t) { let _ = libc::close(sock); }
pub fn last_error() -> IoError {
let errno = os::errno() as i32;
let mut err = decode_error(errno);
err.detail = Some(os::error_string(errno));
decode_error_detailed(os::errno() as i32)
}
pub fn last_net_error() -> IoError {
last_error()
}
extern "system" {
fn gai_strerror(errcode: libc::c_int) -> *const libc::c_char;
}
pub fn last_gai_error(s: libc::c_int) -> IoError {
use c_str::CString;
let mut err = decode_error(s);
err.detail = Some(unsafe {
CString::new(gai_strerror(s), false).as_str().unwrap().to_string()
});
err
}
@ -64,6 +91,12 @@ pub fn decode_error(errno: i32) -> IoError {
IoError { kind: kind, desc: desc, detail: None }
}
pub fn decode_error_detailed(errno: i32) -> IoError {
let mut err = decode_error(errno);
err.detail = Some(os::error_string(errno));
err
}
#[inline]
pub fn retry<I: PartialEq + num::One + Neg<I>> (f: || -> I) -> I {
let minus_one = -num::one::<I>();
@ -86,7 +119,10 @@ pub fn wouldblock() -> bool {
err == libc::EWOULDBLOCK as int || err == libc::EAGAIN as int
}
pub fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> {
pub fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
let set = nb as libc::c_int;
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
}
// nothing needed on unix platforms
pub fn init_net() {}

View file

@ -11,6 +11,8 @@
use libc;
use libc::{c_int, c_char};
use prelude::*;
use io::IoResult;
use sys::fs::FileDesc;
use os::TMPBUF_SZ;
@ -99,3 +101,12 @@ pub fn error_string(errno: i32) -> String {
::string::raw::from_buf(p as *const u8)
}
}
pub unsafe fn pipe() -> IoResult<(FileDesc, FileDesc)> {
let mut fds = [0, ..2];
if libc::pipe(fds.as_mut_ptr()) == 0 {
Ok((FileDesc::new(fds[0], true), FileDesc::new(fds[1], true)))
} else {
Err(super::last_error())
}
}

View file

@ -10,19 +10,17 @@
use alloc::arc::Arc;
use libc;
use std::c_str::CString;
use std::mem;
use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
use std::sync::atomic;
use c_str::CString;
use mem;
use rt::mutex;
use sync::atomic;
use io::{mod, IoResult, IoError};
use prelude::*;
use super::retry;
use super::net;
use super::util;
use super::c;
use super::process;
use super::file::{fd_t, FileDesc};
use sys::{mod, timer, retry, c, set_nonblocking, wouldblock};
use sys::fs::{fd_t, FileDesc};
use sys_common::net::*;
use sys_common::{eof, mkerr_libc};
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
@ -41,12 +39,10 @@ fn addr_to_sockaddr_un(addr: &CString,
let len = addr.len();
if len > s.sun_path.len() - 1 {
#[cfg(unix)] use libc::EINVAL as ERROR;
#[cfg(windows)] use libc::WSAEINVAL as ERROR;
return Err(IoError {
code: ERROR as uint,
extra: 0,
detail: Some("path must be smaller than SUN_LEN".to_string()),
kind: io::InvalidInput,
desc: "invalid argument: path must be smaller than SUN_LEN",
detail: None,
})
}
s.sun_family = libc::AF_UNIX as libc::sa_family_t;
@ -92,7 +88,7 @@ fn connect(addr: &CString, ty: libc::c_int,
}
}
Some(timeout_ms) => {
try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
try!(connect_timeout(inner.fd, addrp, len, timeout_ms));
Ok(inner)
}
}
@ -143,18 +139,16 @@ impl UnixStream {
fn lock_nonblocking(&self) {}
#[cfg(not(target_os = "linux"))]
fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> {
let ret = net::Guard {
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
guard: unsafe { self.inner.lock.lock() },
};
assert!(util::set_nonblocking(self.fd(), true).is_ok());
assert!(set_nonblocking(self.fd(), true).is_ok());
ret
}
}
impl rtio::RtioPipe for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let fd = self.fd();
let dolock = || self.lock_nonblocking();
let doread = |nb| unsafe {
@ -164,10 +158,10 @@ impl rtio::RtioPipe for UnixStream {
buf.len() as libc::size_t,
flags) as libc::c_int
};
net::read(fd, self.read_deadline, dolock, doread)
read(fd, self.read_deadline, dolock, doread)
}
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let fd = self.fd();
let dolock = || self.lock_nonblocking();
let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
@ -177,32 +171,38 @@ impl rtio::RtioPipe for UnixStream {
len as libc::size_t,
flags) as i64
};
match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) {
match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
Ok(_) => Ok(()),
Err(e) => Err(e)
}
}
fn clone(&self) -> Box<rtio::RtioPipe + Send> {
box UnixStream::new(self.inner.clone()) as Box<rtio::RtioPipe + Send>
pub fn close_write(&mut self) -> IoResult<()> {
mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
pub fn close_read(&mut self) -> IoResult<()> {
mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
pub fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
self.read_deadline = deadline;
self.write_deadline = deadline;
}
fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
}
fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
}
}
impl Clone for UnixStream {
fn clone(&self) -> UnixStream {
UnixStream::new(self.inner.clone())
}
}
@ -224,16 +224,15 @@ impl UnixListener {
fn fd(&self) -> fd_t { self.inner.fd }
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
pub fn listen(self) -> IoResult<UnixAcceptor> {
match unsafe { libc::listen(self.fd(), 128) } {
-1 => Err(super::last_error()),
#[cfg(unix)]
_ => {
let (reader, writer) = try!(process::pipe());
try!(util::set_nonblocking(reader.fd(), true));
try!(util::set_nonblocking(writer.fd(), true));
try!(util::set_nonblocking(self.fd(), true));
let (reader, writer) = try!(unsafe { sys::os::pipe() });
try!(set_nonblocking(reader.fd(), true));
try!(set_nonblocking(writer.fd(), true));
try!(set_nonblocking(self.fd(), true));
Ok(UnixAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
@ -248,21 +247,11 @@ impl UnixListener {
}
}
impl rtio::RtioUnixListener for UnixListener {
fn listen(self: Box<UnixListener>)
-> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
self.native_listen(128).map(|a| {
box a as Box<rtio::RtioUnixAcceptor + Send>
})
}
}
pub struct UnixAcceptor {
inner: Arc<AcceptorInner>,
deadline: u64,
}
#[cfg(unix)]
struct AcceptorInner {
listener: UnixListener,
reader: FileDesc,
@ -273,7 +262,7 @@ struct AcceptorInner {
impl UnixAcceptor {
fn fd(&self) -> fd_t { self.inner.listener.fd() }
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
pub fn accept(&mut self) -> IoResult<UnixStream> {
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
while !self.inner.closed.load(atomic::SeqCst) {
@ -287,43 +276,36 @@ impl UnixAcceptor {
storagep as *mut libc::sockaddr,
&mut size as *mut libc::socklen_t) as libc::c_int
}) {
-1 if util::wouldblock() => {}
-1 if wouldblock() => {}
-1 => return Err(super::last_error()),
fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
}
}
try!(util::await([self.fd(), self.inner.reader.fd()],
deadline, util::Readable));
try!(await([self.fd(), self.inner.reader.fd()],
deadline, Readable));
}
Err(util::eof())
Err(eof())
}
pub fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
}
pub fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.write([0]) {
Ok(..) => Ok(()),
Err(..) if wouldblock() => Ok(()),
Err(e) => Err(e),
}
}
}
impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>)
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
box UnixAcceptor {
inner: self.inner.clone(),
deadline: 0,
} as Box<rtio::RtioUnixAcceptor + Send>
}
#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
Err(..) if util::wouldblock() => Ok(()),
Err(e) => Err(e),
}
impl Clone for UnixAcceptor {
fn clone(&self) -> UnixAcceptor {
UnixAcceptor { inner: self.inner.clone(), deadline: 0 }
}
}

157
src/libstd/sys/unix/tcp.rs Normal file
View file

@ -0,0 +1,157 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use io::net::ip;
use io::IoResult;
use libc;
use mem;
use ptr;
use prelude::*;
use super::{last_error, last_net_error, retry, sock_t};
use sync::{Arc, atomic};
use sys::fs::FileDesc;
use sys::{set_nonblocking, wouldblock};
use sys;
use sys_common;
use sys_common::net::*;
pub use sys_common::net::TcpStream;
////////////////////////////////////////////////////////////////////////////////
// TCP listeners
////////////////////////////////////////////////////////////////////////////////
pub struct TcpListener {
pub inner: FileDesc,
}
impl TcpListener {
pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
let fd = try!(socket(addr, libc::SOCK_STREAM));
let ret = TcpListener { inner: FileDesc::new(fd, true) };
let mut storage = unsafe { mem::zeroed() };
let len = addr_to_sockaddr(addr, &mut storage);
let addrp = &storage as *const _ as *const libc::sockaddr;
// On platforms with Berkeley-derived sockets, this allows
// to quickly rebind a socket, without needing to wait for
// the OS to clean up the previous one.
try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR, 1 as libc::c_int));
match unsafe { libc::bind(fd, addrp, len) } {
-1 => Err(last_error()),
_ => Ok(ret),
}
}
pub fn fd(&self) -> sock_t { self.inner.fd() }
pub fn listen(self, backlog: int) -> IoResult<TcpAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(last_net_error()),
_ => {
let (reader, writer) = try!(unsafe { sys::os::pipe() });
try!(set_nonblocking(reader.fd(), true));
try!(set_nonblocking(writer.fd(), true));
try!(set_nonblocking(self.fd(), true));
Ok(TcpAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
reader: reader,
writer: writer,
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
}
}
}
pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
sockname(self.fd(), libc::getsockname)
}
}
pub struct TcpAcceptor {
inner: Arc<AcceptorInner>,
deadline: u64,
}
struct AcceptorInner {
listener: TcpListener,
reader: FileDesc,
writer: FileDesc,
closed: atomic::AtomicBool,
}
impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
pub fn accept(&mut self) -> IoResult<TcpStream> {
// In implementing accept, the two main concerns are dealing with
// close_accept() and timeouts. The unix implementation is based on a
// nonblocking accept plus a call to select(). Windows ends up having
// an entirely separate implementation than unix, which is explained
// below.
//
// To implement timeouts, all blocking is done via select() instead of
// accept() by putting the socket in non-blocking mode. Because
// select() takes a timeout argument, we just pass through the timeout
// to select().
//
// To implement close_accept(), we have a self-pipe to ourselves which
// is passed to select() along with the socket being accepted on. The
// self-pipe is never written to unless close_accept() is called.
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
while !self.inner.closed.load(atomic::SeqCst) {
match retry(|| unsafe {
libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
}) {
-1 if wouldblock() => {}
-1 => return Err(last_net_error()),
fd => return Ok(TcpStream::new(fd as sock_t)),
}
try!(await([self.fd(), self.inner.reader.fd()],
deadline, Readable));
}
Err(sys_common::eof())
}
pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
sockname(self.fd(), libc::getsockname)
}
pub fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| sys::timer::now() + a).unwrap_or(0);
}
pub fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.write([0]) {
Ok(..) => Ok(()),
Err(..) if wouldblock() => Ok(()),
Err(e) => Err(e),
}
}
}
impl Clone for TcpAcceptor {
fn clone(&self) -> TcpAcceptor {
TcpAcceptor {
inner: self.inner.clone(),
deadline: 0,
}
}
}

View file

@ -0,0 +1,11 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
pub use sys_common::net::UdpSocket;

View file

@ -33,12 +33,21 @@ macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
};
) )
pub mod c;
pub mod fs;
pub mod os;
pub mod c;
pub mod tcp;
pub mod udp;
pub mod pipe;
pub mod addrinfo {
pub use sys_common::net::get_host_addresses;
}
// FIXME: move these to c module
pub type sock_t = libc::SOCKET;
pub type wrlen = libc::c_int;
pub type msglen_t = libc::c_int;
pub unsafe fn close_sock(sock: sock_t) { let _ = libc::closesocket(sock); }
// windows has zero values as errors
@ -140,7 +149,6 @@ pub fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
}
}
// FIXME: call this
pub fn init_net() {
unsafe {
static START: Once = ONCE_INIT;

View file

@ -86,18 +86,17 @@
use alloc::arc::Arc;
use libc;
use std::c_str::CString;
use std::mem;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
use std::sync::atomic;
use std::rt::mutex;
use c_str::CString;
use mem;
use ptr;
use sync::atomic;
use rt::mutex;
use io::{mod, IoError, IoResult};
use prelude::*;
use super::c;
use super::util;
use super::file::to_utf16;
use sys_common::{mod, eof};
use super::{c, os, timer, to_utf16, decode_error_detailed};
struct Event(libc::HANDLE);
@ -177,7 +176,7 @@ pub fn await(handle: libc::HANDLE, deadline: u64,
let ms = if deadline == 0 {
libc::INFINITE as u64
} else {
let now = ::io::timer::now();
let now = timer::now();
if deadline < now {0} else {deadline - now}
};
let ret = unsafe {
@ -190,7 +189,7 @@ pub fn await(handle: libc::HANDLE, deadline: u64,
WAIT_FAILED => Err(super::last_error()),
WAIT_TIMEOUT => unsafe {
let _ = c::CancelIo(handle);
Err(util::timeout("operation timed out"))
Err(sys_common::timeout("operation timed out"))
},
n => Ok((n - WAIT_OBJECT_0) as uint)
}
@ -198,8 +197,8 @@ pub fn await(handle: libc::HANDLE, deadline: u64,
fn epipe() -> IoError {
IoError {
code: libc::ERROR_BROKEN_PIPE as uint,
extra: 0,
kind: io::EndOfFile,
desc: "the pipe has ended",
detail: None,
}
}
@ -268,8 +267,8 @@ impl UnixStream {
}
pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
let addr = try!(to_utf16(addr));
let start = ::io::timer::now();
let addr = try!(to_utf16(addr.as_str()));
let start = timer::now();
loop {
match UnixStream::try_connect(addr.as_ptr()) {
Some(handle) => {
@ -308,13 +307,13 @@ impl UnixStream {
match timeout {
Some(timeout) => {
let now = ::io::timer::now();
let now = timer::now();
let timed_out = (now - start) >= timeout || unsafe {
let ms = (timeout - (now - start)) as libc::DWORD;
libc::WaitNamedPipeW(addr.as_ptr(), ms) == 0
};
if timed_out {
return Err(util::timeout("connect timed out"))
return Err(sys_common::timeout("connect timed out"))
}
}
@ -349,10 +348,8 @@ impl UnixStream {
_ => Ok(())
}
}
}
impl rtio::RtioPipe for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
if self.read.is_none() {
self.read = Some(try!(Event::new(true, false)));
}
@ -368,7 +365,7 @@ impl rtio::RtioPipe for UnixStream {
// See comments in close_read() about why this lock is necessary.
let guard = unsafe { self.inner.lock.lock() };
if self.read_closed() {
return Err(util::eof())
return Err(eof())
}
// Issue a nonblocking requests, succeeding quickly if it happened to
@ -416,15 +413,15 @@ impl rtio::RtioPipe for UnixStream {
// If the reading half is now closed, then we're done. If we woke up
// because the writing half was closed, keep trying.
if wait_succeeded.is_err() {
return Err(util::timeout("read timed out"))
return Err(sys_common::timeout("read timed out"))
}
if self.read_closed() {
return Err(util::eof())
return Err(eof())
}
}
}
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
if self.write.is_none() {
self.write = Some(try!(Event::new(true, false)));
}
@ -458,11 +455,7 @@ impl rtio::RtioPipe for UnixStream {
if ret == 0 {
if err != libc::ERROR_IO_PENDING as uint {
return Err(IoError {
code: err as uint,
extra: 0,
detail: Some(os::error_string(err as uint)),
})
return Err(decode_error_detailed(err as i32))
}
// Process a timeout if one is pending
let wait_succeeded = await(self.handle(), self.write_deadline,
@ -484,12 +477,12 @@ impl rtio::RtioPipe for UnixStream {
let amt = offset + bytes_written as uint;
return if amt > 0 {
Err(IoError {
code: libc::ERROR_OPERATION_ABORTED as uint,
extra: amt,
detail: Some("short write during write".to_string()),
kind: io::ShortWrite(amt),
desc: "short write during write",
detail: None,
})
} else {
Err(util::timeout("write timed out"))
Err(sys_common::timeout("write timed out"))
}
}
if self.write_closed() {
@ -503,17 +496,7 @@ impl rtio::RtioPipe for UnixStream {
Ok(())
}
fn clone(&self) -> Box<rtio::RtioPipe + Send> {
box UnixStream {
inner: self.inner.clone(),
read: None,
write: None,
read_deadline: 0,
write_deadline: 0,
} as Box<rtio::RtioPipe + Send>
}
fn close_read(&mut self) -> IoResult<()> {
pub fn close_read(&mut self) -> IoResult<()> {
// On windows, there's no actual shutdown() method for pipes, so we're
// forced to emulate the behavior manually at the application level. To
// do this, we need to both cancel any pending requests, as well as
@ -536,23 +519,35 @@ impl rtio::RtioPipe for UnixStream {
self.cancel_io()
}
fn close_write(&mut self) -> IoResult<()> {
pub fn close_write(&mut self) -> IoResult<()> {
// see comments in close_read() for why this lock is necessary
let _guard = unsafe { self.inner.lock.lock() };
self.inner.write_closed.store(true, atomic::SeqCst);
self.cancel_io()
}
fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
pub fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
self.read_deadline = deadline;
self.write_deadline = deadline;
}
fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
}
fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
}
}
impl Clone for UnixStream {
fn clone(&self) -> UnixStream {
UnixStream {
inner: self.inner.clone(),
read: None,
write: None,
read_deadline: 0,
write_deadline: 0,
}
}
}
@ -570,7 +565,7 @@ impl UnixListener {
// Although we technically don't need the pipe until much later, we
// create the initial handle up front to test the validity of the name
// and such.
let addr_v = try!(to_utf16(addr));
let addr_v = try!(to_utf16(addr.as_str()));
let ret = unsafe { pipe(addr_v.as_ptr(), true) };
if ret == libc::INVALID_HANDLE_VALUE {
Err(super::last_error())
@ -579,7 +574,7 @@ impl UnixListener {
}
}
pub fn native_listen(self) -> IoResult<UnixAcceptor> {
pub fn listen(self) -> IoResult<UnixAcceptor> {
Ok(UnixAcceptor {
listener: self,
event: try!(Event::new(true, false)),
@ -598,15 +593,6 @@ impl Drop for UnixListener {
}
}
impl rtio::RtioUnixListener for UnixListener {
fn listen(self: Box<UnixListener>)
-> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
self.native_listen().map(|a| {
box a as Box<rtio::RtioUnixAcceptor + Send>
})
}
}
pub struct UnixAcceptor {
inner: Arc<AcceptorState>,
listener: UnixListener,
@ -620,7 +606,7 @@ struct AcceptorState {
}
impl UnixAcceptor {
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
pub fn accept(&mut self) -> IoResult<UnixStream> {
// This function has some funky implementation details when working with
// unix pipes. On windows, each server named pipe handle can be
// connected to a one or zero clients. To the best of my knowledge, a
@ -657,9 +643,9 @@ impl UnixAcceptor {
// If we've had an artificial call to close_accept, be sure to never
// proceed in accepting new clients in the future
if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) }
if self.inner.closed.load(atomic::SeqCst) { return Err(eof()) }
let name = try!(to_utf16(&self.listener.name));
let name = try!(to_utf16(self.listener.name.as_str()));
// Once we've got a "server handle", we need to wait for a client to
// connect. The ConnectNamedPipe function will block this thread until
@ -691,7 +677,7 @@ impl UnixAcceptor {
if wait_succeeded.is_ok() {
err = unsafe { libc::GetLastError() };
} else {
return Err(util::timeout("accept timed out"))
return Err(sys_common::timeout("accept timed out"))
}
} else {
// we succeeded, bypass the check below
@ -727,34 +713,12 @@ impl UnixAcceptor {
write_deadline: 0,
})
}
}
impl rtio::RtioUnixAcceptor for UnixAcceptor {
fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe + Send>> {
self.native_accept().map(|s| box s as Box<rtio::RtioPipe + Send>)
}
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
pub fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|i| i + timer::now()).unwrap_or(0);
}
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
let name = to_utf16(&self.listener.name).ok().unwrap();
box UnixAcceptor {
inner: self.inner.clone(),
event: Event::new(true, false).ok().unwrap(),
deadline: 0,
listener: UnixListener {
name: self.listener.name.clone(),
handle: unsafe {
let p = pipe(name.as_ptr(), false) ;
assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
p
},
},
} as Box<rtio::RtioUnixAcceptor + Send>
}
fn close_accept(&mut self) -> IoResult<()> {
pub fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let ret = unsafe {
c::SetEvent(self.inner.abort.handle())
@ -767,3 +731,21 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
}
}
impl Clone for UnixAcceptor {
fn clone(&self) -> UnixAcceptor {
let name = to_utf16(self.listener.name.as_str()).ok().unwrap();
UnixAcceptor {
inner: self.inner.clone(),
event: Event::new(true, false).ok().unwrap(),
deadline: 0,
listener: UnixListener {
name: self.listener.name.clone(),
handle: unsafe {
let p = pipe(name.as_ptr(), false) ;
assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
p
},
},
}
}
}

View file

@ -0,0 +1,219 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use io::net::ip;
use io::IoResult;
use libc;
use mem;
use ptr;
use prelude::*;
use super::{last_error, last_net_error, retry, sock_t};
use sync::{Arc, atomic};
use sys::fs::FileDesc;
use sys::{mod, c, set_nonblocking, wouldblock, timer};
use sys_common::{mod, timeout, eof};
use sys_common::net::*;
pub use sys_common::net::TcpStream;
pub struct Event(c::WSAEVENT);
impl Event {
pub fn new() -> IoResult<Event> {
let event = unsafe { c::WSACreateEvent() };
if event == c::WSA_INVALID_EVENT {
Err(super::last_error())
} else {
Ok(Event(event))
}
}
pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
}
impl Drop for Event {
fn drop(&mut self) {
unsafe { let _ = c::WSACloseEvent(self.handle()); }
}
}
////////////////////////////////////////////////////////////////////////////////
// TCP listeners
////////////////////////////////////////////////////////////////////////////////
pub struct TcpListener {
inner: FileDesc,
}
impl TcpListener {
pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
sys::init_net();
let fd = try!(socket(addr, libc::SOCK_STREAM));
let ret = TcpListener { inner: FileDesc::new(fd as libc::c_int, true) };
let mut storage = unsafe { mem::zeroed() };
let len = addr_to_sockaddr(addr, &mut storage);
let addrp = &storage as *const _ as *const libc::sockaddr;
match unsafe { libc::bind(fd, addrp, len) } {
-1 => Err(last_net_error()),
_ => Ok(ret),
}
}
pub fn fd(&self) -> sock_t { self.inner.fd as sock_t }
pub fn listen(self, backlog: int) -> IoResult<TcpAcceptor> {
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(last_net_error()),
_ => {
let accept = try!(Event::new());
let ret = unsafe {
c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
};
if ret != 0 {
return Err(last_net_error())
}
Ok(TcpAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
abort: try!(Event::new()),
accept: accept,
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
}
}
}
pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
sockname(self.fd(), libc::getsockname)
}
}
pub struct TcpAcceptor {
inner: Arc<AcceptorInner>,
deadline: u64,
}
struct AcceptorInner {
listener: TcpListener,
abort: Event,
accept: Event,
closed: atomic::AtomicBool,
}
impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
pub fn accept(&mut self) -> IoResult<TcpStream> {
// Unlink unix, windows cannot invoke `select` on arbitrary file
// descriptors like pipes, only sockets. Consequently, windows cannot
// use the same implementation as unix for accept() when close_accept()
// is considered.
//
// In order to implement close_accept() and timeouts, windows uses
// event handles. An acceptor-specific abort event is created which
// will only get set in close_accept(), and it will never be un-set.
// Additionally, another acceptor-specific event is associated with the
// FD_ACCEPT network event.
//
// These two events are then passed to WaitForMultipleEvents to see
// which one triggers first, and the timeout passed to this function is
// the local timeout for the acceptor.
//
// If the wait times out, then the accept timed out. If the wait
// succeeds with the abort event, then we were closed, and if the wait
// succeeds otherwise, then we do a nonblocking poll via `accept` to
// see if we can accept a connection. The connection is candidate to be
// stolen, so we do all of this in a loop as well.
let events = [self.inner.abort.handle(), self.inner.accept.handle()];
while !self.inner.closed.load(atomic::SeqCst) {
let ms = if self.deadline == 0 {
c::WSA_INFINITE as u64
} else {
let now = timer::now();
if self.deadline < now {0} else {self.deadline - now}
};
let ret = unsafe {
c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
ms as libc::DWORD, libc::FALSE)
};
match ret {
c::WSA_WAIT_TIMEOUT => {
return Err(timeout("accept timed out"))
}
c::WSA_WAIT_FAILED => return Err(last_net_error()),
c::WSA_WAIT_EVENT_0 => break,
n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
}
let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
let ret = unsafe {
c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
};
if ret != 0 { return Err(last_net_error()) }
if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
match unsafe {
libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
} {
-1 if wouldblock() => {}
-1 => return Err(last_net_error()),
// Accepted sockets inherit the same properties as the caller,
// so we need to deregister our event and switch the socket back
// to blocking mode
fd => {
let stream = TcpStream::new(fd);
let ret = unsafe {
c::WSAEventSelect(fd, events[1], 0)
};
if ret != 0 { return Err(last_net_error()) }
try!(set_nonblocking(fd, false));
return Ok(stream)
}
}
}
Err(eof())
}
pub fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
sockname(self.fd(), libc::getsockname)
}
pub fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
}
pub fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
if ret == libc::TRUE {
Ok(())
} else {
Err(last_net_error())
}
}
}
impl Clone for TcpAcceptor {
fn clone(&self) -> TcpAcceptor {
TcpAcceptor {
inner: self.inner.clone(),
deadline: 0,
}
}
}

View file

@ -0,0 +1,11 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
pub use sys_common::net::UdpSocket;