diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index c65fed8..b845d63 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -1,8 +1,9 @@ using System.Collections.Concurrent; using System.Diagnostics; -using System.Text; using ZeroLevel; +using ZeroLevel.Collections; using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.Memory; using ZeroLevel.Services.PartitionStorage; using ZeroLevel.Services.Serialization; @@ -11,20 +12,18 @@ namespace PartitionFileStorageTest internal class Program { // const int PAIRS_COUNT = 200_000_000; - const int PAIRS_COUNT = 2000_000; + const long PAIRS_COUNT = 100_000_000; private class Metadata { public DateTime Date { get; set; } } + const ulong num_base = 79770000000; + private static ulong Generate(Random r) { - var num = new StringBuilder(); - num.Append("79"); - num.Append(r.Next(99).ToString("D2")); - num.Append(r.Next(9999).ToString("D7")); - return ulong.Parse(num.ToString()); + return num_base + (uint)r.Next(999999); } private static void FastTest(StoreOptions options) @@ -210,11 +209,20 @@ namespace PartitionFileStorageTest } } } + store.Dispose(); Log.Info("Completed"); } - private static void FullStoreMultithreadTest(StoreOptions options, - List<(ulong, ulong)> pairs) + private static IEnumerable<(ulong, ulong)> MassGenerator(long count) + { + var r = new Random(Environment.TickCount); + for (long i = 0; i < count; i++) + { + yield return (Generate(r), Generate(r)); + } + } + + private static void FullStoreMultithreadTest(StoreOptions options) { var meta = new Metadata { Date = new DateTime(2022, 11, 08) }; var r = new Random(Environment.TickCount); @@ -222,12 +230,13 @@ namespace PartitionFileStorageTest var storePart = store.CreateBuilder(meta); var sw = new Stopwatch(); sw.Start(); - var insertCount = (int)(0.7 * pairs.Count); + var insertCount = (long)(0.7 * PAIRS_COUNT); var testKeys1 = new ConcurrentBag(); var testKeys2 = new ConcurrentBag(); var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 24 }; + var Keys = new ConcurrentHashSet(); - Parallel.ForEach(pairs.Take(insertCount).ToArray(), parallelOptions, pair => + Parallel.ForEach(MassGenerator((long)(0.7 * PAIRS_COUNT)), parallelOptions, pair => { var key = pair.Item1; var val = pair.Item2; @@ -240,11 +249,12 @@ namespace PartitionFileStorageTest { testKeys2.Add(key); } + Keys.Add(key); }); if (storePart.TotalRecords != insertCount) { - Log.Error($"Count of stored record no equals expected. Recorded: {storePart.TotalRecords}. Expected: {insertCount}"); + Log.Error($"Count of stored record no equals expected. Recorded: {storePart.TotalRecords}. Expected: {insertCount}. Unique keys: {Keys.Count}"); } sw.Stop(); @@ -263,27 +273,29 @@ namespace PartitionFileStorageTest sw.Restart(); var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data)); - Parallel.ForEach(pairs.Skip(insertCount).ToArray(), parallelOptions, pair => + Parallel.ForEach(MassGenerator((long)(0.3 * PAIRS_COUNT)), parallelOptions, pair => { var key = pair.Item1; var val = pair.Item2; merger.Store(key, val); + Keys.Add(key); }); - if (merger.TotalRecords != (pairs.Count - insertCount)) + if (merger.TotalRecords != ((long)(0.3 * PAIRS_COUNT))) { - Log.Error($"Count of stored record no equals expected. Recorded: {merger.TotalRecords}. Expected: {(pairs.Count - insertCount)}"); + Log.Error($"Count of stored record no equals expected. Recorded: {merger.TotalRecords}. Expected: {((long)(0.3 * PAIRS_COUNT))}"); } - Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {pairs.Count}"); + Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {PAIRS_COUNT}. Unique keys: {Keys.Count}"); merger.Compress(); // auto reindex sw.Stop(); Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); - Log.Info("Test #1 reading"); - var readPart = store.CreateAccessor(meta); ulong totalData = 0; ulong totalKeys = 0; + var readPart = store.CreateAccessor(meta); + /* + Log.Info("Test #1 reading"); foreach (var key in testKeys1) { var result = readPart.Find(key); @@ -312,6 +324,7 @@ namespace PartitionFileStorageTest Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; + */ Log.Info("Test #2 reading"); foreach (var key in testKeys2) { @@ -319,11 +332,15 @@ namespace PartitionFileStorageTest totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } - Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); + Log.Info($"\t\tFound: {totalKeys}/{Keys.Count} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test #2 remove keys batch"); readPart.RemoveKeys(testKeys2); + foreach (var k in testKeys2) + { + Keys.TryRemove(k); + } Log.Info("Test #2 reading after remove"); foreach (var key in testKeys2) { @@ -341,7 +358,8 @@ namespace PartitionFileStorageTest totalData += (ulong)(e.Value?.Length ?? 0); totalKeys++; } - Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); + Log.Info($"\t\tFound: {totalKeys}/{Keys.Count} keys. {totalData} bytes"); + store.Dispose(); Log.Info("Completed"); } @@ -368,6 +386,23 @@ namespace PartitionFileStorageTest } // 2 Test index + var fileReader = new ParallelFileReader(filePath); + foreach (var pair in index) + { + var accessor = fileReader.GetAccessor(pair.Value); + using (var reader = new MemoryStreamReader(accessor)) + { + var k = serializer.KeyDeserializer.Invoke(reader); + if (k != pair.Key) + { + Log.Warning("Broken index"); + } + var v = serializer.ValueDeserializer.Invoke(reader); + } + } + + + /* using (var reader = new MemoryStreamReader(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024))) { foreach (var pair in index) @@ -381,6 +416,7 @@ namespace PartitionFileStorageTest var v = serializer.ValueDeserializer.Invoke(reader); } } + */ } private static void FaultUncompressedReadTest(string filePath) @@ -408,7 +444,7 @@ namespace PartitionFileStorageTest } catch (Exception ex) { - + } } } @@ -416,13 +452,6 @@ namespace PartitionFileStorageTest static void Main(string[] args) { - /*FaultIndexTest(@"H:\temp\85"); - return;*/ - /* - FaultUncompressedReadTest(@"H:\temp\107"); - return; - */ - var root = @"H:\temp"; var options = new StoreOptions { @@ -430,7 +459,7 @@ namespace PartitionFileStorageTest { Enabled = true, StepType = IndexStepType.Step, - StepValue = 16, + StepValue = 32, EnableIndexInMemoryCachee = true }, RootFolder = root, @@ -453,7 +482,7 @@ namespace PartitionFileStorageTest { Enabled = true, StepType = IndexStepType.Step, - StepValue = 16, + StepValue = 32, EnableIndexInMemoryCachee = true }, RootFolder = root, @@ -468,24 +497,30 @@ namespace PartitionFileStorageTest new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) }, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, - ThreadSafeWriting = true + ThreadSafeWriting = true, + MaxDegreeOfParallelism = 16 }; Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug); - Log.Info("Start"); - + /* var pairs = new List<(ulong, ulong)>(PAIRS_COUNT); var r = new Random(Environment.TickCount); + Log.Info("Start create dataset"); for (int i = 0; i < PAIRS_COUNT; i++) { pairs.Add((Generate(r), Generate(r))); } - + */ + Log.Info("Start test"); // FastTest(options); - /* FSUtils.CleanAndTestFolder(root); - FullStoreMultithreadTest(optionsMultiThread, pairs);*/ - FSUtils.CleanAndTestFolder(root); - FullStoreTest(options, pairs); + FullStoreMultithreadTest(optionsMultiThread); + + + /* + FSUtils.CleanAndTestFolder(root); + FullStoreTest(options, pairs); + */ + //TestParallelFileReadingMMF(); /* @@ -494,5 +529,83 @@ namespace PartitionFileStorageTest */ Console.ReadKey(); } + + + static void TestParallelFileReading() + { + var path = @"C:\Users\Ogoun\Downloads\Lego_super_hero.iso"; + var threads = new List(); + for (int i = 0; i < 100; i++) + { + var k = i; + var reader = GetReadStream(path); + var t = new Thread(() => PartReader(reader, 1000000 + k * 1000)); + t.IsBackground = true; + threads.Add(t); + } + foreach (var t in threads) + { + t.Start(); + } + } + + static void TestParallelFileReadingMMF() + { + var filereader = new ParallelFileReader(@"C:\Users\Ogoun\Downloads\Lego_super_hero.iso"); + var threads = new List(); + for (int i = 0; i < 100; i++) + { + var k = i; + var t = new Thread(() => PartReaderMMF(filereader.GetAccessor(1000000 + k * 1000))); + t.IsBackground = true; + threads.Add(t); + } + + foreach (var t in threads) + { + t.Start(); + } + + } + + static MemoryStreamReader GetReadStream(string filePath) + { + var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); + return new MemoryStreamReader(stream); + } + + static void PartReader(MemoryStreamReader reader, long offset) + { + var count = 0; + using (reader) + { + reader.SetPosition(offset); + for (int i = 0; i < 1000000; i++) + { + if (reader.ReadByte() == 127) count++; + } + } + Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId}: {count}"); + } + + static void PartReaderMMF(IViewAccessor accessor) + { + var count = 0; + var lastPosition = accessor.Position; + using (var reader = new MemoryStreamReader(accessor)) + { + for (int i = 0; i < 1000000; i++) + { + if (reader.ReadByte() == 127) count++; + if (lastPosition > reader.Position) + { + // Test for correct absolute position + Console.WriteLine("Fock!"); + } + lastPosition = reader.Position; + } + } + Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId}: {count}"); + } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Cache/TimerCachee.cs b/ZeroLevel/Services/Cache/TimerCachee.cs new file mode 100644 index 0000000..b6f3a5b --- /dev/null +++ b/ZeroLevel/Services/Cache/TimerCachee.cs @@ -0,0 +1,137 @@ +using System; +using System.Collections.Generic; +using ZeroLevel.Services.Shedulling; + +namespace ZeroLevel.Services.Cache +{ + internal sealed class TimerCachee + : IDisposable + { + private sealed class CacheeItem + { + public T Value { get; set; } + public DateTime LastAcessTime { get; set; } + } + + private readonly IDictionary> _cachee; + private readonly object _cacheeLock = new object(); + + private readonly ISheduller _sheduller; + private readonly Func _factory; + private readonly Action _onDisposeAction; + private readonly TimeSpan _expirationPeriod; + public TimerCachee(TimeSpan expirationPeriod, Func factory, Action onDisposeAction, int capacity = 512) + { + _factory = factory; + _onDisposeAction = onDisposeAction; + _sheduller = Sheduller.Create(); + _cachee = new Dictionary>(capacity); + _expirationPeriod = expirationPeriod; + var ts = TimeSpan.FromSeconds(60); + if (ts.TotalSeconds > expirationPeriod.TotalSeconds) + { + ts = expirationPeriod; + } + _sheduller.RemindEvery(ts, _ => CheckAndCleacnCachee()); + } + + public T Get(string key) + { + lock (_cacheeLock) + { + if (_cachee.TryGetValue(key, out var v)) + { + v.LastAcessTime = DateTime.UtcNow; + return v.Value; + } + } + var obj = _factory.Invoke(key); + var item = new CacheeItem { Value = obj, LastAcessTime = DateTime.UtcNow }; + lock (_cacheeLock) + { + _cachee[key] = item; + } + return obj; + } + + public void Drop(string key) + { + lock (_cacheeLock) + { + if (_cachee.TryGetValue(key, out var v)) + { + try + { + if (_onDisposeAction != null) + { + _onDisposeAction.Invoke(v.Value); + } + } + catch (Exception ex) + { + Log.SystemError(ex, $"[TimerCachee.Drop] Key '{key}'. Fault dispose."); + } + _cachee.Remove(key); + } + } + } + + private void CheckAndCleacnCachee() + { + lock (_cacheeLock) + { + var keysToRemove = new List(_cachee.Count); + foreach (var pair in _cachee) + { + if ((DateTime.UtcNow - pair.Value.LastAcessTime) > _expirationPeriod) + { + keysToRemove.Add(pair.Key); + } + } + foreach (var key in keysToRemove) + { + try + { + if (_onDisposeAction != null) + { + _onDisposeAction.Invoke(_cachee[key].Value); + } + } + catch (Exception ex) + { + Log.SystemError(ex, $"[TimerCachee.CheckAndCleacnCachee] Key '{key}'"); + } + _cachee.Remove(key); + } + } + } + + public void DropAll() + { + lock (_cacheeLock) + { + foreach (var pair in _cachee) + { + try + { + if (_onDisposeAction != null) + { + _onDisposeAction.Invoke(pair.Value.Value); + } + } + catch (Exception ex) + { + Log.SystemError(ex, $"[TimerCachee.DropAll] Key '{pair.Key}'"); + } + } + _cachee.Clear(); + } + } + public void Dispose() + { + _sheduller?.Clean(); + _sheduller?.Dispose(); + DropAll(); + } + } +} diff --git a/ZeroLevel/Services/FileSystem/ParallelFileReader.cs b/ZeroLevel/Services/FileSystem/ParallelFileReader.cs new file mode 100644 index 0000000..4fc2c69 --- /dev/null +++ b/ZeroLevel/Services/FileSystem/ParallelFileReader.cs @@ -0,0 +1,41 @@ +using System; +using System.IO; +using System.IO.MemoryMappedFiles; +using ZeroLevel.Services.Memory; + +namespace ZeroLevel.Services.FileSystem +{ + public sealed class ParallelFileReader + : IDisposable + { + private MemoryMappedFile _mmf; + private readonly long _fileLength; + public long FileLength => _fileLength; + + public ParallelFileReader(string filePath) + { + _fileLength = new FileInfo(filePath).Length; + _mmf = MemoryMappedFile.CreateFromFile(filePath, FileMode.Open, Guid.NewGuid().ToString(), 0, MemoryMappedFileAccess.Read); + } + + public IViewAccessor GetAccessor(long offset) + { + var length = _fileLength - offset; + return new MMFViewAccessor(_mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read), offset); + } + + public IViewAccessor GetAccessor(long offset, int length) + { + if ((offset + length) > _fileLength) + { + throw new OutOfMemoryException($"Offset + Length ({offset + length}) more than file length ({_fileLength})"); + } + return new MMFViewAccessor(_mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read), offset); + } + + public void Dispose() + { + _mmf?.Dispose(); + } + } +} diff --git a/ZeroLevel/Services/Memory/IViewAccessor.cs b/ZeroLevel/Services/Memory/IViewAccessor.cs new file mode 100644 index 0000000..9223405 --- /dev/null +++ b/ZeroLevel/Services/Memory/IViewAccessor.cs @@ -0,0 +1,17 @@ +using System; + +namespace ZeroLevel.Services.Memory +{ + public interface IViewAccessor + : IDisposable + { + /// + /// End of view + /// + bool EOV { get; } + long Position { get; } + byte[] ReadBuffer(int count); + bool CheckOutOfRange(int offset); + void Seek(long offset); + } +} diff --git a/ZeroLevel/Services/Memory/MMFViewAccessor.cs b/ZeroLevel/Services/Memory/MMFViewAccessor.cs new file mode 100644 index 0000000..2d86277 --- /dev/null +++ b/ZeroLevel/Services/Memory/MMFViewAccessor.cs @@ -0,0 +1,48 @@ +using System; +using System.IO; +using System.IO.MemoryMappedFiles; + +namespace ZeroLevel.Services.Memory +{ + internal class MMFViewAccessor + : IViewAccessor + { + private readonly MemoryMappedViewStream _accessor; + private readonly long _absoluteOffset; + + public MMFViewAccessor(MemoryMappedViewStream accessor, long offset) + { + _accessor = accessor; + _absoluteOffset = offset; + } + + public bool EOV => _accessor.Position >= _accessor.Length; + + public long Position => _absoluteOffset + _accessor.Position; + + public bool CheckOutOfRange(int offset) + { + return offset < 0 || (_accessor.Position + offset) > _accessor.Length; + } + + public void Dispose() + { + _accessor?.Dispose(); + } + + public byte[] ReadBuffer(int count) + { + if (count == 0) return null; + var buffer = new byte[count]; + var readedCount = _accessor.Read(buffer, 0, count); + if (count != readedCount) + throw new InvalidOperationException($"The stream returned less data ({count} bytes) than expected ({readedCount} bytes)"); + return buffer; + } + + public void Seek(long offset) + { + _accessor.Seek(offset, SeekOrigin.Begin); + } + } +} diff --git a/ZeroLevel/Services/Memory/StreamVewAccessor.cs b/ZeroLevel/Services/Memory/StreamVewAccessor.cs new file mode 100644 index 0000000..3558b67 --- /dev/null +++ b/ZeroLevel/Services/Memory/StreamVewAccessor.cs @@ -0,0 +1,44 @@ +using System; +using System.IO; + +namespace ZeroLevel.Services.Memory +{ + internal sealed class StreamVewAccessor + : IViewAccessor + { + private readonly Stream _stream; + public StreamVewAccessor(Stream stream) + { + _stream = stream; + } + + public bool EOV => _stream.Position >= _stream.Length; + + public long Position => _stream.Position; + + public bool CheckOutOfRange(int offset) + { + return offset < 0 || (_stream.Position + offset) > _stream.Length; + } + + public byte[] ReadBuffer(int count) + { + if (count == 0) return null; + var buffer = new byte[count]; + var readedCount = _stream.Read(buffer, 0, count); + if (count != readedCount) + throw new InvalidOperationException($"The stream returned less data ({count} bytes) than expected ({readedCount} bytes)"); + return buffer; + } + + public void Dispose() + { + _stream.Dispose(); + } + + public void Seek(long offset) + { + _stream.Seek(offset, SeekOrigin.Begin); + } + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs index 78d5207..3ecb613 100644 --- a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.IO; using System.Linq; -using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage @@ -19,7 +18,8 @@ namespace ZeroLevel.Services.PartitionStorage private readonly int _stepValue; private readonly Func _keyDeserializer; private readonly Func _valueDeserializer; - public IndexBuilder(IndexStepType indexType, int stepValue, string dataCatalog) + private readonly PhisicalFileAccessorCachee _phisicalFileAccessorCachee; + public IndexBuilder(IndexStepType indexType, int stepValue, string dataCatalog, PhisicalFileAccessorCachee phisicalFileAccessorCachee) { _dataCatalog = dataCatalog; _indexCatalog = Path.Combine(dataCatalog, INDEX_SUBFOLDER_NAME); @@ -27,19 +27,19 @@ namespace ZeroLevel.Services.PartitionStorage _stepValue = stepValue; _keyDeserializer = MessageSerializer.GetDeserializer(); _valueDeserializer = MessageSerializer.GetDeserializer(); + _phisicalFileAccessorCachee = phisicalFileAccessorCachee; } /// /// Rebuild indexes for all files /// internal void RebuildIndex() { - FSUtils.CleanAndTestFolder(_indexCatalog); var files = Directory.GetFiles(_dataCatalog); if (files != null && files.Length > 0) { foreach (var file in files) { - RebuildFileIndex(file); + RebuildFileIndex(Path.GetFileName(file)); } } } @@ -63,6 +63,7 @@ namespace ZeroLevel.Services.PartitionStorage internal void DropFileIndex(string file) { var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); + _phisicalFileAccessorCachee.DropIndexReader(index_file); if (File.Exists(index_file)) { File.Delete(index_file); @@ -78,23 +79,21 @@ namespace ZeroLevel.Services.PartitionStorage Directory.CreateDirectory(_indexCatalog); } var dict = new Dictionary(); - if (TryGetReadStream(file, out var reader)) + using (var reader = new MemoryStreamReader(new FileStream(Path.Combine(_dataCatalog, file), FileMode.Open, FileAccess.Read, FileShare.None))) { - using (reader) + while (reader.EOS == false) { - while (reader.EOS == false) - { - var pos = reader.Position; - var k = _keyDeserializer.Invoke(reader); - dict[k] = pos; - _valueDeserializer.Invoke(reader); - } + var pos = reader.Position; + var k = _keyDeserializer.Invoke(reader); + dict[k] = pos; + _valueDeserializer.Invoke(reader); } } if (dict.Count > _stepValue) { var step = (int)Math.Round(dict.Count / (float)_stepValue, MidpointRounding.ToZero); var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); + DropFileIndex(index_file); var d_arr = dict.OrderBy(p => p.Key).ToArray(); using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) { @@ -116,49 +115,28 @@ namespace ZeroLevel.Services.PartitionStorage { Directory.CreateDirectory(_indexCatalog); } - if (TryGetReadStream(file, out var reader)) + using (var reader = new MemoryStreamReader(new FileStream(Path.Combine(_dataCatalog, file), FileMode.Open, FileAccess.Read, FileShare.None))) { - using (reader) + var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); + DropFileIndex(index_file); + using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) { - var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); - using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) + var counter = 1; + while (reader.EOS == false) { - var counter = 1; - while (reader.EOS == false) + counter--; + var pos = reader.Position; + var k = _keyDeserializer.Invoke(reader); + _valueDeserializer.Invoke(reader); + if (counter == 0) { - counter--; - var pos = reader.Position; - var k = _keyDeserializer.Invoke(reader); - _valueDeserializer.Invoke(reader); - if (counter == 0) - { - writer.WriteCompatible(k); - writer.WriteLong(pos); - counter = _stepValue; - } + writer.WriteCompatible(k); + writer.WriteLong(pos); + counter = _stepValue; } } } } } - /// - /// Attempting to open a file for reading - /// - private bool TryGetReadStream(string fileName, out MemoryStreamReader reader) - { - try - { - var filePath = Path.Combine(_dataCatalog, fileName); - var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); - reader = new MemoryStreamReader(stream); - return true; - } - catch (Exception ex) - { - Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]"); - } - reader = null; - return false; - } } } diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/KeyIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/KeyIndex.cs index 7afd3e2..bf7ade7 100644 --- a/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/KeyIndex.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/KeyIndex.cs @@ -4,5 +4,6 @@ { public TKey Key { get; set; } public long Offset { get; set; } + public int Length { get; set; } } } diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs index 14abcf1..1b0909e 100644 --- a/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.IO; +using ZeroLevel.Services.Memory; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage @@ -15,13 +16,13 @@ namespace ZeroLevel.Services.PartitionStorage private readonly bool _enableIndexInMemoryCachee; private readonly Func _keyDeserializer; private readonly TMeta _meta; - - private readonly Dictionary[]> _indexCachee = null; + private readonly PhisicalFileAccessorCachee _phisicalFileAccessorCachee; public StorePartitionSparseIndex(string partitionFolder, TMeta meta, StoreFilePartition filePartition, Func keyComparer, - bool enableIndexInMemoryCachee) + bool enableIndexInMemoryCachee, + PhisicalFileAccessorCachee phisicalFileAccessorCachee) { _indexFolder = Path.Combine(partitionFolder, "__indexes__"); _indexExists = Directory.Exists(_indexFolder); @@ -32,7 +33,7 @@ namespace ZeroLevel.Services.PartitionStorage _enableIndexInMemoryCachee = enableIndexInMemoryCachee; if (_enableIndexInMemoryCachee) { - _indexCachee = new Dictionary[]>(1024); + _phisicalFileAccessorCachee = phisicalFileAccessorCachee; } } @@ -110,46 +111,26 @@ namespace ZeroLevel.Services.PartitionStorage { if (_enableIndexInMemoryCachee) { - lock (_casheupdatelock) - { - _indexCachee.Clear(); - } + _phisicalFileAccessorCachee.DropAllIndexReaders(); } } public void RemoveCacheeItem(string name) { + var file = Path.Combine(_indexFolder, name); if (_enableIndexInMemoryCachee) { - lock (_casheupdatelock) - { - _indexCachee.Remove(name); - } + _phisicalFileAccessorCachee.DropIndexReader(file); } } - private readonly object _casheupdatelock = new object(); private KeyIndex[] GetFileIndex(TKey key) { var indexName = _filePartition.FileNameExtractor.Invoke(key, _meta); + var filePath = Path.Combine(_indexFolder, indexName); try { - if (_enableIndexInMemoryCachee) - { - if (_indexCachee.TryGetValue(indexName, out var index)) - { - return index; - } - lock (_casheupdatelock) - { - _indexCachee[indexName] = ReadIndexesFromIndexFile(indexName); - return _indexCachee[indexName]; - } - } - else - { - return ReadIndexesFromIndexFile(indexName); - } + return ReadIndexesFromIndexFile(filePath); } catch (Exception ex) { @@ -158,19 +139,35 @@ namespace ZeroLevel.Services.PartitionStorage return null; } - private KeyIndex[] ReadIndexesFromIndexFile(string indexName) + private KeyIndex[] ReadIndexesFromIndexFile(string filePath) { - var file = Path.Combine(_indexFolder, indexName); - if (File.Exists(file)) + if (File.Exists(filePath)) { var list = new List>(); - using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None))) + var accessor = _enableIndexInMemoryCachee ? _phisicalFileAccessorCachee.GetIndexAccessor(filePath, 0) : new StreamVewAccessor(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.None)); + using (var reader = new MemoryStreamReader(accessor)) { + var index = new KeyIndex(); + if (reader.EOS == false) + { + index.Key = _keyDeserializer.Invoke(reader); + } + if (reader.EOS == false) + { + index.Offset = reader.ReadLong(); + } while (reader.EOS == false) { var k = _keyDeserializer.Invoke(reader); var o = reader.ReadLong(); - list.Add(new KeyIndex { Key = k, Offset = o }); + index.Length = (int)(o - index.Offset); + list.Add(index); + index = new KeyIndex { Key = k, Offset = o }; + } + if (index.Offset > 0) + { + index.Length = 0; + list.Add(index); } } return list.ToArray(); diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs index 81716b3..595f82a 100644 --- a/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs @@ -1,5 +1,6 @@ namespace ZeroLevel.Services.PartitionStorage { + /*TODO IN FUTURE*/ internal struct ValueIndex { public TValue Value { get; set; } diff --git a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs index 7a2e0f6..14b71a4 100644 --- a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs +++ b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs @@ -43,6 +43,10 @@ namespace ZeroLevel.Services.PartitionStorage /// Uses a thread-safe mechanism for writing to files during multi-threaded writes /// public bool ThreadSafeWriting { get; set; } = false; + /// + /// Period before memory mapped file was closed, after last access time + /// + public TimeSpan PhisicalFileAccessorExpirationPeriod { get; set; } = TimeSpan.FromMinutes(30); public IndexOptions Index { get; set; } = new IndexOptions { diff --git a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs index ee128ed..86fc3c8 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.IO; using System.Threading; using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.Memory; using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.Serialization; @@ -24,9 +25,12 @@ namespace ZeroLevel.Services.PartitionStorage.Partition private readonly IndexBuilder _indexBuilder; private readonly Dictionary _writeStreams = new Dictionary(); + private readonly PhisicalFileAccessorCachee _phisicalFileAccessor; + protected PhisicalFileAccessorCachee PhisicalFileAccessorCachee => _phisicalFileAccessor; + internal BasePartition(StoreOptions options, TMeta info, - IStoreSerializer serializer) + IStoreSerializer serializer, PhisicalFileAccessorCachee fileAccessorCachee) { _options = options; _info = info; @@ -35,7 +39,8 @@ namespace ZeroLevel.Services.PartitionStorage.Partition { Directory.CreateDirectory(_catalog); } - _indexBuilder = _options.Index.Enabled ? new IndexBuilder(_options.Index.StepType, _options.Index.StepValue, _catalog) : null; + _phisicalFileAccessor = fileAccessorCachee; + _indexBuilder = _options.Index.Enabled ? new IndexBuilder(_options.Index.StepType, _options.Index.StepValue, _catalog, fileAccessorCachee) : null; Serializer = serializer; } @@ -155,5 +160,41 @@ namespace ZeroLevel.Services.PartitionStorage.Partition reader = null; return false; } + protected IViewAccessor GetViewAccessor(TKey key, long offset) + { + var fileName = _options.GetFileName(key, _info); + var filePath = Path.Combine(_catalog, fileName); + return GetViewAccessor(filePath, offset); + } + protected IViewAccessor GetViewAccessor(TKey key, long offset, int length) + { + var fileName = _options.GetFileName(key, _info); + var filePath = Path.Combine(_catalog, fileName); + return GetViewAccessor(filePath, offset, length); + } + protected IViewAccessor GetViewAccessor(string filePath, long offset) + { + try + { + return PhisicalFileAccessorCachee.GetDataAccessor(filePath, offset); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[StorePartitionAccessor.GetViewAccessor] '{filePath}'"); + } + return null; + } + protected IViewAccessor GetViewAccessor(string filePath, long offset, int length) + { + try + { + return PhisicalFileAccessorCachee.GetDataAccessor(filePath, offset, length); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[StorePartitionAccessor.GetViewAccessor] '{filePath}'"); + } + return null; + } } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs index 792f5e2..a4b6732 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.IO; using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; using ZeroLevel.Services.FileSystem; @@ -22,8 +21,9 @@ namespace ZeroLevel.Services.PartitionStorage.Partition public CompactKeyStorePartitionBuilder(StoreOptions options, TMeta info, - IStoreSerializer serializer) - : base(options, info, serializer) + IStoreSerializer serializer, + PhisicalFileAccessorCachee fileAccessorCachee) + : base(options, info, serializer, fileAccessorCachee) { if (options == null) throw new ArgumentNullException(nameof(options)); if (options.ThreadSafeWriting) diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs index e98fe62..7de90c7 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs @@ -19,7 +19,7 @@ namespace ZeroLevel.Services.PartitionStorage /// Exists compressed catalog /// private readonly IStorePartitionAccessor _accessor; - + private readonly PhisicalFileAccessorCachee _phisicalFileAccessor; private readonly string _temporaryFolder; private readonly Func _keyDeserializer; private readonly Func _valueDeserializer; @@ -33,19 +33,22 @@ namespace ZeroLevel.Services.PartitionStorage /// Write catalog /// private readonly IStorePartitionBuilder _temporaryAccessor; - + public StoreMergePartitionAccessor(StoreOptions options, - TMeta info, + TMeta info, Func> decompress, - IStoreSerializer serializer) + IStoreSerializer serializer, + PhisicalFileAccessorCachee cachee) { if (decompress == null) throw new ArgumentNullException(nameof(decompress)); _decompress = decompress; - _accessor = new StorePartitionAccessor(options, info, serializer); + _accessor = new StorePartitionAccessor(options, info, serializer, cachee); _temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString()); var tempOptions = options.Clone(); tempOptions.RootFolder = _temporaryFolder; - _temporaryAccessor = new StorePartitionBuilder(tempOptions, info, serializer); + _temporaryAccessor = new StorePartitionBuilder(tempOptions, info, serializer, cachee); + + _phisicalFileAccessor = cachee; _keyDeserializer = MessageSerializer.GetDeserializer(); _valueDeserializer = MessageSerializer.GetDeserializer(); @@ -103,13 +106,21 @@ namespace ZeroLevel.Services.PartitionStorage // replace old file by new foreach (var file in newFiles) { + _phisicalFileAccessor.DropDataReader(file); + // 1. Remove index file (_accessor as StorePartitionAccessor) .DropFileIndex(file); // 2. Replace source var name = Path.GetFileName(file); - File.Move(file, Path.Combine(folder, name), true); + var updateFilePath = Path.Combine(folder, name); + if (File.Exists(updateFilePath)) + { + _phisicalFileAccessor.DropDataReader(updateFilePath); + File.Delete(updateFilePath); + } + File.Move(file, updateFilePath, true); // 3. Rebuil index (_accessor as BasePartition).RebuildFileIndex(name); diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs index 0565b19..566445a 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs @@ -3,8 +3,10 @@ using System.Collections.Generic; using System.IO; using System.Linq; using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.Memory; using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.PartitionStorage.Partition; +using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage { @@ -12,66 +14,54 @@ namespace ZeroLevel.Services.PartitionStorage : BasePartition, IStorePartitionAccessor { private readonly StorePartitionSparseIndex Indexes; - public StorePartitionAccessor(StoreOptions options, TMeta info, - IStoreSerializer serializer) - : base(options, info, serializer) + IStoreSerializer serializer, + PhisicalFileAccessorCachee phisicalFileAccessor) + : base(options, info, serializer, phisicalFileAccessor) { if (options == null) throw new ArgumentNullException(nameof(options)); if (options.Index.Enabled) { - Indexes = new StorePartitionSparseIndex(_catalog, _info, options.FilePartition, options.KeyComparer, options.Index.EnableIndexInMemoryCachee); + Indexes = new StorePartitionSparseIndex(_catalog, _info, options.FilePartition, options.KeyComparer, options.Index.EnableIndexInMemoryCachee, phisicalFileAccessor); } } - #region IStorePartitionAccessor + #region IStorePartitionAccessor + public StorePartitionKeyValueSearchResult Find(TKey key) { - var fileName = _options.GetFileName(key, _info); - if (File.Exists(Path.Combine(_catalog, fileName))) + IViewAccessor memoryAccessor; + if (_options.Index.Enabled) { - long startOffset = 0; - if (_options.Index.Enabled) - { - var offset = Indexes.GetOffset(key); - startOffset = offset.Offset; - } - if (TryGetReadStream(fileName, out var reader)) + var offset = Indexes.GetOffset(key); + memoryAccessor = offset.Length > 0 ? GetViewAccessor(key, offset.Offset, offset.Length) : GetViewAccessor(key, offset.Offset); + } + else + { + memoryAccessor = GetViewAccessor(key, 0); + } + if (memoryAccessor != null) + { + using (var reader = new MemoryStreamReader(memoryAccessor)) { - using (reader) + while (reader.EOS == false) { - if (startOffset > 0) + var k = Serializer.KeyDeserializer.Invoke(reader); + var v = Serializer.ValueDeserializer.Invoke(reader); + var c = _options.KeyComparer(key, k); + if (c == 0) return new StorePartitionKeyValueSearchResult { - reader.Seek(startOffset, SeekOrigin.Begin); - } - while (reader.EOS == false) + Key = key, + Value = v, + Status = SearchResult.Success + }; + if (c == -1) { - var k = Serializer.KeyDeserializer.Invoke(reader); - var v = Serializer.ValueDeserializer.Invoke(reader); - var c = _options.KeyComparer(key, k); - if (c == 0) return new StorePartitionKeyValueSearchResult - { - Key = key, - Value = v, - Status = SearchResult.Success - }; - if (c == -1) - { - break; - } + break; } } } - else - { - return new StorePartitionKeyValueSearchResult - { - Key = key, - Status = SearchResult.FileLocked, - Value = default - }; - } } return new StorePartitionKeyValueSearchResult { @@ -101,9 +91,10 @@ namespace ZeroLevel.Services.PartitionStorage { foreach (var file in files) { - if (TryGetReadStream(file, out var reader)) + var accessor = PhisicalFileAccessorCachee.GetDataAccessor(file, 0); + if (accessor != null) { - using (reader) + using (var reader = new MemoryStreamReader(accessor)) { while (reader.EOS == false) { @@ -119,11 +110,13 @@ namespace ZeroLevel.Services.PartitionStorage public IEnumerable> IterateKeyBacket(TKey key) { var fileName = _options.GetFileName(key, _info); - if (File.Exists(Path.Combine(_catalog, fileName))) + var filePath = Path.Combine(_catalog, fileName); + if (File.Exists(filePath)) { - if (TryGetReadStream(fileName, out var reader)) + var accessor = PhisicalFileAccessorCachee.GetDataAccessor(filePath, 0); + if (accessor != null) { - using (reader) + using (var reader = new MemoryStreamReader(accessor)) { while (reader.EOS == false) { @@ -186,198 +179,213 @@ namespace ZeroLevel.Services.PartitionStorage #region Private methods private IEnumerable> Find(string fileName, - TKey[] keys) + TKey[] keys) { - if (File.Exists(Path.Combine(_catalog, fileName))) + var filePath = Path.Combine(_catalog, fileName); + if (_options.Index.Enabled) { - if (_options.Index.Enabled) + var offsets = Indexes.GetOffset(keys, true); + for (int i = 0; i < keys.Length; i++) { - var offsets = Indexes.GetOffset(keys, true); - if (TryGetReadStream(fileName, out var reader)) + var searchKey = keys[i]; + var offset = offsets[i]; + IViewAccessor memoryAccessor; + if (offset.Length > 0) { - using (reader) - { - for (int i = 0; i < keys.Length; i++) - { - var searchKey = keys[i]; - var off = offsets[i]; - reader.Seek(off.Offset, SeekOrigin.Begin); - while (reader.EOS == false) - { - var k = Serializer.KeyDeserializer.Invoke(reader); - var v = Serializer.ValueDeserializer.Invoke(reader); - var c = _options.KeyComparer(searchKey, k); - if (c == 0) - { - yield return new StorePartitionKeyValueSearchResult - { - Key = searchKey, - Value = v, - Status = SearchResult.Success - }; - break; - } - else if (c == -1) - { - break; - } - } - } - } + memoryAccessor = GetViewAccessor(filePath, offset.Offset, offset.Length); } - } - else - { - if (TryGetReadStream(fileName, out var reader)) + else + { + memoryAccessor = GetViewAccessor(filePath, offset.Offset); + } + if (memoryAccessor != null) { - using (reader) + using (var reader = new MemoryStreamReader(memoryAccessor)) { - int index = 0; - var keys_arr = keys.OrderBy(k => k).ToArray(); - while (reader.EOS == false && index < keys_arr.Length) + while (reader.EOS == false) { var k = Serializer.KeyDeserializer.Invoke(reader); var v = Serializer.ValueDeserializer.Invoke(reader); - var c = _options.KeyComparer(keys_arr[index], k); + var c = _options.KeyComparer(searchKey, k); if (c == 0) { yield return new StorePartitionKeyValueSearchResult { - Key = keys_arr[index], + Key = searchKey, Value = v, Status = SearchResult.Success }; - index++; + break; } else if (c == -1) { - do - { - index++; - if (index < keys_arr.Length) - { - c = _options.KeyComparer(keys_arr[index], k); - } - } while (index < keys_arr.Length && c == -1); + break; } } } } } } - } - - private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex) - { - var filePath = Path.Combine(_catalog, fileName); - if (File.Exists(filePath)) + else { - // 1. Find ranges - var ranges = new List(); - if (_options.Index.Enabled && autoReindex) + var memoryAccessor = GetViewAccessor(filePath, 0); + if (memoryAccessor != null) { - var offsets = Indexes.GetOffset(keys, true); - if (TryGetReadStream(fileName, out var reader)) + using (var reader = new MemoryStreamReader(memoryAccessor)) { - using (reader) + int index = 0; + var keys_arr = keys.OrderBy(k => k).ToArray(); + while (reader.EOS == false && index < keys_arr.Length) { - for (int i = 0; i < keys.Length; i++) + var k = Serializer.KeyDeserializer.Invoke(reader); + var v = Serializer.ValueDeserializer.Invoke(reader); + var c = _options.KeyComparer(keys_arr[index], k); + if (c == 0) { - var searchKey = keys[i]; - var off = offsets[i]; - reader.Seek(off.Offset, SeekOrigin.Begin); - while (reader.EOS == false) + yield return new StorePartitionKeyValueSearchResult { - var startPosition = reader.Position; - var k = Serializer.KeyDeserializer.Invoke(reader); - Serializer.ValueDeserializer.Invoke(reader); - var endPosition = reader.Position; - var c = _options.KeyComparer(searchKey, k); - if (c == 0) - { - ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); - } - else if (c == -1) + Key = keys_arr[index], + Value = v, + Status = SearchResult.Success + }; + index++; + } + else if (c == -1) + { + do + { + index++; + if (index < keys_arr.Length) { - break; + c = _options.KeyComparer(keys_arr[index], k); } - } + } while (index < keys_arr.Length && c == -1); } } } } - else + } + } + + private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex) + { + var filePath = Path.Combine(_catalog, fileName); + // 1. Find ranges + var ranges = new List(); + if (_options.Index.Enabled && autoReindex) + { + var offsets = Indexes.GetOffset(keys, true); + for (int i = 0; i < keys.Length; i++) { - if (TryGetReadStream(fileName, out var reader)) + var searchKey = keys[i]; + var offset = offsets[i]; + IViewAccessor memoryAccessor; + if (offset.Length > 0) + { + memoryAccessor = GetViewAccessor(filePath, offset.Offset, offset.Length); + } + else { - using (reader) + memoryAccessor = GetViewAccessor(filePath, offset.Offset); + } + if (memoryAccessor != null) + { + using (var reader = new MemoryStreamReader(memoryAccessor)) { - int index = 0; - var keys_arr = keys.OrderBy(k => k).ToArray(); - while (reader.EOS == false && index < keys_arr.Length) + while (reader.EOS == false) { var startPosition = reader.Position; var k = Serializer.KeyDeserializer.Invoke(reader); Serializer.ValueDeserializer.Invoke(reader); var endPosition = reader.Position; - var c = _options.KeyComparer(keys_arr[index], k); + var c = _options.KeyComparer(searchKey, k); if (c == 0) { ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); - index++; } else if (c == -1) { - do - { - index++; - if (index < keys_arr.Length) - { - c = _options.KeyComparer(keys_arr[index], k); - } - } while (index < keys_arr.Length && c == -1); + break; } } } } } - - // 2. Temporary file from ranges - var tempFile = FSUtils.GetAppLocalTemporaryFile(); - - using (var readStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024)) + } + else + { + var memoryAccessor = GetViewAccessor(filePath, 0); + if (memoryAccessor != null) { - RangeCompression(ranges); - using (var writeStream = new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)) + using (var reader = new MemoryStreamReader(memoryAccessor)) { - if (inverseRemove) + int index = 0; + var keys_arr = keys.OrderBy(k => k).ToArray(); + while (reader.EOS == false && index < keys_arr.Length) { - var inverted = RangeInversion(ranges, readStream.Length); - foreach (var range in inverted) + var startPosition = reader.Position; + var k = Serializer.KeyDeserializer.Invoke(reader); + Serializer.ValueDeserializer.Invoke(reader); + var endPosition = reader.Position; + var c = _options.KeyComparer(keys_arr[index], k); + if (c == 0) { - CopyRange(range, readStream, writeStream); + ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); + index++; } - } - else - { - foreach (var range in ranges) + else if (c == -1) { - CopyRange(range, readStream, writeStream); + do + { + index++; + if (index < keys_arr.Length) + { + c = _options.KeyComparer(keys_arr[index], k); + } + } while (index < keys_arr.Length && c == -1); } } - writeStream.Flush(); } } + } - // 3. Replace from temporary to original - File.Move(tempFile, filePath, true); + // 2. Temporary file from ranges + var tempFile = FSUtils.GetAppLocalTemporaryFile(); - // Rebuild index if needs - if (_options.Index.Enabled && autoReindex) + using (var readStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024)) + { + RangeCompression(ranges); + using (var writeStream = new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)) { - RebuildFileIndex(filePath); + if (inverseRemove) + { + var inverted = RangeInversion(ranges, readStream.Length); + foreach (var range in inverted) + { + CopyRange(range, readStream, writeStream); + } + } + else + { + foreach (var range in ranges) + { + CopyRange(range, readStream, writeStream); + } + } + writeStream.Flush(); } } + + // 3. Replace from temporary to original + PhisicalFileAccessorCachee.DropDataReader(filePath); + File.Delete(filePath); + File.Move(tempFile, filePath, true); + + // Rebuild index if needs + if (_options.Index.Enabled && autoReindex) + { + RebuildFileIndex(filePath); + } } #endregion diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index ce7db6b..8c03a38 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -22,8 +22,9 @@ namespace ZeroLevel.Services.PartitionStorage public StorePartitionBuilder(StoreOptions options, TMeta info, - IStoreSerializer serializer) - : base(options, info, serializer) + IStoreSerializer serializer, + PhisicalFileAccessorCachee fileAccessorCachee) + : base(options, info, serializer, fileAccessorCachee) { if (options == null) throw new ArgumentNullException(nameof(options)); if (options.ThreadSafeWriting) @@ -67,9 +68,10 @@ namespace ZeroLevel.Services.PartitionStorage { foreach (var file in files) { - if (TryGetReadStream(file, out var reader)) + var accessor = GetViewAccessor(file, 0); + if (accessor != null) { - using (reader) + using (var reader = new MemoryStreamReader(accessor)) { while (reader.EOS == false) { @@ -134,21 +136,25 @@ namespace ZeroLevel.Services.PartitionStorage internal void CompressFile(string file) { var dict = new Dictionary>(); - using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024))) + var accessor = PhisicalFileAccessorCachee.GetDataAccessor(file, 0); + if (accessor != null) { - while (reader.EOS == false) + using (var reader = new MemoryStreamReader(accessor)) { - var key = Serializer.KeyDeserializer.Invoke(reader); - if (false == dict.ContainsKey(key)) + while (reader.EOS == false) { - dict[key] = new HashSet(); - } - if (reader.EOS) - { - break; + var key = Serializer.KeyDeserializer.Invoke(reader); + if (false == dict.ContainsKey(key)) + { + dict[key] = new HashSet(); + } + if (reader.EOS) + { + break; + } + var input = Serializer.InputDeserializer.Invoke(reader); + dict[key].Add(input); } - var input = Serializer.InputDeserializer.Invoke(reader); - dict[key].Add(input); } } var tempFile = FSUtils.GetAppLocalTemporaryFile(); @@ -163,6 +169,7 @@ namespace ZeroLevel.Services.PartitionStorage writer.SerializeCompatible(v); } } + PhisicalFileAccessorCachee.DropDataReader(file); File.Delete(file); File.Move(tempFile, file, true); } diff --git a/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs b/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs new file mode 100644 index 0000000..493a55f --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs @@ -0,0 +1,121 @@ +using System; +using System.IO; +using ZeroLevel.Services.Cache; +using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.Memory; + +namespace ZeroLevel.Services.PartitionStorage +{ + internal sealed class PhisicalFileAccessorCachee + : IDisposable + { + private readonly TimerCachee _indexReadersCachee; + private readonly TimerCachee _dataReadersCachee; + + public PhisicalFileAccessorCachee(TimeSpan dataExpirationPeriod, TimeSpan indexExpirationPeriod) + { + _dataReadersCachee = new TimerCachee(dataExpirationPeriod, s => new ParallelFileReader(s), i => i.Dispose(), 8192); + _indexReadersCachee = new TimerCachee(indexExpirationPeriod, s => new ParallelFileReader(s), i => i.Dispose(), 8192); + } + + #region DATA + public void DropDataReader(string filePath) + { + _dataReadersCachee.Drop(filePath); + } + + private ParallelFileReader GetDataReader(string filePath) + { + if (File.Exists(filePath) == false) + throw new FileNotFoundException(filePath); + return _dataReadersCachee.Get(filePath); + } + public IViewAccessor GetDataAccessor(string filePath, long offset) + { + var reader = GetDataReader(filePath); + try + { + return reader.GetAccessor(offset); + } + catch (ObjectDisposedException) + { + _dataReadersCachee.Drop(filePath); + reader = _dataReadersCachee.Get(filePath); + } + return reader.GetAccessor(offset); + } + + public IViewAccessor GetDataAccessor(string filePath, long offset, int length) + { + var reader = GetDataReader(filePath); + try + { + return reader.GetAccessor(offset, length); + } + catch (ObjectDisposedException) + { + _dataReadersCachee.Drop(filePath); + reader = _dataReadersCachee.Get(filePath); + } + return reader.GetAccessor(offset, length); + } + public void DropAllDataReaders() + { + _dataReadersCachee.DropAll(); + } + #endregion + + #region Indexes + public void DropIndexReader(string filePath) + { + _indexReadersCachee.Drop(filePath); + } + + private ParallelFileReader GetIndexReader(string filePath) + { + if (File.Exists(filePath) == false) + throw new FileNotFoundException(filePath); + return _indexReadersCachee.Get(filePath); + } + public IViewAccessor GetIndexAccessor(string filePath, long offset) + { + var reader = GetIndexReader(filePath); + try + { + return reader.GetAccessor(offset); + } + catch (ObjectDisposedException) + { + _indexReadersCachee.Drop(filePath); + reader = _indexReadersCachee.Get(filePath); + } + return reader.GetAccessor(offset); + } + + public IViewAccessor GetIndexAccessor(string filePath, long offset, int length) + { + var reader = GetIndexReader(filePath); + try + { + return reader.GetAccessor(offset, length); + } + catch (ObjectDisposedException) + { + _indexReadersCachee.Drop(filePath); + reader = _indexReadersCachee.Get(filePath); + } + return reader.GetAccessor(offset, length); + } + public void DropAllIndexReaders() + { + _indexReadersCachee.DropAll(); + } + #endregion + + public void Dispose() + { + _dataReadersCachee.Dispose(); + _indexReadersCachee.Dispose(); + } + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs index 2dbc840..1792b40 100644 --- a/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs +++ b/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs @@ -4,6 +4,6 @@ { Success, NotFound, - FileLocked + FileLockedOrUnavaliable } } diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs index ce7c1ba..2b7d2e8 100644 --- a/ZeroLevel/Services/PartitionStorage/Store.cs +++ b/ZeroLevel/Services/PartitionStorage/Store.cs @@ -10,10 +10,12 @@ using ZeroLevel.Services.PartitionStorage.Interfaces; namespace ZeroLevel.Services.PartitionStorage { public class Store : - IStore + IStore, IDisposable { private readonly StoreOptions _options; private readonly IStoreSerializer _serializer; + private readonly PhisicalFileAccessorCachee _fileAccessorCachee; + public Store(StoreOptions options, IStoreSerializer serializer = null) { @@ -31,6 +33,7 @@ namespace ZeroLevel.Services.PartitionStorage { Directory.CreateDirectory(_options.RootFolder); } + _fileAccessorCachee = new PhisicalFileAccessorCachee(options.PhisicalFileAccessorExpirationPeriod, TimeSpan.FromHours(2)); } public void RemovePartition(TMeta info) @@ -42,17 +45,17 @@ namespace ZeroLevel.Services.PartitionStorage public IStorePartitionAccessor CreateAccessor(TMeta info) { - return new StorePartitionAccessor(_options, info, _serializer); + return new StorePartitionAccessor(_options, info, _serializer, _fileAccessorCachee); } public IStorePartitionBuilder CreateBuilder(TMeta info) { - return new StorePartitionBuilder(_options, info, _serializer); + return new StorePartitionBuilder(_options, info, _serializer, _fileAccessorCachee); } public IStorePartitionMergeBuilder CreateMergeAccessor(TMeta info, Func> decompressor) { - return new StoreMergePartitionAccessor(_options, info, decompressor, _serializer); + return new StoreMergePartitionAccessor(_options, info, decompressor, _serializer, _fileAccessorCachee); } public async Task> Search(StoreSearchRequest searchRequest) @@ -81,5 +84,10 @@ namespace ZeroLevel.Services.PartitionStorage result.Results = results; return result; } + + public void Dispose() + { + _fileAccessorCachee.Dispose(); + } } } diff --git a/ZeroLevel/Services/Serialization/IBinaryReader.cs b/ZeroLevel/Services/Serialization/IBinaryReader.cs index 53eddb8..a5bd294 100644 --- a/ZeroLevel/Services/Serialization/IBinaryReader.cs +++ b/ZeroLevel/Services/Serialization/IBinaryReader.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.IO; using System.Net; namespace ZeroLevel.Services.Serialization @@ -130,9 +129,9 @@ namespace ZeroLevel.Services.Serialization Dictionary ReadDictionary(); ConcurrentDictionary ReadDictionaryAsConcurrent(); - + #endregion Extensions - Stream Stream { get; } + void SetPosition(long position); } } \ No newline at end of file diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs index 6cb2863..8735831 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs @@ -5,6 +5,7 @@ using System.IO; using System.Net; using System.Text; using ZeroLevel.Services.Extensions; +using ZeroLevel.Services.Memory; namespace ZeroLevel.Services.Serialization { @@ -14,7 +15,7 @@ namespace ZeroLevel.Services.Serialization public sealed class MemoryStreamReader : IBinaryReader { - private readonly Stream _stream; + private readonly IViewAccessor _accessor; private bool _reverseByteOrder = false; public void ReverseByteOrder(bool use_reverse_byte_order) @@ -25,43 +26,45 @@ namespace ZeroLevel.Services.Serialization /// /// End of stream /// - public bool EOS => _stream.Position >= _stream.Length; + public bool EOS => _accessor.EOV; public MemoryStreamReader(byte[] data) { if (data == null) throw new ArgumentNullException(nameof(data)); - _stream = new MemoryStream(data); + _accessor = new StreamVewAccessor(new MemoryStream(data)); } public MemoryStreamReader(Stream stream) { if (stream == null) throw new ArgumentNullException(nameof(stream)); - _stream = stream; + _accessor = new StreamVewAccessor(stream); } public MemoryStreamReader(MemoryStreamReader reader) { if (reader == null) throw new ArgumentNullException(nameof(reader)); - _stream = reader._stream; + _accessor = reader._accessor; } - public void Seek(long offset, SeekOrigin origin) + public MemoryStreamReader(IViewAccessor accessor) { - _stream.Seek(offset, origin); + if (accessor == null) + throw new ArgumentNullException(nameof(accessor)); + _accessor = accessor; } - public long Position => _stream.Position; + public long Position => _accessor.Position; + + public void SetPosition(long position) => _accessor.Seek(position); /// /// Flag reading /// public bool ReadBoolean() { - if (CheckOutOfRange(1)) - throw new OutOfMemoryException("Array index out of bounds"); return BitConverter.ToBoolean(new byte[1] { ReadByte() }, 0); } @@ -70,15 +73,12 @@ namespace ZeroLevel.Services.Serialization /// public byte ReadByte() { - if (CheckOutOfRange(1)) - throw new OutOfMemoryException("Array index out of bounds"); - return (byte)_stream.ReadByte(); + var buffer = ReadBuffer(1); + return buffer[0]; } public char ReadChar() { - if (CheckOutOfRange(2)) - throw new OutOfMemoryException("Array index out of bounds"); var buffer = ReadBuffer(2); return BitConverter.ToChar(buffer, 0); } @@ -187,13 +187,9 @@ namespace ZeroLevel.Services.Serialization /// public byte[] ReadBuffer(int count) { - if (count == 0) return null; if (CheckOutOfRange(count)) throw new OutOfMemoryException("Array index out of bounds"); - var buffer = new byte[count]; - var readedCount = _stream.Read(buffer, 0, count); - if (count != readedCount) - throw new InvalidOperationException($"The stream returned less data ({count} bytes) than expected ({readedCount} bytes)"); + var buffer = _accessor.ReadBuffer(count); if (_reverseByteOrder && count > 1) { byte b; @@ -262,7 +258,7 @@ namespace ZeroLevel.Services.Serialization /// public bool CheckOutOfRange(int offset) { - return offset < 0 || (_stream.Position + offset) > _stream.Length; + return _accessor.CheckOutOfRange(offset); } #region Extensions @@ -1137,10 +1133,7 @@ namespace ZeroLevel.Services.Serialization public void Dispose() { - _stream.Dispose(); + _accessor.Dispose(); } - - - public Stream Stream => _stream; } } \ No newline at end of file diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index 4a4d4c5..712cfbe 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -6,16 +6,16 @@ ogoun ogoun - 3.3.8.7 - Fix build configuration from commandf line args. + 3.3.8.8 + MMF for partition storage https://github.com/ogoun/Zero/wiki - Copyright Ogoun 2022 + Copyright Ogoun 2023 https://github.com/ogoun/Zero git - 3.3.8.7 - 3.3.8.7 + 3.3.8.8 + 3.3.8.8 AnyCPU;x64;x86 zero.png full