More work on word-count.

Updated the MapReduce protocol so that it's correct more often. It's
still not perfect, but the bugs reproduce less often now.

Also found a race condition in channel sending. The problem is that
send and receive both need to access the _unread field in
circular_buffer. For now, send just grabs the port lock before
enqueueing. We can probably avoid the lock by using atomic operations
in circular_buffer instead.
This commit is contained in:
Eric Holk 2011-07-21 12:11:05 -07:00
parent 8f2254b8c2
commit 8878b128ba
6 changed files with 134 additions and 36 deletions

View file

@ -5,6 +5,9 @@ native "rust" mod rustrt {
fn unsupervise(); fn unsupervise();
fn pin_task(); fn pin_task();
fn unpin_task(); fn unpin_task();
fn clone_chan(*rust_chan c) -> *rust_chan;
type rust_chan;
} }
/** /**
@ -44,6 +47,11 @@ fn unpin() {
rustrt::unpin_task(); rustrt::unpin_task();
} }
// Wrapper around the native rustrt::clone_chan for a typed channel.
// Reinterprets `c` as the runtime's raw *rust_chan, hands it to the
// runtime, and reinterprets the returned pointer back to chan[T].
// NOTE(review): presumably the result is a separate handle to the same
// port -- confirm against rust_chan::clone in the runtime.
fn clone_chan[T](chan[T] c) -> chan[T] {
// Both casts are unchecked; this relies on chan[T] and *rust_chan being
// interchangeable representations -- TODO confirm.
auto cloned = rustrt::clone_chan(unsafe::reinterpret_cast(c));
ret unsafe::reinterpret_cast(cloned);
}
// Local Variables: // Local Variables:
// mode: rust; // mode: rust;
// fill-column: 78; // fill-column: 78;

View file

@ -850,6 +850,11 @@ unpin_task(rust_task *task) {
task->unpin(); task->unpin();
} }
// C-linkage export that forwards to chan->clone(task) and returns the
// resulting rust_chan pointer unchanged.
// NOTE(review): looks like the native side of the Rust-level
// rustrt::clone_chan declaration, with `task` supplied by the calling
// convention rather than by the Rust caller -- confirm.
extern "C" CDECL rust_chan *
clone_chan(rust_task *task, rust_chan *chan) {
return chan->clone(task);
}
// //
// Local Variables: // Local Variables:
// mode: C++ // mode: C++

View file

@ -71,9 +71,16 @@ void rust_chan::disassociate() {
* Attempt to send data to the associated port. * Attempt to send data to the associated port.
*/ */
void rust_chan::send(void *sptr) { void rust_chan::send(void *sptr) {
rust_scheduler *sched = kernel->sched;
I(sched, !port->is_proxy());
rust_port *target_port = port->referent();
// TODO: We can probably avoid this lock by using atomic operations in
// circular_buffer.
scoped_lock with(target_port->lock);
buffer.enqueue(sptr); buffer.enqueue(sptr);
rust_scheduler *sched = kernel->sched;
if (!is_associated()) { if (!is_associated()) {
W(sched, is_associated(), W(sched, is_associated(),
"rust_chan::transmit with no associated port."); "rust_chan::transmit with no associated port.");
@ -88,8 +95,6 @@ void rust_chan::send(void *sptr) {
task->get_handle(), port->as_proxy()->handle()); task->get_handle(), port->as_proxy()->handle());
buffer.dequeue(NULL); buffer.dequeue(NULL);
} else { } else {
rust_port *target_port = port->referent();
scoped_lock with(target_port->lock);
if (target_port->task->blocked_on(target_port)) { if (target_port->task->blocked_on(target_port)) {
DLOG(sched, comm, "dequeued in rendezvous_ptr"); DLOG(sched, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(target_port->task->rendezvous_ptr); buffer.dequeue(target_port->task->rendezvous_ptr);

View file

@ -1,5 +1,6 @@
align_of align_of
check_claims check_claims
clone_chan
debug_box debug_box
debug_fn debug_fn
debug_obj debug_obj

View file

@ -17,34 +17,49 @@ import std::option::none;
import std::str; import std::str;
import std::vec; import std::vec;
import std::map; import std::map;
import std::ivec;
import std::time;
import std::u64;
import std::task;
import clone = std::task::clone_chan;
fn map(str filename, map_reduce::putter emit) { fn map(str filename, map_reduce::putter emit) {
// log_err "mapping " + filename;
auto f = io::file_reader(filename); auto f = io::file_reader(filename);
while(true) { while(true) {
alt(read_word(f)) { alt(read_word(f)) {
case (some(?w)) { case (some(?w)) {
emit(w, "1"); emit(w, 1);
} }
case (none) { case (none) {
break; break;
} }
} }
} }
// log_err "done mapping " + filename;
} }
fn reduce(str word, map_reduce::getter get) { fn reduce(str word, map_reduce::getter get) {
// log_err "reducing " + word;
auto count = 0; auto count = 0;
while(true) { while(true) {
alt(get()) { alt(get()) {
case(some(_)) { count += 1 } some(_) {
case(none) { break } // log_err "received word " + word;
count += 1;
}
none { break }
} }
} }
auto out = io::stdout(); // auto out = io::stdout();
out.write_line(#fmt("%s: %d", word, count)); // out.write_line(#fmt("%s: %d", word, count));
// log_err "reduce " + word + " done.";
} }
mod map_reduce { mod map_reduce {
@ -54,74 +69,115 @@ mod map_reduce {
export reducer; export reducer;
export map_reduce; export map_reduce;
type putter = fn(str, str) -> (); type putter = fn(str, int) -> ();
type mapper = fn(str, putter); type mapper = fn(str, putter);
type getter = fn() -> option[str]; type getter = fn() -> option[int];
type reducer = fn(str, getter); type reducer = fn(str, getter);
tag ctrl_proto { tag ctrl_proto {
find_reducer(str, chan[chan[reduce_proto]]); find_reducer(u8[], chan[chan[reduce_proto]]);
mapper_done; mapper_done;
} }
tag reduce_proto { tag reduce_proto {
emit_val(str); emit_val(int);
done; done;
ref;
release;
} }
fn start_mappers(chan[ctrl_proto] ctrl, fn start_mappers(chan[ctrl_proto] ctrl,
vec[str] inputs) { vec[str] inputs) -> task[] {
auto tasks = ~[];
// log_err "starting mappers";
for(str i in inputs) { for(str i in inputs) {
spawn map_task(ctrl, i); // log_err "starting mapper for " + i;
tasks += ~[spawn map_task(ctrl, i)];
} }
// log_err "done starting mappers";
ret tasks;
} }
fn map_task(chan[ctrl_proto] ctrl, fn map_task(chan[ctrl_proto] ctrl,
str input) { str input) {
// log_err "map_task " + input;
auto intermediates = map::new_str_hash(); auto intermediates = map::new_str_hash();
fn emit(&map::hashmap[str, chan[reduce_proto]] im, fn emit(&map::hashmap[str, chan[reduce_proto]] im,
chan[ctrl_proto] ctrl, chan[ctrl_proto] ctrl,
str key, str val) { str key, int val) {
// log_err "emitting " + key;
auto c; auto c;
alt(im.find(key)) { alt(im.find(key)) {
case(some(?_c)) { some(?_c) {
// log_err "reusing saved channel for " + key;
c = _c c = _c
} }
case(none) { none {
// log_err "fetching new channel for " + key;
auto p = port[chan[reduce_proto]](); auto p = port[chan[reduce_proto]]();
ctrl <| find_reducer(key, chan(p)); auto keyi = str::bytes_ivec(key);
ctrl <| find_reducer(keyi, chan(p));
p |> c; p |> c;
im.insert(key, c); im.insert(key, clone(c));
c <| ref;
} }
} }
c <| emit_val(val); c <| emit_val(val);
} }
map(input, bind emit(intermediates, ctrl, _, _)); map(input, bind emit(intermediates, ctrl, _, _));
for each(@tup(str, chan[reduce_proto]) kv in intermediates.items()) {
// log_err "sending done to reducer for " + kv._0;
kv._1 <| release;
}
ctrl <| mapper_done; ctrl <| mapper_done;
// log_err "~map_task " + input;
} }
fn reduce_task(str key, chan[chan[reduce_proto]] out) { fn reduce_task(str key, chan[chan[reduce_proto]] out) {
// log_err "reduce_task " + key;
auto p = port(); auto p = port();
out <| chan(p); out <| chan(p);
fn get(port[reduce_proto] p) -> option[str] { auto ref_count = 0;
auto m; auto is_done = false;
p |> m;
alt(m) { fn get(&port[reduce_proto] p, &mutable int ref_count,
case(emit_val(?v)) { ret some(v); } &mutable bool is_done) -> option[int] {
case(done) { ret none; } while (!is_done || ref_count > 0) {
auto m;
p |> m;
alt(m) {
emit_val(?v) {
// log_err #fmt("received %d", v);
ret some(v);
}
done {
// log_err "all done";
is_done = true;
}
ref {
ref_count += 1;
}
release {
ref_count -= 1;
}
}
} }
ret none;
} }
reduce(key, bind get(p)); reduce(key, bind get(p, ref_count, is_done));
// log_err "~reduce_task " + key;
} }
fn map_reduce (vec[str] inputs) { fn map_reduce (vec[str] inputs) {
@ -134,7 +190,7 @@ mod map_reduce {
reducers = map::new_str_hash(); reducers = map::new_str_hash();
start_mappers(chan(ctrl), inputs); auto tasks = start_mappers(chan(ctrl), inputs);
auto num_mappers = vec::len(inputs) as int; auto num_mappers = vec::len(inputs) as int;
@ -143,26 +199,42 @@ mod map_reduce {
ctrl |> m; ctrl |> m;
alt(m) { alt(m) {
case(mapper_done) { num_mappers -= 1; } mapper_done {
case(find_reducer(?k, ?cc)) { // log_err "received mapper terminated.";
num_mappers -= 1;
}
find_reducer(?ki, ?cc) {
auto c; auto c;
auto k = str::unsafe_from_bytes_ivec(ki);
// log_err "finding reducer for " + k;
alt(reducers.find(k)) { alt(reducers.find(k)) {
case(some(?_c)) { c = _c; } some(?_c) {
case(none) { // log_err "reusing existing reducer for " + k;
c = _c;
}
none {
// log_err "creating new reducer for " + k;
auto p = port(); auto p = port();
spawn reduce_task(k, chan(p)); tasks += ~[spawn reduce_task(k, chan(p))];
p |> c; p |> c;
reducers.insert(k, c); reducers.insert(k, c);
} }
} }
cc <| c; cc <| clone(c);
} }
} }
} }
for each(@tup(str, chan[reduce_proto]) kv in reducers.items()) { for each(@tup(str, chan[reduce_proto]) kv in reducers.items()) {
// log_err "sending done to reducer for " + kv._0;
kv._1 <| done; kv._1 <| done;
} }
// log_err #fmt("joining %u tasks", ivec::len(tasks));
for (task t in tasks) {
task::join(t);
}
// log_err "control task done.";
} }
} }
@ -174,7 +246,14 @@ fn main(vec[str] argv) {
fail; fail;
} }
auto start = time::precise_time_ns();
map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv))); map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv)));
auto stop = time::precise_time_ns();
auto elapsed = stop - start;
elapsed /= 1000000u64;
log_err "MapReduce completed in " + u64::str(elapsed) + "ms";
} }
fn read_word(io::reader r) -> option[str] { fn read_word(io::reader r) -> option[str] {