using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ZeroLevel.Services.Async
{
    // NOTE(review): the generic type arguments in this file (IAsyncWaitQueue<T>, Task<T>,
    // TaskCompletionSource<T>, Deque<...>, TaskConstants<T>) were restored after they had been
    // stripped from the text. Confirm Deque<T> and TaskConstants<T> match the project's declarations.

    /// <summary>
    /// A collection of cancelable <see cref="TaskCompletionSource{T}"/> instances. Implementations must be threadsafe and must work correctly if the caller is holding a lock.
    /// </summary>
    /// <typeparam name="T">The type of the results. If this isn't needed, use <see cref="Object"/>.</typeparam>
    public interface IAsyncWaitQueue<T>
    {
        /// <summary>
        /// Gets whether the queue is empty.
        /// </summary>
        bool IsEmpty { get; }

        /// <summary>
        /// Creates a new entry and queues it to this wait queue. The returned task must support both synchronous and asynchronous waits.
        /// </summary>
        /// <returns>The queued task.</returns>
        Task<T> Enqueue();

        /// <summary>
        /// Removes a single entry in the wait queue. Returns a disposable that completes that entry.
        /// </summary>
        /// <param name="result">The result used to complete the wait queue entry. If this isn't needed, use <c>default(T)</c>.</param>
        /// <returns>A disposable that completes the removed entry when disposed.</returns>
        IDisposable Dequeue(T result = default(T));

        /// <summary>
        /// Removes all entries in the wait queue. Returns a disposable that completes all entries.
        /// </summary>
        /// <param name="result">The result used to complete the wait queue entries. If this isn't needed, use <c>default(T)</c>.</param>
        /// <returns>A disposable that completes every removed entry when disposed.</returns>
        IDisposable DequeueAll(T result = default(T));

        /// <summary>
        /// Attempts to remove an entry from the wait queue. Returns a disposable that cancels the entry.
        /// </summary>
        /// <param name="task">The task to cancel.</param>
        /// <returns>A disposable that cancels the entry when disposed; it does nothing if the entry was not found.</returns>
        IDisposable TryCancel(Task task);

        /// <summary>
        /// Removes all entries from the wait queue. Returns a disposable that cancels all entries.
        /// </summary>
        /// <returns>A disposable that cancels every removed entry when disposed.</returns>
        IDisposable CancelAll();
    }

    /// <summary>
    /// Provides extension methods for wait queues.
    /// </summary>
    public static class AsyncWaitQueueExtensions
    {
        /// <summary>
        /// Creates a new entry and queues it to this wait queue. If the cancellation token is already canceled, this method immediately returns a canceled task without modifying the wait queue.
        /// </summary>
        /// <typeparam name="T">The type of the results of the wait queue.</typeparam>
        /// <param name="this">The wait queue.</param>
        /// <param name="token">The token used to cancel the wait.</param>
        /// <returns>The queued task.</returns>
        public static Task<T> Enqueue<T>(this IAsyncWaitQueue<T> @this, CancellationToken token)
        {
            if (token.IsCancellationRequested)
            {
                return TaskConstants<T>.Canceled;
            }

            var ret = @this.Enqueue();
            if (token.CanBeCanceled)
            {
                // On cancellation, pull the entry out of the queue and cancel it.
                var registration = token.Register(() => @this.TryCancel(ret).Dispose(), useSynchronizationContext: false);

                // Once the task finishes for any reason, release the token registration so the
                // token no longer holds on to the callback.
                ret.ContinueWith(_ => registration.Dispose(), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
            }

            return ret;
        }
    }

    /// <summary>
    /// The default wait queue implementation, which uses a double-ended queue.
    /// </summary>
    /// <typeparam name="T">The type of the results. If this isn't needed, use <see cref="Object"/>.</typeparam>
    [DebuggerDisplay("Count = {Count}")]
    [DebuggerTypeProxy(typeof(DefaultAsyncWaitQueue<>.DebugView))]
    public sealed class DefaultAsyncWaitQueue<T> : IAsyncWaitQueue<T>
    {
        // Pending entries in arrival order. This instance is also the lock object guarding itself.
        private readonly Deque<TaskCompletionSource<T>> _queue = new Deque<TaskCompletionSource<T>>();

        // Number of pending entries, read under the queue lock.
        private int Count
        {
            get
            {
                lock (_queue)
                {
                    return _queue.Count;
                }
            }
        }

        bool IAsyncWaitQueue<T>.IsEmpty
        {
            get { return Count == 0; }
        }

        Task<T> IAsyncWaitQueue<T>.Enqueue()
        {
            var tcs = new TaskCompletionSource<T>();
            lock (_queue)
            {
                _queue.AddToBack(tcs);
            }

            return tcs.Task;
        }

        IDisposable IAsyncWaitQueue<T>.Dequeue(T result)
        {
            // NOTE(review): behavior on an empty queue is whatever Deque.RemoveFromFront does
            // (presumably it throws) - callers should check IsEmpty first; confirm.
            TaskCompletionSource<T> tcs;
            lock (_queue)
            {
                tcs = _queue.RemoveFromFront();
            }

            // Completion is deferred to Dispose so it happens outside this queue's lock.
            return new CompleteDisposable(result, tcs);
        }

        IDisposable IAsyncWaitQueue<T>.DequeueAll(T result)
        {
            TaskCompletionSource<T>[] taskCompletionSources;
            lock (_queue)
            {
                // Snapshot and clear atomically so no entry is completed twice or missed.
                taskCompletionSources = _queue.ToArray();
                _queue.Clear();
            }

            return new CompleteDisposable(result, taskCompletionSources);
        }

        IDisposable IAsyncWaitQueue<T>.TryCancel(Task task)
        {
            TaskCompletionSource<T> tcs = null;
            lock (_queue)
            {
                // Linear search for the entry whose task is the one being canceled.
                for (int i = 0; i != _queue.Count; ++i)
                {
                    if (_queue[i].Task == task)
                    {
                        tcs = _queue[i];
                        _queue.RemoveAt(i);
                        break;
                    }
                }
            }

            // Not found: return a disposable over zero entries, which does nothing on Dispose.
            if (tcs == null)
            {
                return new CancelDisposable();
            }

            return new CancelDisposable(tcs);
        }

        IDisposable IAsyncWaitQueue<T>.CancelAll()
        {
            TaskCompletionSource<T>[] taskCompletionSources;
            lock (_queue)
            {
                taskCompletionSources = _queue.ToArray();
                _queue.Clear();
            }

            return new CancelDisposable(taskCompletionSources);
        }

        /// <summary>
        /// Cancels a fixed set of entries when disposed.
        /// </summary>
        private sealed class CancelDisposable : IDisposable
        {
            private readonly TaskCompletionSource<T>[] _taskCompletionSources;

            public CancelDisposable(params TaskCompletionSource<T>[] taskCompletionSources)
            {
                _taskCompletionSources = taskCompletionSources;
            }

            public void Dispose()
            {
                // TrySetCanceled leaves an already-completed entry untouched.
                foreach (var tcs in _taskCompletionSources)
                {
                    tcs.TrySetCanceled();
                }
            }
        }

        /// <summary>
        /// Completes a fixed set of entries with a single result when disposed.
        /// </summary>
        private sealed class CompleteDisposable : IDisposable
        {
            private readonly TaskCompletionSource<T>[] _taskCompletionSources;
            private readonly T _result;

            public CompleteDisposable(T result, params TaskCompletionSource<T>[] taskCompletionSources)
            {
                _result = result;
                _taskCompletionSources = taskCompletionSources;
            }

            public void Dispose()
            {
                // TrySetResult leaves an already-completed entry untouched.
                foreach (var tcs in _taskCompletionSources)
                {
                    tcs.TrySetResult(_result);
                }
            }
        }

        /// <summary>
        /// Debugger proxy that shows the queued tasks directly under the queue.
        /// </summary>
        [DebuggerNonUserCode]
        internal sealed class DebugView
        {
            private readonly DefaultAsyncWaitQueue<T> _queue;

            public DebugView(DefaultAsyncWaitQueue<T> queue)
            {
                _queue = queue;
            }

            [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
            public Task<T>[] Tasks
            {
                // Reads the underlying deque without taking the lock.
                get { return _queue._queue.Select(x => x.Task).ToArray(); }
            }
        }
    }
}