std: Remove PortSet. Not supported by new scheduler. Replace uses with SharedChan.
This commit is contained in:
parent
389aba0952
commit
ebd14c92f8
9 changed files with 38 additions and 119 deletions
|
@ -576,16 +576,12 @@ mod tests {
|
||||||
let (p, c) = comm::stream();
|
let (p, c) = comm::stream();
|
||||||
|
|
||||||
do task::spawn() || {
|
do task::spawn() || {
|
||||||
let p = comm::PortSet::new();
|
|
||||||
c.send(p.chan());
|
|
||||||
|
|
||||||
let arc_v : Arc<~[int]> = p.recv();
|
let arc_v : Arc<~[int]> = p.recv();
|
||||||
|
|
||||||
let v = (*arc_v.get()).clone();
|
let v = (*arc_v.get()).clone();
|
||||||
assert_eq!(v[3], 4);
|
assert_eq!(v[3], 4);
|
||||||
};
|
};
|
||||||
|
|
||||||
let c = p.recv();
|
|
||||||
c.send(arc_v.clone());
|
c.send(arc_v.clone());
|
||||||
|
|
||||||
assert_eq!(arc_v.get()[2], 3);
|
assert_eq!(arc_v.get()[2], 3);
|
||||||
|
|
|
@ -14,14 +14,10 @@ Message passing
|
||||||
|
|
||||||
#[allow(missing_doc)];
|
#[allow(missing_doc)];
|
||||||
|
|
||||||
use cast::{transmute, transmute_mut};
|
use cast::transmute;
|
||||||
use container::Container;
|
|
||||||
use either::{Either, Left, Right};
|
use either::{Either, Left, Right};
|
||||||
use kinds::Send;
|
use kinds::Send;
|
||||||
use option::{Option, Some, None};
|
use option::{Option, Some};
|
||||||
use uint;
|
|
||||||
use vec::OwnedVector;
|
|
||||||
use util::replace;
|
|
||||||
use unstable::sync::Exclusive;
|
use unstable::sync::Exclusive;
|
||||||
use rtcomm = rt::comm;
|
use rtcomm = rt::comm;
|
||||||
use rt;
|
use rt;
|
||||||
|
@ -143,81 +139,6 @@ impl<T: Send> Selectable for Port<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Treat many ports as one.
|
|
||||||
#[unsafe_mut_field(ports)]
|
|
||||||
pub struct PortSet<T> {
|
|
||||||
ports: ~[pipesy::Port<T>],
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Send> PortSet<T> {
|
|
||||||
pub fn new() -> PortSet<T> {
|
|
||||||
PortSet {
|
|
||||||
ports: ~[]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn add(&self, port: Port<T>) {
|
|
||||||
let Port { inner } = port;
|
|
||||||
let port = match inner {
|
|
||||||
Left(p) => p,
|
|
||||||
Right(_) => fail!("PortSet not implemented")
|
|
||||||
};
|
|
||||||
unsafe {
|
|
||||||
let self_ports = transmute_mut(&self.ports);
|
|
||||||
self_ports.push(port)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn chan(&self) -> Chan<T> {
|
|
||||||
let (po, ch) = stream();
|
|
||||||
self.add(po);
|
|
||||||
ch
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T:Send> GenericPort<T> for PortSet<T> {
|
|
||||||
fn try_recv(&self) -> Option<T> {
|
|
||||||
unsafe {
|
|
||||||
let self_ports = transmute_mut(&self.ports);
|
|
||||||
let mut result = None;
|
|
||||||
// we have to swap the ports array so we aren't borrowing
|
|
||||||
// aliasable mutable memory.
|
|
||||||
let mut ports = replace(self_ports, ~[]);
|
|
||||||
while result.is_none() && ports.len() > 0 {
|
|
||||||
let i = wait_many(ports);
|
|
||||||
match ports[i].try_recv() {
|
|
||||||
Some(m) => {
|
|
||||||
result = Some(m);
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// Remove this port.
|
|
||||||
let _ = ports.swap_remove(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*self_ports = ports;
|
|
||||||
result
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fn recv(&self) -> T {
|
|
||||||
self.try_recv().expect("port_set: endpoints closed")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Send> Peekable<T> for PortSet<T> {
|
|
||||||
fn peek(&self) -> bool {
|
|
||||||
// It'd be nice to use self.port.each, but that version isn't
|
|
||||||
// pure.
|
|
||||||
for uint::range(0, self.ports.len()) |i| {
|
|
||||||
let port: &pipesy::Port<T> = &self.ports[i];
|
|
||||||
if port.peek() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A channel that can be shared between many senders.
|
/// A channel that can be shared between many senders.
|
||||||
pub struct SharedChan<T> {
|
pub struct SharedChan<T> {
|
||||||
inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
|
inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
|
|
||||||
extern mod extra;
|
extern mod extra;
|
||||||
|
|
||||||
use std::comm::{PortSet, Chan, stream};
|
use std::comm::{SharedChan, Chan, stream};
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::os;
|
use std::os;
|
||||||
use std::task;
|
use std::task;
|
||||||
|
@ -30,7 +30,7 @@ enum request {
|
||||||
stop
|
stop
|
||||||
}
|
}
|
||||||
|
|
||||||
fn server(requests: &PortSet<request>, responses: &Chan<uint>) {
|
fn server(requests: &Port<request>, responses: &Chan<uint>) {
|
||||||
let mut count: uint = 0;
|
let mut count: uint = 0;
|
||||||
let mut done = false;
|
let mut done = false;
|
||||||
while !done {
|
while !done {
|
||||||
|
@ -50,9 +50,8 @@ fn server(requests: &PortSet<request>, responses: &Chan<uint>) {
|
||||||
|
|
||||||
fn run(args: &[~str]) {
|
fn run(args: &[~str]) {
|
||||||
let (from_child, to_parent) = stream();
|
let (from_child, to_parent) = stream();
|
||||||
let (from_parent_, to_child) = stream();
|
let (from_parent, to_child) = stream();
|
||||||
let from_parent = PortSet::new();
|
let to_child = SharedChan::new(to_child);
|
||||||
from_parent.add(from_parent_);
|
|
||||||
|
|
||||||
let size = uint::from_str(args[1]).get();
|
let size = uint::from_str(args[1]).get();
|
||||||
let workers = uint::from_str(args[2]).get();
|
let workers = uint::from_str(args[2]).get();
|
||||||
|
@ -60,8 +59,7 @@ fn run(args: &[~str]) {
|
||||||
let start = extra::time::precise_time_s();
|
let start = extra::time::precise_time_s();
|
||||||
let mut worker_results = ~[];
|
let mut worker_results = ~[];
|
||||||
for uint::range(0, workers) |_i| {
|
for uint::range(0, workers) |_i| {
|
||||||
let (from_parent_, to_child) = stream();
|
let to_child = to_child.clone();
|
||||||
from_parent.add(from_parent_);
|
|
||||||
let mut builder = task::task();
|
let mut builder = task::task();
|
||||||
builder.future_result(|r| worker_results.push(r));
|
builder.future_result(|r| worker_results.push(r));
|
||||||
do builder.spawn {
|
do builder.spawn {
|
||||||
|
|
|
@ -33,22 +33,24 @@ use std::u64;
|
||||||
use std::uint;
|
use std::uint;
|
||||||
|
|
||||||
fn fib(n: int) -> int {
|
fn fib(n: int) -> int {
|
||||||
fn pfib(c: &Chan<int>, n: int) {
|
fn pfib(c: &SharedChan<int>, n: int) {
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
c.send(0);
|
c.send(0);
|
||||||
} else if n <= 2 {
|
} else if n <= 2 {
|
||||||
c.send(1);
|
c.send(1);
|
||||||
} else {
|
} else {
|
||||||
let p = PortSet::new();
|
let (pp, cc) = stream();
|
||||||
let ch = p.chan();
|
let cc = SharedChan::new(cc);
|
||||||
|
let ch = cc.clone();
|
||||||
task::spawn(|| pfib(&ch, n - 1) );
|
task::spawn(|| pfib(&ch, n - 1) );
|
||||||
let ch = p.chan();
|
let ch = cc.clone();
|
||||||
task::spawn(|| pfib(&ch, n - 2) );
|
task::spawn(|| pfib(&ch, n - 2) );
|
||||||
c.send(p.recv() + p.recv());
|
c.send(pp.recv() + pp.recv());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (p, ch) = stream();
|
let (p, ch) = stream();
|
||||||
|
let ch = SharedChan::new(ch);
|
||||||
let _t = task::spawn(|| pfib(&ch, n) );
|
let _t = task::spawn(|| pfib(&ch, n) );
|
||||||
p.recv()
|
p.recv()
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,14 +14,14 @@ use std::comm;
|
||||||
use std::task;
|
use std::task;
|
||||||
|
|
||||||
pub fn main() {
|
pub fn main() {
|
||||||
let po = comm::PortSet::new();
|
let (po, ch) = comm::stream();
|
||||||
|
let ch = comm::SharedChan::new(ch);
|
||||||
|
|
||||||
// Spawn 10 tasks each sending us back one int.
|
// Spawn 10 tasks each sending us back one int.
|
||||||
let mut i = 10;
|
let mut i = 10;
|
||||||
while (i > 0) {
|
while (i > 0) {
|
||||||
info!(i);
|
info!(i);
|
||||||
let (p, ch) = comm::stream();
|
let ch = ch.clone();
|
||||||
po.add(p);
|
|
||||||
task::spawn({let i = i; || child(i, &ch)});
|
task::spawn({let i = i; || child(i, &ch)});
|
||||||
i = i - 1;
|
i = i - 1;
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ pub fn main() {
|
||||||
info!("main thread exiting");
|
info!("main thread exiting");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn child(x: int, ch: &comm::Chan<int>) {
|
fn child(x: int, ch: &comm::SharedChan<int>) {
|
||||||
info!(x);
|
info!(x);
|
||||||
ch.send(x);
|
ch.send(x);
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,13 +12,13 @@
|
||||||
|
|
||||||
extern mod extra;
|
extern mod extra;
|
||||||
|
|
||||||
use std::comm::Chan;
|
use std::comm::SharedChan;
|
||||||
use std::comm;
|
use std::comm;
|
||||||
use std::task;
|
use std::task;
|
||||||
|
|
||||||
pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); }
|
pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); }
|
||||||
|
|
||||||
fn test00_start(ch: &Chan<int>, message: int, count: int) {
|
fn test00_start(ch: &SharedChan<int>, message: int, count: int) {
|
||||||
info!("Starting test00_start");
|
info!("Starting test00_start");
|
||||||
let mut i: int = 0;
|
let mut i: int = 0;
|
||||||
while i < count {
|
while i < count {
|
||||||
|
@ -35,14 +35,15 @@ fn test00() {
|
||||||
|
|
||||||
info!("Creating tasks");
|
info!("Creating tasks");
|
||||||
|
|
||||||
let po = comm::PortSet::new();
|
let (po, ch) = comm::stream();
|
||||||
|
let ch = comm::SharedChan::new(ch);
|
||||||
|
|
||||||
let mut i: int = 0;
|
let mut i: int = 0;
|
||||||
|
|
||||||
// Create and spawn tasks...
|
// Create and spawn tasks...
|
||||||
let mut results = ~[];
|
let mut results = ~[];
|
||||||
while i < number_of_tasks {
|
while i < number_of_tasks {
|
||||||
let ch = po.chan();
|
let ch = ch.clone();
|
||||||
let mut builder = task::task();
|
let mut builder = task::task();
|
||||||
builder.future_result(|r| results.push(r));
|
builder.future_result(|r| results.push(r));
|
||||||
builder.spawn({
|
builder.spawn({
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
// option. This file may not be copied, modified, or distributed
|
// option. This file may not be copied, modified, or distributed
|
||||||
// except according to those terms.
|
// except according to those terms.
|
||||||
|
|
||||||
use std::comm::Chan;
|
use std::comm::SharedChan;
|
||||||
use std::comm;
|
use std::comm;
|
||||||
|
|
||||||
pub fn main() { test00(); }
|
pub fn main() { test00(); }
|
||||||
|
@ -16,11 +16,12 @@ pub fn main() { test00(); }
|
||||||
fn test00() {
|
fn test00() {
|
||||||
let mut r: int = 0;
|
let mut r: int = 0;
|
||||||
let mut sum: int = 0;
|
let mut sum: int = 0;
|
||||||
let p = comm::PortSet::new();
|
let (p, ch) = comm::stream();
|
||||||
let c0 = p.chan();
|
let ch = SharedChan::new(ch);
|
||||||
let c1 = p.chan();
|
let c0 = ch.clone();
|
||||||
let c2 = p.chan();
|
let c1 = ch.clone();
|
||||||
let c3 = p.chan();
|
let c2 = ch.clone();
|
||||||
|
let c3 = ch.clone();
|
||||||
let number_of_messages: int = 1000;
|
let number_of_messages: int = 1000;
|
||||||
let mut i: int = 0;
|
let mut i: int = 0;
|
||||||
while i < number_of_messages {
|
while i < number_of_messages {
|
||||||
|
|
|
@ -17,7 +17,7 @@ use std::task;
|
||||||
|
|
||||||
pub fn main() { test00(); }
|
pub fn main() { test00(); }
|
||||||
|
|
||||||
fn test00_start(c: &comm::Chan<int>, start: int, number_of_messages: int) {
|
fn test00_start(c: &comm::SharedChan<int>, start: int, number_of_messages: int) {
|
||||||
let mut i: int = 0;
|
let mut i: int = 0;
|
||||||
while i < number_of_messages { c.send(start + i); i += 1; }
|
while i < number_of_messages { c.send(start + i); i += 1; }
|
||||||
}
|
}
|
||||||
|
@ -25,22 +25,23 @@ fn test00_start(c: &comm::Chan<int>, start: int, number_of_messages: int) {
|
||||||
fn test00() {
|
fn test00() {
|
||||||
let mut r: int = 0;
|
let mut r: int = 0;
|
||||||
let mut sum: int = 0;
|
let mut sum: int = 0;
|
||||||
let p = comm::PortSet::new();
|
let (p, ch) = comm::stream();
|
||||||
|
let ch = comm::SharedChan::new(ch);
|
||||||
let number_of_messages: int = 10;
|
let number_of_messages: int = 10;
|
||||||
|
|
||||||
let c = p.chan();
|
let c = ch.clone();
|
||||||
do task::spawn || {
|
do task::spawn || {
|
||||||
test00_start(&c, number_of_messages * 0, number_of_messages);
|
test00_start(&c, number_of_messages * 0, number_of_messages);
|
||||||
}
|
}
|
||||||
let c = p.chan();
|
let c = ch.clone();
|
||||||
do task::spawn || {
|
do task::spawn || {
|
||||||
test00_start(&c, number_of_messages * 1, number_of_messages);
|
test00_start(&c, number_of_messages * 1, number_of_messages);
|
||||||
}
|
}
|
||||||
let c = p.chan();
|
let c = ch.clone();
|
||||||
do task::spawn || {
|
do task::spawn || {
|
||||||
test00_start(&c, number_of_messages * 2, number_of_messages);
|
test00_start(&c, number_of_messages * 2, number_of_messages);
|
||||||
}
|
}
|
||||||
let c = p.chan();
|
let c = ch.clone();
|
||||||
do task::spawn || {
|
do task::spawn || {
|
||||||
test00_start(&c, number_of_messages * 3, number_of_messages);
|
test00_start(&c, number_of_messages * 3, number_of_messages);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,9 +25,8 @@ fn test00_start(c: &comm::Chan<int>, number_of_messages: int) {
|
||||||
fn test00() {
|
fn test00() {
|
||||||
let r: int = 0;
|
let r: int = 0;
|
||||||
let mut sum: int = 0;
|
let mut sum: int = 0;
|
||||||
let p = comm::PortSet::new();
|
let (p, ch) = comm::stream();
|
||||||
let number_of_messages: int = 10;
|
let number_of_messages: int = 10;
|
||||||
let ch = p.chan();
|
|
||||||
|
|
||||||
let mut result = None;
|
let mut result = None;
|
||||||
let mut builder = task::task();
|
let mut builder = task::task();
|
||||||
|
|
Loading…
Reference in a new issue