using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Services.Async.Internal;
namespace ZeroLevel.Services.Async
{
/// <summary>
/// Represents a thread-safe queue that allows asynchronous consuming.
/// </summary>
/// <typeparam name="T">The type of the items contained in the queue.</typeparam>
public class AsyncQueue
: IAsyncCollection
{
// Number of slots (and therefore the maximum number of items or awaiters) held by one segment.
internal const int SegmentSize = 32;
// Segment that Add currently tries to put items into.
private Segment _itemTail;
// Segment that TakeAsync currently tries to put awaiters into.
private Segment _awaiterTail;
/// <summary>
/// This actually points to either <c>_itemTail</c> or <c>_awaiterTail</c>, depending on which one lags behind.
/// The only reason this field exists is to simplify enumerating segments: segment enumeration starts from it.
/// </summary>
private Segment _head;
/// <summary>
/// The value is positive if there are any active enumerators and negative if any segments are being transferred to the pool.
/// </summary>
private int _enumerationPoolingBalance = 0;
// Top of the stack of pooled segments (chained through Segment._nextPooledSegment); null when the pool is empty.
private Segment _segmentPoolHead = null;
/// <summary>
/// Initializes a new empty instance of the queue.
/// </summary>
public AsyncQueue()
{
    // One segment with ID 0 starts out as the head, the awaiter tail and the item tail at the same time.
    Segment initialSegment = new Segment(this);
    initialSegment.SegmentID = 0;

    _head = initialSegment;
    _awaiterTail = initialSegment;
    _itemTail = initialSegment;
}
/// <summary>
/// Initializes a new instance of the queue that contains elements copied from a specified collection.
/// </summary>
/// <param name="collection">The collection whose elements are copied to the new queue, in enumeration order.</param>
/// <exception cref="ArgumentNullException"><paramref name="collection"/> is <c>null</c>.</exception>
public AsyncQueue(IEnumerable collection)
: this()
{
    // Reject null up front with a descriptive exception instead of letting foreach fail with a NullReferenceException.
    if (collection == null)
        throw new ArgumentNullException(nameof(collection));

    foreach (T item in collection)
        Add(item);
}
/// <summary>
/// Gets the number of awaiters currently held by the queue's segments.
/// The two tails are read with separate volatile reads (awaiter tail first), so the result is not an atomic snapshot.
/// </summary>
public int AwaiterCount
{
    get
    {
        Segment awaiterTail = Volatile.Read(ref _awaiterTail);
        Segment itemTail = Volatile.Read(ref _itemTail);
        return ComputeCount(awaiterTail, itemTail, segment => segment.AwaiterCount);
    }
}
/// <summary>
/// Gets the number of items currently held by the queue's segments.
/// The two tails are read with separate volatile reads (item tail first), so the result is not an atomic snapshot.
/// </summary>
public int Count
{
    get
    {
        Segment itemTail = Volatile.Read(ref _itemTail);
        Segment awaiterTail = Volatile.Read(ref _awaiterTail);
        return ComputeCount(itemTail, awaiterTail, segment => segment.ItemCount);
    }
}
/// <summary>
/// Sums the per-segment counts between two tails.
/// </summary>
/// <param name="myTail">The tail whose kind of entries is being counted.</param>
/// <param name="otherTail">The opposite tail.</param>
/// <param name="countExtractor">Reads the relevant count from a single segment.</param>
/// <returns>
/// 0 if <paramref name="myTail"/> has a lower segment ID than <paramref name="otherTail"/>;
/// the count of <paramref name="myTail"/> alone if both have the same ID;
/// otherwise the counts of both segments plus <c>SegmentSize</c> for every segment strictly between them.
/// </returns>
private int ComputeCount(Segment myTail, Segment otherTail, Func countExtractor)
{
    if (myTail.SegmentID >= otherTail.SegmentID)
    {
        if (myTail.SegmentID != otherTail.SegmentID)
        {
            int total = countExtractor(myTail);
            total += countExtractor(otherTail);

            // Segments strictly between the two tails are counted as completely full.
            long segmentsBetween = myTail.SegmentID - otherTail.SegmentID - 1;
            return segmentsBetween > 0
                ? total + SegmentSize * (int)segmentsBetween
                : total;
        }

        return countExtractor(myTail);
    }

    return 0;
}
/// <summary>
/// Adds an item to the queue, spinning until some segment accepts it.
/// </summary>
/// <param name="item">The item to add.</param>
public void Add(T item)
{
    SpinWait spinner = new SpinWait();
    while (true)
    {
        // The tail is re-read on every attempt because a failed attempt means the segment rejected the item.
        if (Volatile.Read(ref _itemTail).TryAdd(item))
            return;

        spinner.SpinOnce();
    }
}
/// <summary>
/// Takes an item from the queue asynchronously.
/// </summary>
/// <param name="cancellationToken">Token checked before the take is attempted and passed on to the take itself.</param>
/// <returns>
/// <c>CanceledValueTask.Value</c> if cancellation has already been requested;
/// otherwise the task produced by <c>TakeWithoutValidationAsync</c>.
/// </returns>
public ValueTask TakeAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
        return CanceledValueTask.Value;

    return TakeWithoutValidationAsync(cancellationToken);
}
/// <summary>
/// Keeps asking the current awaiter tail segment for a task until one of the attempts yields a value.
/// </summary>
/// <param name="cancellationToken">Token forwarded to every attempt.</param>
/// <returns>The task returned by the first successful attempt.</returns>
private ValueTask TakeWithoutValidationAsync(CancellationToken cancellationToken)
{
    SpinWait spinner = new SpinWait();
    ValueTask? attempt = Volatile.Read(ref _awaiterTail).TryTakeAsync(cancellationToken);
    while (!attempt.HasValue)
    {
        // A null attempt means the segment had no slot for us; back off and re-read the tail.
        spinner.SpinOnce();
        attempt = Volatile.Read(ref _awaiterTail).TryTakeAsync(cancellationToken);
    }

    return attempt.Value;
}
/// <summary>
/// Returns an enumerator over the queue after registering it in the enumeration/pooling balance.
/// </summary>
/// <remarks>
/// The balance is incremented only while it is non-negative; while it is negative the method spins.
/// <c>Enumerator.Dispose</c> decrements it again.
/// </remarks>
public Enumerator GetEnumerator()
{
    SpinWait spinner = new SpinWait();
    bool registered = false;
    while (!registered)
    {
        int observedBalance = Volatile.Read(ref _enumerationPoolingBalance);
        if (observedBalance >= 0)
        {
            int previousBalance = Interlocked.CompareExchange(ref _enumerationPoolingBalance, observedBalance + 1, observedBalance);
            registered = previousBalance == observedBalance;
        }

        if (!registered)
            spinner.SpinOnce();
    }

    return new Enumerator(this);
}
// Explicit interface implementations of GetEnumerator.
// NOTE(review): both declarations read identically here; presumably the first implements the generic
// IEnumerable<T> (boxing the struct enumerator) and the second forwards the non-generic IEnumerable to it.
// Confirm that the generic type arguments are intact in the real file.
IEnumerator IEnumerable.GetEnumerator() => new BoxedEnumerator(GetEnumerator());
IEnumerator IEnumerable.GetEnumerator() => (this as IEnumerable).GetEnumerator();
/// <summary>
/// Enumerates the items of the queue by flattening the per-segment enumerators of every segment
/// produced by <c>SegmentEnumerator</c>.
/// </summary>
public struct Enumerator : IEnumerator
{
    private SelectManyStructEnumererator _innerEnumerator;
    private readonly AsyncQueue _queue;

    /// <summary>
    /// Creates an enumerator over the specified queue.
    /// </summary>
    /// <param name="queue">The queue to enumerate.</param>
    public Enumerator(AsyncQueue queue)
    {
        _queue = queue;
        _innerEnumerator = new SelectManyStructEnumererator(new SegmentEnumerator(queue), segment => segment.GetEnumerator());
    }

    /// <summary>Gets the current item of the inner enumerator.</summary>
    public T Current
    {
        get { return _innerEnumerator.Current; }
    }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    /// <summary>Advances the inner enumerator.</summary>
    public bool MoveNext()
    {
        return _innerEnumerator.MoveNext();
    }

    /// <summary>Forwards the reset to the inner enumerator.</summary>
    public void Reset()
    {
        _innerEnumerator.Reset();
    }

    /// <summary>
    /// Disposes the inner enumerator and then decrements the queue's enumeration/pooling balance.
    /// </summary>
    public void Dispose()
    {
        _innerEnumerator.Dispose();
        Interlocked.Decrement(ref _queue._enumerationPoolingBalance);
    }
}
/// <summary>
/// Enumerates the segments of a queue, starting at the queue's head and following the next-segment links.
/// </summary>
private struct SegmentEnumerator : IEnumerator
{
    private readonly AsyncQueue _queue;
    private bool _readFirstSegment;

    /// <summary>
    /// Creates an enumerator positioned before the head segment of the specified queue.
    /// </summary>
    /// <param name="queue">The queue whose segments are enumerated.</param>
    public SegmentEnumerator(AsyncQueue queue)
    {
        _queue = queue;
        Current = default(Segment);
        _readFirstSegment = false;
    }

    /// <summary>Gets the segment the enumerator is positioned at.</summary>
    public Segment Current { get; private set; }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    /// <summary>
    /// Moves to the head segment on the first call and to the next linked segment on later calls.
    /// </summary>
    /// <returns><c>true</c> if a segment is available; <c>false</c> once the chain has ended.</returns>
    public bool MoveNext()
    {
        if (_readFirstSegment)
        {
            Current = Current.VolatileNext;
            return Current != null;
        }

        // The first call always succeeds: it yields whatever the head is at this moment.
        Current = Volatile.Read(ref _queue._head);
        _readFirstSegment = true;
        return true;
    }

    /// <summary>Does nothing.</summary>
    public void Dispose()
    {
    }

    /// <summary>Does nothing; the enumerator keeps its current position.</summary>
    public void Reset()
    {
    }
}
private class Segment : IEnumerable
{
// Item stored in each slot (written by TryAddWithoutValidation, reset by ClearSlot).
private readonly T[] _items = new T[SegmentSize];
// Awaiter stored in each slot (written by TryTakeWithoutValidationAsync, reset by ClearSlot).
private readonly IAwaiter[] _awaiters = new IAwaiter[SegmentSize];
// Per-slot state; holds one of the SlotState constants.
private readonly int[] _slotStates = new int[SegmentSize];
// The queue this segment belongs to.
private readonly AsyncQueue _queue;
// Backing field of SegmentID.
private long _segmentID;
// Index of the last slot claimed by an awaiter; -1 while none has been claimed.
private int _awaiterIndex = -1;
// Index of the last slot claimed by an item; -1 while none has been claimed.
private int _itemIndex = -1;
// The segment that follows this one in the queue; null until GrowSegment links it.
private Segment _next = null;
// Link to the segment below this one on the queue's stack of pooled segments.
private Segment _nextPooledSegment = null;
/// <summary>
/// Creates an empty segment that belongs to the specified queue.
/// </summary>
/// <param name="queue">The owning queue.</param>
public Segment(AsyncQueue queue)
{
_queue = queue;
}
/// <summary>Gets the next segment using a volatile read; <c>null</c> if none is linked.</summary>
public Segment VolatileNext
{
    get { return Volatile.Read(ref _next); }
}

/// <summary>Gets or sets the ID of the segment using volatile reads and writes.</summary>
public long SegmentID
{
    get
    {
        return Volatile.Read(ref _segmentID);
    }
    set
    {
        Volatile.Write(ref _segmentID, value);
    }
}
/// <summary>Gets the number of items in this segment that exceed the number of awaiters; never negative.</summary>
public int ItemCount
{
    get
    {
        int balance = ItemAwaiterBalance;
        return balance > 0 ? balance : 0;
    }
}

/// <summary>Gets the number of awaiters in this segment that exceed the number of items; never negative.</summary>
public int AwaiterCount
{
    get
    {
        int deficit = -ItemAwaiterBalance;
        return deficit > 0 ? deficit : 0;
    }
}

/// <summary>Gets the claimed item slot count minus the claimed awaiter slot count (items are read first).</summary>
private int ItemAwaiterBalance
{
    get
    {
        int itemSlots = SlotReferenceToCount(ref _itemIndex);
        int awaiterSlots = SlotReferenceToCount(ref _awaiterIndex);
        return itemSlots - awaiterSlots;
    }
}

/// <summary>
/// Converts a "last claimed slot" index into a number of claimed slots, capped at <c>SegmentSize</c>.
/// </summary>
/// <param name="slotReference">Either the item index or the awaiter index.</param>
private int SlotReferenceToCount(ref int slotReference)
{
    // The index keeps growing past the last slot on failed attempts, hence the cap.
    int claimed = Volatile.Read(ref slotReference) + 1;
    return claimed < SegmentSize ? claimed : SegmentSize;
}
/// <summary>
/// Claims the next item slot of this segment and tries to put the item there.
/// </summary>
/// <param name="item">The item to add.</param>
/// <returns><c>true</c> if the item has been added; <c>false</c> if the caller has to retry.</returns>
public bool TryAdd(T item)
{
    int claimedSlot = Interlocked.Increment(ref _itemIndex);
    return TryAdd(item, claimedSlot);
}

/// <summary>
/// Rejects slots that lie beyond the segment and forwards the rest to <c>TryAddWithoutValidation</c>.
/// </summary>
private bool TryAdd(T item, int slot)
{
    if (slot >= SegmentSize)
        return false;

    return TryAddWithoutValidation(item, slot);
}
/// <summary>
/// Stores the item in the given slot and resolves the slot against a possibly competing awaiter.
/// </summary>
/// <param name="item">The item to add.</param>
/// <param name="slot">A slot index inside this segment that the caller has claimed for an item.</param>
/// <returns>
/// <c>true</c> if the item won the slot or was handed to the awaiter that won it;
/// <c>false</c> if the awaiter did not accept the result.
/// </returns>
private bool TryAddWithoutValidation(T item, int slot)
{
// The item is written before the state change so that whoever observes HasItem can read it.
_items[slot] = item;
bool wonSlot = Interlocked.CompareExchange(ref _slotStates[slot], SlotState.HasItem, SlotState.None) == SlotState.None;
// 1. If we have won the slot, the item is considered successfully added.
// 2. Otherwise, it's up to the result of the awaiter's TrySetResult.
// Awaiter could have been canceled by now, and if it has, we should return false to insert item again into another slot.
// We also can't blindly read awaiter from the slot, because TryTakeWithoutValidationAsync captures slot *before* filling in the awaiter.
// So we have to spin until it is available.
// And regardless of the awaiter state, we mark the slot as finished because both item and awaiter have visited it.
bool success = wonSlot || TrySetAwaiterResultAndClearSlot(item, slot);
// Segment maintenance happens only if this was the last slot of the segment.
HandleLastSlotCapture(slot, wonSlot, ref _queue._itemTail);
return success;
}
/// <summary>
/// Waits for the awaiter of the slot to be published, offers it the item and then clears the slot.
/// </summary>
/// <param name="item">The item to hand to the awaiter.</param>
/// <param name="slot">The slot that an awaiter has won.</param>
/// <returns>Whatever the awaiter's <c>TrySetResult</c> returned.</returns>
private bool TrySetAwaiterResultAndClearSlot(T item, int slot)
{
    IAwaiter awaiter = SpinUntilAwaiterIsReady(slot);
    bool delivered = awaiter.TrySetResult(item);

    // The slot is cleared regardless of whether the awaiter accepted the item.
    ClearSlot(slot);
    return delivered;
}
/// <summary>
/// Spins until the awaiter of the given slot is non-null and returns it.
/// </summary>
/// <param name="slot">The slot to read the awaiter from.</param>
private IAwaiter SpinUntilAwaiterIsReady(int slot)
{
    SpinWait spinner = new SpinWait();
    IAwaiter published = Volatile.Read(ref _awaiters[slot]);
    while (published == null)
    {
        spinner.SpinOnce();
        published = Volatile.Read(ref _awaiters[slot]);
    }

    return published;
}
/// <summary>
/// Claims the next awaiter slot of this segment and tries to take from it.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the take.</param>
/// <returns>The resulting task, or <c>null</c> if the claimed slot lies beyond this segment.</returns>
public ValueTask? TryTakeAsync(CancellationToken cancellationToken)
{
    int claimedSlot = Interlocked.Increment(ref _awaiterIndex);
    return TryTakeAsync(cancellationToken, claimedSlot);
}

/// <summary>
/// Rejects slots that lie beyond the segment and forwards the rest to <c>TryTakeWithoutValidationAsync</c>.
/// </summary>
private ValueTask? TryTakeAsync(CancellationToken cancellationToken, int slot)
{
    if (slot < SegmentSize)
        return TryTakeWithoutValidationAsync(cancellationToken, slot);

    return null;
}
/// <summary>
/// Resolves the given slot for a consumer: installs an awaiter if the slot is still untouched,
/// otherwise completes immediately with the item stored in the slot.
/// </summary>
/// <param name="cancellationToken">Token passed to the awaiter factory when an awaiter has to be created.</param>
/// <param name="slot">A slot index inside this segment that the caller has claimed for an awaiter.</param>
/// <returns>The awaiter's task if the slot was won; otherwise a task wrapping the stored item.</returns>
private ValueTask TryTakeWithoutValidationAsync(CancellationToken cancellationToken, int slot)
{
ValueTask result;
// The order here differs from what TryAddWithoutValidation does: we capture the slot *before* inserting an awaiter.
// We do it to avoid allocating an awaiter / registering the cancellation that we're not gonna need in case we lose.
// This means TryAddWithoutValidation can see the default awaiter value, but it is easily solved by spinning until the awaiter is assigned.
bool wonSlot = Interlocked.CompareExchange(ref _slotStates[slot], SlotState.HasAwaiter, SlotState.None) == SlotState.None;
if (wonSlot)
{
IAwaiter awaiter = new CompletionSourceAwaiterFactory(cancellationToken).CreateAwaiter();
// Publishing the awaiter releases any adder spinning in SpinUntilAwaiterIsReady.
Volatile.Write(ref _awaiters[slot], awaiter);
result = awaiter.Task;
}
else
{
// The slot was not in the None state, so the stored item is returned and the slot is marked as cleared.
result = new ValueTask(_items[slot]);
ClearSlot(slot);
}
// Segment maintenance happens only if this was the last slot of the segment.
HandleLastSlotCapture(slot, wonSlot, ref _queue._awaiterTail);
return result;
}
/// <summary>
/// Marks the slot as <c>SlotState.Cleared</c> and drops the awaiter and item references it holds.
/// </summary>
/// <param name="slot">The slot to clear.</param>
private void ClearSlot(int slot)
{
Volatile.Write(ref _slotStates[slot], SlotState.Cleared);
Volatile.Write(ref _awaiters[slot], null);
_items[slot] = default(T);
}
/// <summary>
/// Performs segment maintenance after a slot has been captured; does nothing unless it was the last slot.
/// Here comes the tricky part.
/// 0. We only care if we've captured exactly the last slot, so only one thread performs the segment maintenance.
/// 1. Whether the slot is lost or won, the next time the same kind of item is inserted, there's no point in looking for a slot at the current segment.
/// So we have to advance <c>_itemTail</c> or <c>_awaiterTail</c> of the queue, depending on the kind of item we're working on right now.
/// 2. The last slot is captured twice: by an item and by an awaiter. We obviously should grow a segment only once, so only the winner does it.
/// 3. If we've lost the last slot, it's still possible the next segment is not grown yet, so we have to spin.
/// 4. If we've lost the last slot, it means we're done with the current segment: all items and all awaiters have annihilated each other.
/// <c>ItemCount</c> and <c>AwaiterCount</c> of the segment are 0 now, so the segment can't contribute to the queue's <c>Count</c> or <c>AwaiterCount</c>.
/// So we lose the reference to it by advancing the queue's <c>_head</c>.
/// 5. If we've lost the last slot, we pool it to be reused later.
/// </summary>
/// <param name="slot">The slot that has just been captured.</param>
/// <param name="wonSlot"><c>true</c> if the caller was the first to capture the slot.</param>
/// <param name="tailReference">Either <c>_itemTail</c> or <c>_awaiterTail</c> of the queue, whichever we're working on right now.</param>
private void HandleLastSlotCapture(int slot, bool wonSlot, ref Segment tailReference)
{
if (!IsLastSlot(slot))
return;
// The winner creates the next segment; the loser waits until the winner has linked it.
Segment nextSegment = wonSlot ? GrowSegment() : SpinUntilNextSegmentIsReady();
Volatile.Write(ref tailReference, nextSegment);
if (wonSlot)
return;
// Loser only: the head is moved past this segment before the segment is offered to the pool.
Volatile.Write(ref _queue._head, nextSegment);
TryPoolSegment();
}
/// <summary>
/// Pushes this segment to the queue's segment pool unless enumerators are active.
/// </summary>
private void TryPoolSegment()
{
if (!TryDecreaseBalance())
return;
// We reset _next so it could be GC-ed if it doesn't make it to the pool.
// It's safe to do so because:
// 1. TryDecreaseBalance guarantees that the whole queue is not being enumerated right now.
// 2. By this time the queue's _head is already rewritten so future enumerators can't possibly reference the current segment.
// The rest of the clean-up is *NOT* safe to do here, see ResetAfterTakingFromPool for details.
Volatile.Write(ref _next, null);
PushToPool();
// Gives back the unit of balance that TryDecreaseBalance has taken.
Interlocked.Increment(ref _queue._enumerationPoolingBalance);
}
/// <summary>
/// Tries to decrement the queue's enumeration/pooling balance to announce that a segment is being pooled.
/// </summary>
/// <returns><c>false</c> if the balance is positive; <c>true</c> once the decrement has been applied.</returns>
private bool TryDecreaseBalance()
{
    SpinWait spinner = new SpinWait();
    for (;;)
    {
        int observedBalance = Volatile.Read(ref _queue._enumerationPoolingBalance);

        // If the balance is positive, we have some active enumerators and it's dangerous to pool the segment right now.
        // We can't spin until the balance is restored either, because we have no guarantee that enumerators will be disposed soon (or will be disposed at all).
        // So we have no choice but to give up on pooling the segment.
        if (observedBalance > 0)
            return false;

        int previousBalance = Interlocked.CompareExchange(ref _queue._enumerationPoolingBalance, observedBalance - 1, observedBalance);
        if (previousBalance == observedBalance)
            return true;

        // The balance changed between the read and the exchange; back off and try again.
        spinner.SpinOnce();
    }
}
/// <summary>
/// Pushes this segment onto the queue's stack of pooled segments, retrying until the compare-exchange succeeds.
/// </summary>
private void PushToPool()
{
SpinWait spin = new SpinWait();
while (true)
{
Segment oldHead = Volatile.Read(ref _queue._segmentPoolHead);
// Link to the observed top first, so the chain is already complete when the exchange publishes this segment.
Volatile.Write(ref _nextPooledSegment, oldHead);
if (Interlocked.CompareExchange(ref _queue._segmentPoolHead, this, oldHead) == oldHead)
break;
spin.SpinOnce();
}
}
/// <summary>
/// Pops a segment from the queue's stack of pooled segments.
/// </summary>
/// <returns>The popped segment, or <c>null</c> if the pool was observed to be empty.</returns>
private Segment TryPopSegmentFromPool()
{
SpinWait spin = new SpinWait();
while (true)
{
Segment oldHead = Volatile.Read(ref _queue._segmentPoolHead);
if (oldHead == null)
return null;
if (Interlocked.CompareExchange(ref _queue._segmentPoolHead, oldHead._nextPooledSegment, oldHead) == oldHead)
{
// The popped segment no longer belongs to the pool, so its pool link is dropped.
Volatile.Write(ref oldHead._nextPooledSegment, null);
return oldHead;
}
spin.SpinOnce();
}
}
/// <summary>
/// Prepares a segment that has just been taken from the pool for reuse.
/// It's possible for the appenders to read the tail reference before it's updated and to try appending to the segment while it's being pooled.
/// There's no cheap way to prevent it, so we have to be prepared that an append can succeed as fast as we reset <c>_itemIndex</c> or <c>_awaiterIndex</c>.
/// This means this method must *NOT* be called on putting the segment into the pool, because it could lead to items/awaiters being stored somewhere in the pool.
/// Since there's no guarantee that the segment will ever be reused, this effectively means losing data (or, in the best-case scenario, completely screwing up the item order).
/// We prevent this disaster by calling this method on taking segment from the pool, instead of putting it there.
/// This way even if such append happens, we're about to reattach the segment to the queue and the data won't be lost.
/// </summary>
/// <returns>This segment.</returns>
private Segment ResetAfterTakingFromPool()
{
// We must reset _slotStates before _awaiterIndex and _itemIndex.
// Otherwise appenders could successfully increment a pointer and mess with the slots before they are ready to be messed with.
for (int i = 0; i < SegmentSize; i++)
{
// We can't simply overwrite the state, because it's possible that the slot loser has not finished yet.
// So each slot is switched from Cleared to None, spinning until it is actually observed as Cleared.
SpinWait spin = new SpinWait();
while (Interlocked.CompareExchange(ref _slotStates[i], SlotState.None, SlotState.Cleared) != SlotState.Cleared)
spin.SpinOnce();
}
Volatile.Write(ref _awaiterIndex, -1);
Volatile.Write(ref _itemIndex, -1);
return this;
}
/// <summary>
/// Tells whether the given index is the index of the final slot of a segment.
/// </summary>
/// <param name="slot">The slot index to check.</param>
private static bool IsLastSlot(int slot)
{
    const int lastSlot = SegmentSize - 1;
    return lastSlot == slot;
}
/// <summary>
/// Spins until this segment has a next segment linked to it.
/// </summary>
/// <returns>The value of <c>VolatileNext</c>, read once more after it has been observed as non-null.</returns>
private Segment SpinUntilNextSegmentIsReady()
{
    SpinWait spinner = new SpinWait();
    for (;;)
    {
        if (VolatileNext != null)
            return VolatileNext;

        spinner.SpinOnce();
    }
}
/// <summary>
/// Creates the successor of this segment and links it as the next segment.
/// </summary>
/// <returns>The new segment: a reset segment from the pool if one is available, otherwise a new instance.</returns>
private Segment GrowSegment()
{
Segment newTail = TryPopSegmentFromPool()?.ResetAfterTakingFromPool() ?? new Segment(_queue);
// The ID is assigned before the segment is linked, so it is already set when the link becomes visible.
newTail.SegmentID = _segmentID + 1;
Volatile.Write(ref _next, newTail);
return newTail;
}
/// <summary>
/// Spins until the state of the given slot is anything other than <c>SlotState.None</c>.
/// </summary>
/// <param name="slot">The slot whose state is awaited.</param>
/// <returns>The first state observed that is not <c>SlotState.None</c>.</returns>
private int SpinUntilStateIsResolvedAndReturnState(int slot)
{
    SpinWait spinner = new SpinWait();
    int observedState = Volatile.Read(ref _slotStates[slot]);
    while (observedState == SlotState.None)
    {
        spinner.SpinOnce();
        observedState = Volatile.Read(ref _slotStates[slot]);
    }

    return observedState;
}
/// <summary>
/// Returns a struct enumerator over the items stored in this segment.
/// </summary>
public Enumerator GetEnumerator() => new Enumerator(this);
// Explicit interface implementations of GetEnumerator.
// NOTE(review): both declarations read identically here; presumably the first implements the generic
// IEnumerable<T> (boxing the struct enumerator) and the second forwards the non-generic IEnumerable to it.
// Confirm that the generic type arguments are intact in the real file.
IEnumerator IEnumerable.GetEnumerator() => new BoxedEnumerator(GetEnumerator());
IEnumerator IEnumerable.GetEnumerator() => (this as IEnumerable).GetEnumerator();
/// <summary>
/// The values stored in <c>_slotStates</c>.
/// </summary>
private static class SlotState
{
// Initial state: neither an item nor an awaiter has captured the slot.
public const int None = 0;
// An item captured the slot first (set by TryAddWithoutValidation).
public const int HasItem = 1;
// An awaiter captured the slot first (set by TryTakeWithoutValidationAsync).
public const int HasAwaiter = 2;
// Set by ClearSlot; ResetAfterTakingFromPool turns it back into None.
public const int Cleared = 3;
}
/// <summary>
/// Enumerates the items of a single segment that are in the <c>SlotState.HasItem</c> state.
/// The range of slots to visit is captured on the first call to <see cref="MoveNext"/>.
/// </summary>
public struct Enumerator : IEnumerator
{
private readonly Segment _segment;
// Next slot to inspect; Int32.MinValue until the first MoveNext call.
private int _currentSlot;
// Exclusive upper bound of the slots to inspect; Int32.MinValue until the first MoveNext call.
private int _effectiveLength;
/// <summary>
/// Creates an enumerator over the specified segment.
/// </summary>
/// <param name="segment">The segment to enumerate.</param>
public Enumerator(Segment segment)
{
_segment = segment;
_currentSlot = Int32.MinValue;
_effectiveLength = Int32.MinValue;
Current = default(T);
}
/// <summary>Gets the value read from the most recently inspected slot.</summary>
public T Current { get; private set; }
object IEnumerator.Current => Current;
/// <summary>
/// Advances to the next slot whose state is <c>SlotState.HasItem</c>.
/// </summary>
/// <returns><c>true</c> if such a slot was found; <c>false</c> once the captured range is exhausted.</returns>
public bool MoveNext()
{
if (_currentSlot == Int32.MinValue)
{
// Items in slots 0 .. _awaiterIndex are taken by awaiters, so they are no longer considered stored in the queue.
_currentSlot = _segment.SlotReferenceToCount(ref _segment._awaiterIndex);
// _effectiveLength is the last slot an item actually exists at at the moment, so we shouldn't enumerate through the default values that are stored further.
_effectiveLength = _segment.SlotReferenceToCount(ref _segment._itemIndex);
}
while (_currentSlot < _effectiveLength)
{
// The slot has been claimed by an item, but its state may not be written yet, so wait until it is.
int slotState = _segment.SpinUntilStateIsResolvedAndReturnState(_currentSlot);
Current = _segment._items[_currentSlot];
_currentSlot++;
// Slots in any other state are skipped.
if (slotState == SlotState.HasItem)
return true;
}
return false;
}
/// <summary>Does nothing.</summary>
public void Dispose()
{
}
/// <summary>Does nothing; the enumerator keeps its current position.</summary>
public void Reset()
{
}
}
}
}
}