diff --git a/ZeroLevel/Services/Async/AnyResult.cs b/ZeroLevel/Services/Async/AnyResult.cs
new file mode 100644
index 0000000..9dd7240
--- /dev/null
+++ b/ZeroLevel/Services/Async/AnyResult.cs
@@ -0,0 +1,46 @@
+using System;
+using System.Collections.Generic;
+
+namespace ZeroLevel.Services.Async
+{
+ ///
+ /// Represents an item retrieved from one of the asynchronous collections.
+ ///
+ public struct AnyResult
+ : IEquatable>
+ {
+ public AnyResult(T value, int collectionIndex)
+ {
+ Value = value;
+ CollectionIndex = collectionIndex;
+ }
+
+ ///
+ /// Gets the item retrieved from a collection.
+ ///
+ public T Value { get; }
+
+ ///
+ /// Gets the index of the collection the item was retrieved from.
+ ///
+ public int CollectionIndex { get; }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ const int prime = -1521134295;
+ int hash = 12345701;
+ hash = hash * prime + EqualityComparer.Default.GetHashCode(Value);
+ hash = hash * prime + EqualityComparer.Default.GetHashCode(CollectionIndex);
+ return hash;
+ }
+ }
+
+ public bool Equals(AnyResult other) => EqualityComparer.Default.Equals(Value, other.Value) && EqualityComparer.Default.Equals(CollectionIndex, other.CollectionIndex);
+ public override bool Equals(object obj) => obj is AnyResult && Equals((AnyResult)obj);
+
+ public static bool operator ==(AnyResult x, AnyResult y) => x.Equals(y);
+ public static bool operator !=(AnyResult x, AnyResult y) => !x.Equals(y);
+ }
+}
diff --git a/ZeroLevel/Services/Async/AsyncBatchQueue.cs b/ZeroLevel/Services/Async/AsyncBatchQueue.cs
new file mode 100644
index 0000000..335989f
--- /dev/null
+++ b/ZeroLevel/Services/Async/AsyncBatchQueue.cs
@@ -0,0 +1,184 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ZeroLevel.Services.Async
+{
+ ///
+ /// Represents a thread-safe collection that groups the items added to it into batches and allows consuming them asynchronously.
+ ///
+ /// The type of the items contained in the collection.
+ public class AsyncBatchQueue : IAsyncBatchCollection
+ {
+ private volatile Batch _currentBatch;
+ private readonly AsyncQueue> _batchQueue = new AsyncQueue>();
+
+ ///
+ /// Initializes a new instance of that produces batches of a specified size.
+ ///
+ /// Amount of the items contained in an output batch.
+ public AsyncBatchQueue(int batchSize)
+ {
+ if (batchSize <= 0)
+ throw new ArgumentOutOfRangeException("batchSize", batchSize, "Batch size must be a positive integer.");
+
+ BatchSize = batchSize;
+ _currentBatch = new Batch(this);
+ }
+
+ ///
+ /// Gets amount of items contained in an output batch.
+ ///
+ public int BatchSize { get; }
+
+ ///
+ /// Gets the number of flushed batches currently available for consuming.
+ ///
+ public int Count => _batchQueue.Count;
+
+ ///
+ /// Adds an item to the collection. Flushes the new batch to be available for consuming if amount of the pending items has reached .
+ ///
+ ///
+ public void Add(T item)
+ {
+ SpinWait spin = new SpinWait();
+
+ while (!_currentBatch.TryAdd(item))
+ spin.SpinOnce();
+ }
+
+ ///
+ /// Removes and returns a batch from the collection in an asynchronous manner.
+ ///
+ public ValueTask> TakeAsync() => TakeAsync(CancellationToken.None);
+
+ ///
+ /// Removes and returns a batch from the collection in an asynchronous manner.
+ ///
+ public ValueTask> TakeAsync(CancellationToken cancellationToken) => _batchQueue.TakeAsync(cancellationToken);
+
+ ///
+ /// Forces a new batch to be created and made available for consuming even if amount of the pending items has not reached yet.
+ /// Does nothing if there are no pending items to flush.
+ ///
+ public void Flush()
+ {
+ SpinWait spin = new SpinWait();
+ while (!_currentBatch.TryFlush())
+ spin.SpinOnce();
+ }
+
+ public IEnumerator> GetEnumerator() => _batchQueue.GetEnumerator();
+ System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
+
+ private class Batch : IReadOnlyList
+ {
+ private readonly AsyncBatchQueue _queue;
+ private readonly T[] _items;
+ private readonly bool[] _finalizationFlags;
+ private int _lastReservationIndex = -1;
+ private int _count = -1;
+
+ public Batch(AsyncBatchQueue queue)
+ {
+ _queue = queue;
+ _items = new T[_queue.BatchSize];
+ _finalizationFlags = new bool[_queue.BatchSize];
+ }
+
+ public bool TryAdd(T item)
+ {
+ int index = Interlocked.Increment(ref _lastReservationIndex);
+
+ // The following is true if someone has beaten us to the last slot and we have to wait until the next batch comes along.
+ if (index >= _queue.BatchSize)
+ return false;
+
+ // The following is true if we've taken the last slot, which means we're obligated to flush the current batch and create a new one.
+ if (index == _queue.BatchSize - 1)
+ FlushInternal(_queue.BatchSize);
+
+ // The full fence prevents setting finalization flag before the actual item value is written.
+ _items[index] = item;
+ Interlocked.MemoryBarrier();
+ _finalizationFlags[index] = true;
+
+ return true;
+ }
+
+ public bool TryFlush()
+ {
+ int expectedPreviousReservation = Volatile.Read(ref _lastReservationIndex);
+
+ // We don't flush if the batch doesn't have any items or if another thread is about to flush it.
+ // However, we report success to avoid unnecessary spinning.
+ if (expectedPreviousReservation < 0 || expectedPreviousReservation >= _queue.BatchSize - 1)
+ return true;
+
+ int previousReservation = Interlocked.CompareExchange(ref _lastReservationIndex, _queue.BatchSize, expectedPreviousReservation);
+
+ // Flush reservation has succeeded.
+ if (expectedPreviousReservation == previousReservation)
+ {
+ FlushInternal(previousReservation + 1);
+ return true;
+ }
+
+ // The following is true if someone has completed the batch by the time we tried to flush it.
+ // Therefore the batch will be flushed anyway even if we don't do anything.
+ // The opposite means someone has slipped in an update and we have to spin.
+ return previousReservation >= _queue.BatchSize;
+ }
+
+ private void FlushInternal(int count)
+ {
+ _count = count;
+ _queue._currentBatch = new Batch(_queue);
+
+ // The full fence ensures that the current batch will never be added to the queue before _count is set.
+ Interlocked.MemoryBarrier();
+
+ _queue._batchQueue.Add(this);
+ }
+
+ private T GetItemWithoutValidation(int index)
+ {
+ SpinWait spin = new SpinWait();
+ while (!_finalizationFlags[index])
+ {
+ spin.SpinOnce();
+
+ // The full fence prevents caching any part of _finalizationFlags[ index ] expression.
+ Interlocked.MemoryBarrier();
+ }
+
+ // The full fence prevents reading item value before finalization flag is set.
+ Interlocked.MemoryBarrier();
+ return _items[index];
+ }
+
+ public T this[int index]
+ {
+ get
+ {
+ if (index >= Count)
+ throw new IndexOutOfRangeException();
+
+ return GetItemWithoutValidation(index);
+ }
+ }
+
+ public int Count => _count;
+
+ public IEnumerator GetEnumerator()
+ {
+ for (int i = 0; i < Count; i++)
+ yield return GetItemWithoutValidation(i);
+ }
+
+ System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
+ }
+ }
+}
diff --git a/ZeroLevel/Services/Async/AsyncCollection.cs b/ZeroLevel/Services/Async/AsyncCollection.cs
new file mode 100644
index 0000000..fb15957
--- /dev/null
+++ b/ZeroLevel/Services/Async/AsyncCollection.cs
@@ -0,0 +1,190 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using ZeroLevel.Services.Async.Internal;
+
+namespace ZeroLevel.Services.Async
+{
+ ///
+ /// Represents a thread-safe collection that allows asynchronous consuming.
+ ///
+ /// The type of the items contained in the collection.
+ public class AsyncCollection : IAsyncCollection
+ {
+ private readonly IProducerConsumerCollection _itemQueue;
+ private readonly ConcurrentQueue> _awaiterQueue = new ConcurrentQueue>();
+
+ // _queueBalance < 0 means there are free awaiters and not enough items.
+ // _queueBalance > 0 means the opposite is true.
+ private long _queueBalance = 0;
+
+ ///
+ /// Initializes a new instance of with a specified as an underlying item storage.
+ ///
+ /// The collection to use as an underlying item storage. MUST NOT be accessed elsewhere.
+ public AsyncCollection(IProducerConsumerCollection itemQueue)
+ {
+ _itemQueue = itemQueue;
+ _queueBalance = _itemQueue.Count;
+ }
+
+ public int Count => _itemQueue.Count;
+
+ ///
+ /// Gets an amount of pending item requests.
+ ///
+ public int AwaiterCount => _awaiterQueue.Count;
+
+ ///
+ /// Adds an item to the collection.
+ ///
+ /// The item to add to the collection.
+ public void Add(T item)
+ {
+ while (!TryAdd(item)) ;
+ }
+
+ ///
+ /// Tries to add an item to the collection.
+ /// May fail if an awaiter that's supposed to receive the item is cancelled. If this is the case, the TryAdd() method must be called again.
+ ///
+ /// The item to add to the collection.
+ /// True if the item was added to the collection; false if the awaiter was cancelled and the operation must be retried.
+ private bool TryAdd(T item)
+ {
+ long balanceAfterCurrentItem = Interlocked.Increment(ref _queueBalance);
+ SpinWait spin = new SpinWait();
+
+ if (balanceAfterCurrentItem > 0)
+ {
+ // Items are dominating, so we can safely add a new item to the queue.
+ while (!_itemQueue.TryAdd(item))
+ spin.SpinOnce();
+
+ return true;
+ }
+ else
+ {
+ // There's at least one awaiter available or being added as we're speaking, so we're giving the item to it.
+
+ IAwaiter awaiter;
+
+ while (!_awaiterQueue.TryDequeue(out awaiter))
+ spin.SpinOnce();
+
+ // Returns false if the cancellation occurred earlier.
+ return awaiter.TrySetResult(item);
+ }
+ }
+
+ ///
+ /// Removes and returns an item from the collection in an asynchronous manner.
+ ///
+ public ValueTask TakeAsync(CancellationToken cancellationToken)
+ => cancellationToken.IsCancellationRequested
+ ? CanceledValueTask.Value
+ : TakeAsync(new CompletionSourceAwaiterFactory(cancellationToken));
+
+ private ValueTask TakeAsync(TAwaiterFactory awaiterFactory) where TAwaiterFactory : IAwaiterFactory
+ {
+ long balanceAfterCurrentAwaiter = Interlocked.Decrement(ref _queueBalance);
+
+ if (balanceAfterCurrentAwaiter < 0)
+ {
+ // Awaiters are dominating, so we can safely add a new awaiter to the queue.
+ IAwaiter awaiter = awaiterFactory.CreateAwaiter();
+ _awaiterQueue.Enqueue(awaiter);
+ return awaiter.Task;
+ }
+ else
+ {
+ // There's at least one item available or being added, so we're returning it directly.
+
+ T item;
+ SpinWait spin = new SpinWait();
+
+ while (!_itemQueue.TryTake(out item))
+ spin.SpinOnce();
+
+ return new ValueTask(item);
+ }
+ }
+
+ public override string ToString() => $"Count = {Count}, Awaiters = {AwaiterCount}";
+
+ public IEnumerator GetEnumerator() => _itemQueue.GetEnumerator();
+ System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => _itemQueue.GetEnumerator();
+
+ #region Static
+
+ internal const int TakeFromAnyMaxCollections = BitArray32.BitCapacity;
+
+ ///
+ /// Removes and returns an item from one of the specified collections in an asynchronous manner.
+ ///
+ public static ValueTask> TakeFromAnyAsync(AsyncCollection[] collections) => TakeFromAnyAsync(collections, CancellationToken.None);
+
+ ///
+ /// Removes and returns an item from one of the specified collections in an asynchronous manner.
+ ///
+ public static ValueTask> TakeFromAnyAsync(AsyncCollection[] collections, CancellationToken cancellationToken)
+ {
+ if (collections == null)
+ throw new ArgumentNullException("collections");
+
+ if (collections.Length <= 0 || collections.Length > TakeFromAnyMaxCollections)
+ throw new ArgumentException(String.Format("The collection array can't contain less than 1 or more than {0} collections.", TakeFromAnyMaxCollections), "collections");
+
+ if (cancellationToken.IsCancellationRequested)
+ return CanceledValueTask>.Value;
+
+ ExclusiveCompletionSourceGroup exclusiveSources = new ExclusiveCompletionSourceGroup();
+
+ // Fast route: we attempt to take from the top-priority queues that have any items.
+ // If the fast route succeeds, we avoid allocating and queueing a bunch of awaiters.
+ for (int i = 0; i < collections.Length; i++)
+ {
+ if (collections[i].Count > 0)
+ {
+ AnyResult? result = TryTakeFast(exclusiveSources, collections[i], i);
+ if (result.HasValue)
+ return new ValueTask>(result.Value);
+ }
+ }
+
+ // No luck during the fast route; just queue the rest of awaiters.
+ for (int i = 0; i < collections.Length; i++)
+ {
+ AnyResult? result = TryTakeFast(exclusiveSources, collections[i], i);
+ if (result.HasValue)
+ return new ValueTask>(result.Value);
+ }
+
+ // None of the collections had any items. The order doesn't matter anymore, it's time to start the competition.
+ exclusiveSources.UnlockCompetition(cancellationToken);
+ return new ValueTask>(exclusiveSources.Task);
+ }
+
+ private static AnyResult? TryTakeFast(ExclusiveCompletionSourceGroup exclusiveSources, AsyncCollection collection, int index)
+ {
+ // This can happen if the awaiter has already been created during the fast route.
+ if (exclusiveSources.IsAwaiterCreated(index))
+ return null;
+
+ ValueTask collectionTask = collection.TakeAsync(exclusiveSources.CreateAwaiterFactory(index));
+
+ // One of the collections already had an item and returned it directly
+ if (collectionTask != null && collectionTask.IsCompleted)
+ {
+ exclusiveSources.MarkAsResolved();
+ return new AnyResult(collectionTask.Result, index);
+ }
+ else
+ return null;
+ }
+
+ #endregion
+ }
+}
diff --git a/ZeroLevel/Services/Async/AsyncConditionVariable.cs b/ZeroLevel/Services/Async/AsyncConditionVariable.cs
deleted file mode 100644
index c227fff..0000000
--- a/ZeroLevel/Services/Async/AsyncConditionVariable.cs
+++ /dev/null
@@ -1,185 +0,0 @@
-using System;
-using System.Diagnostics;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace ZeroLevel.Services.Async
-{
- ///
- /// An async-compatible condition variable. This type uses Mesa-style semantics (the notifying tasks do not yield).
- ///
- [DebuggerDisplay("Id = {Id}, AsyncLockId = {_asyncLock.Id}")]
- [DebuggerTypeProxy(typeof(DebugView))]
- public sealed class AsyncConditionVariable
- {
- ///
- /// The lock associated with this condition variable.
- ///
- private readonly AsyncLock _asyncLock;
-
- ///
- /// The queue of waiting tasks.
- ///
- private readonly IAsyncWaitQueue _queue;
-
- ///
- /// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
- ///
- private int _id;
-
- ///
- /// The object used for mutual exclusion.
- ///
- private readonly object _mutex;
-
- ///
- /// Creates an async-compatible condition variable associated with an async-compatible lock.
- ///
- /// The lock associated with this condition variable.
- /// The wait queue used to manage waiters.
- public AsyncConditionVariable(AsyncLock asyncLock, IAsyncWaitQueue queue)
- {
- _asyncLock = asyncLock;
- _queue = queue;
- _mutex = new object();
- }
-
- ///
- /// Creates an async-compatible condition variable associated with an async-compatible lock.
- ///
- /// The lock associated with this condition variable.
- public AsyncConditionVariable(AsyncLock asyncLock)
- : this(asyncLock, new DefaultAsyncWaitQueue())
- {
- }
-
- ///
- /// Gets a semi-unique identifier for this asynchronous condition variable.
- ///
- public int Id
- {
- get { return IdManager.GetId(ref _id); }
- }
-
- ///
- /// Sends a signal to a single task waiting on this condition variable. The associated lock MUST be held when calling this method, and it will still be held when this method returns.
- ///
- public void Notify()
- {
- IDisposable finish = null;
- lock (_mutex)
- {
- if (!_queue.IsEmpty)
- finish = _queue.Dequeue();
- }
- if (finish != null)
- finish.Dispose();
- }
-
- ///
- /// Sends a signal to all tasks waiting on this condition variable. The associated lock MUST be held when calling this method, and it will still be held when this method returns.
- ///
- public void NotifyAll()
- {
- IDisposable finish;
- lock (_mutex)
- {
- finish = _queue.DequeueAll();
- }
- finish.Dispose();
- }
-
- ///
- /// Asynchronously waits for a signal on this condition variable. The associated lock MUST be held when calling this method, and it will still be held when this method returns, even if the method is cancelled.
- ///
- /// The cancellation signal used to cancel this wait.
- public Task WaitAsync(CancellationToken cancellationToken)
- {
- lock (_mutex)
- {
- // Begin waiting for either a signal or cancellation.
- var task = _queue.Enqueue(cancellationToken);
-
- // Attach to the signal or cancellation.
- var retTcs = new TaskCompletionSource();
- task.ContinueWith(async t =>
- {
- // Re-take the lock.
- await _asyncLock.LockAsync().ConfigureAwait(false);
-
- // Propagate the cancellation exception if necessary.
- retTcs.TryCompleteFromCompletedTask(t);
- }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
-
- var ret = retTcs.Task;
-
- // Release the lock while we are waiting.
- _asyncLock.ReleaseLock();
-
- return ret;
- }
- }
-
- ///
- /// Synchronously waits for a signal on this condition variable. This method may block the calling thread. The associated lock MUST be held when calling this method, and it will still be held when this method returns, even if the method is cancelled.
- ///
- /// The cancellation signal used to cancel this wait.
- public void Wait(CancellationToken cancellationToken)
- {
- Task enqueuedTask;
- lock (_mutex)
- {
- // Begin waiting for either a signal or cancellation.
- enqueuedTask = _queue.Enqueue(cancellationToken);
- }
-
- // Release the lock while we are waiting.
- _asyncLock.ReleaseLock();
-
- // Wait for the signal or cancellation.
- enqueuedTask.WaitWithoutException();
-
- // Re-take the lock.
- _asyncLock.Lock();
-
- // Propagate the cancellation exception if necessary.
- enqueuedTask.WaitAndUnwrapException();
- }
-
- ///
- /// Asynchronously waits for a signal on this condition variable. The associated lock MUST be held when calling this method, and it will still be held when this method returns.
- ///
- public Task WaitAsync()
- {
- return WaitAsync(CancellationToken.None);
- }
-
- ///
- /// Synchronously waits for a signal on this condition variable. This method may block the calling thread. The associated lock MUST be held when calling this method, and it will still be held when this method returns.
- ///
- public void Wait()
- {
- Wait(CancellationToken.None);
- }
-
- // ReSharper disable UnusedMember.Local
- [DebuggerNonUserCode]
- private sealed class DebugView
- {
- private readonly AsyncConditionVariable _cv;
-
- public DebugView(AsyncConditionVariable cv)
- {
- _cv = cv;
- }
-
- public int Id { get { return _cv.Id; } }
-
- public AsyncLock AsyncLock { get { return _cv._asyncLock; } }
-
- public IAsyncWaitQueue WaitQueue { get { return _cv._queue; } }
- }
-
- // ReSharper restore UnusedMember.Local
- }
-}
\ No newline at end of file
diff --git a/ZeroLevel/Services/Async/AsyncHelper.cs b/ZeroLevel/Services/Async/AsyncHelper.cs
deleted file mode 100644
index a983fdb..0000000
--- a/ZeroLevel/Services/Async/AsyncHelper.cs
+++ /dev/null
@@ -1,38 +0,0 @@
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace ZeroLevel.Services.Async
-{
- public static class AsyncHelper
- {
- private static readonly TaskFactory _taskFactory = new
- TaskFactory(CancellationToken.None,
- TaskCreationOptions.None,
- TaskContinuationOptions.None,
- TaskScheduler.Default);
-
- public static TResult RunSync(Func> func)
- => _taskFactory
- .StartNew(func)
- .Unwrap()
- .GetAwaiter()
- .GetResult();
-
- public static void RunSync(Func func)
- => _taskFactory
- .StartNew(func)
- .Unwrap()
- .GetAwaiter()
- .GetResult();
-
- public async static Task WithTimeout(this Task task, int duration)
- {
- var retTask = await Task.WhenAny(task, Task.Delay(duration))
- .ConfigureAwait(false);
-
- if (retTask is Task) return task.Result;
- return default(T);
- }
- }
-}
\ No newline at end of file
diff --git a/ZeroLevel/Services/Async/AsyncLock.cs b/ZeroLevel/Services/Async/AsyncLock.cs
index a4fa06f..28f6e16 100644
--- a/ZeroLevel/Services/Async/AsyncLock.cs
+++ b/ZeroLevel/Services/Async/AsyncLock.cs
@@ -5,188 +5,125 @@ using System.Threading.Tasks;
namespace ZeroLevel.Services.Async
{
- ///
- /// A mutual exclusion lock that is compatible with async. Note that this lock is not recursive!
- ///
- [DebuggerDisplay("Id = {Id}, Taken = {_taken}")]
- [DebuggerTypeProxy(typeof(DebugView))]
- public sealed class AsyncLock
+ public class AsyncLock
{
- ///
- /// Whether the lock is taken by a task.
- ///
- private bool _taken;
-
- ///
- /// The queue of TCSs that other tasks are awaiting to acquire the lock.
- ///
- private readonly IAsyncWaitQueue _queue;
-
- ///
- /// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
- ///
- private int _id;
-
- ///
- /// The object used for mutual exclusion.
- ///
- private readonly object _mutex;
-
- ///
- /// Creates a new async-compatible mutual exclusion lock.
- ///
- public AsyncLock()
- : this(new DefaultAsyncWaitQueue())
- {
- }
-
- ///
- /// Creates a new async-compatible mutual exclusion lock using the specified wait queue.
- ///
- /// The wait queue used to manage waiters.
- public AsyncLock(IAsyncWaitQueue queue)
+ private object _reentrancy = new object();
+ private int _reentrances = 0;
+ //We are using this SemaphoreSlim like a posix condition variable
+ //we only want to wake waiters, one or more of whom will try to obtain a different lock to do their thing
+ //so long as we can guarantee no wakes are missed, the number of awakees is not important
+ //ideally, this would be "friend" for access only from InnerLock, but whatever.
+ internal SemaphoreSlim _retry = new SemaphoreSlim(0, 1);
+ //We do not have System.Threading.Thread.* on .NET Standard without additional dependencies
+ //Work around is easy: create a new ThreadLocal with a random value and this is our thread id :)
+ private static readonly long UnlockedThreadId = 0; //"owning" thread id when unlocked
+ internal long _owningId = UnlockedThreadId;
+ private static int _globalThreadCounter;
+ private static readonly ThreadLocal _threadId = new ThreadLocal(() => Interlocked.Increment(ref _globalThreadCounter));
+ //We generate a unique id from the thread ID combined with the task ID, if any
+ public static long ThreadId => (long)(((ulong)_threadId.Value) << 32) | ((uint)(Task.CurrentId ?? 0));
+
+ struct InnerLock : IDisposable
{
- _queue = queue;
- _mutex = new object();
- }
+ private readonly AsyncLock _parent;
+#if DEBUG
+ private bool _disposed;
+#endif
- ///
- /// Gets a semi-unique identifier for this asynchronous lock.
- ///
- public int Id
- {
- get { return IdManager.GetId(ref _id); }
- }
+ internal InnerLock(AsyncLock parent)
+ {
+ _parent = parent;
+#if DEBUG
+ _disposed = false;
+#endif
+ }
- ///
- /// Asynchronously acquires the lock. Returns a disposable that releases the lock when disposed.
- ///
- /// The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).
- /// A disposable that releases the lock when disposed.
- public Task LockAsync(CancellationToken cancellationToken)
- {
- Task ret;
- lock (_mutex)
+ internal async Task ObtainLockAsync()
{
- if (!_taken)
- {
- // If the lock is available, take it immediately.
- _taken = true;
- ret = TaskShim.FromResult(new Key(this));
- }
- else
+ while (!TryEnter())
{
- // Wait for the lock to become available or cancellation.
- ret = _queue.Enqueue(cancellationToken);
+ //we need to wait for someone to leave the lock before trying again
+ await _parent._retry.WaitAsync();
}
}
- return ret;
- }
- ///
- /// Synchronously acquires the lock. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
- ///
- /// The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).
- public IDisposable Lock(CancellationToken cancellationToken)
- {
- Task enqueuedTask;
- lock (_mutex)
+ internal async Task ObtainLockAsync(CancellationToken ct)
{
- if (!_taken)
+ while (!TryEnter())
{
- _taken = true;
- return new Key(this);
+ //we need to wait for someone to leave the lock before trying again
+ await _parent._retry.WaitAsync(ct);
}
-
- enqueuedTask = _queue.Enqueue(cancellationToken);
}
- return enqueuedTask.WaitAndUnwrapException();
- }
-
- ///
- /// Asynchronously acquires the lock. Returns a disposable that releases the lock when disposed.
- ///
- /// A disposable that releases the lock when disposed.
- public Task LockAsync()
- {
- return LockAsync(CancellationToken.None);
- }
-
- ///
- /// Synchronously acquires the lock. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
- ///
- public IDisposable Lock()
- {
- return Lock(CancellationToken.None);
- }
-
- ///
- /// Releases the lock.
- ///
- internal void ReleaseLock()
- {
- IDisposable finish = null;
- lock (_mutex)
+ internal void ObtainLock()
{
- if (_queue.IsEmpty)
- _taken = false;
- else
- finish = _queue.Dequeue(new Key(this));
+ while (!TryEnter())
+ {
+ //we need to wait for someone to leave the lock before trying again
+ _parent._retry.Wait();
+ }
}
- if (finish != null)
- finish.Dispose();
- }
-
- ///
- /// The disposable which releases the lock.
- ///
- private sealed class Key : IDisposable
- {
- ///
- /// The lock to release.
- ///
- private AsyncLock _asyncLock;
- ///
- /// Creates the key for a lock.
- ///
- /// The lock to release. May not be null .
- public Key(AsyncLock asyncLock)
+ private bool TryEnter()
{
- _asyncLock = asyncLock;
+ lock (_parent._reentrancy)
+ {
+ Debug.Assert((_parent._owningId == UnlockedThreadId) == (_parent._reentrances == 0));
+ if (_parent._owningId != UnlockedThreadId && _parent._owningId != AsyncLock.ThreadId)
+ {
+ //another thread currently owns the lock
+ return false;
+ }
+ //we can go in
+ Interlocked.Increment(ref _parent._reentrances);
+ _parent._owningId = AsyncLock.ThreadId;
+ return true;
+ }
}
- ///
- /// Release the lock.
- ///
public void Dispose()
{
- if (_asyncLock == null)
- return;
- _asyncLock.ReleaseLock();
- _asyncLock = null;
+#if DEBUG
+ Debug.Assert(!_disposed);
+ _disposed = true;
+#endif
+ lock (_parent._reentrancy)
+ {
+ Interlocked.Decrement(ref _parent._reentrances);
+ if (_parent._reentrances == 0)
+ {
+ //the owning thread is always the same so long as we are in a nested stack call
+ //we reset the owning id to null only when the lock is fully unlocked
+ _parent._owningId = UnlockedThreadId;
+ if (_parent._retry.CurrentCount == 0)
+ {
+ _parent._retry.Release();
+ }
+ }
+ }
}
}
- // ReSharper disable UnusedMember.Local
- [DebuggerNonUserCode]
- private sealed class DebugView
+ public IDisposable Lock()
{
- private readonly AsyncLock _mutex;
-
- public DebugView(AsyncLock mutex)
- {
- _mutex = mutex;
- }
-
- public int Id { get { return _mutex.Id; } }
-
- public bool Taken { get { return _mutex._taken; } }
+ var @lock = new InnerLock(this);
+ @lock.ObtainLock();
+ return @lock;
+ }
- public IAsyncWaitQueue WaitQueue { get { return _mutex._queue; } }
+ public async Task LockAsync()
+ {
+ var @lock = new InnerLock(this);
+ await @lock.ObtainLockAsync();
+ return @lock;
}
- // ReSharper restore UnusedMember.Local
+ public async Task LockAsync(CancellationToken ct)
+ {
+ var @lock = new InnerLock(this);
+ await @lock.ObtainLockAsync(ct);
+ return @lock;
+ }
}
-}
\ No newline at end of file
+}
diff --git a/ZeroLevel/Services/Async/AsyncManualResetEvent.cs b/ZeroLevel/Services/Async/AsyncManualResetEvent.cs
deleted file mode 100644
index 13aa39e..0000000
--- a/ZeroLevel/Services/Async/AsyncManualResetEvent.cs
+++ /dev/null
@@ -1,160 +0,0 @@
-using System.Diagnostics;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace ZeroLevel.Services.Async
-{
- ///
- /// An async-compatible manual-reset event.
- ///
- [DebuggerDisplay("Id = {Id}, IsSet = {GetStateForDebugger}")]
- [DebuggerTypeProxy(typeof(DebugView))]
- public sealed class AsyncManualResetEvent
- {
- ///
- /// The object used for synchronization.
- ///
- private readonly object _mutex;
-
- ///
- /// The current state of the event.
- ///
- private TaskCompletionSource _tcs;
-
- ///
- /// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
- ///
- private int _id;
-
- [DebuggerNonUserCode]
- private bool GetStateForDebugger
- {
- get
- {
- return _tcs.Task.IsCompleted;
- }
- }
-
- ///
- /// Creates an async-compatible manual-reset event.
- ///
- /// Whether the manual-reset event is initially set or unset.
- public AsyncManualResetEvent(bool set)
- {
- _mutex = new object();
- _tcs = TaskCompletionSourceExtensions.CreateAsyncTaskSource();
- if (set)
- _tcs.TrySetResult(null);
- }
-
- ///
- /// Creates an async-compatible manual-reset event that is initially unset.
- ///
- public AsyncManualResetEvent()
- : this(false)
- {
- }
-
- ///
- /// Gets a semi-unique identifier for this asynchronous manual-reset event.
- ///
- public int Id
- {
- get { return IdManager.GetId(ref _id); }
- }
-
- ///
- /// Whether this event is currently set. This member is seldom used; code using this member has a high possibility of race conditions.
- ///
- public bool IsSet
- {
- get { lock (_mutex) return _tcs.Task.IsCompleted; }
- }
-
- ///
- /// Asynchronously waits for this event to be set.
- ///
- public Task WaitAsync()
- {
- lock (_mutex)
- {
- return _tcs.Task;
- }
- }
-
- ///
- /// Asynchronously waits for this event to be set or for the wait to be canceled.
- ///
- /// The cancellation token used to cancel the wait. If this token is already canceled, this method will first check whether the event is set.
- public Task WaitAsync(CancellationToken cancellationToken)
- {
- var waitTask = WaitAsync();
- if (waitTask.IsCompleted)
- return waitTask;
- return waitTask.WaitAsync(cancellationToken);
- }
-
- ///
- /// Synchronously waits for this event to be set. This method may block the calling thread.
- ///
- public void Wait()
- {
- WaitAsync().WaitAndUnwrapException();
- }
-
- ///
- /// Synchronously waits for this event to be set. This method may block the calling thread.
- ///
- /// The cancellation token used to cancel the wait. If this token is already canceled, this method will first check whether the event is set.
- public void Wait(CancellationToken cancellationToken)
- {
- var ret = WaitAsync();
- if (ret.IsCompleted)
- return;
- ret.WaitAndUnwrapException(cancellationToken);
- }
-
- ///
- /// Sets the event, atomically completing every task returned by . If the event is already set, this method does nothing.
- ///
- public void Set()
- {
- lock (_mutex)
- {
- _tcs.TrySetResult(null);
- }
- }
-
- ///
- /// Resets the event. If the event is already reset, this method does nothing.
- ///
- public void Reset()
- {
- lock (_mutex)
- {
- if (_tcs.Task.IsCompleted)
- _tcs = TaskCompletionSourceExtensions.CreateAsyncTaskSource();
- }
- }
-
- // ReSharper disable UnusedMember.Local
- [DebuggerNonUserCode]
- private sealed class DebugView
- {
- private readonly AsyncManualResetEvent _mre;
-
- public DebugView(AsyncManualResetEvent mre)
- {
- _mre = mre;
- }
-
- public int Id { get { return _mre.Id; } }
-
- public bool IsSet { get { return _mre.GetStateForDebugger; } }
-
- public Task CurrentTask { get { return _mre._tcs.Task; } }
- }
-
- // ReSharper restore UnusedMember.Local
- }
-}
\ No newline at end of file
diff --git a/ZeroLevel/Services/Async/AsyncMonitor.cs b/ZeroLevel/Services/Async/AsyncMonitor.cs
deleted file mode 100644
index 5beb86f..0000000
--- a/ZeroLevel/Services/Async/AsyncMonitor.cs
+++ /dev/null
@@ -1,135 +0,0 @@
-using System;
-using System.Diagnostics;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace ZeroLevel.Services.Async
-{
- ///
- /// An async-compatible monitor.
- ///
- [DebuggerDisplay("Id = {Id}, ConditionVariableId = {_conditionVariable.Id}")]
- public sealed class AsyncMonitor
- {
- ///
- /// The lock.
- ///
- private readonly AsyncLock _asyncLock;
-
- ///
- /// The condition variable.
- ///
- private readonly AsyncConditionVariable _conditionVariable;
-
- ///
- /// Constructs a new monitor.
- ///
- public AsyncMonitor(IAsyncWaitQueue lockQueue, IAsyncWaitQueue conditionVariableQueue)
- {
- _asyncLock = new AsyncLock(lockQueue);
- _conditionVariable = new AsyncConditionVariable(_asyncLock, conditionVariableQueue);
- }
-
- ///
- /// Constructs a new monitor.
- ///
- public AsyncMonitor()
- : this(new DefaultAsyncWaitQueue(), new DefaultAsyncWaitQueue())
- {
- }
-
- ///
- /// Gets a semi-unique identifier for this monitor.
- ///
- public int Id
- {
- get { return _asyncLock.Id; }
- }
-
- ///
- /// Asynchronously enters the monitor. Returns a disposable that leaves the monitor when disposed.
- ///
- /// The cancellation token used to cancel the enter. If this is already set, then this method will attempt to enter the monitor immediately (succeeding if the monitor is currently available).
- /// A disposable that leaves the monitor when disposed.
- public Task EnterAsync(CancellationToken cancellationToken)
- {
- return _asyncLock.LockAsync(cancellationToken);
- }
-
- ///
- /// Synchronously enters the monitor. Returns a disposable that leaves the monitor when disposed. This method may block the calling thread.
- ///
- /// The cancellation token used to cancel the enter. If this is already set, then this method will attempt to enter the monitor immediately (succeeding if the monitor is currently available).
- public IDisposable Enter(CancellationToken cancellationToken)
- {
- return _asyncLock.Lock(cancellationToken);
- }
-
- ///
- /// Asynchronously enters the monitor. Returns a disposable that leaves the monitor when disposed.
- ///
- /// A disposable that leaves the monitor when disposed.
- public Task EnterAsync()
- {
- return EnterAsync(CancellationToken.None);
- }
-
- ///
- /// Asynchronously enters the monitor. Returns a disposable that leaves the monitor when disposed. This method may block the calling thread.
- ///
- public IDisposable Enter()
- {
- return Enter(CancellationToken.None);
- }
-
- ///
- /// Asynchronously waits for a pulse signal on this monitor. The monitor MUST already be entered when calling this method, and it will still be entered when this method returns, even if the method is cancelled. This method internally will leave the monitor while waiting for a notification.
- ///
- /// The cancellation signal used to cancel this wait.
- public Task WaitAsync(CancellationToken cancellationToken)
- {
- return _conditionVariable.WaitAsync(cancellationToken);
- }
-
- ///
- /// Asynchronously waits for a pulse signal on this monitor. This method may block the calling thread. The monitor MUST already be entered when calling this method, and it will still be entered when this method returns, even if the method is cancelled. This method internally will leave the monitor while waiting for a notification.
- ///
- /// The cancellation signal used to cancel this wait.
- public void Wait(CancellationToken cancellationToken)
- {
- _conditionVariable.Wait(cancellationToken);
- }
-
- ///
- /// Asynchronously waits for a pulse signal on this monitor. The monitor MUST already be entered when calling this method, and it will still be entered when this method returns. This method internally will leave the monitor while waiting for a notification.
- ///
- public Task WaitAsync()
- {
- return WaitAsync(CancellationToken.None);
- }
-
- ///
- /// Asynchronously waits for a pulse signal on this monitor. This method may block the calling thread. The monitor MUST already be entered when calling this method, and it will still be entered when this method returns. This method internally will leave the monitor while waiting for a notification.
- ///
- public void Wait()
- {
- Wait(CancellationToken.None);
- }
-
- ///
- /// Sends a signal to a single task waiting on this monitor. The monitor MUST already be entered when calling this method, and it will still be entered when this method returns.
- ///
- public void Pulse()
- {
- _conditionVariable.Notify();
- }
-
- ///
- /// Sends a signal to all tasks waiting on this monitor. The monitor MUST already be entered when calling this method, and it will still be entered when this method returns.
- ///
- public void PulseAll()
- {
- _conditionVariable.NotifyAll();
- }
- }
-}
\ No newline at end of file
diff --git a/ZeroLevel/Services/Async/AsyncProducerConsumerQueue.cs b/ZeroLevel/Services/Async/AsyncProducerConsumerQueue.cs
deleted file mode 100644
index 30e6382..0000000
--- a/ZeroLevel/Services/Async/AsyncProducerConsumerQueue.cs
+++ /dev/null
@@ -1,737 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace ZeroLevel.Services.Async
-{
- ///
- /// An async-compatible producer/consumer queue.
- ///
- /// The type of elements contained in the queue.
- [DebuggerDisplay("Count = {_queue.Count}, MaxCount = {_maxCount}")]
- [DebuggerTypeProxy(typeof(AsyncProducerConsumerQueue<>.DebugView))]
- public sealed class AsyncProducerConsumerQueue : IDisposable
- {
- ///
- /// The underlying queue.
- ///
- private readonly Queue _queue;
-
- ///
- /// The maximum number of elements allowed in the queue.
- ///
- private readonly int _maxCount;
-
- ///
- /// The mutual-exclusion lock protecting the queue.
- ///
- private readonly AsyncLock _mutex;
-
- ///
- /// A condition variable that is signalled when the queue is not full.
- ///
- private readonly AsyncConditionVariable _notFull;
-
- ///
- /// A condition variable that is signalled when the queue is completed or not empty.
- ///
- private readonly AsyncConditionVariable _completedOrNotEmpty;
-
- ///
- /// A cancellation token source that is canceled when the queue is marked completed for adding.
- ///
- private readonly CancellationTokenSource _completed;
-
- ///
- /// A cached result that is common when calling .
- ///
- internal static readonly DequeueResult FalseResult = new DequeueResult(null, default(T));
-
- ///
- /// Creates a new async-compatible producer/consumer queue with the specified initial elements and a maximum element count.
- ///
- /// The initial elements to place in the queue.
- /// The maximum element count. This must be greater than zero.
- public AsyncProducerConsumerQueue(IEnumerable collection, int maxCount)
- {
- if (maxCount <= 0)
- throw new ArgumentOutOfRangeException("maxCount", "The maximum count must be greater than zero.");
- _queue = collection == null ? new Queue() : new Queue(collection);
- if (maxCount < _queue.Count)
- throw new ArgumentException("The maximum count cannot be less than the number of elements in the collection.", "maxCount");
- _maxCount = maxCount;
-
- _mutex = new AsyncLock();
- _notFull = new AsyncConditionVariable(_mutex);
- _completedOrNotEmpty = new AsyncConditionVariable(_mutex);
- _completed = new CancellationTokenSource();
- }
-
- ///
- /// Creates a new async-compatible producer/consumer queue with the specified initial elements.
- ///
- /// The initial elements to place in the queue.
- public AsyncProducerConsumerQueue(IEnumerable collection)
- : this(collection, int.MaxValue)
- {
- }
-
- ///
- /// Creates a new async-compatible producer/consumer queue with a maximum element count.
- ///
- /// The maximum element count. This must be greater than zero.
- public AsyncProducerConsumerQueue(int maxCount)
- : this(null, maxCount)
- {
- }
-
- ///
- /// Creates a new async-compatible producer/consumer queue.
- ///
- public AsyncProducerConsumerQueue()
- : this(null, int.MaxValue)
- {
- }
-
- ///
- /// Whether the queue is empty.
- ///
- private bool Empty { get { return _queue.Count == 0; } }
-
- ///
- /// Whether the queue is full.
- ///
- private bool Full { get { return _queue.Count == _maxCount; } }
-
- ///
- /// Releases resources held by this instance. After disposal, any use of this instance is undefined.
- ///
- public void Dispose()
- {
- _completed.Dispose();
- }
-
- ///
- /// Asynchronously marks the producer/consumer queue as complete for adding.
- ///
- [Obsolete("Use CompleteAdding() instead.")]
- public async Task CompleteAddingAsync()
- {
- using (await _mutex.LockAsync().ConfigureAwait(false))
- {
- if (_completed.IsCancellationRequested)
- return;
- _completed.Cancel();
- _completedOrNotEmpty.NotifyAll();
- }
- }
-
- ///
- /// Synchronously marks the producer/consumer queue as complete for adding.
- ///
- public void CompleteAdding()
- {
- using (_mutex.Lock())
- {
- if (_completed.IsCancellationRequested)
- return;
- _completed.Cancel();
- _completedOrNotEmpty.NotifyAll();
- }
- }
-
- ///
- /// Attempts to enqueue an item.
- ///
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation. If is not null , then this token must include signals from the object.
- /// A synchronization object used to cancel related enqueue operations. May be null if this is the only enqueue operation.
- internal async Task> TryEnqueueAsync(T item, CancellationToken cancellationToken, TaskCompletionSource abort)
- {
- try
- {
- using (var combinedToken = CancellationTokenHelpers.Normalize(_completed.Token, cancellationToken))
- using (await _mutex.LockAsync().ConfigureAwait(false))
- {
- // Wait for the queue to be not full.
- while (Full)
- await _notFull.WaitAsync(combinedToken.Token).ConfigureAwait(false);
-
- // Explicitly check whether the queue has been marked complete to prevent a race condition where notFull is signalled at the same time the queue is marked complete.
- if (_completed.IsCancellationRequested)
- return null;
-
- // Set the abort signal. If another queue has already set the abort signal, then abort.
- if (abort != null && !abort.TrySetCanceled())
- return null;
-
- _queue.Enqueue(item);
- _completedOrNotEmpty.Notify();
- return this;
- }
- }
- catch (OperationCanceledException)
- {
- return null;
- }
- }
-
- ///
- /// Attempts to enqueue an item. This method may block the calling thread.
- ///
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation.
- internal AsyncProducerConsumerQueue DoTryEnqueue(T item, CancellationToken cancellationToken)
- {
- try
- {
- using (var combinedToken = CancellationTokenHelpers.Normalize(_completed.Token, cancellationToken))
- using (_mutex.Lock())
- {
- // Wait for the queue to be not full.
- while (Full)
- _notFull.Wait(combinedToken.Token);
-
- // Explicitly check whether the queue has been marked complete to prevent a race condition where notFull is signalled at the same time the queue is marked complete.
- if (_completed.IsCancellationRequested)
- return null;
-
- _queue.Enqueue(item);
- _completedOrNotEmpty.Notify();
- return this;
- }
- }
- catch (OperationCanceledException)
- {
- return null;
- }
- }
-
- ///
- /// Attempts to enqueue an item to the producer/consumer queue. Returns false if the producer/consumer queue has completed adding.
- ///
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation.
- public async Task TryEnqueueAsync(T item, CancellationToken cancellationToken)
- {
- var ret = await TryEnqueueAsync(item, cancellationToken, null).ConfigureAwait(false);
- if (ret != null)
- return true;
- cancellationToken.ThrowIfCancellationRequested();
- return false;
- }
-
- ///
- /// Attempts to enqueue an item to the producer/consumer queue. Returns false if the producer/consumer queue has completed adding. This method may block the calling thread.
- ///
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation.
- public bool TryEnqueue(T item, CancellationToken cancellationToken)
- {
- var ret = DoTryEnqueue(item, cancellationToken);
- if (ret != null)
- return true;
- cancellationToken.ThrowIfCancellationRequested();
- return false;
- }
-
- ///
- /// Attempts to enqueue an item to the producer/consumer queue. Returns false if the producer/consumer queue has completed adding.
- ///
- /// The item to enqueue.
- public Task TryEnqueueAsync(T item)
- {
- return TryEnqueueAsync(item, CancellationToken.None);
- }
-
- ///
- /// Attempts to enqueue an item to the producer/consumer queue. Returns false if the producer/consumer queue has completed adding. This method may block the calling thread.
- ///
- /// The item to enqueue.
- public bool TryEnqueue(T item)
- {
- return TryEnqueue(item, CancellationToken.None);
- }
-
- ///
- /// Enqueues an item to the producer/consumer queue. Throws if the producer/consumer queue has completed adding.
- ///
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation.
- public async Task EnqueueAsync(T item, CancellationToken cancellationToken)
- {
- var result = await TryEnqueueAsync(item, cancellationToken).ConfigureAwait(false);
- if (!result)
- throw new InvalidOperationException("Enqueue failed; the producer/consumer queue has completed adding.");
- }
-
- ///
- /// Enqueues an item to the producer/consumer queue. Throws if the producer/consumer queue has completed adding. This method may block the calling thread.
- ///
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation.
- public void Enqueue(T item, CancellationToken cancellationToken)
- {
- var result = TryEnqueue(item, cancellationToken);
- if (!result)
- throw new InvalidOperationException("Enqueue failed; the producer/consumer queue has completed adding.");
- }
-
- ///
- /// Enqueues an item to the producer/consumer queue. Throws if the producer/consumer queue has completed adding.
- ///
- /// The item to enqueue.
- public Task EnqueueAsync(T item)
- {
- return EnqueueAsync(item, CancellationToken.None);
- }
-
- ///
- /// Enqueues an item to the producer/consumer queue. This method may block the calling thread. Throws if the producer/consumer queue has completed adding.
- ///
- /// The item to enqueue.
- public void Enqueue(T item)
- {
- Enqueue(item, CancellationToken.None);
- }
-
- ///
- /// Asynchronously waits until an item is available to dequeue. Returns false if the producer/consumer queue has completed adding and there are no more items.
- ///
- /// A cancellation token that can be used to abort the asynchronous wait.
- public async Task OutputAvailableAsync(CancellationToken cancellationToken)
- {
- using (await _mutex.LockAsync().ConfigureAwait(false))
- {
- while (!_completed.IsCancellationRequested && Empty)
- await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false);
- return !Empty;
- }
- }
-
- ///
- /// Asynchronously waits until an item is available to dequeue. Returns false if the producer/consumer queue has completed adding and there are no more items.
- ///
- public Task OutputAvailableAsync()
- {
- return OutputAvailableAsync(CancellationToken.None);
- }
-
- ///
- /// Provides a (synchronous) consuming enumerable for items in the producer/consumer queue.
- ///
- /// A cancellation token that can be used to abort the synchronous enumeration.
- public IEnumerable GetConsumingEnumerable(CancellationToken cancellationToken)
- {
- while (true)
- {
- var result = DoTryDequeue(cancellationToken);
- if (!result.Success)
- yield break;
- yield return result.Item;
- }
- }
-
- ///
- /// Provides a (synchronous) consuming enumerable for items in the producer/consumer queue.
- ///
- public IEnumerable GetConsumingEnumerable()
- {
- return GetConsumingEnumerable(CancellationToken.None);
- }
-
- ///
- /// Attempts to dequeue an item.
- ///
- /// A cancellation token that can be used to abort the dequeue operation. If is not null , then this token must include signals from the object.
- /// A synchronization object used to cancel related dequeue operations. May be null if this is the only dequeue operation.
- internal async Task TryDequeueAsync(CancellationToken cancellationToken, TaskCompletionSource abort)
- {
- try
- {
- using (await _mutex.LockAsync().ConfigureAwait(false))
- {
- while (!_completed.IsCancellationRequested && Empty)
- await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false);
- if (_completed.IsCancellationRequested && Empty)
- return FalseResult;
- if (abort != null && !abort.TrySetCanceled())
- return FalseResult;
- var item = _queue.Dequeue();
- _notFull.Notify();
- return new DequeueResult(this, item);
- }
- }
- catch (OperationCanceledException)
- {
- return FalseResult;
- }
- }
-
- ///
- /// Attempts to dequeue an item. This method may block the calling thread.
- ///
- /// A cancellation token that can be used to abort the dequeue operation.
- internal DequeueResult DoTryDequeue(CancellationToken cancellationToken)
- {
- try
- {
- using (_mutex.Lock())
- {
- while (!_completed.IsCancellationRequested && Empty)
- _completedOrNotEmpty.Wait(cancellationToken);
- if (_completed.IsCancellationRequested && Empty)
- return FalseResult;
- var item = _queue.Dequeue();
- _notFull.Notify();
- return new DequeueResult(this, item);
- }
- }
- catch (OperationCanceledException)
- {
- return FalseResult;
- }
- }
-
- ///
- /// Attempts to dequeue an item from the producer/consumer queue.
- ///
- /// A cancellation token that can be used to abort the dequeue operation.
- public async Task TryDequeueAsync(CancellationToken cancellationToken)
- {
- var ret = await TryDequeueAsync(cancellationToken, null).ConfigureAwait(false);
- if (ret.Success)
- return ret;
- cancellationToken.ThrowIfCancellationRequested();
- return ret;
- }
-
- ///
- /// Attempts to dequeue an item from the producer/consumer queue. This method may block the calling thread.
- ///
- /// A cancellation token that can be used to abort the dequeue operation.
- public DequeueResult TryDequeue(CancellationToken cancellationToken)
- {
- var ret = DoTryDequeue(cancellationToken);
- if (ret.Success)
- return ret;
- cancellationToken.ThrowIfCancellationRequested();
- return ret;
- }
-
- ///
- /// Attempts to dequeue an item from the producer/consumer queue.
- ///
- public Task TryDequeueAsync()
- {
- return TryDequeueAsync(CancellationToken.None);
- }
-
- ///
- /// Attempts to dequeue an item from the producer/consumer queue. This method may block the calling thread.
- ///
- public DequeueResult TryDequeue()
- {
- return TryDequeue(CancellationToken.None);
- }
-
- ///
- /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. Throws if the producer/consumer queue has completed adding and is empty.
- ///
- /// A cancellation token that can be used to abort the dequeue operation.
- /// The dequeued item.
- public async Task DequeueAsync(CancellationToken cancellationToken)
- {
- var ret = await TryDequeueAsync(cancellationToken).ConfigureAwait(false);
- if (!ret.Success)
- throw new InvalidOperationException("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
- return ret.Item;
- }
-
- ///
- /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. This method may block the calling thread. Throws if the producer/consumer queue has completed adding and is empty.
- ///
- /// A cancellation token that can be used to abort the dequeue operation.
- public T Dequeue(CancellationToken cancellationToken)
- {
- var ret = TryDequeue(cancellationToken);
- if (!ret.Success)
- throw new InvalidOperationException("Dequeue failed; the producer/consumer queue has completed adding and is empty.");
- return ret.Item;
- }
-
- ///
- /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. Throws if the producer/consumer queue has completed adding and is empty.
- ///
- /// The dequeued item.
- public Task DequeueAsync()
- {
- return DequeueAsync(CancellationToken.None);
- }
-
- ///
- /// Dequeues an item from the producer/consumer queue. Returns the dequeued item. This method may block the calling thread. Throws if the producer/consumer queue has completed adding and is empty.
- ///
- /// The dequeued item.
- public T Dequeue()
- {
- return Dequeue(CancellationToken.None);
- }
-
- ///
- /// The result of a TryDequeue , DequeueFromAny , or TryDequeueFromAny operation.
- ///
- public sealed class DequeueResult
- {
- internal DequeueResult(AsyncProducerConsumerQueue queue, T item)
- {
- Queue = queue;
- Item = item;
- }
-
- ///
- /// The queue from which the item was dequeued, or null if the operation failed.
- ///
- public AsyncProducerConsumerQueue Queue { get; private set; }
-
- ///
- /// Whether the operation was successful. This is true if and only if is not null .
- ///
- public bool Success { get { return Queue != null; } }
-
- ///
- /// The dequeued item. This is only valid if is not null .
- ///
- public T Item { get; private set; }
- }
-
- [DebuggerNonUserCode]
- internal sealed class DebugView
- {
- private readonly AsyncProducerConsumerQueue _queue;
-
- public DebugView(AsyncProducerConsumerQueue queue)
- {
- _queue = queue;
- }
-
- [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
- public T[] Items
- {
- get { return _queue._queue.ToArray(); }
- }
- }
- }
-
- ///
- /// Provides methods for working on multiple instances.
- ///
- public static class AsyncProducerConsumerQueueExtensions
- {
- ///
- /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding.
- ///
- /// The producer/consumer queues.
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation.
- /// The producer/consumer queue that received the item.
- public static async Task> TryEnqueueToAnyAsync(this IEnumerable> queues, T item, CancellationToken cancellationToken)
- {
- var abort = new TaskCompletionSource();
- using (var abortCancellationToken = CancellationTokenHelpers.FromTask(abort.Task))
- using (var combinedToken = CancellationTokenHelpers.Normalize(abortCancellationToken.Token, cancellationToken))
- {
- var token = combinedToken.Token;
- var tasks = queues.Select(q => q.TryEnqueueAsync(item, token, abort));
- var results = await TaskShim.WhenAll(tasks).ConfigureAwait(false);
- var ret = results.FirstOrDefault(x => x != null);
- if (ret == null)
- cancellationToken.ThrowIfCancellationRequested();
- return ret;
- }
- }
-
- ///
- /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding. This method may block the calling thread.
- ///
- /// The producer/consumer queues.
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation.
- /// The producer/consumer queue that received the item.
- public static AsyncProducerConsumerQueue TryEnqueueToAny(this IEnumerable> queues, T item, CancellationToken cancellationToken)
- {
- return TryEnqueueToAnyAsync(queues, item, cancellationToken).WaitAndUnwrapException();
- }
-
- ///
- /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding.
- ///
- /// The producer/consumer queues.
- /// The item to enqueue.
- /// The producer/consumer queue that received the item.
- public static Task> TryEnqueueToAnyAsync(this IEnumerable> queues, T item)
- {
- return TryEnqueueToAnyAsync(queues, item, CancellationToken.None);
- }
-
- ///
- /// Attempts to enqueue an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Returns null if all producer/consumer queues have completed adding. This method may block the calling thread.
- ///
- /// The producer/consumer queues.
- /// The item to enqueue.
- /// The producer/consumer queue that received the item.
- public static AsyncProducerConsumerQueue TryEnqueueToAny(this IEnumerable> queues, T item)
- {
- return TryEnqueueToAny(queues, item, CancellationToken.None);
- }
-
- ///
- /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws if all producer/consumer queues have completed adding.
- ///
- /// The producer/consumer queues.
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation.
- /// The producer/consumer queue that received the item.
- public static async Task> EnqueueToAnyAsync(this IEnumerable> queues, T item, CancellationToken cancellationToken)
- {
- var ret = await TryEnqueueToAnyAsync(queues, item, cancellationToken).ConfigureAwait(false);
- if (ret == null)
- throw new InvalidOperationException("Enqueue failed; all producer/consumer queues have completed adding.");
- return ret;
- }
-
- ///
- /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws if all producer/consumer queues have completed adding. This method may block the calling thread.
- ///
- /// The producer/consumer queues.
- /// The item to enqueue.
- /// A cancellation token that can be used to abort the enqueue operation.
- /// The producer/consumer queue that received the item.
- public static AsyncProducerConsumerQueue EnqueueToAny(this IEnumerable> queues, T item, CancellationToken cancellationToken)
- {
- var ret = TryEnqueueToAny(queues, item, cancellationToken);
- if (ret == null)
- throw new InvalidOperationException("Enqueue failed; all producer/consumer queues have completed adding.");
- return ret;
- }
-
- ///
- /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws if all producer/consumer queues have completed adding.
- ///
- /// The producer/consumer queues.
- /// The item to enqueue.
- /// The producer/consumer queue that received the item.
- public static Task> EnqueueToAnyAsync(this IEnumerable> queues, T item)
- {
- return EnqueueToAnyAsync(queues, item, CancellationToken.None);
- }
-
- ///
- /// Enqueues an item to any of a number of producer/consumer queues. Returns the producer/consumer queue that received the item. Throws if all producer/consumer queues have completed adding. This method may block the calling thread.
- ///
- /// The producer/consumer queues.
- /// The item to enqueue.
- /// The producer/consumer queue that received the item.
- public static AsyncProducerConsumerQueue EnqueueToAny(this IEnumerable> queues, T item)
- {
- return EnqueueToAny(queues, item, CancellationToken.None);
- }
-
- ///
- /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty.
- ///
- /// The producer/consumer queues.
- /// A cancellation token that can be used to abort the dequeue operation.
- public static async Task.DequeueResult> TryDequeueFromAnyAsync(this IEnumerable> queues, CancellationToken cancellationToken)
- {
- var abort = new TaskCompletionSource();
- using (var abortCancellationToken = CancellationTokenHelpers.FromTask(abort.Task))
- using (var combinedToken = CancellationTokenHelpers.Normalize(abortCancellationToken.Token, cancellationToken))
- {
- var token = combinedToken.Token;
- var tasks = queues.Select(q => q.TryDequeueAsync(token, abort));
- var results = await TaskShim.WhenAll(tasks).ConfigureAwait(false);
- var result = results.FirstOrDefault(x => x.Success);
- if (result != null)
- return result;
- cancellationToken.ThrowIfCancellationRequested();
- return AsyncProducerConsumerQueue.FalseResult;
- }
- }
-
- ///
- /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
- ///
- /// The producer/consumer queues.
- /// A cancellation token that can be used to abort the dequeue operation.
- public static AsyncProducerConsumerQueue.DequeueResult TryDequeueFromAny(this IEnumerable> queues, CancellationToken cancellationToken)
- {
- return TryDequeueFromAnyAsync(queues, cancellationToken).WaitAndUnwrapException();
- }
-
- ///
- /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty.
- ///
- /// The producer/consumer queues.
- public static Task.DequeueResult> TryDequeueFromAnyAsync(this IEnumerable> queues)
- {
- return TryDequeueFromAnyAsync(queues, CancellationToken.None);
- }
-
- ///
- /// Attempts to dequeue an item from any of a number of producer/consumer queues. The operation "fails" if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
- ///
- /// The producer/consumer queues.
- public static AsyncProducerConsumerQueue.DequeueResult TryDequeueFromAny(this IEnumerable> queues)
- {
- return TryDequeueFromAny(queues, CancellationToken.None);
- }
-
- ///
- /// Dequeues an item from any of a number of producer/consumer queues. Throws if all the producer/consumer queues have completed adding and are empty.
- ///
- /// The producer/consumer queues.
- /// A cancellation token that can be used to abort the dequeue operation.
- public static async Task.DequeueResult> DequeueFromAnyAsync(this IEnumerable> queues, CancellationToken cancellationToken)
- {
- var ret = await TryDequeueFromAnyAsync(queues, cancellationToken).ConfigureAwait(false);
- if (!ret.Success)
- throw new InvalidOperationException("Dequeue failed; all producer/consumer queues have completed adding and are empty.");
- return ret;
- }
-
- ///
- /// Dequeues an item from any of a number of producer/consumer queues. Throws if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
- ///
- /// The producer/consumer queues.
- /// A cancellation token that can be used to abort the dequeue operation.
- public static AsyncProducerConsumerQueue.DequeueResult DequeueFromAny(this IEnumerable> queues, CancellationToken cancellationToken)
- {
- var ret = TryDequeueFromAny(queues, cancellationToken);
- if (!ret.Success)
- throw new InvalidOperationException("Dequeue failed; all producer/consumer queues have completed adding and are empty.");
- return ret;
- }
-
- ///
- /// Dequeues an item from any of a number of producer/consumer queues. Throws if all the producer/consumer queues have completed adding and are empty.
- ///
- /// The producer/consumer queues.
- public static Task.DequeueResult> DequeueFromAnyAsync(this IEnumerable> queues)
- {
- return DequeueFromAnyAsync(queues, CancellationToken.None);
- }
-
- ///
- /// Dequeues an item from any of a number of producer/consumer queues. Throws if all the producer/consumer queues have completed adding and are empty. This method may block the calling thread.
- ///
- /// The producer/consumer queues.
- public static AsyncProducerConsumerQueue.DequeueResult DequeueFromAny(this IEnumerable> queues)
- {
- return DequeueFromAny(queues, CancellationToken.None);
- }
- }
-}
\ No newline at end of file
diff --git a/ZeroLevel/Services/Async/AsyncQueue.cs b/ZeroLevel/Services/Async/AsyncQueue.cs
new file mode 100644
index 0000000..b860cda
--- /dev/null
+++ b/ZeroLevel/Services/Async/AsyncQueue.cs
@@ -0,0 +1,520 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using ZeroLevel.Services.Async.Internal;
+
+namespace ZeroLevel.Services.Async
+{
+ ///
+ /// Represents a thread-safe queue that allows asynchronous consuming.
+ ///
+ /// The type of the items contained in the queue.
+ public class AsyncQueue
+ : IAsyncCollection
+ {
+ internal const int SegmentSize = 32;
+
+ private Segment _itemTail;
+ private Segment _awaiterTail;
+
+ ///
+ /// This actually points to either or , depending on which one lags behind.
+ /// The only reason this field exists is to simplify enumerating segments for things like , or .
+ ///
+ private Segment _head;
+
+ ///
+ /// The value is positive if there are any active enumerators and negative if any segments are being transferred to the pool.
+ ///
+ private int _enumerationPoolingBalance = 0;
+
+ private Segment _segmentPoolHead = null;
+
+ ///
+ /// Initializes a new empty instance of .
+ ///
+ public AsyncQueue()
+ {
+ Segment firstSegment = new Segment(this) { SegmentID = 0 };
+ _itemTail = firstSegment;
+ _awaiterTail = firstSegment;
+ _head = firstSegment;
+ }
+
+ ///
+ /// Initializes a new instance of that contains elements copied from a specified collection.
+ ///
+ /// The collection whose elements are copied to the new queue.
+ public AsyncQueue(IEnumerable collection)
+ : this()
+ {
+ foreach (T item in collection)
+ Add(item);
+ }
+
+ public int AwaiterCount => ComputeCount(Volatile.Read(ref _awaiterTail), Volatile.Read(ref _itemTail), s => s.AwaiterCount);
+ public int Count => ComputeCount(Volatile.Read(ref _itemTail), Volatile.Read(ref _awaiterTail), s => s.ItemCount);
+
+ private int ComputeCount(Segment myTail, Segment otherTail, Func countExtractor)
+ {
+ if (myTail.SegmentID < otherTail.SegmentID)
+ return 0;
+
+ if (myTail.SegmentID == otherTail.SegmentID)
+ return countExtractor(myTail);
+
+ int count = countExtractor(myTail) + countExtractor(otherTail);
+ long fullMiddleSegmentCount = myTail.SegmentID - otherTail.SegmentID - 1;
+ if (fullMiddleSegmentCount > 0)
+ count += SegmentSize * (int)fullMiddleSegmentCount;
+
+ return count;
+ }
+
+ public void Add(T item)
+ {
+ SpinWait spin = new SpinWait();
+ while (!Volatile.Read(ref _itemTail).TryAdd(item))
+ spin.SpinOnce();
+ }
+
+ public ValueTask TakeAsync(CancellationToken cancellationToken)
+ => cancellationToken.IsCancellationRequested
+ ? CanceledValueTask.Value
+ : TakeWithoutValidationAsync(cancellationToken);
+
+ private ValueTask TakeWithoutValidationAsync(CancellationToken cancellationToken)
+ {
+ SpinWait spin = new SpinWait();
+
+ while (true)
+ {
+ ValueTask? result = Volatile.Read(ref _awaiterTail).TryTakeAsync(cancellationToken);
+ if (result != null)
+ return result.Value;
+
+ spin.SpinOnce();
+ }
+ }
+
+ public Enumerator GetEnumerator()
+ {
+ SpinWait spin = new SpinWait();
+
+ while (true)
+ {
+ int oldBalance = Volatile.Read(ref _enumerationPoolingBalance);
+ if (oldBalance >= 0 && Interlocked.CompareExchange(ref _enumerationPoolingBalance, oldBalance + 1, oldBalance) == oldBalance)
+ break;
+
+ spin.SpinOnce();
+ }
+
+ return new Enumerator(this);
+ }
+
+ IEnumerator IEnumerable.GetEnumerator() => new BoxedEnumerator(GetEnumerator());
+ IEnumerator IEnumerable.GetEnumerator() => (this as IEnumerable).GetEnumerator();
+
+ public struct Enumerator : IEnumerator
+ {
+ private SelectManyStructEnumererator _innerEnumerator;
+ private readonly AsyncQueue _queue;
+
+ public Enumerator(AsyncQueue queue)
+ {
+ _queue = queue;
+ _innerEnumerator = new SelectManyStructEnumererator(new SegmentEnumerator(queue), segment => segment.GetEnumerator());
+ }
+
+ public T Current => _innerEnumerator.Current;
+ object IEnumerator.Current => Current;
+
+ public bool MoveNext() => _innerEnumerator.MoveNext();
+ public void Reset() => _innerEnumerator.Reset();
+
+ public void Dispose()
+ {
+ _innerEnumerator.Dispose();
+ Interlocked.Decrement(ref _queue._enumerationPoolingBalance);
+ }
+ }
+
+ private struct SegmentEnumerator : IEnumerator
+ {
+ private readonly AsyncQueue _queue;
+ private bool _readFirstSegment;
+
+ public SegmentEnumerator(AsyncQueue queue)
+ {
+ _queue = queue;
+ Current = default(Segment);
+ _readFirstSegment = false;
+ }
+
+ public Segment Current { get; private set; }
+ object IEnumerator.Current => Current;
+
+ public bool MoveNext()
+ {
+ if (!_readFirstSegment)
+ {
+ Current = Volatile.Read(ref _queue._head);
+ _readFirstSegment = true;
+ return true;
+ }
+
+ Current = Current.VolatileNext;
+ return Current != null;
+ }
+
+ public void Dispose()
+ {
+ }
+
+ public void Reset()
+ {
+ }
+ }
+
+ private class Segment : IEnumerable
+ {
+ private readonly T[] _items = new T[SegmentSize];
+ private readonly IAwaiter[] _awaiters = new IAwaiter[SegmentSize];
+ private readonly int[] _slotStates = new int[SegmentSize];
+
+ private readonly AsyncQueue _queue;
+ private long _segmentID;
+
+ private int _awaiterIndex = -1;
+ private int _itemIndex = -1;
+ private Segment _next = null;
+
+ private Segment _nextPooledSegment = null;
+
+ public Segment(AsyncQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public Segment VolatileNext => Volatile.Read(ref _next);
+
+ public long SegmentID
+ {
+ get { return Volatile.Read(ref _segmentID); }
+ set { Volatile.Write(ref _segmentID, value); }
+ }
+
+ public int ItemCount => Math.Max(0, ItemAwaiterBalance);
+ public int AwaiterCount => Math.Max(0, -ItemAwaiterBalance);
+
+ private int ItemAwaiterBalance => SlotReferenceToCount(ref _itemIndex) - SlotReferenceToCount(ref _awaiterIndex);
+ private int SlotReferenceToCount(ref int slotReference) => Math.Min(SegmentSize, Volatile.Read(ref slotReference) + 1);
+
+ public bool TryAdd(T item)
+ => TryAdd(item, Interlocked.Increment(ref _itemIndex));
+
+ private bool TryAdd(T item, int slot)
+ => slot < SegmentSize
+ && TryAddWithoutValidation(item, slot);
+
+ private bool TryAddWithoutValidation(T item, int slot)
+ {
+ _items[slot] = item;
+ bool wonSlot = Interlocked.CompareExchange(ref _slotStates[slot], SlotState.HasItem, SlotState.None) == SlotState.None;
+
+ /// 1. If we have won the slot, the item is considered successfully added.
+ /// 2. Otherwise, it's up to the result of .
+ /// Awaiter could have been canceled by now, and if it has, we should return false to insert item again into another slot.
+ /// We also can't blindly read awaiter from the slot, because captures slot *before* filling in the awaiter.
+ /// So we have to spin until it is available.
+ /// And regardless of the awaiter state, we mark the slot as finished because both item and awaiter have visited it.
+ bool success = wonSlot || TrySetAwaiterResultAndClearSlot(item, slot);
+
+ HandleLastSlotCapture(slot, wonSlot, ref _queue._itemTail);
+ return success;
+ }
+
+ private bool TrySetAwaiterResultAndClearSlot(T item, int slot)
+ {
+ bool success = SpinUntilAwaiterIsReady(slot).TrySetResult(item);
+ ClearSlot(slot);
+ return success;
+ }
+
+ private IAwaiter SpinUntilAwaiterIsReady(int slot)
+ {
+ SpinWait spin = new SpinWait();
+ while (true)
+ {
+ IAwaiter awaiter = Volatile.Read(ref _awaiters[slot]);
+ if (awaiter != null)
+ return awaiter;
+
+ spin.SpinOnce();
+ }
+ }
+
+ public ValueTask? TryTakeAsync(CancellationToken cancellationToken)
+ => TryTakeAsync(cancellationToken, Interlocked.Increment(ref _awaiterIndex));
+
+ private ValueTask? TryTakeAsync(CancellationToken cancellationToken, int slot)
+ => slot < SegmentSize
+ ? TryTakeWithoutValidationAsync(cancellationToken, slot)
+ : (ValueTask?)null;
+
+ private ValueTask TryTakeWithoutValidationAsync(CancellationToken cancellationToken, int slot)
+ {
+ ValueTask result;
+
+ /// The order here differs from what does: we capture the slot *before* inserting an awaiter.
+ /// We do it to avoid allocating an awaiter / registering the cancellation that we're not gonna need in case we lose.
+ /// This means can see the default awaiter value, but it is easily solved by spinning until the awaiter is assigned.
+ bool wonSlot = Interlocked.CompareExchange(ref _slotStates[slot], SlotState.HasAwaiter, SlotState.None) == SlotState.None;
+ if (wonSlot)
+ {
+ IAwaiter awaiter = new CompletionSourceAwaiterFactory(cancellationToken).CreateAwaiter();
+ Volatile.Write(ref _awaiters[slot], awaiter);
+ result = awaiter.Task;
+ }
+ else
+ {
+ result = new ValueTask(_items[slot]);
+ ClearSlot(slot);
+ }
+
+ HandleLastSlotCapture(slot, wonSlot, ref _queue._awaiterTail);
+ return result;
+ }
+
+ private void ClearSlot(int slot)
+ {
+ Volatile.Write(ref _slotStates[slot], SlotState.Cleared);
+ Volatile.Write(ref _awaiters[slot], null);
+ _items[slot] = default(T);
+ }
+
+ ///
+ /// Here comes the tricky part.
+ /// 0. We only care if we've captured exactly the last slot, so only one thread performs the segment maintenance.
+ /// 1. Whether the slot is lost or won, the next time the same kind of item is inserted, there's no point in looking for a slot at the current segment.
+ /// So we have to advance or , depending on the kind of item we're working on right now.
+ /// 2. The last slot is captured twice: by an item and by an awaiter. We obviously should to grow a segment only once, so only the winner does it.
+ /// 3. If we've lost the last slot, it's still possible the next segment is not grown yet, so we have to spin.
+ /// 4. If we've lost the last slot, it means we're done with the current segment: all items and all awaiters have annihilated each other.
+ /// and are 0 now, so the segment can't contribute to or .
+ /// So we lose the reference to it by advancing .
+ /// 5. If we've lost the last slot, we pool it to be reused later.
+ ///
+ /// Either or , whichever we're working on right now.
+ private void HandleLastSlotCapture(int slot, bool wonSlot, ref Segment tailReference)
+ {
+ if (!IsLastSlot(slot))
+ return;
+
+ Segment nextSegment = wonSlot ? GrowSegment() : SpinUntilNextSegmentIsReady();
+ Volatile.Write(ref tailReference, nextSegment);
+
+ if (wonSlot)
+ return;
+
+ Volatile.Write(ref _queue._head, nextSegment);
+ TryPoolSegment();
+ }
+
+ private void TryPoolSegment()
+ {
+ if (!TryDecreaseBalance())
+ return;
+
+ /// We reset so it could be GC-ed if it doesn't make it to the pool.
+ /// It's safe to do so because:
+ /// 1. guarantees that the whole queue is not being enumerated right now.
+ /// 2. By this time is already rewritten so future enumerators can't possibly reference the current segment.
+ /// The rest of the clean-up is *NOT* safe to do here, see for details.
+ Volatile.Write(ref _next, null);
+ PushToPool();
+ Interlocked.Increment(ref _queue._enumerationPoolingBalance);
+ }
+
+ private bool TryDecreaseBalance()
+ {
+ SpinWait spin = new SpinWait();
+ while (true)
+ {
+ int enumeratorPoolBalance = Volatile.Read(ref _queue._enumerationPoolingBalance);
+
+ // If the balance is positive, we have some active enumerators and it's dangerous to pool the segment right now.
+ // We can't spin until the balance is restored either, because we have no guarantee that enumerators will be disposed soon (or will be disposed at all).
+ // So we have no choice but to give up on pooling the segment.
+ if (enumeratorPoolBalance > 0)
+ return false;
+
+ if (Interlocked.CompareExchange(ref _queue._enumerationPoolingBalance, enumeratorPoolBalance - 1, enumeratorPoolBalance) == enumeratorPoolBalance)
+ return true;
+
+ spin.SpinOnce();
+ }
+ }
+
+ private void PushToPool()
+ {
+ SpinWait spin = new SpinWait();
+ while (true)
+ {
+ Segment oldHead = Volatile.Read(ref _queue._segmentPoolHead);
+ Volatile.Write(ref _nextPooledSegment, oldHead);
+
+ if (Interlocked.CompareExchange(ref _queue._segmentPoolHead, this, oldHead) == oldHead)
+ break;
+
+ spin.SpinOnce();
+ }
+ }
+
+ private Segment TryPopSegmentFromPool()
+ {
+ SpinWait spin = new SpinWait();
+ while (true)
+ {
+ Segment oldHead = Volatile.Read(ref _queue._segmentPoolHead);
+ if (oldHead == null)
+ return null;
+
+ if (Interlocked.CompareExchange(ref _queue._segmentPoolHead, oldHead._nextPooledSegment, oldHead) == oldHead)
+ {
+ Volatile.Write(ref oldHead._nextPooledSegment, null);
+ return oldHead;
+ }
+
+ spin.SpinOnce();
+ }
+ }
+
+ ///
+ /// It's possible for the appenders to read the tail reference before it's updated and to try appending to the segment while it's being pooled.
+ /// There's no cheap way to prevent it, so we have to be prepared that an append can succeed as fast as we reset or .
+ /// This means this method must *NOT* be called on putting the segment into the pool, because it could lead to items/awaiters being stored somewhere in the pool.
+ /// Since there's no guarantee that the segment will ever be reused, this effectively means losing data (or, in the best-case scenario, completely screwing up the item order).
+ /// We prevent this disaster by calling this method on taking segment from the pool, instead of putting it there.
+ /// This way even if such append happens, we're about the reattach the segment to the queue and the data won't be lost.
+ ///
+ private Segment ResetAfterTakingFromPool()
+ {
+ /// We must reset before and .
+ /// Otherwise appenders could successfully increment a pointer and mess with the slots before they are ready to be messed with.
+ for (int i = 0; i < SegmentSize; i++)
+ {
+ /// We can't simply overwrite the state, because it's possible that the slot loser has not finished yet.
+ SpinWait spin = new SpinWait();
+ while (Interlocked.CompareExchange(ref _slotStates[i], SlotState.None, SlotState.Cleared) != SlotState.Cleared)
+ spin.SpinOnce();
+ }
+
+ Volatile.Write(ref _awaiterIndex, -1);
+ Volatile.Write(ref _itemIndex, -1);
+
+ return this;
+ }
+
+ private static bool IsLastSlot(int slot) => slot == SegmentSize - 1;
+
+ private Segment SpinUntilNextSegmentIsReady()
+ {
+ SpinWait spin = new SpinWait();
+ while (VolatileNext == null)
+ spin.SpinOnce();
+
+ return VolatileNext;
+ }
+
+ private Segment GrowSegment()
+ {
+ Segment newTail = TryPopSegmentFromPool()?.ResetAfterTakingFromPool() ?? new Segment(_queue);
+ newTail.SegmentID = _segmentID + 1;
+ Volatile.Write(ref _next, newTail);
+ return newTail;
+ }
+
+ private int SpinUntilStateIsResolvedAndReturnState(int slot)
+ {
+ SpinWait spin = new SpinWait();
+ int slotState;
+ while (true)
+ {
+ slotState = Volatile.Read(ref _slotStates[slot]);
+ if (slotState != SlotState.None)
+ break;
+
+ spin.SpinOnce();
+ }
+
+ return slotState;
+ }
+
+ public Enumerator GetEnumerator() => new Enumerator(this);
+ IEnumerator IEnumerable.GetEnumerator() => new BoxedEnumerator(GetEnumerator());
+ IEnumerator IEnumerable.GetEnumerator() => (this as IEnumerable).GetEnumerator();
+
+ private static class SlotState
+ {
+ public const int None = 0;
+ public const int HasItem = 1;
+ public const int HasAwaiter = 2;
+ public const int Cleared = 3;
+ }
+
+ public struct Enumerator : IEnumerator
+ {
+ private readonly Segment _segment;
+ private int _currentSlot;
+ private int _effectiveLength;
+
+ public Enumerator(Segment segment)
+ {
+ _segment = segment;
+ _currentSlot = Int32.MinValue;
+ _effectiveLength = Int32.MinValue;
+ Current = default(T);
+ }
+
+ public T Current { get; private set; }
+ object IEnumerator.Current => Current;
+
+ public bool MoveNext()
+ {
+ if (_currentSlot == Int32.MinValue)
+ {
+ /// Items in slots 0 .. are taken by awaiters, so they are no longer considered stored in the queue.
+ _currentSlot = _segment.SlotReferenceToCount(ref _segment._awaiterIndex);
+
+ /// is the last slot an item actually exists at at the moment, so we shouldn't enumerate through the default values that are stored further.
+ _effectiveLength = _segment.SlotReferenceToCount(ref _segment._itemIndex);
+ }
+
+ while (_currentSlot < _effectiveLength)
+ {
+ int slotState = _segment.SpinUntilStateIsResolvedAndReturnState(_currentSlot);
+ Current = _segment._items[_currentSlot];
+ _currentSlot++;
+
+ if (slotState == SlotState.HasItem)
+ return true;
+ }
+
+ return false;
+ }
+
+ public void Dispose()
+ {
+ }
+
+ public void Reset()
+ {
+ }
+ }
+ }
+ }
+}
diff --git a/ZeroLevel/Services/Async/AsyncReaderWriterLock.cs b/ZeroLevel/Services/Async/AsyncReaderWriterLock.cs
deleted file mode 100644
index 3a5ce3d..0000000
--- a/ZeroLevel/Services/Async/AsyncReaderWriterLock.cs
+++ /dev/null
@@ -1,771 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace ZeroLevel.Services.Async
-{
- ///
- /// A reader/writer lock that is compatible with async. Note that this lock is not recursive!
- ///
- [DebuggerDisplay("Id = {Id}, State = {GetStateForDebugger}, ReaderCount = {GetReaderCountForDebugger}, UpgradeInProgress = {GetUpgradeInProgressForDebugger}")]
- [DebuggerTypeProxy(typeof(DebugView))]
- public sealed class AsyncReaderWriterLock
- {
- ///
- /// The queue of TCSs that other tasks are awaiting to acquire the lock as writers.
- ///
- private readonly IAsyncWaitQueue _writerQueue;
-
- ///
- /// The queue of TCSs that other tasks are awaiting to acquire the lock as readers.
- ///
- private readonly IAsyncWaitQueue _readerQueue;
-
- ///
- /// The queue of TCSs that other tasks are awaiting to acquire the lock as upgradeable readers.
- ///
- private readonly IAsyncWaitQueue _upgradeableReaderQueue;
-
- ///
- /// The queue of TCSs that other tasks are awaiting to upgrade a reader lock to a writer lock.
- ///
- private readonly IAsyncWaitQueue _upgradeReaderQueue;
-
- ///
- /// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
- ///
- private int _id;
-
- ///
- /// The current upgradeable reader lock key, if any. If this is not null , then there is an upgradeable reader lock held.
- ///
- private UpgradeableReaderKey _upgradeableReaderKey;
-
- ///
- /// Number of reader locks held (including an upgradeable reader lock, if applicable); -1 if a writer lock is held; 0 if no locks are held.
- ///
- private int _locksHeld;
-
- ///
- /// The object used for mutual exclusion.
- ///
- private readonly object _mutex;
-
- [DebuggerNonUserCode]
- internal State GetStateForDebugger
- {
- get
- {
- if (_locksHeld == 0)
- return State.Unlocked;
- if (_locksHeld == -1)
- if (_upgradeableReaderKey != null)
- return State.WriteLockedWithUpgradeableReader;
- else
- return State.WriteLocked;
- if (_upgradeableReaderKey != null)
- return State.ReadLockedWithUpgradeableReader;
- return State.ReadLocked;
- }
- }
-
- internal enum State
- {
- Unlocked,
- ReadLocked,
- ReadLockedWithUpgradeableReader,
- WriteLocked,
- WriteLockedWithUpgradeableReader,
- }
-
- [DebuggerNonUserCode]
- internal int GetReaderCountForDebugger { get { return (_locksHeld > 0 ? _locksHeld : 0); } }
-
- [DebuggerNonUserCode]
- internal bool GetUpgradeInProgressForDebugger { get { return !_upgradeReaderQueue.IsEmpty; } }
-
- ///
- /// Creates a new async-compatible reader/writer lock.
- ///
- public AsyncReaderWriterLock(IAsyncWaitQueue writerQueue, IAsyncWaitQueue readerQueue,
- IAsyncWaitQueue upgradeableReaderQueue, IAsyncWaitQueue upgradeReaderQueue)
- {
- _writerQueue = writerQueue;
- _readerQueue = readerQueue;
- _upgradeableReaderQueue = upgradeableReaderQueue;
- _upgradeReaderQueue = upgradeReaderQueue;
- _mutex = new object();
- }
-
- ///
- /// Creates a new async-compatible reader/writer lock.
- ///
- public AsyncReaderWriterLock()
- : this(new DefaultAsyncWaitQueue(), new DefaultAsyncWaitQueue(),
- new DefaultAsyncWaitQueue(), new DefaultAsyncWaitQueue())
- {
- }
-
- ///
- /// Gets a semi-unique identifier for this asynchronous lock.
- ///
- public int Id
- {
- get { return IdManager.GetId(ref _id); }
- }
-
- internal object SyncObject
- {
- get { return _mutex; }
- }
-
- ///
- /// Applies a continuation to the task that will call if the task is canceled. This method may not be called while holding the sync lock.
- ///
- /// The task to observe for cancellation.
- private void ReleaseWaitersWhenCanceled(Task task)
- {
- task.ContinueWith(t =>
- {
- List finishes;
- lock (SyncObject) { finishes = ReleaseWaiters(); }
- foreach (var finish in finishes)
- finish.Dispose();
- }, CancellationToken.None, TaskContinuationOptions.OnlyOnCanceled | TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
- }
-
- ///
- /// Asynchronously acquires the lock as a reader. Returns a disposable that releases the lock when disposed.
- ///
- /// The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).
- /// A disposable that releases the lock when disposed.
- public Task ReaderLockAsync(CancellationToken cancellationToken)
- {
- Task ret;
- lock (SyncObject)
- {
- // If the lock is available or in read mode and there are no waiting writers, upgradeable readers, or upgrading readers, take it immediately.
- if (_locksHeld >= 0 && _writerQueue.IsEmpty && _upgradeableReaderQueue.IsEmpty && _upgradeReaderQueue.IsEmpty)
- {
- ++_locksHeld;
- ret = TaskShim.FromResult(new ReaderKey(this));
- }
- else
- {
- // Wait for the lock to become available or cancellation.
- ret = _readerQueue.Enqueue(cancellationToken);
- }
- }
-
- return ret;
- }
-
- ///
- /// Synchronously acquires the lock as a reader. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
- ///
- /// The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).
- /// A disposable that releases the lock when disposed.
- public IDisposable ReaderLock(CancellationToken cancellationToken)
- {
- Task ret;
- lock (SyncObject)
- {
- // If the lock is available or in read mode and there are no waiting writers, upgradeable readers, or upgrading readers, take it immediately.
- if (_locksHeld >= 0 && _writerQueue.IsEmpty && _upgradeableReaderQueue.IsEmpty && _upgradeReaderQueue.IsEmpty)
- {
- ++_locksHeld;
- return new ReaderKey(this);
- }
-
- // Wait for the lock to become available or cancellation.
- ret = _readerQueue.Enqueue(cancellationToken);
- }
-
- return ret.WaitAndUnwrapException();
- }
-
- ///
- /// Asynchronously acquires the lock as a reader. Returns a disposable that releases the lock when disposed.
- ///
- /// A disposable that releases the lock when disposed.
- public Task ReaderLockAsync()
- {
- return ReaderLockAsync(CancellationToken.None);
- }
-
- ///
- /// Synchronously acquires the lock as a reader. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
- ///
- /// A disposable that releases the lock when disposed.
- public IDisposable ReaderLock()
- {
- return ReaderLock(CancellationToken.None);
- }
-
- ///
- /// Asynchronously acquires the lock as a writer. Returns a disposable that releases the lock when disposed.
- ///
- /// The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).
- /// A disposable that releases the lock when disposed.
- public Task WriterLockAsync(CancellationToken cancellationToken)
- {
- Task ret;
- lock (SyncObject)
- {
- // If the lock is available, take it immediately.
- if (_locksHeld == 0)
- {
- _locksHeld = -1;
- ret = TaskShim.FromResult(new WriterKey(this));
- }
- else
- {
- // Wait for the lock to become available or cancellation.
- ret = _writerQueue.Enqueue(cancellationToken);
- }
- }
-
- ReleaseWaitersWhenCanceled(ret);
- return ret;
- }
-
- ///
- /// Synchronously acquires the lock as a writer. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
- ///
- /// The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).
- /// A disposable that releases the lock when disposed.
- public IDisposable WriterLock(CancellationToken cancellationToken)
- {
- Task ret;
- lock (SyncObject)
- {
- // If the lock is available, take it immediately.
- if (_locksHeld == 0)
- {
- _locksHeld = -1;
- return new WriterKey(this);
- }
-
- // Wait for the lock to become available or cancellation.
- ret = _writerQueue.Enqueue(cancellationToken);
- }
-
- ReleaseWaitersWhenCanceled(ret);
- return ret.WaitAndUnwrapException();
- }
-
- ///
- /// Asynchronously acquires the lock as a writer. Returns a disposable that releases the lock when disposed.
- ///
- /// A disposable that releases the lock when disposed.
- public Task WriterLockAsync()
- {
- return WriterLockAsync(CancellationToken.None);
- }
-
- ///
- /// Asynchronously acquires the lock as a writer. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
- ///
- /// A disposable that releases the lock when disposed.
- public IDisposable WriterLock()
- {
- return WriterLock(CancellationToken.None);
- }
-
- ///
- /// Asynchronously acquires the lock as a reader with the option to upgrade. Returns a key that can be used to upgrade and downgrade the lock, and releases the lock when disposed.
- ///
- /// The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).
- /// A key that can be used to upgrade and downgrade this lock, and releases the lock when disposed.
- public Task UpgradeableReaderLockAsync(CancellationToken cancellationToken)
- {
- Task ret;
- lock (SyncObject)
- {
- // If the lock is available, take it immediately.
- if (_locksHeld == 0 || (_locksHeld > 0 && _upgradeableReaderKey == null))
- {
- ++_locksHeld;
- _upgradeableReaderKey = new UpgradeableReaderKey(this);
- ret = TaskShim.FromResult(_upgradeableReaderKey);
- }
- else
- {
- // Wait for the lock to become available or cancellation.
- ret = _upgradeableReaderQueue.Enqueue(cancellationToken);
- }
- }
-
- ReleaseWaitersWhenCanceled(ret);
- return ret;
- }
-
- ///
- /// Synchronously acquires the lock as a reader with the option to upgrade. Returns a key that can be used to upgrade and downgrade the lock, and releases the lock when disposed. This method may block the calling thread.
- ///
- /// The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).
- /// A key that can be used to upgrade and downgrade this lock, and releases the lock when disposed.
- public UpgradeableReaderKey UpgradeableReaderLock(CancellationToken cancellationToken)
- {
- Task ret;
- lock (SyncObject)
- {
- // If the lock is available, take it immediately.
- if (_locksHeld == 0 || (_locksHeld > 0 && _upgradeableReaderKey == null))
- {
- ++_locksHeld;
- _upgradeableReaderKey = new UpgradeableReaderKey(this);
- return _upgradeableReaderKey;
- }
-
- // Wait for the lock to become available or cancellation.
- ret = _upgradeableReaderQueue.Enqueue(cancellationToken);
- }
-
- ReleaseWaitersWhenCanceled(ret);
- return ret.WaitAndUnwrapException();
- }
-
- ///
- /// Asynchronously acquires the lock as a reader with the option to upgrade. Returns a key that can be used to upgrade and downgrade the lock, and releases the lock when disposed.
- ///
- /// A key that can be used to upgrade and downgrade this lock, and releases the lock when disposed.
- public Task UpgradeableReaderLockAsync()
- {
- return UpgradeableReaderLockAsync(CancellationToken.None);
- }
-
- ///
- /// Synchronously acquires the lock as a reader with the option to upgrade. Returns a key that can be used to upgrade and downgrade the lock, and releases the lock when disposed. This method may block the calling thread.
- ///
- /// A key that can be used to upgrade and downgrade this lock, and releases the lock when disposed.
- public UpgradeableReaderKey UpgradeableReaderLock()
- {
- return UpgradeableReaderLock(CancellationToken.None);
- }
-
- ///
- /// Asynchronously upgrades a reader lock to a writer lock. This method assumes the sync lock is already held.
- ///
- internal Task UpgradeAsync(CancellationToken cancellationToken)
- {
- Task ret;
-
- // If the lock is available, take it immediately.
- if (_locksHeld == 1)
- {
- _locksHeld = -1;
- ret = TaskShim.FromResult(new UpgradeableReaderKey.UpgradeKey(_upgradeableReaderKey));
- }
- else
- {
- // Wait for the lock to become available or cancellation.
- ret = _upgradeReaderQueue.Enqueue(cancellationToken);
- }
- return ret;
- }
-
- ///
- /// Downgrades a writer lock to a reader lock. This method assumes the sync lock is already held.
- ///
- internal List Downgrade()
- {
- _locksHeld = 1;
- return ReleaseWaiters();
- }
-
- ///
- /// Grants lock(s) to waiting tasks. This method assumes the sync lock is already held.
- ///
- private List ReleaseWaiters()
- {
- var ret = new List();
-
- if (_locksHeld == 0)
- {
- // Give priority to writers.
- if (!_writerQueue.IsEmpty)
- {
- ret.Add(_writerQueue.Dequeue(new WriterKey(this)));
- _locksHeld = -1;
- return ret;
- }
-
- // Then to upgradeable readers.
- if (!_upgradeableReaderQueue.IsEmpty)
- {
- _upgradeableReaderKey = new UpgradeableReaderKey(this);
- ret.Add(_upgradeableReaderQueue.Dequeue(_upgradeableReaderKey));
- ++_locksHeld;
- }
-
- // Finally to readers.
- while (!_readerQueue.IsEmpty)
- {
- ret.Add(_readerQueue.Dequeue(new ReaderKey(this)));
- ++_locksHeld;
- }
-
- return ret;
- }
-
- // Give priority to upgrading readers.
- if (_locksHeld == 1)
- {
- if (!_upgradeReaderQueue.IsEmpty)
- {
- ret.Add(_upgradeReaderQueue.Dequeue(new UpgradeableReaderKey.UpgradeKey(_upgradeableReaderKey)));
- _locksHeld = -1;
- }
- }
-
- if (_locksHeld > 0)
- {
- // If there are current reader locks and waiting writers, then do nothing.
- if (!_writerQueue.IsEmpty || !_upgradeableReaderQueue.IsEmpty || !_upgradeReaderQueue.IsEmpty)
- return ret;
-
- // If there are current reader locks but no upgradeable reader lock, try to release an upgradeable reader.
- if (_upgradeableReaderKey == null && !_upgradeableReaderQueue.IsEmpty)
- {
- _upgradeableReaderKey = new UpgradeableReaderKey(this);
- ret.Add(_upgradeableReaderQueue.Dequeue(_upgradeableReaderKey));
- }
- }
-
- return ret;
- }
-
- ///
- /// Releases the lock as a reader.
- ///
- internal void ReleaseReaderLock()
- {
- List finishes;
- lock (SyncObject)
- {
- --_locksHeld;
- finishes = ReleaseWaiters();
- }
- foreach (var finish in finishes)
- finish.Dispose();
- }
-
- ///
- /// Releases the lock as a writer.
- ///
- internal void ReleaseWriterLock()
- {
- List finishes;
- lock (SyncObject)
- {
- _locksHeld = 0;
- finishes = ReleaseWaiters();
- }
- foreach (var finish in finishes)
- finish.Dispose();
- }
-
- ///
- /// Releases the lock as an upgradeable reader.
- ///
- internal void ReleaseUpgradeableReaderLock(Task upgrade)
- {
- IDisposable cancelFinish = null;
- List finishes;
- lock (SyncObject)
- {
- if (upgrade != null)
- cancelFinish = _upgradeReaderQueue.TryCancel(upgrade);
- _upgradeableReaderKey = null;
- --_locksHeld;
- finishes = ReleaseWaiters();
- }
- if (cancelFinish != null)
- cancelFinish.Dispose();
- foreach (var finish in finishes)
- finish.Dispose();
- }
-
- ///
- /// The disposable which releases the reader lock.
- ///
- private sealed class ReaderKey : IDisposable
- {
- ///
- /// The lock to release.
- ///
- private AsyncReaderWriterLock _asyncReaderWriterLock;
-
- ///
- /// Creates the key for a lock.
- ///
- /// The lock to release. May not be null .
- public ReaderKey(AsyncReaderWriterLock asyncReaderWriterLock)
- {
- _asyncReaderWriterLock = asyncReaderWriterLock;
- }
-
- ///
- /// Release the lock.
- ///
- public void Dispose()
- {
- if (_asyncReaderWriterLock == null)
- return;
- _asyncReaderWriterLock.ReleaseReaderLock();
- _asyncReaderWriterLock = null;
- }
- }
-
- ///
- /// The disposable which releases the writer lock.
- ///
- private sealed class WriterKey : IDisposable
- {
- ///
- /// The lock to release.
- ///
- private AsyncReaderWriterLock _asyncReaderWriterLock;
-
- ///
- /// Creates the key for a lock.
- ///
- /// The lock to release. May not be null .
- public WriterKey(AsyncReaderWriterLock asyncReaderWriterLock)
- {
- _asyncReaderWriterLock = asyncReaderWriterLock;
- }
-
- ///
- /// Release the lock.
- ///
- public void Dispose()
- {
- if (_asyncReaderWriterLock == null)
- return;
- _asyncReaderWriterLock.ReleaseWriterLock();
- _asyncReaderWriterLock = null;
- }
- }
-
- ///
- /// The disposable which manages the upgradeable reader lock.
- ///
- [DebuggerDisplay("State = {GetStateForDebugger}, ReaderWriterLockId = {_asyncReaderWriterLock.Id}")]
- public sealed class UpgradeableReaderKey : IDisposable
- {
- ///
- /// The lock to release.
- ///
- private readonly AsyncReaderWriterLock _asyncReaderWriterLock;
-
- ///
- /// The task doing the upgrade.
- ///
- private Task _upgrade;
-
- ///
- /// Whether or not this instance has been disposed.
- ///
- private bool _disposed;
-
- [DebuggerNonUserCode]
- internal State GetStateForDebugger
- {
- get
- {
- if (_upgrade == null)
- return State.Reader;
- if (_upgrade.Status == TaskStatus.RanToCompletion)
- return State.Writer;
- return State.UpgradingToWriter;
- }
- }
-
- internal enum State
- {
- Reader,
- UpgradingToWriter,
- Writer,
- }
-
- ///
- /// Creates the key for a lock.
- ///
- /// The lock to release. May not be null .
- internal UpgradeableReaderKey(AsyncReaderWriterLock asyncReaderWriterLock)
- {
- _asyncReaderWriterLock = asyncReaderWriterLock;
- }
-
- ///
- /// Gets a value indicating whether this lock has been upgraded to a write lock.
- ///
- public bool Upgraded
- {
- get
- {
- Task task;
- lock (_asyncReaderWriterLock.SyncObject) { task = _upgrade; }
- return (task != null && task.Status == TaskStatus.RanToCompletion);
- }
- }
-
- ///
- /// Upgrades the reader lock to a writer lock. Returns a disposable that downgrades the writer lock to a reader lock when disposed.
- ///
- /// The cancellation token used to cancel the upgrade. If this is already set, then this method will attempt to upgrade immediately (succeeding if the lock is currently available).
- public Task UpgradeAsync(CancellationToken cancellationToken)
- {
- lock (_asyncReaderWriterLock.SyncObject)
- {
- if (_upgrade != null)
- throw new InvalidOperationException("Cannot upgrade.");
-
- _upgrade = _asyncReaderWriterLock.UpgradeAsync(cancellationToken);
- }
-
- _asyncReaderWriterLock.ReleaseWaitersWhenCanceled(_upgrade);
- var ret = new TaskCompletionSource();
- _upgrade.ContinueWith(t =>
- {
- if (t.IsCanceled)
- lock (_asyncReaderWriterLock.SyncObject) { _upgrade = null; }
- ret.TryCompleteFromCompletedTask(t);
- }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
- return ret.Task;
- }
-
- ///
- /// Synchronously upgrades the reader lock to a writer lock. Returns a disposable that downgrades the writer lock to a reader lock when disposed. This method may block the calling thread.
- ///
- /// The cancellation token used to cancel the upgrade. If this is already set, then this method will attempt to upgrade immediately (succeeding if the lock is currently available).
- public IDisposable Upgrade(CancellationToken cancellationToken)
- {
- lock (_asyncReaderWriterLock.SyncObject)
- {
- if (_upgrade != null)
- throw new InvalidOperationException("Cannot upgrade.");
-
- _upgrade = _asyncReaderWriterLock.UpgradeAsync(cancellationToken);
- }
-
- _asyncReaderWriterLock.ReleaseWaitersWhenCanceled(_upgrade);
- var ret = new TaskCompletionSource();
- _upgrade.ContinueWith(t =>
- {
- if (t.IsCanceled)
- lock (_asyncReaderWriterLock.SyncObject) { _upgrade = null; }
- ret.TryCompleteFromCompletedTask(t);
- }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
- return ret.Task.WaitAndUnwrapException();
- }
-
- ///
- /// Upgrades the reader lock to a writer lock. Returns a disposable that downgrades the writer lock to a reader lock when disposed.
- ///
- public Task UpgradeAsync()
- {
- return UpgradeAsync(CancellationToken.None);
- }
-
- ///
- /// Synchronously upgrades the reader lock to a writer lock. Returns a disposable that downgrades the writer lock to a reader lock when disposed. This method may block the calling thread.
- ///
- public IDisposable Upgrade()
- {
- return Upgrade(CancellationToken.None);
- }
-
- ///
- /// Downgrades the writer lock to a reader lock.
- ///
- private void Downgrade()
- {
- List finishes;
- lock (_asyncReaderWriterLock.SyncObject)
- {
- finishes = _asyncReaderWriterLock.Downgrade();
- _upgrade = null;
- }
- foreach (var finish in finishes)
- finish.Dispose();
- }
-
- ///
- /// Release the lock.
- ///
- public void Dispose()
- {
- if (_disposed)
- return;
- _asyncReaderWriterLock.ReleaseUpgradeableReaderLock(_upgrade);
- _disposed = true;
- }
-
- ///
- /// The disposable which downgrades an upgradeable reader key.
- ///
- internal sealed class UpgradeKey : IDisposable
- {
- ///
- /// The upgradeable reader key to downgrade.
- ///
- private UpgradeableReaderKey _key;
-
- ///
- /// Creates the upgrade key for an upgradeable reader key.
- ///
- /// The upgradeable reader key to downgrade. May not be null .
- public UpgradeKey(UpgradeableReaderKey key)
- {
- _key = key;
- }
-
- ///
- /// Downgrade the upgradeable reader key.
- ///
- public void Dispose()
- {
- if (_key == null)
- return;
- _key.Downgrade();
- _key = null;
- }
- }
- }
-
- // ReSharper disable UnusedMember.Local
- [DebuggerNonUserCode]
- private sealed class DebugView
- {
- private readonly AsyncReaderWriterLock _rwl;
-
- public DebugView(AsyncReaderWriterLock rwl)
- {
- _rwl = rwl;
- }
-
- public int Id { get { return _rwl.Id; } }
-
- public State State { get { return _rwl.GetStateForDebugger; } }
-
- public int ReaderCount { get { return _rwl.GetReaderCountForDebugger; } }
-
- public bool UpgradeInProgress { get { return _rwl.GetUpgradeInProgressForDebugger; } }
-
- public IAsyncWaitQueue ReaderWaitQueue { get { return _rwl._readerQueue; } }
-
- public IAsyncWaitQueue WriterWaitQueue { get { return _rwl._writerQueue; } }
-
- public IAsyncWaitQueue UpgradeableReaderWaitQueue { get { return _rwl._upgradeableReaderQueue; } }
-
- public IAsyncWaitQueue UpgradeReaderWaitQueue { get { return _rwl._upgradeReaderQueue; } }
- }
-
- // ReSharper restore UnusedMember.Local
- }
-}
\ No newline at end of file
diff --git a/ZeroLevel/Services/Async/AsyncSemaphore.cs b/ZeroLevel/Services/Async/AsyncSemaphore.cs
deleted file mode 100644
index 478641c..0000000
--- a/ZeroLevel/Services/Async/AsyncSemaphore.cs
+++ /dev/null
@@ -1,178 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace ZeroLevel.Services.Async
-{
- ///
- /// An async-compatible semaphore. Alternatively, you could use