auto merge of #11294 : alexcrichton/rust/native-timer, r=brson

Commit messages have the fun details

Closes #10925
This commit is contained in:
bors 2014-01-22 20:41:29 -08:00
commit 7ea063ea0f
11 changed files with 1236 additions and 30 deletions

View file

@ -45,5 +45,6 @@ pub fn wait_for_other_tasks() {
TASK_LOCK.wait();
}
TASK_LOCK.unlock();
TASK_LOCK.destroy();
}
}

View file

@ -46,6 +46,22 @@ pub mod file;
pub mod process;
pub mod net;
#[cfg(target_os = "macos")]
#[cfg(target_os = "freebsd")]
#[path = "timer_other.rs"]
pub mod timer;
#[cfg(target_os = "linux")]
#[cfg(target_os = "android")]
#[path = "timer_timerfd.rs"]
pub mod timer;
#[cfg(target_os = "win32")]
#[path = "timer_win32.rs"]
pub mod timer;
mod timer_helper;
type IoResult<T> = Result<T, IoError>;
fn unimpl() -> IoError {
@ -249,7 +265,7 @@ impl rtio::IoFactory for IoFactory {
// misc
fn timer_init(&mut self) -> IoResult<~RtioTimer> {
Err(unimpl())
timer::Timer::new().map(|t| ~t as ~RtioTimer)
}
fn spawn(&mut self, config: ProcessConfig)
-> IoResult<(~RtioProcess, ~[Option<~RtioPipe>])> {

View file

@ -0,0 +1,143 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Implementation of the helper thread for the timer module
//!
//! This module contains the management necessary for the timer worker thread.
//! This thread is responsible for performing the send()s on channels for timers
//! that are using channels instead of a blocking call.
//!
//! The timer thread is lazily initialized, and it's shut down via the
//! `shutdown` function provided. It must be maintained as an invariant that
//! `shutdown` is only called when the entire program is finished. No new timers
//! can be created in the future and there must be no active timers at that
//! time.
use std::cast;
use std::rt;
use std::unstable::mutex::{Once, ONCE_INIT};
use bookkeeping;
use io::timer::{Req, Shutdown};
use task;
// You'll note that these variables are *not* protected by a lock. These
// variables are initialized with a Once before any Timer is created and are
// only torn down after everything else has exited. This means that these
// variables are read-only during use (after initialization) and both of which
// are safe to use concurrently.
static mut HELPER_CHAN: *mut SharedChan<Req> = 0 as *mut SharedChan<Req>;
static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
pub fn boot(helper: fn(imp::signal, Port<Req>)) {
static mut INIT: Once = ONCE_INIT;
unsafe {
INIT.doit(|| {
let (msgp, msgc) = SharedChan::new();
HELPER_CHAN = cast::transmute(~msgc);
let (receive, send) = imp::new();
HELPER_SIGNAL = send;
do task::spawn {
bookkeeping::decrement();
helper(receive, msgp);
}
rt::at_exit(proc() { shutdown() });
})
}
}
pub fn send(req: Req) {
unsafe {
assert!(!HELPER_CHAN.is_null());
(*HELPER_CHAN).send(req);
imp::signal(HELPER_SIGNAL);
}
}
fn shutdown() {
// We want to wait for the entire helper task to exit, and in doing so it
// will attempt to decrement the global task count. When the helper was
// created, it decremented the count so it wouldn't count towards preventing
// the program to exit, so here we pair that manual decrement with a manual
// increment. We will then wait for the helper thread to exit by calling
// wait_for_other_tasks.
bookkeeping::increment();
// Request a shutdown, and then wait for the task to exit
send(Shutdown);
bookkeeping::wait_for_other_tasks();
// Clean up after ther helper thread
unsafe {
imp::close(HELPER_SIGNAL);
let _chan: ~SharedChan<Req> = cast::transmute(HELPER_CHAN);
HELPER_CHAN = 0 as *mut SharedChan<Req>;
HELPER_SIGNAL = 0 as imp::signal;
}
}
#[cfg(unix)]
mod imp {
use std::libc;
use std::os;
use io::file::FileDesc;
pub type signal = libc::c_int;
pub fn new() -> (signal, signal) {
let pipe = os::pipe();
(pipe.input, pipe.out)
}
pub fn signal(fd: libc::c_int) {
FileDesc::new(fd, false).inner_write([0]);
}
pub fn close(fd: libc::c_int) {
let _fd = FileDesc::new(fd, true);
}
}
#[cfg(windows)]
mod imp {
use std::libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle};
use std::ptr;
use std::libc;
pub type signal = HANDLE;
pub fn new() -> (HANDLE, HANDLE) {
unsafe {
let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE,
ptr::null());
(handle, handle)
}
}
pub fn signal(handle: HANDLE) {
unsafe { SetEvent(handle); }
}
pub fn close(handle: HANDLE) {
unsafe { CloseHandle(handle); }
}
extern "system" {
fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES,
bManualReset: BOOL,
bInitialState: BOOL,
lpName: LPCSTR) -> HANDLE;
fn SetEvent(hEvent: HANDLE) -> BOOL;
}
}

View file

@ -0,0 +1,328 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Timers for non-linux/non-windows OSes
//!
//! This module implements timers with a worker thread, select(), and a lot of
//! witchcraft that turns out to be horribly inaccurate timers. The unfortunate
//! part is that I'm at a loss of what else to do one these OSes. This is also
//! why linux has a specialized timerfd implementation and windows has its own
//! implementation (they're more accurate than this one).
//!
//! The basic idea is that there is a worker thread that's communicated to via a
//! channel and a pipe, the pipe is used by the worker thread in a select()
//! syscall with a timeout. The timeout is the "next timer timeout" while the
//! channel is used to send data over to the worker thread.
//!
//! Whenever the call to select() times out, then a channel receives a message.
//! Whenever the call returns that the file descriptor has information, then the
//! channel from timers is drained, enqueueing all incoming requests.
//!
//! The actual implementation of the helper thread is a sorted array of
//! timers in terms of target firing date. The target is the absolute time at
//! which the timer should fire. Timers are then re-enqueued after a firing if
//! the repeat boolean is set.
//!
//! Naturally, all this logic of adding times and keeping track of
//! relative/absolute time is a little lossy and not quite exact. I've done the
//! best I could to reduce the amount of calls to 'now()', but there's likely
//! still inaccuracies trickling in here and there.
//!
//! One of the tricky parts of this implementation is that whenever a timer is
//! acted upon, it must cancel whatever the previous action was (if one is
//! active) in order to act like the other implementations of this timer. In
//! order to do this, the timer's inner pointer is transferred to the worker
//! thread. Whenever the timer is modified, it first takes ownership back from
//! the worker thread in order to modify the same data structure. This has the
//! side effect of "cancelling" the previous requests while allowing a
//! re-enqueueing later on.
//!
//! Note that all time units in this file are in *milliseconds*.
use std::comm::Data;
use std::hashmap::HashMap;
use std::libc;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::atomics;
use std::unstable::intrinsics;
use io::file::FileDesc;
use io::IoResult;
use io::timer_helper;
pub struct Timer {
priv id: uint,
priv inner: Option<~Inner>,
}
struct Inner {
chan: Option<Chan<()>>,
interval: u64,
repeat: bool,
target: u64,
id: uint,
}
pub enum Req {
// Add a new timer to the helper thread.
NewTimer(~Inner),
// Remove a timer based on its id and then send it back on the channel
// provided
RemoveTimer(uint, Chan<~Inner>),
// Shut down the loop and then ACK this channel once it's shut down
Shutdown,
}
// returns the current time (in milliseconds)
fn now() -> u64 {
unsafe {
let mut now: libc::timeval = intrinsics::init();
assert_eq!(imp::gettimeofday(&mut now, ptr::null()), 0);
return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000;
}
}
fn helper(input: libc::c_int, messages: Port<Req>) {
let mut set: imp::fd_set = unsafe { intrinsics::init() };
let mut fd = FileDesc::new(input, true);
let mut timeout: libc::timeval = unsafe { intrinsics::init() };
// active timers are those which are able to be selected upon (and it's a
// sorted list, and dead timers are those which have expired, but ownership
// hasn't yet been transferred back to the timer itself.
let mut active: ~[~Inner] = ~[];
let mut dead = HashMap::new();
// inserts a timer into an array of timers (sorted by firing time)
fn insert(t: ~Inner, active: &mut ~[~Inner]) {
match active.iter().position(|tm| tm.target > t.target) {
Some(pos) => { active.insert(pos, t); }
None => { active.push(t); }
}
}
// signals the first requests in the queue, possible re-enqueueing it.
fn signal(active: &mut ~[~Inner], dead: &mut HashMap<uint, ~Inner>) {
let mut timer = match active.shift() {
Some(timer) => timer, None => return
};
let chan = timer.chan.take_unwrap();
if chan.try_send(()) && timer.repeat {
timer.chan = Some(chan);
timer.target += timer.interval;
insert(timer, active);
} else {
drop(chan);
dead.insert(timer.id, timer);
}
}
'outer: loop {
let timeout = match active {
// Empty array? no timeout (wait forever for the next request)
[] => ptr::null(),
[~Inner { target, .. }, ..] => {
let now = now();
// If this request has already expired, then signal it and go
// through another iteration
if target <= now {
signal(&mut active, &mut dead);
continue;
}
// The actual timeout listed in the requests array is an
// absolute date, so here we translate the absolute time to a
// relative time.
let tm = target - now;
timeout.tv_sec = (tm / 1000) as libc::time_t;
timeout.tv_usec = ((tm % 1000) * 1000) as libc::suseconds_t;
&timeout as *libc::timeval
}
};
imp::fd_set(&mut set, input);
match unsafe {
imp::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
} {
// timed out
0 => signal(&mut active, &mut dead),
// file descriptor write woke us up, we've got some new requests
1 => {
loop {
match messages.try_recv() {
Data(Shutdown) => {
assert!(active.len() == 0);
break 'outer;
}
Data(NewTimer(timer)) => insert(timer, &mut active),
Data(RemoveTimer(id, ack)) => {
match dead.pop(&id) {
Some(i) => { ack.send(i); continue }
None => {}
}
let i = active.iter().position(|i| i.id == id);
let i = i.expect("no timer found");
let t = active.remove(i).unwrap();
ack.send(t);
}
_ => break
}
}
// drain the file descriptor
let mut buf = [0];
fd.inner_read(buf);
}
-1 if os::errno() == libc::EINTR as int => {}
n => fail!("helper thread failed in select() with error: {} ({})",
n, os::last_os_error())
}
}
}
impl Timer {
pub fn new() -> IoResult<Timer> {
timer_helper::boot(helper);
static mut ID: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
let id = unsafe { ID.fetch_add(1, atomics::Relaxed) };
Ok(Timer {
id: id,
inner: Some(~Inner {
chan: None,
interval: 0,
target: 0,
repeat: false,
id: id,
})
})
}
pub fn sleep(ms: u64) {
unsafe { libc::usleep((ms * 1000) as libc::c_uint); }
}
fn inner(&mut self) -> ~Inner {
match self.inner.take() {
Some(i) => i,
None => {
let (p, c) = Chan::new();
timer_helper::send(RemoveTimer(self.id, c));
p.recv()
}
}
}
}
impl rtio::RtioTimer for Timer {
fn sleep(&mut self, msecs: u64) {
let mut inner = self.inner();
inner.chan = None; // cancel any previous request
self.inner = Some(inner);
Timer::sleep(msecs);
}
fn oneshot(&mut self, msecs: u64) -> Port<()> {
let now = now();
let mut inner = self.inner();
let (p, c) = Chan::new();
inner.repeat = false;
inner.chan = Some(c);
inner.interval = msecs;
inner.target = now + msecs;
timer_helper::send(NewTimer(inner));
return p;
}
fn period(&mut self, msecs: u64) -> Port<()> {
let now = now();
let mut inner = self.inner();
let (p, c) = Chan::new();
inner.repeat = true;
inner.chan = Some(c);
inner.interval = msecs;
inner.target = now + msecs;
timer_helper::send(NewTimer(inner));
return p;
}
}
impl Drop for Timer {
fn drop(&mut self) {
self.inner = Some(self.inner());
}
}
#[cfg(target_os = "macos")]
mod imp {
use std::libc;
pub static FD_SETSIZE: uint = 1024;
pub struct fd_set {
fds_bits: [i32, ..(FD_SETSIZE / 32)]
}
pub fn fd_set(set: &mut fd_set, fd: i32) {
set.fds_bits[fd / 32] |= 1 << (fd % 32);
}
extern {
pub fn select(nfds: libc::c_int,
readfds: *fd_set,
writefds: *fd_set,
errorfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;
}
}
#[cfg(target_os = "freebsd")]
mod imp {
use std::libc;
pub static FD_SETSIZE: uint = 1024;
pub struct fd_set {
fds_bits: [u64, ..(FD_SETSIZE / 64)]
}
pub fn fd_set(set: &mut fd_set, fd: i32) {
set.fds_bits[fd / 64] |= (1 << (fd % 64)) as u64;
}
extern {
pub fn select(nfds: libc::c_int,
readfds: *fd_set,
writefds: *fd_set,
errorfds: *fd_set,
timeout: *libc::timeval) -> libc::c_int;
pub fn gettimeofday(timeval: *mut libc::timeval,
tzp: *libc::c_void) -> libc::c_int;
}
}

View file

@ -0,0 +1,303 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Timers based on timerfd_create(2)
//!
//! On OSes which support timerfd_create, we can use these much more accurate
//! timers over select() + a timeout (see timer_other.rs). This strategy still
//! employs a worker thread which does the waiting on the timer fds (to send
//! messages away).
//!
//! The worker thread in this implementation uses epoll(7) to block. It
//! maintains a working set of *all* native timers in the process, along with a
//! pipe file descriptor used to communicate that there is data available on the
//! incoming channel to the worker thread. Timers send requests to update their
//! timerfd settings to the worker thread (see the comment above 'oneshot' for
//! why).
//!
//! As with timer_other, timers just using sleep() do not use the timerfd at
//! all. They remove the timerfd from the worker thread and then invoke usleep()
//! to block the calling thread.
//!
//! As with timer_other, all units in this file are in units of millseconds.
use std::comm::Data;
use std::libc;
use std::ptr;
use std::os;
use std::rt::rtio;
use std::hashmap::HashMap;
use std::unstable::intrinsics;
use io::file::FileDesc;
use io::IoResult;
use io::timer_helper;
pub struct Timer {
priv fd: FileDesc,
priv on_worker: bool,
}
pub enum Req {
NewTimer(libc::c_int, Chan<()>, bool, imp::itimerspec),
RemoveTimer(libc::c_int, Chan<()>),
Shutdown,
}
fn helper(input: libc::c_int, messages: Port<Req>) {
let efd = unsafe { imp::epoll_create(10) };
let _fd1 = FileDesc::new(input, true);
let _fd2 = FileDesc::new(efd, true);
fn add(efd: libc::c_int, fd: libc::c_int) {
let event = imp::epoll_event {
events: imp::EPOLLIN as u32,
data: imp::epoll_data_t { fd: fd, pad: 0, }
};
let ret = unsafe {
imp::epoll_ctl(efd, imp::EPOLL_CTL_ADD, fd, &event)
};
assert_eq!(ret, 0);
}
fn del(efd: libc::c_int, fd: libc::c_int) {
let event = imp::epoll_event {
events: 0, data: imp::epoll_data_t { fd: 0, pad: 0, }
};
let ret = unsafe {
imp::epoll_ctl(efd, imp::EPOLL_CTL_DEL, fd, &event)
};
assert_eq!(ret, 0);
}
add(efd, input);
let events: [imp::epoll_event, ..16] = unsafe { intrinsics::init() };
let mut map: HashMap<libc::c_int, (Chan<()>, bool)> = HashMap::new();
'outer: loop {
let n = match unsafe {
imp::epoll_wait(efd, events.as_ptr(),
events.len() as libc::c_int, -1)
} {
0 => fail!("epoll_wait returned immediately!"),
-1 => fail!("epoll wait failed: {}", os::last_os_error()),
n => n
};
let mut incoming = false;
debug!("{} events to process", n);
for event in events.slice_to(n as uint).iter() {
let fd = event.data.fd;
debug!("data on fd {} (input = {})", fd, input);
if fd == input {
let mut buf = [0, ..1];
// drain the input file descriptor of its input
FileDesc::new(fd, false).inner_read(buf);
incoming = true;
} else {
let mut bits = [0, ..8];
// drain the timerfd of how many times its fired
//
// XXX: should this perform a send() this number of
// times?
FileDesc::new(fd, false).inner_read(bits);
let remove = {
match map.find(&fd).expect("fd unregistered") {
&(ref c, oneshot) => !c.try_send(()) || oneshot
}
};
if remove {
map.remove(&fd);
del(efd, fd);
}
}
}
while incoming {
match messages.try_recv() {
Data(NewTimer(fd, chan, one, timeval)) => {
// acknowledge we have the new channel, we will never send
// another message to the old channel
chan.send(());
// If we haven't previously seen the file descriptor, then
// we need to add it to the epoll set.
if map.insert(fd, (chan, one)) {
add(efd, fd);
}
// Update the timerfd's time value now that we have control
// of the timerfd
let ret = unsafe {
imp::timerfd_settime(fd, 0, &timeval, ptr::null())
};
assert_eq!(ret, 0);
}
Data(RemoveTimer(fd, chan)) => {
if map.remove(&fd) {
del(efd, fd);
}
chan.send(());
}
Data(Shutdown) => {
assert!(map.len() == 0);
break 'outer;
}
_ => break,
}
}
}
}
impl Timer {
pub fn new() -> IoResult<Timer> {
timer_helper::boot(helper);
match unsafe { imp::timerfd_create(imp::CLOCK_MONOTONIC, 0) } {
-1 => Err(super::last_error()),
n => Ok(Timer { fd: FileDesc::new(n, true), on_worker: false, }),
}
}
pub fn sleep(ms: u64) {
unsafe { libc::usleep((ms * 1000) as libc::c_uint); }
}
fn remove(&mut self) {
if !self.on_worker { return }
let (p, c) = Chan::new();
timer_helper::send(RemoveTimer(self.fd.fd(), c));
p.recv();
self.on_worker = false;
}
}
impl rtio::RtioTimer for Timer {
fn sleep(&mut self, msecs: u64) {
self.remove();
Timer::sleep(msecs);
}
// Periodic and oneshot channels are updated by updating the settings on the
// corresopnding timerfd. The update is not performed on the thread calling
// oneshot or period, but rather the helper epoll thread. The reason for
// this is to avoid losing messages and avoid leaking messages across ports.
//
// By updating the timerfd on the helper thread, we're guaranteed that all
// messages for a particular setting of the timer will be received by the
// new channel/port pair rather than leaking old messages onto the new port
// or leaking new messages onto the old port.
//
// We also wait for the remote thread to actually receive the new settings
// before returning to guarantee the invariant that when oneshot() and
// period() return that the old port will never receive any more messages.
fn oneshot(&mut self, msecs: u64) -> Port<()> {
let (p, c) = Chan::new();
let new_value = imp::itimerspec {
it_interval: imp::timespec { tv_sec: 0, tv_nsec: 0 },
it_value: imp::timespec {
tv_sec: (msecs / 1000) as libc::time_t,
tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long,
}
};
timer_helper::send(NewTimer(self.fd.fd(), c, true, new_value));
p.recv();
self.on_worker = true;
return p;
}
fn period(&mut self, msecs: u64) -> Port<()> {
let (p, c) = Chan::new();
let spec = imp::timespec {
tv_sec: (msecs / 1000) as libc::time_t,
tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long,
};
let new_value = imp::itimerspec { it_interval: spec, it_value: spec, };
timer_helper::send(NewTimer(self.fd.fd(), c, false, new_value));
p.recv();
self.on_worker = true;
return p;
}
}
impl Drop for Timer {
fn drop(&mut self) {
// When the timerfd file descriptor is closed, it will be automatically
// removed from the epoll set of the worker thread, but we want to make
// sure that the associated channel is also removed from the worker's
// hash map.
self.remove();
}
}
#[allow(dead_code)]
mod imp {
use std::libc;
pub static CLOCK_MONOTONIC: libc::c_int = 1;
pub static EPOLL_CTL_ADD: libc::c_int = 1;
pub static EPOLL_CTL_DEL: libc::c_int = 2;
pub static EPOLL_CTL_MOD: libc::c_int = 3;
pub static EPOLLIN: libc::c_int = 0x001;
pub static EPOLLOUT: libc::c_int = 0x004;
pub static EPOLLPRI: libc::c_int = 0x002;
pub static EPOLLERR: libc::c_int = 0x008;
pub static EPOLLRDHUP: libc::c_int = 0x2000;
pub static EPOLLET: libc::c_int = 1 << 31;
pub static EPOLLHUP: libc::c_int = 0x010;
pub static EPOLLONESHOT: libc::c_int = 1 << 30;
pub struct epoll_event {
events: u32,
data: epoll_data_t,
}
pub struct epoll_data_t {
fd: i32,
pad: u32,
}
pub struct timespec {
tv_sec: libc::time_t,
tv_nsec: libc::c_long,
}
pub struct itimerspec {
it_interval: timespec,
it_value: timespec,
}
extern {
pub fn timerfd_create(clockid: libc::c_int,
flags: libc::c_int) -> libc::c_int;
pub fn timerfd_settime(fd: libc::c_int,
flags: libc::c_int,
new_value: *itimerspec,
old_value: *itimerspec) -> libc::c_int;
pub fn timerfd_gettime(fd: libc::c_int,
curr_value: *itimerspec) -> libc::c_int;
pub fn epoll_create(size: libc::c_int) -> libc::c_int;
pub fn epoll_ctl(epfd: libc::c_int,
op: libc::c_int,
fd: libc::c_int,
event: *epoll_event) -> libc::c_int;
pub fn epoll_wait(epfd: libc::c_int,
events: *epoll_event,
maxevents: libc::c_int,
timeout: libc::c_int) -> libc::c_int;
}
}

View file

@ -0,0 +1,203 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Timers based on win32 WaitableTimers
//!
//! This implementation is meant to be used solely on windows. As with other
//! implementations, there is a worker thread which is doing all the waiting on
//! a large number of timers for all active timers in the system. This worker
//! thread uses the select() equivalent, WaitForMultipleObjects. One of the
//! objects being waited on is a signal into the worker thread to notify that
//! the incoming channel should be looked at.
//!
//! Other than that, the implementation is pretty straightforward in terms of
//! the other two implementations of timers with nothing *that* new showing up.
use std::comm::Data;
use std::libc;
use std::ptr;
use std::rt::rtio;
use io::timer_helper;
use io::IoResult;
pub struct Timer {
priv obj: libc::HANDLE,
priv on_worker: bool,
}
pub enum Req {
NewTimer(libc::HANDLE, Chan<()>, bool),
RemoveTimer(libc::HANDLE, Chan<()>),
Shutdown,
}
fn helper(input: libc::HANDLE, messages: Port<Req>) {
let mut objs = ~[input];
let mut chans = ~[];
'outer: loop {
let idx = unsafe {
imp::WaitForMultipleObjects(objs.len() as libc::DWORD,
objs.as_ptr(),
0 as libc::BOOL,
libc::INFINITE)
};
if idx == 0 {
loop {
match messages.try_recv() {
Data(NewTimer(obj, c, one)) => {
objs.push(obj);
chans.push((c, one));
}
Data(RemoveTimer(obj, c)) => {
c.send(());
match objs.iter().position(|&o| o == obj) {
Some(i) => {
objs.remove(i);
chans.remove(i - 1);
}
None => {}
}
}
Data(Shutdown) => {
assert_eq!(objs.len(), 1);
assert_eq!(chans.len(), 0);
break 'outer;
}
_ => break
}
}
} else {
let remove = {
match &chans[idx - 1] {
&(ref c, oneshot) => !c.try_send(()) || oneshot
}
};
if remove {
objs.remove(idx as uint);
chans.remove(idx as uint - 1);
}
}
}
}
impl Timer {
pub fn new() -> IoResult<Timer> {
timer_helper::boot(helper);
let obj = unsafe {
imp::CreateWaitableTimerA(ptr::mut_null(), 0, ptr::null())
};
if obj.is_null() {
Err(super::last_error())
} else {
Ok(Timer { obj: obj, on_worker: false, })
}
}
pub fn sleep(ms: u64) {
use std::rt::rtio::RtioTimer;
let mut t = Timer::new().ok().expect("must allocate a timer!");
t.sleep(ms);
}
fn remove(&mut self) {
if !self.on_worker { return }
let (p, c) = Chan::new();
timer_helper::send(RemoveTimer(self.obj, c));
p.recv();
self.on_worker = false;
}
}
impl rtio::RtioTimer for Timer {
fn sleep(&mut self, msecs: u64) {
self.remove();
// there are 10^6 nanoseconds in a millisecond, and the parameter is in
// 100ns intervals, so we multiply by 10^4.
let due = -(msecs * 10000) as libc::LARGE_INTEGER;
assert_eq!(unsafe {
imp::SetWaitableTimer(self.obj, &due, 0, ptr::null(),
ptr::mut_null(), 0)
}, 1);
unsafe { imp::WaitForSingleObject(self.obj, libc::INFINITE); }
}
fn oneshot(&mut self, msecs: u64) -> Port<()> {
self.remove();
let (p, c) = Chan::new();
// see above for the calculation
let due = -(msecs * 10000) as libc::LARGE_INTEGER;
assert_eq!(unsafe {
imp::SetWaitableTimer(self.obj, &due, 0, ptr::null(),
ptr::mut_null(), 0)
}, 1);
timer_helper::send(NewTimer(self.obj, c, true));
self.on_worker = true;
return p;
}
fn period(&mut self, msecs: u64) -> Port<()> {
self.remove();
let (p, c) = Chan::new();
// see above for the calculation
let due = -(msecs * 10000) as libc::LARGE_INTEGER;
assert_eq!(unsafe {
imp::SetWaitableTimer(self.obj, &due, msecs as libc::LONG,
ptr::null(), ptr::mut_null(), 0)
}, 1);
timer_helper::send(NewTimer(self.obj, c, false));
self.on_worker = true;
return p;
}
}
impl Drop for Timer {
fn drop(&mut self) {
self.remove();
unsafe { libc::CloseHandle(self.obj); }
}
}
mod imp {
use std::libc::{LPSECURITY_ATTRIBUTES, BOOL, LPCSTR, HANDLE, LARGE_INTEGER,
LONG, LPVOID, DWORD, c_void};
pub type PTIMERAPCROUTINE = *c_void;
extern "system" {
pub fn CreateWaitableTimerA(lpTimerAttributes: LPSECURITY_ATTRIBUTES,
bManualReset: BOOL,
lpTimerName: LPCSTR) -> HANDLE;
pub fn SetWaitableTimer(hTimer: HANDLE,
pDueTime: *LARGE_INTEGER,
lPeriod: LONG,
pfnCompletionRoutine: PTIMERAPCROUTINE,
lpArgToCompletionRoutine: LPVOID,
fResume: BOOL) -> BOOL;
pub fn WaitForMultipleObjects(nCount: DWORD,
lpHandles: *HANDLE,
bWaitAll: BOOL,
dwMilliseconds: DWORD) -> DWORD;
pub fn WaitForSingleObject(hHandle: HANDLE,
dwMilliseconds: DWORD) -> DWORD;
}
}

View file

@ -34,6 +34,7 @@ macro_rules! iotest (
use io::net::udp::*;
#[cfg(unix)]
use io::net::unix::*;
use io::timer::*;
use io::process::*;
use str;
use util;

View file

@ -96,61 +96,177 @@ impl Timer {
#[cfg(test)]
mod test {
use prelude::*;
use super::*;
#[test]
fn test_io_timer_sleep_simple() {
iotest!(fn test_io_timer_sleep_simple() {
let mut timer = Timer::new().unwrap();
timer.sleep(1);
}
})
#[test]
fn test_io_timer_sleep_oneshot() {
iotest!(fn test_io_timer_sleep_oneshot() {
let mut timer = Timer::new().unwrap();
timer.oneshot(1).recv();
}
})
#[test]
fn test_io_timer_sleep_oneshot_forget() {
iotest!(fn test_io_timer_sleep_oneshot_forget() {
let mut timer = Timer::new().unwrap();
timer.oneshot(100000000000);
}
})
#[test]
fn oneshot_twice() {
iotest!(fn oneshot_twice() {
let mut timer = Timer::new().unwrap();
let port1 = timer.oneshot(10000);
let port = timer.oneshot(1);
port.recv();
assert!(port1.recv_opt().is_none());
}
assert_eq!(port1.recv_opt(), None);
})
#[test]
fn test_io_timer_oneshot_then_sleep() {
iotest!(fn test_io_timer_oneshot_then_sleep() {
let mut timer = Timer::new().unwrap();
let port = timer.oneshot(100000000000);
timer.sleep(1); // this should invalidate the port
assert!(port.recv_opt().is_none());
}
#[test]
fn test_io_timer_sleep_periodic() {
assert_eq!(port.recv_opt(), None);
})
iotest!(fn test_io_timer_sleep_periodic() {
let mut timer = Timer::new().unwrap();
let port = timer.periodic(1);
port.recv();
port.recv();
port.recv();
}
})
#[test]
fn test_io_timer_sleep_periodic_forget() {
iotest!(fn test_io_timer_sleep_periodic_forget() {
let mut timer = Timer::new().unwrap();
timer.periodic(100000000000);
}
})
#[test]
fn test_io_timer_sleep_standalone() {
iotest!(fn test_io_timer_sleep_standalone() {
sleep(1)
}
})
iotest!(fn oneshot() {
let mut timer = Timer::new().unwrap();
let port = timer.oneshot(1);
port.recv();
assert!(port.recv_opt().is_none());
let port = timer.oneshot(1);
port.recv();
assert!(port.recv_opt().is_none());
})
iotest!(fn override() {
let mut timer = Timer::new().unwrap();
let oport = timer.oneshot(100);
let pport = timer.periodic(100);
timer.sleep(1);
assert_eq!(oport.recv_opt(), None);
assert_eq!(pport.recv_opt(), None);
timer.oneshot(1).recv();
})
iotest!(fn period() {
let mut timer = Timer::new().unwrap();
let port = timer.periodic(1);
port.recv();
port.recv();
let port2 = timer.periodic(1);
port2.recv();
port2.recv();
})
iotest!(fn sleep() {
let mut timer = Timer::new().unwrap();
timer.sleep(1);
timer.sleep(1);
})
iotest!(fn oneshot_fail() {
let mut timer = Timer::new().unwrap();
let _port = timer.oneshot(1);
fail!();
} #[should_fail])
iotest!(fn period_fail() {
let mut timer = Timer::new().unwrap();
let _port = timer.periodic(1);
fail!();
} #[should_fail])
iotest!(fn normal_fail() {
let _timer = Timer::new().unwrap();
fail!();
} #[should_fail])
iotest!(fn closing_channel_during_drop_doesnt_kill_everything() {
// see issue #10375
let mut timer = Timer::new().unwrap();
let timer_port = timer.periodic(1000);
do spawn {
timer_port.recv_opt();
}
// when we drop the TimerWatcher we're going to destroy the channel,
// which must wake up the task on the other end
})
iotest!(fn reset_doesnt_switch_tasks() {
// similar test to the one above.
let mut timer = Timer::new().unwrap();
let timer_port = timer.periodic(1000);
do spawn {
timer_port.recv_opt();
}
timer.oneshot(1);
})
iotest!(fn reset_doesnt_switch_tasks2() {
// similar test to the one above.
let mut timer = Timer::new().unwrap();
let timer_port = timer.periodic(1000);
do spawn {
timer_port.recv_opt();
}
timer.sleep(1);
})
iotest!(fn sender_goes_away_oneshot() {
let port = {
let mut timer = Timer::new().unwrap();
timer.oneshot(1000)
};
assert_eq!(port.recv_opt(), None);
})
iotest!(fn sender_goes_away_period() {
let port = {
let mut timer = Timer::new().unwrap();
timer.periodic(1000)
};
assert_eq!(port.recv_opt(), None);
})
iotest!(fn receiver_goes_away_oneshot() {
let mut timer1 = Timer::new().unwrap();
timer1.oneshot(1);
let mut timer2 = Timer::new().unwrap();
// while sleeping, the prevous timer should fire and not have its
// callback do something terrible.
timer2.sleep(2);
})
iotest!(fn receiver_goes_away_period() {
let mut timer1 = Timer::new().unwrap();
timer1.periodic(1);
let mut timer2 = Timer::new().unwrap();
// while sleeping, the prevous timer should fire and not have its
// callback do something terrible.
timer2.sleep(2);
})
}

View file

@ -3548,6 +3548,7 @@ pub mod funcs {
pub fn setsid() -> pid_t;
pub fn setuid(uid: uid_t) -> c_int;
pub fn sleep(secs: c_uint) -> c_uint;
pub fn usleep(secs: c_uint) -> c_int;
pub fn sysconf(name: c_int) -> c_long;
pub fn tcgetpgrp(fd: c_int) -> pid_t;
pub fn ttyname(fd: c_int) -> *c_char;

View file

@ -0,0 +1,72 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Implementation of running at_exit routines
//!
//! Documentation can be found on the `rt::at_exit` function.
use cast;
use iter::Iterator;
use option::{Some, None};
use ptr::RawPtr;
use unstable::sync::Exclusive;
use util;
use vec::OwnedVector;
type Queue = Exclusive<~[proc()]>;
// You'll note that these variables are *not* atomic, and this is done on
// purpose. This module is designed to have init() called *once* in a
// single-task context, and then run() is called only once in another
// single-task context. As a result of this, only the `push` function is
// thread-safe, and it assumes that the `init` function has run previously.
static mut QUEUE: *mut Queue = 0 as *mut Queue;
static mut RUNNING: bool = false;
pub fn init() {
unsafe {
rtassert!(!RUNNING);
rtassert!(QUEUE.is_null());
let state: ~Queue = ~Exclusive::new(~[]);
QUEUE = cast::transmute(state);
}
}
pub fn push(f: proc()) {
unsafe {
rtassert!(!RUNNING);
rtassert!(!QUEUE.is_null());
let state: &mut Queue = cast::transmute(QUEUE);
let mut f = Some(f);
state.with(|arr| {
arr.push(f.take_unwrap());
});
}
}
pub fn run() {
let vec = unsafe {
rtassert!(!RUNNING);
rtassert!(!QUEUE.is_null());
RUNNING = true;
let state: ~Queue = cast::transmute(QUEUE);
QUEUE = 0 as *mut Queue;
let mut vec = None;
state.with(|arr| {
vec = Some(util::replace(arr, ~[]));
});
vec.take_unwrap()
};
for f in vec.move_iter() {
f();
}
}

View file

@ -127,6 +127,9 @@ mod util;
// Global command line argument storage
pub mod args;
// Support for running procedures when a program has exited.
mod at_exit_imp;
/// The default error code of the rust runtime if the main task fails instead
/// of exiting cleanly.
pub static DEFAULT_ERROR_CODE: int = 101;
@ -171,9 +174,27 @@ pub fn init(argc: int, argv: **u8) {
env::init();
logging::init();
local_ptr::init();
at_exit_imp::init();
}
}
/// Enqueues a procedure to run when the runtime is cleaned up
///
/// The procedure passed to this function will be executed as part of the
/// runtime cleanup phase. For normal rust programs, this means that it will run
/// after all other tasks have exited.
///
/// The procedure is *not* executed with a local `Task` available to it, so
/// primitives like logging, I/O, channels, spawning, etc, are *not* available.
/// This is meant for "bare bones" usage to clean up runtime details, this is
/// not meant as a general-purpose "let's clean everything up" function.
///
/// It is forbidden for procedures to register more `at_exit` handlers when they
/// are running, and doing so will lead to a process abort.
pub fn at_exit(f: proc()) {
at_exit_imp::push(f);
}
/// One-time runtime cleanup.
///
/// This function is unsafe because it performs no checks to ensure that the
@ -184,6 +205,7 @@ pub fn init(argc: int, argv: **u8) {
/// Invoking cleanup while portions of the runtime are still in use may cause
/// undefined behavior.
pub unsafe fn cleanup() {
at_exit_imp::run();
args::cleanup();
local_ptr::cleanup();
}