From fa86b82d5673947cc26f9a368a201342d09a9a30 Mon Sep 17 00:00:00 2001 From: Ogoun Date: Tue, 22 Nov 2022 05:07:48 +0300 Subject: [PATCH] Partition storage refactoring New index type --- .../PartitionFileStorageTest.csproj | 2 +- PartitionFileStorageTest/Program.cs | 76 ++------ .../PartitionStorage/FilePositionRange.cs | 2 +- .../PartitionStorage/Indexes/IndexBuilder.cs | 164 ++++++++++++++++++ .../Indexes/StorePartitionSparseIndex.cs | 4 +- .../Interfaces/IStorePartitionAccessor.cs | 35 +++- .../Interfaces/IStorePartitionBuilder.cs | 6 +- .../Interfaces/IStoreSerializer.cs | 14 ++ .../PartitionStorage/Options/IndexOptions.cs | 10 +- .../PartitionStorage/Options/StoreOptions.cs | 8 +- .../Partition/BasePartition.cs | 139 +++++++++++++++ .../Partition/StoreCatalogPartition.cs | 9 + .../Partition/StoreFilePartition.cs | 13 +- .../Partition/StoreMergePartitionAccessor.cs | 24 +-- .../Partition/StorePartitionAccessor.cs | 155 +++-------------- .../Partition/StorePartitionBuilder.cs | 156 ++--------------- .../PartitionStorage/Search/SearchResult.cs | 9 + .../StorePartitionKeyValueSearchResult.cs | 7 - ZeroLevel/Services/PartitionStorage/Store.cs | 18 +- .../StoreStandartSerializer.cs | 35 ++++ 20 files changed, 516 insertions(+), 370 deletions(-) create mode 100644 ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs create mode 100644 ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs create mode 100644 ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs create mode 100644 ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs create mode 100644 ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs diff --git a/PartitionFileStorageTest/PartitionFileStorageTest.csproj b/PartitionFileStorageTest/PartitionFileStorageTest.csproj index 575110a..f8b057f 100644 --- a/PartitionFileStorageTest/PartitionFileStorageTest.csproj +++ b/PartitionFileStorageTest/PartitionFileStorageTest.csproj @@ -15,4 +15,4 @@ - + \ No newline at end of file diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index 7193a37..3c502e0 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -27,7 +27,11 @@ namespace PartitionFileStorageTest var r = new Random(Environment.TickCount); var options = new StoreOptions { - Index = new IndexOptions { Enabled = true, FileIndexCount = 64 }, + Index = new IndexOptions + { + Enabled = true, + StepType = IndexStepType.AbsoluteCount, + StepValue = 64 }, RootFolder = root, FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), MergeFunction = list => @@ -81,7 +85,12 @@ namespace PartitionFileStorageTest var r = new Random(Environment.TickCount); var options = new StoreOptions { - Index = new IndexOptions { Enabled = true, FileIndexCount = 64 }, + Index = new IndexOptions + { + Enabled = true, + StepType = IndexStepType.Step, + StepValue = 1 + }, RootFolder = root, FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), MergeFunction = list => @@ -99,7 +108,7 @@ namespace PartitionFileStorageTest var store = new Store(options); var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); - + /*Log.Info("Fill start"); for (int i = 0; i < 10000000; i++) { @@ -141,7 +150,7 @@ namespace PartitionFileStorageTest //Console.ReadKey(); FSUtils.CleanAndTestFolder(root);*/ - + var sw = new Stopwatch(); sw.Start(); @@ -179,7 +188,7 @@ namespace PartitionFileStorageTest sw.Stop(); Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms"); sw.Restart(); - storePart.CompleteAdding(); + storePart.CompleteAdding(); storePart.Compress(); sw.Stop(); Log.Info($"Compress: {sw.ElapsedMilliseconds}ms"); @@ -309,62 +318,5 @@ namespace PartitionFileStorageTest //TestRangeCompressionAndInversion(); Console.ReadKey(); } - - private static void TestRangeCompressionAndInversion() - { - var list = new List(); - list.Add(new FilePositionRange { Start = 5, End = 12 }); - list.Add(new FilePositionRange { Start = 16, End = 21 }); - RangeCompression(list); - foreach (var r in list) - { - Console.WriteLine($"{r.Start}: {r.End}"); - } - Console.WriteLine("Invert ranges"); - var inverted = RangeInversion(list, 21); - foreach (var r in inverted) - { - Console.WriteLine($"{r.Start}: {r.End}"); - } - } - - private static void RangeCompression(List ranges) - { - for (var i = 0; i < ranges.Count - 1; i++) - { - var current = ranges[i]; - var next = ranges[i + 1]; - if (current.End == next.Start) - { - current.End = next.End; - ranges.RemoveAt(i + 1); - i--; - } - } - } - - private static List RangeInversion(List ranges, long length) - { - if ((ranges?.Count ?? 0) == 0) return new List { new FilePositionRange { Start = 0, End = length } }; - var inverted = new List(); - var current = new FilePositionRange { Start = 0, End = ranges[0].Start }; - for (var i = 0; i < ranges.Count; i++) - { - current.End = ranges[i].Start; - if (current.Start != current.End) - { - inverted.Add(new FilePositionRange { Start = current.Start, End = current.End }); - } - current.Start = ranges[i].End; - } - if (current.End != length) - { - if (current.Start != length) - { - inverted.Add(new FilePositionRange { Start = current.Start, End = length }); - } - } - return inverted; - } } } \ No newline at end of file diff --git a/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs b/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs index 77e6493..03c3f6e 100644 --- a/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs +++ b/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs @@ -1,6 +1,6 @@ namespace ZeroLevel.Services.PartitionStorage { - public class FilePositionRange + internal sealed class FilePositionRange { public long Start; public long End; diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs new file mode 100644 index 0000000..6930764 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs @@ -0,0 +1,164 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Services.PartitionStorage +{ + /// + /// Responsible for building index files + /// + internal sealed class IndexBuilder + { + private const string INDEX_SUBFOLDER_NAME = "__indexes__"; + private readonly IndexStepType _indexType; + private readonly string _indexCatalog; + private readonly string _dataCatalog; + private readonly int _stepValue; + private readonly Func _keyDeserializer; + private readonly Func _valueDeserializer; + public IndexBuilder(IndexStepType indexType, int stepValue, string dataCatalog) + { + _dataCatalog = dataCatalog; + _indexCatalog = Path.Combine(dataCatalog, INDEX_SUBFOLDER_NAME); + _indexType = indexType; + _stepValue = stepValue; + _keyDeserializer = MessageSerializer.GetDeserializer(); + _valueDeserializer = MessageSerializer.GetDeserializer(); + } + /// + /// Rebuild indexes for all files + /// + internal void RebuildIndex() + { + FSUtils.CleanAndTestFolder(_indexCatalog); + var files = Directory.GetFiles(_dataCatalog); + if (files != null && files.Length > 0) + { + foreach (var file in files) + { + RebuildFileIndex(file); + } + } + } + /// + /// Rebuild index for the specified file + /// + internal void RebuildFileIndex(string file) + { + if (_indexType == IndexStepType.AbsoluteCount) + { + RebuildFileIndexWithAbsoluteCountIndexes(file); + } + else + { + RebuildFileIndexWithSteps(file); + } + } + /// + /// Delete the index for the specified file + /// + internal void DropFileIndex(string file) + { + var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); + if (File.Exists(index_file)) + { + File.Delete(index_file); + } + } + /// + /// Rebuild index with specified number of steps for specified file + /// + private void RebuildFileIndexWithAbsoluteCountIndexes(string file) + { + if (false == Directory.Exists(_indexCatalog)) + { + Directory.CreateDirectory(_indexCatalog); + } + var dict = new Dictionary(); + if (TryGetReadStream(file, out var reader)) + { + using (reader) + { + while (reader.EOS == false) + { + var pos = reader.Position; + var k = _keyDeserializer.Invoke(reader); + dict[k] = pos; + _valueDeserializer.Invoke(reader); + } + } + } + if (dict.Count > _stepValue) + { + var step = (int)Math.Round(((float)dict.Count / (float)_stepValue), MidpointRounding.ToZero); + var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); + var d_arr = dict.OrderBy(p => p.Key).ToArray(); + using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) + { + for (int i = 0; i < _stepValue; i++) + { + var pair = d_arr[i * step]; + writer.WriteCompatible(pair.Key); + writer.WriteLong(pair.Value); + } + } + } + } + /// + /// Rebuild index with specified step for keys + /// + private void RebuildFileIndexWithSteps(string file) + { + if (false == Directory.Exists(_indexCatalog)) + { + Directory.CreateDirectory(_indexCatalog); + } + if (TryGetReadStream(file, out var reader)) + { + using (reader) + { + var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); + using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) + { + var counter = _stepValue; + while (reader.EOS == false) + { + counter--; + var pos = reader.Position; + var k = _keyDeserializer.Invoke(reader); + _valueDeserializer.Invoke(reader); + if (counter == 0) + { + writer.WriteCompatible(k); + writer.WriteLong(pos); + counter = _stepValue; + } + } + } + } + } + } + /// + /// Attempting to open a file for reading + /// + private bool TryGetReadStream(string fileName, out MemoryStreamReader reader) + { + try + { + var filePath = Path.Combine(_dataCatalog, fileName); + var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); + reader = new MemoryStreamReader(stream); + return true; + } + catch (Exception ex) + { + Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]"); + } + reader = null; + return false; + } + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs index 55ee461..a52091a 100644 --- a/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs @@ -5,7 +5,7 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage { - internal class StorePartitionSparseIndex + internal sealed class StorePartitionSparseIndex : IStorePartitionIndex { private readonly Dictionary[]> _indexCachee @@ -101,7 +101,7 @@ namespace ZeroLevel.Services.PartitionStorage private KeyIndex[] GetFileIndex(TKey key) { - var indexName = _filePartition.PathExtractor.Invoke(key, _meta); + var indexName = _filePartition.FileNameExtractor.Invoke(key, _meta); if (_indexCachee.TryGetValue(indexName, out var index)) return index; var file = Path.Combine(_indexFolder, indexName); diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs index 3865a90..76cc7d1 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs @@ -12,23 +12,48 @@ namespace ZeroLevel.Services.PartitionStorage : IStorePartitionBase { /// - /// Rebuild indexes + /// Rebuilds indexes for data in a partition /// void RebuildIndex(); /// - /// Find in catalog partition by key + /// Search in a partition for a specified key /// StorePartitionKeyValueSearchResult Find(TKey key); /// - /// Find in catalog partition by keys + /// Search in a partition for a specified keys /// IEnumerable> Find(IEnumerable keys); + /// + /// Iterating over all recorded data + /// IEnumerable> Iterate(); + /// + /// Iterating over all recorded data of the file with the specified key + /// IEnumerable> IterateKeyBacket(TKey key); - + /// + /// Deleting the specified key and associated data + /// + /// Key + /// true - automatically rebuild the index of the file from which data was deleted (default = false) void RemoveKey(TKey key, bool autoReindex = false); + /// + /// Deleting the specified keys and associated data + /// + /// Keys + /// true - automatically rebuild the index of the file from which data was deleted (default = true) void RemoveKeys(IEnumerable keys, bool autoReindex = true); - void RemoveAllExceptKey(TKey key, bool autoReindex = false); + /// + /// Delete all keys with data except the specified key + /// + /// Key + /// true - automatically rebuild the index of the file from which data was deleted (default = true) + void RemoveAllExceptKey(TKey key, bool autoReindex = true); + /// + /// Delete all keys with data other than the specified ones + /// + /// Keys + /// true - automatically rebuild the index of the file from which data was deleted (default = true) void RemoveAllExceptKeys(IEnumerable keys, bool autoReindex = true); } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs index 57a9a8a..2310a23 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs @@ -9,7 +9,7 @@ namespace ZeroLevel.Services.PartitionStorage /// Type of one input value /// Type of records aggregate public interface IStorePartitionBuilder - : IStorePartitionBase + : IStorePartitionBase { IEnumerable> Iterate(); /// @@ -21,11 +21,11 @@ namespace ZeroLevel.Services.PartitionStorage /// void CompleteAdding(); /// - /// Perform the conversion of the records from (TKey; TInput) to (TKey; TValue). Called after CompleteAdding + /// Performs compression/grouping of recorded data in a partition /// void Compress(); /// - /// Rebuilds index files. Only for compressed data. + /// Rebuilds indexes for data in a partition /// void RebuildIndex(); } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs new file mode 100644 index 0000000..453092d --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs @@ -0,0 +1,14 @@ +using System; +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Services.PartitionStorage.Interfaces +{ + public interface IStoreSerializer + { + Action KeySerializer { get; } + Action InputSerializer { get; } + Func KeyDeserializer { get; } + Func InputDeserializer { get; } + Func ValueDeserializer { get; } + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs index e1830fc..300ba51 100644 --- a/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs +++ b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs @@ -1,8 +1,16 @@ namespace ZeroLevel.Services.PartitionStorage { + public enum IndexStepType + { + AbsoluteCount, + Step + } + public class IndexOptions { public bool Enabled { get; set; } - public int FileIndexCount { get; set; } = 64; + + public IndexStepType StepType { get; set; } = IndexStepType.AbsoluteCount; + public int StepValue { get; set; } = 64; } } diff --git a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs index 9311230..88afd78 100644 --- a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs +++ b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs @@ -44,12 +44,13 @@ namespace ZeroLevel.Services.PartitionStorage public IndexOptions Index { get; set; } = new IndexOptions { Enabled = false, - FileIndexCount = 64 + StepValue = 64, + StepType = IndexStepType.AbsoluteCount }; internal string GetFileName(TKey key, TMeta info) { - return FilePartition.PathExtractor(key, info); + return FilePartition.FileNameExtractor(key, info); } internal string GetCatalogPath(TMeta info) { @@ -74,7 +75,8 @@ namespace ZeroLevel.Services.PartitionStorage Index = new IndexOptions { Enabled = this.Index.Enabled, - FileIndexCount = this.Index.FileIndexCount + StepValue = 64, + StepType = IndexStepType.AbsoluteCount }, FilePartition = this.FilePartition, KeyComparer = this.KeyComparer, diff --git a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs new file mode 100644 index 0000000..ae91785 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs @@ -0,0 +1,139 @@ +using System.IO; +using System; +using ZeroLevel.Services.Serialization; +using System.Collections.Concurrent; +using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.PartitionStorage.Interfaces; + +namespace ZeroLevel.Services.PartitionStorage.Partition +{ + /// + /// General operations with a partition + /// + internal abstract class BasePartition + : IStorePartitionBase + { + public string Catalog { get { return _catalog; } } + + protected readonly TMeta _info; + protected readonly string _catalog; + protected IStoreSerializer Serializer { get; } + protected readonly StoreOptions _options; + + private readonly IndexBuilder _indexBuilder; + private readonly ConcurrentDictionary _writeStreams = new ConcurrentDictionary(); + + internal BasePartition(StoreOptions options, + TMeta info, + IStoreSerializer serializer) + { + _options = options; + _info = info; + _catalog = _options.GetCatalogPath(info); + if (Directory.Exists(_catalog) == false) + { + Directory.CreateDirectory(_catalog); + } + _indexBuilder = _options.Index.Enabled ? new IndexBuilder(_options.Index.StepType, _options.Index.StepValue, _catalog) : null; + Serializer = serializer; + } + + #region IStorePartitionBase + public int CountDataFiles() => Directory.Exists(_catalog) ? (Directory.GetFiles(_catalog)?.Length ?? 0) : 0; + public string GetCatalogPath() => _catalog; + public void DropData() => FSUtils.CleanAndTestFolder(_catalog); + public void Dispose() + { + CloseWriteStreams(); + } + #endregion + + /// + /// Rebuild indexes for all files + /// + protected void RebuildIndexes() + { + if (_options.Index.Enabled) + { + _indexBuilder.RebuildIndex(); + } + } + /// + /// Rebuild index for the specified file + /// + internal void RebuildFileIndex(string file) + { + if (_options.Index.Enabled) + { + _indexBuilder.RebuildFileIndex(file); + } + } + /// + /// Delete the index for the specified file + /// + internal void DropFileIndex(string file) + { + if (_options.Index.Enabled) + { + _indexBuilder.DropFileIndex(file); + } + } + /// + /// Close all streams for writing + /// + protected void CloseWriteStreams() + { + foreach (var s in _writeStreams) + { + try + { + s.Value.Stream.Flush(); + s.Value.Dispose(); + } + catch { } + } + _writeStreams.Clear(); + } + /// + /// Attempting to open a file for writing + /// + protected bool TryGetWriteStream(string fileName, out MemoryStreamWriter writer) + { + try + { + writer = _writeStreams.GetOrAdd(fileName, k => + { + var filePath = Path.Combine(_catalog, k); + var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); + return new MemoryStreamWriter(stream); + }); + return true; + } + catch (Exception ex) + { + Log.SystemError(ex, "[StorePartitionBuilder.TryGetWriteStream]"); + } + writer = null; + return false; + } + /// + /// Attempting to open a file for reading + /// + protected bool TryGetReadStream(string fileName, out MemoryStreamReader reader) + { + try + { + var filePath = Path.Combine(_catalog, fileName); + var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); + reader = new MemoryStreamReader(stream); + return true; + } + catch (Exception ex) + { + Log.SystemError(ex, "[StorePartitionBuilder.TryGetReadStream]"); + } + reader = null; + return false; + } + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs index 8db142a..e5c99a3 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs @@ -2,9 +2,18 @@ namespace ZeroLevel.Services.PartitionStorage { + /// + /// Partition, contains the method of forming the path + /// public class StoreCatalogPartition { + /// + /// Name of partition, just for info + /// public string Name { get; } + /// + /// Path generator + /// public Func PathExtractor { get; } public StoreCatalogPartition(string name, Func pathExtractor) diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs index 8cd8384..4eda3c4 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs @@ -2,15 +2,24 @@ namespace ZeroLevel.Services.PartitionStorage { + /// + /// File partition, contains the method of forming the path + /// public class StoreFilePartition { + /// + /// Name of partition, just for info + /// public string Name { get; } - public Func PathExtractor { get; } + /// + /// File name generator + /// + public Func FileNameExtractor { get; } public StoreFilePartition(string name, Func pathExtractor) { Name = name; - PathExtractor = pathExtractor; + FileNameExtractor = pathExtractor; } } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs index ae80c44..5ccdb03 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs @@ -3,17 +3,15 @@ using System.Collections.Generic; using System.IO; using System.Linq; using ZeroLevel.Services.PartitionStorage.Interfaces; +using ZeroLevel.Services.PartitionStorage.Partition; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage { /// - /// For writing new values in exist partition - /// - /// ORDER: Store -> CompleteAddingAndCompress -> RebuildIndex - /// + /// Performs merging of new data with existing data in the partition /// - public class StoreMergePartitionAccessor + internal sealed class StoreMergePartitionAccessor : IStorePartitionMergeBuilder { private readonly Func> _decompress; @@ -30,16 +28,19 @@ namespace ZeroLevel.Services.PartitionStorage /// Write catalog /// private readonly IStorePartitionBuilder _temporaryAccessor; + public StoreMergePartitionAccessor(StoreOptions options, - TMeta info, Func> decompress) + TMeta info, + Func> decompress, + IStoreSerializer serializer) { if (decompress == null) throw new ArgumentNullException(nameof(decompress)); _decompress = decompress; - _accessor = new StorePartitionAccessor(options, info); + _accessor = new StorePartitionAccessor(options, info, serializer); _temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString()); var tempOptions = options.Clone(); tempOptions.RootFolder = _temporaryFolder; - _temporaryAccessor = new StorePartitionBuilder(tempOptions, info); + _temporaryAccessor = new StorePartitionBuilder(tempOptions, info, serializer); _keyDeserializer = MessageSerializer.GetDeserializer(); _valueDeserializer = MessageSerializer.GetDeserializer(); @@ -54,6 +55,10 @@ namespace ZeroLevel.Services.PartitionStorage public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value); public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(), _temporaryAccessor.CountDataFiles()); + + /// + /// Performs compression/grouping of recorded data in a partition + /// public void Compress() { var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath()); @@ -102,8 +107,7 @@ namespace ZeroLevel.Services.PartitionStorage File.Move(file, Path.Combine(folder, name), true); // 3. Rebuil index - (_accessor as StorePartitionAccessor) - .RebuildFileIndex(file); + (_accessor as BasePartition).RebuildFileIndex(name); } } // remove temporary files diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs index 06b2d27..863cf1a 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs @@ -2,41 +2,23 @@ using System.Collections.Generic; using System.IO; using System.Linq; -using ZeroLevel.Services.FileSystem; -using ZeroLevel.Services.Serialization; +using ZeroLevel.Services.PartitionStorage.Interfaces; +using ZeroLevel.Services.PartitionStorage.Partition; namespace ZeroLevel.Services.PartitionStorage { - public class StorePartitionAccessor - : IStorePartitionAccessor + internal sealed class StorePartitionAccessor + : BasePartition, IStorePartitionAccessor { - private readonly StoreOptions _options; - private readonly string _catalog; - private readonly string _indexCatalog; - private readonly TMeta _info; - - private readonly Func _keyDeserializer; - private readonly Func _valueDeserializer; - - public string Catalog { get { return _catalog; } } - public StorePartitionAccessor(StoreOptions options, TMeta info) + public StorePartitionAccessor(StoreOptions options, + TMeta info, + IStoreSerializer serializer) + : base(options, info, serializer) { if (options == null) throw new ArgumentNullException(nameof(options)); - _info = info; - _options = options; - _catalog = _options.GetCatalogPath(info); - if (_options.Index.Enabled) - { - _indexCatalog = Path.Combine(_catalog, "__indexes__"); - } - _keyDeserializer = MessageSerializer.GetDeserializer(); - _valueDeserializer = MessageSerializer.GetDeserializer(); } - #region API methods - public int CountDataFiles() => Directory.Exists(_catalog) ? (Directory.GetFiles(_catalog)?.Length ?? 0) : 0; - public string GetCatalogPath() => _catalog; - public void DropData() => FSUtils.CleanAndTestFolder(_catalog); + #region IStorePartitionAccessor public StorePartitionKeyValueSearchResult Find(TKey key) { var fileName = _options.GetFileName(key, _info); @@ -59,8 +41,8 @@ namespace ZeroLevel.Services.PartitionStorage } while (reader.EOS == false) { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); + var k = Serializer.KeyDeserializer.Invoke(reader); + var v = Serializer.ValueDeserializer.Invoke(reader); var c = _options.KeyComparer(key, k); if (c == 0) return new StorePartitionKeyValueSearchResult { @@ -119,8 +101,8 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); + var k = Serializer.KeyDeserializer.Invoke(reader); + var v = Serializer.ValueDeserializer.Invoke(reader); yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Status = SearchResult.Success }; } } @@ -139,30 +121,16 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); + var k = Serializer.KeyDeserializer.Invoke(reader); + var v = Serializer.ValueDeserializer.Invoke(reader); yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Status = SearchResult.Success }; } } } } } - public void RebuildIndex() - { - if (_options.Index.Enabled) - { - FSUtils.CleanAndTestFolder(_indexCatalog); - var files = Directory.GetFiles(_catalog); - if (files != null && files.Length > 0) - { - foreach (var file in files) - { - RebuildFileIndex(file); - } - } - } - } - public void RemoveAllExceptKey(TKey key, bool autoReindex = false) + public void RebuildIndex() => RebuildIndexes(); + public void RemoveAllExceptKey(TKey key, bool autoReindex = true) { RemoveAllExceptKeys(new[] { key }, autoReindex); } @@ -194,59 +162,6 @@ namespace ZeroLevel.Services.PartitionStorage } #endregion - #region Internal methods - internal void DropFileIndex(string file) - { - if (_options.Index.Enabled) - { - var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); - if (File.Exists(index_file)) - { - File.Delete(index_file); - } - } - } - internal void RebuildFileIndex(string file) - { - if (_options.Index.Enabled) - { - if (false == Directory.Exists(_indexCatalog)) - { - Directory.CreateDirectory(_indexCatalog); - } - var dict = new Dictionary(); - if (TryGetReadStream(file, out var reader)) - { - using (reader) - { - while (reader.EOS == false) - { - var pos = reader.Position; - var k = _keyDeserializer.Invoke(reader); - dict[k] = pos; - _valueDeserializer.Invoke(reader); - } - } - } - if (dict.Count > _options.Index.FileIndexCount * 8) - { - var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero); - var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); - var d_arr = dict.OrderBy(p => p.Key).ToArray(); - using (var writer = new MemoryStreamWriter( - new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) - { - for (int i = 0; i < _options.Index.FileIndexCount; i++) - { - var pair = d_arr[i * step]; - writer.WriteCompatible(pair.Key); - writer.WriteLong(pair.Value); - } - } - } - } - } - #endregion #region Private methods private IEnumerable> Find(string fileName, @@ -269,8 +184,8 @@ namespace ZeroLevel.Services.PartitionStorage reader.Seek(off.Offset, SeekOrigin.Begin); while (reader.EOS == false) { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); + var k = Serializer.KeyDeserializer.Invoke(reader); + var v = Serializer.ValueDeserializer.Invoke(reader); var c = _options.KeyComparer(searchKey, k); if (c == 0) { @@ -301,8 +216,8 @@ namespace ZeroLevel.Services.PartitionStorage var keys_arr = keys.OrderBy(k => k).ToArray(); while (reader.EOS == false && index < keys_arr.Length) { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); + var k = Serializer.KeyDeserializer.Invoke(reader); + var v = Serializer.ValueDeserializer.Invoke(reader); var c = _options.KeyComparer(keys_arr[index], k); if (c == 0) { @@ -355,8 +270,8 @@ namespace ZeroLevel.Services.PartitionStorage while (reader.EOS == false) { var startPosition = reader.Position; - var k = _keyDeserializer.Invoke(reader); - _valueDeserializer.Invoke(reader); + var k = Serializer.KeyDeserializer.Invoke(reader); + Serializer.ValueDeserializer.Invoke(reader); var endPosition = reader.Position; var c = _options.KeyComparer(searchKey, k); if (c == 0) @@ -383,8 +298,8 @@ namespace ZeroLevel.Services.PartitionStorage while (reader.EOS == false && index < keys_arr.Length) { var startPosition = reader.Position; - var k = _keyDeserializer.Invoke(reader); - _valueDeserializer.Invoke(reader); + var k = Serializer.KeyDeserializer.Invoke(reader); + Serializer.ValueDeserializer.Invoke(reader); var endPosition = reader.Position; var c = _options.KeyComparer(keys_arr[index], k); if (c == 0) @@ -447,22 +362,6 @@ namespace ZeroLevel.Services.PartitionStorage } } - private bool TryGetReadStream(string fileName, out MemoryStreamReader reader) - { - try - { - var filePath = Path.Combine(_catalog, fileName); - var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); - reader = new MemoryStreamReader(stream); - return true; - } - catch (Exception ex) - { - Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]"); - } - reader = null; - return false; - } #endregion #region Static @@ -514,9 +413,5 @@ namespace ZeroLevel.Services.PartitionStorage target.Write(buffer, 0, buffer.Length); } #endregion - - public void Dispose() - { - } } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index 16c1876..2b477b8 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -1,77 +1,40 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; -using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.PartitionStorage.Interfaces; +using ZeroLevel.Services.PartitionStorage.Partition; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage { - public class StorePartitionBuilder - : IStorePartitionBuilder + internal sealed class StorePartitionBuilder + : BasePartition, IStorePartitionBuilder { - private readonly ConcurrentDictionary _writeStreams - = new ConcurrentDictionary(); - - private readonly StoreOptions _options; - private readonly string _catalog; - private readonly TMeta _info; - private readonly Action _keySerializer; - private readonly Action _inputSerializer; - - private readonly Func _keyDeserializer; - private readonly Func _inputDeserializer; - private readonly Func _valueDeserializer; - public string Catalog { get { return _catalog; } } - public StorePartitionBuilder(StoreOptions options, TMeta info) + public StorePartitionBuilder(StoreOptions options, + TMeta info, + IStoreSerializer serializer) + : base(options, info, serializer) { if (options == null) throw new ArgumentNullException(nameof(options)); - _info = info; - _options = options; - _catalog = _options.GetCatalogPath(info); - if (Directory.Exists(_catalog) == false) - { - Directory.CreateDirectory(_catalog); - } - - _keySerializer = MessageSerializer.GetSerializer(); - _inputSerializer = MessageSerializer.GetSerializer(); - - _keyDeserializer = MessageSerializer.GetDeserializer(); - _inputDeserializer = MessageSerializer.GetDeserializer(); - _valueDeserializer = MessageSerializer.GetDeserializer(); } - #region API methods - public int CountDataFiles() => Directory.Exists(_catalog) ? (Directory.GetFiles(_catalog)?.Length ?? 0) : 0; - public string GetCatalogPath() => _catalog; - public void DropData() => FSUtils.CleanAndTestFolder(_catalog); + #region IStorePartitionBuilder public void Store(TKey key, TInput value) { var fileName = _options.GetFileName(key, _info); if (TryGetWriteStream(fileName, out var stream)) { - _keySerializer.Invoke(stream, key); + Serializer.KeySerializer.Invoke(stream, key); Thread.MemoryBarrier(); - _inputSerializer.Invoke(stream, value); + Serializer.InputSerializer.Invoke(stream, value); } } public void CompleteAdding() { - // Close all write streams - foreach (var s in _writeStreams) - { - try - { - s.Value.Stream.Flush(); - s.Value.Dispose(); - } - catch { } - } - _writeStreams.Clear(); + CloseWriteStreams(); } public void Compress() { @@ -94,8 +57,8 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - var key = _keyDeserializer.Invoke(reader); - var val = _inputDeserializer.Invoke(reader); + var key = Serializer.KeyDeserializer.Invoke(reader); + var val = Serializer.InputDeserializer.Invoke(reader); yield return new StorePartitionKeyValueSearchResult { Key = key, Value = val, Status = SearchResult.Success }; } } @@ -103,53 +66,7 @@ namespace ZeroLevel.Services.PartitionStorage } } } - public void RebuildIndex() - { - if (_options.Index.Enabled) - { - var indexFolder = Path.Combine(_catalog, "__indexes__"); - FSUtils.CleanAndTestFolder(indexFolder); - var files = Directory.GetFiles(_catalog); - if (files != null && files.Length > 0) - { - var dict = new Dictionary(); - foreach (var file in files) - { - if (TryGetReadStream(file, out var reader)) - { - using (reader) - { - while (reader.EOS == false) - { - var pos = reader.Stream.Position; - var key = _keyDeserializer.Invoke(reader); - dict[key] = pos; - if (reader.EOS) break; - _valueDeserializer.Invoke(reader); - } - } - if (dict.Count > _options.Index.FileIndexCount * 8) - { - var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero); - var index_file = Path.Combine(indexFolder, Path.GetFileName(file)); - var d_arr = dict.OrderBy(p => p.Key).ToArray(); - using (var writer = new MemoryStreamWriter( - new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) - { - for (int i = 0; i < _options.Index.FileIndexCount; i++) - { - var pair = d_arr[i * step]; - writer.WriteCompatible(pair.Key); - writer.WriteLong(pair.Value); - } - } - } - } - dict.Clear(); - } - } - } - } + public void RebuildIndex() => RebuildIndexes(); #endregion #region Private methods @@ -160,7 +77,7 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - var key = _keyDeserializer.Invoke(reader); + var key = Serializer.KeyDeserializer.Invoke(reader); if (false == dict.ContainsKey(key)) { dict[key] = new HashSet(); @@ -169,7 +86,7 @@ namespace ZeroLevel.Services.PartitionStorage { break; } - var input = _inputDeserializer.Invoke(reader); + var input = Serializer.InputDeserializer.Invoke(reader); dict[key].Add(input); } } @@ -188,45 +105,6 @@ namespace ZeroLevel.Services.PartitionStorage File.Delete(file); File.Move(tempFile, file, true); } - private bool TryGetWriteStream(string fileName, out MemoryStreamWriter writer) - { - try - { - writer = _writeStreams.GetOrAdd(fileName, k => - { - var filePath = Path.Combine(_catalog, k); - var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); - return new MemoryStreamWriter(stream); - }); - return true; - } - catch (Exception ex) - { - Log.SystemError(ex, "[StorePartitionBuilder.TryGetWriteStream]"); - } - writer = null; - return false; - } - private bool TryGetReadStream(string fileName, out MemoryStreamReader reader) - { - try - { - var filePath = Path.Combine(_catalog, fileName); - var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); - reader = new MemoryStreamReader(stream); - return true; - } - catch (Exception ex) - { - Log.SystemError(ex, "[StorePartitionBuilder.TryGetReadStream]"); - } - reader = null; - return false; - } #endregion - - public void Dispose() - { - } } } diff --git a/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs new file mode 100644 index 0000000..2dbc840 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs @@ -0,0 +1,9 @@ +namespace ZeroLevel.Services.PartitionStorage +{ + public enum SearchResult + { + Success, + NotFound, + FileLocked + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs index 5af1dc0..61ba3de 100644 --- a/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs +++ b/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs @@ -1,12 +1,5 @@ namespace ZeroLevel.Services.PartitionStorage { - public enum SearchResult - { - Success, - NotFound, - FileLocked - } - public class StorePartitionKeyValueSearchResult { public SearchResult Status { get; set; } diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs index 2246da3..ce7c1ba 100644 --- a/ZeroLevel/Services/PartitionStorage/Store.cs +++ b/ZeroLevel/Services/PartitionStorage/Store.cs @@ -13,10 +13,20 @@ namespace ZeroLevel.Services.PartitionStorage IStore { private readonly StoreOptions _options; - public Store(StoreOptions options) + private readonly IStoreSerializer _serializer; + public Store(StoreOptions options, + IStoreSerializer serializer = null) { if (options == null) throw new ArgumentNullException(nameof(options)); _options = options; + if (serializer == null) + { + _serializer = new StoreStandartSerializer(); + } + else + { + _serializer = serializer; + } if (Directory.Exists(_options.RootFolder) == false) { Directory.CreateDirectory(_options.RootFolder); @@ -32,17 +42,17 @@ namespace ZeroLevel.Services.PartitionStorage public IStorePartitionAccessor CreateAccessor(TMeta info) { - return new StorePartitionAccessor(_options, info); + return new StorePartitionAccessor(_options, info, _serializer); } public IStorePartitionBuilder CreateBuilder(TMeta info) { - return new StorePartitionBuilder(_options, info); + return new StorePartitionBuilder(_options, info, _serializer); } public IStorePartitionMergeBuilder CreateMergeAccessor(TMeta info, Func> decompressor) { - return new StoreMergePartitionAccessor(_options, info, decompressor); + return new StoreMergePartitionAccessor(_options, info, decompressor, _serializer); } public async Task> Search(StoreSearchRequest searchRequest) diff --git a/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs b/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs new file mode 100644 index 0000000..7f72869 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs @@ -0,0 +1,35 @@ +using System; +using ZeroLevel.Services.PartitionStorage.Interfaces; +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Services.PartitionStorage +{ + internal sealed class StoreStandartSerializer + : IStoreSerializer + { + private readonly Action _keySerializer; + private readonly Action _inputSerializer; + private readonly Func _keyDeserializer; + private readonly Func _inputDeserializer; + private readonly Func _valueDeserializer; + + public StoreStandartSerializer() + { + _keySerializer = MessageSerializer.GetSerializer(); + _inputSerializer = MessageSerializer.GetSerializer(); + _keyDeserializer = MessageSerializer.GetDeserializer(); + _inputDeserializer = MessageSerializer.GetDeserializer(); + _valueDeserializer = MessageSerializer.GetDeserializer(); + } + + public Action KeySerializer => _keySerializer; + + public Action InputSerializer => _inputSerializer; + + public Func KeyDeserializer => _keyDeserializer; + + public Func InputDeserializer => _inputDeserializer; + + public Func ValueDeserializer => _valueDeserializer; + } +}