using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ZeroLevel.Services.Async
{
    /// <summary>
    /// An async-compatible producer/consumer queue.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queue.</typeparam>
    [DebuggerDisplay("Count = {_queue.Count}, MaxCount = {_maxCount}")]
    [DebuggerTypeProxy(typeof(AsyncProducerConsumerQueue<>.DebugView))]
    public sealed class AsyncProducerConsumerQueue<T> : IDisposable
    {
        /// <summary>
        /// The underlying queue.
        /// </summary>
        private readonly Queue<T> _queue;

        /// <summary>
        /// The maximum number of elements allowed in the queue.
        /// </summary>
        private readonly int _maxCount;

        /// <summary>
        /// The mutual-exclusion lock protecting the queue.
        /// </summary>
        private readonly AsyncLock _mutex;

        /// <summary>
        /// A condition variable that is signalled when the queue is not full.
        /// </summary>
        private readonly AsyncConditionVariable _notFull;

        /// <summary>
        /// A condition variable that is signalled when the queue is completed or not empty.
        /// </summary>
        private readonly AsyncConditionVariable _completedOrNotEmpty;

        /// <summary>
        /// A cancellation token source that is canceled when the queue is marked completed for adding.
        /// </summary>
        private readonly CancellationTokenSource _completed;

        /// <summary>
        /// A cached "failed" result (null queue, default item). It is returned by the dequeue operations of this
        /// class and by <c>AsyncProducerConsumerQueueExtensions.TryDequeueFromAnyAsync</c>.
        /// </summary>
        internal static readonly DequeueResult FalseResult = new DequeueResult(null, default(T));

        /// <summary>
        /// Creates a new async-compatible producer/consumer queue with the specified initial elements and a maximum element count.
        /// </summary>
        /// <param name="collection">The initial elements to place in the queue. May be null, in which case the queue starts empty.</param>
        /// <param name="maxCount">The maximum element count. This must be greater than zero.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxCount"/> is less than or equal to zero.</exception>
        /// <exception cref="ArgumentException"><paramref name="maxCount"/> is less than the number of elements in <paramref name="collection"/>.</exception>
        public AsyncProducerConsumerQueue(IEnumerable<T> collection, int maxCount)
        {
            if (maxCount <= 0)
                throw new ArgumentOutOfRangeException("maxCount", "The maximum count must be greater than zero.");
            _queue = collection == null ? new Queue<T>() : new Queue<T>(collection);
            if (maxCount < _queue.Count)
                throw new ArgumentException("The maximum count cannot be less than the number of elements in the collection.", "maxCount");
            _maxCount = maxCount;

            // Both condition variables are bound to the same mutex, so every wait/notify happens under _mutex.
            _mutex = new AsyncLock();
            _notFull = new AsyncConditionVariable(_mutex);
            _completedOrNotEmpty = new AsyncConditionVariable(_mutex);
            _completed = new CancellationTokenSource();
        }

        /// <summary>
        /// Creates a new async-compatible producer/consumer queue with the specified initial elements.
        /// </summary>
        /// <param name="collection">The initial elements to place in the queue.</param>
        public AsyncProducerConsumerQueue(IEnumerable<T> collection)
            : this(collection, int.MaxValue)
        {
        }

        /// <summary>
        /// Creates a new async-compatible producer/consumer queue with a maximum element count.
        /// </summary>
        /// <param name="maxCount">The maximum element count. This must be greater than zero.</param>
        public AsyncProducerConsumerQueue(int maxCount)
            : this(null, maxCount)
        {
        }

        /// <summary>
        /// Creates a new async-compatible producer/consumer queue.
        /// </summary>
        public AsyncProducerConsumerQueue()
            : this(null, int.MaxValue)
        {
        }

        /// <summary>
        /// Whether the queue is empty.
        /// </summary>
        private bool Empty { get { return _queue.Count == 0; } }

        /// <summary>
        /// Whether the queue is full.
        /// </summary>
        private bool Full { get { return _queue.Count == _maxCount; } }

        /// <summary>
        /// Releases resources held by this instance. After disposal, any use of this instance is undefined.
        /// </summary>
        public void Dispose()
        {
            _completed.Dispose();
        }

        /// <summary>
        /// Asynchronously marks the producer/consumer queue as complete for adding.
        /// </summary>
        [Obsolete("Use CompleteAdding() instead.")]
        public async Task CompleteAddingAsync()
        {
            using (await _mutex.LockAsync().ConfigureAwait(false))
            {
                // Completing is idempotent: a second call is a no-op.
                if (_completed.IsCancellationRequested)
                    return;
                _completed.Cancel();
                _completedOrNotEmpty.NotifyAll();
            }
        }

        /// <summary>
        /// Synchronously marks the producer/consumer queue as complete for adding.
        /// </summary>
        public void CompleteAdding()
        {
            using (_mutex.Lock())
            {
                // Completing is idempotent: a second call is a no-op.
                if (_completed.IsCancellationRequested)
                    return;
                _completed.Cancel();
                _completedOrNotEmpty.NotifyAll();
            }
        }

        /// <summary>
        /// Attempts to enqueue an item.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation. If <paramref name="abort"/> is not null, then this token must include signals from the <paramref name="abort"/> object.</param>
        /// <param name="abort">A synchronization object used to cancel related enqueue operations. May be null if this is the only enqueue operation.</param>
        /// <returns>This queue if the item was enqueued; null if the queue has completed adding, the operation was canceled, or <paramref name="abort"/> was already signalled.</returns>
        internal async Task<AsyncProducerConsumerQueue<T>> TryEnqueueAsync(T item, CancellationToken cancellationToken, TaskCompletionSource abort)
        {
            try
            {
                using (var combinedToken = CancellationTokenHelpers.Normalize(_completed.Token, cancellationToken))
                using (await _mutex.LockAsync().ConfigureAwait(false))
                {
                    // Wait for the queue to be not full.
                    while (Full)
                        await _notFull.WaitAsync(combinedToken.Token).ConfigureAwait(false);

                    // Explicitly check whether the queue has been marked complete to prevent a race condition where notFull is signalled at the same time the queue is marked complete.
                    if (_completed.IsCancellationRequested)
                        return null;

                    // Set the abort signal. If another queue has already set the abort signal, then abort.
                    if (abort != null && !abort.TrySetCanceled())
                        return null;

                    _queue.Enqueue(item);
                    _completedOrNotEmpty.Notify();
                    return this;
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is reported as a failed attempt; the public wrappers decide whether to rethrow.
                return null;
            }
        }

        /// <summary>
        /// Attempts to enqueue an item. This method may block the calling thread.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>This queue if the item was enqueued; null if the queue has completed adding or the operation was canceled.</returns>
        internal AsyncProducerConsumerQueue<T> DoTryEnqueue(T item, CancellationToken cancellationToken)
        {
            try
            {
                using (var combinedToken = CancellationTokenHelpers.Normalize(_completed.Token, cancellationToken))
                using (_mutex.Lock())
                {
                    // Wait for the queue to be not full.
                    while (Full)
                        _notFull.Wait(combinedToken.Token);

                    // Explicitly check whether the queue has been marked complete to prevent a race condition where notFull is signalled at the same time the queue is marked complete.
                    if (_completed.IsCancellationRequested)
                        return null;

                    _queue.Enqueue(item);
                    _completedOrNotEmpty.Notify();
                    return this;
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is reported as a failed attempt; the public wrappers decide whether to rethrow.
                return null;
            }
        }

        /// <summary>
        /// Attempts to enqueue an item to the producer/consumer queue. Returns <c>false</c> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns><c>true</c> if the item was enqueued; otherwise <c>false</c>.</returns>
        /// <exception cref="OperationCanceledException">The item was not enqueued and <paramref name="cancellationToken"/> is canceled.</exception>
        public async Task<bool> TryEnqueueAsync(T item, CancellationToken cancellationToken)
        {
            var ret = await TryEnqueueAsync(item, cancellationToken, null).ConfigureAwait(false);
            if (ret != null)
                return true;

            // The attempt failed: surface caller cancellation as an exception, completion as "false".
            cancellationToken.ThrowIfCancellationRequested();
            return false;
        }

        /// <summary>
        /// Attempts to enqueue an item to the producer/consumer queue. Returns <c>false</c> if the producer/consumer queue has completed adding. This method may block the calling thread.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns><c>true</c> if the item was enqueued; otherwise <c>false</c>.</returns>
        /// <exception cref="OperationCanceledException">The item was not enqueued and <paramref name="cancellationToken"/> is canceled.</exception>
        public bool TryEnqueue(T item, CancellationToken cancellationToken)
        {
            var ret = DoTryEnqueue(item, cancellationToken);
            if (ret != null)
                return true;

            // The attempt failed: surface caller cancellation as an exception, completion as "false".
            cancellationToken.ThrowIfCancellationRequested();
            return false;
        }

        /// <summary>
        /// Attempts to enqueue an item to the producer/consumer queue. Returns <c>false</c> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        public Task<bool> TryEnqueueAsync(T item)
        {
            return TryEnqueueAsync(item, CancellationToken.None);
        }

        /// <summary>
        /// Attempts to enqueue an item to the producer/consumer queue. Returns <c>false</c> if the producer/consumer queue has completed adding. This method may block the calling thread.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        public bool TryEnqueue(T item)
        {
            return TryEnqueue(item, CancellationToken.None);
        }

        /// <summary>
        /// Enqueues an item to the producer/consumer queue. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <exception cref="InvalidOperationException">The queue has completed adding.</exception>
        public async Task EnqueueAsync(T item, CancellationToken cancellationToken)
        {
            var result = await TryEnqueueAsync(item, cancellationToken).ConfigureAwait(false);
            if (!result)
                throw new InvalidOperationException("Enqueue failed; the producer/consumer queue has completed adding.");
        }

        /// <summary>
        /// Enqueues an item to the producer/consumer queue. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding. This method may block the calling thread.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <exception cref="InvalidOperationException">The queue has completed adding.</exception>
        public void Enqueue(T item, CancellationToken cancellationToken)
        {
            var result = TryEnqueue(item, cancellationToken);
            if (!result)
                throw new InvalidOperationException("Enqueue failed; the producer/consumer queue has completed adding.");
        }

        /// <summary>
        /// Enqueues an item to the producer/consumer queue. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        public Task EnqueueAsync(T item)
        {
            return EnqueueAsync(item, CancellationToken.None);
        }

        /// <summary>
        /// Enqueues an item to the producer/consumer queue. This method may block the calling thread. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        public void Enqueue(T item)
        {
            Enqueue(item, CancellationToken.None);
        }

        /// <summary>
        /// Asynchronously waits until an item is available to dequeue. Returns <c>false</c> if the producer/consumer queue has completed adding and there are no more items.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the asynchronous wait.</param>
        public async Task<bool> OutputAvailableAsync(CancellationToken cancellationToken)
        {
            using (await _mutex.LockAsync().ConfigureAwait(false))
            {
                while (!_completed.IsCancellationRequested && Empty)
                    await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false);
                return !Empty;
            }
        }

        /// <summary>
        /// Asynchronously waits until an item is available to dequeue. Returns <c>false</c> if the producer/consumer queue has completed adding and there are no more items.
        /// </summary>
        public Task<bool> OutputAvailableAsync()
        {
            return OutputAvailableAsync(CancellationToken.None);
        }

        /// <summary>
        /// Provides a (synchronous) consuming enumerable for items in the producer/consumer queue.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the synchronous enumeration.</param>
        public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
        {
            while (true)
            {
                // DoTryDequeue reports both "completed and empty" and cancellation as a failed result,
                // so in either case the enumeration simply ends.
                var result = DoTryDequeue(cancellationToken);
                if (!result.Success)
                    yield break;
                yield return result.Item;
            }
        }

        /// <summary>
        /// Provides a (synchronous) consuming enumerable for items in the producer/consumer queue.
        /// </summary>
        public IEnumerable<T> GetConsumingEnumerable()
        {
            return GetConsumingEnumerable(CancellationToken.None);
        }

        /// <summary>
        /// Attempts to dequeue an item.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation. If <paramref name="abort"/> is not null, then this token must include signals from the <paramref name="abort"/> object.</param>
        /// <param name="abort">A synchronization object used to cancel related dequeue operations. May be null if this is the only dequeue operation.</param>
        /// <returns>A successful result holding the dequeued item, or <see cref="FalseResult"/> if the queue is completed and empty, the operation was canceled, or <paramref name="abort"/> was already signalled.</returns>
        internal async Task<DequeueResult> TryDequeueAsync(CancellationToken cancellationToken, TaskCompletionSource abort)
        {
            try
            {
                using (await _mutex.LockAsync().ConfigureAwait(false))
                {
                    while (!_completed.IsCancellationRequested && Empty)
                        await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false);

                    if (_completed.IsCancellationRequested && Empty)
                        return FalseResult;

                    // Set the abort signal. If another queue has already set it, leave the item in place.
                    if (abort != null && !abort.TrySetCanceled())
                        return FalseResult;

                    var item = _queue.Dequeue();
                    _notFull.Notify();
                    return new DequeueResult(this, item);
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is reported as a failed attempt; the public wrappers decide whether to rethrow.
                return FalseResult;
            }
        }

        /// <summary>
        /// Attempts to dequeue an item. This method may block the calling thread.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <returns>A successful result holding the dequeued item, or <see cref="FalseResult"/> if the queue is completed and empty or the operation was canceled.</returns>
        internal DequeueResult DoTryDequeue(CancellationToken cancellationToken)
        {
            try
            {
                using (_mutex.Lock())
                {
                    while (!_completed.IsCancellationRequested && Empty)
                        _completedOrNotEmpty.Wait(cancellationToken);

                    if (_completed.IsCancellationRequested && Empty)
                        return FalseResult;

                    var item = _queue.Dequeue();
                    _notFull.Notify();
                    return new DequeueResult(this, item);
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is reported as a failed attempt; the public wrappers decide whether to rethrow.
                return FalseResult;
            }
        }

        /// <summary>
        /// Attempts to dequeue an item from the producer/consumer queue.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> is canceled.</exception>
        public async Task<DequeueResult> TryDequeueAsync(CancellationToken cancellationToken)
        {
            var ret = await TryDequeueAsync(cancellationToken, null).ConfigureAwait(false);
            if (ret.Success)
                return ret;

            // The attempt failed: surface caller cancellation as an exception, otherwise return the failed result.
            cancellationToken.ThrowIfCancellationRequested();
            return ret;
        }

        /// <summary>
        /// Attempts to dequeue an item from the producer/consumer queue. This method may block the calling thread.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> is canceled.</exception>
        public DequeueResult TryDequeue(CancellationToken cancellationToken)
        {
            var ret = DoTryDequeue(cancellationToken);
            if (ret.Success)
                return ret;

            // The attempt failed: surface caller cancellation as an exception, otherwise return the failed result.
            cancellationToken.ThrowIfCancellationRequested();
            return ret;
        }

        /// <summary>
        /// Attempts to dequeue an item from the producer/consumer queue.
        /// </summary>
        public Task<DequeueResult> TryDequeueAsync()
        {
            return TryDequeueAsync(CancellationToken.None);
        }

        /// <summary>
        /// Attempts to dequeue an item from the producer/consumer queue. This method may block the calling thread.
        /// </summary>
        public DequeueResult TryDequeue()
        {
            return TryDequeue(CancellationToken.None);
        }

        /// <summary>
        /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding and is empty.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <returns>The dequeued item.</returns>
        /// <exception cref="InvalidOperationException">The queue has completed adding and is empty.</exception>
        public async Task<T> DequeueAsync(CancellationToken cancellationToken)
        {
            var ret = await TryDequeueAsync(cancellationToken).ConfigureAwait(false);
            if (!ret.Success)
                throw new InvalidOperationException("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
            return ret.Item;
        }

        /// <summary>
        /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. This method may block the calling thread. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding and is empty.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <returns>The dequeued item.</returns>
        /// <exception cref="InvalidOperationException">The queue has completed adding and is empty.</exception>
        public T Dequeue(CancellationToken cancellationToken)
        {
            var ret = TryDequeue(cancellationToken);
            if (!ret.Success)
                throw new InvalidOperationException("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
            return ret.Item;
        }

        /// <summary>
        /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding and is empty.
        /// </summary>
        /// <returns>The dequeued item.</returns>
        public Task<T> DequeueAsync()
        {
            return DequeueAsync(CancellationToken.None);
        }

        /// <summary>
        /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. This method may block the calling thread. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding and is empty.
        /// </summary>
        /// <returns>The dequeued item.</returns>
        public T Dequeue()
        {
            return Dequeue(CancellationToken.None);
        }

        /// <summary>
        /// The result of a <c>TryDequeue</c>, <c>DequeueFromAny</c>, or <c>TryDequeueFromAny</c> operation.
        /// </summary>
        public sealed class DequeueResult
        {
            internal DequeueResult(AsyncProducerConsumerQueue<T> queue, T item)
            {
                Queue = queue;
                Item = item;
            }

            /// <summary>
            /// The queue from which the item was dequeued, or <c>null</c> if the operation failed.
            /// </summary>
            public AsyncProducerConsumerQueue<T> Queue { get; private set; }

            /// <summary>
            /// Whether the operation was successful. This is <c>true</c> if and only if <see cref="Queue"/> is not <c>null</c>.
            /// </summary>
            public bool Success { get { return Queue != null; } }

            /// <summary>
            /// The dequeued item. This is only valid if <see cref="Queue"/> is not <c>null</c>.
            /// </summary>
            public T Item { get; private set; }
        }

        /// <summary>
        /// Debugger proxy that exposes the queued items as an array.
        /// </summary>
        [DebuggerNonUserCode]
        internal sealed class DebugView
        {
            private readonly AsyncProducerConsumerQueue<T> _queue;

            public DebugView(AsyncProducerConsumerQueue<T> queue)
            {
                _queue = queue;
            }

            [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
            public T[] Items
            {
                get { return _queue._queue.ToArray(); }
            }
        }
    }

    /// <summary>
    /// Provides methods for working on multiple <see cref="AsyncProducerConsumerQueue{T}"/> instances.
    /// </summary>
    public static class AsyncProducerConsumerQueueExtensions
    {
        /// <summary>
        /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns <c>null</c> if all producer/consumer queues have completed adding.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        /// <exception cref="OperationCanceledException">No queue received the item and <paramref name="cancellationToken"/> is canceled.</exception>
        public static async Task<AsyncProducerConsumerQueue<T>> TryEnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
        {
            // One shared abort signal: the first queue to accept the item sets it, and the others then give up.
            // NOTE(review): this relies on CancellationTokenHelpers.FromTask producing a token that is canceled
            // once abort.Task completes - confirm against that helper.
            var abort = new TaskCompletionSource();
            using (var abortCancellationToken = CancellationTokenHelpers.FromTask(abort.Task))
            using (var combinedToken = CancellationTokenHelpers.Normalize(abortCancellationToken.Token, cancellationToken))
            {
                var token = combinedToken.Token;
                var tasks = queues.Select(q => q.TryEnqueueAsync(item, token, abort));
                var results = await TaskShim.WhenAll(tasks).ConfigureAwait(false);
                var ret = results.FirstOrDefault(x => x != null);
                if (ret == null)
                    cancellationToken.ThrowIfCancellationRequested();
                return ret;
            }
        }

        /// <summary>
        /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns <c>null</c> if all producer/consumer queues have completed adding. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static AsyncProducerConsumerQueue<T> TryEnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
        {
            return TryEnqueueToAnyAsync(queues, item, cancellationToken).WaitAndUnwrapException();
        }

        /// <summary>
        /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns <c>null</c> if all producer/consumer queues have completed adding.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static Task<AsyncProducerConsumerQueue<T>> TryEnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
        {
            return TryEnqueueToAnyAsync(queues, item, CancellationToken.None);
        }

        /// <summary>
        /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns <c>null</c> if all producer/consumer queues have completed adding. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static AsyncProducerConsumerQueue<T> TryEnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
        {
            return TryEnqueueToAny(queues, item, CancellationToken.None);
        }

        /// <summary>
        /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws <see cref="InvalidOperationException"/> if all producer/consumer queues have completed adding.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        /// <exception cref="InvalidOperationException">All queues have completed adding.</exception>
        public static async Task<AsyncProducerConsumerQueue<T>> EnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
        {
            var ret = await TryEnqueueToAnyAsync(queues, item, cancellationToken).ConfigureAwait(false);
            if (ret == null)
                throw new InvalidOperationException("Enqueue failed; all producer/consumer queues have completed adding.");
            return ret;
        }

        /// <summary>
        /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws <see cref="InvalidOperationException"/> if all producer/consumer queues have completed adding. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        /// <exception cref="InvalidOperationException">All queues have completed adding.</exception>
        public static AsyncProducerConsumerQueue<T> EnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
        {
            var ret = TryEnqueueToAny(queues, item, cancellationToken);
            if (ret == null)
                throw new InvalidOperationException("Enqueue failed; all producer/consumer queues have completed adding.");
            return ret;
        }

        /// <summary>
        /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws <see cref="InvalidOperationException"/> if all producer/consumer queues have completed adding.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static Task<AsyncProducerConsumerQueue<T>> EnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
        {
            return EnqueueToAnyAsync(queues, item, CancellationToken.None);
        }

        /// <summary>
        /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws <see cref="InvalidOperationException"/> if all producer/consumer queues have completed adding. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static AsyncProducerConsumerQueue<T> EnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
        {
            return EnqueueToAny(queues, item, CancellationToken.None);
        }

        /// <summary>
        /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> is canceled.</exception>
        public static async Task<AsyncProducerConsumerQueue<T>.DequeueResult> TryDequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
        {
            // One shared abort signal: the first queue to yield an item sets it, and the others then give up.
            // NOTE(review): this relies on CancellationTokenHelpers.FromTask producing a token that is canceled
            // once abort.Task completes - confirm against that helper.
            var abort = new TaskCompletionSource();
            using (var abortCancellationToken = CancellationTokenHelpers.FromTask(abort.Task))
            using (var combinedToken = CancellationTokenHelpers.Normalize(abortCancellationToken.Token, cancellationToken))
            {
                var token = combinedToken.Token;
                var tasks = queues.Select(q => q.TryDequeueAsync(token, abort));
                var results = await TaskShim.WhenAll(tasks).ConfigureAwait(false);
                var result = results.FirstOrDefault(x => x.Success);
                if (result != null)
                    return result;
                cancellationToken.ThrowIfCancellationRequested();
                return AsyncProducerConsumerQueue<T>.FalseResult;
            }
        }

        /// <summary>
        /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        public static AsyncProducerConsumerQueue<T>.DequeueResult TryDequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
        {
            return TryDequeueFromAnyAsync(queues, cancellationToken).WaitAndUnwrapException();
        }

        /// <summary>
        /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        public static Task<AsyncProducerConsumerQueue<T>.DequeueResult> TryDequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
        {
            return TryDequeueFromAnyAsync(queues, CancellationToken.None);
        }

        /// <summary>
        /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        public static AsyncProducerConsumerQueue<T>.DequeueResult TryDequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
        {
            return TryDequeueFromAny(queues, CancellationToken.None);
        }

        /// <summary>
        /// Dequeues an item from any of a number of producer/consumer queues. Throws <see cref="InvalidOperationException"/> if all the producer/consumer queues have completed adding and are empty.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="InvalidOperationException">All queues have completed adding and are empty.</exception>
        public static async Task<AsyncProducerConsumerQueue<T>.DequeueResult> DequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
        {
            var ret = await TryDequeueFromAnyAsync(queues, cancellationToken).ConfigureAwait(false);
            if (!ret.Success)
                throw new InvalidOperationException("Dequeue failed; all producer/consumer queues have completed adding and are empty.");
            return ret;
        }

        /// <summary>
        /// Dequeues an item from any of a number of producer/consumer queues. Throws <see cref="InvalidOperationException"/> if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="InvalidOperationException">All queues have completed adding and are empty.</exception>
        public static AsyncProducerConsumerQueue<T>.DequeueResult DequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
        {
            var ret = TryDequeueFromAny(queues, cancellationToken);
            if (!ret.Success)
                throw new InvalidOperationException("Dequeue failed; all producer/consumer queues have completed adding and are empty.");
            return ret;
        }

        /// <summary>
        /// Dequeues an item from any of a number of producer/consumer queues. Throws <see cref="InvalidOperationException"/> if all the producer/consumer queues have completed adding and are empty.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        public static Task<AsyncProducerConsumerQueue<T>.DequeueResult> DequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
        {
            return DequeueFromAnyAsync(queues, CancellationToken.None);
        }

        /// <summary>
        /// Dequeues an item from any of a number of producer/consumer queues. Throws <see cref="InvalidOperationException"/> if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        public static AsyncProducerConsumerQueue<T>.DequeueResult DequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
        {
            return DequeueFromAny(queues, CancellationToken.None);
        }
    }
}