std: EADDRINUSE and EACCES err tests for tcp server + more cleanup

.. confounded resolve!
This commit is contained in:
Jeff Olson 2012-06-05 06:40:39 -07:00 committed by Brian Anderson
parent 235f6c7ab7
commit f7e3a4e036

View file

@ -23,8 +23,7 @@ export listen, accept;
// tcp client stuff
export connect;
// helper methods
import methods = net_tcp_methods;
export methods;
export methods_tcp_socket;
#[nolink]
native mod rustrt {
@ -57,6 +56,38 @@ type tcp_err_data = {
err_name: str,
err_msg: str
};
#[doc="
Details returned as part of a `result::err` result from `tcp::listen`
"]
enum tcp_listen_err_data {
#[doc="
Some unplanned-for error. The first and second fields correspond
to libuv's `err_name` and `err_msg` fields, respectively.
"]
generic_listen_err(str, str),
#[doc="
Failed to bind to the requested IP/Port, because it is already in use.
# Possible Causes
* Attempting to bind to a port already bound to another listener
"]
address_in_use,
#[doc="
Request to bind to an IP/Port was denied by the system.
# Possible Causes
* Attemping to binding to an IP/Port as a non-Administrator
on Windows Vista+
* Attempting to bind, as a non-priv'd
user, to 'privileged' ports (< 1024) on *nix
"]
access_denied
}
#[doc="
Details returned as part of a `result::err` result from `tcp::connect`
"]
enum tcp_connect_err_data {
#[doc="
Some unplanned-for error. The first and second fields correspond
@ -333,185 +364,6 @@ fn read_future(sock: tcp_socket, timeout_msecs: uint)
}
#[doc="
<<<<<<< HEAD
Bind to a given IP/port and listen for new connections
# Arguments
* `host_ip` - a `net::ip::ip_addr` representing a unique IP
(versions 4 or 6)
* `port` - a uint representing the port to listen on
* `backlog` - a uint representing the number of incoming connections
to cache in memory
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
# Returns
A `result` instance containing either a `tcp_conn_port` which can used
to listen for, and accept, new connections, or a `tcp_err_data` if
failure to create the tcp listener occurs
"]
fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint,
iotask: iotask)
-> result::result<tcp_conn_port, tcp_err_data> unsafe {
let stream_closed_po = comm::port::<()>();
let stream_closed_ch = comm::chan(stream_closed_po);
let new_conn_po = comm::port::<result::result<*uv::ll::uv_tcp_t,
tcp_err_data>>();
let new_conn_ch = comm::chan(new_conn_po);
// FIXME (#2656): This shared box should not be captured in the i/o
// task Make it a unique pointer.
let server_data: @tcp_conn_port_data = @{
server_stream: uv::ll::tcp_t(),
stream_closed_po: stream_closed_po,
stream_closed_ch: stream_closed_ch,
iotask: iotask,
new_conn_po: new_conn_po,
new_conn_ch: new_conn_ch
};
let server_data_ptr = ptr::addr_of(*server_data);
let server_stream_ptr = ptr::addr_of((*server_data_ptr)
.server_stream);
let setup_po = comm::port::<option<tcp_err_data>>();
let setup_ch = comm::chan(setup_po);
iotask::interact(iotask) {|loop_ptr|
let tcp_addr = ipv4_ip_addr_to_sockaddr_in(host_ip,
port);
alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
0i32 {
alt uv::ll::tcp_bind(server_stream_ptr,
ptr::addr_of(tcp_addr)) {
0i32 {
alt uv::ll::listen(server_stream_ptr,
backlog as libc::c_int,
tcp_nl_on_connection_cb) {
0i32 {
uv::ll::set_data_for_uv_handle(
server_stream_ptr,
server_data_ptr);
comm::send(setup_ch, none);
}
_ {
log(debug, "failure to uv_listen()");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(setup_ch, some(err_data));
}
}
}
_ {
log(debug, "failure to uv_tcp_bind");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(setup_ch, some(err_data));
}
}
}
_ {
log(debug, "failure to uv_tcp_init");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(setup_ch, some(err_data));
}
}
};
alt comm::recv(setup_po) {
some(err_data) {
// we failed to bind/list w/ libuv
result::err(err_data.to_tcp_err())
}
none {
result::ok(tcp_conn_port(server_data))
}
}
}
#[doc="
Block on a `net::tcp::tcp_conn_port` until a new connection arrives
This function behaves similarly to `comm::recv()`
# Arguments
* server_port -- a `net::tcp::tcp_conn_port` that you wish to listen
on for an incoming connection
# Returns
A `result` object containing a `net::tcp::tcp_socket`, ready for immediate
use, as the `ok` varient, or a `net::tcp::tcp_err_data` for the `err`
variant
"]
fn conn_recv(server_port: tcp_conn_port)
-> result::result<tcp_socket, tcp_err_data> {
let new_conn_po = (*(server_port.conn_data)).new_conn_po;
let iotask = (*(server_port.conn_data)).iotask;
let new_conn_result = comm::recv(new_conn_po);
alt new_conn_result {
ok(client_stream_ptr) {
conn_port_new_tcp_socket(client_stream_ptr, iotask)
}
err(err_data) {
result::err(err_data)
}
}
}
#[doc="
Identical to `net::tcp::conn_recv`, but ran on a new task
The recv'd tcp_socket is created with a new task on the current scheduler,
and given as a parameter to the provided callback
# Arguments
* `server_port` -- a `net::tcp::tcp_conn_port` that you wish to listen
on for an incoming connection
* `cb` -- a callback that will be ran, in a new task on the current scheduler,
once a new connection is recv'd. Its parameter:
* A `result` object containing a `net::tcp::tcp_socket`, ready for immediate
use, as the `ok` varient, or a `net::tcp::tcp_err_data` for the `err`
variant
"]
fn conn_recv_spawn(server_port: tcp_conn_port,
+cb: fn~(result::result<tcp_socket, tcp_err_data>)) {
let new_conn_po = (*(server_port.conn_data)).new_conn_po;
let iotask = (*(server_port.conn_data)).iotask;
let new_conn_result = comm::recv(new_conn_po);
task::spawn {||
let sock_create_result = alt new_conn_result {
ok(client_stream_ptr) {
conn_port_new_tcp_socket(client_stream_ptr, iotask)
}
err(err_data) {
result::err(err_data)
}
};
cb(sock_create_result);
};
}
#[doc="
Check if a `net::tcp::tcp_conn_port` has one-or-more pending, new connections
This function behaves similarly to `comm::peek()`
# Arguments
* `server_port` -- a `net::tcp::tcp_conn_port` representing a server
connection
# Returns
`true` if there are one-or-more pending connections, `false` if there are
none.
"]
fn conn_peek(server_port: tcp_conn_port) -> bool {
let new_conn_po = (*(server_port.conn_data)).new_conn_po;
comm::peek(new_conn_po)
}
#[doc="
=======
>>>>>>> std: dump the tcp::new_listener server API
Bind an incoming client connection to a `net::tcp::tcp_socket`
# Notes
@ -681,7 +533,7 @@ fn listen(host_ip: ip::ip_addr, port: uint, backlog: uint,
on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
+new_connect_cb: fn~(tcp_new_connection,
comm::chan<option<tcp_err_data>>))
-> result::result<(), tcp_err_data> unsafe {
-> result::result<(), tcp_listen_err_data> unsafe {
listen_common(host_ip, port, backlog, iotask, on_establish_cb)
// on_connect_cb
{|handle|
@ -697,7 +549,7 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint,
iotask: iotask,
on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
-on_connect_cb: fn~(*uv::ll::uv_tcp_t))
-> result::result<(), tcp_err_data> unsafe {
-> result::result<(), tcp_listen_err_data> unsafe {
let stream_closed_po = comm::port::<()>();
let kill_po = comm::port::<option<tcp_err_data>>();
let kill_ch = comm::chan(kill_po);
@ -719,6 +571,9 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint,
port);
alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
0i32 {
uv::ll::set_data_for_uv_handle(
server_stream_ptr,
server_data_ptr);
alt uv::ll::tcp_bind(server_stream_ptr,
ptr::addr_of(tcp_addr)) {
0i32 {
@ -726,9 +581,6 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint,
backlog as libc::c_int,
tcp_lfc_on_connection_cb) {
0i32 {
uv::ll::set_data_for_uv_handle(
server_stream_ptr,
server_data_ptr);
comm::send(setup_ch, none);
}
_ {
@ -756,8 +608,29 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint,
};
alt setup_result {
some(err_data) {
// we failed to bind/list w/ libuv
result::err(err_data.to_tcp_err())
iotask::interact(iotask) {|loop_ptr|
log(debug, #fmt("tcp::listen post-kill recv hl interact %?",
loop_ptr));
(*server_data_ptr).active = false;
uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
};
stream_closed_po.recv();
alt err_data.err_name {
"EACCES" {
log(debug, "Got EACCES error");
result::err(access_denied)
}
"EADDRINUSE" {
log(debug, "Got EADDRINUSE error");
result::err(address_in_use)
}
_ {
log(debug, #fmt("Got '%s' '%s' libuv error",
err_data.err_name, err_data.err_msg));
result::err(
generic_listen_err(err_data.err_name, err_data.err_msg))
}
}
}
none {
on_establish_cb(kill_ch);
@ -768,11 +641,12 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint,
(*server_data_ptr).active = false;
uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
};
comm::recv(stream_closed_po);
stream_closed_po.recv();
alt kill_result {
// some failure post bind/listen
some(err_data) {
result::err(err_data)
result::err(generic_listen_err(err_data.err_name,
err_data.err_msg))
}
// clean exit
none {
@ -783,35 +657,33 @@ fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint,
}
}
mod net_tcp_methods {
#[doc="
Convenience methods extending `net::tcp::tcp_socket`
"]
impl methods_tcp_socket for tcp_socket {
fn read_start() -> result::result<comm::port<
result::result<[u8]/~, tcp_err_data>>, tcp_err_data> {
read_start(self)
}
fn read_stop() ->
result::result<(), tcp_err_data> {
read_stop(self)
}
fn read(timeout_msecs: uint) ->
result::result<[u8]/~, tcp_err_data> {
read(self, timeout_msecs)
}
fn read_future(timeout_msecs: uint) ->
future::future<result::result<[u8]/~, tcp_err_data>> {
read_future(self, timeout_msecs)
}
fn write(raw_write_data: [u8]/~)
-> result::result<(), tcp_err_data> {
write(self, raw_write_data)
}
fn write_future(raw_write_data: [u8]/~)
-> future::future<result::result<(), tcp_err_data>> {
write_future(self, raw_write_data)
}
#[doc="
Convenience methods extending `net::tcp::tcp_socket`
"]
impl methods_tcp_socket for tcp_socket {
fn read_start() -> result::result<comm::port<
result::result<[u8]/~, tcp_err_data>>, tcp_err_data> {
read_start(self)
}
fn read_stop() ->
result::result<(), tcp_err_data> {
read_stop(self)
}
fn read(timeout_msecs: uint) ->
result::result<[u8]/~, tcp_err_data> {
read(self, timeout_msecs)
}
fn read_future(timeout_msecs: uint) ->
future::future<result::result<[u8]/~, tcp_err_data>> {
read_future(self, timeout_msecs)
}
fn write(raw_write_data: [u8]/~)
-> result::result<(), tcp_err_data> {
write(self, raw_write_data)
}
fn write_future(raw_write_data: [u8]/~)
-> future::future<result::result<(), tcp_err_data>> {
write_future(self, raw_write_data)
}
}
// INTERNAL API
@ -984,38 +856,6 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data,
}
}
<<<<<<< HEAD
// various recv_* can use a tcp_conn_port can re-use this..
fn conn_port_new_tcp_socket(
stream_handle_ptr: *uv::ll::uv_tcp_t,
iotask: iotask)
-> result::result<tcp_socket,tcp_err_data> unsafe {
// tcp_nl_on_connection_cb
let reader_po = comm::port::<result::result<[u8]/~, tcp_err_data>>();
let client_socket_data = @{
reader_po : reader_po,
reader_ch : comm::chan(reader_po),
stream_handle_ptr : stream_handle_ptr,
connect_req : uv::ll::connect_t(),
write_req : uv::ll::write_t(),
iotask : iotask
};
let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
comm::listen {|cont_ch|
iotask::interact(iotask) {|loop_ptr|
log(debug, #fmt("in interact cb 4 conn_port_new_tcp.. loop %?",
loop_ptr));
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
client_socket_data_ptr);
cont_ch.send(());
};
cont_ch.recv()
};
result::ok(tcp_socket(client_socket_data))
}
=======
>>>>>>> std: dump the tcp::new_listener server API
enum tcp_new_connection {
new_tcp_conn(*uv::ll::uv_tcp_t)
}
@ -1260,6 +1100,18 @@ mod test {
fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
impl_gl_tcp_ipv4_client_error_connection_refused();
}
#[test]
fn test_gl_tcp_server_address_in_use() unsafe {
impl_gl_tcp_ipv4_server_address_in_use();
}
#[test]
// FIXME: this probably needs to be ignored on windows.
// ... need to verify (someday we'll have 64bit windows! :)
//#[ignore(cfg(target_os = "win32"))]
fn test_gl_tcp_server_access_denied() unsafe {
impl_gl_tcp_ipv4_server_access_denied();
}
}
#[cfg(target_arch="x86")]
mod impl32 {
@ -1273,6 +1125,19 @@ mod test {
fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
impl_gl_tcp_ipv4_client_error_connection_refused();
}
#[test]
#[ignore(cfg(target_os = "linux"))]
fn test_gl_tcp_server_address_in_use() unsafe {
impl_gl_tcp_ipv4_server_address_in_use();
}
#[test]
#[ignore(cfg(target_os = "linux"))]
// FIXME: this probably needs to be ignored on windows.
// ... need to verify
//#[ignore(cfg(target_os = "win32"))]
fn test_gl_tcp_server_access_denied() unsafe {
impl_gl_tcp_ipv4_server_access_denied();
}
}
}
fn impl_gl_tcp_ipv4_server_and_client() {
@ -1324,7 +1189,7 @@ mod test {
fn impl_gl_tcp_ipv4_client_error_connection_refused() {
let hl_loop = uv::global_loop::get();
let server_ip = "127.0.0.1";
let server_port = 8890u;
let server_port = 8889u;
let expected_req = "ping";
// client
log(debug, "firing up client..");
@ -1344,85 +1209,181 @@ mod test {
}
}
}
fn impl_gl_tcp_ipv4_server_address_in_use() {
let hl_loop = uv::global_loop::get();
let server_ip = "127.0.0.1";
let server_port = 8890u;
let expected_req = "ping";
let expected_resp = "pong";
let server_result_po = comm::port::<str>();
let server_result_ch = comm::chan(server_result_po);
let cont_po = comm::port::<()>();
let cont_ch = comm::chan(cont_po);
// server
task::spawn_sched(task::manual_threads(1u)) {||
let actual_req = comm::listen {|server_ch|
run_tcp_test_server(
server_ip,
server_port,
expected_resp,
server_ch,
cont_ch,
hl_loop)
};
server_result_ch.send(actual_req);
};
comm::recv(cont_po);
// this one should fail..
let listen_err = run_tcp_test_server_fail(
server_ip,
server_port,
hl_loop);
// client.. just doing this so that the first server tears down
log(debug, "server started, firing up client..");
comm::listen {|client_ch|
run_tcp_test_client(
server_ip,
server_port,
expected_req,
client_ch,
hl_loop)
};
alt listen_err {
address_in_use {
assert true;
}
_ {
fail "expected address_in_use listen error,"+
"but got a different error varient. check logs.";
}
}
}
fn impl_gl_tcp_ipv4_server_access_denied() {
let hl_loop = uv::global_loop::get();
let server_ip = "127.0.0.1";
let server_port = 80u;
// this one should fail..
let listen_err = run_tcp_test_server_fail(
server_ip,
server_port,
hl_loop);
alt listen_err {
access_denied {
assert true;
}
_ {
fail "expected address_in_use listen error,"+
"but got a different error varient. check logs.";
}
}
}
fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str,
server_ch: comm::chan<str>,
cont_ch: comm::chan<()>,
iotask: iotask) -> str {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
// on_establish_cb -- called when listener is set up
{|kill_ch|
log(debug, #fmt("establish_cb %?",
kill_ch));
comm::send(cont_ch, ());
},
// risky to run this on the loop, but some users
// will want the POWER
{|new_conn, kill_ch|
log(debug, "SERVER: new connection!");
comm::listen {|cont_ch|
task::spawn_sched(task::manual_threads(1u)) {||
log(debug, "SERVER: starting worker for new req");
task::spawn_sched(task::manual_threads(1u)) {||
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result =
listen(server_ip_addr, server_port, 128u,
iotask,
// on_establish_cb -- called when listener is set up
{|kill_ch|
log(debug, #fmt("establish_cb %?",
kill_ch));
comm::send(cont_ch, ());
},
// risky to run this on the loop, but some users
// will want the POWER
{|new_conn, kill_ch|
log(debug, "SERVER: new connection!");
comm::listen {|cont_ch|
task::spawn_sched(task::manual_threads(1u)) {||
log(debug, "SERVER: starting worker for new req");
let accept_result = accept(new_conn);
log(debug, "SERVER: after accept()");
if result::is_err(accept_result) {
log(debug, "SERVER: error accept connection");
let err_data = result::get_err(accept_result);
comm::send(kill_ch, some(err_data));
log(debug,
"SERVER/WORKER: send on err cont ch");
cont_ch.send(());
}
else {
log(debug,
"SERVER/WORKER: send on cont ch");
cont_ch.send(());
let sock = result::unwrap(accept_result);
log(debug, "SERVER: successfully accepted"+
"connection!");
let received_req_bytes = read(sock, 0u);
alt received_req_bytes {
result::ok(data) {
server_ch.send(
str::from_bytes(data));
log(debug, "SERVER: before write");
tcp_write_single(sock, str::bytes(resp));
log(debug, "SERVER: after write.. die");
comm::send(kill_ch, none);
}
result::err(err_data) {
log(debug, #fmt("SERVER: error recvd: %s %s",
err_data.err_name, err_data.err_msg));
comm::send(kill_ch, some(err_data));
server_ch.send("");
}
}
log(debug, "SERVER: worker spinning down");
}
let accept_result = accept(new_conn);
log(debug, "SERVER: after accept()");
if result::is_err(accept_result) {
log(debug, "SERVER: error accept connection");
let err_data = result::get_err(accept_result);
comm::send(kill_ch, some(err_data));
log(debug,
"SERVER/WORKER: send on err cont ch");
cont_ch.send(());
}
log(debug, "SERVER: waiting to recv on cont_ch");
cont_ch.recv()
};
log(debug, "SERVER: recv'd on cont_ch..leaving listen cb");
});
// err check on listen_result
if result::is_err(listen_result) {
let err_data = result::get_err(listen_result);
log(debug, #fmt("SERVER: exited abnormally name %s msg %s",
else {
log(debug,
"SERVER/WORKER: send on cont ch");
cont_ch.send(());
let sock = result::unwrap(accept_result);
log(debug, "SERVER: successfully accepted"+
"connection!");
let received_req_bytes = read(sock, 0u);
alt received_req_bytes {
result::ok(data) {
server_ch.send(
str::from_bytes(data));
log(debug, "SERVER: before write");
tcp_write_single(sock, str::bytes(resp));
log(debug, "SERVER: after write.. die");
comm::send(kill_ch, none);
}
result::err(err_data) {
log(debug, #fmt("SERVER: error recvd: %s %s",
err_data.err_name, err_data.err_msg));
comm::send(kill_ch, some(err_data));
server_ch.send("");
}
}
log(debug, "SERVER: worker spinning down");
}
}
log(debug, "SERVER: waiting to recv on cont_ch");
cont_ch.recv()
};
log(debug, "SERVER: recv'd on cont_ch..leaving listen cb");
});
// err check on listen_result
if result::is_err(listen_result) {
alt result::get_err(listen_result) {
generic_listen_err(name, msg) {
fail #fmt("SERVER: exited abnormally name %s msg %s",
name, msg);
}
access_denied {
fail "SERVER: exited abnormally, got access denied..";
}
address_in_use {
fail "SERVER: exited abnormally, got address in use...";
}
}
};
}
let ret_val = server_ch.recv();
log(debug, #fmt("SERVER: exited and got ret val: '%s'", ret_val));
ret_val
}
fn run_tcp_test_server_fail(server_ip: str, server_port: uint,
iotask: iotask) -> tcp_listen_err_data {
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
// on_establish_cb -- called when listener is set up
{|kill_ch|
log(debug, #fmt("establish_cb %?",
kill_ch));
},
{|new_conn, kill_ch|
fail #fmt("SERVER: shouldn't be called.. %? %?",
new_conn, kill_ch);
});
// err check on listen_result
if result::is_failure(listen_result) {
result::get_err(listen_result)
}
else {
fail "SERVER: did not fail as expected"
}
}
fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
client_ch: comm::chan<str>,
iotask: iotask) -> result::result<str,
@ -1455,13 +1416,8 @@ mod test {
}
}
<<<<<<< HEAD
fn tcp_write_single(sock: tcp_socket, val: [u8]/~) {
let write_result_future = sock.write_future(val);
=======
fn tcp_write_single(sock: tcp_socket, val: [u8]) {
let write_result_future = write_future(sock, val);
>>>>>>> std: mod cleanup, impl/test for conn. refused err + mem leak fix
let write_result = write_result_future.get();
if result::is_err(write_result) {
log(debug, "tcp_write_single: write failed!");