using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ZeroLevel.Services.Async.Internal
{
///
/// A set of exclusive awaiters that allows only one of the awaiters to be completed.
///
internal class ExclusiveCompletionSourceGroup
{
private int _completedSource = State.Locked;
private readonly TaskCompletionSource> _realCompetionSource = new TaskCompletionSource>();
private BitArray32 _awaitersCreated = BitArray32.Empty;
private CancellationRegistrationHolder _cancellationRegistrationHolder;
public ExclusiveCompletionSourceGroup()
{
Task = _realCompetionSource.Task.WithYield();
}
public Task> Task { get; }
public bool IsAwaiterCreated(int index) => _awaitersCreated.IsBitSet(index);
public Factory CreateAwaiterFactory(int index) => new Factory(this, index);
private IAwaiter CreateAwaiter(int index)
{
_awaitersCreated = _awaitersCreated.WithBitSet(index);
return new ExclusiveCompletionSource(this, index);
}
public void MarkAsResolved() => Interlocked.CompareExchange(ref _completedSource, State.Canceled, State.Unlocked);
public void UnlockCompetition(CancellationToken cancellationToken)
{
CancellationTokenRegistration registration = cancellationToken
.Register
(
state =>
{
ExclusiveCompletionSourceGroup group = state as ExclusiveCompletionSourceGroup;
/// There are 2 cases here.
///
/// #1: The token is canceled before is called, but after the token is validated higher up the stack.
/// Is this is the case, the cancellation callbak will be called synchronously while is still set to .
/// So the competition will never progress to and we have to check for this explicitly.
///
/// #2: We're canceled after the competition has been unlocked.
/// If this is the case, we have a simple race against the awaiters to progress from to .
if (group.TryTransitionToCanceledIfStateIs(State.Locked) || group.TryTransitionToCanceledIfStateIs(State.Unlocked))
group._realCompetionSource.SetCanceled();
},
this,
useSynchronizationContext: false
);
// We can't do volatile reads/writes on a custom value type field, so we have to wrap the registration into a holder instance.
// But there's no point in allocating the wrapper if the token can never be canceled.
if (cancellationToken.CanBeCanceled)
Volatile.Write(ref _cancellationRegistrationHolder, new CancellationRegistrationHolder(registration));
// If the cancellation was processed synchronously, the state will already be set to Canceled and we must *NOT* unlock the competition.
Interlocked.CompareExchange(ref _completedSource, State.Unlocked, State.Locked);
}
private bool TryTransitionToCanceledIfStateIs(int requiredState) => Interlocked.CompareExchange(ref _completedSource, State.Canceled, requiredState) == requiredState;
private static class State
{
public const int Locked = -1;
public const int Unlocked = -2;
public const int Canceled = Int32.MinValue;
}
private class CancellationRegistrationHolder
{
public CancellationRegistrationHolder(CancellationTokenRegistration registration)
{
Registration = registration;
}
public CancellationTokenRegistration Registration { get; }
}
private class ExclusiveCompletionSource : IAwaiter
{
private static readonly ValueTask _neverEndingTask = new ValueTask(new TaskCompletionSource().Task);
private readonly ExclusiveCompletionSourceGroup _group;
private readonly int _id;
public ExclusiveCompletionSource(ExclusiveCompletionSourceGroup group, int id)
{
_group = group;
_id = id;
}
public bool TrySetResult(T result)
{
SpinWait spin = new SpinWait();
while (true)
{
int completedSource = Interlocked.CompareExchange(ref _group._completedSource, _id, State.Unlocked);
if (completedSource == State.Unlocked)
{
// We are the champions!
_group._realCompetionSource.SetResult(new AnyResult(result, _id));
// This also means we're the ones responsible for disposing the cancellation registration.
// It's important to remember the holder can be null if the token is non-cancellable.
Volatile.Read(ref _group._cancellationRegistrationHolder)?.Registration.Dispose();
return true;
}
if (completedSource == State.Locked)
{
// The competition has not started yet.
spin.SpinOnce();
continue;
}
// Everything else means we've lost the competition and another completion source has got the result
return false;
}
}
// The value will never be actually used.
public ValueTask Task => _neverEndingTask;
}
public struct Factory : IAwaiterFactory, IEquatable
{
private readonly ExclusiveCompletionSourceGroup _group;
private readonly int _index;
public Factory(ExclusiveCompletionSourceGroup group, int index)
{
_group = group;
_index = index;
}
public IAwaiter CreateAwaiter() => _group.CreateAwaiter(_index);
#region IEquatable
public override int GetHashCode()
{
unchecked
{
const int prime = -1521134295;
int hash = 12345701;
hash = hash * prime + EqualityComparer>.Default.GetHashCode(_group);
hash = hash * prime + EqualityComparer.Default.GetHashCode(_index);
return hash;
}
}
public bool Equals(Factory other) => EqualityComparer>.Default.Equals(_group, other._group) && EqualityComparer.Default.Equals(_index, other._index);
public override bool Equals(object obj) => obj is Factory && Equals((Factory)obj);
public static bool operator ==(Factory x, Factory y) => x.Equals(y);
public static bool operator !=(Factory x, Factory y) => !x.Equals(y);
#endregion
}
}
}