// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Runtime.InteropServices;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.IO;
using System.Diagnostics;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

namespace FASTER.core
{
    /// <summary>
    /// Log allocator whose pages are pinned byte arrays holding records laid out as
    /// [RecordInfo][key bytes][value bytes], where the key and value lengths are obtained
    /// from <see cref="KeyLength"/> and <see cref="ValueLength"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file were reconstructed from usage
    /// (the <c>where Key : new()</c> / <c>where Value : new()</c> constraints, the <c>TInput</c> and
    /// <c>TContext</c> parameters, and the ref returns). Arguments that cannot be derived from this
    /// file alone (the <c>Action</c> callbacks and the async-result types) must be confirmed
    /// against <c>AllocatorBase</c>.
    /// </remarks>
    /// <typeparam name="Key">Key type stored in the log</typeparam>
    /// <typeparam name="Value">Value type stored in the log</typeparam>
    public unsafe sealed class VariableLengthBlittableAllocator<Key, Value> : AllocatorBase<Key, Value>
        where Key : new()
        where Value : new()
    {
        public const int kRecordAlignment = 8; // RecordInfo has a long field, so it should be aligned to 8-bytes

        // Circular buffer definition
        private byte[][] values;
        private GCHandle[] handles;
        private long[] pointers;

        // Not readonly: GCHandle.Free() mutates the struct, and invoking it on a readonly field
        // would operate on a copy, leaving the field still reporting IsAllocated.
        private GCHandle ptrHandle;
        private readonly long* nativePointers;
        private readonly bool fixedSizeKey;
        private readonly bool fixedSizeValue;

        internal readonly IVariableLengthStruct<Key> KeyLength;
        internal readonly IVariableLengthStruct<Value> ValueLength;

        /// <summary>
        /// Create the allocator: sets up the (initially empty) circular buffer of pages, pins the
        /// page-pointer table, and selects the key/value length providers.
        /// </summary>
        /// <param name="settings">Log settings, forwarded to the base allocator</param>
        /// <param name="vlSettings">
        /// Supplies the key and value length providers; a null provider means the corresponding
        /// type is treated as fixed-size and a <c>FixedLengthStruct</c> is used instead.
        /// </param>
        /// <param name="comparer">Key comparer, forwarded to the base allocator</param>
        /// <param name="evictCallback">Forwarded to the base allocator</param>
        /// <param name="epoch">Forwarded to the base allocator</param>
        /// <param name="flushCallback">Forwarded to the base allocator</param>
        // NOTE(review): the Action<...> type arguments below are not derivable from this file;
        // confirm them against the AllocatorBase constructor.
        public VariableLengthBlittableAllocator(LogSettings settings, VariableLengthStructSettings<Key, Value> vlSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null, Action<CommitInfo> flushCallback = null)
            : base(settings, comparer, evictCallback, epoch, flushCallback)
        {
            values = new byte[BufferSize][];
            handles = new GCHandle[BufferSize];
            pointers = new long[BufferSize];

            // Pin the pointer table so GetPhysicalAddress can index it through a raw pointer.
            ptrHandle = GCHandle.Alloc(pointers, GCHandleType.Pinned);
            nativePointers = (long*)ptrHandle.AddrOfPinnedObject();

            KeyLength = vlSettings.keyLength;
            ValueLength = vlSettings.valueLength;

            if (KeyLength == null)
            {
                fixedSizeKey = true;
                KeyLength = new FixedLengthStruct<Key>();
            }

            if (ValueLength == null)
            {
                fixedSizeValue = true;
                ValueLength = new FixedLengthStruct<Value>();
            }
        }

        /// <summary>
        /// Initialize the allocator starting at <c>Constants.kFirstValidAddress</c>.
        /// </summary>
        public override void Initialize()
        {
            Initialize(Constants.kFirstValidAddress);
        }

        /// <summary>
        /// Get the record header located at the given physical address.
        /// </summary>
        public override ref RecordInfo GetInfo(long physicalAddress)
        {
            return ref Unsafe.AsRef<RecordInfo>((void*)physicalAddress);
        }

        /// <summary>
        /// Get the record header located at the given byte pointer.
        /// </summary>
        public override ref RecordInfo GetInfoFromBytePointer(byte* ptr)
        {
            return ref Unsafe.AsRef<RecordInfo>(ptr);
        }

        /// <summary>
        /// Get the key of the record at the given physical address; the key starts immediately
        /// after the record header.
        /// </summary>
        public override ref Key GetKey(long physicalAddress)
        {
            return ref Unsafe.AsRef<Key>((byte*)physicalAddress + RecordInfo.GetLength());
        }

        /// <summary>
        /// Get the value of the record at the given physical address; the value starts
        /// immediately after the key.
        /// </summary>
        public override ref Value GetValue(long physicalAddress)
        {
            return ref Unsafe.AsRef<Value>((byte*)physicalAddress + RecordInfo.GetLength() + KeySize(physicalAddress));
        }

        /// <summary>
        /// Round a byte count up to the next multiple of <see cref="kRecordAlignment"/>.
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static int RoundUpToRecordAlignment(int size)
        {
            return (size + kRecordAlignment - 1) & (~(kRecordAlignment - 1));
        }

        // Length, as reported by KeyLength, of the key stored at the given physical address.
        private int KeySize(long physicalAddress)
        {
            return KeyLength.GetLength(ref GetKey(physicalAddress));
        }

        // Length, as reported by ValueLength, of the value stored at the given physical address.
        private int ValueSize(long physicalAddress)
        {
            return ValueLength.GetLength(ref GetValue(physicalAddress));
        }

        /// <summary>
        /// Get the aligned size of the record stored at the given physical address.
        /// </summary>
        /// <returns>
        /// The header length alone when the record header is null; otherwise header + key + value
        /// length, rounded up to <see cref="kRecordAlignment"/>.
        /// </returns>
        public override int GetRecordSize(long physicalAddress)
        {
            ref var recordInfo = ref GetInfo(physicalAddress);
            if (recordInfo.IsNull())
            {
                return RecordInfo.GetLength();
            }

            return RoundUpToRecordAlignment(RecordInfo.GetLength() + KeySize(physicalAddress) + ValueSize(physicalAddress));
        }

        /// <summary>
        /// Get the number of bytes needed to hold the record at the given physical address, given
        /// that only <paramref name="availableBytes"/> bytes are known to be present there.
        /// </summary>
        /// <returns>
        /// The first estimate (in order: average size, actual key + average value, actual key +
        /// actual value) that exceeds <paramref name="availableBytes"/>; otherwise the actual
        /// aligned record size.
        /// </returns>
        public override int GetRequiredRecordSize(long physicalAddress, int availableBytes)
        {
            // We need at least [record size] + [average key size] + [average value size]
            var reqBytes = GetAverageRecordSize();
            if (availableBytes < reqBytes)
            {
                return reqBytes;
            }

            // The key is only inspected after the check above has passed.
            var keySize = KeySize(physicalAddress);

            // We need at least [record size] + [actual key size] + [average value size]
            reqBytes = RecordInfo.GetLength() + keySize + ValueLength.GetAverageLength();
            if (availableBytes < reqBytes)
            {
                return reqBytes;
            }

            // We need at least [record size] + [actual key size] + [actual value size]
            return RoundUpToRecordAlignment(RecordInfo.GetLength() + keySize + ValueSize(physicalAddress));
        }

        /// <summary>
        /// Get the average record size: header + one alignment unit + average key length +
        /// average value length.
        /// </summary>
        public override int GetAverageRecordSize()
        {
            return RecordInfo.GetLength() +
                kRecordAlignment +
                KeyLength.GetAverageLength() +
                ValueLength.GetAverageLength();
        }

        /// <summary>
        /// Get the aligned size of a new record for the given key, with the value length derived
        /// from the input via <c>ValueLength.GetInitialLength</c>.
        /// </summary>
        public override int GetInitialRecordSize<TInput>(ref Key key, ref TInput input)
        {
            var actualSize = RecordInfo.GetLength() +
                KeyLength.GetLength(ref key) +
                ValueLength.GetInitialLength(ref input);

            return RoundUpToRecordAlignment(actualSize);
        }

        /// <summary>
        /// Get the aligned size of a record holding the given key and value.
        /// </summary>
        public override int GetRecordSize(ref Key key, ref Value value)
        {
            var actualSize = RecordInfo.GetLength() +
                KeyLength.GetLength(ref key) +
                ValueLength.GetLength(ref value);

            return RoundUpToRecordAlignment(actualSize);
        }

        /// <summary>
        /// Copy the bytes of <paramref name="src"/> (length taken from the source key) to
        /// <paramref name="dst"/>.
        /// </summary>
        public override void ShallowCopy(ref Key src, ref Key dst)
        {
            var length = KeyLength.GetLength(ref src);
            Buffer.MemoryCopy(
                Unsafe.AsPointer(ref src),
                Unsafe.AsPointer(ref dst),
                length,
                length);
        }

        /// <summary>
        /// Copy the bytes of <paramref name="src"/> (length taken from the source value) to
        /// <paramref name="dst"/>.
        /// </summary>
        public override void ShallowCopy(ref Value src, ref Value dst)
        {
            var length = ValueLength.GetLength(ref src);
            Buffer.MemoryCopy(
                Unsafe.AsPointer(ref src),
                Unsafe.AsPointer(ref dst),
                length,
                length);
        }

        /// <summary>
        /// Unpin and drop every allocated page, then drop the page tables. Safe to call more
        /// than once.
        /// </summary>
        private void ReleasePages()
        {
            if (values != null)
            {
                for (int i = 0; i < values.Length; i++)
                {
                    if (handles[i].IsAllocated)
                    {
                        handles[i].Free();
                    }
                    values[i] = null;
                }
            }

            handles = null;
            pointers = null;
            values = null;
        }

        /// <summary>
        /// Dispose memory allocator
        /// </summary>
        public override void Dispose()
        {
            ReleasePages();

            try
            {
                base.Dispose();
            }
            finally
            {
                // Release the pin on the pointer table; it was previously never freed.
                // nativePointers must not be dereferenced after this point.
                if (ptrHandle.IsAllocated)
                {
                    ptrHandle.Free();
                }
            }
        }

        /// <summary>
        /// Not supported by this allocator.
        /// </summary>
        /// <exception cref="NotSupportedException">Always thrown</exception>
        public override AddressInfo* GetKeyAddressInfo(long physicalAddress)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Not supported by this allocator.
        /// </summary>
        /// <exception cref="NotSupportedException">Always thrown</exception>
        public override AddressInfo* GetValueAddressInfo(long physicalAddress)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Allocate memory page, pinned in memory, and in sector aligned form, if possible
        /// </summary>
        /// <param name="index">Index of the page within the circular buffer</param>
        internal override void AllocatePage(int index)
        {
            // Over-allocate by two sectors so a sector-aligned start address fits in the array.
            var adjustedSize = PageSize + 2 * sectorSize;
            byte[] tmp = new byte[adjustedSize];
            Array.Clear(tmp, 0, adjustedSize);

            handles[index] = GCHandle.Alloc(tmp, GCHandleType.Pinned);
            long p = (long)handles[index].AddrOfPinnedObject();

            // Round the pinned address up to the next sector boundary.
            pointers[index] = (p + (sectorSize - 1)) & ~(sectorSize - 1);
            values[index] = tmp;
        }

        /// <summary>
        /// Translate a logical address into the physical address of the same byte within the
        /// in-memory circular buffer.
        /// </summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public override long GetPhysicalAddress(long logicalAddress)
        {
            // Offset within page
            int offset = (int)(logicalAddress & ((1L << LogPageSizeBits) - 1));

            // Index of page within the circular buffer
            int pageIndex = (int)((logicalAddress >> LogPageSizeBits) & (BufferSize - 1));
            return *(nativePointers + pageIndex) + offset;
        }

        /// <summary>
        /// Whether the page at the given circular-buffer index has been allocated.
        /// </summary>
        protected override bool IsAllocated(int pageIndex)
        {
            return values[pageIndex] != null;
        }

        /// <summary>
        /// Write the in-memory page <paramref name="flushPage"/> to the allocator's device, at
        /// the file offset corresponding to that page.
        /// </summary>
        // NOTE(review): the PageAsyncFlushResult type argument was reconstructed; confirm
        // against AllocatorBase.
        protected override void WriteAsync<TContext>(long flushPage, IOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult)
        {
            WriteAsync((IntPtr)pointers[flushPage % BufferSize],
                    (ulong)(AlignedPageSizeBytes * flushPage),
                    (uint)AlignedPageSizeBytes,
                    callback,
                    asyncResult, device);
        }

        /// <summary>
        /// Write the in-memory page <paramref name="flushPage"/> to the given device, at a file
        /// offset relative to <paramref name="startPage"/>, with <paramref name="pageSize"/>
        /// rounded up to a whole number of sectors.
        /// </summary>
        protected override void WriteAsyncToDevice<TContext>
            (long startPage, long flushPage, int pageSize, IOCompletionCallback callback,
            PageAsyncFlushResult<TContext> asyncResult, IDevice device, IDevice objectLogDevice)
        {
            var alignedPageSize = (pageSize + (sectorSize - 1)) & ~(sectorSize - 1);

            WriteAsync((IntPtr)pointers[flushPage % BufferSize],
                        (ulong)(AlignedPageSizeBytes * (flushPage - startPage)),
                        (uint)alignedPageSize, callback, asyncResult,
                        device);
        }

        /// <summary>
        /// Get start logical address
        /// </summary>
        /// <param name="page">Page number</param>
        /// <returns>Logical address of the first byte of the page</returns>
        public override long GetStartLogicalAddress(long page)
        {
            return page << LogPageSizeBits;
        }

        /// <summary>
        /// Get first valid logical address
        /// </summary>
        /// <param name="page">Page number</param>
        /// <returns>
        /// The page start address, offset by <c>Constants.kFirstValidAddress</c> for page 0
        /// </returns>
        public override long GetFirstValidLogicalAddress(long page)
        {
            if (page == 0)
            {
                return (page << LogPageSizeBits) + Constants.kFirstValidAddress;
            }

            return page << LogPageSizeBits;
        }

        /// <summary>
        /// Zero the backing array of the given page from <paramref name="offset"/> to the end of
        /// the array.
        /// </summary>
        /// <param name="page">Page number</param>
        /// <param name="offset">
        /// Offset within the aligned page at which to start clearing; 0 clears the whole backing
        /// array, including the alignment padding.
        /// </param>
        protected override void ClearPage(long page, int offset)
        {
            var index = page % BufferSize;
            var pageBytes = values[index];

            if (offset != 0)
            {
                // Adjust array offset for cache alignment
                offset += (int)(pointers[index] - (long)handles[index].AddrOfPinnedObject());
            }

            Array.Clear(pageBytes, offset, pageBytes.Length - offset);
        }

        /// <summary>
        /// Delete in-memory portion of the log
        /// </summary>
        internal override void DeleteFromMemory()
        {
            ReleasePages();
        }

        /// <summary>
        /// Issue a device write for a page. When <c>asyncResult.partial</c> is set, only the
        /// sector-aligned range covering [fromAddress, untilAddress) within the page is written.
        /// </summary>
        private void WriteAsync<TContext>(IntPtr alignedSourceAddress, ulong alignedDestinationAddress, uint numBytesToWrite,
                        IOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult,
                        IDevice device)
        {
            if (asyncResult.partial)
            {
                // Write only required bytes within the page
                int aligned_start = (int)((asyncResult.fromAddress - (asyncResult.page << LogPageSizeBits)));
                aligned_start = (aligned_start / sectorSize) * sectorSize;

                int aligned_end = (int)((asyncResult.untilAddress - (asyncResult.page << LogPageSizeBits)));
                aligned_end = ((aligned_end + (sectorSize - 1)) & ~(sectorSize - 1));

                numBytesToWrite = (uint)(aligned_end - aligned_start);
                device.WriteAsync(alignedSourceAddress + aligned_start, alignedDestinationAddress + (ulong)aligned_start, numBytesToWrite, callback, asyncResult);
            }
            else
            {
                device.WriteAsync(alignedSourceAddress, alignedDestinationAddress,
                    numBytesToWrite, callback, asyncResult);
            }
        }

        /// <summary>
        /// Read <paramref name="aligned_read_length"/> bytes from the device into the in-memory
        /// page at <paramref name="destinationPageIndex"/>.
        /// </summary>
        // NOTE(review): the PageAsyncReadResult type argument was reconstructed; confirm
        // against AllocatorBase.
        protected override void ReadAsync<TContext>(
            ulong alignedSourceAddress, int destinationPageIndex, uint aligned_read_length,
            IOCompletionCallback callback, PageAsyncReadResult<TContext> asyncResult, IDevice device, IDevice objlogDevice)
        {
            device.ReadAsync(alignedSourceAddress, (IntPtr)pointers[destinationPageIndex],
                aligned_read_length, callback, asyncResult);
        }

        /// <summary>
        /// Invoked by users to obtain a record from disk. It uses sector aligned memory to read
        /// the record efficiently into memory. Not valid for this allocator.
        /// </summary>
        /// <exception cref="InvalidOperationException">Always thrown</exception>
        protected override void AsyncReadRecordObjectsToMemory(long fromLogical, int numBytes, IOCompletionCallback callback, AsyncIOContext<Key, Value> context, SectorAlignedMemory result = default(SectorAlignedMemory))
        {
            throw new InvalidOperationException("AsyncReadRecordObjectsToMemory invalid for BlittableAllocator");
        }

        /// <summary>
        /// Retrieve objects from object log
        /// </summary>
        /// <returns>Always true for this allocator</returns>
        protected override bool RetrievedFullRecord(byte* record, ref AsyncIOContext<Key, Value> ctx)
        {
            return true;
        }

        /// <summary>
        /// Get the key of the record held in the given I/O context.
        /// </summary>
        public override ref Key GetContextRecordKey(ref AsyncIOContext<Key, Value> ctx)
        {
            return ref GetKey((long)ctx.record.GetValidPointer());
        }

        /// <summary>
        /// Get the value of the record held in the given I/O context.
        /// </summary>
        public override ref Value GetContextRecordValue(ref AsyncIOContext<Key, Value> ctx)
        {
            return ref GetValue((long)ctx.record.GetValidPointer());
        }

        /// <summary>
        /// Wrap a key in a heap container: a standard container for fixed-size keys, otherwise a
        /// variable-length container backed by the buffer pool.
        /// </summary>
        public override IHeapContainer<Key> GetKeyContainer(ref Key key)
        {
            if (fixedSizeKey)
            {
                return new StandardHeapContainer<Key>(ref key);
            }

            return new VarLenHeapContainer<Key>(ref key, KeyLength, bufferPool);
        }

        /// <summary>
        /// Wrap a value in a heap container: a standard container for fixed-size values,
        /// otherwise a variable-length container backed by the buffer pool.
        /// </summary>
        public override IHeapContainer<Value> GetValueContainer(ref Value value)
        {
            if (fixedSizeValue)
            {
                return new StandardHeapContainer<Value>(ref value);
            }

            return new VarLenHeapContainer<Value>(ref value, ValueLength, bufferPool);
        }

        /// <summary>
        /// Whether KVS has keys to serialize/deserialize
        /// </summary>
        /// <returns>Always false for this allocator</returns>
        public override bool KeyHasObjects()
        {
            return false;
        }

        /// <summary>
        /// Whether KVS has values to serialize/deserialize
        /// </summary>
        /// <returns>Always false for this allocator</returns>
        public override bool ValueHasObjects()
        {
            return false;
        }

        /// <summary>
        /// Get segment offsets; this allocator has none.
        /// </summary>
        /// <returns>Always null</returns>
        public override long[] GetSegmentOffsets()
        {
            return null;
        }

        /// <summary>
        /// Not supported by this allocator.
        /// </summary>
        /// <exception cref="Exception">Always thrown</exception>
        internal override void PopulatePage(byte* src, int required_bytes, long destinationPage)
        {
            throw new Exception("BlittableAllocator memory pages are sector aligned - use direct copy");
            // Buffer.MemoryCopy(src, (void*)pointers[destinationPage % BufferSize], required_bytes, required_bytes);
        }

        /// <summary>
        /// Iterator interface for scanning FASTER log
        /// </summary>
        /// <param name="beginAddress">Address at which the scan begins</param>
        /// <param name="endAddress">Address at which the scan ends</param>
        /// <param name="scanBufferingMode">Buffering mode passed to the iterator</param>
        /// <returns>A new scan iterator over this allocator</returns>
        public override IFasterScanIterator<Key, Value> Scan(long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode)
        {
            return new VariableLengthBlittableScanIterator<Key, Value>(this, beginAddress, endAddress, scanBufferingMode);
        }

        /// <summary>
        /// Read pages from specified device
        /// </summary>
        /// <typeparam name="TContext">Type of the caller context carried in each read result</typeparam>
        /// <param name="readPageStart">First page to read</param>
        /// <param name="numPages">Number of consecutive pages to read</param>
        /// <param name="untilAddress">
        /// Logical address used to shorten (to a sector multiple) the read of a page that lies
        /// less than one page before it
        /// </param>
        /// <param name="callback">Completion callback passed to each device read</param>
        /// <param name="context">Caller context stored in each read result</param>
        /// <param name="frame">
        /// Destination frame; page slots are allocated on first use and cleared on reuse
        /// </param>
        /// <param name="completed">
        /// Countdown event created with a count of <paramref name="numPages"/> and attached to
        /// each read result
        /// </param>
        /// <param name="devicePageOffset">
        /// Subtracted from the page number when computing the file offset on an explicitly
        /// supplied <paramref name="device"/>
        /// </param>
        /// <param name="device">Device to read from; the allocator's own device when null</param>
        /// <param name="objectLogDevice">Unused by this allocator</param>
        internal void AsyncReadPagesFromDeviceToFrame<TContext>(
                                        long readPageStart,
                                        int numPages,
                                        long untilAddress,
                                        IOCompletionCallback callback,
                                        TContext context,
                                        BlittableFrame frame,
                                        out CountdownEvent completed,
                                        long devicePageOffset = 0,
                                        IDevice device = null, IDevice objectLogDevice = null)
        {
            var usedDevice = device ?? this.device;

            completed = new CountdownEvent(numPages);
            for (long readPage = readPageStart; readPage < (readPageStart + numPages); readPage++)
            {
                int pageIndex = (int)(readPage % frame.frameSize);
                if (frame.frame[pageIndex] == null)
                {
                    frame.Allocate(pageIndex);
                }
                else
                {
                    frame.Clear(pageIndex);
                }

                var asyncResult = new PageAsyncReadResult<TContext>()
                {
                    page = readPage,
                    context = context,
                    handle = completed,
                    frame = frame
                };

                ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage);
                uint readLength = (uint)AlignedPageSizeBytes;
                long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask));

                if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize))
                {
                    // Shorten the read to end at untilAddress, rounded up to a whole sector.
                    readLength = (uint)(adjustedUntilAddress - (long)offsetInFile);
                    readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1));
                }

                // An explicitly supplied device is addressed relative to devicePageOffset.
                if (device != null)
                {
                    offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset));
                }

                usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], readLength, callback, asyncResult);
            }
        }
    }
}