From 62bc6b51136760b1d4f4b691aaa089bdb9bf0af5 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Sat, 23 Jul 2011 19:03:02 -0700 Subject: [PATCH] Per-thread scheduling. Closes #682. Tasks are spawned on a random thread. Currently they stay there, but we should add task migration and load balancing in the future. This should drammatically improve our task performance benchmarks. --- src/rt/circular_buffer.cpp | 84 +++++++++++++++--------------- src/rt/rust.cpp | 7 ++- src/rt/rust_chan.cpp | 50 +++++++++--------- src/rt/rust_kernel.cpp | 85 +++++++++++++++---------------- src/rt/rust_kernel.h | 42 ++++++++------- src/rt/rust_scheduler.cpp | 56 +++++++++++--------- src/rt/rust_scheduler.h | 16 ++++-- src/rt/rust_task.cpp | 14 +++-- src/rt/rust_task.h | 1 + src/rt/rust_upcall.cpp | 10 ++-- src/rt/rust_util.h | 3 +- src/rt/sync/sync.h | 2 +- src/rt/test/rust_test_runtime.cpp | 13 +++-- src/test/run-pass/lib-task.rs | 41 +++++++++++++++ 14 files changed, 239 insertions(+), 185 deletions(-) create mode 100644 src/test/run-pass/lib-task.rs diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp index b645a08e563..aa0127d8c25 100644 --- a/src/rt/circular_buffer.cpp +++ b/src/rt/circular_buffer.cpp @@ -5,7 +5,6 @@ #include "rust_internal.h" circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) : - sched(kernel->sched), kernel(kernel), unit_sz(unit_sz), _buffer_sz(initial_size()), @@ -13,26 +12,26 @@ circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) : _unread(0), _buffer((uint8_t *)kernel->malloc(_buffer_sz, "circular_buffer")) { - A(sched, unit_sz, "Unit size must be larger than zero."); + // A(sched, unit_sz, "Unit size must be larger than zero."); - DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)" - "-> circular_buffer=0x%" PRIxPTR, - _buffer_sz, _unread, this); + // DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)" + // "-> circular_buffer=0x%" PRIxPTR, + // _buffer_sz, _unread, this); - A(sched, _buffer, "Failed to allocate buffer."); + // A(sched, _buffer, "Failed to allocate buffer."); } circular_buffer::~circular_buffer() { - DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this); - I(sched, _buffer); - W(sched, _unread == 0, - "freeing circular_buffer with %d unread bytes", _unread); + // DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this); + // I(sched, _buffer); + // W(sched, _unread == 0, + // "freeing circular_buffer with %d unread bytes", _unread); kernel->free(_buffer); } size_t circular_buffer::initial_size() { - I(sched, unit_sz > 0); + // I(sched, unit_sz > 0); return INITIAL_CIRCULAR_BUFFER_SIZE_IN_UNITS * unit_sz; } @@ -41,8 +40,8 @@ circular_buffer::initial_size() { */ void circular_buffer::transfer(void *dst) { - I(sched, dst); - I(sched, _unread <= _buffer_sz); + // I(sched, dst); + // I(sched, _unread <= _buffer_sz); uint8_t *ptr = (uint8_t *) dst; @@ -54,13 +53,13 @@ circular_buffer::transfer(void *dst) { } else { head_sz = _buffer_sz - _next; } - I(sched, _next + head_sz <= _buffer_sz); + // I(sched, _next + head_sz <= _buffer_sz); memcpy(ptr, _buffer + _next, head_sz); // Then copy any other items from the beginning of the buffer - I(sched, _unread >= head_sz); + // I(sched, _unread >= head_sz); size_t tail_sz = _unread - head_sz; - I(sched, head_sz + tail_sz <= _buffer_sz); + // I(sched, head_sz + tail_sz <= _buffer_sz); memcpy(ptr + head_sz, _buffer, tail_sz); } @@ -70,21 +69,21 @@ circular_buffer::transfer(void *dst) { */ void circular_buffer::enqueue(void *src) { - I(sched, src); - I(sched, _unread <= _buffer_sz); - I(sched, _buffer); + // I(sched, src); + // I(sched, _unread <= _buffer_sz); + // I(sched, _buffer); // Grow if necessary. if (_unread == _buffer_sz) { grow(); } - DLOG(sched, mem, "circular_buffer enqueue " - "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", - _unread, _next, _buffer_sz, unit_sz); + // DLOG(sched, mem, "circular_buffer enqueue " + // "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", + // _unread, _next, _buffer_sz, unit_sz); - I(sched, _unread < _buffer_sz); - I(sched, _unread + unit_sz <= _buffer_sz); + // I(sched, _unread < _buffer_sz); + // I(sched, _unread + unit_sz <= _buffer_sz); // Copy data size_t dst_idx = _next + _unread; @@ -92,15 +91,15 @@ circular_buffer::enqueue(void *src) { if (dst_idx >= _buffer_sz) { dst_idx -= _buffer_sz; - I(sched, _next >= unit_sz); - I(sched, dst_idx <= _next - unit_sz); + // I(sched, _next >= unit_sz); + // I(sched, dst_idx <= _next - unit_sz); } - I(sched, dst_idx + unit_sz <= _buffer_sz); + // I(sched, dst_idx + unit_sz <= _buffer_sz); memcpy(&_buffer[dst_idx], src, unit_sz); _unread += unit_sz; - DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx); + // DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx); } /** @@ -110,17 +109,17 @@ circular_buffer::enqueue(void *src) { */ void circular_buffer::dequeue(void *dst) { - I(sched, unit_sz > 0); - I(sched, _unread >= unit_sz); - I(sched, _unread <= _buffer_sz); - I(sched, _buffer); + // I(sched, unit_sz > 0); + // I(sched, _unread >= unit_sz); + // I(sched, _unread <= _buffer_sz); + // I(sched, _buffer); - DLOG(sched, mem, - "circular_buffer dequeue " - "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", - _unread, _next, _buffer_sz, unit_sz); + // DLOG(sched, mem, + // "circular_buffer dequeue " + // "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", + // _unread, _next, _buffer_sz, unit_sz); - I(sched, _next + unit_sz <= _buffer_sz); + // I(sched, _next + unit_sz <= _buffer_sz); if (dst != NULL) { memcpy(dst, &_buffer[_next], unit_sz); } @@ -140,8 +139,9 @@ circular_buffer::dequeue(void *dst) { void circular_buffer::grow() { size_t new_buffer_sz = _buffer_sz * 2; - I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE); - DLOG(sched, mem, "circular_buffer is growing to %d bytes", new_buffer_sz); + // I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE); + // DLOG(sched, mem, "circular_buffer is growing to %d bytes", + // new_buffer_sz); void *new_buffer = kernel->malloc(new_buffer_sz, "new circular_buffer (grow)"); transfer(new_buffer); @@ -154,9 +154,9 @@ circular_buffer::grow() { void circular_buffer::shrink() { size_t new_buffer_sz = _buffer_sz / 2; - I(sched, initial_size() <= new_buffer_sz); - DLOG(sched, mem, "circular_buffer is shrinking to %d bytes", - new_buffer_sz); + // I(sched, initial_size() <= new_buffer_sz); + // DLOG(sched, mem, "circular_buffer is shrinking to %d bytes", + // new_buffer_sz); void *new_buffer = kernel->malloc(new_buffer_sz, "new circular_buffer (shrink)"); transfer(new_buffer); diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 06097e34197..df1486952eb 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -140,9 +140,10 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { update_log_settings(crate_map, getenv("RUST_LOG")); enable_claims(getenv("CHECK_CLAIMS")); + int num_threads = get_num_threads(); rust_srv *srv = new rust_srv(); - rust_kernel *kernel = new rust_kernel(srv); + rust_kernel *kernel = new rust_kernel(srv, num_threads); kernel->start(); rust_task *root_task = kernel->create_task(NULL, "main"); rust_scheduler *sched = root_task->sched; @@ -158,11 +159,9 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { root_task->start(main_fn, (uintptr_t)args->args); - int num_threads = get_num_threads(); - DLOG(sched, dom, "Using %d worker threads.", num_threads); - int ret = kernel->start_task_threads(num_threads); + int ret = kernel->start_task_threads(); delete args; delete kernel; delete srv; diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 470be6e6a37..9253d7d0361 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -13,17 +13,17 @@ rust_chan::rust_chan(rust_kernel *kernel, maybe_proxy *port, if (port) { associate(port); } - DLOG(kernel->sched, comm, "new rust_chan(task=0x%" PRIxPTR - ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR, - (uintptr_t) task, (uintptr_t) port, (uintptr_t) this); + // DLOG(task->sched, comm, "new rust_chan(task=0x%" PRIxPTR + // ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR, + // (uintptr_t) task, (uintptr_t) port, (uintptr_t) this); } rust_chan::~rust_chan() { - DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")", - (uintptr_t) this); + // DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")", + // (uintptr_t) this); - A(kernel->sched, is_associated() == false, - "Channel must be disassociated before being freed."); + // A(kernel->sched, is_associated() == false, + // "Channel must be disassociated before being freed."); } /** @@ -33,9 +33,9 @@ void rust_chan::associate(maybe_proxy *port) { this->port = port; if (port->is_proxy() == false) { scoped_lock with(port->referent()->lock); - DLOG(kernel->sched, task, - "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, - this, port); + // DLOG(kernel->sched, task, + // "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, + // this, port); ++this->ref_count; this->task = port->referent()->task; this->task->ref(); @@ -51,14 +51,14 @@ bool rust_chan::is_associated() { * Unlink this channel from its associated port. */ void rust_chan::disassociate() { - A(kernel->sched, is_associated(), - "Channel must be associated with a port."); + // A(kernel->sched, is_associated(), + // "Channel must be associated with a port."); if (port->is_proxy() == false) { scoped_lock with(port->referent()->lock); - DLOG(kernel->sched, task, - "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, - this, port->referent()); + // DLOG(kernel->sched, task, + // "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, + // this, port->referent()); --this->ref_count; --this->task->ref_count; this->task = NULL; @@ -73,8 +73,8 @@ void rust_chan::disassociate() { * Attempt to send data to the associated port. */ void rust_chan::send(void *sptr) { - rust_scheduler *sched = kernel->sched; - I(sched, !port->is_proxy()); + // rust_scheduler *sched = kernel->sched; + // I(sched, !port->is_proxy()); rust_port *target_port = port->referent(); // TODO: We can probably avoid this lock by using atomic operations in @@ -84,13 +84,13 @@ void rust_chan::send(void *sptr) { buffer.enqueue(sptr); if (!is_associated()) { - W(sched, is_associated(), - "rust_chan::transmit with no associated port."); + // W(sched, is_associated(), + // "rust_chan::transmit with no associated port."); return; } - A(sched, !buffer.is_empty(), - "rust_chan::transmit with nothing to send."); + // A(sched, !buffer.is_empty(), + // "rust_chan::transmit with nothing to send."); if (port->is_proxy()) { data_message::send(buffer.peek(), buffer.unit_sz, "send data", @@ -98,7 +98,7 @@ void rust_chan::send(void *sptr) { buffer.dequeue(NULL); } else { if (target_port->task->blocked_on(target_port)) { - DLOG(sched, comm, "dequeued in rendezvous_ptr"); + // DLOG(sched, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); target_port->task->rendezvous_ptr = 0; target_port->task->wakeup(target_port); @@ -120,7 +120,7 @@ rust_chan *rust_chan::clone(maybe_proxy *target) { rust_handle *handle = task->sched->kernel->get_port_handle(port->as_referent()); maybe_proxy *proxy = new rust_proxy (handle); - DLOG(kernel->sched, mem, "new proxy: " PTR, proxy); + DLOG(task->sched, mem, "new proxy: " PTR, proxy); port = proxy; target_task = target->as_proxy()->handle()->referent(); } @@ -133,8 +133,8 @@ rust_chan *rust_chan::clone(maybe_proxy *target) { * appear to be live, causing modify-after-free errors. */ void rust_chan::destroy() { - A(kernel->sched, ref_count == 0, - "Channel's ref count should be zero."); + // A(kernel->sched, ref_count == 0, + // "Channel's ref count should be zero."); if (is_associated()) { if (port->is_proxy()) { diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 53c2d945b09..1eb82602798 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -7,36 +7,40 @@ } \ } while (0) -rust_kernel::rust_kernel(rust_srv *srv) : +rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) : _region(srv, true), _log(srv, NULL), - _srv(srv), - _interrupt_kernel_loop(FALSE) + srv(srv), + _interrupt_kernel_loop(FALSE), + num_threads(num_threads), + rval(0), + live_tasks(0) { - sched = create_scheduler("main"); + isaac_init(this, &rctx); + create_schedulers(); } rust_scheduler * -rust_kernel::create_scheduler(const char *name) { +rust_kernel::create_scheduler(int id) { _kernel_lock.lock(); rust_message_queue *message_queue = - new (this, "rust_message_queue") rust_message_queue(_srv, this); - rust_srv *srv = _srv->clone(); + new (this, "rust_message_queue") rust_message_queue(srv, this); + rust_srv *srv = this->srv->clone(); rust_scheduler *sched = new (this, "rust_scheduler") - rust_scheduler(this, message_queue, srv, name); + rust_scheduler(this, message_queue, srv, id); rust_handle *handle = internal_get_sched_handle(sched); message_queue->associate(handle); message_queues.append(message_queue); - KLOG("created scheduler: " PTR ", name: %s, index: %d", - sched, name, sched->list_index); + KLOG("created scheduler: " PTR ", id: %d, index: %d", + sched, id, sched->list_index); _kernel_lock.signal_all(); _kernel_lock.unlock(); return sched; } void -rust_kernel::destroy_scheduler() { +rust_kernel::destroy_scheduler(rust_scheduler *sched) { _kernel_lock.lock(); KLOG("deleting scheduler: " PTR ", name: %s, index: %d", sched, sched->name, sched->list_index); @@ -48,6 +52,18 @@ rust_kernel::destroy_scheduler() { _kernel_lock.unlock(); } +void rust_kernel::create_schedulers() { + for(int i = 0; i < num_threads; ++i) { + threads.push(create_scheduler(i)); + } +} + +void rust_kernel::destroy_schedulers() { + for(int i = 0; i < num_threads; ++i) { + destroy_scheduler(threads[i]); + } +} + rust_handle * rust_kernel::internal_get_sched_handle(rust_scheduler *sched) { rust_handle *handle = NULL; @@ -59,14 +75,6 @@ rust_kernel::internal_get_sched_handle(rust_scheduler *sched) { return handle; } -rust_handle * -rust_kernel::get_sched_handle(rust_scheduler *sched) { - _kernel_lock.lock(); - rust_handle *handle = internal_get_sched_handle(sched); - _kernel_lock.unlock(); - return handle; -} - rust_handle * rust_kernel::get_task_handle(rust_task *task) { _kernel_lock.lock(); @@ -98,7 +106,9 @@ rust_kernel::get_port_handle(rust_port *port) { void rust_kernel::log_all_scheduler_state() { - sched->log_state(); + for(int i = 0; i < num_threads; ++i) { + threads[i]->log_state(); + } } /** @@ -170,7 +180,7 @@ rust_kernel::terminate_kernel_loop() { } rust_kernel::~rust_kernel() { - destroy_scheduler(); + destroy_schedulers(); terminate_kernel_loop(); @@ -193,7 +203,7 @@ rust_kernel::~rust_kernel() { rust_message_queue *queue = NULL; while (message_queues.pop(&queue)) { - K(_srv, queue->is_empty(), "Kernel message queue should be empty " + K(srv, queue->is_empty(), "Kernel message queue should be empty " "before killing the kernel."); delete queue; } @@ -240,30 +250,25 @@ rust_kernel::signal_kernel_lock() { _kernel_lock.unlock(); } -int rust_kernel::start_task_threads(int num_threads) +int rust_kernel::start_task_threads() { - rust_task_thread *thread = NULL; - - // -1, because this thread will also be a thread. - for(int i = 0; i < num_threads - 1; ++i) { - thread = new rust_task_thread(i + 1, this); + for(int i = 0; i < num_threads; ++i) { + rust_scheduler *thread = threads[i]; thread->start(); - threads.push(thread); } - sched->start_main_loop(0); - - while(threads.pop(&thread)) { + for(int i = 0; i < num_threads; ++i) { + rust_scheduler *thread = threads[i]; thread->join(); - delete thread; } - return sched->rval; + return rval; } rust_task * rust_kernel::create_task(rust_task *spawner, const char *name) { - return sched->create_task(spawner, name); + // TODO: use a different rand. + return threads[rand(&rctx) % num_threads]->create_task(spawner, name); } #ifdef __WIN32__ @@ -285,16 +290,6 @@ rust_kernel::win32_require(LPCTSTR fn, BOOL ok) { } #endif -rust_task_thread::rust_task_thread(int id, rust_kernel *owner) - : id(id), owner(owner) -{ -} - -void rust_task_thread::run() -{ - owner->sched->start_main_loop(id); -} - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 07f4ff2f787..8be9bb96e90 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -45,7 +45,10 @@ class rust_task_thread; class rust_kernel : public rust_thread { memory_region _region; rust_log _log; - rust_srv *_srv; + +public: + rust_srv *srv; +private: /** * Task proxy objects are kernel owned handles to Rust objects. @@ -62,20 +65,29 @@ class rust_kernel : public rust_thread { lock_and_signal _kernel_lock; + const size_t num_threads; + void terminate_kernel_loop(); void pump_message_queues(); rust_handle * internal_get_sched_handle(rust_scheduler *sched); - array_list threads; + array_list threads; - rust_scheduler *create_scheduler(const char *name); - void destroy_scheduler(); + randctx rctx; + + rust_scheduler *create_scheduler(int id); + void destroy_scheduler(rust_scheduler *sched); + + void create_schedulers(); + void destroy_schedulers(); public: - rust_scheduler *sched; - lock_and_signal scheduler_lock; + + int rval; + + volatile int live_tasks; /** * Message queues are kernel objects and are associated with domains. @@ -86,11 +98,10 @@ public: */ indexed_list message_queues; - rust_handle *get_sched_handle(rust_scheduler *sched); rust_handle *get_task_handle(rust_task *task); rust_handle *get_port_handle(rust_port *port); - rust_kernel(rust_srv *srv); + rust_kernel(rust_srv *srv, size_t num_threads); bool is_deadlocked(); @@ -113,10 +124,7 @@ public: void *realloc(void *mem, size_t size); void free(void *mem); - // FIXME: this should go away - inline rust_scheduler *get_scheduler() const { return sched; } - - int start_task_threads(int num_threads); + int start_task_threads(); #ifdef __WIN32__ void win32_require(LPCTSTR fn, BOOL ok); @@ -125,14 +133,4 @@ public: rust_task *create_task(rust_task *spawner, const char *name); }; -class rust_task_thread : public rust_thread { - int id; - rust_kernel *owner; - -public: - rust_task_thread(int id, rust_kernel *owner); - - virtual void run(); -}; - #endif /* RUST_KERNEL_H */ diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 09a78cebddb..437be04e272 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -4,21 +4,23 @@ #include "globals.h" rust_scheduler::rust_scheduler(rust_kernel *kernel, - rust_message_queue *message_queue, rust_srv *srv, - const char *name) : + rust_message_queue *message_queue, + rust_srv *srv, + int id) : interrupt_flag(0), _log(srv, this), log_lvl(log_note), srv(srv), - name(name), + // TODO: calculate a per scheduler name. + name("main"), newborn_tasks(this, "newborn"), running_tasks(this, "running"), blocked_tasks(this, "blocked"), dead_tasks(this, "dead"), cache(this), - rval(0), kernel(kernel), - message_queue(message_queue) + message_queue(message_queue), + id(id) { LOGPTR(this, "new dom", (uintptr_t)this); isaac_init(this, &rctx); @@ -47,9 +49,9 @@ rust_scheduler::activate(rust_task *task) { task->ctx.next = &ctx; DLOG(this, task, "descheduling..."); - kernel->scheduler_lock.unlock(); + lock.unlock(); task->ctx.swap(ctx); - kernel->scheduler_lock.lock(); + lock.lock(); DLOG(this, task, "task has returned"); } @@ -67,8 +69,8 @@ void rust_scheduler::fail() { log(NULL, log_err, "domain %s @0x%" PRIxPTR " root task failed", name, this); - I(this, rval == 0); - rval = 1; + I(this, kernel->rval == 0); + kernel->rval = 1; exit(1); } @@ -82,7 +84,7 @@ rust_scheduler::number_of_live_tasks() { */ void rust_scheduler::reap_dead_tasks(int id) { - I(this, kernel->scheduler_lock.lock_held_by_current_thread()); + I(this, lock.lock_held_by_current_thread()); for (size_t i = 0; i < dead_tasks.length(); ) { rust_task *task = dead_tasks[i]; // Make sure this task isn't still running somewhere else... @@ -93,6 +95,7 @@ rust_scheduler::reap_dead_tasks(int id) { "deleting unreferenced dead task %s @0x%" PRIxPTR, task->name, task); delete task; + sync::decrement(kernel->live_tasks); continue; } ++i; @@ -180,9 +183,9 @@ rust_scheduler::log_state() { * Returns once no more tasks can be scheduled and all task ref_counts * drop to zero. */ -int -rust_scheduler::start_main_loop(int id) { - kernel->scheduler_lock.lock(); +void +rust_scheduler::start_main_loop() { + lock.lock(); // Make sure someone is watching, to pull us out of infinite loops. // @@ -193,11 +196,11 @@ rust_scheduler::start_main_loop(int id) { DLOG(this, dom, "started domain loop %d", id); - while (number_of_live_tasks() > 0) { + while (kernel->live_tasks > 0) { A(this, kernel->is_deadlocked() == false, "deadlock"); - DLOG(this, dom, "worker %d, number_of_live_tasks = %d", - id, number_of_live_tasks()); + DLOG(this, dom, "worker %d, number_of_live_tasks = %d, total = %d", + id, number_of_live_tasks(), kernel->live_tasks); drain_incoming_message_queue(true); @@ -212,11 +215,12 @@ rust_scheduler::start_main_loop(int id) { DLOG(this, task, "all tasks are blocked, scheduler id %d yielding ...", id); - kernel->scheduler_lock.unlock(); + lock.unlock(); sync::sleep(100); - kernel->scheduler_lock.lock(); + lock.lock(); DLOG(this, task, "scheduler resuming ..."); + reap_dead_tasks(id); continue; } @@ -264,19 +268,18 @@ rust_scheduler::start_main_loop(int id) { "scheduler yielding ...", dead_tasks.length()); log_state(); - kernel->scheduler_lock.unlock(); + lock.unlock(); sync::yield(); - kernel->scheduler_lock.lock(); + lock.lock(); } else { drain_incoming_message_queue(true); } reap_dead_tasks(id); } - DLOG(this, dom, "finished main-loop %d (dom.rval = %d)", id, rval); + DLOG(this, dom, "finished main-loop %d", id); - kernel->scheduler_lock.unlock(); - return rval; + lock.unlock(); } rust_crate_cache * @@ -296,9 +299,16 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) { task->on_wakeup(spawner->_on_wakeup); } newborn_tasks.append(task); + + sync::increment(kernel->live_tasks); + return task; } +void rust_scheduler::run() { + this->start_main_loop(); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h index cabcdf210a8..c53e6157f06 100644 --- a/src/rt/rust_scheduler.h +++ b/src/rt/rust_scheduler.h @@ -27,7 +27,8 @@ public: }; struct rust_scheduler : public kernel_owned, - rc_base + rc_base, + rust_thread { // Fields known to the compiler: uintptr_t interrupt_flag; @@ -46,7 +47,6 @@ struct rust_scheduler : public kernel_owned, rust_crate_cache cache; randctx rctx; - int rval; rust_kernel *kernel; int32_t list_index; @@ -57,6 +57,10 @@ struct rust_scheduler : public kernel_owned, // Incoming messages from other domains. rust_message_queue *message_queue; + const int id; + + lock_and_signal lock; + #ifndef __WIN32__ pthread_attr_t attr; #endif @@ -64,8 +68,8 @@ struct rust_scheduler : public kernel_owned, // Only a pointer to 'name' is kept, so it must live as long as this // domain. rust_scheduler(rust_kernel *kernel, - rust_message_queue *message_queue, rust_srv *srv, - const char *name); + rust_message_queue *message_queue, rust_srv *srv, + int id); ~rust_scheduler(); void activate(rust_task *task); void log(rust_task *task, uint32_t level, char const *fmt, ...); @@ -80,11 +84,13 @@ struct rust_scheduler : public kernel_owned, void reap_dead_tasks(int id); rust_task *schedule_task(int id); - int start_main_loop(int id); + void start_main_loop(); void log_state(); rust_task *create_task(rust_task *spawner, const char *name); + + virtual void run(); }; inline rust_log & diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index de6b00acb3f..10ea48f57c2 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -83,7 +83,8 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state, pinned_on(-1), local_region(&sched->srv->local_region), _on_wakeup(NULL), - failed(false) + failed(false), + propagate_failure(true) { LOGPTR(sched, "new task", (uintptr_t)this); DLOG(sched, task, "sizeof(task) = %d (0x%x)", sizeof *this, sizeof *this); @@ -207,8 +208,8 @@ rust_task::kill() { // Unblock the task so it can unwind. unblock(); - // if (this == sched->root_task) - // sched->fail(); + if (NULL == supervisor && propagate_failure) + sched->fail(); LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this); // run_on_resume(rust_unwind_glue); @@ -229,6 +230,8 @@ rust_task::fail() { supervisor->kill(); } // FIXME: implement unwinding again. + if (NULL == supervisor && propagate_failure) + sched->fail(); failed = true; } @@ -248,6 +251,7 @@ rust_task::unsupervise() " disconnecting from supervisor %s @0x%" PRIxPTR, name, this, supervisor->name, supervisor); supervisor = NULL; + propagate_failure = false; } void @@ -397,8 +401,8 @@ rust_task::free(void *p, bool is_gc) void rust_task::transition(rust_task_list *src, rust_task_list *dst) { - I(sched, !kernel->scheduler_lock.lock_held_by_current_thread()); - scoped_lock with(kernel->scheduler_lock); + I(sched, !sched->lock.lock_held_by_current_thread()); + scoped_lock with(sched->lock); DLOG(sched, task, "task %s " PTR " state change '%s' -> '%s' while in '%s'", name, (uintptr_t)this, src->name, dst->name, state->name); diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index b1984b9d40b..9b1a3a39582 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -91,6 +91,7 @@ rust_task : public maybe_proxy, // Indicates that the task ended in failure bool failed; + bool propagate_failure; lock_and_signal lock; diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 8313399130c..3415b6b62ae 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -541,9 +541,9 @@ extern "C" CDECL rust_task * upcall_new_task(rust_task *spawner, rust_vec *name) { // name is a rust string structure. LOG_UPCALL_ENTRY(spawner); - scoped_lock with(spawner->kernel->scheduler_lock); - rust_scheduler *sched = spawner->sched; - rust_task *task = sched->create_task(spawner, (const char *)name->data); + scoped_lock with(spawner->sched->lock); + rust_task *task = + spawner->kernel->create_task(spawner, (const char *)name->data); return task; } @@ -584,7 +584,7 @@ upcall_ivec_resize_shared(rust_task *task, rust_ivec *v, size_t newsz) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->kernel->scheduler_lock); + scoped_lock with(task->sched->lock); I(task->sched, !v->fill); size_t new_alloc = next_power_of_two(newsz); @@ -604,7 +604,7 @@ upcall_ivec_spill_shared(rust_task *task, rust_ivec *v, size_t newsz) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->kernel->scheduler_lock); + scoped_lock with(task->sched->lock); size_t new_alloc = next_power_of_two(newsz); rust_ivec_heap *heap_part = (rust_ivec_heap *) diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h index e1644e9f3cf..89e7f2e7bed 100644 --- a/src/rt/rust_util.h +++ b/src/rt/rust_util.h @@ -126,8 +126,9 @@ next_power_of_two(size_t s) // Initialization helper for ISAAC RNG +template static inline void -isaac_init(rust_scheduler *sched, randctx *rctx) +isaac_init(sched_or_kernel *sched, randctx *rctx) { memset(rctx, 0, sizeof(randctx)); diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h index a932ef1c2ca..8298f402881 100644 --- a/src/rt/sync/sync.h +++ b/src/rt/sync/sync.h @@ -1,4 +1,4 @@ -// -*- c++-mode -*- +// -*- c++ -*- #ifndef SYNC_H #define SYNC_H diff --git a/src/rt/test/rust_test_runtime.cpp b/src/rt/test/rust_test_runtime.cpp index f9a99d9acb1..1e7c10944a7 100644 --- a/src/rt/test/rust_test_runtime.cpp +++ b/src/rt/test/rust_test_runtime.cpp @@ -11,17 +11,16 @@ rust_test_runtime::~rust_test_runtime() { void rust_domain_test::worker::run() { - rust_scheduler *handle = kernel->get_scheduler(); for (int i = 0; i < TASKS; i++) { - handle->create_task(NULL, "child"); + kernel->create_task(NULL, "child"); } - sync::sleep(rand(&handle->rctx) % 1000); + //sync::sleep(rand(&handle->rctx) % 1000); } bool rust_domain_test::run() { rust_srv srv; - rust_kernel kernel(&srv); + rust_kernel kernel(&srv, 1); array_list workers; for (int i = 0; i < DOMAINS; i++) { @@ -47,13 +46,13 @@ void rust_task_test::worker::run() { rust_task *root_task = kernel->create_task(NULL, "main"); root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL); - root_task->sched->start_main_loop(0); + root_task->sched->start_main_loop(); } bool rust_task_test::run() { rust_srv srv; - rust_kernel kernel(&srv); + rust_kernel kernel(&srv, 1); array_list workers; for (int i = 0; i < DOMAINS; i++) { @@ -62,6 +61,6 @@ rust_task_test::run() { worker->start(); } - sync::sleep(rand(&kernel.sched->rctx) % 1000); + //sync::sleep(rand(&kernel.sched->rctx) % 1000); return true; } diff --git a/src/test/run-pass/lib-task.rs b/src/test/run-pass/lib-task.rs new file mode 100644 index 00000000000..313ec8afcf1 --- /dev/null +++ b/src/test/run-pass/lib-task.rs @@ -0,0 +1,41 @@ + + +// xfail-stage0 + +use std; +import std::task; + +fn test_sleep() { task::sleep(1000000u); } + +fn test_unsupervise() { + fn f() { + task::unsupervise(); + fail; + } + spawn f(); +} + +fn test_join() { + fn winner() { + } + + auto wintask = spawn winner(); + + assert task::join(wintask) == task::tr_success; + + fn failer() { + task::unsupervise(); + fail; + } + + auto failtask = spawn failer(); + + assert task::join(failtask) == task::tr_failure; +} + +fn main() { + // FIXME: Why aren't we running this? + //test_sleep(); + test_unsupervise(); + test_join(); +}