using System.Collections.Concurrent; using System.Diagnostics; using System.Text; using ZeroLevel; using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.PartitionStorage; using ZeroLevel.Services.Serialization; namespace PartitionFileStorageTest { internal class Program { // const int PAIRS_COUNT = 200_000_000; const int PAIRS_COUNT = 2000_000; private class Metadata { public DateTime Date { get; set; } } private static ulong Generate(Random r) { var num = new StringBuilder(); num.Append("79"); num.Append(r.Next(99).ToString("D2")); num.Append(r.Next(9999).ToString("D7")); return ulong.Parse(num.ToString()); } private static void FastTest(StoreOptions options) { var r = new Random(Environment.TickCount); var store = new Store(options); var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); Console.WriteLine("Small test start"); var c1 = (ulong)(86438 * 128); var c2 = (ulong)(83438 * 128); var c3 = (ulong)(831238 * 128); storePart.Store(c1, Generate(r)); storePart.Store(c1, Generate(r)); storePart.Store(c1, Generate(r)); storePart.Store(c2, Generate(r)); storePart.Store(c2, Generate(r)); storePart.Store(c2, Generate(r)); storePart.Store(c2, Generate(r)); storePart.Store(c2, Generate(r)); storePart.Store(c3, Generate(r)); storePart.Store(c3, Generate(r)); storePart.Store(c3, Generate(r)); storePart.CompleteAdding(); storePart.Compress(); var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); Console.WriteLine("Data:"); foreach (var e in readPart.Iterate()) { Console.WriteLine($"{e.Key}: {e.Value.Length}"); } readPart.RemoveKey(c1); Console.WriteLine("Data after remove:"); foreach (var e in readPart.Iterate()) { Console.WriteLine($"{e.Key}: {e.Value.Length}"); } } private static void FullStoreTest(StoreOptions options, List<(ulong, ulong)> pairs) { var meta = new Metadata { Date = new DateTime(2022, 11, 08) }; var r = new Random(Environment.TickCount); var store = new Store(options); var storePart = store.CreateBuilder(meta); var sw = new Stopwatch(); sw.Start(); var testKeys1 = new List(); var testKeys2 = new List(); var testData = new Dictionary>(); var insertCount = (int)(0.7 * pairs.Count); for (int i = 0; i < insertCount; i++) { var key = pairs[i].Item1; var val = pairs[i].Item2; if (testData.ContainsKey(key) == false) testData[key] = new HashSet(); testData[key].Add(val); storePart.Store(key, val); if (key % 717 == 0) { testKeys1.Add(key); } if (key % 117 == 0) { testKeys2.Add(key); } } sw.Stop(); Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms. Records writed: {storePart.TotalRecords}"); sw.Restart(); storePart.CompleteAdding(); storePart.Compress(); sw.Stop(); Log.Info($"Compress: {sw.ElapsedMilliseconds}ms"); sw.Restart(); storePart.RebuildIndex(); sw.Stop(); Log.Info($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); Log.Info("Start merge test"); sw.Restart(); var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data)); for (int i = insertCount; i < pairs.Count; i++) { var key = pairs[i].Item1; var val = pairs[i].Item2; if (testData.ContainsKey(key) == false) testData[key] = new HashSet(); testData[key].Add(val); merger.Store(key, val); } Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. New records merged: {merger.TotalRecords}"); merger.Compress(); // auto reindex sw.Stop(); Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); Log.Info("Test #1 reading"); var readPart = store.CreateAccessor(meta); ulong totalData = 0; ulong totalKeys = 0; foreach (var key in testKeys1) { var result = readPart.Find(key); totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test #1 remove by keys"); for (int i = 0; i < testKeys1.Count; i++) { readPart.RemoveKey(testKeys1[i], false); } sw.Restart(); readPart.RebuildIndex(); sw.Stop(); Log.Info($"Rebuild indexes after remove: {sw.ElapsedMilliseconds}ms"); Log.Info("Test #1 reading after remove"); foreach (var key in testKeys1) { var result = readPart.Find(key); totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test #2 reading"); foreach (var key in testKeys2) { var result = readPart.Find(key); totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test #2 remove keys batch"); readPart.RemoveKeys(testKeys2); Log.Info("Test #2 reading after remove"); foreach (var key in testKeys2) { var result = readPart.Find(key); totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Iterator test"); foreach (var e in readPart.Iterate()) { totalData += (ulong)(e.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test stored data"); foreach (var test in testData) { if (test.Value.Count > 0 && testKeys1.Contains(test.Key) == false && testKeys2.Contains(test.Key) == false) { var result = Compressor.DecodeBytesContent(readPart.Find(test.Key).Value).ToHashSet(); if (test.Value.Count != result.Count) { Log.Info($"Key '{test.Key}' not found!"); continue; } foreach (var t in test.Value) { if (result.Contains(t) == false) { Log.Info($"Value '{t}' from test data missed in base"); } } } } Log.Info("Completed"); } private static void FullStoreMultithreadTest(StoreOptions options, List<(ulong, ulong)> pairs) { var meta = new Metadata { Date = new DateTime(2022, 11, 08) }; var r = new Random(Environment.TickCount); var store = new Store(options); var storePart = store.CreateBuilder(meta); var sw = new Stopwatch(); sw.Start(); var insertCount = (int)(0.7 * pairs.Count); var testKeys1 = new ConcurrentBag(); var testKeys2 = new ConcurrentBag(); var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 24 }; Parallel.ForEach(pairs.Take(insertCount).ToArray(), parallelOptions, pair => { var key = pair.Item1; var val = pair.Item2; storePart.Store(key, val); if (key % 717 == 0) { testKeys1.Add(key); } if (key % 117 == 0) { testKeys2.Add(key); } }); if (storePart.TotalRecords != insertCount) { Log.Error($"Count of stored record no equals expected. Recorded: {storePart.TotalRecords}. Expected: {insertCount}"); } sw.Stop(); Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms"); sw.Restart(); storePart.CompleteAdding(); storePart.Compress(); sw.Stop(); Log.Info($"Compress: {sw.ElapsedMilliseconds}ms"); sw.Restart(); storePart.RebuildIndex(); sw.Stop(); Log.Info($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); Log.Info("Start merge test"); sw.Restart(); var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data)); Parallel.ForEach(pairs.Skip(insertCount).ToArray(), parallelOptions, pair => { var key = pair.Item1; var val = pair.Item2; merger.Store(key, val); }); if (merger.TotalRecords != (pairs.Count - insertCount)) { Log.Error($"Count of stored record no equals expected. Recorded: {merger.TotalRecords}. Expected: {(pairs.Count - insertCount)}"); } Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {pairs.Count}"); merger.Compress(); // auto reindex sw.Stop(); Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); Log.Info("Test #1 reading"); var readPart = store.CreateAccessor(meta); ulong totalData = 0; ulong totalKeys = 0; foreach (var key in testKeys1) { var result = readPart.Find(key); totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test #1 remove by keys"); foreach (var key in testKeys1) { readPart.RemoveKey(key, false); } sw.Restart(); readPart.RebuildIndex(); sw.Stop(); Log.Info($"Rebuild indexes after remove: {sw.ElapsedMilliseconds}ms"); Log.Info("Test #1 reading after remove"); foreach (var key in testKeys1) { var result = readPart.Find(key); totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test #2 reading"); foreach (var key in testKeys2) { var result = readPart.Find(key); totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test #2 remove keys batch"); readPart.RemoveKeys(testKeys2); Log.Info("Test #2 reading after remove"); foreach (var key in testKeys2) { var result = readPart.Find(key); totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Iterator test"); foreach (var e in readPart.Iterate()) { totalData += (ulong)(e.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); Log.Info("Completed"); } private static void FaultIndexTest(string filePath) { var serializer = new StoreStandartSerializer(); // 1 build index var index = new Dictionary(); using (var reader = new MemoryStreamReader(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024))) { var counter = 1; while (reader.EOS == false) { counter--; var pos = reader.Position; var k = serializer.KeyDeserializer.Invoke(reader); serializer.ValueDeserializer.Invoke(reader); if (counter == 0) { index[k] = pos; counter = 16; } } } // 2 Test index using (var reader = new MemoryStreamReader(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024))) { foreach (var pair in index) { reader.Stream.Seek(pair.Value, SeekOrigin.Begin); var k = serializer.KeyDeserializer.Invoke(reader); if (k != pair.Key) { Log.Warning("Broken index"); } var v = serializer.ValueDeserializer.Invoke(reader); } } } static void Main(string[] args) { /*FaultIndexTest(@"H:\temp\85"); return;*/ var root = @"H:\temp"; var options = new StoreOptions { Index = new IndexOptions { Enabled = true, StepType = IndexStepType.Step, StepValue = 16, EnableIndexInMemoryCachee= true }, RootFolder = root, FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), MergeFunction = list => { ulong s = 0; return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); }, Partitions = new List> { new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) }, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, ThreadSafeWriting = false }; var optionsMultiThread = new StoreOptions { Index = new IndexOptions { Enabled = true, StepType = IndexStepType.Step, StepValue = 16, EnableIndexInMemoryCachee = true }, RootFolder = root, FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), MergeFunction = list => { ulong s = 0; return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); }, Partitions = new List> { new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) }, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, ThreadSafeWriting = true }; Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug); Log.Info("Start"); var pairs = new List<(ulong, ulong)>(PAIRS_COUNT); var r = new Random(Environment.TickCount); for (int i = 0; i < PAIRS_COUNT; i++) { pairs.Add((Generate(r), Generate(r))); } // FastTest(options); FSUtils.CleanAndTestFolder(root); FullStoreMultithreadTest(optionsMultiThread, pairs); FullStoreTest(options, pairs); /* FSUtils.CleanAndTestFolder(root); */ Console.ReadKey(); } } }