From d81cf3109ab5ef4d1b98f8a19ec353a3dc08a098 Mon Sep 17 00:00:00 2001 From: Ogoun Date: Sat, 22 Jul 2023 17:06:25 +0300 Subject: [PATCH] Fixes --- PartitionFileStorageTest/Program.cs | 6 + TestApp/PersonDetector.cs | 44 ---- TestApp/Program.cs | 72 +++++-- TestApp/TestApp.csproj | 9 +- ZeroLevel.UnitTests/PartitionStorageTests.cs | 188 ++++++++++++++++++ .../Interfaces/IStoreSerializer.cs | 2 + .../Partition/StorePartitionBuilder.cs | 11 +- .../PartitionStorage/StoreSerializers.cs | 5 + .../Services/Serialization/IBinaryReader.cs | 2 +- .../Serialization/IBinarySerializable.cs | 4 +- .../Serialization/MemoryStreamReader.cs | 2 +- .../Serialization/MemoryStreamWriter.cs | 6 +- .../Serialization/MessageSerializer.cs | 4 +- 13 files changed, 279 insertions(+), 76 deletions(-) delete mode 100644 TestApp/PersonDetector.cs create mode 100644 ZeroLevel.UnitTests/PartitionStorageTests.cs diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index ae51b5e..f62a60a 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -37,6 +37,7 @@ namespace PartitionFileStorageTest var store = new Store(options, new StoreSerializers( async (w, n) => await w.WriteULongAsync(n), async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteBytesAsync(n), async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } })); @@ -89,6 +90,7 @@ namespace PartitionFileStorageTest var store = new Store(options, new StoreSerializers( async (w, n) => await w.WriteULongAsync(n), async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteBytesAsync(n), async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } })); @@ -252,6 +254,7 @@ namespace PartitionFileStorageTest var store = new Store(options, new StoreSerializers( async (w, n) => await w.WriteULongAsync(n), async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteBytesAsync(n), async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } })); @@ -408,6 +411,7 @@ namespace PartitionFileStorageTest var serializer = new StoreSerializers( async (w, n) => await w.WriteULongAsync(n), async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteBytesAsync(n), async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } }); @@ -471,6 +475,7 @@ namespace PartitionFileStorageTest var serializer = new StoreSerializers( async (w, n) => await w.WriteULongAsync(n), async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteBytesAsync(n), async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } }); @@ -530,6 +535,7 @@ namespace PartitionFileStorageTest var store = new Store(options, new StoreSerializers( async (w, n) => await w.WriteStringAsync(n), async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteBytesAsync(n), async (r) => { try { return new DeserializeResult(true, await r.ReadStringAsync()); } catch { return new DeserializeResult(false, string.Empty); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } })); diff --git a/TestApp/PersonDetector.cs b/TestApp/PersonDetector.cs deleted file mode 100644 index a3415c8..0000000 --- a/TestApp/PersonDetector.cs +++ /dev/null @@ -1,44 +0,0 @@ -using SixLabors.ImageSharp; -using SixLabors.ImageSharp.PixelFormats; -using System.Collections.Generic; -using System.Linq; -using ZeroLevel.NN.Architectures.YoloV5; -using ZeroLevel.NN.Models; -using ZeroLevel.NN.Services; - -namespace TestApp -{ - public class PersonDetector - { - private const string MODEL_PATH = @"nnmodels/Yolo5S/yolov5s327e.onnx"; - private readonly Yolov5Detector _detector; - private float _threshold = 0.17f; - public PersonDetector() - { - _detector = new Yolov5Detector(MODEL_PATH, gpu: false); - } - - public IEnumerable Detect(string imagePath) - { - using (Image image = Image.Load(imagePath)) - { - var t_predictions = _detector.PredictMultiply(image, true, _threshold); - t_predictions.Apply(p => - { - p.Cx /= image.Width; - p.Cy /= image.Height; - }); - if (t_predictions != null) - { - t_predictions.RemoveAll(p => (p.W * image.Width) < 10.0f || (p.H * image.Height) < 10.0f); - } - if (t_predictions.Count > 0) - { - NMS.Apply(t_predictions); - return t_predictions; - } - } - return Enumerable.Empty(); - } - } -} diff --git a/TestApp/Program.cs b/TestApp/Program.cs index 4ad4a61..2205568 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -1,38 +1,82 @@ using System; +using System.Collections.Concurrent; using System.Linq; using System.Reflection; +using ZeroLevel.Logging; using ZeroLevel.Services.Invokation; -using ZeroLevel.Services.ObjectMapping; using ZeroLevel.Services.Serialization; namespace TestApp { - internal static class Program + public record LogMessage(LogLevel Level, T Message); + internal interface ILogMessageBuffer + : IDisposable + { + long Count(); + void Push(LogLevel level, T message); + LogMessage Take(); + } + internal sealed class NoLimitedLogMessageBuffer + : ILogMessageBuffer { - private class Wrapper + private readonly BlockingCollection> _messageQueue = + new BlockingCollection>(); + + private bool _isDisposed = false; + + public long Count() { - public string ReadId; - public string WriteId; - public IInvokeWrapper Invoker; + if (_messageQueue.IsCompleted) + return 0; + return _messageQueue.Count; + } - public T Read(IBinaryReader reader) + public void Dispose() + { + if (!_isDisposed) { - return (T)Invoker.Invoke(reader, ReadId); + _isDisposed = true; + _messageQueue.Dispose(); } + } + + public void Push(LogLevel level, T message) + { + if (_isDisposed) return; + _messageQueue.Add(new LogMessage(level, message)); + } + + public LogMessage Take() + { + return _messageQueue.Take(); + } + } + + internal static class Program + { + private class LogQueueWrapper + { + private string TakeMethod; + private string PushMethod; + private object Target; + public IInvokeWrapper Invoker; - public object ReadObject(IBinaryReader reader) + public LogMessage Take() { - return Invoker.Invoke(reader, ReadId); + return (LogMessage)Invoker.Invoke(Target, TakeMethod); } - public void Write(IBinaryWriter writer, T value) + public void Push(LogLevel level, LogMessage value) { - Invoker.Invoke(writer, WriteId, new object[] { value }); + Invoker.Invoke(Target, PushMethod, new object[] { level, value }); } - public void WriteObject(IBinaryWriter writer, object value) + public static LogQueueWrapper Create(object target) { - Invoker.Invoke(writer, WriteId, new object[] { value }); + var wrapper = new LogQueueWrapper { Invoker = InvokeWrapper.Create(), Target = target }; + wrapper.PushMethod = wrapper.Invoker.ConfigureGeneric(typeof(T), "Push").First(); + wrapper.TakeMethod = wrapper.Invoker.ConfigureGeneric(typeof(T), "Take").First(); + return wrapper; } } diff --git a/TestApp/TestApp.csproj b/TestApp/TestApp.csproj index 0d41ae4..e7c793c 100644 --- a/TestApp/TestApp.csproj +++ b/TestApp/TestApp.csproj @@ -8,20 +8,13 @@ - - - - - + Always - - PreserveNewest - diff --git a/ZeroLevel.UnitTests/PartitionStorageTests.cs b/ZeroLevel.UnitTests/PartitionStorageTests.cs new file mode 100644 index 0000000..6e8bdc5 --- /dev/null +++ b/ZeroLevel.UnitTests/PartitionStorageTests.cs @@ -0,0 +1,188 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; +using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.PartitionStorage; +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.UnitTests +{ + public sealed class Metadata + { + public string Date { get; set; } + public string Time { get; set; } + } + + public sealed class TextData + : IBinarySerializable, IAsyncBinarySerializable + { + public string Title { get; set; } + public string Text { get; set; } + + public void Deserialize(IBinaryReader reader) + { + this.Title = reader.ReadString(); + this.Text = reader.ReadString(); + } + + public async Task DeserializeAsync(IAsyncBinaryReader reader) + { + this.Title = await reader.ReadStringAsync(); + this.Text = await reader.ReadStringAsync(); + } + + public void Serialize(IBinaryWriter writer) + { + writer.WriteString(this.Title); + writer.WriteString(this.Text); + } + + public async Task SerializeAsync(IAsyncBinaryWriter writer) + { + await writer.WriteStringAsync(this.Title); + await writer.WriteStringAsync(this.Text); + } + } + + public class FSDBOptions + : IDisposable + { + public StoreOptions Options { get; private set; } + public StoreSerializers Serializers { get; private set; } + + public FSDBOptions() + { + var root = @"H:\temp"; + FSUtils.CleanAndTestFolder(root); + // user id, post + Options = new StoreOptions + { + Index = new IndexOptions + { + Enabled = true, + StepType = IndexStepType.Step, + StepValue = 32, + EnableIndexInMemoryCachee = true + }, + RootFolder = root, + FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), + MergeFunction = list => + { + if (list == null || list.Any() == false) + { + return new TextData[0]; + } + return list.GroupBy(i => i.Title).Select(pair => new TextData { Title = pair.Key, Text = string.Join(null, pair.Select(p => p.Text)) }).ToArray(); + }, + Partitions = new List> + { + new StoreCatalogPartition("Date", m => m.Date), + new StoreCatalogPartition("Time", m => m.Time), + }, + KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, + ThreadSafeWriting = true, + MaxDegreeOfParallelism = 16 + }; + + Serializers = new StoreSerializers( + async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteAsync(n), + async (w, n) => await w.WriteArrayAsync(n), + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadAsync()); } catch { return new DeserializeResult(false, null); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadArrayAsync()); } catch { return new DeserializeResult(false, new TextData[0]); } }); + } + + public void Dispose() + { + } + } + + public class PartitionStorageTests + : IClassFixture + { + private readonly FSDBOptions _options; + public PartitionStorageTests(FSDBOptions options) + { + _options = options; + } + + + + [Fact] + public async Task FastFSDBTest() + { + var r = new Random(Environment.TickCount); + var store = new Store(_options.Options, _options.Serializers); + + // Arrange + var numbers = new ulong[] { 86438 * 128, 83439 * 128, 131238 * 128 }; + + var texts = new TextData[9] + { + new TextData { Title = "Title1", Text = "00" }, new TextData { Title = "Title2", Text = "01" }, new TextData { Title = "Title3", Text = "02" }, + new TextData { Title = "Title1", Text = "10" }, new TextData { Title = "Title2", Text = "11" }, new TextData { Title = "Title3", Text = "12" }, + new TextData { Title = "Title1", Text = "20" }, new TextData { Title = "Title2", Text = "21" }, new TextData { Title = "Title3", Text = "22" } + }; + + var testValues = new Dictionary> + { + { numbers[0], new HashSet { "0010", "01" } }, + { numbers[1], new HashSet { "021222" } }, + { numbers[2], new HashSet { "1121", "20" } } + }; + + Console.WriteLine("Small test start"); + + // Act + using (var storePart = store.CreateBuilder(new Metadata { Date = "20230720", Time = "15:00:00" })) + { + await storePart.Store(numbers[0], texts[0]); // 1 - 00 + await storePart.Store(numbers[0], texts[3]); // 1 - 10 + await storePart.Store(numbers[0], texts[1]); // 2 - 01 + + await storePart.Store(numbers[1], texts[2]); // 3 - 02 + await storePart.Store(numbers[1], texts[5]); // 3 - 12 + await storePart.Store(numbers[1], texts[8]); // 3 - 22 + + await storePart.Store(numbers[2], texts[4]); // 2 - 11 + await storePart.Store(numbers[2], texts[6]); // 1 - 20 + await storePart.Store(numbers[2], texts[7]); // 2 - 21 + + storePart.CompleteAdding(); + await storePart.Compress(); + } + + // Assert + using (var readPart = store.CreateAccessor(new Metadata { Date = "20230720", Time = "15:00:00" })) + { + foreach (var number in numbers) + { + var result = await readPart.Find(number); + if (result.Success) + { + foreach (var td in result.Value) + { + Assert.Contains(td.Text, testValues[number]); + } + } + } + } + } +/* + [Fact] + public void IndexNoIndexFSDBTest() + { + + } + + + [Fact] + public void StressFSDBTest() + { + + }*/ + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs index ac34e98..1b183af 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs @@ -10,6 +10,8 @@ namespace ZeroLevel.Services.PartitionStorage.Interfaces Func InputSerializer { get; } + Func ValueSerializer { get; } + Func>> KeyDeserializer { get; } Func>> InputDeserializer { get; } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index edc2eaf..2792994 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -58,7 +58,12 @@ namespace ZeroLevel.Services.PartitionStorage var files = Directory.GetFiles(_catalog); if (files != null && files.Length > 0) { - await Parallel.ForEachAsync(files, async(file, _) => await CompressFile(file)); + foreach (var file in files) + { + await CompressFile(file); + } + + //await Parallel.ForEachAsync(files, async(file, _) => await CompressFile(file)); } } public async IAsyncEnumerable> Iterate() @@ -187,9 +192,9 @@ namespace ZeroLevel.Services.PartitionStorage foreach (var pair in dict.OrderBy(p => p.Key)) { var v = _options.MergeFunction(pair.Value); - writer.SerializeCompatible(pair.Key); + await Serializer.KeySerializer.Invoke(writer, pair.Key); Thread.MemoryBarrier(); - writer.SerializeCompatible(v); + await Serializer.ValueSerializer.Invoke(writer, v); } } File.Delete(file); diff --git a/ZeroLevel/Services/PartitionStorage/StoreSerializers.cs b/ZeroLevel/Services/PartitionStorage/StoreSerializers.cs index b2eb16e..dda264b 100644 --- a/ZeroLevel/Services/PartitionStorage/StoreSerializers.cs +++ b/ZeroLevel/Services/PartitionStorage/StoreSerializers.cs @@ -14,18 +14,21 @@ namespace ZeroLevel.Services.PartitionStorage { private readonly Func _keySerializer; private readonly Func _inputSerializer; + private readonly Func _valueSerializer; private readonly Func>> _keyDeserializer; private readonly Func>> _inputDeserializer; private readonly Func>> _valueDeserializer; public StoreSerializers(Func keySerializer, Func inputSerializer, + Func valueSerializer, Func>> keyDeserializer, Func>> inputDeserializer, Func>> valueDeserializer) { _keySerializer = keySerializer; _inputSerializer = inputSerializer; + _valueSerializer = valueSerializer; _keyDeserializer = keyDeserializer; _inputDeserializer = inputDeserializer; _valueDeserializer = valueDeserializer; @@ -35,6 +38,8 @@ namespace ZeroLevel.Services.PartitionStorage public Func InputSerializer => _inputSerializer; + public Func ValueSerializer => _valueSerializer; + public Func>> KeyDeserializer => _keyDeserializer; public Func>> InputDeserializer => _inputDeserializer; diff --git a/ZeroLevel/Services/Serialization/IBinaryReader.cs b/ZeroLevel/Services/Serialization/IBinaryReader.cs index a45a048..68f7e44 100644 --- a/ZeroLevel/Services/Serialization/IBinaryReader.cs +++ b/ZeroLevel/Services/Serialization/IBinaryReader.cs @@ -136,7 +136,7 @@ namespace ZeroLevel.Services.Serialization void SetPosition(long position); } - public interface IBinaryReaderAsync + public interface IAsyncBinaryReader : IDisposable { Task ReadBooleanAsync(); diff --git a/ZeroLevel/Services/Serialization/IBinarySerializable.cs b/ZeroLevel/Services/Serialization/IBinarySerializable.cs index 7e3a08d..dcb7d60 100644 --- a/ZeroLevel/Services/Serialization/IBinarySerializable.cs +++ b/ZeroLevel/Services/Serialization/IBinarySerializable.cs @@ -11,8 +11,8 @@ namespace ZeroLevel.Services.Serialization public interface IAsyncBinarySerializable { - Task SerializeAsync(IBinaryWriter writer); + Task SerializeAsync(IAsyncBinaryWriter writer); - Task DeserializeAsync(IBinaryReader reader); + Task DeserializeAsync(IAsyncBinaryReader reader); } } \ No newline at end of file diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs index d4dd99b..cf2b5d4 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs @@ -589,7 +589,7 @@ namespace ZeroLevel.Services.Serialization } public partial class MemoryStreamReader - : IBinaryReaderAsync + : IAsyncBinaryReader { /// /// Reading byte-package (read the size of the specified number of bytes, and then the packet itself read size) diff --git a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs index 46006a5..0c47c40 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs @@ -337,14 +337,18 @@ namespace ZeroLevel.Services.Serialization public void WriteArray(T[] array) where T : IBinarySerializable { - WriteInt32(array?.Length ?? 0); if (array != null) { + WriteInt32(array.Length); for (int i = 0; i < array.Length; i++) { array[i].Serialize(this); } } + else + { + WriteInt32(0); + } } public void WriteArray(T[] array, Action writeAction) diff --git a/ZeroLevel/Services/Serialization/MessageSerializer.cs b/ZeroLevel/Services/Serialization/MessageSerializer.cs index b8637af..0db4b79 100644 --- a/ZeroLevel/Services/Serialization/MessageSerializer.cs +++ b/ZeroLevel/Services/Serialization/MessageSerializer.cs @@ -239,7 +239,7 @@ namespace ZeroLevel.Services.Serialization } return PrimitiveTypeSerializer.Deserialize(reader); } - public static async Task DeserializeCompatibleAsync(IBinaryReader reader) + public static async Task DeserializeCompatibleAsync(IAsyncBinaryReader reader) { if (typeof(IAsyncBinarySerializable).IsAssignableFrom(typeof(T))) { @@ -247,7 +247,7 @@ namespace ZeroLevel.Services.Serialization await direct.DeserializeAsync(reader); return (T)direct; } - return PrimitiveTypeSerializer.Deserialize(reader); + return PrimitiveTypeSerializer.Deserialize(reader as IBinaryReader); } public static object DeserializeCompatible(Type type, byte[] data)