From c7e967148c0de44efe325a9ab8c25b990da53acb Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Tue, 12 Jul 2011 11:13:15 -0700 Subject: [PATCH] Task-ified the word count program. This meant most of the generic-ness of it had to go away, since our type system doesn't quite support it yet. Hopefully someday... This version has lots of memory management errors. My next commit will hopefully fix these. --- src/test/bench/task-perf/word-count.rs | 174 +++++++++++++++++-------- 1 file changed, 119 insertions(+), 55 deletions(-) diff --git a/src/test/bench/task-perf/word-count.rs b/src/test/bench/task-perf/word-count.rs index 960ca9efcee..e22e77d0a53 100644 --- a/src/test/bench/task-perf/word-count.rs +++ b/src/test/bench/task-perf/word-count.rs @@ -18,6 +18,35 @@ import std::str; import std::vec; import std::map; +fn map(str filename, map_reduce::putter emit) { + auto f = io::file_reader(filename); + + while(true) { + alt(read_word(f)) { + case (some(?w)) { + emit(w, "1"); + } + case (none) { + break; + } + } + } +} + +fn reduce(str word, map_reduce::getter get) { + auto count = 0; + + while(true) { + alt(get()) { + case(some(_)) { count += 1 } + case(none) { break } + } + } + + auto out = io::stdout(); + out.write_line(#fmt("%s: %d", word, count)); +} + mod map_reduce { export putter; export getter; @@ -33,42 +62,106 @@ mod map_reduce { type reducer = fn(str, getter); + tag ctrl_proto { + find_reducer(str, chan[chan[reduce_proto]]); + mapper_done; + } - fn map_reduce (vec[str] inputs, - mapper f, - reducer reduce) { - auto intermediates = map::new_str_hash[vec[str]](); + tag reduce_proto { + emit_val(str); + done; + } - fn emit(&map::hashmap[str, vec[str]] im, + fn start_mappers(chan[ctrl_proto] ctrl, + vec[str] inputs) { + for(str i in inputs) { + spawn map_task(ctrl, i); + } + } + + fn map_task(chan[ctrl_proto] ctrl, + str input) { + + auto intermediates = map::new_str_hash(); + + fn emit(&map::hashmap[str, chan[reduce_proto]] im, + chan[ctrl_proto] ctrl, str key, str val) { - auto old = []; - alt(im.remove(key)) { - case (some(?v)) { - old = v; + auto c; + alt(im.find(key)) { + case(some(?_c)) { + c = _c + } + case(none) { + auto p = port[chan[reduce_proto]](); + ctrl <| find_reducer(key, chan(p)); + p |> c; + im.insert(key, c); } - case (none) { } } - - im.insert(key, old + [val]); + c <| emit_val(val); } - for (str i in inputs) { - f(i, bind emit(intermediates, _, _)); - } + map(input, bind emit(intermediates, ctrl, _, _)); + ctrl <| mapper_done; + } - fn get(vec[str] vals, &mutable uint i) -> option[str] { - i += 1u; - if(i <= vec::len(vals)) { - some(vals.(i - 1u)) - } - else { - none + fn reduce_task(str key, chan[chan[reduce_proto]] out) { + auto p = port(); + + out <| chan(p); + + fn get(port[reduce_proto] p) -> option[str] { + auto m; + p |> m; + + alt(m) { + case(emit_val(?v)) { ret some(v); } + case(done) { ret none; } } } - for each (@tup(str, vec[str]) kv in intermediates.items()) { - auto i = 0u; - reduce(kv._0, bind get(kv._1, i)); + reduce(key, bind get(p)); + } + + fn map_reduce (vec[str] inputs) { + auto ctrl = port[ctrl_proto](); + + // This task becomes the master control task. It spawns others + // to do the rest. + + let map::hashmap[str, chan[reduce_proto]] reducers; + + reducers = map::new_str_hash(); + + start_mappers(chan(ctrl), inputs); + + auto num_mappers = vec::len(inputs) as int; + + while(num_mappers > 0) { + auto m; + ctrl |> m; + + alt(m) { + case(mapper_done) { num_mappers -= 1; } + case(find_reducer(?k, ?cc)) { + auto c; + alt(reducers.find(k)) { + case(some(?_c)) { c = _c; } + case(none) { + auto p = port(); + spawn reduce_task(k, chan(p)); + p |> c; + reducers.insert(k, c); + } + } + cc <| c; + } + } + } + + for each(@tup(str, chan[reduce_proto]) kv in reducers.items()) { + kv._1 <| done; } } } @@ -81,36 +174,7 @@ fn main(vec[str] argv) { fail; } - fn map(str filename, map_reduce::putter emit) { - auto f = io::file_reader(filename); - - while(true) { - alt(read_word(f)) { - case (some(?w)) { - emit(w, "1"); - } - case (none) { - break; - } - } - } - } - - fn reduce(str word, map_reduce::getter get) { - auto count = 0; - - while(true) { - alt(get()) { - case(some(_)) { count += 1 } - case(none) { break } - } - } - - auto out = io::stdout(); - out.write_line(#fmt("%s: %d", word, count)); - } - - map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv)), map, reduce); + map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv))); } fn read_word(io::reader r) -> option[str] {