// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma warning disable 0162

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace FASTER.core
{
    /// <summary>
    /// FASTER log: an append-only log backed by a <see cref="BlittableAllocator{Key, Value}"/>,
    /// with in-memory enqueue, explicit commit to storage, scanning and random reads.
    /// </summary>
    public class FasterLog : IDisposable
    {
        private readonly BlittableAllocator<Empty, byte> allocator;
        private readonly LightEpoch epoch;
        private readonly ILogCommitManager logCommitManager;
        private readonly GetMemory getMemory;
        private readonly int headerSize;
        private readonly LogChecksumType logChecksum;
        private readonly Dictionary<string, long> RecoveredIterators;
        private TaskCompletionSource<LinkedCommitInfo> commitTcs
            = new TaskCompletionSource<LinkedCommitInfo>(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <summary>
        /// Beginning address of log
        /// </summary>
        public long BeginAddress => allocator.BeginAddress;

        /// <summary>
        /// Tail address of log
        /// </summary>
        public long TailAddress => allocator.GetTailAddress();

        /// <summary>
        /// Log flushed until address
        /// </summary>
        public long FlushedUntilAddress => allocator.FlushedUntilAddress;

        /// <summary>
        /// Log committed until address.
        /// Written under the commit lock; spin-readers use <see cref="Volatile.Read(ref long)"/>.
        /// </summary>
        public long CommittedUntilAddress;

        /// <summary>
        /// Log committed begin address
        /// </summary>
        public long CommittedBeginAddress;

        /// <summary>
        /// Task notifying commit completions. Each completed task carries a
        /// <see cref="LinkedCommitInfo"/> whose NextTask is the task for the following commit.
        /// </summary>
        internal Task<LinkedCommitInfo> CommitTask => commitTcs.Task;

        /// <summary>
        /// Create new log instance
        /// </summary>
        /// <param name="logSettings">Log settings (device, commit manager, checksum mode, memory callback, ...)</param>
        public FasterLog(FasterLogSettings logSettings)
        {
            // Fall back to a local commit file next to the log device when no commit manager is given
            logCommitManager = logSettings.LogCommitManager ??
                new LocalLogCommitManager(logSettings.LogCommitFile ??
                logSettings.LogDevice.FileName + ".commit");

            // Reserve 8 byte checksum in header if requested (header = [checksum:8] length:4)
            logChecksum = logSettings.LogChecksum;
            headerSize = logChecksum == LogChecksumType.PerEntry ? 12 : 4;
            getMemory = logSettings.GetMemory;
            epoch = new LightEpoch();
            CommittedUntilAddress = Constants.kFirstValidAddress;
            CommittedBeginAddress = Constants.kFirstValidAddress;

            allocator = new BlittableAllocator<Empty, byte>(
                logSettings.GetLogSettings(), null,
                null, epoch, CommitCallback);
            allocator.Initialize();
            Restore(out RecoveredIterators);
        }

        /// <summary>
        /// Dispose the allocator and epoch, and fault the pending commit task with
        /// <see cref="ObjectDisposedException"/> so waiters are released.
        /// </summary>
        public void Dispose()
        {
            allocator.Dispose();
            epoch.Dispose();
            commitTcs.TrySetException(new ObjectDisposedException("Log has been disposed"));
        }

        #region Enqueue
        /// <summary>
        /// Enqueue entry to log (in memory) - no guarantee of flush/commit
        /// </summary>
        /// <param name="entry">Entry to be enqueued to log</param>
        /// <returns>Logical address of added entry</returns>
        public long Enqueue(byte[] entry)
        {
            long logicalAddress;
            while (!TryEnqueue(entry, out logicalAddress)) ;
            return logicalAddress;
        }

        /// <summary>
        /// Enqueue entry to log (in memory) - no guarantee of flush/commit
        /// </summary>
        /// <param name="entry">Entry to be enqueued to log</param>
        /// <returns>Logical address of added entry</returns>
        public long Enqueue(ReadOnlySpan<byte> entry)
        {
            long logicalAddress;
            while (!TryEnqueue(entry, out logicalAddress)) ;
            return logicalAddress;
        }

        /// <summary>
        /// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit
        /// </summary>
        /// <param name="readOnlySpanBatch">Batch of entries to be enqueued to log</param>
        /// <returns>Logical address of added entry</returns>
        public long Enqueue(IReadOnlySpanBatch readOnlySpanBatch)
        {
            long logicalAddress;
            while (!TryEnqueue(readOnlySpanBatch, out logicalAddress)) ;
            return logicalAddress;
        }
        #endregion

        #region TryEnqueue
        /// <summary>
        /// Try to enqueue entry to log (in memory). If it returns true, we are
        /// done. If it returns false, we need to retry.
        /// </summary>
        /// <param name="entry">Entry to be enqueued to log</param>
        /// <param name="logicalAddress">Logical address of added entry</param>
        /// <returns>Whether the append succeeded</returns>
        public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)
        {
            logicalAddress = 0;

            epoch.Resume();

            var length = entry.Length;
            logicalAddress = allocator.TryAllocate(headerSize + Align(length));
            if (logicalAddress == 0)
            {
                epoch.Suspend();
                return false;
            }

            var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
            fixed (byte* bp = entry)
                Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length);
            // Header is written after the payload because the per-entry checksum covers the payload
            SetHeader(length, (byte*)physicalAddress);
            epoch.Suspend();
            return true;
        }

        /// <summary>
        /// Try to append entry to log. If it returns true, we are
        /// done. If it returns false, we need to retry.
        /// </summary>
        /// <param name="entry">Entry to be appended to log</param>
        /// <param name="logicalAddress">Logical address of added entry</param>
        /// <returns>Whether the append succeeded</returns>
        public unsafe bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)
        {
            logicalAddress = 0;

            epoch.Resume();

            var length = entry.Length;
            logicalAddress = allocator.TryAllocate(headerSize + Align(length));
            if (logicalAddress == 0)
            {
                epoch.Suspend();
                return false;
            }

            var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
            fixed (byte* bp = &entry.GetPinnableReference())
                Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length);
            // Header is written after the payload because the per-entry checksum covers the payload
            SetHeader(length, (byte*)physicalAddress);
            epoch.Suspend();
            return true;
        }

        /// <summary>
        /// Try to enqueue batch of entries as a single atomic unit (to memory). Entire
        /// batch needs to fit on one log page.
        /// </summary>
        /// <param name="readOnlySpanBatch">Batch to be appended to log</param>
        /// <param name="logicalAddress">Logical address of first added entry</param>
        /// <returns>Whether the append succeeded</returns>
        public bool TryEnqueue(IReadOnlySpanBatch readOnlySpanBatch, out long logicalAddress)
        {
            return TryAppend(readOnlySpanBatch, out logicalAddress, out _);
        }
        #endregion

        #region EnqueueAsync
        /// <summary>
        /// Enqueue entry to log in memory (async) - completes after entry is
        /// appended to memory, NOT committed to storage.
        /// </summary>
        /// <param name="entry">Entry to be enqueued to log</param>
        /// <returns>Logical address of added entry</returns>
        public async ValueTask<long> EnqueueAsync(byte[] entry)
        {
            long logicalAddress;

            while (true)
            {
                // Capture the commit task before trying, so a commit finishing in between is not missed
                var task = CommitTask;
                if (TryEnqueue(entry, out logicalAddress))
                    break;
                if (NeedToWait(CommittedUntilAddress, TailAddress))
                {
                    // Wait for *some* commit - failure can be ignored
                    try
                    {
                        await task;
                    }
                    catch { }
                }
            }

            return logicalAddress;
        }

        /// <summary>
        /// Enqueue entry to log in memory (async) - completes after entry is
        /// appended to memory, NOT committed to storage.
        /// </summary>
        /// <param name="entry">Entry to be enqueued to log</param>
        /// <returns>Logical address of added entry</returns>
        public async ValueTask<long> EnqueueAsync(ReadOnlyMemory<byte> entry)
        {
            long logicalAddress;

            while (true)
            {
                // Capture the commit task before trying, so a commit finishing in between is not missed
                var task = CommitTask;
                if (TryEnqueue(entry.Span, out logicalAddress))
                    break;
                if (NeedToWait(CommittedUntilAddress, TailAddress))
                {
                    // Wait for *some* commit - failure can be ignored
                    try
                    {
                        await task;
                    }
                    catch { }
                }
            }

            return logicalAddress;
        }

        /// <summary>
        /// Enqueue batch of entries to log in memory (async) - completes after entry is
        /// appended to memory, NOT committed to storage.
        /// </summary>
        /// <param name="readOnlySpanBatch">Batch of entries to be enqueued to log</param>
        /// <returns>Logical address of first added entry</returns>
        public async ValueTask<long> EnqueueAsync(IReadOnlySpanBatch readOnlySpanBatch)
        {
            long logicalAddress;

            while (true)
            {
                // Capture the commit task before trying, so a commit finishing in between is not missed
                var task = CommitTask;
                if (TryEnqueue(readOnlySpanBatch, out logicalAddress))
                    break;
                if (NeedToWait(CommittedUntilAddress, TailAddress))
                {
                    // Wait for *some* commit - failure can be ignored
                    try
                    {
                        await task;
                    }
                    catch { }
                }
            }

            return logicalAddress;
        }
        #endregion

        #region WaitForCommit and WaitForCommitAsync

        /// <summary>
        /// Spin-wait for enqueues, until tail or specified address, to commit to
        /// storage. Does NOT itself issue a commit, just waits for commit. So you should
        /// ensure that someone else causes the commit to happen.
        /// </summary>
        /// <param name="untilAddress">Address until which we should wait for commit, default 0 for tail of log</param>
        public void WaitForCommit(long untilAddress = 0)
        {
            var tailAddress = untilAddress;
            if (tailAddress == 0) tailAddress = allocator.GetTailAddress();

            // Volatile read: the field is updated by the commit callback on another thread,
            // and a plain read in a tight loop may be hoisted by the JIT (and is not atomic on 32-bit).
            while (Volatile.Read(ref CommittedUntilAddress) < tailAddress) Thread.Yield();
        }

        /// <summary>
        /// Wait for appends (in memory), until tail or specified address, to commit to
        /// storage. Does NOT itself issue a commit, just waits for commit. So you should
        /// ensure that someone else causes the commit to happen.
        /// </summary>
        /// <param name="untilAddress">Address until which we should wait for commit, default 0 for tail of log</param>
        /// <returns>Task that completes once the commit covers the target address; faults if a commit along the way fails</returns>
        public async ValueTask WaitForCommitAsync(long untilAddress = 0)
        {
            var task = CommitTask;
            var tailAddress = untilAddress;
            if (tailAddress == 0) tailAddress = allocator.GetTailAddress();

            while (true)
            {
                var linkedCommitInfo = await task;
                if (linkedCommitInfo.CommitInfo.UntilAddress < tailAddress)
                    task = linkedCommitInfo.NextTask;
                else
                    break;
            }
        }
        #endregion

        #region Commit

        /// <summary>
        /// Issue commit request for log (until tail)
        /// </summary>
        /// <param name="spinWait">If true, spin-wait until commit completes. Otherwise, issue commit and return immediately.</param>
        public void Commit(bool spinWait = false)
        {
            CommitInternal(spinWait);
        }

        /// <summary>
        /// Async commit log (until tail), completes only when we
        /// complete the commit. Throws exception if this or any
        /// ongoing commit fails.
        /// </summary>
        /// <returns>Task that completes when the commit covering the current tail completes</returns>
        public async ValueTask CommitAsync()
        {
            var task = CommitTask;
            var tailAddress = CommitInternal();

            while (true)
            {
                var linkedCommitInfo = await task;
                if (linkedCommitInfo.CommitInfo.UntilAddress < tailAddress)
                    task = linkedCommitInfo.NextTask;
                else
                    break;
            }
        }

        /// <summary>
        /// Async commit log (until tail), completes only when we
        /// complete the commit. Throws exception if any commit
        /// from prevCommitTask to current fails.
        /// </summary>
        /// <param name="prevCommitTask">Commit task to start waiting from; null means the current commit task</param>
        /// <returns>The commit task following the one that covered the tail, for chaining the next call</returns>
        public async ValueTask<Task<LinkedCommitInfo>> CommitAsync(Task<LinkedCommitInfo> prevCommitTask)
        {
            if (prevCommitTask == null) prevCommitTask = commitTcs.Task;
            var tailAddress = CommitInternal();

            while (true)
            {
                var linkedCommitInfo = await prevCommitTask;
                if (linkedCommitInfo.CommitInfo.UntilAddress < tailAddress)
                    prevCommitTask = linkedCommitInfo.NextTask;
                else
                    return linkedCommitInfo.NextTask;
            }
        }

        #endregion

        #region EnqueueAndWaitForCommit

        /// <summary>
        /// Append entry to log - spin-waits until entry is committed to storage.
        /// Does NOT itself issue flush!
        /// </summary>
        /// <param name="entry">Entry to be appended to log</param>
        /// <returns>Logical address of added entry</returns>
        public long EnqueueAndWaitForCommit(byte[] entry)
        {
            long logicalAddress;
            while (!TryEnqueue(entry, out logicalAddress)) ;
            // Volatile read so the loop observes the commit callback's update
            while (Volatile.Read(ref CommittedUntilAddress) < logicalAddress + 1) Thread.Yield();
            return logicalAddress;
        }

        /// <summary>
        /// Append entry to log - spin-waits until entry is committed to storage.
        /// Does NOT itself issue flush!
        /// </summary>
        /// <param name="entry">Entry to be appended to log</param>
        /// <returns>Logical address of added entry</returns>
        public long EnqueueAndWaitForCommit(ReadOnlySpan<byte> entry)
        {
            long logicalAddress;
            while (!TryEnqueue(entry, out logicalAddress)) ;
            // Volatile read so the loop observes the commit callback's update
            while (Volatile.Read(ref CommittedUntilAddress) < logicalAddress + 1) Thread.Yield();
            return logicalAddress;
        }

        /// <summary>
        /// Append batch of entries to log - spin-waits until entry is committed to storage.
        /// Does NOT itself issue flush!
        /// </summary>
        /// <param name="readOnlySpanBatch">Batch of entries to be appended to log</param>
        /// <returns>Logical address of first added entry</returns>
        public long EnqueueAndWaitForCommit(IReadOnlySpanBatch readOnlySpanBatch)
        {
            long logicalAddress;
            while (!TryEnqueue(readOnlySpanBatch, out logicalAddress)) ;
            // Volatile read so the loop observes the commit callback's update
            while (Volatile.Read(ref CommittedUntilAddress) < logicalAddress + 1) Thread.Yield();
            return logicalAddress;
        }

        #endregion

        #region EnqueueAndWaitForCommitAsync

        /// <summary>
        /// Append entry to log (async) - completes after entry is committed to storage.
        /// Does NOT itself issue flush!
        /// </summary>
        /// <param name="entry">Entry to be appended to log</param>
        /// <returns>Logical address of added entry</returns>
        /// <exception cref="CommitFailureException">A commit whose range contains the entry failed</exception>
        public async ValueTask<long> EnqueueAndWaitForCommitAsync(byte[] entry)
        {
            long logicalAddress;
            Task<LinkedCommitInfo> task;

            // Phase 1: wait for commit to memory
            while (true)
            {
                task = CommitTask;
                if (TryEnqueue(entry, out logicalAddress))
                    break;
                if (NeedToWait(CommittedUntilAddress, TailAddress))
                {
                    // Wait for *some* commit - failure can be ignored
                    try
                    {
                        await task;
                    }
                    catch { }
                }
            }

            // Phase 2: wait for commit/flush to storage
            while (true)
            {
                LinkedCommitInfo linkedCommitInfo;
                try
                {
                    linkedCommitInfo = await task;
                }
                catch (CommitFailureException e)
                {
                    linkedCommitInfo = e.LinkedCommitInfo;
                    // Only surface failures of the commit range that contains our entry
                    if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress)
                        throw;
                }
                if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1)
                    task = linkedCommitInfo.NextTask;
                else
                    break;
            }

            return logicalAddress;
        }

        /// <summary>
        /// Append entry to log (async) - completes after entry is committed to storage.
        /// Does NOT itself issue flush!
        /// </summary>
        /// <param name="entry">Entry to be appended to log</param>
        /// <returns>Logical address of added entry</returns>
        /// <exception cref="CommitFailureException">A commit whose range contains the entry failed</exception>
        public async ValueTask<long> EnqueueAndWaitForCommitAsync(ReadOnlyMemory<byte> entry)
        {
            long logicalAddress;
            Task<LinkedCommitInfo> task;

            // Phase 1: wait for commit to memory
            while (true)
            {
                task = CommitTask;
                if (TryEnqueue(entry.Span, out logicalAddress))
                    break;
                if (NeedToWait(CommittedUntilAddress, TailAddress))
                {
                    // Wait for *some* commit - failure can be ignored
                    try
                    {
                        await task;
                    }
                    catch { }
                }
            }

            // Phase 2: wait for commit/flush to storage
            while (true)
            {
                LinkedCommitInfo linkedCommitInfo;
                try
                {
                    linkedCommitInfo = await task;
                }
                catch (CommitFailureException e)
                {
                    linkedCommitInfo = e.LinkedCommitInfo;
                    // Only surface failures of the commit range that contains our entry
                    if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress)
                        throw;
                }
                if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1)
                    task = linkedCommitInfo.NextTask;
                else
                    break;
            }

            return logicalAddress;
        }

        /// <summary>
        /// Append batch of entries to log (async) - completes after batch is committed to storage.
        /// Does NOT itself issue flush!
        /// </summary>
        /// <param name="readOnlySpanBatch">Batch of entries to be appended to log</param>
        /// <returns>Logical address of first added entry</returns>
        /// <exception cref="CommitFailureException">A commit whose range contains the batch start failed</exception>
        public async ValueTask<long> EnqueueAndWaitForCommitAsync(IReadOnlySpanBatch readOnlySpanBatch)
        {
            long logicalAddress;
            Task<LinkedCommitInfo> task;

            // Phase 1: wait for commit to memory
            while (true)
            {
                task = CommitTask;
                if (TryEnqueue(readOnlySpanBatch, out logicalAddress))
                    break;
                if (NeedToWait(CommittedUntilAddress, TailAddress))
                {
                    // Wait for *some* commit - failure can be ignored
                    try
                    {
                        await task;
                    }
                    catch { }
                }
            }

            // Phase 2: wait for commit/flush to storage
            while (true)
            {
                LinkedCommitInfo linkedCommitInfo;
                try
                {
                    linkedCommitInfo = await task;
                }
                catch (CommitFailureException e)
                {
                    linkedCommitInfo = e.LinkedCommitInfo;
                    // Only surface failures of the commit range that contains our batch
                    if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress)
                        throw;
                }
                if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1)
                    task = linkedCommitInfo.NextTask;
                else
                    break;
            }

            return logicalAddress;
        }
        #endregion

        /// <summary>
        /// Truncate the log until, but not including, untilAddress
        /// </summary>
        /// <param name="untilAddress">New begin address of the log</param>
        public void TruncateUntil(long untilAddress)
        {
            allocator.ShiftBeginAddress(untilAddress);
        }

        /// <summary>
        /// Pull-based iterator interface for scanning FASTER log
        /// </summary>
        /// <param name="beginAddress">Begin address for scan.</param>
        /// <param name="endAddress">End address for scan (or long.MaxValue for tailing).</param>
        /// <param name="name">Name of iterator, if we need to persist/recover it (default null - do not persist).</param>
        /// <param name="recover">Whether to recover named iterator from latest commit (if exists). If false, iterator starts from beginAddress.</param>
        /// <param name="scanBufferingMode">Use single or double buffering</param>
        /// <returns>The scan iterator</returns>
        /// <exception cref="ArgumentException"><paramref name="name"/> is longer than 20 characters</exception>
        public FasterLogScanIterator Scan(long beginAddress, long endAddress, string name = null, bool recover = true, ScanBufferingMode scanBufferingMode = ScanBufferingMode.DoublePageBuffering)
        {
            // Validate before constructing, so no iterator (which is handed buffering and
            // epoch resources) is created and then abandoned on a bad name.
            if (name != null && name.Length > 20)
                throw new ArgumentException("Max length of iterator name is 20 characters", nameof(name));

            long startAddress = beginAddress;
            if (recover && name != null && RecoveredIterators != null &&
                RecoveredIterators.TryGetValue(name, out long recoveredAddress))
            {
                startAddress = recoveredAddress;
            }

            var iter = new FasterLogScanIterator(this, allocator, startAddress, endAddress, getMemory, scanBufferingMode, epoch, headerSize, name);

            if (name != null)
            {
                if (FasterLogScanIterator.PersistedIterators.ContainsKey(name))
                    Debug.WriteLine("Iterator name exists, overwriting");
                FasterLogScanIterator.PersistedIterators[name] = iter;
            }

            return iter;
        }

        /// <summary>
        /// Random read record from log, at given address
        /// </summary>
        /// <param name="address">Logical address to read from</param>
        /// <param name="estimatedLength">Estimated length of entry, if known</param>
        /// <returns>
        /// The record bytes and their length; (null, 0) if the address is outside
        /// [BeginAddress, CommittedUntilAddress) or the read failed.
        /// </returns>
        public async ValueTask<(byte[], int)> ReadAsync(long address, int estimatedLength = 0)
        {
            epoch.Resume();
            if (address >= CommittedUntilAddress || address < BeginAddress)
            {
                epoch.Suspend();
                return default;
            }
            var ctx = new SimpleReadContext
            {
                logicalAddress = address,
                completedRead = new SemaphoreSlim(0)
            };
            unsafe
            {
                allocator.AsyncReadRecordToMemory(address, headerSize + estimatedLength, AsyncGetFromDiskCallback, ref ctx);
            }
            epoch.Suspend();
            // Released exactly once by AsyncGetFromDiskCallback (success, error, or invalid length)
            await ctx.completedRead.WaitAsync();
            return GetRecordAndFree(ctx.record);
        }

        /// <summary>
        /// Round a length up to the next multiple of 4 bytes.
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private int Align(int length)
        {
            return (length + 3) & ~3;
        }

        /// <summary>
        /// Commit log: persist commit metadata, advance the committed addresses,
        /// and complete the current commit task (linking it to the next one).
        /// </summary>
        private void CommitCallback(CommitInfo commitInfo)
        {
            TaskCompletionSource<LinkedCommitInfo> completedTcs;
            Task<LinkedCommitInfo> nextTask;

            // We can only allow serial monotonic synchronous commit
            lock (this)
            {
                if (CommittedBeginAddress > commitInfo.BeginAddress)
                    commitInfo.BeginAddress = CommittedBeginAddress;
                if (CommittedUntilAddress > commitInfo.FromAddress)
                    commitInfo.FromAddress = CommittedUntilAddress;
                if (CommittedUntilAddress > commitInfo.UntilAddress)
                    commitInfo.UntilAddress = CommittedUntilAddress;

                FasterLogRecoveryInfo info = new FasterLogRecoveryInfo
                {
                    BeginAddress = commitInfo.BeginAddress,
                    FlushedUntilAddress = commitInfo.UntilAddress
                };
                info.PopulateIterators();

                logCommitManager.Commit(info.BeginAddress, info.FlushedUntilAddress, info.ToByteArray());
                CommittedBeginAddress = info.BeginAddress;
                CommittedUntilAddress = info.FlushedUntilAddress;

                completedTcs = commitTcs;
                // If task is not faulted, create new task
                // If task is faulted due to commit exception, create new task
                // (a task faulted by Dispose is left in place)
                if (commitTcs.Task.Status != TaskStatus.Faulted || commitTcs.Task.Exception.InnerException is CommitFailureException)
                {
                    commitTcs = new TaskCompletionSource<LinkedCommitInfo>(TaskCreationOptions.RunContinuationsAsynchronously);
                }

                // Capture the successor while still holding the lock; reading commitTcs after
                // releasing it could observe a later callback's task and skip a commit in the chain.
                nextTask = commitTcs.Task;
            }

            var lci = new LinkedCommitInfo
            {
                CommitInfo = commitInfo,
                NextTask = nextTask
            };

            if (commitInfo.ErrorCode == 0)
                completedTcs.TrySetResult(lci);
            else
                completedTcs.TrySetException(new CommitFailureException(lci, $"Commit of address range [{commitInfo.FromAddress}-{commitInfo.UntilAddress}] failed with error code {commitInfo.ErrorCode}"));
        }

        /// <summary>
        /// Restore log from the latest commit metadata, if any.
        /// </summary>
        /// <param name="recoveredIterators">Recovered named-iterator addresses, or null if there is no commit</param>
        private void Restore(out Dictionary<string, long> recoveredIterators)
        {
            recoveredIterators = null;
            FasterLogRecoveryInfo info = new FasterLogRecoveryInfo();
            var commitInfo = logCommitManager.GetCommitMetadata();

            if (commitInfo == null) return;

            using (var r = new BinaryReader(new MemoryStream(commitInfo)))
            {
                info.Initialize(r);
            }

            // Head starts at the beginning of the page containing the flushed-until address
            var headAddress = info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress);
            if (headAddress == 0) headAddress = Constants.kFirstValidAddress;

            recoveredIterators = info.Iterators;

            allocator.RestoreHybridLog(info.FlushedUntilAddress, headAddress, info.BeginAddress);
            CommittedUntilAddress = info.FlushedUntilAddress;
            CommittedBeginAddress = info.BeginAddress;
        }

        /// <summary>
        /// Try to append batch of entries as a single atomic unit. Entire batch
        /// needs to fit on one page.
        /// </summary>
        /// <param name="readOnlySpanBatch">Batch to be appended to log</param>
        /// <param name="logicalAddress">Logical address of first added entry</param>
        /// <param name="allocatedLength">Actual allocated length</param>
        /// <returns>Whether the append succeeded</returns>
        private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long logicalAddress, out int allocatedLength)
        {
            logicalAddress = 0;

            int totalEntries = readOnlySpanBatch.TotalEntries();
            allocatedLength = 0;
            for (int i = 0; i < totalEntries; i++)
            {
                allocatedLength += Align(readOnlySpanBatch.Get(i).Length) + headerSize;
            }

            epoch.Resume();

            logicalAddress = allocator.TryAllocate(allocatedLength);
            if (logicalAddress == 0)
            {
                epoch.Suspend();
                return false;
            }

            var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
            for (int i = 0; i < totalEntries; i++)
            {
                var span = readOnlySpanBatch.Get(i);
                var entryLength = span.Length;
                fixed (byte* bp = &span.GetPinnableReference())
                    Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), entryLength, entryLength);
                SetHeader(entryLength, (byte*)physicalAddress);
                physicalAddress += Align(entryLength) + headerSize;
            }

            epoch.Suspend();
            return true;
        }

        /// <summary>
        /// Completion callback for random reads. Releases the read context's semaphore on
        /// error, invalid length, or once enough bytes are present; otherwise re-issues the
        /// read with the exact record size.
        /// </summary>
        private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, NativeOverlapped* overlap)
        {
            var ctx = (SimpleReadContext)Overlapped.Unpack(overlap).AsyncResult;

            if (errorCode != 0)
            {
                Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
                ctx.record.Return();
                ctx.record = null;
                ctx.completedRead.Release();
            }
            else
            {
                var record = ctx.record.GetValidPointer();
                var length = GetLength(record);

                if (length < 0 || length > allocator.PageSize)
                {
                    Debug.WriteLine("Invalid record length found: " + length);
                    ctx.record.Return();
                    ctx.record = null;
                    ctx.completedRead.Release();
                }
                else
                {
                    int requiredBytes = headerSize + length;
                    if (ctx.record.available_bytes >= requiredBytes)
                    {
                        ctx.completedRead.Release();
                    }
                    else
                    {
                        // Estimated length was too small: re-read with the actual record size
                        ctx.record.Return();
                        allocator.AsyncReadRecordToMemory(ctx.logicalAddress, requiredBytes, AsyncGetFromDiskCallback, ref ctx);
                    }
                }
            }
            Overlapped.Free(overlap);
        }

        /// <summary>
        /// Verify and copy a read record into a managed array, always returning the
        /// sector-aligned buffer to its pool.
        /// </summary>
        /// <exception cref="Exception">The per-entry checksum does not match</exception>
        private (byte[], int) GetRecordAndFree(SectorAlignedMemory record)
        {
            if (record == null)
                return (null, 0);

            byte[] result;
            int length;
            try
            {
                unsafe
                {
                    var ptr = record.GetValidPointer();
                    length = GetLength(ptr);
                    if (!VerifyChecksum(ptr, length))
                    {
                        throw new Exception("Checksum failed for read");
                    }
                    result = getMemory != null ? getMemory(length) : new byte[length];
                    fixed (byte* bp = result)
                    {
                        // Destination size is the real array length, so a too-small array from a
                        // user GetMemory callback makes MemoryCopy throw instead of overrunning memory.
                        Buffer.MemoryCopy(ptr + headerSize, bp, result.Length, length);
                    }
                }
            }
            finally
            {
                // Return the buffer even when the checksum check throws
                record.Return();
            }
            return (result, length);
        }

        /// <summary>
        /// Issue a commit by shifting the read-only address to the tail; if nothing new is
        /// flushed, still commit when the begin address advanced or iterators are persisted.
        /// </summary>
        /// <param name="spinWait">If true, spin until the commit covers the tail</param>
        /// <returns>Tail address at the time of the commit request</returns>
        private long CommitInternal(bool spinWait = false)
        {
            epoch.Resume();
            if (allocator.ShiftReadOnlyToTail(out long tailAddress))
            {
                if (spinWait)
                {
                    while (Volatile.Read(ref CommittedUntilAddress) < tailAddress)
                    {
                        epoch.ProtectAndDrain();
                        Thread.Yield();
                    }
                }
                epoch.Suspend();
            }
            else
            {
                // May need to commit begin address and/or iterators
                epoch.Suspend();
                var beginAddress = allocator.BeginAddress;
                if (beginAddress > CommittedBeginAddress || FasterLogScanIterator.PersistedIterators.Count > 0)
                    CommitCallback(new CommitInfo
                    {
                        BeginAddress = beginAddress,
                        FromAddress = CommittedUntilAddress,
                        UntilAddress = CommittedUntilAddress,
                        ErrorCode = 0
                    });
            }

            return tailAddress;
        }

        /// <summary>
        /// Read the entry length from a record header (offset 0, or 8 when a per-entry checksum precedes it).
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        internal unsafe int GetLength(byte* ptr)
        {
            if (logChecksum == LogChecksumType.None)
                return *(int*)ptr;
            else if (logChecksum == LogChecksumType.PerEntry)
                return *(int*)(ptr + 8);
            return 0;
        }

        /// <summary>
        /// Check the per-entry checksum (XOR over length field + payload); always true when checksums are off.
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        internal unsafe bool VerifyChecksum(byte* ptr, int length)
        {
            if (logChecksum == LogChecksumType.PerEntry)
            {
                var cs = Utility.XorBytes(ptr + 8, length + 4);
                if (cs != *(ulong*)ptr)
                {
                    return false;
                }
            }
            return true;
        }

        /// <summary>
        /// Read the stored per-entry checksum, or 0 when checksums are off.
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        internal unsafe ulong GetChecksum(byte* ptr)
        {
            if (logChecksum == LogChecksumType.PerEntry)
            {
                return *(ulong*)ptr;
            }
            return 0;
        }

        /// <summary>
        /// Write the record header; with per-entry checksums the checksum is computed over
        /// the length field and payload, so the payload must already be in place.
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private unsafe void SetHeader(int length, byte* dest)
        {
            if (logChecksum == LogChecksumType.None)
            {
                *(int*)dest = length;
                return;
            }
            else if (logChecksum == LogChecksumType.PerEntry)
            {
                *(int*)(dest + 8) = length;
                *(ulong*)dest = Utility.XorBytes(dest + 8, length + 4);
            }
        }

        /// <summary>
        /// Do we need to await a commit to make forward progress?
        /// </summary>
        /// <param name="committedUntilAddress">Current committed-until address</param>
        /// <param name="tailAddress">Current tail address</param>
        /// <returns>True when the committed page lags the tail page by at least the buffer size</returns>
        private bool NeedToWait(long committedUntilAddress, long tailAddress)
        {
            Thread.Yield();
            return
                allocator.GetPage(committedUntilAddress) <=
                (allocator.GetPage(tailAddress) - allocator.BufferSize);
        }
    }
}