diff --git a/src/libcore/sync.rs b/src/libcore/sync.rs index 3e1b85441e9..909d4d4dc1b 100644 --- a/src/libcore/sync.rs +++ b/src/libcore/sync.rs @@ -5,11 +5,17 @@ * in std. */ +export condvar; export semaphore, new_semaphore; +export mutex, new_mutex; // FIXME (#3119) This shouldn't be a thing exported from core. import arc::exclusive; +/**************************************************************************** + * Internals + ****************************************************************************/ + // Each waiting task receives on one of these. FIXME #3125 make these oneshot. type wait_end = pipes::port<()>; type signal_end = pipes::chan<()>; @@ -17,42 +23,24 @@ type signal_end = pipes::chan<()>; type waitqueue = { head: pipes::port, tail: pipes::chan }; -fn waitqueue() -> waitqueue { - let (tail, head) = pipes::stream(); - { head: head, tail: tail } -} - -/// A counting, blocking, bounded-waiting semaphore. -enum semaphore = exclusive; -type semaphore_inner = { +// The building-block used to make semaphores, lock-and-signals, and rwlocks. +enum sem = exclusive<{ mut count: int, waiters: waitqueue, - //blocked: waitqueue, -}; + // Can be either unit or another waitqueue. Some sems shouldn't come with + // a condition variable attached, others should. + blocked: Q, +}>; -/// Create a new semaphore with the specified count. -fn new_semaphore(count: int) -> semaphore { - semaphore(exclusive({ mut count: count, - waiters: waitqueue(), /* blocked: waitqueue() */ })) -} - -impl semaphore for &semaphore { - /// Creates a new handle to the semaphore. - fn clone() -> semaphore { - semaphore((**self).clone()) - } - - /** - * Acquires a resource represented by the semaphore. Blocks if necessary - * until resource(s) become available. - */ +impl sem for &sem { fn acquire() { let mut waiter_nobe = none; unsafe { do (**self).with |state| { state.count -= 1; if state.count < 0 { - let (signal_end,wait_end) = pipes::stream(); + let (signal_end, wait_end) = pipes::stream(); + // Tell outer scope we need to block. waiter_nobe = some(wait_end); // Enqueue ourself. state.waiters.tail.send(signal_end); @@ -66,42 +54,192 @@ impl semaphore for &semaphore { let _ = option::unwrap(waiter_nobe).recv(); } } - - /** - * Release a held resource represented by the semaphore. Wakes a blocked - * contending task, if any exist. - */ fn release() { unsafe { do (**self).with |state| { state.count += 1; // The peek is mandatory to make sure recv doesn't block. - if state.count >= 0 && state.waiters.head.peek() { + if state.count <= 0 && state.waiters.head.peek() { // Pop off the waitqueue and send a wakeup signal. If the // waiter was killed, its port will have closed, and send // will fail. Keep trying until we get a live task. state.waiters.head.recv().send(()); - // to-do: use this version when it's ready, kill-friendly. + // FIXME(#3145) use kill-friendly version when ready // while !state.waiters.head.recv().try_send(()) { } } } } } - - /// Runs a function with ownership of one of the semaphore's resources. +} +// FIXME(#3154) move both copies of this into sem, and unify the 2 structs +impl sem_access for &sem<()> { fn access(blk: fn() -> U) -> U { self.acquire(); let _x = sem_release(self); blk() } } +impl sem_access for &sem { + fn access(blk: fn() -> U) -> U { + self.acquire(); + let _x = sem_and_signal_release(self); + blk() + } +} // FIXME(#3136) should go inside of access() struct sem_release { - sem: &semaphore; - new(sem: &semaphore) { self.sem = sem; } + sem: &sem<()>; + new(sem: &sem<()>) { self.sem = sem; } drop { self.sem.release(); } } +struct sem_and_signal_release { + sem: &sem; + new(sem: &sem) { self.sem = sem; } + drop { self.sem.release(); } +} + +/// A mechanism for atomic-unlock-and-deschedule blocking and signalling. +enum condvar = &sem; + +impl condvar for condvar { + /// Atomically drop the associated lock, and block until a signal is sent. + fn wait() { + let (signal_end, wait_end) = pipes::stream(); + let mut signal_end = some(signal_end); + unsafe { + do (***self).with |state| { + // Drop the lock. + // FIXME(#3145) investigate why factoring doesn't compile. + state.count += 1; + if state.count <= 0 && state.waiters.head.peek() { + state.waiters.head.recv().send(()); + // FIXME(#3145) use kill-friendly version when ready + } + // Enqueue ourself to be woken up by a signaller. + state.blocked.tail.send(option::swap_unwrap(&mut signal_end)); + } + } + // Unconditionally "block". (Might not actually block if a signaller + // did send -- I mean 'unconditionally' in contrast with acquire().) + let _ = wait_end.recv(); + // Pick up the lock again. FIXME(#3145): unkillable? destructor? + (*self).acquire(); + } + + /// Wake up a blocked task. Returns false if there was no blocked task. + fn signal() -> bool { + unsafe { + do (***self).with |state| { + if state.blocked.head.peek() { + state.blocked.head.recv().send(()); + // FIXME(#3145) use kill-friendly version when ready + true + } else { + false + } + } + } + } + + /// Wake up all blocked tasks. Returns the number of tasks woken. + fn broadcast() -> uint { + unsafe { + do (***self).with |state| { + let mut count = 0; + while state.blocked.head.peek() { + // This is already kill-friendly. + state.blocked.head.recv().send(()); + count += 1; + } + count + } + } + } +} + +impl sem_and_signal for &sem { + fn access_cond(blk: fn(condvar) -> U) -> U { + do self.access { blk(condvar(self)) } + } +} + +/**************************************************************************** + * Semaphores + ****************************************************************************/ + +/// A counting, blocking, bounded-waiting semaphore. +enum semaphore = sem<()>; + +/// Create a new semaphore with the specified count. +fn new_semaphore(count: int) -> semaphore { + let (wait_tail, wait_head) = pipes::stream(); + semaphore(sem(exclusive({ mut count: count, + waiters: { head: wait_head, tail: wait_tail }, + blocked: () }))) +} + +impl semaphore for &semaphore { + /// Create a new handle to the semaphore. + fn clone() -> semaphore { semaphore(sem((***self).clone())) } + + /** + * Acquire a resource represented by the semaphore. Blocks if necessary + * until resource(s) become available. + */ + fn acquire() { (&**self).acquire() } + + /** + * Release a held resource represented by the semaphore. Wakes a blocked + * contending task, if any exist. + */ + fn release() { (&**self).release() } + + /// Run a function with ownership of one of the semaphore's resources. + fn access(blk: fn() -> U) -> U { (&**self).access(blk) } +} + +/**************************************************************************** + * Mutexes + ****************************************************************************/ + +/** + * A blocking, bounded-waiting, mutual exclusion lock with an associated + * FIFO condition variable. + */ +enum mutex = sem; + +/// Create a new mutex. +fn new_mutex() -> mutex { + let (wait_tail, wait_head) = pipes::stream(); + let (block_tail, block_head) = pipes::stream(); + mutex(sem(exclusive({ mut count: 1, + waiters: { head: wait_head, tail: wait_tail }, + blocked: { head: block_head, tail: block_tail } }))) +} + +impl mutex for &mutex { + /// Create a new handle to the mutex. + fn clone() -> mutex { mutex(sem((***self).clone())) } + + /// Run a function with ownership of the mutex. + fn lock(blk: fn() -> U) -> U { (&**self).access(blk) } + + /// Run a function with ownership of the mutex and a handle to a condvar. + fn lock_cond(blk: fn(condvar) -> U) -> U { + (&**self).access_cond(blk) + } +} + +/**************************************************************************** + * Reader-writer locks + ****************************************************************************/ + +// FIXME(#3145) implement + +/**************************************************************************** + * Tests + ****************************************************************************/ #[cfg(test)] mod tests { @@ -145,35 +283,6 @@ mod tests { c.send(()); } #[test] - fn test_sem_mutual_exclusion() { - // Unsafely achieve shared state, and do the textbook - // "load tmp <- ptr; inc tmp; store ptr <- tmp" dance. - let (c,p) = pipes::stream(); - let s = ~new_semaphore(1); - let s2 = ~s.clone(); - let sharedstate = ~0; - let ptr = ptr::addr_of(*sharedstate); - do task::spawn { - let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) }; - access_shared(sharedstate, s2, 10); - c.send(()); - } - access_shared(sharedstate, s, 10); - let _ = p.recv(); - - assert *sharedstate == 20; - - fn access_shared(sharedstate: &mut int, sem: &semaphore, n: uint) { - for n.times { - do sem.access { - let oldval = *sharedstate; - task::yield(); - *sharedstate = oldval + 1; - } - } - } - } - #[test] fn test_sem_multi_resource() { // Parent and child both get in the critical section at the same // time, and shake hands. @@ -214,4 +323,90 @@ mod tests { let _ = p.recv(); // wait for child to be done } } + #[test] + fn test_mutex() { + // Unsafely achieve shared state, and do the textbook + // "load tmp <- ptr; inc tmp; store ptr <- tmp" dance. + let (c,p) = pipes::stream(); + let m = ~new_mutex(); + let m2 = ~m.clone(); + let sharedstate = ~0; + let ptr = ptr::addr_of(*sharedstate); + do task::spawn { + let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) }; + access_shared(sharedstate, m2, 10); + c.send(()); + } + access_shared(sharedstate, m, 10); + let _ = p.recv(); + + assert *sharedstate == 20; + + fn access_shared(sharedstate: &mut int, sem: &mutex, n: uint) { + for n.times { + do sem.lock { + let oldval = *sharedstate; + task::yield(); + *sharedstate = oldval + 1; + } + } + } + } + #[test] + fn test_mutex_cond_wait() { + let m = ~new_mutex(); + let mut m2 = some(~m.clone()); + + // Child wakes up parent + do m.lock_cond |cond| { + let m2 = option::swap_unwrap(&mut m2); + do task::spawn { + do m2.lock_cond |cond| { cond.signal(); } + } + cond.wait(); + } + // Parent wakes up child + let (chan,port) = pipes::stream(); + let m3 = ~m.clone(); + do task::spawn { + do m3.lock_cond |cond| { + chan.send(()); + cond.wait(); + chan.send(()); + } + } + let _ = port.recv(); // Wait until child gets in the mutex + do m.lock_cond |cond| { + cond.signal(); + } + let _ = port.recv(); // Wait until child wakes up + } + #[test] + fn test_mutex_cond_broadcast() { + let num_waiters: uint = 12; + let m = ~new_mutex(); + let mut ports = ~[]; + + for num_waiters.times { + let mi = ~m.clone(); + let (chan, port) = pipes::stream(); + vec::push(ports, port); + do task::spawn { + do mi.lock_cond |cond| { + chan.send(()); + cond.wait(); + chan.send(()); + } + } + } + + // wait until all children get in the mutex + for ports.each |port| { let _ = port.recv(); } + do m.lock_cond |cond| { + let num_woken = cond.broadcast(); + assert num_woken == num_waiters; + } + // wait until all children wake up + for ports.each |port| { let _ = port.recv(); } + } }