From a57d3e0c152a293885f65cdf44d4b98a5e2f71cc Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Wed, 15 Aug 2012 14:11:39 -0400 Subject: [PATCH] Fix :broadcast_heavy in condvars. --- src/libstd/sync.rs | 48 ++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/src/libstd/sync.rs b/src/libstd/sync.rs index a1ac6bb8945..bf44e3f25ea 100644 --- a/src/libstd/sync.rs +++ b/src/libstd/sync.rs @@ -28,6 +28,11 @@ type signal_end = pipes::chan_one<()>; struct waitqueue { head: pipes::port; tail: pipes::chan; } +fn new_waitqueue() -> waitqueue { + let (block_tail, block_head) = pipes::stream(); + waitqueue { head: block_head, tail: block_tail } +} + // Signals one live task from the queue. #[doc(hidden)] fn signal_waitqueue(q: &waitqueue) -> bool { @@ -70,18 +75,15 @@ enum sem = Exclusive>; #[doc(hidden)] fn new_sem(count: int, +q: Q) -> sem { - let (wait_tail, wait_head) = pipes::stream(); sem(exclusive(sem_inner { - mut count: count, - waiters: waitqueue { head: wait_head, tail: wait_tail }, - blocked: q })) + mut count: count, waiters: new_waitqueue(), blocked: q })) } #[doc(hidden)] -fn new_sem_and_signal(count: int, num_condvars: uint) -> sem<~[waitqueue]> { - let mut queues = ~[]; +fn new_sem_and_signal(count: int, num_condvars: uint) + -> sem<~[mut waitqueue]> { + let mut queues = ~[mut]; for num_condvars.times { - let (block_tail, block_head) = pipes::stream(); - vec::push(queues, waitqueue { head: block_head, tail: block_tail }); + vec::push(queues, new_waitqueue()); } new_sem(count, queues) } @@ -136,7 +138,7 @@ impl &sem<()> { } } #[doc(hidden)] -impl &sem<~[waitqueue]> { +impl &sem<~[mut waitqueue]> { fn access(blk: fn() -> U) -> U { let mut release = none; unsafe { @@ -158,13 +160,13 @@ struct sem_release { } #[doc(hidden)] struct sem_and_signal_release { - sem: &sem<~[waitqueue]>; - new(sem: &sem<~[waitqueue]>) { self.sem = sem; } + sem: &sem<~[mut waitqueue]>; + new(sem: &sem<~[mut waitqueue]>) { self.sem = sem; } drop { self.sem.release(); } } /// A mechanism for atomic-unlock-and-deschedule blocking and signalling. -struct condvar { priv sem: &sem<~[waitqueue]>; drop { } } +struct condvar { priv sem: &sem<~[mut waitqueue]>; drop { } } impl &condvar { /** @@ -232,8 +234,8 @@ impl &condvar { // mutex during unwinding. As long as the wrapper (mutex, etc) is // bounded in when it gets released, this shouldn't hang forever. struct sem_and_signal_reacquire { - sem: &sem<~[waitqueue]>; - new(sem: &sem<~[waitqueue]>) { self.sem = sem; } + sem: &sem<~[mut waitqueue]>; + new(sem: &sem<~[mut waitqueue]>) { self.sem = sem; } drop unsafe { // Needs to succeed, instead of itself dying. do task::unkillable { @@ -268,19 +270,23 @@ impl &condvar { /// As broadcast, but with a specified condvar_id. See wait_on. fn broadcast_on(condvar_id: uint) -> uint { let mut out_of_bounds = none; - let mut result = 0; + let mut queue = none; unsafe { do (**self.sem).with |state| { if condvar_id < vec::len(state.blocked) { - // FIXME(#3145) fix :broadcast_heavy - result = broadcast_waitqueue(&state.blocked[condvar_id]) + // To avoid :broadcast_heavy, we make a new waitqueue, + // swap it out with the old one, and broadcast on the + // old one outside of the little-lock. + queue = some(util::replace(&mut state.blocked[condvar_id], + new_waitqueue())); } else { out_of_bounds = some(vec::len(state.blocked)); } } } do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") { - result + let queue = option::swap_unwrap(&mut queue); + broadcast_waitqueue(&queue) } } } @@ -303,7 +309,7 @@ fn check_cvar_bounds(out_of_bounds: option, id: uint, act: &str, } #[doc(hidden)] -impl &sem<~[waitqueue]> { +impl &sem<~[mut waitqueue]> { // The only other place that condvars get built is rwlock_write_mode. fn access_cond(blk: fn(c: &condvar) -> U) -> U { do self.access { blk(&condvar { sem: self }) } @@ -354,7 +360,7 @@ impl &semaphore { * A task which fails while holding a mutex will unlock the mutex as it * unwinds. */ -struct mutex { priv sem: sem<~[waitqueue]>; } +struct mutex { priv sem: sem<~[mut waitqueue]>; } /// Create a new mutex, with one associated condvar. fn mutex() -> mutex { mutex_with_condvars(1) } @@ -402,7 +408,7 @@ struct rwlock_inner { */ struct rwlock { /* priv */ order_lock: semaphore; - /* priv */ access_lock: sem<~[waitqueue]>; + /* priv */ access_lock: sem<~[mut waitqueue]>; /* priv */ state: Exclusive; }