From c1b5c4db8fdaec025f3ace3c69f046426d69d5db Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 4 Nov 2013 16:42:05 -0800 Subject: [PATCH] Start migrating stream I/O away from ~fn() --- src/librustuv/lib.rs | 6 +- src/librustuv/pipe.rs | 261 ++++++++++++++++++++++++++++++--------- src/librustuv/process.rs | 14 +-- src/librustuv/stream.rs | 216 ++++++++++++++++++++++++++++++++ src/librustuv/tty.rs | 102 +++++++++------ src/librustuv/uvio.rs | 231 ++-------------------------------- src/libstd/rt/rtio.rs | 2 - 7 files changed, 507 insertions(+), 325 deletions(-) create mode 100644 src/librustuv/stream.rs diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index de8bed948c4..1d6f2f0edb5 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -66,8 +66,9 @@ pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; pub use self::async::AsyncWatcher; pub use self::process::Process; -pub use self::pipe::Pipe; +pub use self::pipe::PipeWatcher; pub use self::signal::SignalWatcher; +pub use self::tty::TtyWatcher; mod macros; @@ -87,6 +88,7 @@ pub mod process; pub mod pipe; pub mod tty; pub mod signal; +pub mod stream; /// XXX: Loop(*handle) is buggy with destructors. Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -218,7 +220,6 @@ pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option); pub type NullCallback = ~fn(); pub type ConnectionCallback = ~fn(StreamWatcher, Option); pub type FsCallback = ~fn(&mut FsRequest, Option); -pub type AsyncCallback = ~fn(AsyncWatcher, Option); pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option); pub type UdpSendCallback = ~fn(UdpWatcher, Option); @@ -231,7 +232,6 @@ struct WatcherData { connect_cb: Option, close_cb: Option, alloc_cb: Option, - async_cb: Option, udp_recv_cb: Option, udp_send_cb: Option, } diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 0b65c55636d..f1936635a18 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -8,91 +8,234 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. 
-use std::libc; use std::c_str::CString; +use std::cast; +use std::libc; +use std::rt::BlockedTask; +use std::rt::io::IoError; +use std::rt::local::Local; +use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; +use std::rt::sched::{Scheduler, SchedHandle}; +use std::rt::tube::Tube; -use super::{Loop, UvError, Watcher, NativeHandle, status_to_maybe_uv_error}; -use super::ConnectionCallback; -use net; +use stream::StreamWatcher; +use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle}; +use uvio::HomingIO; use uvll; -pub struct Pipe(*uvll::uv_pipe_t); +pub struct PipeWatcher { + stream: StreamWatcher, + home: SchedHandle, +} -impl Watcher for Pipe {} +pub struct PipeListener { + home: SchedHandle, + pipe: *uvll::uv_pipe_t, + priv closing_task: Option, + priv outgoing: Tube>, +} -impl Pipe { - pub fn new(loop_: &Loop, ipc: bool) -> Pipe { +pub struct PipeAcceptor { + listener: ~PipeListener, + priv incoming: Tube>, +} + +// PipeWatcher implementation and traits + +impl PipeWatcher { + pub fn new(pipe: *uvll::uv_pipe_t) -> PipeWatcher { + PipeWatcher { + stream: StreamWatcher::new(pipe), + home: get_handle_to_current_scheduler!(), + } + } + + pub fn alloc(loop_: &Loop, ipc: bool) -> *uvll::uv_pipe_t { unsafe { let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE); - assert!(handle.is_not_null()); + assert!(!handle.is_null()); let ipc = ipc as libc::c_int; assert_eq!(uvll::uv_pipe_init(loop_.native_handle(), handle, ipc), 0); - let mut ret: Pipe = - NativeHandle::from_native_handle(handle); - ret.install_watcher_data(); - ret + handle } } - pub fn as_stream(&self) -> net::StreamWatcher { - net::StreamWatcher(**self as *uvll::uv_stream_t) - } - - #[fixed_stack_segment] #[inline(never)] - pub fn open(&mut self, file: libc::c_int) -> Result<(), UvError> { - match unsafe { uvll::uv_pipe_open(self.native_handle(), file) } { - 0 => Ok(()), - n => Err(UvError(n)) - } - } - - #[fixed_stack_segment] #[inline(never)] - pub fn bind(&mut self, name: &CString) -> Result<(), UvError> { - do name.with_ref |name| { - match unsafe { uvll::uv_pipe_bind(self.native_handle(), name) } { - 0 => Ok(()), - n => Err(UvError(n)) + pub fn open(loop_: &Loop, file: libc::c_int) -> Result + { + let handle = PipeWatcher::alloc(loop_, false); + match unsafe { uvll::uv_pipe_open(handle, file) } { + 0 => Ok(PipeWatcher::new(handle)), + n => { + unsafe { uvll::uv_close(handle, pipe_close_cb) } + Err(UvError(n)) } } } - #[fixed_stack_segment] #[inline(never)] - pub fn connect(&mut self, name: &CString, cb: ConnectionCallback) { - { - let data = self.get_watcher_data(); - assert!(data.connect_cb.is_none()); - data.connect_cb = Some(cb); + pub fn connect(loop_: &Loop, name: &CString) -> Result + { + struct Ctx { + task: Option, + result: Option>, } + let mut cx = Ctx { task: None, result: None }; + let req = unsafe { uvll::malloc_req(uvll::UV_CONNECT) }; + unsafe { uvll::set_data_for_req(req, &cx as *Ctx) } - let connect = net::ConnectRequest::new(); - let name = do name.with_ref |p| { p }; - - unsafe { - uvll::uv_pipe_connect(connect.native_handle(), - self.native_handle(), - name, - connect_cb) + let sched: ~Scheduler = Local::take(); + do sched.deschedule_running_task_and_then |_, task| { + cx.task = Some(task); + unsafe { + uvll::uv_pipe_connect(req, + PipeWatcher::alloc(loop_, false), + name.with_ref(|p| p), + connect_cb) + } } + assert!(cx.task.is_none()); + return cx.result.take().expect("pipe connect needs a result"); - extern "C" fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) { - 
let connect_request: net::ConnectRequest = - NativeHandle::from_native_handle(req); - let mut stream_watcher = connect_request.stream(); - connect_request.delete(); + extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) { + unsafe { + let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req)); + let stream = uvll::get_stream_handle_from_connect_req(req); + cx.result = Some(match status { + 0 => Ok(PipeWatcher::new(stream)), + n => { + uvll::free_handle(stream); + Err(UvError(n)) + } + }); + uvll::free_req(req); - let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap(); - let status = status_to_maybe_uv_error(status); - cb(stream_watcher, status); + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(cx.task.take_unwrap()); + } } } - } -impl NativeHandle<*uvll::uv_pipe_t> for Pipe { - fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe { - Pipe(handle) +impl RtioPipe for PipeWatcher { + fn read(&mut self, buf: &mut [u8]) -> Result { + let _m = self.fire_missiles(); + self.stream.read(buf).map_err(uv_error_to_io_error) } - fn native_handle(&self) -> *uvll::uv_pipe_t { - match self { &Pipe(ptr) => ptr } + + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + let _m = self.fire_missiles(); + self.stream.write(buf).map_err(uv_error_to_io_error) } } + +impl HomingIO for PipeWatcher { + fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home } +} + +impl Drop for PipeWatcher { + fn drop(&mut self) { + let _m = self.fire_missiles(); + self.stream.close(true); // close synchronously + } +} + +extern fn pipe_close_cb(handle: *uvll::uv_handle_t) { + unsafe { uvll::free_handle(handle) } +} + +// PipeListener implementation and traits + +impl PipeListener { + pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> { + let pipe = PipeWatcher::alloc(loop_, false); + match unsafe { uvll::uv_pipe_bind(pipe, name.with_ref(|p| p)) } { + 0 => { + let p = ~PipeListener { + home: get_handle_to_current_scheduler!(), + pipe: pipe, + closing_task: None, + outgoing: Tube::new(), + }; + Ok(p.install()) + } + n => { + unsafe { uvll::free_handle(pipe) } + Err(UvError(n)) + } + } + } +} + +impl RtioUnixListener for PipeListener { + fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> { + // create the acceptor object from ourselves + let incoming = self.outgoing.clone(); + let mut acceptor = ~PipeAcceptor { + listener: self, + incoming: incoming, + }; + + let _m = acceptor.fire_missiles(); + // XXX: the 128 backlog should be configurable + match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } { + 0 => Ok(acceptor as ~RtioUnixAcceptor), + n => Err(uv_error_to_io_error(UvError(n))), + } + } +} + +impl HomingIO for PipeListener { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl UvHandle for PipeListener { + fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe } +} + +extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) { + let msg = match status { + 0 => { + let loop_ = NativeHandle::from_native_handle(unsafe { + uvll::get_loop_for_uv_handle(server) + }); + let client = PipeWatcher::alloc(&loop_, false); + assert_eq!(unsafe { uvll::uv_accept(server, client) }, 0); + Ok(~PipeWatcher::new(client) as ~RtioPipe) + } + n => Err(uv_error_to_io_error(UvError(n))) + }; + + let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) }; + pipe.outgoing.send(msg); +} + +impl Drop for PipeListener { + fn drop(&mut self) { + let (_m, sched) = 
self.fire_missiles_sched(); + + do sched.deschedule_running_task_and_then |_, task| { + self.closing_task = Some(task); + unsafe { uvll::uv_close(self.pipe, listener_close_cb) } + } + } +} + +extern fn listener_close_cb(handle: *uvll::uv_handle_t) { + let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&handle) }; + unsafe { uvll::free_handle(handle) } + + let sched: ~Scheduler = Local::take(); + sched.resume_blocked_task_immediately(pipe.closing_task.take_unwrap()); +} + +// PipeAcceptor implementation and traits + +impl RtioUnixAcceptor for PipeAcceptor { + fn accept(&mut self) -> Result<~RtioPipe, IoError> { + let _m = self.fire_missiles(); + self.incoming.recv() + } +} + +impl HomingIO for PipeAcceptor { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } +} diff --git a/src/librustuv/process.rs b/src/librustuv/process.rs index d143bc059e4..50964d7a84c 100644 --- a/src/librustuv/process.rs +++ b/src/librustuv/process.rs @@ -21,8 +21,9 @@ use std::rt::sched::{Scheduler, SchedHandle}; use std::vec; use super::{Loop, NativeHandle, UvHandle, UvError, uv_error_to_io_error}; -use uvio::{HomingIO, UvPipeStream, UvUnboundPipe}; +use uvio::HomingIO; use uvll; +use pipe::PipeWatcher; pub struct Process { handle: *uvll::uv_process_t, @@ -42,7 +43,7 @@ impl Process { /// Returns either the corresponding process object or an error which /// occurred. pub fn spawn(loop_: &Loop, config: ProcessConfig) - -> Result<(~Process, ~[Option<~UvPipeStream>]), UvError> + -> Result<(~Process, ~[Option]), UvError> { let cwd = config.cwd.map(|s| s.to_c_str()); let io = config.io; @@ -121,7 +122,7 @@ extern fn on_exit(handle: *uvll::uv_process_t, unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, io: &StdioContainer, - loop_: &Loop) -> Option<~UvPipeStream> { + loop_: &Loop) -> Option { match *io { Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); @@ -140,11 +141,10 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, if writable { flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int; } - let pipe = UvUnboundPipe::new(loop_); - let handle = pipe.pipe.as_stream().native_handle(); + let pipe_handle = PipeWatcher::alloc(loop_, false); uvll::set_stdio_container_flags(dst, flags); - uvll::set_stdio_container_stream(dst, handle); - Some(~UvPipeStream::new(pipe)) + uvll::set_stdio_container_stream(dst, pipe_handle); + Some(PipeWatcher::new(pipe_handle)) } } } diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs new file mode 100644 index 00000000000..ad0deebd457 --- /dev/null +++ b/src/librustuv/stream.rs @@ -0,0 +1,216 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::cast; +use std::libc::{c_int, size_t, ssize_t, c_void}; +use std::ptr; +use std::rt::BlockedTask; +use std::rt::local::Local; +use std::rt::sched::Scheduler; + +use super::{UvError, Buf, slice_to_uv_buf}; +use uvll; + +// This is a helper structure which is intended to get embedded into other +// Watcher structures. This structure will retain a handle to the underlying +// uv_stream_t instance, and all I/O operations assume that it's already located +// on the appropriate scheduler. 
+pub struct StreamWatcher {
+    handle: *uvll::uv_stream_t,
+
+    // Cache the last used uv_write_t so we don't have to allocate a new one on
+    // every call to uv_write(). Ideally this would be a stack-allocated
+    // structure, but currently we don't have mappings for all the structures
+    // defined in libuv, so we're forced to malloc this.
+    priv last_write_req: Option<*uvll::uv_write_t>,
+}
+
+struct ReadContext {
+    buf: Option<Buf>,
+    result: Option<Result<uint, UvError>>,
+    task: Option<BlockedTask>,
+}
+
+struct WriteContext {
+    result: Option<Result<(), UvError>>,
+    task: Option<BlockedTask>,
+}
+
+impl StreamWatcher {
+    // Creates a new helper structure which should be then embedded into another
+    // watcher. This provides the generic read/write methods on streams.
+    //
+    // This structure will *not* close the stream when it is dropped. It is up
+    // to the enclosing structure to be sure to call the close method (which
+    // will block the task). Note that this is also required to prevent memory
+    // leaks.
+    //
+    // It should also be noted that the `data` field of the underlying uv handle
+    // will be manipulated on each of the methods called on this watcher.
+    // Wrappers should ensure to always reset the field to an appropriate value
+    // if they rely on the field to perform an action.
+    pub fn new(stream: *uvll::uv_stream_t) -> StreamWatcher {
+        StreamWatcher {
+            handle: stream,
+            last_write_req: None,
+        }
+    }
+
+    pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
+        // Send off the read request, but don't block until we're sure that the
+        // read request is queued.
+        match unsafe {
+            uvll::uv_read_start(self.handle, alloc_cb, read_cb)
+        } {
+            0 => {
+                let mut rcx = ReadContext {
+                    buf: Some(slice_to_uv_buf(buf)),
+                    result: None,
+                    task: None,
+                };
+                unsafe {
+                    uvll::set_data_for_uv_handle(self.handle, &rcx)
+                }
+                let scheduler: ~Scheduler = Local::take();
+                do scheduler.deschedule_running_task_and_then |_sched, task| {
+                    rcx.task = Some(task);
+                }
+                rcx.result.take().expect("no result in read stream?")
+            }
+            n => Err(UvError(n))
+        }
+    }
+
+    pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
+        // Prepare the write request, either using a cached one or allocating a
+        // new one
+        let req = match self.last_write_req {
+            Some(req) => req,
+            None => unsafe { uvll::malloc_req(uvll::UV_WRITE) },
+        };
+        self.last_write_req = Some(req);
+        let mut wcx = WriteContext { result: None, task: None, };
+        unsafe { uvll::set_data_for_req(req, &wcx as *WriteContext) }
+
+        // Send off the request, but be careful to not block until we're sure
+        // that the write request is queued. If the request couldn't be queued,
+        // then we should return immediately with an error.
+        match unsafe {
+            uvll::uv_write(req, self.handle, [slice_to_uv_buf(buf)], write_cb)
+        } {
+            0 => {
+                let scheduler: ~Scheduler = Local::take();
+                do scheduler.deschedule_running_task_and_then |_sched, task| {
+                    wcx.task = Some(task);
+                }
+                assert!(wcx.task.is_none());
+                wcx.result.take().expect("no result in write stream?")
+            }
+            n => Err(UvError(n)),
+        }
+    }
+
+    // This will deallocate any internally used memory, along with closing the
+    // handle (and freeing it).
+    //
+    // The `synchronous` flag dictates whether this handle is closed
+    // synchronously (the task is blocked) or asynchronously (the task is not
+    // blocked, but the handle is still deallocated).
+    pub fn close(&mut self, synchronous: bool) {
+        // clean up the cached write request if we have one
+        match self.last_write_req {
+            Some(req) => unsafe { uvll::free_req(req) },
+            None => {}
+        }
+
+        if synchronous {
+            let mut closing_task = None;
+            unsafe {
+                uvll::set_data_for_uv_handle(self.handle, &closing_task);
+            }
+
+            // Wait for this stream to close because it possibly represents a remote
+            // connection which may have consequences if we close asynchronously.
+            let sched: ~Scheduler = Local::take();
+            do sched.deschedule_running_task_and_then |_, task| {
+                closing_task = Some(task);
+                unsafe { uvll::uv_close(self.handle, close_cb) }
+            }
+        } else {
+            unsafe {
+                uvll::set_data_for_uv_handle(self.handle, ptr::null::<c_void>());
+                uvll::uv_close(self.handle, close_cb)
+            }
+        }
+
+        extern fn close_cb(handle: *uvll::uv_handle_t) {
+            let data: *c_void = unsafe { uvll::get_data_for_uv_handle(handle) };
+            unsafe { uvll::free_handle(handle) }
+            if data.is_null() { return }
+
+            let closing_task: &mut Option<BlockedTask> = unsafe {
+                cast::transmute(data)
+            };
+            let task = closing_task.take_unwrap();
+            let scheduler: ~Scheduler = Local::take();
+            scheduler.resume_blocked_task_immediately(task);
+        }
+    }
+}
+
+// This allocation callback expects to be invoked once and only once. It will
+// unwrap the buffer in the ReadContext stored in the stream and return it. This
+// will fail if it is called more than once.
+extern fn alloc_cb(stream: *uvll::uv_stream_t, _hint: size_t) -> Buf {
+    let rcx: &mut ReadContext = unsafe {
+        cast::transmute(uvll::get_data_for_uv_handle(stream))
+    };
+    rcx.buf.take().expect("alloc_cb called more than once")
+}
+
+// When a stream has read some data, we will always forcibly stop reading and
+// return all the data read (even if it didn't fill the whole buffer).
+extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: Buf) {
+    let rcx: &mut ReadContext = unsafe {
+        cast::transmute(uvll::get_data_for_uv_handle(handle))
+    };
+    // Stop reading so that no read callbacks are
+    // triggered before the user calls `read` again.
+    // XXX: Is there a performance impact to calling
+    // stop here?
+    unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
+
+    assert!(rcx.result.is_none());
+    rcx.result = Some(match nread {
+        n if n < 0 => Err(UvError(n as c_int)),
+        n => Ok(n as uint),
+    });
+
+    let task = rcx.task.take().expect("read_cb needs a task");
+    let scheduler: ~Scheduler = Local::take();
+    scheduler.resume_blocked_task_immediately(task);
+}
+
+// Unlike reading, the WriteContext is stored in the uv_write_t request. Like
+// reading, however, all this does is wake up the blocked task after squirreling
+// away the error code as a result.
+extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
+    // Remember to not free the request because it is re-used between writes on
+    // the same stream.
+    unsafe {
+        let wcx: &mut WriteContext = cast::transmute(uvll::get_data_for_req(req));
+        wcx.result = Some(match status {
+            0 => Ok(()),
+            n => Err(UvError(n)),
+        });
+        let sched: ~Scheduler = Local::take();
+        sched.resume_blocked_task_immediately(wcx.task.take_unwrap());
+    }
+}
diff --git a/src/librustuv/tty.rs b/src/librustuv/tty.rs
index ad5f5043737..316a817354d 100644
--- a/src/librustuv/tty.rs
+++ b/src/librustuv/tty.rs
@@ -9,75 +9,105 @@
 // except according to those terms.
use std::libc; +use std::rt::io::IoError; +use std::rt::local::Local; +use std::rt::rtio::RtioTTY; +use std::rt::sched::{Scheduler, SchedHandle}; -use super::{Watcher, Loop, NativeHandle, UvError}; -use net; +use stream::StreamWatcher; +use super::{Loop, UvError, UvHandle, uv_error_to_io_error}; +use uvio::HomingIO; use uvll; -/// A process wraps the handle of the underlying uv_process_t. -pub struct TTY(*uvll::uv_tty_t); +pub struct TtyWatcher{ + tty: *uvll::uv_tty_t, + stream: StreamWatcher, + home: SchedHandle, + fd: libc::c_int, +} -impl Watcher for TTY {} - -impl TTY { - #[fixed_stack_segment] #[inline(never)] - pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool) -> - Result +impl TtyWatcher { + pub fn new(loop_: &Loop, fd: libc::c_int, readable: bool) + -> Result { - let handle = unsafe { uvll::malloc_handle(uvll::UV_TTY) }; - assert!(handle.is_not_null()); + let handle = UvHandle::alloc(None::, uvll::UV_TTY); - let ret = unsafe { + match unsafe { uvll::uv_tty_init(loop_.native_handle(), handle, fd as libc::c_int, readable as libc::c_int) - }; - match ret { + } { 0 => { - let mut ret: TTY = NativeHandle::from_native_handle(handle); - ret.install_watcher_data(); - Ok(ret) + Ok(TtyWatcher { + tty: handle, + stream: StreamWatcher::new(handle), + home: get_handle_to_current_scheduler!(), + fd: fd, + }) } n => { - unsafe { uvll::free_handle(handle); } + unsafe { uvll::free_handle(handle) } Err(UvError(n)) } } } +} - pub fn as_stream(&self) -> net::StreamWatcher { - net::StreamWatcher(**self as *uvll::uv_stream_t) +impl RtioTTY for TtyWatcher { + fn read(&mut self, buf: &mut [u8]) -> Result { + let _m = self.fire_missiles(); + self.stream.read(buf).map_err(uv_error_to_io_error) } - #[fixed_stack_segment] #[inline(never)] - pub fn set_mode(&self, raw: bool) -> Result<(), UvError> { + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + let _m = self.fire_missiles(); + self.stream.write(buf).map_err(uv_error_to_io_error) + } + + fn set_raw(&mut self, raw: bool) -> Result<(), IoError> { let raw = raw as libc::c_int; - match unsafe { uvll::uv_tty_set_mode(self.native_handle(), raw) } { + let _m = self.fire_missiles(); + match unsafe { uvll::uv_tty_set_mode(self.tty, raw) } { 0 => Ok(()), - n => Err(UvError(n)) + n => Err(uv_error_to_io_error(UvError(n))) } } - #[fixed_stack_segment] #[inline(never)] #[allow(unused_mut)] - pub fn get_winsize(&self) -> Result<(int, int), UvError> { + #[allow(unused_mut)] + fn get_winsize(&mut self) -> Result<(int, int), IoError> { let mut width: libc::c_int = 0; let mut height: libc::c_int = 0; let widthptr: *libc::c_int = &width; let heightptr: *libc::c_int = &width; - match unsafe { uvll::uv_tty_get_winsize(self.native_handle(), + let _m = self.fire_missiles(); + match unsafe { uvll::uv_tty_get_winsize(self.tty, widthptr, heightptr) } { 0 => Ok((width as int, height as int)), - n => Err(UvError(n)) + n => Err(uv_error_to_io_error(UvError(n))) } } -} -impl NativeHandle<*uvll::uv_tty_t> for TTY { - fn from_native_handle(handle: *uvll::uv_tty_t) -> TTY { - TTY(handle) - } - fn native_handle(&self) -> *uvll::uv_tty_t { - match self { &TTY(ptr) => ptr } + fn isatty(&self) -> bool { + unsafe { uvll::uv_guess_handle(self.fd) == uvll::UV_TTY } } } +impl UvHandle for TtyWatcher { + fn uv_handle(&self) -> *uvll::uv_tty_t { self.tty } +} + +impl HomingIO for TtyWatcher { + fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home } +} + +impl Drop for TtyWatcher { + // TTY handles are used for the logger in a task, so this destructor is run + // when 
a task is destroyed. When a task is being destroyed, a local + // scheduler isn't available, so we can't do the normal "take the scheduler + // and resume once close is done". Instead close operations on a TTY are + // asynchronous. + fn drop(&mut self) { + let _m = self.fire_missiles(); + self.stream.close(false); + } +} diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index e0ceb954e58..592a23297cc 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -49,6 +49,7 @@ use super::*; use idle::IdleWatcher; use net::{UvIpv4SocketAddr, UvIpv6SocketAddr}; use addrinfo::{GetAddrInfoRequest, accum_addrinfo}; +use pipe::PipeListener; // XXX we should not be calling uvll functions in here. @@ -616,67 +617,38 @@ impl IoFactory for UvIoFactory { match Process::spawn(self.uv_loop(), config) { Ok((p, io)) => { Ok((p as ~RtioProcess, - io.move_iter().map(|i| i.map(|p| p as ~RtioPipe)).collect())) + io.move_iter().map(|i| i.map(|p| ~p as ~RtioPipe)).collect())) } Err(e) => Err(uv_error_to_io_error(e)), } } - fn unix_bind(&mut self, path: &CString) -> - Result<~RtioUnixListener, IoError> { - let mut pipe = UvUnboundPipe::new(self.uv_loop()); - match pipe.pipe.bind(path) { - Ok(()) => Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener), + fn unix_bind(&mut self, path: &CString) -> Result<~RtioUnixListener, IoError> + { + match PipeListener::bind(self.uv_loop(), path) { + Ok(p) => Ok(p as ~RtioUnixListener), Err(e) => Err(uv_error_to_io_error(e)), } } fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> { - let pipe = UvUnboundPipe::new(self.uv_loop()); - let mut rawpipe = pipe.pipe; - - let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; - let pipe_cell = Cell::new(pipe); - let pipe_cell_ptr: *Cell = &pipe_cell; - - let scheduler: ~Scheduler = Local::take(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do rawpipe.connect(path) |_stream, err| { - let res = match err { - None => { - let pipe = unsafe { (*pipe_cell_ptr).take() }; - Ok(~UvPipeStream::new(pipe) as ~RtioPipe) - } - Some(e) => Err(uv_error_to_io_error(e)), - }; - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } + match PipeWatcher::connect(self.uv_loop(), path) { + Ok(p) => Ok(~p as ~RtioPipe), + Err(e) => Err(uv_error_to_io_error(e)), } - - assert!(!result_cell.is_empty()); - return result_cell.take(); } fn tty_open(&mut self, fd: c_int, readable: bool) -> Result<~RtioTTY, IoError> { - match tty::TTY::new(self.uv_loop(), fd, readable) { - Ok(tty) => Ok(~UvTTY { - home: get_handle_to_current_scheduler!(), - tty: tty, - fd: fd, - } as ~RtioTTY), + match TtyWatcher::new(self.uv_loop(), fd, readable) { + Ok(tty) => Ok(~tty as ~RtioTTY), Err(e) => Err(uv_error_to_io_error(e)) } } fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> { - let mut pipe = UvUnboundPipe::new(self.uv_loop()); - match pipe.pipe.open(fd) { - Ok(()) => Ok(~UvPipeStream::new(pipe) as ~RtioPipe), + match PipeWatcher::open(self.uv_loop(), fd) { + Ok(s) => Ok(~s as ~RtioPipe), Err(e) => Err(uv_error_to_io_error(e)) } } @@ -865,60 +837,6 @@ fn write_stream(mut watcher: StreamWatcher, result_cell.take() } -pub struct UvUnboundPipe { - pipe: Pipe, - priv home: SchedHandle, -} - -impl UvUnboundPipe { - /// Creates a new unbound pipe homed to the current scheduler, placed on the - /// specified event loop - pub fn new(loop_: &Loop) -> 
UvUnboundPipe { - UvUnboundPipe { - pipe: Pipe::new(loop_, false), - home: get_handle_to_current_scheduler!(), - } - } -} - -impl HomingIO for UvUnboundPipe { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl Drop for UvUnboundPipe { - fn drop(&mut self) { - let (_m, sched) = self.fire_homing_missile_sched(); - do sched.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self.pipe.close { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - } - } - } -} - -pub struct UvPipeStream { - priv inner: UvUnboundPipe, -} - -impl UvPipeStream { - pub fn new(inner: UvUnboundPipe) -> UvPipeStream { - UvPipeStream { inner: inner } - } -} - -impl RtioPipe for UvPipeStream { - fn read(&mut self, buf: &mut [u8]) -> Result { - let (_m, scheduler) = self.inner.fire_homing_missile_sched(); - read_stream(self.inner.pipe.as_stream(), scheduler, buf) - } - fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let (_m, scheduler) = self.inner.fire_homing_missile_sched(); - write_stream(self.inner.pipe.as_stream(), scheduler, buf) - } -} - pub struct UvTcpStream { priv watcher: TcpWatcher, priv home: SchedHandle, @@ -1307,129 +1225,6 @@ impl RtioFileStream for UvFileStream { } } -pub struct UvUnixListener { - priv inner: UvUnboundPipe -} - -impl HomingIO for UvUnixListener { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.inner.home() } -} - -impl UvUnixListener { - fn new(pipe: UvUnboundPipe) -> UvUnixListener { - UvUnixListener { inner: pipe } - } -} - -impl RtioUnixListener for UvUnixListener { - fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> { - let _m = self.fire_homing_missile(); - let acceptor = ~UvUnixAcceptor::new(*self); - let incoming = Cell::new(acceptor.incoming.clone()); - let mut stream = acceptor.listener.inner.pipe.as_stream(); - let res = do stream.listen |mut server, status| { - do incoming.with_mut_ref |incoming| { - let inc = match status { - Some(e) => Err(uv_error_to_io_error(e)), - None => { - let pipe = UvUnboundPipe::new(&server.event_loop()); - server.accept(pipe.pipe.as_stream()); - Ok(~UvPipeStream::new(pipe) as ~RtioPipe) - } - }; - incoming.send(inc); - } - }; - match res { - Ok(()) => Ok(acceptor as ~RtioUnixAcceptor), - Err(e) => Err(uv_error_to_io_error(e)), - } - } -} - -pub struct UvTTY { - tty: tty::TTY, - home: SchedHandle, - fd: c_int, -} - -impl HomingIO for UvTTY { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } -} - -impl Drop for UvTTY { - fn drop(&mut self) { - // TTY handles are used for the logger in a task, so this destructor is - // run when a task is destroyed. When a task is being destroyed, a local - // scheduler isn't available, so we can't do the normal "take the - // scheduler and resume once close is done". Instead close operations on - // a TTY are asynchronous. 
- self.tty.close_async(); - } -} - -impl RtioTTY for UvTTY { - fn read(&mut self, buf: &mut [u8]) -> Result { - let (_m, scheduler) = self.fire_homing_missile_sched(); - read_stream(self.tty.as_stream(), scheduler, buf) - } - - fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { - let (_m, scheduler) = self.fire_homing_missile_sched(); - write_stream(self.tty.as_stream(), scheduler, buf) - } - - fn set_raw(&mut self, raw: bool) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - match self.tty.set_mode(raw) { - Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e)) - } - } - - fn get_winsize(&mut self) -> Result<(int, int), IoError> { - let _m = self.fire_homing_missile(); - match self.tty.get_winsize() { - Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e)) - } - } - - fn isatty(&self) -> bool { - unsafe { uvll::uv_guess_handle(self.fd) == uvll::UV_TTY } - } -} - -pub struct UvUnixAcceptor { - listener: UvUnixListener, - incoming: Tube>, -} - -impl HomingIO for UvUnixAcceptor { - fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } -} - -impl UvUnixAcceptor { - fn new(listener: UvUnixListener) -> UvUnixAcceptor { - UvUnixAcceptor { listener: listener, incoming: Tube::new() } - } -} - -impl RtioUnixAcceptor for UvUnixAcceptor { - fn accept(&mut self) -> Result<~RtioPipe, IoError> { - let _m = self.fire_homing_missile(); - self.incoming.recv() - } - - fn accept_simultaneously(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - accept_simultaneously(self.listener.inner.pipe.as_stream(), 1) - } - - fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { - let _m = self.fire_homing_missile(); - accept_simultaneously(self.listener.inner.pipe.as_stream(), 0) - } -} - // this function is full of lies unsafe fn local_io() -> &'static mut IoFactory { do Local::borrow |sched: &mut Scheduler| { diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 8684537f4e4..f8b87abb9f6 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -213,8 +213,6 @@ pub trait RtioUnixListener { pub trait RtioUnixAcceptor { fn accept(&mut self) -> Result<~RtioPipe, IoError>; - fn accept_simultaneously(&mut self) -> Result<(), IoError>; - fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; } pub trait RtioTTY {
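
For reference, the calling convention that replaces the old ~fn() callbacks looks roughly like the sketch below. This is an illustrative sketch only, not part of the patch: the ping_pipe helper, its buffer size, and its placement inside librustuv are assumptions, while PipeWatcher::connect, the RtioPipe read/write methods, and uv_error_to_io_error are the items introduced or used above. It assumes the caller is a task homed on the scheduler that owns loop_, since each call blocks that task until the corresponding uv callback resumes it.

// Hypothetical usage sketch (same era dialect as the patch); assumes it
// lives inside librustuv next to pipe.rs so these imports resolve.
use std::c_str::CString;
use std::rt::io::IoError;
use std::rt::rtio::RtioPipe;

use pipe::PipeWatcher;
use super::{Loop, uv_error_to_io_error};

fn ping_pipe(loop_: &Loop, path: &CString) -> Result<uint, IoError> {
    // connect() allocates a uv_pipe_t, issues uv_pipe_connect, and parks the
    // task until connect_cb stores a result and resumes it.
    let mut pipe = match PipeWatcher::connect(loop_, path) {
        Ok(p) => p,
        Err(e) => return Err(uv_error_to_io_error(e)),
    };

    // write() queues a uv_write request and deschedules this task; write_cb
    // records the status and wakes us back up.
    match pipe.write("ping".as_bytes()) {
        Ok(()) => {}
        Err(e) => return Err(e),
    }

    // read() parks the task the same way; read_cb stops further reads and
    // hands back the number of bytes received.
    let mut buf = [0u8, ..128];
    pipe.read(buf)

    // When `pipe` goes out of scope, PipeWatcher's destructor closes the
    // handle synchronously on its home scheduler.
}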