Atomic reference counting for tasks.

This commit is contained in:
Eric Holk 2011-07-27 14:51:25 -07:00
parent a5fe66e706
commit 279844ce9f
8 changed files with 43 additions and 77 deletions

View file

@ -4,7 +4,7 @@
// NB: please do not commit code with this uncommented. It's // NB: please do not commit code with this uncommented. It's
// hugely expensive and should only be used as a last resort. // hugely expensive and should only be used as a last resort.
// //
// #define TRACK_ALLOCATIONS #define TRACK_ALLOCATIONS
#define MAGIC 0xbadc0ffe #define MAGIC 0xbadc0ffe

View file

@ -60,7 +60,7 @@ void rust_chan::disassociate() {
// "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, // "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
// this, port->referent()); // this, port->referent());
--this->ref_count; --this->ref_count;
--this->task->ref_count; task->deref();
this->task = NULL; this->task = NULL;
port->referent()->chans.swap_delete(this); port->referent()->chans.swap_delete(this);
} }
@ -109,22 +109,10 @@ void rust_chan::send(void *sptr) {
return; return;
} }
rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) { rust_chan *rust_chan::clone(rust_task *target) {
size_t unit_sz = buffer.unit_sz; size_t unit_sz = buffer.unit_sz;
maybe_proxy<rust_port> *port = this->port; maybe_proxy<rust_port> *port = this->port;
rust_task *target_task = NULL; return new (target->kernel, "cloned chan")
if (target->is_proxy() == false) {
port = this->port;
target_task = target->referent();
} else {
rust_handle<rust_port> *handle =
task->sched->kernel->get_port_handle(port->as_referent());
maybe_proxy<rust_port> *proxy = new rust_proxy<rust_port> (handle);
DLOG(task->sched, mem, "new proxy: " PTR, proxy);
port = proxy;
target_task = target->as_proxy()->handle()->referent();
}
return new (target_task->kernel, "cloned chan")
rust_chan(kernel, port, unit_sz); rust_chan(kernel, port, unit_sz);
} }

View file

@ -22,7 +22,7 @@ public:
void send(void *sptr); void send(void *sptr);
rust_chan *clone(maybe_proxy<rust_task> *target); rust_chan *clone(rust_task *target);
// Called whenever the channel's ref count drops to zero. // Called whenever the channel's ref count drops to zero.
void destroy(); void destroy();

View file

@ -61,8 +61,8 @@ void notify_message::process() {
break; break;
case JOIN: { case JOIN: {
if (task->dead() == false) { if (task->dead() == false) {
rust_proxy<rust_task> *proxy = new rust_proxy<rust_task>(_source); // FIXME: this should be dead code.
task->tasks_waiting_to_join.append(proxy); assert(false);
} else { } else {
send(WAKEUP, "wakeup", _target, _source); send(WAKEUP, "wakeup", _target, _source);
} }

View file

@ -89,14 +89,14 @@ rust_scheduler::reap_dead_tasks(int id) {
rust_task *task = dead_tasks[i]; rust_task *task = dead_tasks[i];
task->lock.lock(); task->lock.lock();
// Make sure this task isn't still running somewhere else... // Make sure this task isn't still running somewhere else...
if (task->ref_count == 0 && task->can_schedule(id)) { if (task->can_schedule(id)) {
I(this, task->tasks_waiting_to_join.is_empty()); I(this, task->tasks_waiting_to_join.is_empty());
dead_tasks.remove(task); dead_tasks.remove(task);
DLOG(this, task, DLOG(this, task,
"deleting unreferenced dead task %s @0x%" PRIxPTR, "deleting unreferenced dead task %s @0x%" PRIxPTR,
task->name, task); task->name, task);
task->lock.unlock(); task->lock.unlock();
delete task; task->deref();
sync::decrement(kernel->live_tasks); sync::decrement(kernel->live_tasks);
kernel->wakeup_schedulers(); kernel->wakeup_schedulers();
continue; continue;
@ -174,9 +174,8 @@ rust_scheduler::log_state() {
if (!dead_tasks.is_empty()) { if (!dead_tasks.is_empty()) {
log(NULL, log_note, "dead tasks:"); log(NULL, log_note, "dead tasks:");
for (size_t i = 0; i < dead_tasks.length(); i++) { for (size_t i = 0; i < dead_tasks.length(); i++) {
log(NULL, log_note, "\t task: %s 0x%" PRIxPTR ", ref_count: %d", log(NULL, log_note, "\t task: %s 0x%" PRIxPTR,
dead_tasks[i]->name, dead_tasks[i], dead_tasks[i]->name, dead_tasks[i]);
dead_tasks[i]->ref_count);
} }
} }
} }
@ -225,15 +224,13 @@ rust_scheduler::start_main_loop() {
I(this, scheduled_task->running()); I(this, scheduled_task->running());
DLOG(this, task, DLOG(this, task,
"activating task %s 0x%" PRIxPTR "activating task %s 0x%" PRIxPTR
", sp=0x%" PRIxPTR ", sp=0x%" PRIxPTR
", ref_count=%d" ", state: %s",
", state: %s", scheduled_task->name,
scheduled_task->name, (uintptr_t)scheduled_task,
(uintptr_t)scheduled_task, scheduled_task->rust_sp,
scheduled_task->rust_sp, scheduled_task->state->name);
scheduled_task->ref_count,
scheduled_task->state->name);
interrupt_flag = 0; interrupt_flag = 0;

View file

@ -64,7 +64,7 @@ size_t const callee_save_fp = 0;
rust_task::rust_task(rust_scheduler *sched, rust_task_list *state, rust_task::rust_task(rust_scheduler *sched, rust_task_list *state,
rust_task *spawner, const char *name) : rust_task *spawner, const char *name) :
maybe_proxy<rust_task>(this), ref_count(1),
stk(NULL), stk(NULL),
runtime_sp(0), runtime_sp(0),
rust_sp(0), rust_sp(0),
@ -92,10 +92,6 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state,
stk = new_stk(this, 0); stk = new_stk(this, 0);
rust_sp = stk->limit; rust_sp = stk->limit;
if (spawner == NULL) {
ref_count = 0;
}
} }
rust_task::~rust_task() rust_task::~rust_task()
@ -131,10 +127,6 @@ void task_start_wrapper(spawn_args *a)
LOG(task, task, "task exited with value %d", rval); LOG(task, task, "task exited with value %d", rval);
LOG(task, task, "task ref_count: %d", task->ref_count);
A(task->sched, task->ref_count >= 0,
"Task ref_count should not be negative on exit!");
task->die(); task->die();
task->lock.lock(); task->lock.lock();
task->notify_tasks_waiting_to_join(); task->notify_tasks_waiting_to_join();
@ -263,17 +255,10 @@ rust_task::notify_tasks_waiting_to_join() {
while (tasks_waiting_to_join.is_empty() == false) { while (tasks_waiting_to_join.is_empty() == false) {
LOG(this, task, "notify_tasks_waiting_to_join: %d", LOG(this, task, "notify_tasks_waiting_to_join: %d",
tasks_waiting_to_join.size()); tasks_waiting_to_join.size());
maybe_proxy<rust_task> *waiting_task = 0; rust_task *waiting_task = 0;
tasks_waiting_to_join.pop(&waiting_task); tasks_waiting_to_join.pop(&waiting_task);
if (waiting_task->is_proxy()) { if (waiting_task->blocked() == true) {
notify_message::send(notify_message::WAKEUP, "wakeup", waiting_task->wakeup(this);
get_handle(), waiting_task->as_proxy()->handle());
delete waiting_task;
} else {
rust_task *task = waiting_task->referent();
if (task->blocked() == true) {
task->wakeup(this);
}
} }
} }
} }

View file

@ -34,10 +34,19 @@ struct gc_alloc {
} }
}; };
struct struct
rust_task : public maybe_proxy<rust_task>, rust_task : public kernel_owned<rust_task>, rust_cond
public kernel_owned<rust_task>
{ {
// This block could be pulled out into something like a
// RUST_ATOMIC_REFCOUNTED macro.
private:
intptr_t ref_count;
public:
void ref() { sync::increment(ref_count); }
void deref() { if(0 == sync::decrement(ref_count)) { delete this; } }
// Fields known to the compiler. // Fields known to the compiler.
stk_seg *stk; stk_seg *stk;
uintptr_t runtime_sp; // Runtime sp while task running. uintptr_t runtime_sp; // Runtime sp while task running.
@ -69,7 +78,7 @@ rust_task : public maybe_proxy<rust_task>,
uintptr_t* rendezvous_ptr; uintptr_t* rendezvous_ptr;
// List of tasks waiting for this task to finish. // List of tasks waiting for this task to finish.
array_list<maybe_proxy<rust_task> *> tasks_waiting_to_join; array_list<rust_task *> tasks_waiting_to_join;
rust_handle<rust_task> *handle; rust_handle<rust_task> *handle;

View file

@ -6,12 +6,10 @@
#define LOG_UPCALL_ENTRY(task) \ #define LOG_UPCALL_ENTRY(task) \
LOG(task, upcall, \ LOG(task, upcall, \
"> UPCALL %s - task: %s 0x%" PRIxPTR \ "> UPCALL %s - task: %s 0x%" PRIxPTR \
" retpc: x%" PRIxPTR \ " retpc: x%" PRIxPTR, \
" ref_count: %d", \
__FUNCTION__, \ __FUNCTION__, \
(task)->name, (task), \ (task)->name, (task), \
__builtin_return_address(0), \ __builtin_return_address(0));
(task->ref_count));
#else #else
#define LOG_UPCALL_ENTRY(task) \ #define LOG_UPCALL_ENTRY(task) \
LOG(task, upcall, "> UPCALL task: %s @x%" PRIxPTR, \ LOG(task, upcall, "> UPCALL task: %s @x%" PRIxPTR, \
@ -114,8 +112,8 @@ upcall_del_port(rust_task *task, rust_port *port) {
I(task->sched, !port->ref_count); I(task->sched, !port->ref_count);
delete port; delete port;
// FIXME: We shouldn't ever directly manipulate the ref count. // FIXME: this should happen in the port.
--task->ref_count; task->deref();
} }
/** /**
@ -162,7 +160,7 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) {
* has its own copy of the channel. * has its own copy of the channel.
*/ */
extern "C" CDECL rust_chan * extern "C" CDECL rust_chan *
upcall_clone_chan(rust_task *task, maybe_proxy<rust_task> *target, upcall_clone_chan(rust_task *task, rust_task *target,
rust_chan *chan) { rust_chan *chan) {
LOG_UPCALL_ENTRY(task); LOG_UPCALL_ENTRY(task);
return chan->clone(target); return chan->clone(target);
@ -247,18 +245,10 @@ upcall_fail(rust_task *task,
* Called whenever a task's ref count drops to zero. * Called whenever a task's ref count drops to zero.
*/ */
extern "C" CDECL void extern "C" CDECL void
upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) { upcall_kill(rust_task *task, rust_task *target) {
LOG_UPCALL_ENTRY(task); LOG_UPCALL_ENTRY(task);
if (target->is_proxy()) { target->kill();
notify_message::
send(notify_message::KILL, "kill", task->get_handle(),
target->as_proxy()->handle());
// The proxy ref_count dropped to zero, delete it here.
delete target->as_proxy();
} else {
target->referent()->kill();
}
} }
/** /**
@ -267,9 +257,6 @@ upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
extern "C" CDECL void extern "C" CDECL void
upcall_exit(rust_task *task) { upcall_exit(rust_task *task) {
LOG_UPCALL_ENTRY(task); LOG_UPCALL_ENTRY(task);
LOG(task, task, "task ref_count: %d", task->ref_count);
A(task->sched, task->ref_count >= 0,
"Task ref_count should not be negative on exit!");
task->die(); task->die();
task->notify_tasks_waiting_to_join(); task->notify_tasks_waiting_to_join();
task->yield(1); task->yield(1);
@ -544,6 +531,7 @@ upcall_new_task(rust_task *spawner, rust_vec *name) {
scoped_lock with(spawner->sched->lock); scoped_lock with(spawner->sched->lock);
rust_task *task = rust_task *task =
spawner->kernel->create_task(spawner, (const char *)name->data); spawner->kernel->create_task(spawner, (const char *)name->data);
task->ref();
return task; return task;
} }
@ -559,8 +547,7 @@ extern "C" CDECL void
upcall_drop_task(rust_task *task, rust_task *target) { upcall_drop_task(rust_task *task, rust_task *target) {
LOG_UPCALL_ENTRY(task); LOG_UPCALL_ENTRY(task);
if(target) { if(target) {
//target->deref(); target->deref();
--target->ref_count;
} }
} }