From 2bf1605bee0335a1dfcd95a35a54c7dc8263e4b4 Mon Sep 17 00:00:00 2001
From: Ogoun
Date: Wed, 16 Nov 2022 05:31:35 +0300
Subject: [PATCH] PartitionStorage insert after compress

NOT TESTED!!!
---
 PartitionFileStorageTest/Program.cs           |   4 +-
 ZeroLevel/Services/Network/BaseSocket.cs      |   1 +
 ZeroLevel/Services/PartitionStorage/IStore.cs |   6 +-
 .../PartitionStorage/IStoreOptions.cs         |  22 +++
 .../IStorePartitionAccessor.cs                |  24 ++--
 ZeroLevel/Services/PartitionStorage/Store.cs  |   6 +
 .../StoreMergePartitionAccessor.cs            | 127 ++++++++++++++++++
 .../StorePartitionAccessor.cs                 |  76 +++++------
 8 files changed, 214 insertions(+), 52 deletions(-)
 create mode 100644 ZeroLevel/Services/PartitionStorage/StoreMergePartitionAccessor.cs

diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs
index 5f6e2a3..83ff367 100644
--- a/PartitionFileStorageTest/Program.cs
+++ b/PartitionFileStorageTest/Program.cs
@@ -154,8 +154,8 @@ namespace PartitionFileStorageTest
             sw.Stop();
             Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms");
             sw.Restart();
-            storeIncoming.CompleteStoreAndRebuild();
-            storeOutcoming.CompleteStoreAndRebuild();
+            storeIncoming.CompleteAddingAndCompress();
+            storeOutcoming.CompleteAddingAndCompress();
             sw.Stop();
             Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms");
             sw.Restart();
diff --git a/ZeroLevel/Services/Network/BaseSocket.cs b/ZeroLevel/Services/Network/BaseSocket.cs
index f690363..b46d790 100644
--- a/ZeroLevel/Services/Network/BaseSocket.cs
+++ b/ZeroLevel/Services/Network/BaseSocket.cs
@@ -44,6 +44,7 @@ namespace ZeroLevel.Network
         /// <summary>
         /// Maximum size of data packet to transmit (serialized frame size)
         /// </summary>
         private const int DEFAULT_MAX_FRAME_PAYLOAD_SIZE = 1024 * 1024 * 32;
+        public readonly static int MAX_FRAME_PAYLOAD_SIZE;
 
         /// <summary>
diff --git a/ZeroLevel/Services/PartitionStorage/IStore.cs b/ZeroLevel/Services/PartitionStorage/IStore.cs
index ca95813..100a028 100644
--- a/ZeroLevel/Services/PartitionStorage/IStore.cs
+++ b/ZeroLevel/Services/PartitionStorage/IStore.cs
@@ -1,4 +1,6 @@
-using System.Threading.Tasks;
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
 
 namespace ZeroLevel.Services.PartitionStorage
 {
@@ -13,6 +15,8 @@ namespace ZeroLevel.Services.PartitionStorage
     {
         IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info);
 
+        IStorePartitionAccessor<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor);
+
         Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest);
     }
 }
diff --git a/ZeroLevel/Services/PartitionStorage/IStoreOptions.cs b/ZeroLevel/Services/PartitionStorage/IStoreOptions.cs
index d3ea172..189a636 100644
--- a/ZeroLevel/Services/PartitionStorage/IStoreOptions.cs
+++ b/ZeroLevel/Services/PartitionStorage/IStoreOptions.cs
@@ -1,6 +1,7 @@
 using System;
 using System.Collections.Generic;
 using System.IO;
+using System.Linq;
 using ZeroLevel.Services.FileSystem;
 
 namespace ZeroLevel.Services.PartitionStorage
@@ -67,5 +68,26 @@ namespace ZeroLevel.Services.PartitionStorage
             }
             return path;
         }
+
+        public IStoreOptions<TKey, TInput, TValue, TMeta> Clone()
+        {
+            var options = new IStoreOptions<TKey, TInput, TValue, TMeta>
+            {
+                Index = new IndexOptions
+                {
+                    Enabled = this.Index.Enabled,
+                    FileIndexCount = this.Index.FileIndexCount
+                },
+                FilePartition = this.FilePartition,
+                KeyComparer = this.KeyComparer,
+                MaxDegreeOfParallelism = this.MaxDegreeOfParallelism,
+                MergeFunction = this.MergeFunction,
+                Partitions = this.Partitions
+                    .Select(p => new StoreCatalogPartition(p.Name, p.PathExtractor))
+                    .ToList(),
+                RootFolder = this.RootFolder
+            };
+            return options;
+        }
     }
 }
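The Clone() method added to IStoreOptions above exists to support the new StoreMergePartitionAccessor later in this patch: the merge accessor copies the shared options and redirects only RootFolder to a temporary catalog, so the options object used by the live store is never mutated. A minimal sketch of that copy-then-redirect pattern; SharedOptions and the folder names here are illustrative stand-ins, not types from the patch:

    using System;
    using System.IO;

    // Hypothetical stand-in for IStoreOptions<TKey, TInput, TValue, TMeta>:
    // a mutable options bag shared between accessors.
    class SharedOptions
    {
        public string RootFolder { get; set; }

        // Copy every member, so redirecting RootFolder on the clone
        // cannot affect the original instance.
        public SharedOptions Clone() => new SharedOptions { RootFolder = this.RootFolder };
    }

    class CloneDemo
    {
        static void Main()
        {
            var options = new SharedOptions { RootFolder = "/data/store" };

            // The same move StoreMergePartitionAccessor's constructor makes:
            // clone, then point the clone at a temporary catalog.
            var tempOptions = options.Clone();
            tempOptions.RootFolder = Path.Combine(options.RootFolder, Guid.NewGuid().ToString());

            Console.WriteLine(options.RootFolder);     // unchanged: /data/store
            Console.WriteLine(tempOptions.RootFolder); // /data/store/<guid>
        }
    }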
diff --git a/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs
index 85b2646..7eb04ed 100644
--- a/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs
+++ b/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs
@@ -14,14 +14,15 @@ namespace ZeroLevel.Services.PartitionStorage
     {
         string GetCatalogPath();
         /// <summary>
-        /// Save one record
+        /// Has any files
         /// </summary>
-        void Store(TKey key, TInput value);
+        int CountDataFiles();
         /// <summary>
-        /// Complete the recording and perform the conversion of the records from
-        /// (TKey; TInput) to (TKey; TValue)
+        /// Remove all files
         /// </summary>
-        void CompleteStoreAndRebuild();
+        void DropData();
+
+        #region API !only after data compression!
         /// <summary>
         /// Rebuild indexes
         /// </summary>
@@ -36,13 +37,18 @@ namespace ZeroLevel.Services.PartitionStorage
         IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys);
         IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate();
         IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key);
+        #endregion
+
+        #region API !only before data compression!
         /// <summary>
-        /// Has any files
+        /// Save one record
         /// </summary>
-        int CountDataFiles();
+        void Store(TKey key, TInput value);
         /// <summary>
-        /// Remove all files
+        /// Complete the recording and perform the conversion of the records from
+        /// (TKey; TInput) to (TKey; TValue)
         /// </summary>
-        void DropData();
+        void CompleteAddingAndCompress();
+        #endregion
     }
 }
diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs
index 7c50be5..58aadbb 100644
--- a/ZeroLevel/Services/PartitionStorage/Store.cs
+++ b/ZeroLevel/Services/PartitionStorage/Store.cs
@@ -26,6 +26,12 @@ namespace ZeroLevel.Services.PartitionStorage
             return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info);
         }
 
+        public IStorePartitionAccessor<TKey, TInput, TValue> CreateMergeAccessor(TMeta info
+            , Func<TValue, IEnumerable<TInput>> decompressor)
+        {
+            return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor);
+        }
+
         public async Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest)
         {
             var result = new StoreSearchResult<TKey, TValue, TMeta>();
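CreateMergeAccessor above is the public entry point for the insert-after-compress scenario this commit adds. A hedged usage sketch, assuming a store keyed by ulong whose compressed byte[] values can be expanded back into inputs; the store and meta variables and the DeserializeUlongList helper are illustrative assumptions, while the call order follows the ORDER note in the new class below:

    // Sketch only: the delegate must invert the store's compression.
    using (var merge = store.CreateMergeAccessor(meta, v => DeserializeUlongList(v)))
    {
        merge.Store(key, record);          // 1. new records land in a temporary catalog
        merge.CompleteAddingAndCompress(); // 2. replay existing records, recompress, swap files
        merge.RebuildIndex();              // 3. rebuild indexes over the merged data
    }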
diff --git a/ZeroLevel/Services/PartitionStorage/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/StoreMergePartitionAccessor.cs
new file mode 100644
index 0000000..15dbb55
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/StoreMergePartitionAccessor.cs
@@ -0,0 +1,127 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using ZeroLevel.Services.Serialization;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+    /// <summary>
+    /// For writing new values into an existing partition
+    ///
+    /// ORDER: Store -> CompleteAddingAndCompress -> RebuildIndex
+    ///
+    /// </summary>
+    public class StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>
+        : IStorePartitionAccessor<TKey, TInput, TValue>
+    {
+        private readonly Func<TValue, IEnumerable<TInput>> _decompress;
+        /// <summary>
+        /// Existing compressed catalog
+        /// </summary>
+        private readonly IStorePartitionAccessor<TKey, TInput, TValue> _accessor;
+        /// <summary>
+        /// Write catalog
+        /// </summary>
+        private readonly IStorePartitionAccessor<TKey, TInput, TValue> _temporaryAccessor;
+        public StoreMergePartitionAccessor(IStoreOptions<TKey, TInput, TValue, TMeta> options,
+            TMeta info, Func<TValue, IEnumerable<TInput>> decompress)
+        {
+            if (decompress == null) throw new ArgumentNullException(nameof(decompress));
+            _decompress = decompress;
+            _accessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(options, info);
+            var tempCatalog = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString());
+            var tempOptions = options.Clone();
+            tempOptions.RootFolder = tempCatalog;
+            _temporaryAccessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(tempOptions, info);
+        }
+
+        private IEnumerable<StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>>
+            IterateReadKeyInputs(string filePath)
+        {
+            if (File.Exists(filePath))
+            {
+                var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read,
+                    FileShare.ReadWrite, 4096 * 1024);
+                using (var reader = new MemoryStreamReader(stream))
+                {
+                    while (reader.EOS == false)
+                    {
+                        var k = reader.ReadCompatible<TKey>();
+                        var v = reader.ReadCompatible<TValue>();
+                        var input = _decompress(v);
+                        yield return
+                            new StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>
+                            {
+                                Key = k,
+                                Value = input,
+                                Found = true
+                            };
+                    }
+                }
+            }
+        }
+        public void CompleteAddingAndCompress()
+        {
+            var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath());
+
+            if (newFiles != null && newFiles.Length > 1)
+            {
+                var folder = _accessor.GetCatalogPath();
+                var existsFiles = Directory.GetFiles(folder)
+                    ?.ToDictionary(f => Path.GetFileName(f), f => f);
+
+                foreach (var file in newFiles)
+                {
+                    var name = Path.GetFileName(file);
+                    // if a data file for this key already exists
+                    if (existsFiles.ContainsKey(name))
+                    {
+                        // append all records from the existing file to the new one
+                        foreach (var r in IterateReadKeyInputs(existsFiles[name]))
+                        {
+                            foreach (var i in r.Value)
+                            {
+                                _temporaryAccessor.Store(r.Key, i);
+                            }
+                        }
+                    }
+                    // compress the new file
+                    (_temporaryAccessor as StorePartitionAccessor<TKey, TInput, TValue, TMeta>)
+                        .CompressFile(file);
+
+                    // replace the old file with the new one
+                    File.Move(file, Path.Combine(folder, name), true);
+                }
+            }
+            // remove temporary files
+            _temporaryAccessor.DropData();
+            Directory.Delete(_temporaryAccessor.GetCatalogPath(), true);
+        }
+
+        public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
+            => _accessor.Find(key);
+
+        public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys)
+            => _accessor.Find(keys);
+        public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate()
+            => _accessor.Iterate();
+        public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key)
+            => _accessor.IterateKeyBacket(key);
+
+        /// <summary>
+        /// Deletes only new entries. Existing entries remain unchanged.
+        /// </summary>
+        public void DropData() => _temporaryAccessor.DropData();
+        public string GetCatalogPath() => _accessor.GetCatalogPath();
+        public void RebuildIndex() => _accessor.RebuildIndex();
+        public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value);
+        public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(),
+            _temporaryAccessor.CountDataFiles());
+
+        public void Dispose()
+        {
+            _accessor.Dispose();
+            _temporaryAccessor.Dispose();
+        }
+    }
+}
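The merge in CompleteAddingAndCompress above only works if the decompress delegate exactly inverts what CompressFile does: every stored TValue must expand back into the TInput records it was built from, so they can be replayed into the temporary catalog next to the newly added records before recompression. A self-contained sketch of such a pair, assuming TInput is ulong and TValue is a plain byte[] concatenation (the real store serializes with MemoryStreamWriter, so this encoding is illustrative only):

    using System;
    using System.Collections.Generic;
    using System.Linq;

    static class MergeContract
    {
        // Compression side (what CompressFile does per key, conceptually):
        // many TInput records become one TValue blob.
        static byte[] Compress(IEnumerable<ulong> inputs) =>
            inputs.SelectMany(v => BitConverter.GetBytes(v)).ToArray();

        // The delegate passed to CreateMergeAccessor must invert Compress,
        // recovering the original inputs so they can be re-stored and merged.
        static IEnumerable<ulong> Decompress(byte[] value)
        {
            for (int i = 0; i < value.Length; i += sizeof(ulong))
                yield return BitConverter.ToUInt64(value, i);
        }

        static void Main()
        {
            var blob = Compress(new ulong[] { 3, 1, 2 });
            Console.WriteLine(string.Join(",", Decompress(blob))); // 3,1,2
        }
    }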
diff --git a/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs
index 5679dea..aa8db87 100644
--- a/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs
+++ b/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs
@@ -30,11 +30,12 @@ namespace ZeroLevel.Services.PartitionStorage
                 Directory.CreateDirectory(_catalog);
             }
         }
-        #region API
-        public string GetCatalogPath()
-        {
-            return _catalog;
-        }
+
+        public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 0;
+        public string GetCatalogPath() => _catalog;
+        public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
+
+        #region API !only after data compression!
         public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
         {
             var fileName = _options.GetFileName(key, _info);
@@ -92,39 +93,6 @@ namespace ZeroLevel.Services.PartitionStorage
                 }
             }
         }
-        public void CompleteStoreAndRebuild()
-        {
-            // Close all write streams
-            foreach (var s in _writeStreams)
-            {
-                try
-                {
-                    s.Value.Dispose();
-                }
-                catch { }
-            }
-            var files = Directory.GetFiles(_catalog);
-            if (files != null && files.Length > 1)
-            {
-                Parallel.ForEach(files, file => CompressFile(file));
-            }
-        }
-        public void Store(TKey key, TInput value)
-        {
-            var fileName = _options.GetFileName(key, _info);
-            var stream = GetWriteStream(fileName);
-            stream.SerializeCompatible(key);
-            stream.SerializeCompatible(value);
-        }
-        public int CountDataFiles()
-        {
-            var files = Directory.GetFiles(_catalog);
-            return files?.Length ?? 0;
-        }
-        public void DropData()
-        {
-            FSUtils.CleanAndTestFolder(_catalog);
-        }
         public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate()
         {
             var files = Directory.GetFiles(_catalog);
@@ -205,6 +173,34 @@ namespace ZeroLevel.Services.PartitionStorage
             }
         }
         #endregion
+
+        #region API !only before data compression!
+        public void Store(TKey key, TInput value)
+        {
+            var fileName = _options.GetFileName(key, _info);
+            var stream = GetWriteStream(fileName);
+            stream.SerializeCompatible(key);
+            stream.SerializeCompatible(value);
+        }
+        public void CompleteAddingAndCompress()
+        {
+            // Close all write streams
+            foreach (var s in _writeStreams)
+            {
+                try
+                {
+                    s.Value.Dispose();
+                }
+                catch { }
+            }
+            var files = Directory.GetFiles(_catalog);
+            if (files != null && files.Length > 1)
+            {
+                Parallel.ForEach(files, file => CompressFile(file));
+            }
+        }
+        #endregion
+
         #region Private methods
         private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName, TKey[] keys)
@@ -282,7 +278,7 @@ namespace ZeroLevel.Services.PartitionStorage
                 }
             }
         }
-        private void CompressFile(string file)
+        internal void CompressFile(string file)
        {
            var dict = new Dictionary<TKey, HashSet<TInput>>();
            using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
@@ -312,7 +308,7 @@ namespace ZeroLevel.Services.PartitionStorage
             }
             File.Delete(file);
             File.Move(tempFile, file, true);
-        }
+        }
         private MemoryStreamWriter GetWriteStream(string fileName)
         {
             return _writeStreams.GetOrAdd(fileName, k =>
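Taken together, the regions introduced above split the accessor lifecycle into two phases: the "only before data compression" region accepts writes, and the "only after data compression" region serves reads and index maintenance. A hedged sketch of the full cycle (the store, meta, key and input variables are assumed context, not shown in the patch):

    // Sketch only: write phase first, read phase after compression.
    using (var accessor = store.CreateAccessor(meta))
    {
        accessor.Store(key, input);           // before compression: append to journal files
        accessor.CompleteAddingAndCompress(); // convert (TKey, TInput) records to (TKey, TValue)
        accessor.RebuildIndex();              // after compression: indexes become valid
        var result = accessor.Find(key);      // reads are safe from here on
    }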