using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ZeroLevel.Services.Async
{
/// <summary>
/// An async-compatible producer/consumer queue.
/// </summary>
/// <typeparam name="T">The type of elements contained in the queue.</typeparam>
[DebuggerDisplay("Count = {_queue.Count}, MaxCount = {_maxCount}")]
[DebuggerTypeProxy(typeof(AsyncProducerConsumerQueue<>.DebugView))]
public sealed class AsyncProducerConsumerQueue<T> : IDisposable
{
    /// <summary>
    /// The underlying queue.
    /// </summary>
    private readonly Queue<T> _queue;

    /// <summary>
    /// The maximum number of elements allowed in the queue.
    /// </summary>
    private readonly int _maxCount;

    /// <summary>
    /// The mutual-exclusion lock protecting the queue.
    /// </summary>
    private readonly AsyncLock _mutex;

    /// <summary>
    /// A condition variable that is signalled when the queue is not full.
    /// </summary>
    private readonly AsyncConditionVariable _notFull;

    /// <summary>
    /// A condition variable that is signalled when the queue is completed or not empty.
    /// </summary>
    private readonly AsyncConditionVariable _completedOrNotEmpty;

    /// <summary>
    /// A cancellation token source that is canceled when the queue is marked completed for adding.
    /// </summary>
    private readonly CancellationTokenSource _completed;

    /// <summary>
    /// A cached failure result (null queue, default item) that is returned by every dequeue operation
    /// that does not produce an item.
    /// </summary>
    internal static readonly DequeueResult FalseResult = new DequeueResult(null, default(T));

    /// <summary>
    /// Creates a new async-compatible producer/consumer queue with the specified initial elements and a maximum element count.
    /// </summary>
    /// <param name="collection">The initial elements to place in the queue. May be null, in which case the queue starts empty.</param>
    /// <param name="maxCount">The maximum element count. This must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxCount"/> is zero or negative.</exception>
    /// <exception cref="ArgumentException"><paramref name="maxCount"/> is less than the number of elements in <paramref name="collection"/>.</exception>
    public AsyncProducerConsumerQueue(IEnumerable<T> collection, int maxCount)
    {
        if (maxCount <= 0)
            throw new ArgumentOutOfRangeException("maxCount", "The maximum count must be greater than zero.");
        _queue = collection == null ? new Queue<T>() : new Queue<T>(collection);
        // The capacity check has to happen after the collection is copied, because only then is its size known.
        if (maxCount < _queue.Count)
            throw new ArgumentException("The maximum count cannot be less than the number of elements in the collection.", "maxCount");
        _maxCount = maxCount;
        _mutex = new AsyncLock();
        // Both condition variables share the same mutex that guards _queue.
        _notFull = new AsyncConditionVariable(_mutex);
        _completedOrNotEmpty = new AsyncConditionVariable(_mutex);
        _completed = new CancellationTokenSource();
    }

    /// <summary>
    /// Creates a new async-compatible producer/consumer queue with the specified initial elements.
    /// </summary>
    /// <param name="collection">The initial elements to place in the queue.</param>
    public AsyncProducerConsumerQueue(IEnumerable<T> collection)
        : this(collection, int.MaxValue)
    {
    }

    /// <summary>
    /// Creates a new async-compatible producer/consumer queue with a maximum element count.
    /// </summary>
    /// <param name="maxCount">The maximum element count. This must be greater than zero.</param>
    public AsyncProducerConsumerQueue(int maxCount)
        : this(null, maxCount)
    {
    }

    /// <summary>
    /// Creates a new async-compatible producer/consumer queue.
    /// </summary>
    public AsyncProducerConsumerQueue()
        : this(null, int.MaxValue)
    {
    }

    /// <summary>
    /// Whether the queue is empty.
    /// </summary>
    private bool Empty { get { return _queue.Count == 0; } }

    /// <summary>
    /// Whether the queue is full.
    /// </summary>
    private bool Full { get { return _queue.Count == _maxCount; } }

    /// <summary>
    /// Releases resources held by this instance. After disposal, any use of this instance is undefined.
    /// </summary>
    public void Dispose()
    {
        _completed.Dispose();
    }

    /// <summary>
    /// Asynchronously marks the producer/consumer queue as complete for adding.
    /// Calling this more than once has no additional effect.
    /// </summary>
    [Obsolete("Use CompleteAdding() instead.")]
    public async Task CompleteAddingAsync()
    {
        using (await _mutex.LockAsync().ConfigureAwait(false))
        {
            if (_completed.IsCancellationRequested)
                return;
            _completed.Cancel();
            // Wake every waiting consumer so each can observe the completed state.
            _completedOrNotEmpty.NotifyAll();
        }
    }

    /// <summary>
    /// Synchronously marks the producer/consumer queue as complete for adding.
    /// Calling this more than once has no additional effect.
    /// </summary>
    public void CompleteAdding()
    {
        using (_mutex.Lock())
        {
            if (_completed.IsCancellationRequested)
                return;
            _completed.Cancel();
            // Wake every waiting consumer so each can observe the completed state.
            _completedOrNotEmpty.NotifyAll();
        }
    }

    /// <summary>
    /// Attempts to enqueue an item.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation. If <paramref name="abort"/> is not null, then this token must include signals from the <paramref name="abort"/> object.</param>
    /// <param name="abort">A synchronization object used to cancel related enqueue operations. May be null if this is the only enqueue operation.</param>
    /// <returns>
    /// This queue if the item was enqueued; null if the queue has completed adding, if another
    /// operation already claimed <paramref name="abort"/>, or if the wait was canceled.
    /// </returns>
    internal async Task<AsyncProducerConsumerQueue<T>> TryEnqueueAsync(T item, CancellationToken cancellationToken, TaskCompletionSource abort)
    {
        try
        {
            // The wait below is canceled either by the caller's token or by the queue being completed.
            using (var combinedToken = CancellationTokenHelpers.Normalize(_completed.Token, cancellationToken))
            using (await _mutex.LockAsync().ConfigureAwait(false))
            {
                // Wait for the queue to be not full.
                while (Full)
                    await _notFull.WaitAsync(combinedToken.Token).ConfigureAwait(false);
                // Explicitly check whether the queue has been marked complete to prevent a race condition where notFull is signalled at the same time the queue is marked complete.
                if (_completed.IsCancellationRequested)
                    return null;
                // Set the abort signal. If another queue has already set the abort signal, then abort.
                if (abort != null && !abort.TrySetCanceled())
                    return null;
                _queue.Enqueue(item);
                _completedOrNotEmpty.Notify();
                return this;
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is reported as a failed attempt; the public wrappers decide whether to rethrow.
            return null;
        }
    }

    /// <summary>
    /// Attempts to enqueue an item. This method may block the calling thread.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
    /// <returns>This queue if the item was enqueued; null if the queue has completed adding or the wait was canceled.</returns>
    internal AsyncProducerConsumerQueue<T> DoTryEnqueue(T item, CancellationToken cancellationToken)
    {
        try
        {
            // The wait below is canceled either by the caller's token or by the queue being completed.
            using (var combinedToken = CancellationTokenHelpers.Normalize(_completed.Token, cancellationToken))
            using (_mutex.Lock())
            {
                // Wait for the queue to be not full.
                while (Full)
                    _notFull.Wait(combinedToken.Token);
                // Explicitly check whether the queue has been marked complete to prevent a race condition where notFull is signalled at the same time the queue is marked complete.
                if (_completed.IsCancellationRequested)
                    return null;
                _queue.Enqueue(item);
                _completedOrNotEmpty.Notify();
                return this;
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is reported as a failed attempt; the public wrappers decide whether to rethrow.
            return null;
        }
    }

    /// <summary>
    /// Attempts to enqueue an item to the producer/consumer queue. Returns false if the producer/consumer queue has completed adding.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
    /// <returns>true if the item was enqueued; false if the queue has completed adding.</returns>
    /// <exception cref="OperationCanceledException">The item was not enqueued and <paramref name="cancellationToken"/> has been canceled.</exception>
    public async Task<bool> TryEnqueueAsync(T item, CancellationToken cancellationToken)
    {
        var ret = await TryEnqueueAsync(item, cancellationToken, null).ConfigureAwait(false);
        if (ret != null)
            return true;
        // The internal attempt swallows cancellation; surface it here so callers can tell it apart from completion.
        cancellationToken.ThrowIfCancellationRequested();
        return false;
    }

    /// <summary>
    /// Attempts to enqueue an item to the producer/consumer queue. Returns false if the producer/consumer queue has completed adding. This method may block the calling thread.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
    /// <returns>true if the item was enqueued; false if the queue has completed adding.</returns>
    /// <exception cref="OperationCanceledException">The item was not enqueued and <paramref name="cancellationToken"/> has been canceled.</exception>
    public bool TryEnqueue(T item, CancellationToken cancellationToken)
    {
        var ret = DoTryEnqueue(item, cancellationToken);
        if (ret != null)
            return true;
        // The internal attempt swallows cancellation; surface it here so callers can tell it apart from completion.
        cancellationToken.ThrowIfCancellationRequested();
        return false;
    }

    /// <summary>
    /// Attempts to enqueue an item to the producer/consumer queue. Returns false if the producer/consumer queue has completed adding.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <returns>true if the item was enqueued; false if the queue has completed adding.</returns>
    public Task<bool> TryEnqueueAsync(T item)
    {
        return TryEnqueueAsync(item, CancellationToken.None);
    }

    /// <summary>
    /// Attempts to enqueue an item to the producer/consumer queue. Returns false if the producer/consumer queue has completed adding. This method may block the calling thread.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <returns>true if the item was enqueued; false if the queue has completed adding.</returns>
    public bool TryEnqueue(T item)
    {
        return TryEnqueue(item, CancellationToken.None);
    }

    /// <summary>
    /// Enqueues an item to the producer/consumer queue. Throws if the producer/consumer queue has completed adding.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
    /// <exception cref="InvalidOperationException">The queue has completed adding.</exception>
    /// <exception cref="OperationCanceledException">The item was not enqueued and <paramref name="cancellationToken"/> has been canceled.</exception>
    public async Task EnqueueAsync(T item, CancellationToken cancellationToken)
    {
        var result = await TryEnqueueAsync(item, cancellationToken).ConfigureAwait(false);
        if (!result)
            throw new InvalidOperationException("Enqueue failed; the producer/consumer queue has completed adding.");
    }

    /// <summary>
    /// Enqueues an item to the producer/consumer queue. Throws if the producer/consumer queue has completed adding. This method may block the calling thread.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
    /// <exception cref="InvalidOperationException">The queue has completed adding.</exception>
    /// <exception cref="OperationCanceledException">The item was not enqueued and <paramref name="cancellationToken"/> has been canceled.</exception>
    public void Enqueue(T item, CancellationToken cancellationToken)
    {
        var result = TryEnqueue(item, cancellationToken);
        if (!result)
            throw new InvalidOperationException("Enqueue failed; the producer/consumer queue has completed adding.");
    }

    /// <summary>
    /// Enqueues an item to the producer/consumer queue. Throws if the producer/consumer queue has completed adding.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <exception cref="InvalidOperationException">The queue has completed adding.</exception>
    public Task EnqueueAsync(T item)
    {
        return EnqueueAsync(item, CancellationToken.None);
    }

    /// <summary>
    /// Enqueues an item to the producer/consumer queue. This method may block the calling thread. Throws if the producer/consumer queue has completed adding.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <exception cref="InvalidOperationException">The queue has completed adding.</exception>
    public void Enqueue(T item)
    {
        Enqueue(item, CancellationToken.None);
    }

    /// <summary>
    /// Asynchronously waits until an item is available to dequeue. Returns false if the producer/consumer queue has completed adding and there are no more items.
    /// </summary>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the asynchronous wait.</param>
    /// <returns>true if the queue holds at least one item; false if it has completed adding and is empty.</returns>
    public async Task<bool> OutputAvailableAsync(CancellationToken cancellationToken)
    {
        using (await _mutex.LockAsync().ConfigureAwait(false))
        {
            while (!_completed.IsCancellationRequested && Empty)
                await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false);
            return !Empty;
        }
    }

    /// <summary>
    /// Asynchronously waits until an item is available to dequeue. Returns false if the producer/consumer queue has completed adding and there are no more items.
    /// </summary>
    /// <returns>true if the queue holds at least one item; false if it has completed adding and is empty.</returns>
    public Task<bool> OutputAvailableAsync()
    {
        return OutputAvailableAsync(CancellationToken.None);
    }

    /// <summary>
    /// Provides a (synchronous) consuming enumerable for items in the producer/consumer queue.
    /// </summary>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the synchronous enumeration.</param>
    /// <returns>A lazily evaluated sequence that removes each item from the queue as it is yielded.</returns>
    public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
    {
        while (true)
        {
            // DoTryDequeue reports both "completed and empty" and cancellation as a failed result,
            // so in either case the enumeration simply ends.
            var result = DoTryDequeue(cancellationToken);
            if (!result.Success)
                yield break;
            yield return result.Item;
        }
    }

    /// <summary>
    /// Provides a (synchronous) consuming enumerable for items in the producer/consumer queue.
    /// </summary>
    /// <returns>A lazily evaluated sequence that removes each item from the queue as it is yielded.</returns>
    public IEnumerable<T> GetConsumingEnumerable()
    {
        return GetConsumingEnumerable(CancellationToken.None);
    }

    /// <summary>
    /// Attempts to dequeue an item.
    /// </summary>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation. If <paramref name="abort"/> is not null, then this token must include signals from the <paramref name="abort"/> object.</param>
    /// <param name="abort">A synchronization object used to cancel related dequeue operations. May be null if this is the only dequeue operation.</param>
    /// <returns>
    /// A successful result carrying the item, or <see cref="FalseResult"/> if the queue has completed
    /// adding and is empty, if another operation already claimed <paramref name="abort"/>, or if the wait was canceled.
    /// </returns>
    internal async Task<DequeueResult> TryDequeueAsync(CancellationToken cancellationToken, TaskCompletionSource abort)
    {
        try
        {
            using (await _mutex.LockAsync().ConfigureAwait(false))
            {
                while (!_completed.IsCancellationRequested && Empty)
                    await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false);
                // A completed queue can still hold items; only fail once it is drained.
                if (_completed.IsCancellationRequested && Empty)
                    return FalseResult;
                // Set the abort signal. If another queue has already set the abort signal, then abort.
                if (abort != null && !abort.TrySetCanceled())
                    return FalseResult;
                var item = _queue.Dequeue();
                _notFull.Notify();
                return new DequeueResult(this, item);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is reported as a failed attempt; the public wrappers decide whether to rethrow.
            return FalseResult;
        }
    }

    /// <summary>
    /// Attempts to dequeue an item. This method may block the calling thread.
    /// </summary>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
    /// <returns>
    /// A successful result carrying the item, or <see cref="FalseResult"/> if the queue has completed
    /// adding and is empty or if the wait was canceled.
    /// </returns>
    internal DequeueResult DoTryDequeue(CancellationToken cancellationToken)
    {
        try
        {
            using (_mutex.Lock())
            {
                while (!_completed.IsCancellationRequested && Empty)
                    _completedOrNotEmpty.Wait(cancellationToken);
                // A completed queue can still hold items; only fail once it is drained.
                if (_completed.IsCancellationRequested && Empty)
                    return FalseResult;
                var item = _queue.Dequeue();
                _notFull.Notify();
                return new DequeueResult(this, item);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is reported as a failed attempt; the public wrappers decide whether to rethrow.
            return FalseResult;
        }
    }

    /// <summary>
    /// Attempts to dequeue an item from the producer/consumer queue.
    /// </summary>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
    /// <returns>The dequeue result; its <see cref="DequeueResult.Success"/> is false if the queue has completed adding and is empty.</returns>
    /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> has been canceled.</exception>
    public async Task<DequeueResult> TryDequeueAsync(CancellationToken cancellationToken)
    {
        var ret = await TryDequeueAsync(cancellationToken, null).ConfigureAwait(false);
        if (ret.Success)
            return ret;
        // The internal attempt swallows cancellation; surface it here so callers can tell it apart from completion.
        cancellationToken.ThrowIfCancellationRequested();
        return ret;
    }

    /// <summary>
    /// Attempts to dequeue an item from the producer/consumer queue. This method may block the calling thread.
    /// </summary>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
    /// <returns>The dequeue result; its <see cref="DequeueResult.Success"/> is false if the queue has completed adding and is empty.</returns>
    /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> has been canceled.</exception>
    public DequeueResult TryDequeue(CancellationToken cancellationToken)
    {
        var ret = DoTryDequeue(cancellationToken);
        if (ret.Success)
            return ret;
        // The internal attempt swallows cancellation; surface it here so callers can tell it apart from completion.
        cancellationToken.ThrowIfCancellationRequested();
        return ret;
    }

    /// <summary>
    /// Attempts to dequeue an item from the producer/consumer queue.
    /// </summary>
    /// <returns>The dequeue result; its <see cref="DequeueResult.Success"/> is false if the queue has completed adding and is empty.</returns>
    public Task<DequeueResult> TryDequeueAsync()
    {
        return TryDequeueAsync(CancellationToken.None);
    }

    /// <summary>
    /// Attempts to dequeue an item from the producer/consumer queue. This method may block the calling thread.
    /// </summary>
    /// <returns>The dequeue result; its <see cref="DequeueResult.Success"/> is false if the queue has completed adding and is empty.</returns>
    public DequeueResult TryDequeue()
    {
        return TryDequeue(CancellationToken.None);
    }

    /// <summary>
    /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. Throws if the producer/consumer queue has completed adding and is empty.
    /// </summary>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="InvalidOperationException">The queue has completed adding and is empty.</exception>
    /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> has been canceled.</exception>
    public async Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        var ret = await TryDequeueAsync(cancellationToken).ConfigureAwait(false);
        if (!ret.Success)
            throw new InvalidOperationException("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
        return ret.Item;
    }

    /// <summary>
    /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. This method may block the calling thread. Throws if the producer/consumer queue has completed adding and is empty.
    /// </summary>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="InvalidOperationException">The queue has completed adding and is empty.</exception>
    /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> has been canceled.</exception>
    public T Dequeue(CancellationToken cancellationToken)
    {
        var ret = TryDequeue(cancellationToken);
        if (!ret.Success)
            throw new InvalidOperationException("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
        return ret.Item;
    }

    /// <summary>
    /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. Throws if the producer/consumer queue has completed adding and is empty.
    /// </summary>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="InvalidOperationException">The queue has completed adding and is empty.</exception>
    public Task<T> DequeueAsync()
    {
        return DequeueAsync(CancellationToken.None);
    }

    /// <summary>
    /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. This method may block the calling thread. Throws if the producer/consumer queue has completed adding and is empty.
    /// </summary>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="InvalidOperationException">The queue has completed adding and is empty.</exception>
    public T Dequeue()
    {
        return Dequeue(CancellationToken.None);
    }

    /// <summary>
    /// The result of a TryDequeue, DequeueFromAny, or TryDequeueFromAny operation.
    /// </summary>
    public sealed class DequeueResult
    {
        /// <summary>
        /// Creates a result for the given source queue and item.
        /// </summary>
        /// <param name="queue">The queue the item came from, or null to represent a failed operation.</param>
        /// <param name="item">The dequeued item.</param>
        internal DequeueResult(AsyncProducerConsumerQueue<T> queue, T item)
        {
            Queue = queue;
            Item = item;
        }

        /// <summary>
        /// The queue from which the item was dequeued, or null if the operation failed.
        /// </summary>
        public AsyncProducerConsumerQueue<T> Queue { get; private set; }

        /// <summary>
        /// Whether the operation was successful. This is true if and only if <see cref="Queue"/> is not null.
        /// </summary>
        public bool Success { get { return Queue != null; } }

        /// <summary>
        /// The dequeued item. This is only valid if <see cref="Queue"/> is not null.
        /// </summary>
        public T Item { get; private set; }
    }

    /// <summary>
    /// Debugger proxy (see the DebuggerTypeProxy attribute on the queue) that exposes the queued items as an array.
    /// </summary>
    [DebuggerNonUserCode]
    internal sealed class DebugView
    {
        private readonly AsyncProducerConsumerQueue<T> _queue;

        public DebugView(AsyncProducerConsumerQueue<T> queue)
        {
            _queue = queue;
        }

        /// <summary>
        /// A snapshot of the items currently in the queue.
        /// </summary>
        [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
        public T[] Items
        {
            get { return _queue._queue.ToArray(); }
        }
    }
}
/// <summary>
/// Provides methods for working on multiple <see cref="AsyncProducerConsumerQueue{T}"/> instances.
/// </summary>
public static class AsyncProducerConsumerQueueExtensions
{
    /// <summary>
    /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
    /// <returns>The producer/consumer queue that received the item.</returns>
    /// <exception cref="OperationCanceledException">No queue received the item and <paramref name="cancellationToken"/> has been canceled.</exception>
    public static async Task<AsyncProducerConsumerQueue<T>> TryEnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
    {
        // A single abort signal is shared by all per-queue attempts: the first queue able to accept
        // the item claims it, and every other attempt then gives up without enqueueing.
        var abort = new TaskCompletionSource();
        // NOTE(review): FromTask presumably yields a token that is canceled when abort.Task completes - confirm against CancellationTokenHelpers.
        using (var abortCancellationToken = CancellationTokenHelpers.FromTask(abort.Task))
        using (var combinedToken = CancellationTokenHelpers.Normalize(abortCancellationToken.Token, cancellationToken))
        {
            var token = combinedToken.Token;
            var tasks = queues.Select(q => q.TryEnqueueAsync(item, token, abort));
            var results = await TaskShim.WhenAll(tasks).ConfigureAwait(false);
            // Each losing attempt returns null; the winner (if any) returns its queue.
            var ret = results.FirstOrDefault(x => x != null);
            if (ret == null)
                cancellationToken.ThrowIfCancellationRequested();
            return ret;
        }
    }

    /// <summary>
    /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding. This method may block the calling thread.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
    /// <returns>The producer/consumer queue that received the item.</returns>
    public static AsyncProducerConsumerQueue<T> TryEnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
    {
        return TryEnqueueToAnyAsync(queues, item, cancellationToken).WaitAndUnwrapException();
    }

    /// <summary>
    /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="item">The item to enqueue.</param>
    /// <returns>The producer/consumer queue that received the item.</returns>
    public static Task<AsyncProducerConsumerQueue<T>> TryEnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
    {
        return TryEnqueueToAnyAsync(queues, item, CancellationToken.None);
    }

    /// <summary>
    /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding. This method may block the calling thread.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="item">The item to enqueue.</param>
    /// <returns>The producer/consumer queue that received the item.</returns>
    public static AsyncProducerConsumerQueue<T> TryEnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
    {
        return TryEnqueueToAny(queues, item, CancellationToken.None);
    }

    /// <summary>
    /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws if all producer/consumer queues have completed adding.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
    /// <returns>The producer/consumer queue that received the item.</returns>
    /// <exception cref="InvalidOperationException">All producer/consumer queues have completed adding.</exception>
    public static async Task<AsyncProducerConsumerQueue<T>> EnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
    {
        var ret = await TryEnqueueToAnyAsync(queues, item, cancellationToken).ConfigureAwait(false);
        if (ret == null)
            throw new InvalidOperationException("Enqueue failed; all producer/consumer queues have completed adding.");
        return ret;
    }

    /// <summary>
    /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws if all producer/consumer queues have completed adding. This method may block the calling thread.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
    /// <returns>The producer/consumer queue that received the item.</returns>
    /// <exception cref="InvalidOperationException">All producer/consumer queues have completed adding.</exception>
    public static AsyncProducerConsumerQueue<T> EnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
    {
        var ret = TryEnqueueToAny(queues, item, cancellationToken);
        if (ret == null)
            throw new InvalidOperationException("Enqueue failed; all producer/consumer queues have completed adding.");
        return ret;
    }

    /// <summary>
    /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws if all producer/consumer queues have completed adding.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="item">The item to enqueue.</param>
    /// <returns>The producer/consumer queue that received the item.</returns>
    /// <exception cref="InvalidOperationException">All producer/consumer queues have completed adding.</exception>
    public static Task<AsyncProducerConsumerQueue<T>> EnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
    {
        return EnqueueToAnyAsync(queues, item, CancellationToken.None);
    }

    /// <summary>
    /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws if all producer/consumer queues have completed adding. This method may block the calling thread.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="item">The item to enqueue.</param>
    /// <returns>The producer/consumer queue that received the item.</returns>
    /// <exception cref="InvalidOperationException">All producer/consumer queues have completed adding.</exception>
    public static AsyncProducerConsumerQueue<T> EnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
    {
        return EnqueueToAny(queues, item, CancellationToken.None);
    }

    /// <summary>
    /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
    /// <returns>The result of the first successful dequeue, or a failed result if no queue produced an item.</returns>
    /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> has been canceled.</exception>
    public static async Task<AsyncProducerConsumerQueue<T>.DequeueResult> TryDequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
    {
        // A single abort signal is shared by all per-queue attempts: the first queue with an item
        // available claims it, and every other attempt then gives up without dequeueing.
        var abort = new TaskCompletionSource();
        // NOTE(review): FromTask presumably yields a token that is canceled when abort.Task completes - confirm against CancellationTokenHelpers.
        using (var abortCancellationToken = CancellationTokenHelpers.FromTask(abort.Task))
        using (var combinedToken = CancellationTokenHelpers.Normalize(abortCancellationToken.Token, cancellationToken))
        {
            var token = combinedToken.Token;
            var tasks = queues.Select(q => q.TryDequeueAsync(token, abort));
            var results = await TaskShim.WhenAll(tasks).ConfigureAwait(false);
            var result = results.FirstOrDefault(x => x.Success);
            if (result != null)
                return result;
            cancellationToken.ThrowIfCancellationRequested();
            return AsyncProducerConsumerQueue<T>.FalseResult;
        }
    }

    /// <summary>
    /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
    /// <returns>The result of the first successful dequeue, or a failed result if no queue produced an item.</returns>
    public static AsyncProducerConsumerQueue<T>.DequeueResult TryDequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
    {
        return TryDequeueFromAnyAsync(queues, cancellationToken).WaitAndUnwrapException();
    }

    /// <summary>
    /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <returns>The result of the first successful dequeue, or a failed result if no queue produced an item.</returns>
    public static Task<AsyncProducerConsumerQueue<T>.DequeueResult> TryDequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
    {
        return TryDequeueFromAnyAsync(queues, CancellationToken.None);
    }

    /// <summary>
    /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <returns>The result of the first successful dequeue, or a failed result if no queue produced an item.</returns>
    public static AsyncProducerConsumerQueue<T>.DequeueResult TryDequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
    {
        return TryDequeueFromAny(queues, CancellationToken.None);
    }

    /// <summary>
    /// Dequeues an item from any of a number of producer/consumer queues. Throws if all the producer/consumer queues have completed adding and are empty.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
    /// <returns>The successful dequeue result, identifying the source queue and the item.</returns>
    /// <exception cref="InvalidOperationException">All producer/consumer queues have completed adding and are empty.</exception>
    public static async Task<AsyncProducerConsumerQueue<T>.DequeueResult> DequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
    {
        var ret = await TryDequeueFromAnyAsync(queues, cancellationToken).ConfigureAwait(false);
        if (!ret.Success)
            throw new InvalidOperationException("Dequeue failed; all producer/consumer queues have completed adding and are empty.");
        return ret;
    }

    /// <summary>
    /// Dequeues an item from any of a number of producer/consumer queues. Throws if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
    /// <returns>The successful dequeue result, identifying the source queue and the item.</returns>
    /// <exception cref="InvalidOperationException">All producer/consumer queues have completed adding and are empty.</exception>
    public static AsyncProducerConsumerQueue<T>.DequeueResult DequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
    {
        var ret = TryDequeueFromAny(queues, cancellationToken);
        if (!ret.Success)
            throw new InvalidOperationException("Dequeue failed; all producer/consumer queues have completed adding and are empty.");
        return ret;
    }

    /// <summary>
    /// Dequeues an item from any of a number of producer/consumer queues. Throws if all the producer/consumer queues have completed adding and are empty.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <returns>The successful dequeue result, identifying the source queue and the item.</returns>
    /// <exception cref="InvalidOperationException">All producer/consumer queues have completed adding and are empty.</exception>
    public static Task<AsyncProducerConsumerQueue<T>.DequeueResult> DequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
    {
        return DequeueFromAnyAsync(queues, CancellationToken.None);
    }

    /// <summary>
    /// Dequeues an item from any of a number of producer/consumer queues. Throws if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
    /// <param name="queues">The producer/consumer queues.</param>
    /// <returns>The successful dequeue result, identifying the source queue and the item.</returns>
    /// <exception cref="InvalidOperationException">All producer/consumer queues have completed adding and are empty.</exception>
    public static AsyncProducerConsumerQueue<T>.DequeueResult DequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
    {
        return DequeueFromAny(queues, CancellationToken.None);
    }
}
}