From 8a897cf29c4486c78fb0594b29680ca85c372dd2 Mon Sep 17 00:00:00 2001 From: Ogoun Date: Sat, 19 Nov 2022 05:41:17 +0300 Subject: [PATCH] PartitionStorage refactoring --- PartitionFileStorageTest/Program.cs | 313 ++++++++---------- .../Indexes/StorePartitionSparseIndex.cs | 4 +- .../PartitionStorage/Interfaces/IStore.cs | 30 +- .../Interfaces/IStoreCache.cs | 12 - .../Interfaces/IStorePartitionAccessor.cs | 8 +- .../Interfaces/IStorePartitionBuilder.cs | 21 +- .../Interfaces/IStorePartitionIndex.cs | 6 + .../Interfaces/IStorePartitionMergeBuilder.cs | 21 ++ .../Partition/StoreMergePartitionAccessor.cs | 90 +++-- .../Partition/StorePartitionAccessor.cs | 205 ++++++------ .../Partition/StorePartitionBuilder.cs | 97 ++++-- ZeroLevel/Services/PartitionStorage/Store.cs | 3 +- .../Serialization/MemoryStreamReader.cs | 7 + .../Serialization/MessageSerializer.cs | 25 ++ 14 files changed, 466 insertions(+), 376 deletions(-) delete mode 100644 ZeroLevel/Services/PartitionStorage/Interfaces/IStoreCache.cs create mode 100644 ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionMergeBuilder.cs diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index dcf7337..31b51d8 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -1,5 +1,7 @@ using System.Diagnostics; using System.Text; +using ZeroLevel; +using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.PartitionStorage; namespace PartitionFileStorageTest @@ -20,68 +22,7 @@ namespace PartitionFileStorageTest return ulong.Parse(num.ToString()); } - private static void BuildStore(string root) - { - var options = new StoreOptions - { - Index = new IndexOptions { Enabled = true, FileIndexCount = 64 }, - RootFolder = root, - FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), - MergeFunction = list => - { - ulong s = 0; - return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); - }, - Partitions = new List> - { - new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) - }, - KeyComparer = (left, right) => left == right ? 0 : (left < right) ? 
-1 : 1, - }; - var store = new Store(options); - var storePart1 = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); - var storePart2 = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 09) }); - - var sw = new Stopwatch(); - sw.Start(); - - var r = new Random(Environment.TickCount); - for (int i = 0; i < 1000000; i++) - { - var s = Generate(r); - var count = r.Next(300); - for (int j = 0; j < count; j++) - { - var t = Generate(r); - storePart1.Store(s, t); - } - } - for (int i = 0; i < 1000000; i++) - { - var s = Generate(r); - var count = r.Next(300); - for (int j = 0; j < count; j++) - { - var t = Generate(r); - storePart2.Store(s, t); - } - } - - sw.Stop(); - Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms"); - sw.Restart(); - storePart1.CompleteAddingAndCompress(); - storePart2.CompleteAddingAndCompress(); - sw.Stop(); - Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms"); - sw.Restart(); - storePart1.RebuildIndex(); - storePart2.RebuildIndex(); - sw.Stop(); - Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); - } - - private static void SmallFullTest(string root) + private static void FastTest(string root) { var r = new Random(Environment.TickCount); var options = new StoreOptions @@ -119,7 +60,8 @@ namespace PartitionFileStorageTest storePart.Store(c3, Generate(r)); storePart.Store(c3, Generate(r)); storePart.Store(c3, Generate(r)); - storePart.CompleteAddingAndCompress(); + storePart.CompleteAdding(); + storePart.Compress(); var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); Console.WriteLine("Data:"); foreach (var e in readPart.Iterate()) @@ -134,7 +76,7 @@ namespace PartitionFileStorageTest } } - private static void TestBuildRemoveStore(string root) + private static void FullStoreTest(string root) { var r = new Random(Environment.TickCount); var options = new StoreOptions @@ -153,8 +95,53 @@ namespace PartitionFileStorageTest }, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? 
-1 : 1, }; + FSUtils.CleanAndTestFolder(root); + var store = new Store(options); var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); + + Log.Info("Fill start"); + for (int i = 0; i < 30000000; i++) + { + var s = Generate(r); + var count = r.Next(200); + for (int j = 0; j < count; j++) + { + var t = Generate(r); + storePart.Store(s, t); + } + } + storePart.CompleteAdding(); + Log.Info("Fill complete"); + + long cnt = 0; + foreach (var p in storePart.Iterate()) + { + if (p.Key % 2 == 0) cnt++; + } + Log.Info(cnt.ToString()); + + Log.Info("Fill test complete"); + + storePart.Compress(); + Log.Info("Compress complete"); + + var reader = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); + cnt = 0; + foreach (var p in reader.Iterate()) + { + if (p.Key % 2 == 0) cnt++; + } + Log.Info(cnt.ToString()); + Log.Info("Compress test complete"); + + storePart.DropData(); + + Log.Info("Complete#1"); + Console.ReadKey(); + + FSUtils.CleanAndTestFolder(root); + var sw = new Stopwatch(); sw.Start(); @@ -162,14 +149,22 @@ namespace PartitionFileStorageTest var testKeys1 = new List(); var testKeys2 = new List(); - for (int i = 0; i < 1000000; i++) + var testData = new Dictionary>(); + + var total = 0L; + + for (int i = 0; i < 2000000; i++) { var s = Generate(r); + if (testData.ContainsKey(s) == false) testData[s] = new HashSet(); var count = r.Next(300); + total++; for (int j = 0; j < count; j++) { + total++; var t = Generate(r); storePart.Store(s, t); + testData[s].Add(t); } if (s % 177 == 0) { @@ -182,184 +177,134 @@ namespace PartitionFileStorageTest } sw.Stop(); - Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms"); + Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms"); sw.Restart(); - storePart.CompleteAddingAndCompress(); + storePart.CompleteAdding(); + storePart.Compress(); sw.Stop(); - Console.WriteLine($"Compress: {sw.ElapsedMilliseconds}ms"); + Log.Info($"Compress: {sw.ElapsedMilliseconds}ms"); sw.Restart(); storePart.RebuildIndex(); sw.Stop(); - Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); + Log.Info($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); - Console.WriteLine("Start merge test"); + Log.Info("Start merge test"); sw.Restart(); var merger = store.CreateMergeAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }, data => Compressor.DecodeBytesContent(data)); - for (int i = 0; i < 1000000; i++) + for (int i = 0; i < 2300000; i++) { + total++; var s = Generate(r); + if (testData.ContainsKey(s) == false) testData[s] = new HashSet(); var count = r.Next(300); for (int j = 0; j < count; j++) { + total++; var t = Generate(r); merger.Store(s, t); + testData[s].Add(t); } } - Console.WriteLine($"Merge journal filled: {sw.ElapsedMilliseconds}ms"); - merger.CompleteAddingAndCompress(); + Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. 
Total data count: {total}"); + merger.Compress(); // auto reindex sw.Stop(); - Console.WriteLine($"Compress after merge: {sw.ElapsedMilliseconds}ms"); - sw.Restart(); - merger.RebuildIndex(); - sw.Stop(); - Console.WriteLine($"Rebuild indexes after merge: {sw.ElapsedMilliseconds}ms"); + Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); - - Console.WriteLine("Test #1 reading"); + Log.Info("Test #1 reading"); var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); + ulong totalData = 0; + ulong totalKeys = 0; foreach (var key in testKeys1) { - Console.WriteLine($"\tKey: {key}"); var result = readPart.Find(key); - Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes"); + totalData += (ulong)(result.Value?.Length ?? 0); + totalKeys++; } - Console.WriteLine("Press to continue"); - Console.ReadKey(); - Console.WriteLine("Test #1 remove by keys"); + Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); + totalData = 0; + totalKeys = 0; + Log.Info("Test #1 remove by keys"); for (int i = 0; i < testKeys1.Count; i++) { - readPart.RemoveKey(testKeys1[i]); + readPart.RemoveKey(testKeys1[i], false); } - Console.WriteLine("Test #1 reading after remove"); + sw.Restart(); + readPart.RebuildIndex(); + sw.Stop(); + Log.Info($"Rebuild indexes after remove: {sw.ElapsedMilliseconds}ms"); + Log.Info("Test #1 reading after remove"); foreach (var key in testKeys1) { - Console.WriteLine($"\tKey: {key}"); var result = readPart.Find(key); - Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes"); + totalData += (ulong)(result.Value?.Length ?? 0); + totalKeys++; } - Console.WriteLine("Press to continue"); - Console.ReadKey(); - Console.WriteLine(); - Console.WriteLine("---------------------------------------"); - Console.WriteLine(); - Console.WriteLine("Test #2 reading"); + Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); + totalData = 0; + totalKeys = 0; + Log.Info("Test #2 reading"); foreach (var key in testKeys2) { - Console.WriteLine($"\tKey: {key}"); var result = readPart.Find(key); - Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes"); + totalData += (ulong)(result.Value?.Length ?? 0); + totalKeys++; } - Console.WriteLine("Press to continue"); - Console.ReadKey(); - Console.WriteLine("Test #2 remove keys batch"); + Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); + totalData = 0; + totalKeys = 0; + Log.Info("Test #2 remove keys batch"); readPart.RemoveKeys(testKeys2); - Console.WriteLine("Test #2 reading after remove"); + Log.Info("Test #2 reading after remove"); foreach (var key in testKeys2) { - Console.WriteLine($"\tKey: {key}"); var result = readPart.Find(key); - Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes"); + totalData += (ulong)(result.Value?.Length ?? 0); + totalKeys++; } + Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); + totalData = 0; + totalKeys = 0; - Console.WriteLine("Press to continue for iteration"); - Console.ReadKey(); + Log.Info("Iterator test"); foreach (var e in readPart.Iterate()) { - Console.WriteLine($"{e.Key}: {e.Value.Length}"); + totalData += (ulong)(e.Value?.Length ?? 0); + totalKeys++; } - } + Log.Info($"\t\tFound: {totalKeys} keys. 
{totalData} bytes"); + totalData = 0; + totalKeys = 0; + Log.Info("Test stored data"); - private static void TestReading(string root) - { - var options = new StoreOptions - { - Index = new IndexOptions { Enabled = true, FileIndexCount = 256 }, - RootFolder = root, - FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 512).ToString()), - MergeFunction = list => - { - ulong s = 0; - return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); - }, - Partitions = new List> - { - new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) - }, - KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, - }; - var store = new Store(options); - var request = new StoreSearchRequest + foreach (var test in testData) { - PartitionSearchRequests = new List> + if (test.Value.Count > 0 && testKeys1.Contains(test.Key) == false && testKeys2.Contains(test.Key) == false) { - new PartitionSearchRequest + var result = Compressor.DecodeBytesContent(readPart.Find(test.Key).Value).ToHashSet(); + if (test.Value.Count != result.Count) { - Info = new Metadata { Date = new DateTime(2022, 11, 08) }, - Keys = new ulong[] { } - }, - new PartitionSearchRequest - { - Info = new Metadata { Date = new DateTime(2022, 11, 09) }, - Keys = new ulong[] { } + Log.Info($"Key '{test.Key}' not found!"); + continue; } - } - }; - var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); - Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}"); - var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 09) }); - Console.WriteLine($"Outcoming data files: {storeOutcoming.CountDataFiles()}"); - var sw = new Stopwatch(); - sw.Start(); - var result = store.Search(request).Result; - foreach (var r in result.Results) - { - foreach (var mr in r.Value) - { - Console.WriteLine($"\tKey: {mr.Key}. Sucess: {mr.Found}"); - if (mr.Found && mr.Value.Length > 0) + foreach (var t in test.Value) { - var ctns = Compressor.DecodeBytesContent(mr.Value); - Console.WriteLine($"\t\t{string.Join(';', ctns)}"); + if (result.Contains(t) == false) + { + Log.Info($"Value '{t}' from test data missed in base"); + } } } } - sw.Stop(); - Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms"); + Log.Info("Completed"); } - private static void TestIterations(string root) - { - var options = new StoreOptions - { - Index = new IndexOptions { Enabled = true, FileIndexCount = 256 }, - RootFolder = root, - FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 512).ToString()), - MergeFunction = list => - { - ulong s = 0; - return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); - }, - Partitions = new List> - { - new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) - }, - KeyComparer = (left, right) => left == right ? 0 : (left < right) ? 
-1 : 1, - }; - var store = new Store(options); - var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); - foreach (var r in storeIncoming.Iterate()) - { - Console.WriteLine($"{r.Key}: {r.Value.Length}"); - } - } static void Main(string[] args) { + Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug); var root = @"H:\temp"; - //SmallFullTest(root); - TestBuildRemoveStore(root); - //BuildStore(root); - //TestReading(root); + //FastTest(root); + FullStoreTest(root); //TestIterations(root); //TestRangeCompressionAndInversion(); Console.ReadKey(); diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs index 9049701..55ee461 100644 --- a/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs @@ -15,6 +15,7 @@ namespace ZeroLevel.Services.PartitionStorage private readonly Func _keyComparer; private readonly string _indexFolder; private readonly bool _indexExists = false; + private readonly Func _keyDeserializer; private readonly TMeta _meta; public StorePartitionSparseIndex(string partitionFolder, TMeta meta, StoreFilePartition filePartition, @@ -25,6 +26,7 @@ namespace ZeroLevel.Services.PartitionStorage _meta = meta; _keyComparer = keyComparer; _filePartition = filePartition; + _keyDeserializer = MessageSerializer.GetDeserializer(); } public KeyIndex GetOffset(TKey key) @@ -110,7 +112,7 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - var k = reader.ReadCompatible(); + var k = _keyDeserializer.Invoke(reader); var o = reader.ReadLong(); list.Add(new KeyIndex { Key = k, Offset = o }); } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs index cdcb6d9..34f9538 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs @@ -1,26 +1,38 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using ZeroLevel.Services.PartitionStorage.Interfaces; namespace ZeroLevel.Services.PartitionStorage { /// /// Partition store interface /// - /// Record key - /// The value that is written in the stream - /// Value after compression of TInput values by duplicate keys (TInput list or similar) - /// Meta information for partition search + /// Key type + /// Value type + /// The type of compressed array of values for the key + /// Metadata for creating or searching for a partition public interface IStore { + /// + /// Returns an object to create a partition + /// IStorePartitionBuilder CreateBuilder(TMeta info); - - IStorePartitionBuilder CreateMergeAccessor(TMeta info, Func> decompressor); - + /// + /// Returns an object to overwrite data in an existing partition + /// + IStorePartitionMergeBuilder CreateMergeAccessor(TMeta info, Func> decompressor); + /// + /// Creates an object to access the data in the partition + /// IStorePartitionAccessor CreateAccessor(TMeta info); - + /// + /// Performs a search for data in the repository + /// Task> Search(StoreSearchRequest searchRequest); - + /// + /// Deleting a partition + /// void RemovePartition(TMeta info); } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreCache.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreCache.cs deleted file mode 100644 index 4160164..0000000 --- 
a/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreCache.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace ZeroLevel.Services.PartitionStorage.Interfaces -{ - public interface IStoreCache - { - } -} diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs index f32b4ec..3865a90 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs @@ -26,9 +26,9 @@ namespace ZeroLevel.Services.PartitionStorage IEnumerable> Iterate(); IEnumerable> IterateKeyBacket(TKey key); - void RemoveKey(TKey key); - void RemoveKeys(IEnumerable keys); - void RemoveAllExceptKey(TKey key); - void RemoveAllExceptKeys(IEnumerable keys); + void RemoveKey(TKey key, bool autoReindex = false); + void RemoveKeys(IEnumerable keys, bool autoReindex = true); + void RemoveAllExceptKey(TKey key, bool autoReindex = false); + void RemoveAllExceptKeys(IEnumerable keys, bool autoReindex = true); } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs index 1836227..57a9a8a 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs @@ -2,12 +2,6 @@ namespace ZeroLevel.Services.PartitionStorage { - public class InsertValue - { - public TKey Key; - public TInput Value; - } - /// /// Provides write operations in catalog partition /// @@ -17,15 +11,22 @@ namespace ZeroLevel.Services.PartitionStorage public interface IStorePartitionBuilder : IStorePartitionBase { + IEnumerable> Iterate(); /// - /// Save one record + /// Writing a key-value pair /// void Store(TKey key, TInput value); /// - /// Complete the recording and perform the conversion of the records from - /// (TKey; TInput) to (TKey; TValue) + /// Called after all key-value pairs are written to the partition + /// + void CompleteAdding(); + /// + /// Perform the conversion of the records from (TKey; TInput) to (TKey; TValue). Called after CompleteAdding + /// + void Compress(); + /// + /// Rebuilds index files. Only for compressed data. /// - void CompleteAddingAndCompress(); void RebuildIndex(); } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionIndex.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionIndex.cs index 8ff9f9c..96d45cc 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionIndex.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionIndex.cs @@ -2,7 +2,13 @@ { internal interface IStorePartitionIndex { + /// + /// Search for the offset of the closest to the specified key. + /// KeyIndex GetOffset(TKey key); + /// + /// Search for offsets of the keys closest to the specified ones. 
+ /// KeyIndex[] GetOffset(TKey[] keys, bool inOneGroup); } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionMergeBuilder.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionMergeBuilder.cs new file mode 100644 index 0000000..2568a83 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionMergeBuilder.cs @@ -0,0 +1,21 @@ +namespace ZeroLevel.Services.PartitionStorage.Interfaces +{ + /// + /// Provides write operations in catalog partition + /// + /// Key type + /// Type of one input value + /// Type of records aggregate + public interface IStorePartitionMergeBuilder + : IStorePartitionBase + { + /// + /// Writing a key-value pair + /// + void Store(TKey key, TInput value); + /// + /// Perform the conversion of the records from (TKey; TInput) to (TKey; TValue). Called after CompleteAdding + /// + void Compress(); + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs index f262e78..d1a3a2b 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage @@ -13,7 +14,7 @@ namespace ZeroLevel.Services.PartitionStorage /// /// public class StoreMergePartitionAccessor - : IStorePartitionBuilder + : IStorePartitionMergeBuilder { private readonly Func> _decompress; /// @@ -22,6 +23,8 @@ namespace ZeroLevel.Services.PartitionStorage private readonly IStorePartitionAccessor _accessor; private readonly string _temporaryFolder; + private readonly Func _keyDeserializer; + private readonly Func _valueDeserializer; /// /// Write catalog @@ -37,33 +40,21 @@ namespace ZeroLevel.Services.PartitionStorage var tempOptions = options.Clone(); tempOptions.RootFolder = _temporaryFolder; _temporaryAccessor = new StorePartitionBuilder(tempOptions, info); - } - private IEnumerable>> - IterateReadKeyInputs(string filePath) - { - if (File.Exists(filePath)) - { - var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); - using (var reader = new MemoryStreamReader(stream)) - { - while (reader.EOS == false) - { - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); - var input = _decompress(v); - yield return - new StorePartitionKeyValueSearchResult> - { - Key = k, - Value = input, - Found = true - }; - } - } - } + _keyDeserializer = MessageSerializer.GetDeserializer(); + _valueDeserializer = MessageSerializer.GetDeserializer(); } - public void CompleteAddingAndCompress() + + #region API methods + /// + /// Deletes only new entries. Existing entries remain unchanged. 
+ /// + public void DropData() => _temporaryAccessor.DropData(); + public string GetCatalogPath() => _accessor.GetCatalogPath(); + public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value); + public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(), + _temporaryAccessor.CountDataFiles()); + public void Compress() { var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath()); @@ -90,7 +81,7 @@ namespace ZeroLevel.Services.PartitionStorage } } - (_temporaryAccessor as StorePartitionBuilder).CloseStreams(); + _temporaryAccessor.CompleteAdding(); // compress new file foreach (var file in newFiles) @@ -102,24 +93,51 @@ namespace ZeroLevel.Services.PartitionStorage // replace old file by new foreach (var file in newFiles) { + // 1. Remove index file + (_accessor as StorePartitionAccessor) + .DropFileIndex(file); + + // 2. Replace source var name = Path.GetFileName(file); File.Move(file, Path.Combine(folder, name), true); + + // 3. Rebuil index + (_accessor as StorePartitionAccessor) + .RebuildFileIndex(file); } } // remove temporary files _temporaryAccessor.DropData(); Directory.Delete(_temporaryFolder, true); } + #endregion - /// - /// Deletes only new entries. Existing entries remain unchanged. - /// - public void DropData() => _temporaryAccessor.DropData(); - public string GetCatalogPath() => _accessor.GetCatalogPath(); - public void RebuildIndex() => _accessor.RebuildIndex(); - public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value); - public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(), - _temporaryAccessor.CountDataFiles()); + #region Private methods + private IEnumerable>> + IterateReadKeyInputs(string filePath) + { + if (File.Exists(filePath)) + { + var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); + using (var reader = new MemoryStreamReader(stream)) + { + while (reader.EOS == false) + { + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); + var input = _decompress(v); + yield return + new StorePartitionKeyValueSearchResult> + { + Key = k, + Value = input, + Found = true + }; + } + } + } + } + #endregion public void Dispose() { diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs index 6661511..6da70b9 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs @@ -15,6 +15,9 @@ namespace ZeroLevel.Services.PartitionStorage private readonly string _indexCatalog; private readonly TMeta _info; + private readonly Func _keyDeserializer; + private readonly Func _valueDeserializer; + public string Catalog { get { return _catalog; } } public StorePartitionAccessor(StoreOptions options, TMeta info) { @@ -26,12 +29,14 @@ namespace ZeroLevel.Services.PartitionStorage { _indexCatalog = Path.Combine(_catalog, "__indexes__"); } + _keyDeserializer = MessageSerializer.GetDeserializer(); + _valueDeserializer = MessageSerializer.GetDeserializer(); } + #region API methods public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 
0; public string GetCatalogPath() => _catalog; public void DropData() => FSUtils.CleanAndTestFolder(_catalog); - public StorePartitionKeyValueSearchResult Find(TKey key) { var fileName = _options.GetFileName(key, _info); @@ -48,12 +53,12 @@ namespace ZeroLevel.Services.PartitionStorage { if (startOffset > 0) { - reader.Stream.Seek(startOffset, SeekOrigin.Begin); + reader.Seek(startOffset, SeekOrigin.Begin); } while (reader.EOS == false) { - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); var c = _options.KeyComparer(key, k); if (c == 0) return new StorePartitionKeyValueSearchResult { @@ -100,8 +105,8 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Found = true }; } } @@ -117,8 +122,8 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Found = true }; } } @@ -139,41 +144,88 @@ namespace ZeroLevel.Services.PartitionStorage } } } - - private void RebuildFileIndex(string file) + public void RemoveAllExceptKey(TKey key, bool autoReindex = false) + { + RemoveAllExceptKeys(new[] { key }, autoReindex); + } + public void RemoveAllExceptKeys(IEnumerable keys, bool autoReindex = true) + { + var results = keys.Distinct() + .GroupBy( + k => _options.GetFileName(k, _info), + k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() }); + foreach (var group in results) + { + RemoveKeyGroup(group.FileName, group.Keys, false, autoReindex); + } + } + public void RemoveKey(TKey key, bool autoReindex = false) + { + RemoveKeys(new[] { key }, autoReindex); + } + public void RemoveKeys(IEnumerable keys, bool autoReindex = true) { - if (false == Directory.Exists(_indexCatalog)) + var results = keys.Distinct() + .GroupBy( + k => _options.GetFileName(k, _info), + k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() }); + foreach (var group in results) { - Directory.CreateDirectory(_indexCatalog); + RemoveKeyGroup(group.FileName, group.Keys, true, autoReindex); } - var dict = new Dictionary(); - using (var reader = GetReadStream(Path.GetFileName(file))) + } + #endregion + + #region Internal methods + internal void DropFileIndex(string file) + { + if (_options.Index.Enabled) { - while (reader.EOS == false) + var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); + if (File.Exists(index_file)) { - var pos = reader.Stream.Position; - var k = reader.ReadCompatible(); - dict[k] = pos; - reader.ReadCompatible(); + File.Delete(index_file); } } - if (dict.Count > _options.Index.FileIndexCount * 8) + } + internal void RebuildFileIndex(string file) + { + if (_options.Index.Enabled) { - var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero); - var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); - var d_arr = dict.OrderBy(p => p.Key).ToArray(); - using (var writer = new MemoryStreamWriter( - new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) + if 
(false == Directory.Exists(_indexCatalog)) + { + Directory.CreateDirectory(_indexCatalog); + } + var dict = new Dictionary(); + using (var reader = GetReadStream(Path.GetFileName(file))) + { + while (reader.EOS == false) + { + var pos = reader.Position; + var k = _keyDeserializer.Invoke(reader); + dict[k] = pos; + _valueDeserializer.Invoke(reader); + } + } + if (dict.Count > _options.Index.FileIndexCount * 8) { - for (int i = 0; i < _options.Index.FileIndexCount; i++) + var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero); + var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); + var d_arr = dict.OrderBy(p => p.Key).ToArray(); + using (var writer = new MemoryStreamWriter( + new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) { - var pair = d_arr[i * step]; - writer.WriteCompatible(pair.Key); - writer.WriteLong(pair.Value); + for (int i = 0; i < _options.Index.FileIndexCount; i++) + { + var pair = d_arr[i * step]; + writer.WriteCompatible(pair.Key); + writer.WriteLong(pair.Value); + } } } } } + #endregion #region Private methods private IEnumerable> Find(string fileName, @@ -191,11 +243,11 @@ namespace ZeroLevel.Services.PartitionStorage { var searchKey = keys[i]; var off = offsets[i]; - reader.Stream.Seek(off.Offset, SeekOrigin.Begin); + reader.Seek(off.Offset, SeekOrigin.Begin); while (reader.EOS == false) { - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); var c = _options.KeyComparer(searchKey, k); if (c == 0) { @@ -223,8 +275,8 @@ namespace ZeroLevel.Services.PartitionStorage var keys_arr = keys.OrderBy(k => k).ToArray(); while (reader.EOS == false && index < keys_arr.Length) { - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); var c = _options.KeyComparer(keys_arr[index], k); if (c == 0) { @@ -253,59 +305,14 @@ namespace ZeroLevel.Services.PartitionStorage } } - private MemoryStreamReader GetReadStream(string fileName) - { - var filePath = Path.Combine(_catalog, fileName); - var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); - return new MemoryStreamReader(stream); - } - #endregion - public void Dispose() - { - } - - public void RemoveAllExceptKey(TKey key) - { - RemoveAllExceptKeys(new[] { key }); - } - - public void RemoveAllExceptKeys(IEnumerable keys) - { - var results = keys.Distinct() - .GroupBy( - k => _options.GetFileName(k, _info), - k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() }); - foreach (var group in results) - { - RemoveKeyGroup(group.FileName, group.Keys, false); - } - } - - public void RemoveKey(TKey key) - { - RemoveKeys(new[] { key }); - } - - public void RemoveKeys(IEnumerable keys) - { - var results = keys.Distinct() - .GroupBy( - k => _options.GetFileName(k, _info), - k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() }); - foreach (var group in results) - { - RemoveKeyGroup(group.FileName, group.Keys, true); - } - } - - private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove) + private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex) { var filePath = Path.Combine(_catalog, fileName); if (File.Exists(filePath)) { // 1. 
Find ranges var ranges = new List(); - if (_options.Index.Enabled) + if (_options.Index.Enabled && autoReindex) { var index = new StorePartitionSparseIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); var offsets = index.GetOffset(keys, true); @@ -315,14 +322,13 @@ namespace ZeroLevel.Services.PartitionStorage { var searchKey = keys[i]; var off = offsets[i]; - reader.Stream.Seek(off.Offset, SeekOrigin.Begin); + reader.Seek(off.Offset, SeekOrigin.Begin); while (reader.EOS == false) { - var startPosition = reader.Stream.Position; - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); - var endPosition = reader.Stream.Position; - + var startPosition = reader.Position; + var k = _keyDeserializer.Invoke(reader); + _valueDeserializer.Invoke(reader); + var endPosition = reader.Position; var c = _options.KeyComparer(searchKey, k); if (c == 0) { @@ -344,11 +350,10 @@ namespace ZeroLevel.Services.PartitionStorage var keys_arr = keys.OrderBy(k => k).ToArray(); while (reader.EOS == false && index < keys_arr.Length) { - var startPosition = reader.Stream.Position; - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); - var endPosition = reader.Stream.Position; - + var startPosition = reader.Position; + var k = _keyDeserializer.Invoke(reader); + _valueDeserializer.Invoke(reader); + var endPosition = reader.Position; var c = _options.KeyComparer(keys_arr[index], k); if (c == 0) { @@ -402,13 +407,22 @@ namespace ZeroLevel.Services.PartitionStorage File.Move(tempFile, filePath, true); // Rebuild index if needs - if (_options.Index.Enabled) + if (_options.Index.Enabled && autoReindex) { RebuildFileIndex(filePath); } } } + private MemoryStreamReader GetReadStream(string fileName) + { + var filePath = Path.Combine(_catalog, fileName); + var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); + return new MemoryStreamReader(stream); + } + #endregion + + #region Static private static void RangeCompression(List ranges) { for (var i = 0; i < ranges.Count - 1; i++) @@ -456,5 +470,10 @@ namespace ZeroLevel.Services.PartitionStorage source.Read(buffer, 0, buffer.Length); target.Write(buffer, 0, buffer.Length); } + #endregion + + public void Dispose() + { + } } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index eca6049..204ff12 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -3,6 +3,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; +using System.Threading; using System.Threading.Tasks; using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.Serialization; @@ -18,7 +19,12 @@ namespace ZeroLevel.Services.PartitionStorage private readonly StoreOptions _options; private readonly string _catalog; private readonly TMeta _info; + private readonly Action _keySerializer; + private readonly Action _inputSerializer; + private readonly Func _keyDeserializer; + private readonly Func _inputDeserializer; + private readonly Func _valueDeserializer; public string Catalog { get { return _catalog; } } public StorePartitionBuilder(StoreOptions options, TMeta info) { @@ -30,28 +36,73 @@ namespace ZeroLevel.Services.PartitionStorage { Directory.CreateDirectory(_catalog); } + + _keySerializer = MessageSerializer.GetSerializer(); + _inputSerializer = 
MessageSerializer.GetSerializer(); + + _keyDeserializer = MessageSerializer.GetDeserializer(); + _inputDeserializer = MessageSerializer.GetDeserializer(); + _valueDeserializer = MessageSerializer.GetDeserializer(); } + #region API methods public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 0; public string GetCatalogPath() => _catalog; public void DropData() => FSUtils.CleanAndTestFolder(_catalog); - public void Store(TKey key, TInput value) { var fileName = _options.GetFileName(key, _info); var stream = GetWriteStream(fileName); - stream.SerializeCompatible(key); - stream.SerializeCompatible(value); + _keySerializer.Invoke(stream, key); + Thread.MemoryBarrier(); + _inputSerializer.Invoke(stream, value); + } + public void CompleteAdding() + { + // Close all write streams + foreach (var s in _writeStreams) + { + try + { + s.Value.Stream.Flush(); + s.Value.Dispose(); + } + catch { } + } + _writeStreams.Clear(); } - public void CompleteAddingAndCompress() + public void Compress() { - CloseStreams(); var files = Directory.GetFiles(_catalog); if (files != null && files.Length > 0) { Parallel.ForEach(files, file => CompressFile(file)); } } + public IEnumerable> Iterate() + { + var files = Directory.GetFiles(_catalog); + if (files != null && files.Length > 0) + { + foreach (var file in files) + { + using (var reader = GetReadStream(Path.GetFileName(file))) + { + while (reader.EOS == false) + { + var key = _keyDeserializer.Invoke(reader); + if (reader.EOS) + { + yield return new StorePartitionKeyValueSearchResult { Key = key, Value = default, Found = true }; + break; + } + var val = _inputDeserializer.Invoke(reader); + yield return new StorePartitionKeyValueSearchResult { Key = key, Value = val, Found = true }; + } + } + } + } + } public void RebuildIndex() { if (_options.Index.Enabled) @@ -70,9 +121,10 @@ namespace ZeroLevel.Services.PartitionStorage while (reader.EOS == false) { var pos = reader.Stream.Position; - var k = reader.ReadCompatible(); - dict[k] = pos; - reader.ReadCompatible(); + var key = _keyDeserializer.Invoke(reader); + dict[key] = pos; + if (reader.EOS) break; + _valueDeserializer.Invoke(reader); } } if (dict.Count > _options.Index.FileIndexCount * 8) @@ -95,21 +147,9 @@ namespace ZeroLevel.Services.PartitionStorage } } } + #endregion #region Private methods - - internal void CloseStreams() - { - // Close all write streams - foreach (var s in _writeStreams) - { - try - { - s.Value.Dispose(); - } - catch { } - } - } internal void CompressFile(string file) { var dict = new Dictionary>(); @@ -117,13 +157,17 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - TKey k = reader.ReadCompatible(); - TInput v = reader.ReadCompatible(); - if (false == dict.ContainsKey(k)) + var key = _keyDeserializer.Invoke(reader); + if (false == dict.ContainsKey(key)) { - dict[k] = new HashSet(); + dict[key] = new HashSet(); } - dict[k].Add(v); + if (reader.EOS) + { + break; + } + var input = _inputDeserializer.Invoke(reader); + dict[key].Add(input); } } var tempPath = Path.GetTempPath(); @@ -157,6 +201,7 @@ namespace ZeroLevel.Services.PartitionStorage return new MemoryStreamReader(stream); } #endregion + public void Dispose() { } diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs index c23da43..2246da3 100644 --- a/ZeroLevel/Services/PartitionStorage/Store.cs +++ b/ZeroLevel/Services/PartitionStorage/Store.cs @@ -5,6 +5,7 @@ using System.IO; using System.Linq; using System.Threading.Tasks; using 
ZeroLevel.Services.FileSystem;
+using ZeroLevel.Services.PartitionStorage.Interfaces;
 
 namespace ZeroLevel.Services.PartitionStorage
 {
@@ -39,7 +40,7 @@ namespace ZeroLevel.Services.PartitionStorage
             return new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(_options, info);
         }
 
-        public IStorePartitionBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor)
+        public IStorePartitionMergeBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor)
         {
             return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor);
         }
diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs
index a70e080..2d349fd 100644
--- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs
+++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs
@@ -48,6 +48,13 @@ namespace ZeroLevel.Services.Serialization
             _stream = reader._stream;
         }
 
+        public void Seek(long offset, SeekOrigin origin)
+        {
+            _stream.Seek(offset, origin);
+        }
+
+        public long Position => _stream.Position;
+
         /// <summary>
         /// Flag reading
         /// </summary>
diff --git a/ZeroLevel/Services/Serialization/MessageSerializer.cs b/ZeroLevel/Services/Serialization/MessageSerializer.cs
index 5289183..e6b662d 100644
--- a/ZeroLevel/Services/Serialization/MessageSerializer.cs
+++ b/ZeroLevel/Services/Serialization/MessageSerializer.cs
@@ -2,6 +2,7 @@
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.IO;
+using System.Runtime.Serialization;
 
 namespace ZeroLevel.Services.Serialization
 {
@@ -29,6 +30,30 @@ namespace ZeroLevel.Services.Serialization
             }
         }
 
+        public static Action<MemoryStreamWriter, T> GetSerializer<T>()
+        {
+            var t = typeof(T);
+            if (t.IsAssignableTo(typeof(IBinarySerializable)))
+            {
+                return (w, o) => ((IBinarySerializable)o).Serialize(w);
+            }
+            return (w, o) => PrimitiveTypeSerializer.Serialize(w, o);
+        }
+
+        public static Func<MemoryStreamReader, T> GetDeserializer<T>()
+        {
+            if (typeof(IBinarySerializable).IsAssignableFrom(typeof(T)))
+            {
+                return (r) =>
+                {
+                    var o = (IBinarySerializable)FormatterServices.GetUninitializedObject(typeof(T));
+                    o.Deserialize(r);
+                    return (T)o;
+                };
+            }
+            return (r) => PrimitiveTypeSerializer.Deserialize<T>(r);
+        }
+
         public static byte[] SerializeCompatible(object obj)
         {
             if (null == obj)
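
Usage sketch (not part of the patch). The snippet below condenses the FastTest method above to show how the refactored API fits together after this change. The generic arguments (ulong keys and inputs, byte[] compressed values, Metadata partition info), the sample values and the example wrapper are assumptions taken from the test program, not definitions fixed by the library.

using System;
using PartitionFileStorageTest;                 // Metadata and Compressor from the test program
using ZeroLevel.Services.PartitionStorage;
using ZeroLevel.Services.Serialization;

internal static class PartitionStorageUsageExample
{
    // 'store' is assumed to be a Store<ulong, ulong, byte[], Metadata> created with the
    // same StoreOptions as in FastTest above.
    internal static void Run(Store<ulong, ulong, byte[], Metadata> store)
    {
        var info = new Metadata { Date = new DateTime(2022, 11, 08) };

        // Builder lifecycle: the former CompleteAddingAndCompress() is now three explicit steps.
        var builder = store.CreateBuilder(info);
        builder.Store(42UL, 123456UL);   // append (TKey, TInput) pairs to the journal files
        builder.CompleteAdding();        // close the write streams
        builder.Compress();              // rewrite journals as (TKey, TValue) records
        builder.RebuildIndex();          // sparse index, valid only for compressed data

        // Merge builder: new entries go to a temporary partition; Compress() merges them
        // into the existing files and rebuilds the affected file indexes.
        var merger = store.CreateMergeAccessor(info, data => Compressor.DecodeBytesContent(data));
        merger.Store(42UL, 777UL);
        merger.Compress();

        // Reading and removal: the Remove* methods now take an autoReindex flag, so several
        // removals can share a single RebuildIndex() call.
        var accessor = store.CreateAccessor(info);
        var found = accessor.Find(42UL);              // found.Found / found.Value (byte[])
        accessor.RemoveKey(42UL, autoReindex: false);
        accessor.RebuildIndex();

        // Cached serializer delegates added to MessageSerializer, used by the partition
        // classes instead of per-record ReadCompatible<T>() calls.
        var readKey = MessageSerializer.GetDeserializer<ulong>();   // Func<MemoryStreamReader, ulong>
        var writeKey = MessageSerializer.GetSerializer<ulong>();    // Action<MemoryStreamWriter, ulong>
    }
}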