using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace ZeroLevel.Services.Async
{
///
/// An async-compatible semaphore. Alternatively, you could use SemaphoreSlim on .NET 4.5 / Windows Store.
///
[DebuggerDisplay("Id = {Id}, CurrentCount = {_count}")]
[DebuggerTypeProxy(typeof(DebugView))]
public sealed class AsyncSemaphore
{
///
/// The queue of TCSs that other tasks are awaiting to acquire the semaphore.
///
private readonly IAsyncWaitQueue _queue;
///
/// The number of waits that will be immediately granted.
///
private int _count;
///
/// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
///
private int _id;
///
/// The object used for mutual exclusion.
///
private readonly object _mutex;
///
/// Creates a new async-compatible semaphore with the specified initial count.
///
/// The initial count for this semaphore. This must be greater than or equal to zero.
/// The wait queue used to manage waiters.
public AsyncSemaphore(int initialCount, IAsyncWaitQueue queue)
{
_queue = queue;
_count = initialCount;
_mutex = new object();
}
///
/// Creates a new async-compatible semaphore with the specified initial count.
///
/// The initial count for this semaphore. This must be greater than or equal to zero.
public AsyncSemaphore(int initialCount)
: this(initialCount, new DefaultAsyncWaitQueue())
{
}
///
/// Gets a semi-unique identifier for this asynchronous semaphore.
///
public int Id
{
get { return IdManager.GetId(ref _id); }
}
///
/// Gets the number of slots currently available on this semaphore.
///
public int CurrentCount
{
get { lock (_mutex) { return _count; } }
}
///
/// Asynchronously waits for a slot in the semaphore to be available.
///
/// The cancellation token used to cancel the wait. If this is already set, then this method will attempt to take the slot immediately (succeeding if a slot is currently available).
public Task WaitAsync(CancellationToken cancellationToken)
{
Task ret;
lock (_mutex)
{
// If the semaphore is available, take it immediately and return.
if (_count != 0)
{
--_count;
ret = TaskConstants.Completed;
}
else
{
// Wait for the semaphore to become available or cancellation.
ret = _queue.Enqueue(cancellationToken);
}
}
return ret;
}
///
/// Synchronously waits for a slot in the semaphore to be available. This method may block the calling thread.
///
/// The cancellation token used to cancel the wait. If this is already set, then this method will attempt to take the slot immediately (succeeding if a slot is currently available).
public void Wait(CancellationToken cancellationToken)
{
WaitAsync(cancellationToken).WaitAndUnwrapException();
}
///
/// Asynchronously waits for a slot in the semaphore to be available.
///
public Task WaitAsync()
{
return WaitAsync(CancellationToken.None);
}
///
/// Synchronously waits for a slot in the semaphore to be available. This method may block the calling thread.
///
public void Wait()
{
Wait(CancellationToken.None);
}
///
/// Releases the semaphore.
///
public void Release(int releaseCount)
{
if (releaseCount == 0)
return;
var finishes = new List();
lock (_mutex)
{
if (_count > int.MaxValue - releaseCount)
throw new InvalidOperationException("Could not release semaphore.");
var oldCount = _count;
while (releaseCount != 0)
{
if (_queue.IsEmpty)
++_count;
else
finishes.Add(_queue.Dequeue());
--releaseCount;
}
}
foreach (var finish in finishes)
finish.Dispose();
}
///
/// Releases the semaphore.
///
public void Release()
{
Release(1);
}
// ReSharper disable UnusedMember.Local
[DebuggerNonUserCode]
private sealed class DebugView
{
private readonly AsyncSemaphore _semaphore;
public DebugView(AsyncSemaphore semaphore)
{
_semaphore = semaphore;
}
public int Id { get { return _semaphore.Id; } }
public int CurrentCount { get { return _semaphore._count; } }
public IAsyncWaitQueue WaitQueue { get { return _semaphore._queue; } }
}
// ReSharper restore UnusedMember.Local
}
}