using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace ZeroLevel.Services.Async
{
/// <summary>
/// A reader/writer lock that is compatible with async. Note that this lock is not recursive!
/// </summary>
[DebuggerDisplay("Id = {Id}, State = {GetStateForDebugger}, ReaderCount = {GetReaderCountForDebugger}, UpgradeInProgress = {GetUpgradeInProgressForDebugger}")]
[DebuggerTypeProxy(typeof(DebugView))]
public sealed class AsyncReaderWriterLock
{
/// <summary>
/// The queue of TCSs that other tasks are awaiting to acquire the lock as writers.
/// </summary>
private readonly IAsyncWaitQueue _writerQueue;
/// <summary>
/// The queue of TCSs that other tasks are awaiting to acquire the lock as readers.
/// </summary>
private readonly IAsyncWaitQueue _readerQueue;
/// <summary>
/// The queue of TCSs that other tasks are awaiting to acquire the lock as upgradeable readers.
/// </summary>
private readonly IAsyncWaitQueue _upgradeableReaderQueue;
/// <summary>
/// The queue of TCSs that other tasks are awaiting to upgrade a reader lock to a writer lock.
/// </summary>
private readonly IAsyncWaitQueue _upgradeReaderQueue;
/// <summary>
/// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
/// </summary>
private int _id;
/// <summary>
/// The current upgradeable reader lock key, if any. If this is not null, then there is an upgradeable reader lock held.
/// </summary>
private UpgradeableReaderKey _upgradeableReaderKey;
/// <summary>
/// Number of reader locks held (including an upgradeable reader lock, if applicable); -1 if a writer lock is held; 0 if no locks are held.
/// </summary>
private int _locksHeld;
/// <summary>
/// The object used for mutual exclusion. The acquire and release methods of this class lock on it
/// (through the SyncObject property) before touching the lock state.
/// </summary>
private readonly object _mutex;
/// <summary>
/// Gets the overall lock state for the debugger display. The state is derived from the held-lock
/// count and the presence of an upgradeable reader key; the sync lock is not taken here.
/// </summary>
[DebuggerNonUserCode]
internal State GetStateForDebugger
{
    get
    {
        if (_locksHeld == 0)
        {
            return State.Unlocked;
        }

        bool writeLocked = (_locksHeld == -1);
        bool hasUpgradeableReader = (_upgradeableReaderKey != null);
        if (writeLocked)
        {
            return hasUpgradeableReader ? State.WriteLockedWithUpgradeableReader : State.WriteLocked;
        }

        // Any remaining value means one or more read locks are held.
        return hasUpgradeableReader ? State.ReadLockedWithUpgradeableReader : State.ReadLocked;
    }
}
/// <summary>
/// The lock states reported to the debugger.
/// </summary>
internal enum State
{
    /// <summary>No locks are held.</summary>
    Unlocked = 0,

    /// <summary>Read locks are held and there is no upgradeable reader key.</summary>
    ReadLocked = 1,

    /// <summary>Read locks are held and an upgradeable reader key exists.</summary>
    ReadLockedWithUpgradeableReader = 2,

    /// <summary>A write lock is held and there is no upgradeable reader key.</summary>
    WriteLocked = 3,

    /// <summary>A write lock is held and an upgradeable reader key exists.</summary>
    WriteLockedWithUpgradeableReader = 4,
}
/// <summary>
/// Gets the number of read locks held, for the debugger display. Reports 0 when the
/// held-lock count is zero or negative (a negative count marks a write lock).
/// </summary>
[DebuggerNonUserCode]
internal int GetReaderCountForDebugger
{
    get { return Math.Max(_locksHeld, 0); }
}
/// <summary>
/// Gets whether an upgrade request is currently queued, for the debugger display.
/// </summary>
[DebuggerNonUserCode]
internal bool GetUpgradeInProgressForDebugger
{
    get
    {
        bool noQueuedUpgrade = _upgradeReaderQueue.IsEmpty;
        return !noQueuedUpgrade;
    }
}
/// <summary>
/// Creates a new async-compatible reader/writer lock that uses the supplied wait queues.
/// </summary>
/// <param name="writerQueue">The wait queue for tasks waiting to acquire the lock as writers. May not be <c>null</c>.</param>
/// <param name="readerQueue">The wait queue for tasks waiting to acquire the lock as readers. May not be <c>null</c>.</param>
/// <param name="upgradeableReaderQueue">The wait queue for tasks waiting to acquire the lock as upgradeable readers. May not be <c>null</c>.</param>
/// <param name="upgradeReaderQueue">The wait queue for tasks waiting to upgrade a reader lock to a writer lock. May not be <c>null</c>.</param>
/// <exception cref="ArgumentNullException">Any of the queues is <c>null</c>.</exception>
public AsyncReaderWriterLock(IAsyncWaitQueue writerQueue, IAsyncWaitQueue readerQueue,
    IAsyncWaitQueue upgradeableReaderQueue, IAsyncWaitQueue upgradeReaderQueue)
{
    // Fail fast: every acquire/release path dereferences these queues, so a null queue
    // would otherwise only surface later as a NullReferenceException inside the sync lock.
    if (writerQueue == null)
        throw new ArgumentNullException("writerQueue");
    if (readerQueue == null)
        throw new ArgumentNullException("readerQueue");
    if (upgradeableReaderQueue == null)
        throw new ArgumentNullException("upgradeableReaderQueue");
    if (upgradeReaderQueue == null)
        throw new ArgumentNullException("upgradeReaderQueue");

    _writerQueue = writerQueue;
    _readerQueue = readerQueue;
    _upgradeableReaderQueue = upgradeableReaderQueue;
    _upgradeReaderQueue = upgradeReaderQueue;
    _mutex = new object();
}
/// <summary>
/// Creates a new async-compatible reader/writer lock, giving each of the four kinds of
/// waiter (writer, reader, upgradeable reader, upgrading reader) its own default wait queue.
/// </summary>
public AsyncReaderWriterLock()
    : this(
        new DefaultAsyncWaitQueue(),
        new DefaultAsyncWaitQueue(),
        new DefaultAsyncWaitQueue(),
        new DefaultAsyncWaitQueue())
{
}
/// <summary>
/// Gets a semi-unique identifier for this asynchronous lock.
/// </summary>
public int Id
{
    get
    {
        // The backing field is passed by reference so IdManager can fill it in
        // (presumably on first use, since the field starts at 0 - verify against IdManager).
        int id = IdManager.GetId(ref _id);
        return id;
    }
}
/// <summary>
/// Gets the object that this lock (and its keys) lock on to protect the lock state.
/// </summary>
internal object SyncObject
{
    get
    {
        return this._mutex;
    }
}
/// <summary>
/// Applies a continuation to the task that will call ReleaseWaiters if the task is canceled.
/// This method may not be called while holding the sync lock.
/// </summary>
/// <param name="task">The task to observe for cancellation.</param>
private void ReleaseWaitersWhenCanceled(Task task)
{
    Action<Task> onCanceled = canceledTask =>
    {
        List granted;
        lock (SyncObject)
        {
            granted = ReleaseWaiters();
        }

        // Dispose the entries returned by ReleaseWaiters only after leaving the sync lock.
        foreach (var grant in granted)
        {
            grant.Dispose();
        }
    };

    // Run only when the observed task ends up canceled, synchronously with that cancellation.
    const TaskContinuationOptions options =
        TaskContinuationOptions.OnlyOnCanceled | TaskContinuationOptions.ExecuteSynchronously;
    task.ContinueWith(onCanceled, CancellationToken.None, options, TaskScheduler.Default);
}
/// <summary>
/// Asynchronously acquires the lock as a reader. Returns a disposable that releases the lock when disposed.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).</param>
/// <returns>A disposable that releases the lock when disposed.</returns>
public Task ReaderLockAsync(CancellationToken cancellationToken)
{
    lock (SyncObject)
    {
        // A reader has to queue while a writer holds the lock, or while any writer,
        // upgradeable reader, or upgrading reader is already waiting.
        bool mustWait = _locksHeld < 0
            || !_writerQueue.IsEmpty
            || !_upgradeableReaderQueue.IsEmpty
            || !_upgradeReaderQueue.IsEmpty;
        if (mustWait)
        {
            // Wait for the lock to become available or cancellation.
            return _readerQueue.Enqueue(cancellationToken);
        }

        // The lock is free or in read mode: take it immediately.
        ++_locksHeld;
        return TaskShim.FromResult(new ReaderKey(this));
    }
}
/// <summary>
/// Synchronously acquires the lock as a reader. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).</param>
/// <returns>A disposable that releases the lock when disposed.</returns>
public IDisposable ReaderLock(CancellationToken cancellationToken)
{
    Task pending;
    lock (SyncObject)
    {
        // A reader has to queue while a writer holds the lock, or while any writer,
        // upgradeable reader, or upgrading reader is already waiting.
        bool mustWait = _locksHeld < 0
            || !_writerQueue.IsEmpty
            || !_upgradeableReaderQueue.IsEmpty
            || !_upgradeReaderQueue.IsEmpty;
        if (!mustWait)
        {
            ++_locksHeld;
            return new ReaderKey(this);
        }

        // Wait for the lock to become available or cancellation.
        pending = _readerQueue.Enqueue(cancellationToken);
    }

    // The blocking wait happens after the sync lock has been released.
    return pending.WaitAndUnwrapException();
}
/// <summary>
/// Asynchronously acquires the lock as a reader, without support for cancellation. Returns a disposable that releases the lock when disposed.
/// </summary>
/// <returns>A disposable that releases the lock when disposed.</returns>
public Task ReaderLockAsync()
{
    CancellationToken noCancellation = CancellationToken.None;
    return ReaderLockAsync(noCancellation);
}
/// <summary>
/// Synchronously acquires the lock as a reader, without support for cancellation. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
/// </summary>
/// <returns>A disposable that releases the lock when disposed.</returns>
public IDisposable ReaderLock()
{
    CancellationToken noCancellation = CancellationToken.None;
    return ReaderLock(noCancellation);
}
/// <summary>
/// Asynchronously acquires the lock as a writer. Returns a disposable that releases the lock when disposed.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).</param>
/// <returns>A disposable that releases the lock when disposed.</returns>
public Task WriterLockAsync(CancellationToken cancellationToken)
{
    Task acquisition;
    lock (SyncObject)
    {
        if (_locksHeld != 0)
        {
            // Some lock is held: wait for the lock to become available or cancellation.
            acquisition = _writerQueue.Enqueue(cancellationToken);
        }
        else
        {
            // Nothing holds the lock: take it immediately in write mode.
            _locksHeld = -1;
            acquisition = TaskShim.FromResult(new WriterKey(this));
        }
    }

    // Registered outside the sync lock: if this request ends up canceled,
    // the waiter-release logic is run again.
    ReleaseWaitersWhenCanceled(acquisition);
    return acquisition;
}
/// <summary>
/// Synchronously acquires the lock as a writer. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).</param>
/// <returns>A disposable that releases the lock when disposed.</returns>
public IDisposable WriterLock(CancellationToken cancellationToken)
{
    Task pending;
    lock (SyncObject)
    {
        bool unlocked = (_locksHeld == 0);
        if (unlocked)
        {
            // Nothing holds the lock: take it immediately in write mode.
            _locksHeld = -1;
            return new WriterKey(this);
        }

        // Wait for the lock to become available or cancellation.
        pending = _writerQueue.Enqueue(cancellationToken);
    }

    // Registered outside the sync lock: if this request ends up canceled,
    // the waiter-release logic is run again.
    ReleaseWaitersWhenCanceled(pending);
    return pending.WaitAndUnwrapException();
}
/// <summary>
/// Asynchronously acquires the lock as a writer, without support for cancellation. Returns a disposable that releases the lock when disposed.
/// </summary>
/// <returns>A disposable that releases the lock when disposed.</returns>
public Task WriterLockAsync()
{
    CancellationToken noCancellation = CancellationToken.None;
    return WriterLockAsync(noCancellation);
}
/// <summary>
/// Synchronously acquires the lock as a writer, without support for cancellation. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
/// </summary>
/// <returns>A disposable that releases the lock when disposed.</returns>
public IDisposable WriterLock()
{
    CancellationToken noCancellation = CancellationToken.None;
    return WriterLock(noCancellation);
}
/// <summary>
/// Asynchronously acquires the lock as a reader with the option to upgrade. Returns a key that can be used to upgrade and downgrade the lock, and releases the lock when disposed.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).</param>
/// <returns>A key that can be used to upgrade and downgrade this lock, and releases the lock when disposed.</returns>
public Task UpgradeableReaderLockAsync(CancellationToken cancellationToken)
{
    Task acquisition;
    lock (SyncObject)
    {
        bool unlocked = (_locksHeld == 0);
        bool readLockedWithoutUpgradeable = (_locksHeld > 0 && _upgradeableReaderKey == null);
        if (unlocked || readLockedWithoutUpgradeable)
        {
            // Available: become the (single) upgradeable reader immediately.
            var key = new UpgradeableReaderKey(this);
            _upgradeableReaderKey = key;
            ++_locksHeld;
            acquisition = TaskShim.FromResult(key);
        }
        else
        {
            // Wait for the lock to become available or cancellation.
            acquisition = _upgradeableReaderQueue.Enqueue(cancellationToken);
        }
    }

    // Registered outside the sync lock: if this request ends up canceled,
    // the waiter-release logic is run again.
    ReleaseWaitersWhenCanceled(acquisition);
    return acquisition;
}
/// <summary>
/// Synchronously acquires the lock as a reader with the option to upgrade. Returns a key that can be used to upgrade and downgrade the lock, and releases the lock when disposed. This method may block the calling thread.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).</param>
/// <returns>A key that can be used to upgrade and downgrade this lock, and releases the lock when disposed.</returns>
public UpgradeableReaderKey UpgradeableReaderLock(CancellationToken cancellationToken)
{
    Task pending;
    lock (SyncObject)
    {
        bool unlocked = (_locksHeld == 0);
        bool readLockedWithoutUpgradeable = (_locksHeld > 0 && _upgradeableReaderKey == null);
        if (unlocked || readLockedWithoutUpgradeable)
        {
            // Available: become the (single) upgradeable reader immediately.
            ++_locksHeld;
            _upgradeableReaderKey = new UpgradeableReaderKey(this);
            return _upgradeableReaderKey;
        }

        // Wait for the lock to become available or cancellation.
        pending = _upgradeableReaderQueue.Enqueue(cancellationToken);
    }

    // Registered outside the sync lock: if this request ends up canceled,
    // the waiter-release logic is run again.
    ReleaseWaitersWhenCanceled(pending);
    return pending.WaitAndUnwrapException();
}
/// <summary>
/// Asynchronously acquires the lock as a reader with the option to upgrade, without support for cancellation. Returns a key that can be used to upgrade and downgrade the lock, and releases the lock when disposed.
/// </summary>
/// <returns>A key that can be used to upgrade and downgrade this lock, and releases the lock when disposed.</returns>
public Task UpgradeableReaderLockAsync()
{
    CancellationToken noCancellation = CancellationToken.None;
    return UpgradeableReaderLockAsync(noCancellation);
}
/// <summary>
/// Synchronously acquires the lock as a reader with the option to upgrade, without support for cancellation. Returns a key that can be used to upgrade and downgrade the lock, and releases the lock when disposed. This method may block the calling thread.
/// </summary>
/// <returns>A key that can be used to upgrade and downgrade this lock, and releases the lock when disposed.</returns>
public UpgradeableReaderKey UpgradeableReaderLock()
{
    CancellationToken noCancellation = CancellationToken.None;
    return UpgradeableReaderLock(noCancellation);
}
/// <summary>
/// Asynchronously upgrades a reader lock to a writer lock. This method assumes the sync lock is already held.
/// </summary>
/// <param name="cancellationToken">The cancellation token passed to the upgrade wait queue when the upgrade has to wait.</param>
/// <returns>An already-completed task when the upgrade is granted immediately; otherwise the task returned by the upgrade wait queue.</returns>
internal Task UpgradeAsync(CancellationToken cancellationToken)
{
    if (_locksHeld != 1)
    {
        // Other locks are still held: wait for the lock to become available or cancellation.
        return _upgradeReaderQueue.Enqueue(cancellationToken);
    }

    // Exactly one lock is held, so it can be switched to write mode immediately.
    _locksHeld = -1;
    return TaskShim.FromResult(new UpgradeableReaderKey.UpgradeKey(_upgradeableReaderKey));
}
/// <summary>
/// Downgrades a writer lock to a reader lock. This method assumes the sync lock is already held.
/// </summary>
/// <returns>The result of running the waiter-release logic after the downgrade.</returns>
internal List Downgrade()
{
    // Back to a single read lock before deciding who may be released.
    _locksHeld = 1;
    var released = ReleaseWaiters();
    return released;
}
/// <summary>
/// Grants lock(s) to waiting tasks. This method assumes the sync lock is already held.
/// </summary>
/// <returns>
/// The results of dequeuing every waiter that was granted the lock by this call (possibly empty).
/// Callers in this class dispose each entry after leaving the sync lock.
/// </returns>
private List ReleaseWaiters()
{
var ret = new List();
// Case 1: no locks are held, so waiters can be admitted in priority order.
if (_locksHeld == 0)
{
// Give priority to writers.
if (!_writerQueue.IsEmpty)
{
ret.Add(_writerQueue.Dequeue(new WriterKey(this)));
_locksHeld = -1;
// Only this one writer is admitted; nothing else is dequeued alongside it.
return ret;
}
// Then to upgradeable readers.
// At most one upgradeable reader is admitted here; any others stay queued.
if (!_upgradeableReaderQueue.IsEmpty)
{
_upgradeableReaderKey = new UpgradeableReaderKey(this);
ret.Add(_upgradeableReaderQueue.Dequeue(_upgradeableReaderKey));
++_locksHeld;
}
// Finally to readers.
// Every queued reader is admitted; each one adds to the held-lock count.
while (!_readerQueue.IsEmpty)
{
ret.Add(_readerQueue.Dequeue(new ReaderKey(this)));
++_locksHeld;
}
return ret;
}
// Case 2: exactly one lock is held. Give priority to upgrading readers.
// NOTE(review): this presumably relies on the single remaining holder being the
// upgradeable reader that queued the upgrade - confirm.
if (_locksHeld == 1)
{
if (!_upgradeReaderQueue.IsEmpty)
{
ret.Add(_upgradeReaderQueue.Dequeue(new UpgradeableReaderKey.UpgradeKey(_upgradeableReaderKey)));
// The lone read lock becomes a write lock.
_locksHeld = -1;
}
}
// Case 3: read locks are still held (and no upgrade was granted just above).
if (_locksHeld > 0)
{
// If there are current reader locks and waiting writers, upgradeable readers, or upgrading readers, then do nothing.
if (!_writerQueue.IsEmpty || !_upgradeableReaderQueue.IsEmpty || !_upgradeReaderQueue.IsEmpty)
return ret;
// If there are current reader locks but no upgradeable reader lock, try to release an upgradeable reader.
// NOTE(review): the early return above already fires whenever _upgradeableReaderQueue is
// non-empty, so this condition looks like it can never be true at this point. The block also
// does not increment _locksHeld the way the unlocked path does. Confirm the intended policy
// before changing or relying on it.
if (_upgradeableReaderKey == null && !_upgradeableReaderQueue.IsEmpty)
{
_upgradeableReaderKey = new UpgradeableReaderKey(this);
ret.Add(_upgradeableReaderQueue.Dequeue(_upgradeableReaderKey));
}
// Queued readers are not dequeued on this path; only the unlocked path above admits them.
}
return ret;
}
/// <summary>
/// Releases the lock as a reader.
/// </summary>
internal void ReleaseReaderLock()
{
    List granted;
    lock (SyncObject)
    {
        // Give back one read lock, then see whether any waiter can now be admitted.
        _locksHeld--;
        granted = ReleaseWaiters();
    }

    // Dispose the returned entries only after leaving the sync lock.
    foreach (var grant in granted)
    {
        grant.Dispose();
    }
}
/// <summary>
/// Releases the lock as a writer.
/// </summary>
internal void ReleaseWriterLock()
{
    List granted;
    lock (SyncObject)
    {
        // The write lock is exclusive, so releasing it leaves no locks held.
        _locksHeld = 0;
        granted = ReleaseWaiters();
    }

    // Dispose the returned entries only after leaving the sync lock.
    foreach (var grant in granted)
    {
        grant.Dispose();
    }
}
/// <summary>
/// Releases the lock as an upgradeable reader.
/// </summary>
/// <param name="upgrade">The key's current upgrade task, or <c>null</c> if there is none. When non-null, the upgrade wait queue is asked to cancel it.</param>
internal void ReleaseUpgradeableReaderLock(Task upgrade)
{
    IDisposable canceledUpgrade = null;
    List granted;
    lock (SyncObject)
    {
        if (upgrade != null)
        {
            canceledUpgrade = _upgradeReaderQueue.TryCancel(upgrade);
        }

        // Forget the upgradeable key, give back its read lock, then re-evaluate waiters.
        _upgradeableReaderKey = null;
        --_locksHeld;
        granted = ReleaseWaiters();
    }

    // Everything below runs after the sync lock has been released.
    if (canceledUpgrade != null)
    {
        canceledUpgrade.Dispose();
    }

    foreach (var grant in granted)
    {
        grant.Dispose();
    }
}
/// <summary>
/// The disposable which releases the reader lock.
/// </summary>
private sealed class ReaderKey : IDisposable
{
    /// <summary>
    /// The lock to release; set to <c>null</c> once the lock has been released.
    /// </summary>
    private AsyncReaderWriterLock _asyncReaderWriterLock;

    /// <summary>
    /// Creates the key for a lock.
    /// </summary>
    /// <param name="asyncReaderWriterLock">The lock to release. May not be <c>null</c>.</param>
    public ReaderKey(AsyncReaderWriterLock asyncReaderWriterLock)
    {
        this._asyncReaderWriterLock = asyncReaderWriterLock;
    }

    /// <summary>
    /// Release the lock. Calls after the first successful release do nothing.
    /// </summary>
    public void Dispose()
    {
        if (_asyncReaderWriterLock != null)
        {
            _asyncReaderWriterLock.ReleaseReaderLock();
            _asyncReaderWriterLock = null;
        }
    }
}
/// <summary>
/// The disposable which releases the writer lock.
/// </summary>
private sealed class WriterKey : IDisposable
{
    /// <summary>
    /// The lock to release; set to <c>null</c> once the lock has been released.
    /// </summary>
    private AsyncReaderWriterLock _asyncReaderWriterLock;

    /// <summary>
    /// Creates the key for a lock.
    /// </summary>
    /// <param name="asyncReaderWriterLock">The lock to release. May not be <c>null</c>.</param>
    public WriterKey(AsyncReaderWriterLock asyncReaderWriterLock)
    {
        this._asyncReaderWriterLock = asyncReaderWriterLock;
    }

    /// <summary>
    /// Release the lock. Calls after the first successful release do nothing.
    /// </summary>
    public void Dispose()
    {
        if (_asyncReaderWriterLock != null)
        {
            _asyncReaderWriterLock.ReleaseWriterLock();
            _asyncReaderWriterLock = null;
        }
    }
}
/// <summary>
/// The disposable which manages the upgradeable reader lock.
/// </summary>
[DebuggerDisplay("State = {GetStateForDebugger}, ReaderWriterLockId = {_asyncReaderWriterLock.Id}")]
public sealed class UpgradeableReaderKey : IDisposable
{
/// <summary>
/// The lock to release.
/// </summary>
private readonly AsyncReaderWriterLock _asyncReaderWriterLock;
/// <summary>
/// The task doing the upgrade. This is null when no upgrade has been requested; it is reset to null
/// when an upgrade is canceled or the key is downgraded.
/// </summary>
private Task _upgrade;
/// <summary>
/// Whether or not this instance has been disposed.
/// </summary>
private bool _disposed;
/// <summary>
/// Gets the state of this key for the debugger display, derived from the upgrade task.
/// The sync lock is not taken here.
/// </summary>
[DebuggerNonUserCode]
internal State GetStateForDebugger
{
    get
    {
        if (_upgrade == null)
        {
            return State.Reader;
        }

        // A completed upgrade task means the key is in write mode; anything else is still upgrading.
        return _upgrade.Status == TaskStatus.RanToCompletion
            ? State.Writer
            : State.UpgradingToWriter;
    }
}
/// <summary>
/// The key states reported to the debugger.
/// </summary>
internal enum State
{
    /// <summary>No upgrade task exists.</summary>
    Reader = 0,

    /// <summary>An upgrade task exists but has not run to completion.</summary>
    UpgradingToWriter = 1,

    /// <summary>The upgrade task has run to completion.</summary>
    Writer = 2,
}
/// <summary>
/// Creates the key for a lock.
/// </summary>
/// <param name="asyncReaderWriterLock">The lock to release. May not be <c>null</c>.</param>
internal UpgradeableReaderKey(AsyncReaderWriterLock asyncReaderWriterLock)
{
    this._asyncReaderWriterLock = asyncReaderWriterLock;
}
/// <summary>
/// Gets a value indicating whether this lock has been upgraded to a write lock.
/// </summary>
public bool Upgraded
{
    get
    {
        // Snapshot the upgrade task under the sync lock; its status is inspected afterwards.
        Task upgrade;
        lock (_asyncReaderWriterLock.SyncObject)
        {
            upgrade = _upgrade;
        }

        if (upgrade == null)
        {
            return false;
        }

        return upgrade.Status == TaskStatus.RanToCompletion;
    }
}
/// <summary>
/// Upgrades the reader lock to a writer lock. Returns a disposable that downgrades the writer lock to a reader lock when disposed.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the upgrade. If this is already set, then this method will attempt to upgrade immediately (succeeding if the lock is currently available).</param>
/// <returns>A task that is completed from the outcome of the underlying upgrade task.</returns>
/// <exception cref="InvalidOperationException">An upgrade task already exists for this key.</exception>
public Task UpgradeAsync(CancellationToken cancellationToken)
{
    lock (_asyncReaderWriterLock.SyncObject)
    {
        // Only one upgrade may exist per key at a time.
        if (_upgrade != null)
        {
            throw new InvalidOperationException("Cannot upgrade.");
        }

        _upgrade = _asyncReaderWriterLock.UpgradeAsync(cancellationToken);
    }

    // Registered outside the sync lock: a canceled upgrade re-runs the waiter-release logic.
    _asyncReaderWriterLock.ReleaseWaitersWhenCanceled(_upgrade);

    var completion = new TaskCompletionSource();
    _upgrade.ContinueWith(
        t =>
        {
            // A canceled upgrade is forgotten so that a later upgrade attempt is allowed.
            if (t.IsCanceled)
            {
                lock (_asyncReaderWriterLock.SyncObject)
                {
                    _upgrade = null;
                }
            }

            ret_Forward:
            completion.TryCompleteFromCompletedTask(t);
        },
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
    return completion.Task;
}
/// <summary>
/// Synchronously upgrades the reader lock to a writer lock. Returns a disposable that downgrades the writer lock to a reader lock when disposed. This method may block the calling thread.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the upgrade. If this is already set, then this method will attempt to upgrade immediately (succeeding if the lock is currently available).</param>
/// <returns>A disposable that downgrades the writer lock to a reader lock when disposed.</returns>
/// <exception cref="InvalidOperationException">An upgrade task already exists for this key.</exception>
public IDisposable Upgrade(CancellationToken cancellationToken)
{
    // The asynchronous overload performs exactly the same steps (start the upgrade under the
    // sync lock, register the cancellation hook, forward the outcome); this overload only
    // adds the blocking wait on the resulting task.
    return UpgradeAsync(cancellationToken).WaitAndUnwrapException();
}
/// <summary>
/// Upgrades the reader lock to a writer lock, without support for cancellation. Returns a disposable that downgrades the writer lock to a reader lock when disposed.
/// </summary>
public Task UpgradeAsync()
{
    CancellationToken noCancellation = CancellationToken.None;
    return UpgradeAsync(noCancellation);
}
/// <summary>
/// Synchronously upgrades the reader lock to a writer lock, without support for cancellation. Returns a disposable that downgrades the writer lock to a reader lock when disposed. This method may block the calling thread.
/// </summary>
public IDisposable Upgrade()
{
    CancellationToken noCancellation = CancellationToken.None;
    return Upgrade(noCancellation);
}
/// <summary>
/// Downgrades the writer lock to a reader lock.
/// </summary>
private void Downgrade()
{
    List granted;
    lock (_asyncReaderWriterLock.SyncObject)
    {
        granted = _asyncReaderWriterLock.Downgrade();

        // Clearing the upgrade task allows a new upgrade to be requested later.
        _upgrade = null;
    }

    // Dispose the returned entries only after leaving the sync lock.
    foreach (var grant in granted)
    {
        grant.Dispose();
    }
}
/// <summary>
/// Release the lock. Calls after the first successful release do nothing.
/// </summary>
public void Dispose()
{
    if (!_disposed)
    {
        // The current upgrade task (possibly null) is handed over so the lock can cancel it.
        _asyncReaderWriterLock.ReleaseUpgradeableReaderLock(_upgrade);
        _disposed = true;
    }
}
/// <summary>
/// The disposable which downgrades an upgradeable reader key.
/// </summary>
internal sealed class UpgradeKey : IDisposable
{
    /// <summary>
    /// The upgradeable reader key to downgrade; set to <c>null</c> once the downgrade has happened.
    /// </summary>
    private UpgradeableReaderKey _key;

    /// <summary>
    /// Creates the upgrade key for an upgradeable reader key.
    /// </summary>
    /// <param name="key">The upgradeable reader key to downgrade. May not be <c>null</c>.</param>
    public UpgradeKey(UpgradeableReaderKey key)
    {
        this._key = key;
    }

    /// <summary>
    /// Downgrade the upgradeable reader key. Calls after the first successful downgrade do nothing.
    /// </summary>
    public void Dispose()
    {
        if (_key != null)
        {
            _key.Downgrade();
            _key = null;
        }
    }
}
}
// ReSharper disable UnusedMember.Local
/// <summary>
/// Debugger proxy that exposes the lock's id, state, and wait queues.
/// </summary>
[DebuggerNonUserCode]
private sealed class DebugView
{
    /// <summary>
    /// The lock being inspected.
    /// </summary>
    private readonly AsyncReaderWriterLock _rwl;

    public DebugView(AsyncReaderWriterLock rwl)
    {
        this._rwl = rwl;
    }

    public int Id
    {
        get { return _rwl.Id; }
    }

    public State State
    {
        get { return _rwl.GetStateForDebugger; }
    }

    public int ReaderCount
    {
        get { return _rwl.GetReaderCountForDebugger; }
    }

    public bool UpgradeInProgress
    {
        get { return _rwl.GetUpgradeInProgressForDebugger; }
    }

    public IAsyncWaitQueue ReaderWaitQueue
    {
        get { return _rwl._readerQueue; }
    }

    public IAsyncWaitQueue WriterWaitQueue
    {
        get { return _rwl._writerQueue; }
    }

    public IAsyncWaitQueue UpgradeableReaderWaitQueue
    {
        get { return _rwl._upgradeableReaderQueue; }
    }

    public IAsyncWaitQueue UpgradeReaderWaitQueue
    {
        get { return _rwl._upgradeReaderQueue; }
    }
}
// ReSharper restore UnusedMember.Local
}
}