using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ZeroLevel.Services.Async
{
    /// <summary>
    /// An async-compatible producer/consumer queue.
    /// </summary>
    /// <typeparam name="T">The type of elements contained in the queue.</typeparam>
    [DebuggerDisplay("Count = {_queue.Count}, MaxCount = {_maxCount}")]
    [DebuggerTypeProxy(typeof(AsyncProducerConsumerQueue<>.DebugView))]
    public sealed class AsyncProducerConsumerQueue<T> : IDisposable
    {
        /// <summary>
        /// The underlying queue.
        /// </summary>
        private readonly Queue<T> _queue;
        /// <summary>
        /// The maximum number of elements allowed in the queue.
        /// </summary>
        private readonly int _maxCount;
        /// <summary>
        /// The mutual-exclusion lock protecting the queue.
        /// </summary>
        private readonly AsyncLock _mutex;
        /// <summary>
        /// A condition variable that is signalled when the queue is not full.
        /// </summary>
        private readonly AsyncConditionVariable _notFull;
        /// <summary>
        /// A condition variable that is signalled when the queue is completed or not empty.
        /// </summary>
        private readonly AsyncConditionVariable _completedOrNotEmpty;
        /// <summary>
        /// A cancellation token source that is canceled when the queue is marked completed for adding.
        /// </summary>
        private readonly CancellationTokenSource _completed;
        /// <summary>
        /// A cached "failed" result (its <see cref="DequeueResult.Queue"/> is <c>null</c>), shared by every dequeue path
        /// that returns without an item, including the multi-queue dequeue helpers.
        /// </summary>
        internal static readonly DequeueResult FalseResult = new DequeueResult(null, default(T));
        /// <summary>
        /// Creates a new async-compatible producer/consumer queue with the specified initial elements and a maximum element count.
        /// </summary>
        /// <param name="collection">The initial elements to place in the queue. May be <c>null</c>, in which case the queue starts empty.</param>
        /// <param name="maxCount">The maximum element count. This must be greater than zero.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxCount"/> is zero or negative.</exception>
        /// <exception cref="ArgumentException"><paramref name="maxCount"/> is less than the number of elements in <paramref name="collection"/>.</exception>
        public AsyncProducerConsumerQueue(IEnumerable<T> collection, int maxCount)
        {
            if (maxCount <= 0)
                throw new ArgumentOutOfRangeException("maxCount", "The maximum count must be greater than zero.");
            _queue = collection == null ? new Queue<T>() : new Queue<T>(collection);
            // Validated after copying so that the collection is enumerated only once.
            if (maxCount < _queue.Count)
                throw new ArgumentException("The maximum count cannot be less than the number of elements in the collection.", "maxCount");
            _maxCount = maxCount;
            _mutex = new AsyncLock();
            // Both condition variables share the single mutex that guards _queue.
            _notFull = new AsyncConditionVariable(_mutex);
            _completedOrNotEmpty = new AsyncConditionVariable(_mutex);
            _completed = new CancellationTokenSource();
        }
        /// <summary>
        /// Creates a new async-compatible producer/consumer queue with the specified initial elements.
        /// </summary>
        /// <param name="collection">The initial elements to place in the queue.</param>
        public AsyncProducerConsumerQueue(IEnumerable<T> collection)
            : this(collection, int.MaxValue)
        {
        }
        /// <summary>
        /// Creates a new async-compatible producer/consumer queue with a maximum element count.
        /// </summary>
        /// <param name="maxCount">The maximum element count. This must be greater than zero.</param>
        public AsyncProducerConsumerQueue(int maxCount)
            : this(null, maxCount)
        {
        }
        /// <summary>
        /// Creates a new async-compatible producer/consumer queue.
        /// </summary>
        public AsyncProducerConsumerQueue()
            : this(null, int.MaxValue)
        {
        }
        /// <summary>
        /// Whether the queue is empty.
        /// </summary>
        private bool Empty { get { return _queue.Count == 0; } }
        /// <summary>
        /// Whether the queue is full.
        /// </summary>
        private bool Full { get { return _queue.Count == _maxCount; } }
        /// <summary>
        /// Releases resources held by this instance. After disposal, any use of this instance is undefined.
        /// </summary>
        public void Dispose()
        {
            _completed.Dispose();
        }
        /// <summary>
        /// Asynchronously marks the producer/consumer queue as complete for adding.
        /// </summary>
        [Obsolete("Use CompleteAdding() instead.")]
        public async Task CompleteAddingAsync()
        {
            using (await _mutex.LockAsync().ConfigureAwait(false))
            {
                // Completing twice is a no-op.
                if (_completed.IsCancellationRequested)
                    return;
                _completed.Cancel();
                // Wake every waiting consumer so it can observe the completed state.
                _completedOrNotEmpty.NotifyAll();
            }
        }
        /// <summary>
        /// Synchronously marks the producer/consumer queue as complete for adding.
        /// </summary>
        public void CompleteAdding()
        {
            using (_mutex.Lock())
            {
                // Completing twice is a no-op.
                if (_completed.IsCancellationRequested)
                    return;
                _completed.Cancel();
                // Wake every waiting consumer so it can observe the completed state.
                _completedOrNotEmpty.NotifyAll();
            }
        }
        /// <summary>
        /// Attempts to enqueue an item.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation. If <paramref name="abort"/> is not null, then this token must include signals from the <paramref name="abort"/> object.</param>
        /// <param name="abort">A synchronization object used to cancel related enqueue operations. May be null if this is the only enqueue operation.</param>
        /// <returns>This queue if the item was enqueued; <c>null</c> if the queue has completed adding, the wait was canceled, or another operation already claimed <paramref name="abort"/>.</returns>
        internal async Task<AsyncProducerConsumerQueue<T>> TryEnqueueAsync(T item, CancellationToken cancellationToken, TaskCompletionSource abort)
        {
            try
            {
                using (var combinedToken = CancellationTokenHelpers.Normalize(_completed.Token, cancellationToken))
                using (await _mutex.LockAsync().ConfigureAwait(false))
                {
                    // Wait for the queue to be not full.
                    while (Full)
                        await _notFull.WaitAsync(combinedToken.Token).ConfigureAwait(false);
                    // Explicitly check whether the queue has been marked complete to prevent a race condition where notFull is signalled at the same time the queue is marked complete.
                    if (_completed.IsCancellationRequested)
                        return null;
                    // Set the abort signal. If another queue has already set the abort signal, then abort.
                    if (abort != null && !abort.TrySetCanceled())
                        return null;
                    _queue.Enqueue(item);
                    _completedOrNotEmpty.Notify();
                    return this;
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is reported as a failed attempt; public wrappers decide whether to rethrow.
                return null;
            }
        }
        /// <summary>
        /// Attempts to enqueue an item. This method may block the calling thread.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>This queue if the item was enqueued; <c>null</c> if the queue has completed adding or the wait was canceled.</returns>
        internal AsyncProducerConsumerQueue<T> DoTryEnqueue(T item, CancellationToken cancellationToken)
        {
            try
            {
                using (var combinedToken = CancellationTokenHelpers.Normalize(_completed.Token, cancellationToken))
                using (_mutex.Lock())
                {
                    // Wait for the queue to be not full.
                    while (Full)
                        _notFull.Wait(combinedToken.Token);
                    // Explicitly check whether the queue has been marked complete to prevent a race condition where notFull is signalled at the same time the queue is marked complete.
                    if (_completed.IsCancellationRequested)
                        return null;
                    _queue.Enqueue(item);
                    _completedOrNotEmpty.Notify();
                    return this;
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is reported as a failed attempt; public wrappers decide whether to rethrow.
                return null;
            }
        }
        /// <summary>
        /// Attempts to enqueue an item to the producer/consumer queue. Returns <c>false</c> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <exception cref="OperationCanceledException">The item was not enqueued and <paramref name="cancellationToken"/> has been canceled.</exception>
        public async Task<bool> TryEnqueueAsync(T item, CancellationToken cancellationToken)
        {
            var ret = await TryEnqueueAsync(item, cancellationToken, null).ConfigureAwait(false);
            if (ret != null)
                return true;
            // Distinguish "canceled by the caller" (throw) from "queue completed" (false).
            cancellationToken.ThrowIfCancellationRequested();
            return false;
        }
        /// <summary>
        /// Attempts to enqueue an item to the producer/consumer queue. Returns <c>false</c> if the producer/consumer queue has completed adding. This method may block the calling thread.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <exception cref="OperationCanceledException">The item was not enqueued and <paramref name="cancellationToken"/> has been canceled.</exception>
        public bool TryEnqueue(T item, CancellationToken cancellationToken)
        {
            var ret = DoTryEnqueue(item, cancellationToken);
            if (ret != null)
                return true;
            // Distinguish "canceled by the caller" (throw) from "queue completed" (false).
            cancellationToken.ThrowIfCancellationRequested();
            return false;
        }
        /// <summary>
        /// Attempts to enqueue an item to the producer/consumer queue. Returns <c>false</c> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        public Task<bool> TryEnqueueAsync(T item)
        {
            return TryEnqueueAsync(item, CancellationToken.None);
        }
        /// <summary>
        /// Attempts to enqueue an item to the producer/consumer queue. Returns <c>false</c> if the producer/consumer queue has completed adding. This method may block the calling thread.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        public bool TryEnqueue(T item)
        {
            return TryEnqueue(item, CancellationToken.None);
        }
        /// <summary>
        /// Enqueues an item to the producer/consumer queue. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <exception cref="InvalidOperationException">The queue has completed adding.</exception>
        public async Task EnqueueAsync(T item, CancellationToken cancellationToken)
        {
            var result = await TryEnqueueAsync(item, cancellationToken).ConfigureAwait(false);
            if (!result)
                throw new InvalidOperationException("Enqueue failed; the producer/consumer queue has completed adding.");
        }
        /// <summary>
        /// Enqueues an item to the producer/consumer queue. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding. This method may block the calling thread.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <exception cref="InvalidOperationException">The queue has completed adding.</exception>
        public void Enqueue(T item, CancellationToken cancellationToken)
        {
            var result = TryEnqueue(item, cancellationToken);
            if (!result)
                throw new InvalidOperationException("Enqueue failed; the producer/consumer queue has completed adding.");
        }
        /// <summary>
        /// Enqueues an item to the producer/consumer queue. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        public Task EnqueueAsync(T item)
        {
            return EnqueueAsync(item, CancellationToken.None);
        }
        /// <summary>
        /// Enqueues an item to the producer/consumer queue. This method may block the calling thread. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        public void Enqueue(T item)
        {
            Enqueue(item, CancellationToken.None);
        }
        /// <summary>
        /// Asynchronously waits until an item is available to dequeue. Returns <c>false</c> if the producer/consumer queue has completed adding and there are no more items.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the asynchronous wait.</param>
        public async Task<bool> OutputAvailableAsync(CancellationToken cancellationToken)
        {
            using (await _mutex.LockAsync().ConfigureAwait(false))
            {
                while (!_completed.IsCancellationRequested && Empty)
                    await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false);
                return !Empty;
            }
        }
        /// <summary>
        /// Asynchronously waits until an item is available to dequeue. Returns <c>false</c> if the producer/consumer queue has completed adding and there are no more items.
        /// </summary>
        public Task<bool> OutputAvailableAsync()
        {
            return OutputAvailableAsync(CancellationToken.None);
        }
        /// <summary>
        /// Provides a (synchronous) consuming enumerable for items in the producer/consumer queue.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the synchronous enumeration.</param>
        /// <remarks>
        /// The enumeration ends as soon as a dequeue attempt fails. Because <see cref="DoTryDequeue"/> swallows
        /// <see cref="OperationCanceledException"/>, a canceled wait ends the sequence instead of throwing.
        /// </remarks>
        public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
        {
            while (true)
            {
                var result = DoTryDequeue(cancellationToken);
                if (!result.Success)
                    yield break;
                yield return result.Item;
            }
        }
        /// <summary>
        /// Provides a (synchronous) consuming enumerable for items in the producer/consumer queue.
        /// </summary>
        public IEnumerable<T> GetConsumingEnumerable()
        {
            return GetConsumingEnumerable(CancellationToken.None);
        }
        /// <summary>
        /// Attempts to dequeue an item.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation. If <paramref name="abort"/> is not null, then this token must include signals from the <paramref name="abort"/> object.</param>
        /// <param name="abort">A synchronization object used to cancel related dequeue operations. May be null if this is the only dequeue operation.</param>
        /// <returns>A successful result holding the item, or <see cref="FalseResult"/> if the queue is completed and empty, the wait was canceled, or another operation already claimed <paramref name="abort"/>.</returns>
        internal async Task<DequeueResult> TryDequeueAsync(CancellationToken cancellationToken, TaskCompletionSource abort)
        {
            try
            {
                using (await _mutex.LockAsync().ConfigureAwait(false))
                {
                    // Wait until there is something to take or no more items will ever arrive.
                    while (!_completed.IsCancellationRequested && Empty)
                        await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false);
                    // Items enqueued before completion may still be drained; only fail once the queue is also empty.
                    if (_completed.IsCancellationRequested && Empty)
                        return FalseResult;
                    // Set the abort signal. If another queue has already set the abort signal, then abort.
                    if (abort != null && !abort.TrySetCanceled())
                        return FalseResult;
                    var item = _queue.Dequeue();
                    _notFull.Notify();
                    return new DequeueResult(this, item);
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is reported as a failed attempt; public wrappers decide whether to rethrow.
                return FalseResult;
            }
        }
        /// <summary>
        /// Attempts to dequeue an item. This method may block the calling thread.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <returns>A successful result holding the item, or <see cref="FalseResult"/> if the queue is completed and empty or the wait was canceled.</returns>
        internal DequeueResult DoTryDequeue(CancellationToken cancellationToken)
        {
            try
            {
                using (_mutex.Lock())
                {
                    // Wait until there is something to take or no more items will ever arrive.
                    while (!_completed.IsCancellationRequested && Empty)
                        _completedOrNotEmpty.Wait(cancellationToken);
                    // Items enqueued before completion may still be drained; only fail once the queue is also empty.
                    if (_completed.IsCancellationRequested && Empty)
                        return FalseResult;
                    var item = _queue.Dequeue();
                    _notFull.Notify();
                    return new DequeueResult(this, item);
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is reported as a failed attempt; public wrappers decide whether to rethrow.
                return FalseResult;
            }
        }
        /// <summary>
        /// Attempts to dequeue an item from the producer/consumer queue.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> has been canceled.</exception>
        public async Task<DequeueResult> TryDequeueAsync(CancellationToken cancellationToken)
        {
            var ret = await TryDequeueAsync(cancellationToken, null).ConfigureAwait(false);
            if (ret.Success)
                return ret;
            // Distinguish "canceled by the caller" (throw) from "completed and empty" (failed result).
            cancellationToken.ThrowIfCancellationRequested();
            return ret;
        }
        /// <summary>
        /// Attempts to dequeue an item from the producer/consumer queue. This method may block the calling thread.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> has been canceled.</exception>
        public DequeueResult TryDequeue(CancellationToken cancellationToken)
        {
            var ret = DoTryDequeue(cancellationToken);
            if (ret.Success)
                return ret;
            // Distinguish "canceled by the caller" (throw) from "completed and empty" (failed result).
            cancellationToken.ThrowIfCancellationRequested();
            return ret;
        }
        /// <summary>
        /// Attempts to dequeue an item from the producer/consumer queue.
        /// </summary>
        public Task<DequeueResult> TryDequeueAsync()
        {
            return TryDequeueAsync(CancellationToken.None);
        }
        /// <summary>
        /// Attempts to dequeue an item from the producer/consumer queue. This method may block the calling thread.
        /// </summary>
        public DequeueResult TryDequeue()
        {
            return TryDequeue(CancellationToken.None);
        }
        /// <summary>
        /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding and is empty.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <returns>The dequeued item.</returns>
        /// <exception cref="InvalidOperationException">The queue has completed adding and is empty.</exception>
        public async Task<T> DequeueAsync(CancellationToken cancellationToken)
        {
            var ret = await TryDequeueAsync(cancellationToken).ConfigureAwait(false);
            if (!ret.Success)
                throw new InvalidOperationException("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
            return ret.Item;
        }
        /// <summary>
        /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. This method may block the calling thread. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding and is empty.
        /// </summary>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="InvalidOperationException">The queue has completed adding and is empty.</exception>
        public T Dequeue(CancellationToken cancellationToken)
        {
            var ret = TryDequeue(cancellationToken);
            if (!ret.Success)
                throw new InvalidOperationException("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
            return ret.Item;
        }
        /// <summary>
        /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding and is empty.
        /// </summary>
        /// <returns>The dequeued item.</returns>
        public Task<T> DequeueAsync()
        {
            return DequeueAsync(CancellationToken.None);
        }
        /// <summary>
        /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. This method may block the calling thread. Throws <see cref="InvalidOperationException"/> if the producer/consumer queue has completed adding and is empty.
        /// </summary>
        /// <returns>The dequeued item.</returns>
        public T Dequeue()
        {
            return Dequeue(CancellationToken.None);
        }
        /// <summary>
        /// The result of a TryDequeue, DequeueFromAny, or TryDequeueFromAny operation.
        /// </summary>
        public sealed class DequeueResult
        {
            internal DequeueResult(AsyncProducerConsumerQueue<T> queue, T item)
            {
                Queue = queue;
                Item = item;
            }
            /// <summary>
            /// The queue from which the item was dequeued, or null if the operation failed.
            /// </summary>
            public AsyncProducerConsumerQueue<T> Queue { get; private set; }
            /// <summary>
            /// Whether the operation was successful. This is true if and only if <see cref="Queue"/> is not null.
            /// </summary>
            public bool Success { get { return Queue != null; } }
            /// <summary>
            /// The dequeued item. This is only valid if <see cref="Queue"/> is not null.
            /// </summary>
            public T Item { get; private set; }
        }
        /// <summary>
        /// Debugger proxy that shows the queued items as an array.
        /// </summary>
        [DebuggerNonUserCode]
        internal sealed class DebugView
        {
            private readonly AsyncProducerConsumerQueue<T> _queue;
            public DebugView(AsyncProducerConsumerQueue<T> queue)
            {
                _queue = queue;
            }
            [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
            public T[] Items
            {
                get { return _queue._queue.ToArray(); }
            }
        }
    }
    /// <summary>
    /// Provides methods for working on multiple <see cref="AsyncProducerConsumerQueue{T}"/> instances.
    /// </summary>
    public static class AsyncProducerConsumerQueueExtensions
    {
        /// <summary>
        /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        /// <exception cref="OperationCanceledException">No queue received the item and <paramref name="cancellationToken"/> has been canceled.</exception>
        public static async Task<AsyncProducerConsumerQueue<T>> TryEnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
        {
            // The first queue to accept the item completes "abort"; the token derived from it is passed to every attempt.
            var abort = new TaskCompletionSource();
            using (var abortCancellationToken = CancellationTokenHelpers.FromTask(abort.Task))
            using (var combinedToken = CancellationTokenHelpers.Normalize(abortCancellationToken.Token, cancellationToken))
            {
                var token = combinedToken.Token;
                var tasks = queues.Select(q => q.TryEnqueueAsync(item, token, abort));
                var results = await TaskShim.WhenAll(tasks).ConfigureAwait(false);
                var ret = results.FirstOrDefault(x => x != null);
                // Distinguish "canceled by the caller" (throw) from "all queues completed" (null).
                if (ret == null)
                    cancellationToken.ThrowIfCancellationRequested();
                return ret;
            }
        }
        /// <summary>
        /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static AsyncProducerConsumerQueue<T> TryEnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
        {
            return TryEnqueueToAnyAsync(queues, item, cancellationToken).WaitAndUnwrapException();
        }
        /// <summary>
        /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static Task<AsyncProducerConsumerQueue<T>> TryEnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
        {
            return TryEnqueueToAnyAsync(queues, item, CancellationToken.None);
        }
        /// <summary>
        /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static AsyncProducerConsumerQueue<T> TryEnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
        {
            return TryEnqueueToAny(queues, item, CancellationToken.None);
        }
        /// <summary>
        /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws <see cref="InvalidOperationException"/> if all producer/consumer queues have completed adding.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        /// <exception cref="InvalidOperationException">All queues have completed adding.</exception>
        public static async Task<AsyncProducerConsumerQueue<T>> EnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
        {
            var ret = await TryEnqueueToAnyAsync(queues, item, cancellationToken).ConfigureAwait(false);
            if (ret == null)
                throw new InvalidOperationException("Enqueue failed; all producer/consumer queues have completed adding.");
            return ret;
        }
        /// <summary>
        /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws <see cref="InvalidOperationException"/> if all producer/consumer queues have completed adding. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the enqueue operation.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        /// <exception cref="InvalidOperationException">All queues have completed adding.</exception>
        public static AsyncProducerConsumerQueue<T> EnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item, CancellationToken cancellationToken)
        {
            var ret = TryEnqueueToAny(queues, item, cancellationToken);
            if (ret == null)
                throw new InvalidOperationException("Enqueue failed; all producer/consumer queues have completed adding.");
            return ret;
        }
        /// <summary>
        /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws <see cref="InvalidOperationException"/> if all producer/consumer queues have completed adding.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static Task<AsyncProducerConsumerQueue<T>> EnqueueToAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
        {
            return EnqueueToAnyAsync(queues, item, CancellationToken.None);
        }
        /// <summary>
        /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws <see cref="InvalidOperationException"/> if all producer/consumer queues have completed adding. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <returns>The producer/consumer queue that received the item.</returns>
        public static AsyncProducerConsumerQueue<T> EnqueueToAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, T item)
        {
            return EnqueueToAny(queues, item, CancellationToken.None);
        }
        /// <summary>
        /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="OperationCanceledException">No item was dequeued and <paramref name="cancellationToken"/> has been canceled.</exception>
        public static async Task<AsyncProducerConsumerQueue<T>.DequeueResult> TryDequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
        {
            // The first queue to yield an item completes "abort"; the token derived from it is passed to every attempt.
            var abort = new TaskCompletionSource();
            using (var abortCancellationToken = CancellationTokenHelpers.FromTask(abort.Task))
            using (var combinedToken = CancellationTokenHelpers.Normalize(abortCancellationToken.Token, cancellationToken))
            {
                var token = combinedToken.Token;
                var tasks = queues.Select(q => q.TryDequeueAsync(token, abort));
                var results = await TaskShim.WhenAll(tasks).ConfigureAwait(false);
                var result = results.FirstOrDefault(x => x.Success);
                if (result != null)
                    return result;
                // Distinguish "canceled by the caller" (throw) from "all queues completed and empty" (failed result).
                cancellationToken.ThrowIfCancellationRequested();
                return AsyncProducerConsumerQueue<T>.FalseResult;
            }
        }
        /// <summary>
        /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        public static AsyncProducerConsumerQueue<T>.DequeueResult TryDequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
        {
            return TryDequeueFromAnyAsync(queues, cancellationToken).WaitAndUnwrapException();
        }
        /// <summary>
        /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        public static Task<AsyncProducerConsumerQueue<T>.DequeueResult> TryDequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
        {
            return TryDequeueFromAnyAsync(queues, CancellationToken.None);
        }
        /// <summary>
        /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        public static AsyncProducerConsumerQueue<T>.DequeueResult TryDequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
        {
            return TryDequeueFromAny(queues, CancellationToken.None);
        }
        /// <summary>
        /// Dequeues an item from any of a number of producer/consumer queues. Throws <see cref="InvalidOperationException"/> if all the producer/consumer queues have completed adding and are empty.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="InvalidOperationException">All queues have completed adding and are empty.</exception>
        public static async Task<AsyncProducerConsumerQueue<T>.DequeueResult> DequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
        {
            var ret = await TryDequeueFromAnyAsync(queues, cancellationToken).ConfigureAwait(false);
            if (!ret.Success)
                throw new InvalidOperationException("Dequeue failed; all producer/consumer queues have completed adding and are empty.");
            return ret;
        }
        /// <summary>
        /// Dequeues an item from any of a number of producer/consumer queues. Throws <see cref="InvalidOperationException"/> if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        /// <param name="cancellationToken">A cancellation token that can be used to abort the dequeue operation.</param>
        /// <exception cref="InvalidOperationException">All queues have completed adding and are empty.</exception>
        public static AsyncProducerConsumerQueue<T>.DequeueResult DequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues, CancellationToken cancellationToken)
        {
            var ret = TryDequeueFromAny(queues, cancellationToken);
            if (!ret.Success)
                throw new InvalidOperationException("Dequeue failed; all producer/consumer queues have completed adding and are empty.");
            return ret;
        }
        /// <summary>
        /// Dequeues an item from any of a number of producer/consumer queues. Throws <see cref="InvalidOperationException"/> if all the producer/consumer queues have completed adding and are empty.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        public static Task<AsyncProducerConsumerQueue<T>.DequeueResult> DequeueFromAnyAsync<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
        {
            return DequeueFromAnyAsync(queues, CancellationToken.None);
        }
        /// <summary>
        /// Dequeues an item from any of a number of producer/consumer queues. Throws <see cref="InvalidOperationException"/> if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
        /// </summary>
        /// <typeparam name="T">The type of elements contained in the queues.</typeparam>
        /// <param name="queues">The producer/consumer queues.</param>
        public static AsyncProducerConsumerQueue<T>.DequeueResult DequeueFromAny<T>(this IEnumerable<AsyncProducerConsumerQueue<T>> queues)
        {
            return DequeueFromAny(queues, CancellationToken.None);
        }
    }
}