std: high-level libuv-leverage APIs now take a hl_loop as arg (tcp/timer)

This commit is contained in:
Jeff Olson 2012-05-22 16:33:33 -07:00 committed by Brian Anderson
parent b0b175214a
commit 92e9e736fa
2 changed files with 55 additions and 34 deletions

View file

@ -83,8 +83,9 @@ Initiate a client connection over TCP/IP
# Arguments # Arguments
* ip - The IP address (versions 4 or 6) of the remote host * `ip` - The IP address (versions 4 or 6) of the remote host
* port - the unsigned integer of the desired remote host port * `port` - the unsigned integer of the desired remote host port
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
# Returns # Returns
@ -92,7 +93,8 @@ A `result` that, if the operation succeeds, contains a `tcp_socket` that
can be used to send and receive data to/from the remote host. In the event can be used to send and receive data to/from the remote host. In the event
of failure, a `tcp_err_data` will be returned of failure, a `tcp_err_data` will be returned
"] "]
fn connect(input_ip: ip::ip_addr, port: uint) fn connect(input_ip: ip::ip_addr, port: uint,
hl_loop: uv::hl::high_level_loop)
-> result::result<tcp_socket, tcp_err_data> unsafe { -> result::result<tcp_socket, tcp_err_data> unsafe {
let result_po = comm::port::<conn_attempt>(); let result_po = comm::port::<conn_attempt>();
let closed_signal_po = comm::port::<()>(); let closed_signal_po = comm::port::<()>();
@ -101,7 +103,6 @@ fn connect(input_ip: ip::ip_addr, port: uint)
closed_signal_ch: comm::chan(closed_signal_po) closed_signal_ch: comm::chan(closed_signal_po)
}; };
let conn_data_ptr = ptr::addr_of(conn_data); let conn_data_ptr = ptr::addr_of(conn_data);
let hl_loop = uv::global_loop::get();
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>(); let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
let stream_handle_ptr = malloc_uv_tcp_t(); let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
@ -343,6 +344,7 @@ Bind to a given IP/port and listen for new connections
* `port` - a uint representing the port to listen on * `port` - a uint representing the port to listen on
* `backlog` - a uint representing the number of incoming connections * `backlog` - a uint representing the number of incoming connections
to cache in memory to cache in memory
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
# Returns # Returns
@ -350,11 +352,11 @@ A `result` instance containing either a `tcp_conn_port` which can used
to listen for, and accept, new connections, or a `tcp_err_data` if to listen for, and accept, new connections, or a `tcp_err_data` if
failure to create the tcp listener occurs failure to create the tcp listener occurs
"] "]
fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint) fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint,
hl_loop: uv::hl::high_level_loop)
-> result::result<tcp_conn_port, tcp_err_data> unsafe { -> result::result<tcp_conn_port, tcp_err_data> unsafe {
let stream_closed_po = comm::port::<()>(); let stream_closed_po = comm::port::<()>();
let stream_closed_ch = comm::chan(stream_closed_po); let stream_closed_ch = comm::chan(stream_closed_po);
let hl_loop = uv::global_loop::get();
let new_conn_po = comm::port::<result::result<*uv::ll::uv_tcp_t, let new_conn_po = comm::port::<result::result<*uv::ll::uv_tcp_t,
tcp_err_data>>(); tcp_err_data>>();
let new_conn_ch = comm::chan(new_conn_po); let new_conn_ch = comm::chan(new_conn_po);
@ -653,6 +655,7 @@ Bind to a given IP/port and listen for new connections
* `port` - a uint representing the port to listen on * `port` - a uint representing the port to listen on
* `backlog` - a uint representing the number of incoming connections * `backlog` - a uint representing the number of incoming connections
to cache in memory to cache in memory
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
* `on_establish_cb` - a callback that is evaluated if/when the listener * `on_establish_cb` - a callback that is evaluated if/when the listener
is successfully established. it takes no parameters is successfully established. it takes no parameters
* `new_connect_cb` - a callback to be evaluated, on the libuv thread, * `new_connect_cb` - a callback to be evaluated, on the libuv thread,
@ -671,6 +674,7 @@ successful/normal shutdown, and a `tcp_err_data` record in the event
of listen exiting because of an error of listen exiting because of an error
"] "]
fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint, fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint,
hl_loop: uv::hl::high_level_loop,
on_establish_cb: fn~(comm::chan<option<tcp_err_data>>), on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
new_connect_cb: fn~(tcp_new_connection, new_connect_cb: fn~(tcp_new_connection,
comm::chan<option<tcp_err_data>>)) comm::chan<option<tcp_err_data>>))
@ -680,7 +684,6 @@ fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint,
let kill_ch = comm::chan(kill_po); let kill_ch = comm::chan(kill_po);
let server_stream = uv::ll::tcp_t(); let server_stream = uv::ll::tcp_t();
let server_stream_ptr = ptr::addr_of(server_stream); let server_stream_ptr = ptr::addr_of(server_stream);
let hl_loop = uv::global_loop::get();
let server_data = { let server_data = {
server_stream_ptr: server_stream_ptr, server_stream_ptr: server_stream_ptr,
stream_closed_ch: comm::chan(stream_closed_po), stream_closed_ch: comm::chan(stream_closed_po),
@ -804,8 +807,9 @@ impl sock_methods for tcp_socket {
// shared implementation for tcp::read // shared implementation for tcp::read
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint) fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
-> result::result<[u8],tcp_err_data> { -> result::result<[u8],tcp_err_data> unsafe {
log(debug, "starting tcp::read"); log(debug, "starting tcp::read");
let hl_loop = (*socket_data).hl_loop;
let rs_result = read_start_common_impl(socket_data); let rs_result = read_start_common_impl(socket_data);
if result::is_failure(rs_result) { if result::is_failure(rs_result) {
let err_data = result::get_err(rs_result); let err_data = result::get_err(rs_result);
@ -815,7 +819,7 @@ fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
log(debug, "tcp::read before recv_timeout"); log(debug, "tcp::read before recv_timeout");
let read_result = if timeout_msecs > 0u { let read_result = if timeout_msecs > 0u {
timer::recv_timeout( timer::recv_timeout(
timeout_msecs, result::get(rs_result)) hl_loop, timeout_msecs, result::get(rs_result))
} else { } else {
some(comm::recv(result::get(rs_result))) some(comm::recv(result::get(rs_result)))
}; };
@ -1270,7 +1274,7 @@ fn ipv4_ip_addr_to_sockaddr_in(input_ip: ip::ip_addr,
} }
} }
//#[cfg(test)] #[cfg(test)]
mod test { mod test {
// FIXME don't run on fbsd or linux 32 bit(#2064) // FIXME don't run on fbsd or linux 32 bit(#2064)
#[cfg(target_os="win32")] #[cfg(target_os="win32")]
@ -1303,6 +1307,7 @@ mod test {
} }
} }
fn impl_gl_tcp_ipv4_server_and_client() { fn impl_gl_tcp_ipv4_server_and_client() {
let hl_loop = uv::global_loop::get();
let server_ip = "127.0.0.1"; let server_ip = "127.0.0.1";
let server_port = 8888u; let server_port = 8888u;
let expected_req = "ping"; let expected_req = "ping";
@ -1321,7 +1326,8 @@ mod test {
server_port, server_port,
expected_resp, expected_resp,
server_ch, server_ch,
cont_ch) cont_ch,
hl_loop)
}; };
server_result_ch.send(actual_req); server_result_ch.send(actual_req);
}; };
@ -1333,7 +1339,8 @@ mod test {
server_ip, server_ip,
server_port, server_port,
expected_req, expected_req,
client_ch) client_ch,
hl_loop)
}; };
let actual_req = comm::recv(server_result_po); let actual_req = comm::recv(server_result_po);
log(debug, #fmt("REQ: expected: '%s' actual: '%s'", log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
@ -1344,6 +1351,7 @@ mod test {
assert str::contains(actual_resp, expected_resp); assert str::contains(actual_resp, expected_resp);
} }
fn impl_gl_tcp_ipv4_server_listener_and_client() { fn impl_gl_tcp_ipv4_server_listener_and_client() {
let hl_loop = uv::global_loop::get();
let server_ip = "127.0.0.1"; let server_ip = "127.0.0.1";
let server_port = 8889u; let server_port = 8889u;
let expected_req = "ping"; let expected_req = "ping";
@ -1362,7 +1370,8 @@ mod test {
server_port, server_port,
expected_resp, expected_resp,
server_ch, server_ch,
cont_ch) cont_ch,
hl_loop)
}; };
server_result_ch.send(actual_req); server_result_ch.send(actual_req);
}; };
@ -1374,7 +1383,8 @@ mod test {
server_ip, server_ip,
server_port, server_port,
expected_req, expected_req,
client_ch) client_ch,
hl_loop)
}; };
let actual_req = comm::recv(server_result_po); let actual_req = comm::recv(server_result_po);
log(debug, #fmt("REQ: expected: '%s' actual: '%s'", log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
@ -1387,12 +1397,14 @@ mod test {
fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str, fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str,
server_ch: comm::chan<str>, server_ch: comm::chan<str>,
cont_ch: comm::chan<()>) -> str { cont_ch: comm::chan<()>,
hl_loop: uv::hl::high_level_loop) -> str {
task::spawn_sched(task::manual_threads(1u)) {|| task::spawn_sched(task::manual_threads(1u)) {||
let server_ip_addr = ip::v4::parse_addr(server_ip); let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = let listen_result =
listen_for_conn(server_ip_addr, server_port, 128u, listen_for_conn(server_ip_addr, server_port, 128u,
hl_loop,
// on_establish_cb -- called when listener is set up // on_establish_cb -- called when listener is set up
{|kill_ch| {|kill_ch|
log(debug, #fmt("establish_cb %?", log(debug, #fmt("establish_cb %?",
@ -1464,12 +1476,13 @@ mod test {
fn run_tcp_test_server_listener(server_ip: str, fn run_tcp_test_server_listener(server_ip: str,
server_port: uint, resp: str, server_port: uint, resp: str,
server_ch: comm::chan<str>, server_ch: comm::chan<str>,
cont_ch: comm::chan<()>) -> str { cont_ch: comm::chan<()>,
hl_loop: uv::hl::high_level_loop) -> str {
task::spawn_sched(task::manual_threads(1u)) {|| task::spawn_sched(task::manual_threads(1u)) {||
let server_ip_addr = ip::v4::parse_addr(server_ip); let server_ip_addr = ip::v4::parse_addr(server_ip);
let new_listener_result = let new_listener_result =
new_listener(server_ip_addr, server_port, 128u); new_listener(server_ip_addr, server_port, 128u, hl_loop);
if result::is_failure(new_listener_result) { if result::is_failure(new_listener_result) {
let err_data = result::get_err(new_listener_result); let err_data = result::get_err(new_listener_result);
log(debug, #fmt("SERVER: exited abnormally name %s msg %s", log(debug, #fmt("SERVER: exited abnormally name %s msg %s",
@ -1512,12 +1525,13 @@ mod test {
} }
fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str, fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
client_ch: comm::chan<str>) -> str { client_ch: comm::chan<str>,
hl_loop: uv::hl::high_level_loop) -> str {
let server_ip_addr = ip::v4::parse_addr(server_ip); let server_ip_addr = ip::v4::parse_addr(server_ip);
log(debug, "CLIENT: starting.."); log(debug, "CLIENT: starting..");
let connect_result = connect(server_ip_addr, server_port); let connect_result = connect(server_ip_addr, server_port, hl_loop);
if result::is_failure(connect_result) { if result::is_failure(connect_result) {
log(debug, "CLIENT: failed to connect"); log(debug, "CLIENT: failed to connect");
let err_data = result::get_err(connect_result); let err_data = result::get_err(connect_result);

View file

@ -16,11 +16,13 @@ for *at least* that period of time.
# Arguments # Arguments
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
* msecs - a timeout period, in milliseconds, to wait * msecs - a timeout period, in milliseconds, to wait
* ch - a channel of type T to send a `val` on * ch - a channel of type T to send a `val` on
* val - a value of type T to send over the provided `ch` * val - a value of type T to send over the provided `ch`
"] "]
fn delayed_send<T: copy send>(msecs: uint, ch: comm::chan<T>, val: T) { fn delayed_send<T: send>(hl_loop: uv::hl::high_level_loop,
msecs: uint, ch: comm::chan<T>, val: T) {
task::spawn() {|| task::spawn() {||
unsafe { unsafe {
let timer_done_po = comm::port::<()>(); let timer_done_po = comm::port::<()>();
@ -28,7 +30,6 @@ fn delayed_send<T: copy send>(msecs: uint, ch: comm::chan<T>, val: T) {
let timer_done_ch_ptr = ptr::addr_of(timer_done_ch); let timer_done_ch_ptr = ptr::addr_of(timer_done_ch);
let timer = uv::ll::timer_t(); let timer = uv::ll::timer_t();
let timer_ptr = ptr::addr_of(timer); let timer_ptr = ptr::addr_of(timer);
let hl_loop = uv::global_loop::get();
uv::hl::interact(hl_loop) {|loop_ptr| uv::hl::interact(hl_loop) {|loop_ptr|
let init_result = uv::ll::timer_init(loop_ptr, timer_ptr); let init_result = uv::ll::timer_init(loop_ptr, timer_ptr);
if (init_result == 0i32) { if (init_result == 0i32) {
@ -67,12 +68,13 @@ for *at least* that period of time.
# Arguments # Arguments
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
* msecs - an amount of time, in milliseconds, for the current task to block * msecs - an amount of time, in milliseconds, for the current task to block
"] "]
fn sleep(msecs: uint) { fn sleep(hl_loop: uv::hl::high_level_loop, msecs: uint) {
let exit_po = comm::port::<()>(); let exit_po = comm::port::<()>();
let exit_ch = comm::chan(exit_po); let exit_ch = comm::chan(exit_po);
delayed_send(msecs, exit_ch, ()); delayed_send(hl_loop, msecs, exit_ch, ());
comm::recv(exit_po); comm::recv(exit_po);
} }
@ -85,6 +87,7 @@ timeout. Depending on whether the provided port receives in that time period,
# Arguments # Arguments
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
* msecs - an mount of time, in milliseconds, to wait to receive * msecs - an mount of time, in milliseconds, to wait to receive
* wait_port - a `comm::port<T>` to receive on * wait_port - a `comm::port<T>` to receive on
@ -94,12 +97,11 @@ An `option<T>` representing the outcome of the call. If the call `recv`'d on
the provided port in the allotted timeout period, then the result will be a the provided port in the allotted timeout period, then the result will be a
`some(T)`. If not, then `none` will be returned. `some(T)`. If not, then `none` will be returned.
"] "]
fn recv_timeout<T: copy send>(msecs: uint, wait_po: comm::port<T>) fn recv_timeout<T: send>(hl_loop: uv::hl::high_level_loop,
-> option<T> { msecs: uint, wait_po: comm::port<T>) -> option<T> {
let timeout_po = comm::port::<()>(); let timeout_po = comm::port::<()>();
let timeout_ch = comm::chan(timeout_po); let timeout_ch = comm::chan(timeout_po);
delayed_send(msecs, timeout_ch, ()); delayed_send(hl_loop, msecs, timeout_ch, ());
either::either( either::either(
{|left_val| {|left_val|
log(debug, #fmt("recv_time .. left_val %?", log(debug, #fmt("recv_time .. left_val %?",
@ -140,13 +142,15 @@ crust fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) unsafe {
mod test { mod test {
#[test] #[test]
fn test_gl_timer_simple_sleep_test() { fn test_gl_timer_simple_sleep_test() {
sleep(1u); let hl_loop = uv::global_loop::get();
sleep(hl_loop, 1u);
} }
#[test] #[test]
fn test_gl_timer_sleep_stress1() { fn test_gl_timer_sleep_stress1() {
let hl_loop = uv::global_loop::get();
iter::repeat(200u) {|| iter::repeat(200u) {||
sleep(1u); sleep(hl_loop, 1u);
} }
} }
@ -154,6 +158,7 @@ mod test {
fn test_gl_timer_sleep_stress2() { fn test_gl_timer_sleep_stress2() {
let po = comm::port(); let po = comm::port();
let ch = comm::chan(po); let ch = comm::chan(po);
let hl_loop = uv::global_loop::get();
let repeat = 20u; let repeat = 20u;
let spec = { let spec = {
@ -172,7 +177,7 @@ mod test {
import rand::*; import rand::*;
let rng = rng(); let rng = rng();
iter::repeat(times) {|| iter::repeat(times) {||
sleep(rng.next() as uint % maxms); sleep(hl_loop, rng.next() as uint % maxms);
} }
comm::send(ch, ()); comm::send(ch, ());
} }
@ -195,6 +200,7 @@ mod test {
let times = 100; let times = 100;
let mut successes = 0; let mut successes = 0;
let mut failures = 0; let mut failures = 0;
let hl_loop = uv::global_loop::get();
iter::repeat(times as uint) {|| iter::repeat(times as uint) {||
task::yield(); task::yield();
@ -204,10 +210,10 @@ mod test {
let test_ch = comm::chan(test_po); let test_ch = comm::chan(test_po);
task::spawn() {|| task::spawn() {||
delayed_send(1u, test_ch, expected); delayed_send(hl_loop, 1u, test_ch, expected);
}; };
alt recv_timeout(10u, test_po) { alt recv_timeout(hl_loop, 10u, test_po) {
some(val) { assert val == expected; successes += 1; } some(val) { assert val == expected; successes += 1; }
_ { failures += 1; } _ { failures += 1; }
}; };
@ -221,6 +227,7 @@ mod test {
let times = 100; let times = 100;
let mut successes = 0; let mut successes = 0;
let mut failures = 0; let mut failures = 0;
let hl_loop = uv::global_loop::get();
iter::repeat(times as uint) {|| iter::repeat(times as uint) {||
let expected = rand::rng().gen_str(16u); let expected = rand::rng().gen_str(16u);
@ -228,10 +235,10 @@ mod test {
let test_ch = comm::chan(test_po); let test_ch = comm::chan(test_po);
task::spawn() {|| task::spawn() {||
delayed_send(1000u, test_ch, expected); delayed_send(hl_loop, 1000u, test_ch, expected);
}; };
let actual = alt recv_timeout(1u, test_po) { let actual = alt recv_timeout(hl_loop, 1u, test_po) {
none { successes += 1; } none { successes += 1; }
_ { failures += 1; } _ { failures += 1; }
}; };