using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace ZeroLevel.Services.Async
{
    // NOTE(review): IAsyncWaitQueue and DefaultAsyncWaitQueue are written here without type
    // arguments, exactly as they appeared in the reviewed text. If the project declares them
    // as generic types (e.g. IAsyncWaitQueue<object>), restore the type arguments — confirm
    // against the declaring files.

    /// <summary>
    /// An async-compatible semaphore. Alternatively, you could use <c>SemaphoreSlim</c> on .NET 4.5 / Windows Store.
    /// </summary>
    [DebuggerDisplay("Id = {Id}, CurrentCount = {_count}")]
    [DebuggerTypeProxy(typeof(DebugView))]
    public sealed class AsyncSemaphore
    {
        /// <summary>
        /// The queue of TCSs that other tasks are awaiting to acquire the semaphore.
        /// </summary>
        private readonly IAsyncWaitQueue _queue;

        /// <summary>
        /// The number of waits that will be immediately granted.
        /// </summary>
        private int _count;

        /// <summary>
        /// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
        /// </summary>
        private int _id;

        /// <summary>
        /// The object used for mutual exclusion. Guards <see cref="_count"/> and every access to <see cref="_queue"/>.
        /// </summary>
        private readonly object _mutex;

        /// <summary>
        /// Creates a new async-compatible semaphore with the specified initial count.
        /// </summary>
        /// <param name="initialCount">The initial count for this semaphore. This must be greater than or equal to zero.</param>
        /// <param name="queue">The wait queue used to manage waiters. This must not be <c>null</c>.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="initialCount"/> is negative.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="queue"/> is <c>null</c>.</exception>
        public AsyncSemaphore(int initialCount, IAsyncWaitQueue queue)
        {
            // WaitAsync treats any non-zero count as "slot available", so a negative
            // count would grant every wait immediately. Reject it up front.
            if (initialCount < 0)
            {
                throw new ArgumentOutOfRangeException("initialCount", initialCount, "The initial count must be greater than or equal to zero.");
            }

            // Fail fast instead of hitting a NullReferenceException on the first Release.
            if (queue == null)
            {
                throw new ArgumentNullException("queue");
            }

            _queue = queue;
            _count = initialCount;
            _mutex = new object();
        }

        /// <summary>
        /// Creates a new async-compatible semaphore with the specified initial count and a default wait queue.
        /// </summary>
        /// <param name="initialCount">The initial count for this semaphore. This must be greater than or equal to zero.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="initialCount"/> is negative.</exception>
        public AsyncSemaphore(int initialCount)
            : this(initialCount, new DefaultAsyncWaitQueue())
        {
        }

        /// <summary>
        /// Gets a semi-unique identifier for this asynchronous semaphore.
        /// </summary>
        public int Id
        {
            get { return IdManager.GetId(ref _id); }
        }

        /// <summary>
        /// Gets the number of slots currently available on this semaphore.
        /// </summary>
        public int CurrentCount
        {
            get
            {
                lock (_mutex)
                {
                    return _count;
                }
            }
        }

        /// <summary>
        /// Asynchronously waits for a slot in the semaphore to be available.
        /// </summary>
        /// <param name="cancellationToken">The cancellation token used to cancel the wait. If this is already set, then this method will attempt to take the slot immediately (succeeding if a slot is currently available).</param>
        /// <returns>
        /// An already-completed task if a slot was free; otherwise the task returned by the wait queue's
        /// <c>Enqueue</c> call for this waiter.
        /// </returns>
        public Task WaitAsync(CancellationToken cancellationToken)
        {
            lock (_mutex)
            {
                // If the semaphore is available, take it immediately and return.
                if (_count != 0)
                {
                    --_count;
                    return TaskConstants.Completed;
                }

                // Wait for the semaphore to become available or cancellation.
                return _queue.Enqueue(cancellationToken);
            }
        }

        /// <summary>
        /// Synchronously waits for a slot in the semaphore to be available. This method may block the calling thread.
        /// </summary>
        /// <param name="cancellationToken">The cancellation token used to cancel the wait. If this is already set, then this method will attempt to take the slot immediately (succeeding if a slot is currently available).</param>
        public void Wait(CancellationToken cancellationToken)
        {
            WaitAsync(cancellationToken).WaitAndUnwrapException();
        }

        /// <summary>
        /// Asynchronously waits for a slot in the semaphore to be available.
        /// </summary>
        /// <returns>The task produced by <see cref="WaitAsync(CancellationToken)"/> with <see cref="CancellationToken.None"/>.</returns>
        public Task WaitAsync()
        {
            return WaitAsync(CancellationToken.None);
        }

        /// <summary>
        /// Synchronously waits for a slot in the semaphore to be available. This method may block the calling thread.
        /// </summary>
        public void Wait()
        {
            Wait(CancellationToken.None);
        }

        /// <summary>
        /// Releases the semaphore the specified number of times. Each release is handed to a queued
        /// waiter if there is one; otherwise it increments the available count.
        /// </summary>
        /// <param name="releaseCount">The number of slots to release. Zero is a no-op.</param>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="releaseCount"/> is negative, or adding it to the current count would exceed <see cref="int.MaxValue"/>.
        /// </exception>
        public void Release(int releaseCount)
        {
            if (releaseCount == 0)
            {
                return;
            }

            // Negative counts were previously rejected only through integer wraparound in the
            // overflow check below. Reject them explicitly, keeping the same exception type and message.
            if (releaseCount < 0)
            {
                throw new InvalidOperationException("Could not release semaphore.");
            }

            var finishes = new List<IDisposable>();
            lock (_mutex)
            {
                // Conservative overflow guard: checked against the full releaseCount even though
                // queued waiters may absorb some of the releases.
                if (_count > int.MaxValue - releaseCount)
                {
                    throw new InvalidOperationException("Could not release semaphore.");
                }

                for (var remaining = releaseCount; remaining != 0; --remaining)
                {
                    if (_queue.IsEmpty)
                    {
                        ++_count;
                    }
                    else
                    {
                        finishes.Add(_queue.Dequeue());
                    }
                }
            }

            // Dispose the dequeued entries only after the lock has been released.
            // NOTE(review): presumably disposing is what completes the waiter's task — confirm
            // against the wait queue implementation.
            foreach (var finish in finishes)
            {
                finish.Dispose();
            }
        }

        /// <summary>
        /// Releases the semaphore once.
        /// </summary>
        public void Release()
        {
            Release(1);
        }

        // ReSharper disable UnusedMember.Local
        /// <summary>
        /// Debugger type proxy exposing the semaphore's id, count and wait queue.
        /// </summary>
        [DebuggerNonUserCode]
        private sealed class DebugView
        {
            private readonly AsyncSemaphore _semaphore;

            public DebugView(AsyncSemaphore semaphore)
            {
                _semaphore = semaphore;
            }

            public int Id
            {
                get { return _semaphore.Id; }
            }

            // Reads the field directly, without taking the semaphore's lock.
            public int CurrentCount
            {
                get { return _semaphore._count; }
            }

            public IAsyncWaitQueue WaitQueue
            {
                get { return _semaphore._queue; }
            }
        }
        // ReSharper restore UnusedMember.Local
    }
}