De-mode pipes

This commit is contained in:
Eric Holk 2012-08-14 16:39:57 -07:00
parent 51d98d9c7b
commit 8be0f665bc
2 changed files with 47 additions and 42 deletions

View file

@ -72,6 +72,10 @@ bounded and unbounded protocols allows for less code duplication.
*/
// NB: transitionary, de-mode-ing.
#[forbid(deprecated_mode)];
#[forbid(deprecated_pattern)];
import unsafe::{forget, reinterpret_cast, transmute};
import either::{either, left, right};
import option::unwrap;
@ -143,15 +147,15 @@ struct packet_header {
// Returns the old state.
unsafe fn mark_blocked(this: *rust_task) -> state {
rustrt::rust_task_ref(this);
let old_task = swap_task(self.blocked_task, this);
let old_task = swap_task(&mut self.blocked_task, this);
assert old_task.is_null();
swap_state_acq(self.state, blocked)
swap_state_acq(&mut self.state, blocked)
}
unsafe fn unblock() {
let old_task = swap_task(self.blocked_task, ptr::null());
let old_task = swap_task(&mut self.blocked_task, ptr::null());
if !old_task.is_null() { rustrt::rust_task_deref(old_task) }
match swap_state_acq(self.state, empty) {
match swap_state_acq(&mut self.state, empty) {
empty | blocked => (),
terminated => self.state = terminated,
full => self.state = full
@ -224,7 +228,7 @@ fn packet<T: send>() -> *packet<T> {
#[doc(hidden)]
fn entangle_buffer<T: send, Tstart: send>(
-buffer: ~buffer<T>,
+buffer: ~buffer<T>,
init: fn(*libc::c_void, x: &T) -> *packet<Tstart>)
-> (send_packet_buffered<Tstart, T>, recv_packet_buffered<Tstart, T>)
{
@ -247,27 +251,27 @@ extern mod rusti {
// If I call the rusti versions directly from a polymorphic function,
// I get link errors. This is a bug that needs investigated more.
#[doc(hidden)]
fn atomic_xchng_rel(&dst: int, src: int) -> int {
rusti::atomic_xchng_rel(dst, src)
fn atomic_xchng_rel(dst: &mut int, src: int) -> int {
rusti::atomic_xchng_rel(*dst, src)
}
#[doc(hidden)]
fn atomic_add_acq(&dst: int, src: int) -> int {
rusti::atomic_add_acq(dst, src)
fn atomic_add_acq(dst: &mut int, src: int) -> int {
rusti::atomic_add_acq(*dst, src)
}
#[doc(hidden)]
fn atomic_sub_rel(&dst: int, src: int) -> int {
rusti::atomic_sub_rel(dst, src)
fn atomic_sub_rel(dst: &mut int, src: int) -> int {
rusti::atomic_sub_rel(*dst, src)
}
#[doc(hidden)]
fn swap_task(&dst: *rust_task, src: *rust_task) -> *rust_task {
fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task {
// It might be worth making both acquire and release versions of
// this.
unsafe {
reinterpret_cast(rusti::atomic_xchng(
*(ptr::mut_addr_of(dst) as *mut int),
*(ptr::mut_addr_of(*dst) as *mut int),
src as int))
}
}
@ -302,19 +306,19 @@ fn wait_event(this: *rust_task) -> *libc::c_void {
}
#[doc(hidden)]
fn swap_state_acq(&dst: state, src: state) -> state {
fn swap_state_acq(dst: &mut state, src: state) -> state {
unsafe {
reinterpret_cast(rusti::atomic_xchng_acq(
*(ptr::mut_addr_of(dst) as *mut int),
*(ptr::mut_addr_of(*dst) as *mut int),
src as int))
}
}
#[doc(hidden)]
fn swap_state_rel(&dst: state, src: state) -> state {
fn swap_state_rel(dst: &mut state, src: state) -> state {
unsafe {
reinterpret_cast(rusti::atomic_xchng_rel(
*(ptr::mut_addr_of(dst) as *mut int),
*(ptr::mut_addr_of(*dst) as *mut int),
src as int))
}
}
@ -329,7 +333,7 @@ struct buffer_resource<T: send> {
new(+b: ~buffer<T>) {
//let p = ptr::addr_of(*b);
//error!{"take %?", p};
atomic_add_acq(b.header.ref_count, 1);
atomic_add_acq(&mut b.header.ref_count, 1);
self.buffer = b;
}
@ -337,7 +341,7 @@ struct buffer_resource<T: send> {
let b = move_it!{self.buffer};
//let p = ptr::addr_of(*b);
//error!{"drop %?", p};
let old_count = atomic_sub_rel(b.header.ref_count, 1);
let old_count = atomic_sub_rel(&mut b.header.ref_count, 1);
//let old_count = atomic_xchng_rel(b.header.ref_count, 0);
if old_count == 1 {
// The new count is 0.
@ -351,15 +355,15 @@ struct buffer_resource<T: send> {
}
#[doc(hidden)]
fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
-payload: T) -> bool {
fn send<T: send, Tbuffer: send>(+p: send_packet_buffered<T, Tbuffer>,
+payload: T) -> bool {
let header = p.header();
let p_ = p.unwrap();
let p = unsafe { &*p_ };
assert ptr::addr_of(p.header) == header;
assert p.payload == none;
p.payload <- some(payload);
let old_state = swap_state_rel(p.header.state, full);
let old_state = swap_state_rel(&mut p.header.state, full);
match old_state {
empty => {
// Yay, fastpath.
@ -371,7 +375,7 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
full => fail ~"duplicate send",
blocked => {
debug!{"waking up task for %?", p_};
let old_task = swap_task(p.header.blocked_task, ptr::null());
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::task_signal_event(
old_task, ptr::addr_of(p.header) as *libc::c_void);
@ -395,7 +399,7 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
Fails if the sender closes the connection.
*/
fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
fn recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>) -> T {
option::unwrap_expect(try_recv(p), "connection closed")
}
@ -405,7 +409,7 @@ Returns `none` if the sender has closed the connection without sending
a message, or `some(T)` if a message was received.
*/
fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
fn try_recv<T: send, Tbuffer: send>(+p: recv_packet_buffered<T, Tbuffer>)
-> option<T>
{
let p_ = p.unwrap();
@ -417,7 +421,8 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
drop {
if task::failing() {
self.p.state = terminated;
let old_task = swap_task(self.p.blocked_task, ptr::null());
let old_task = swap_task(&mut self.p.blocked_task,
ptr::null());
if !old_task.is_null() {
rustrt::rust_task_deref(old_task);
}
@ -443,13 +448,13 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
let this = rustrt::rust_get_task();
rustrt::task_clear_event_reject(this);
rustrt::rust_task_ref(this);
let old_task = swap_task(p.header.blocked_task, this);
let old_task = swap_task(&mut p.header.blocked_task, this);
assert old_task.is_null();
let mut first = true;
let mut count = SPIN_COUNT;
loop {
rustrt::task_clear_event_reject(this);
let old_state = swap_state_acq(p.header.state,
let old_state = swap_state_acq(&mut p.header.state,
blocked);
match old_state {
empty => {
@ -474,7 +479,7 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
full => {
let mut payload = none;
payload <-> p.payload;
let old_task = swap_task(p.header.blocked_task, ptr::null());
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::rust_task_deref(old_task);
}
@ -486,7 +491,7 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
// casted too big of a number to a state.
assert old_state == terminated;
let old_task = swap_task(p.header.blocked_task, ptr::null());
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::rust_task_deref(old_task);
}
@ -498,7 +503,7 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
}
/// Returns true if messages are available.
pure fn peek<T: send, Tb: send>(p: recv_packet_buffered<T, Tb>) -> bool {
pure fn peek<T: send, Tb: send>(p: &recv_packet_buffered<T, Tb>) -> bool {
match unsafe {(*p.header()).state} {
empty => false,
blocked => fail ~"peeking on blocked packet",
@ -508,20 +513,20 @@ pure fn peek<T: send, Tb: send>(p: recv_packet_buffered<T, Tb>) -> bool {
impl<T: send, Tb: send> recv_packet_buffered<T, Tb> {
pure fn peek() -> bool {
peek(self)
peek(&self)
}
}
#[doc(hidden)]
fn sender_terminate<T: send>(p: *packet<T>) {
let p = unsafe { &*p };
match swap_state_rel(p.header.state, terminated) {
match swap_state_rel(&mut p.header.state, terminated) {
empty => {
// The receiver will eventually clean up.
}
blocked => {
// wake up the target
let old_task = swap_task(p.header.blocked_task, ptr::null());
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::task_signal_event(
old_task,
@ -544,13 +549,13 @@ fn sender_terminate<T: send>(p: *packet<T>) {
#[doc(hidden)]
fn receiver_terminate<T: send>(p: *packet<T>) {
let p = unsafe { &*p };
match swap_state_rel(p.header.state, terminated) {
match swap_state_rel(&mut p.header.state, terminated) {
empty => {
assert p.header.blocked_task.is_null();
// the sender will clean up
}
blocked => {
let old_task = swap_task(p.header.blocked_task, ptr::null());
let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::rust_task_deref(old_task);
assert old_task == rustrt::rust_get_task();
@ -682,7 +687,7 @@ fn selecti<T: selectable>(endpoints: &[T]) -> uint {
}
/// Returns 0 or 1 depending on which endpoint is ready to receive
fn select2i<A: selectable, B: selectable>(a: A, b: B) -> either<(), ()> {
fn select2i<A: selectable, B: selectable>(a: &A, b: &B) -> either<(), ()> {
match wait_many([a.header(), b.header()]/_) {
0 => left(()),
1 => right(()),
@ -1004,7 +1009,7 @@ impl<T: send> port<T>: recv<T> {
let mut endp = none;
endp <-> self.endp;
let peek = match endp {
some(endp) => pipes::peek(endp),
some(endp) => pipes::peek(&endp),
none => fail ~"peeking empty stream"
};
self.endp <-> endp;
@ -1122,7 +1127,7 @@ impl<T: send, U: send, Left: selectable recv<T>, Right: selectable recv<U>>
fn select() -> either<T, U> {
match self {
(lp, rp) => match select2i(lp, rp) {
(lp, rp) => match select2i(&lp, &rp) {
left(()) => left (lp.recv()),
right(()) => right(rp.recv())
}
@ -1131,7 +1136,7 @@ impl<T: send, U: send, Left: selectable recv<T>, Right: selectable recv<U>>
fn try_select() -> either<option<T>, option<U>> {
match self {
(lp, rp) => match select2i(lp, rp) {
(lp, rp) => match select2i(&lp, &rp) {
left(()) => left (lp.try_recv()),
right(()) => right(rp.try_recv())
}

View file

@ -13,9 +13,9 @@ proto! oneshot {
fn main() {
let (c, p) = oneshot::init();
assert !pipes::peek(p);
assert !pipes::peek(&p);
oneshot::client::signal(c);
assert pipes::peek(p);
assert pipes::peek(&p);
}