std: tightening up net::tcp, server/client test done, still has races..
.. going to rework the listen() API to be non-blocking.
This commit is contained in:
parent
a4127d3fc6
commit
733881d852
2 changed files with 327 additions and 224 deletions
|
@ -3,12 +3,25 @@ High-level interface to libuv's TCP functionality
|
|||
"];
|
||||
|
||||
import ip = net_ip;
|
||||
import comm::*;
|
||||
import result::*;
|
||||
import str::*;
|
||||
|
||||
// data
|
||||
export tcp_socket, tcp_err_data;
|
||||
export connect, write, read_start, read_stop, listen, accept;
|
||||
// operations on a tcp_socket
|
||||
export write, read_start, read_stop;
|
||||
// tcp server stuff
|
||||
export listen, accept;
|
||||
// tcp client stuff
|
||||
export connect;
|
||||
// misc util
|
||||
export is_responding;
|
||||
|
||||
#[doc="
|
||||
Encapsulates an open TCP/IP connection through libuv
|
||||
|
||||
`tcp_socket` non-sendable and handles automatically closing the underlying libuv data structures when it goes out of scope.
|
||||
"]
|
||||
resource tcp_socket(socket_data: @tcp_socket_data) unsafe {
|
||||
let closed_po = comm::port::<()>();
|
||||
|
@ -221,6 +234,7 @@ fn read_start(sock: tcp_socket)
|
|||
let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
|
||||
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
|
||||
let start_ch = comm::chan(start_po);
|
||||
log(debug, "in tcp::read_start before interact loop");
|
||||
uv::hl::interact((**sock).hl_loop) {|loop_ptr|
|
||||
log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
|
||||
alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
|
||||
|
@ -290,7 +304,19 @@ Bind to a given IP/port and listen for new connections
|
|||
* `backlog` - a uint representing the number of incoming connections
|
||||
to cache in memory
|
||||
* `new_connect_cb` - a callback to be evaluated, on the libuv thread,
|
||||
whenever a client attempts to conect on the provided ip/port
|
||||
whenever a client attempts to conect on the provided ip/port. The
|
||||
callback's arguments are:
|
||||
* `new_conn` - an opaque type that can be passed to
|
||||
`net::tcp::accept` in order to be converted to a `tcp_socket`.
|
||||
* `kill_ch` - channel of type `comm::chan<option<tcp_err_data>>`. This
|
||||
channel can be used to send a message to cause `listen` to begin
|
||||
closing the underlying libuv data structures.
|
||||
|
||||
# Returns
|
||||
|
||||
A `result` instance containing empty data of type `()` on a successful
|
||||
or normal shutdown, and a `tcp_err_data` record in the event of listen
|
||||
exiting because of an error
|
||||
"]
|
||||
fn listen(host_ip: ip::ip_addr, port: uint, backlog: uint,
|
||||
new_connect_cb: fn~(tcp_new_connection,
|
||||
|
@ -385,18 +411,49 @@ Bind an incoming client connection to a `net::tcp::tcp_socket`
|
|||
|
||||
# Notes
|
||||
|
||||
It is safe to call `net::tcp::accept` _only_ within the callback
|
||||
provided as the final argument of the `net::tcp::listen` function.
|
||||
It is safe to call `net::tcp::accept` _only_ within the context of the
|
||||
`new_connect_cb` callback provided as the final argument to the
|
||||
`net::tcp::listen` function.
|
||||
|
||||
The `new_conn` opaque value provided _only_ as the first argument to the
|
||||
`new_connect_cb`. It can be safely sent to another task but it _must_ be
|
||||
The `new_conn` opaque value is provided _only_ as the first argument to the
|
||||
`new_connect_cb` provided as a part of `net::tcp::listen`.
|
||||
It can be safely sent to another task but it _must_ be
|
||||
used (via `net::tcp::accept`) before the `new_connect_cb` call it was
|
||||
provided within returns.
|
||||
provided to returns.
|
||||
|
||||
This means that a port/chan pair must be used to make sure that the
|
||||
This implies that a port/chan pair must be used to make sure that the
|
||||
`new_connect_cb` call blocks until an attempt to create a
|
||||
`net::tcp::tcp_socket` is completed.
|
||||
|
||||
# Example
|
||||
|
||||
Here, the `new_conn` is used in conjunction with `accept` from within
|
||||
a task spawned by the `new_connect_cb` passed into `listen`
|
||||
|
||||
~~~~~~~~~~~
|
||||
net::tcp::listen(remote_ip, remote_port, backlog) {|new_conn, kill_ch|
|
||||
let cont_po = comm::port::<option<tcp_err_data>>();
|
||||
let cont_ch = comm::chan(cont_po);
|
||||
task::spawn {||
|
||||
let accept_result = net::tcp::accept(new_conn);
|
||||
alt accept_result.is_failure() {
|
||||
false { comm::send(cont_ch, result::get_err(accept_result)); }
|
||||
true {
|
||||
let sock = result::get(accept_result);
|
||||
comm::send(cont_ch, true);
|
||||
// do work here
|
||||
}
|
||||
}
|
||||
};
|
||||
alt comm::recv(cont_po) {
|
||||
// shut down listen()
|
||||
some(err_data) { comm::send(kill_chan, some(err_data)) }
|
||||
// wait for next connection
|
||||
none {}
|
||||
}
|
||||
};
|
||||
~~~~~~~~~~~
|
||||
|
||||
# Arguments
|
||||
|
||||
* `new_conn` - an opaque value used to create a new `tcp_socket`
|
||||
|
@ -418,8 +475,8 @@ fn accept(new_conn: tcp_new_connection)
|
|||
new_tcp_conn(server_handle_ptr) {
|
||||
let server_data_ptr = uv::ll::get_data_for_uv_handle(
|
||||
server_handle_ptr) as *tcp_server_data;
|
||||
let hl_loop = (*server_data_ptr).hl_loop;// FIXME
|
||||
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
|
||||
let hl_loop = (*server_data_ptr).hl_loop;
|
||||
let client_socket_data = @{
|
||||
reader_po: reader_po,
|
||||
reader_ch: comm::chan(reader_po),
|
||||
|
@ -434,31 +491,42 @@ fn accept(new_conn: tcp_new_connection)
|
|||
|
||||
let result_po = comm::port::<option<tcp_err_data>>();
|
||||
let result_ch = comm::chan(result_po);
|
||||
uv::hl::interact(hl_loop) {|loop_ptr|
|
||||
log(debug, "in interact cb for tcp::accept");
|
||||
alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
|
||||
|
||||
// UNSAFE LIBUV INTERACTION BEGIN
|
||||
// .. normally this happens within the context of
|
||||
// a call to uv::hl::interact.. but we're breaking
|
||||
// the rules here because this always has to be
|
||||
// called within the context of a listen() new_connect_cb
|
||||
// callback (or it will likely fail and drown your cat)
|
||||
log(debug, "in interact cb for tcp::accept");
|
||||
let loop_ptr = uv::ll::get_loop_for_uv_handle(
|
||||
server_handle_ptr);
|
||||
alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
|
||||
0i32 {
|
||||
log(debug, "uv_tcp_init successful for client stream");
|
||||
alt uv::ll::accept(
|
||||
server_handle_ptr as *libc::c_void,
|
||||
client_stream_handle_ptr as *libc::c_void) {
|
||||
0i32 {
|
||||
log(debug, "uv_tcp_init successful for client stream");
|
||||
alt uv::ll::accept(server_handle_ptr,
|
||||
client_stream_handle_ptr) {
|
||||
0i32 {
|
||||
log(debug, "successfully accepted client connection");
|
||||
comm::send(result_ch, none);
|
||||
}
|
||||
_ {
|
||||
log(debug, "failed to accept client conn");
|
||||
comm::send(result_ch, some(
|
||||
uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
|
||||
}
|
||||
}
|
||||
log(debug, "successfully accepted client connection");
|
||||
uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
|
||||
client_socket_data_ptr);
|
||||
comm::send(result_ch, none);
|
||||
}
|
||||
_ {
|
||||
log(debug, "failed to init client stream");
|
||||
log(debug, "failed to accept client conn");
|
||||
comm::send(result_ch, some(
|
||||
uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
_ {
|
||||
log(debug, "failed to init client stream");
|
||||
comm::send(result_ch, some(
|
||||
uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
|
||||
}
|
||||
}
|
||||
// UNSAFE LIBUV INTERACTION END
|
||||
alt comm::recv(result_po) {
|
||||
some(err_data) {
|
||||
result::err(err_data)
|
||||
|
@ -471,6 +539,52 @@ fn accept(new_conn: tcp_new_connection)
|
|||
}
|
||||
}
|
||||
|
||||
#[doc="
|
||||
Attempt to open a TCP/IP connection on a remote host
|
||||
|
||||
The connection will (attempt to) be successfully established and then
|
||||
disconnect immediately. It is useful to determine, simply, if a remote
|
||||
host is responding, and that is all.
|
||||
|
||||
# Arguments
|
||||
|
||||
* `remote_ip` - an IP address (versions 4 or 6) for the remote host
|
||||
* `remote_port` - a uint representing the port on the remote host to
|
||||
connect to
|
||||
* `timeout_msecs` - a timeout period, in miliseconds, to wait before
|
||||
aborting the connection attempt
|
||||
|
||||
# Returns
|
||||
|
||||
A `bool` indicating success or failure. If a connection was established
|
||||
to the remote host in the alloted timeout, `true` is returned. If the
|
||||
host refused the connection, timed out or had some other error condition,
|
||||
`false` is returned.
|
||||
"]
|
||||
fn is_responding(remote_ip: ip::ip_addr, remote_port: uint,
|
||||
timeout_msecs: uint) -> bool {
|
||||
log(debug, "entering is_responding");
|
||||
let connected_po = comm::port::<bool>();
|
||||
let connected_ch = comm::chan(connected_po);
|
||||
task::spawn {||
|
||||
log(debug, "in is_responding nested task");
|
||||
let connect_result = connect(remote_ip, remote_port);
|
||||
let connect_succeeded = result::is_success(connect_result);
|
||||
log(debug, #fmt("leaving is_responding nested task .. result %?",
|
||||
connect_succeeded));
|
||||
comm::send(connected_ch, connect_succeeded);
|
||||
};
|
||||
log(debug, "exiting is_responding");
|
||||
alt timer::recv_timeout(timeout_msecs, connected_po) {
|
||||
some(connect_succeeded) {
|
||||
log(debug, #fmt("connect succedded? %?", connect_succeeded));
|
||||
connect_succeeded }
|
||||
none {
|
||||
log(debug, "is_responding timed out on waiting to connect");
|
||||
false }
|
||||
}
|
||||
}
|
||||
|
||||
// INTERNAL API
|
||||
|
||||
enum tcp_new_connection {
|
||||
|
@ -553,16 +667,19 @@ impl of to_tcp_err_iface for uv::ll::uv_err_data {
|
|||
crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
||||
nread: libc::ssize_t,
|
||||
++buf: uv::ll::uv_buf_t) unsafe {
|
||||
log(debug, "entering on_tcp_read_cb");
|
||||
log(debug, #fmt("entering on_tcp_read_cb stream: %? nread: %?",
|
||||
stream, nread));
|
||||
let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
|
||||
let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
|
||||
as *tcp_socket_data;
|
||||
let reader_ch = (*socket_data_ptr).reader_ch;
|
||||
alt nread {
|
||||
// incoming err.. probably eof
|
||||
-1 {
|
||||
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
||||
comm::send(reader_ch, result::err(err_data.to_tcp_err()));
|
||||
let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
|
||||
log(debug, #fmt("on_tcp_read_cb: incoming err.. name %? msg %?",
|
||||
err_data.err_name, err_data.err_msg));
|
||||
let reader_ch = (*socket_data_ptr).reader_ch;
|
||||
comm::send(reader_ch, result::err(err_data));
|
||||
}
|
||||
// do nothing .. unneeded buf
|
||||
0 {}
|
||||
|
@ -570,6 +687,7 @@ crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
|||
_ {
|
||||
// we have data
|
||||
log(debug, #fmt("tcp on_read_cb nread: %d", nread));
|
||||
let reader_ch = (*socket_data_ptr).reader_ch;
|
||||
let buf_base = uv::ll::get_base_from_buf(buf);
|
||||
let buf_len = uv::ll::get_len_from_buf(buf);
|
||||
let new_bytes = vec::unsafe::from_buf(buf_base, buf_len);
|
||||
|
@ -702,221 +820,206 @@ fn ipv4_ip_addr_to_sockaddr_in(input_ip: ip::ip_addr,
|
|||
|
||||
//#[cfg(test)]
|
||||
mod test {
|
||||
#[test]
|
||||
fn test_gl_tcp_ipv4_client() {
|
||||
let ip_str = "173.194.79.99";
|
||||
let port = 80u;
|
||||
let write_input = "GET / HTTP/1.1\r\n\r\n";
|
||||
let read_output =
|
||||
impl_gl_tcp_ipv4_client(ip_str, port, write_input);
|
||||
log(debug, "DATA RECEIVED: "+read_output);
|
||||
// FIXME don't run on fbsd or linux 32 bit(#2064)
|
||||
#[cfg(target_os="win32")]
|
||||
#[cfg(target_os="darwin")]
|
||||
#[cfg(target_os="linux")]
|
||||
mod tcp_ipv4_server_and_client_test {
|
||||
#[cfg(target_arch="x86_64")]
|
||||
mod impl64 {
|
||||
#[test]
|
||||
fn test_gl_tcp_server_and_client_ipv4() unsafe {
|
||||
impl_gl_tcp_ipv4_server_and_client();
|
||||
}
|
||||
}
|
||||
#[cfg(target_arch="x86")]
|
||||
mod impl32 {
|
||||
#[test]
|
||||
#[ignore(cfg(target_os = "linux"))]
|
||||
fn test_gl_tcp_server_and_client_ipv4() unsafe {
|
||||
impl_gl_tcp_ipv4_server_and_client();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_gl_tcp_ipv4_server() {
|
||||
fn impl_gl_tcp_ipv4_server_and_client() {
|
||||
let server_ip = "127.0.0.1";
|
||||
let server_port = 8888u;
|
||||
let kill_str = "asdf";
|
||||
let resp_str = "hw";
|
||||
let expected_req = "ping";
|
||||
let expected_resp = "pong";
|
||||
|
||||
let result_po = comm::port::<str>();
|
||||
let result_ch = comm::chan(result_po);
|
||||
task::spawn_sched(task::manual_threads(4u)) {||
|
||||
let inner_result_po = comm::port::<str>();
|
||||
let inner_result_ch = comm::chan(inner_result_po);
|
||||
|
||||
impl_gl_tcp_ipv4_server(server_ip, server_port,
|
||||
kill_str, resp_str,
|
||||
inner_result_ch);
|
||||
let result_str = comm::recv(inner_result_po);
|
||||
comm::send(result_ch, result_str);
|
||||
};
|
||||
let output = comm::recv(result_po);
|
||||
log(debug, #fmt("RECEIVED REQ %? FROM USER", output));
|
||||
}
|
||||
|
||||
fn impl_gl_tcp_ipv4_server(host_str: str, port: uint,
|
||||
kill_str: str, resp_str: str,
|
||||
output_ch: comm::chan<str>) {
|
||||
let host_ip = ip::v4::parse_addr(host_str);
|
||||
log(debug, "about to enter listen() call for test server");
|
||||
listen(host_ip, port, 128u) {|new_conn, kill_ch|
|
||||
// this is a callback that is going to be invoked on the
|
||||
// loop's thread (can't be avoided).
|
||||
let cont_po = comm::port::<()>();
|
||||
let cont_ch = comm::chan(cont_po);
|
||||
task::spawn {||
|
||||
log(debug, "starting worker for incoming req");
|
||||
|
||||
// work loop
|
||||
let accept_result = accept(new_conn);
|
||||
if result::is_failure(accept_result) {
|
||||
// accept failed..
|
||||
log(debug,"accept in worker task failed");
|
||||
comm::send(kill_ch,
|
||||
some(result::get_err(accept_result)
|
||||
.to_tcp_err()));
|
||||
}
|
||||
// accept() succeeded, let the task that is
|
||||
// listen()'ing know so it can continue and
|
||||
// unblock libuv..
|
||||
comm::send(cont_ch, ());
|
||||
|
||||
// our precious sock.. from here on out, things
|
||||
// match the tcp request/client api, as they're
|
||||
// both on tcp_sockets at this point..
|
||||
let sock = result::unwrap(accept_result);
|
||||
let req_bytes = single_read_bytes_from(sock);
|
||||
let req_str = str::from_bytes(req_bytes);
|
||||
if str::contains(req_str, kill_str) {
|
||||
// our signal to shut down the tcp
|
||||
// server was received. shut it down.
|
||||
comm::send(kill_ch, none);
|
||||
}
|
||||
write_single_str(sock, resp_str);
|
||||
|
||||
comm::send(output_ch, req_str);
|
||||
// work's complete, let socket close..
|
||||
log(debug, "exiting worker");
|
||||
let server_result_po = comm::port::<str>();
|
||||
let server_result_ch = comm::chan(server_result_po);
|
||||
// server
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
let actual_req = comm::listen {|server_ch|
|
||||
run_tcp_test_server(
|
||||
server_ip,
|
||||
server_port,
|
||||
expected_resp,
|
||||
server_ch)
|
||||
};
|
||||
|
||||
comm::recv(cont_po);
|
||||
server_result_ch.send(actual_req);
|
||||
};
|
||||
log(debug, "exiting listen() block for test server");
|
||||
// client
|
||||
log(debug, "server started, firing up client..");
|
||||
let actual_resp = comm::listen {|client_ch|
|
||||
log(debug, "before client sleep");
|
||||
timer::sleep(2u);
|
||||
log(debug, "after client sleep");
|
||||
run_tcp_test_client(
|
||||
server_ip,
|
||||
server_port,
|
||||
expected_req,
|
||||
client_ch)
|
||||
};
|
||||
let actual_req = comm::recv(server_result_po);
|
||||
log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
|
||||
expected_req, actual_req));
|
||||
log(debug, #fmt("RESP: expected: '%s' actual: '%s'",
|
||||
expected_resp, actual_resp));
|
||||
assert str::contains(actual_req, expected_req);
|
||||
assert str::contains(actual_resp, expected_resp);
|
||||
}
|
||||
|
||||
fn impl_gl_tcp_ipv4_client(ip_str: str, port: uint,
|
||||
write_input: str) -> str {
|
||||
// pre-connection/input data
|
||||
let host_ip = ip::v4::parse_addr(ip_str);
|
||||
fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str,
|
||||
server_ch: comm::chan<str>) -> str {
|
||||
|
||||
// connect to remote host
|
||||
let connect_result = connect(host_ip, port);
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
let server_ip_addr = ip::v4::parse_addr(server_ip);
|
||||
let listen_result = listen(server_ip_addr, server_port, 128u)
|
||||
// this callback is ran on the loop.
|
||||
// .. should it go?
|
||||
{|new_conn, kill_ch|
|
||||
log(debug, "SERVER: new connection!");
|
||||
comm::listen {|cont_ch|
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
log(debug, "SERVER: starting worker for new req");
|
||||
|
||||
let accept_result = accept(new_conn);
|
||||
log(debug, "SERVER: after accept()");
|
||||
if result::is_failure(accept_result) {
|
||||
log(debug, "SERVER: error accept connection");
|
||||
let err_data = result::get_err(accept_result);
|
||||
comm::send(kill_ch, some(err_data));
|
||||
log(debug,
|
||||
"SERVER/WORKER: send on err cont ch");
|
||||
cont_ch.send(());
|
||||
}
|
||||
else {
|
||||
log(debug,
|
||||
"SERVER/WORKER: send on cont ch");
|
||||
cont_ch.send(());
|
||||
let sock = result::unwrap(accept_result);
|
||||
log(debug, "SERVER: successfully accepted"+
|
||||
"connection!");
|
||||
let received_req_bytes =
|
||||
tcp_read_single(sock);
|
||||
alt received_req_bytes {
|
||||
result::ok(data) {
|
||||
server_ch.send(
|
||||
str::from_bytes(data));
|
||||
log(debug, "SERVER: before write");
|
||||
tcp_write_single(sock, str::bytes(resp));
|
||||
log(debug, "SERVER: after write.. die");
|
||||
comm::send(kill_ch, none);
|
||||
}
|
||||
result::err(err_data) {
|
||||
comm::send(kill_ch, some(err_data));
|
||||
server_ch.send("");
|
||||
}
|
||||
}
|
||||
log(debug, "SERVER: worker spinning down");
|
||||
}
|
||||
}
|
||||
log(debug, "SERVER: waiting to recv on cont_ch");
|
||||
cont_ch.recv()
|
||||
};
|
||||
log(debug, "SERVER: recv'd on cont_ch..leaving listen cb");
|
||||
};
|
||||
// err check on listen_result
|
||||
if result::is_failure(listen_result) {
|
||||
let err_data = result::get_err(listen_result);
|
||||
log(debug, #fmt("SERVER: exited abnormally name %s msg %s",
|
||||
err_data.err_name, err_data.err_msg));
|
||||
}
|
||||
};
|
||||
let ret_val = server_ch.recv();
|
||||
log(debug, #fmt("SERVER: exited and got ret val: '%s'", ret_val));
|
||||
ret_val
|
||||
}
|
||||
|
||||
fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
|
||||
client_ch: comm::chan<str>) -> str {
|
||||
|
||||
let server_ip_addr = ip::v4::parse_addr(server_ip);
|
||||
|
||||
log(debug, "CLIENT: starting..");
|
||||
let connect_result = connect(server_ip_addr, server_port);
|
||||
if result::is_failure(connect_result) {
|
||||
log(debug, "CLIENT: failed to connect");
|
||||
let err_data = result::get_err(connect_result);
|
||||
log(debug, "tcp_connect_error received..");
|
||||
log(debug, #fmt("tcp connect error: %? %?", err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
log(debug, #fmt("CLIENT: connect err name: %s msg: %s",
|
||||
err_data.err_name, err_data.err_msg));
|
||||
""
|
||||
}
|
||||
|
||||
// this is our tcp_socket resource instance. It's dtor will
|
||||
// clean-up/close the underlying TCP stream when the fn scope
|
||||
// ends
|
||||
let sock = result::unwrap(connect_result);
|
||||
log(debug, "successful tcp connect");
|
||||
|
||||
// set up write data
|
||||
let write_data = [str::as_bytes(write_input) {|str_bytes|
|
||||
str_bytes
|
||||
}];
|
||||
|
||||
// write data to tcp socket
|
||||
let write_result = write(sock, write_data);
|
||||
if result::is_failure(write_result) {
|
||||
let err_data = result::get_err(write_result);
|
||||
log(debug, "tcp_write_error received..");
|
||||
log(debug, #fmt("tcp write error: %? %?", err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
log(debug, "tcp::write successful");
|
||||
|
||||
// set up read data
|
||||
let mut total_read_data: [u8] = [];
|
||||
let read_start_result = read_start(sock);
|
||||
if result::is_failure(read_start_result) {
|
||||
let err_data = result::get_err(read_start_result);
|
||||
log(debug, "tcp read_start err received..");
|
||||
log(debug, #fmt("read_start error: %? %?", err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
let reader_po = result::get(read_start_result);
|
||||
loop {
|
||||
let read_data_result = comm::recv(reader_po);
|
||||
if result::is_failure(read_data_result) {
|
||||
let err_data = result::get_err(read_data_result);
|
||||
log(debug, "read error data recv'd");
|
||||
log(debug, #fmt("read error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
else {
|
||||
let sock = result::unwrap(connect_result);
|
||||
let resp_bytes = str::bytes(resp);
|
||||
tcp_write_single(sock, resp_bytes);
|
||||
let read_result = tcp_read_single(sock);
|
||||
if read_result.is_failure() {
|
||||
log(debug, "CLIENT: failure to read");
|
||||
""
|
||||
}
|
||||
let new_data = result::unwrap(read_data_result);
|
||||
total_read_data += new_data;
|
||||
// theoretically, we could keep iterating, if
|
||||
// we expect the server on the other end to keep
|
||||
// streaming/chunking data to us, but..
|
||||
let read_stop_result = read_stop(sock);
|
||||
if result::is_failure(read_stop_result) {
|
||||
let err_data = result::get_err(read_stop_result);
|
||||
log(debug, "error while calling read_stop");
|
||||
log(debug, #fmt("read_stop error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
else {
|
||||
client_ch.send(str::from_bytes(read_result.get()));
|
||||
let ret_val = client_ch.recv();
|
||||
log(debug, #fmt("CLIENT: after client_ch recv ret: '%s'",
|
||||
ret_val));
|
||||
ret_val
|
||||
}
|
||||
break;
|
||||
}
|
||||
str::from_bytes(total_read_data)
|
||||
}
|
||||
|
||||
fn single_read_bytes_from(sock: tcp_socket) -> [u8] {
|
||||
let mut total_read_data: [u8] = [];
|
||||
let read_start_result = read_start(sock);
|
||||
if result::is_failure(read_start_result) {
|
||||
let err_data = result::get_err(read_start_result);
|
||||
log(debug, "srbf tcp read_start err received..");
|
||||
log(debug, #fmt("srbf read_start error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
fn tcp_read_single(sock: tcp_socket)
|
||||
-> result::result<[u8],tcp_err_data> {
|
||||
log(debug, "starting tcp_read_single");
|
||||
let rs_result = read_start(sock);
|
||||
if result::is_failure(rs_result) {
|
||||
let err_data = result::get_err(rs_result);
|
||||
result::err(err_data)
|
||||
}
|
||||
let reader_po = result::get(read_start_result);
|
||||
|
||||
let read_data_result = comm::recv(reader_po);
|
||||
if result::is_failure(read_data_result) {
|
||||
let err_data = result::get_err(read_data_result);
|
||||
log(debug, "srbf read error data recv'd");
|
||||
log(debug, #fmt("srbf read error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
else {
|
||||
log(debug, "before recv_timeout");
|
||||
let read_result = timer::recv_timeout(
|
||||
2000u, result::get(rs_result));
|
||||
log(debug, "after recv_timeout");
|
||||
alt read_result {
|
||||
none {
|
||||
log(debug, "tcp_read_single: timed out..");
|
||||
let err_data = {
|
||||
err_name: "TIMEOUT",
|
||||
err_msg: "req timed out"
|
||||
};
|
||||
result::err(err_data)
|
||||
}
|
||||
some(data_result) {
|
||||
log(debug, "tcp_read_single: got data");
|
||||
data_result
|
||||
}
|
||||
}
|
||||
}
|
||||
let new_data = result::unwrap(read_data_result);
|
||||
total_read_data += new_data;
|
||||
// theoretically, we could keep iterating, if
|
||||
// we expect the server on the other end to keep
|
||||
// streaming/chunking data to us, but..
|
||||
let read_stop_result = read_stop(sock);
|
||||
if result::is_failure(read_stop_result) {
|
||||
let err_data = result::get_err(read_stop_result);
|
||||
log(debug, "srbf error while calling read_stop");
|
||||
log(debug, #fmt("srbf read_stop error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
total_read_data
|
||||
}
|
||||
|
||||
fn write_single_str(sock: tcp_socket, write_input: str) {
|
||||
// set up write data
|
||||
let write_data = [str::as_bytes(write_input) {|str_bytes|
|
||||
str_bytes
|
||||
}];
|
||||
|
||||
// write data to tcp socket
|
||||
let write_result = write(sock, write_data);
|
||||
fn tcp_write_single(sock: tcp_socket, val: [u8]) {
|
||||
let write_result = write(sock, [val]);
|
||||
if result::is_failure(write_result) {
|
||||
log(debug, "tcp_write_single: write failed!");
|
||||
let err_data = result::get_err(write_result);
|
||||
log(debug, "wss tcp_write_error received..");
|
||||
log(debug, #fmt("wss tcp write error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
log(debug, #fmt("tcp_write_single err name: %s msg: %s",
|
||||
err_data.err_name, err_data.err_msg));
|
||||
// meh. torn on what to do here.
|
||||
fail "tcp_write_single failed";
|
||||
}
|
||||
log(debug, "wss tcp::write successful");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1318,7 +1318,7 @@ mod test {
|
|||
assert str::contains(msg_from_server, server_resp_msg);
|
||||
}
|
||||
|
||||
// don't run this test on fbsd or 32bit linux
|
||||
// FIXME don't run on fbsd or linux 32 bit(#2064)
|
||||
#[cfg(target_os="win32")]
|
||||
#[cfg(target_os="darwin")]
|
||||
#[cfg(target_os="linux")]
|
||||
|
|
Loading…
Reference in a new issue