using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ZeroLevel.Services.Async
{
    /// <summary>
    /// Represents a thread-safe collection that groups the items added to it into batches and allows consuming them asynchronously.
    /// </summary>
    /// <typeparam name="T">The type of the items contained in the collection.</typeparam>
    public class AsyncBatchQueue<T> : IAsyncBatchCollection<T>
    {
        // The batch currently accepting items. Replaced (by the thread that wins the
        // flush race) with a fresh batch whenever it fills up or is explicitly flushed.
        private volatile Batch _currentBatch;

        // Completed batches ready for consumers; AsyncQueue is a project-local async producer/consumer queue.
        private readonly AsyncQueue<IReadOnlyList<T>> _batchQueue = new AsyncQueue<IReadOnlyList<T>>();

        /// <summary>
        /// Initializes a new instance of <see cref="AsyncBatchQueue{T}"/> that produces batches of a specified size.
        /// </summary>
        /// <param name="batchSize">Amount of the items contained in an output batch.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is not a positive integer.</exception>
        public AsyncBatchQueue(int batchSize)
        {
            if (batchSize <= 0)
                throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be a positive integer.");

            BatchSize = batchSize;
            _currentBatch = new Batch(this);
        }

        /// <summary>
        /// Gets amount of items contained in an output batch.
        /// </summary>
        public int BatchSize { get; }

        /// <summary>
        /// Gets the number of flushed batches currently available for consuming.
        /// </summary>
        public int Count => _batchQueue.Count;

        /// <summary>
        /// Adds an item to the collection. Flushes the new batch to be available for consuming
        /// if amount of the pending items has reached <see cref="BatchSize"/>.
        /// </summary>
        /// <param name="item">The item to add.</param>
        public void Add(T item)
        {
            SpinWait spin = new SpinWait();

            // TryAdd fails only in the brief window between a batch filling up and
            // _currentBatch being replaced, so spinning here is short-lived.
            while (!_currentBatch.TryAdd(item))
                spin.SpinOnce();
        }

        /// <summary>
        /// Removes and returns a batch from the collection in an asynchronous manner.
        /// </summary>
        public ValueTask<IReadOnlyList<T>> TakeAsync() => TakeAsync(CancellationToken.None);

        /// <summary>
        /// Removes and returns a batch from the collection in an asynchronous manner.
        /// </summary>
        public ValueTask<IReadOnlyList<T>> TakeAsync(CancellationToken cancellationToken) => _batchQueue.TakeAsync(cancellationToken);

        /// <summary>
        /// Forces a new batch to be created and made available for consuming even if amount of
        /// the pending items has not reached <see cref="BatchSize"/> yet.
        /// Does nothing if there are no pending items to flush.
        /// </summary>
        public void Flush()
        {
            SpinWait spin = new SpinWait();
            while (!_currentBatch.TryFlush())
                spin.SpinOnce();
        }

        public IEnumerator<IReadOnlyList<T>> GetEnumerator() => _batchQueue.GetEnumerator();

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();

        /// <summary>
        /// A fixed-size, lock-free batch. Producers reserve slots by atomically incrementing
        /// <see cref="_lastReservationIndex"/>; per-slot finalization flags let readers wait
        /// until a reserved slot's value has actually been written.
        /// </summary>
        private class Batch : IReadOnlyList<T>
        {
            private readonly AsyncBatchQueue<T> _queue;
            private readonly T[] _items;

            // _finalizationFlags[i] becomes true only after _items[i] has been written,
            // allowing consumers to spin until a reserved slot is safe to read.
            private readonly bool[] _finalizationFlags;

            // Index of the most recently reserved slot; set to >= BatchSize once the batch is sealed.
            private int _lastReservationIndex = -1;

            // Number of items in the flushed batch; -1 until FlushInternal runs.
            private int _count = -1;

            public Batch(AsyncBatchQueue<T> queue)
            {
                _queue = queue;
                _items = new T[_queue.BatchSize];
                _finalizationFlags = new bool[_queue.BatchSize];
            }

            public bool TryAdd(T item)
            {
                int index = Interlocked.Increment(ref _lastReservationIndex);

                // The following is true if someone has beaten us to the last slot and we have to wait until the next batch comes along.
                if (index >= _queue.BatchSize)
                    return false;

                // The following is true if we've taken the last slot, which means we're obligated to flush the current batch and create a new one.
                if (index == _queue.BatchSize - 1)
                    FlushInternal(_queue.BatchSize);

                // The full fence prevents setting finalization flag before the actual item value is written.
                _items[index] = item;
                Interlocked.MemoryBarrier();
                _finalizationFlags[index] = true;

                return true;
            }

            public bool TryFlush()
            {
                int expectedPreviousReservation = Volatile.Read(ref _lastReservationIndex);

                // We don't flush if the batch doesn't have any items or if another thread is about to flush it.
                // However, we report success to avoid unnecessary spinning.
                if (expectedPreviousReservation < 0 || expectedPreviousReservation >= _queue.BatchSize - 1)
                    return true;

                int previousReservation = Interlocked.CompareExchange(ref _lastReservationIndex, _queue.BatchSize, expectedPreviousReservation);

                // Flush reservation has succeeded.
                if (expectedPreviousReservation == previousReservation)
                {
                    FlushInternal(previousReservation + 1);
                    return true;
                }

                // The following is true if someone has completed the batch by the time we tried to flush it.
                // Therefore the batch will be flushed anyway even if we don't do anything.
                // The opposite means someone has slipped in an update and we have to spin.
                return previousReservation >= _queue.BatchSize;
            }

            private void FlushInternal(int count)
            {
                _count = count;
                _queue._currentBatch = new Batch(_queue);

                // The full fence ensures that the current batch will never be added to the queue before _count is set.
                Interlocked.MemoryBarrier();

                _queue._batchQueue.Add(this);
            }

            private T GetItemWithoutValidation(int index)
            {
                SpinWait spin = new SpinWait();
                while (!_finalizationFlags[index])
                {
                    spin.SpinOnce();

                    // The full fence prevents caching any part of _finalizationFlags[ index ] expression.
                    Interlocked.MemoryBarrier();
                }

                // The full fence prevents reading item value before finalization flag is set.
                Interlocked.MemoryBarrier();
                return _items[index];
            }

            public T this[int index]
            {
                get
                {
                    if (index >= Count)
                        throw new IndexOutOfRangeException();

                    return GetItemWithoutValidation(index);
                }
            }

            public int Count => _count;

            public IEnumerator<T> GetEnumerator()
            {
                for (int i = 0; i < Count; i++)
                    yield return GetItemWithoutValidation(i);
            }

            System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
        }
    }
}