diff --git a/PartitionFileStorageTest/PartitionFileStorageTest.csproj b/PartitionFileStorageTest/PartitionFileStorageTest.csproj index 6fb9333..b95bc06 100644 --- a/PartitionFileStorageTest/PartitionFileStorageTest.csproj +++ b/PartitionFileStorageTest/PartitionFileStorageTest.csproj @@ -8,7 +8,7 @@ - + diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index ecc89d9..ae51b5e 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -31,50 +31,67 @@ namespace PartitionFileStorageTest return num_base + (uint)r.Next(999999); } - private static void FastTest(StoreOptions options) + private static async Task FastTest(StoreOptions options) { var r = new Random(Environment.TickCount); - var store = new Store(options); - var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); + var store = new Store(options, new StoreSerializers( + async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteULongAsync(n), + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } })); - Console.WriteLine("Small test start"); var c1 = (ulong)(86438 * 128); var c2 = (ulong)(83438 * 128); var c3 = (ulong)(831238 * 128); - storePart.Store(c1, Generate(r)); - storePart.Store(c1, Generate(r)); - storePart.Store(c1, Generate(r)); - storePart.Store(c2, Generate(r)); - storePart.Store(c2, Generate(r)); - storePart.Store(c2, Generate(r)); - storePart.Store(c2, Generate(r)); - storePart.Store(c2, Generate(r)); - storePart.Store(c3, Generate(r)); - storePart.Store(c3, Generate(r)); - storePart.Store(c3, Generate(r)); - storePart.CompleteAdding(); - storePart.Compress(); - var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); - Console.WriteLine("Data:"); - foreach (var e in readPart.Iterate()) - { - Console.WriteLine($"{e.Key}: {e.Value.Length}"); + using (var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) })) + { + Console.WriteLine("Small test start"); + await storePart.Store(c1, Generate(r)); + await storePart.Store(c1, Generate(r)); + await storePart.Store(c1, Generate(r)); + await storePart.Store(c2, Generate(r)); + await storePart.Store(c2, Generate(r)); + await storePart.Store(c2, Generate(r)); + await storePart.Store(c2, Generate(r)); + await storePart.Store(c2, Generate(r)); + await storePart.Store(c3, Generate(r)); + await storePart.Store(c3, Generate(r)); + await storePart.Store(c3, Generate(r)); + storePart.CompleteAdding(); + await storePart.Compress(); } - readPart.RemoveKey(c1); - Console.WriteLine("Data after remove:"); - foreach (var e in readPart.Iterate()) + + using (var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) })) { - Console.WriteLine($"{e.Key}: {e.Value.Length}"); + Console.WriteLine("Data:"); + await foreach (var kv in readPart.Iterate()) + { + Console.WriteLine($"{kv.Key}: {kv.Value.Length}"); + } + + await readPart.RemoveKey(c1); + Console.WriteLine("Data after remove:"); + + await foreach (var kv in readPart.Iterate()) + { + Console.WriteLine($"{kv.Key}: {kv.Value.Length}"); + } } } - private static void FullStoreTest(StoreOptions options, + private static async Task FullStoreTest(StoreOptions options, List<(ulong, ulong)> pairs) { var meta = new Metadata { Date = new DateTime(2022, 11, 08) }; var r = new Random(Environment.TickCount); - var store = new Store(options); + var store = new Store(options, new StoreSerializers( + async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteULongAsync(n), + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } })); var storePart = store.CreateBuilder(meta); var sw = new Stopwatch(); sw.Start(); @@ -90,7 +107,7 @@ namespace PartitionFileStorageTest var val = pairs[i].Item2; if (testData.ContainsKey(key) == false) testData[key] = new HashSet(); testData[key].Add(val); - storePart.Store(key, val); + await storePart.Store(key, val); if (key % 717 == 0) { testKeys1.Add(key); @@ -105,7 +122,7 @@ namespace PartitionFileStorageTest Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms. Records writed: {storePart.TotalRecords}"); sw.Restart(); storePart.CompleteAdding(); - storePart.Compress(); + await storePart.Compress(); sw.Stop(); Log.Info($"Compress: {sw.ElapsedMilliseconds}ms"); sw.Restart(); @@ -135,7 +152,7 @@ namespace PartitionFileStorageTest ulong totalKeys = 0; foreach (var key in testKeys1) { - var result = readPart.Find(key); + var result = readPart.Find(key).Result; totalData += (ulong)(result.Value?.Length ?? 0); totalKeys++; } @@ -145,7 +162,7 @@ namespace PartitionFileStorageTest Log.Info("Test #1 remove by keys"); for (int i = 0; i < testKeys1.Count; i++) { - readPart.RemoveKey(testKeys1[i], false); + await readPart.RemoveKey(testKeys1[i], false); } sw.Restart(); readPart.RebuildIndex(); @@ -155,7 +172,7 @@ namespace PartitionFileStorageTest foreach (var key in testKeys1) { var result = readPart.Find(key); - totalData += (ulong)(result.Value?.Length ?? 0); + totalData += (ulong)(result.Result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); @@ -165,19 +182,19 @@ namespace PartitionFileStorageTest foreach (var key in testKeys2) { var result = readPart.Find(key); - totalData += (ulong)(result.Value?.Length ?? 0); + totalData += (ulong)(result.Result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test #2 remove keys batch"); - readPart.RemoveKeys(testKeys2); + await readPart.RemoveKeys(testKeys2); Log.Info("Test #2 reading after remove"); foreach (var key in testKeys2) { var result = readPart.Find(key); - totalData += (ulong)(result.Value?.Length ?? 0); + totalData += (ulong)(result.Result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); @@ -185,11 +202,12 @@ namespace PartitionFileStorageTest totalKeys = 0; Log.Info("Iterator test"); - foreach (var e in readPart.Iterate()) + await foreach (var kv in readPart.Iterate()) { - totalData += (ulong)(e.Value?.Length ?? 0); + totalData += (ulong)(kv.Value?.Length ?? 0); totalKeys++; } + Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; @@ -199,7 +217,7 @@ namespace PartitionFileStorageTest { if (test.Value.Count > 0 && testKeys1.Contains(test.Key) == false && testKeys2.Contains(test.Key) == false) { - var result = Compressor.DecodeBytesContent(readPart.Find(test.Key).Value).ToHashSet(); + var result = Compressor.DecodeBytesContent(readPart.Find(test.Key).Result.Value).ToHashSet(); if (test.Value.Count != result.Count) { Log.Info($"Key '{test.Key}' not found!"); @@ -227,12 +245,17 @@ namespace PartitionFileStorageTest } } - private static void FullStoreMultithreadTest(StoreOptions options) + private static async Task FullStoreMultithreadTest(StoreOptions options) { var meta = new Metadata { Date = new DateTime(2022, 11, 08) }; var r = new Random(Environment.TickCount); - var store = new Store(options); - var storePart = store.CreateBuilder(meta); + var store = new Store(options, new StoreSerializers( + async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteULongAsync(n), + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } })); + var sw = new Stopwatch(); sw.Start(); var insertCount = (long)(0.7 * PAIRS_COUNT); @@ -241,58 +264,60 @@ namespace PartitionFileStorageTest var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 24 }; var Keys = new ConcurrentHashSet(); - Parallel.ForEach(MassGenerator((long)(0.7 * PAIRS_COUNT)), parallelOptions, pair => + using (var storePart = store.CreateBuilder(meta)) { - var key = pair.Item1; - var val = pair.Item2; - storePart.Store(key, val); - if (key % 717 == 0) + Parallel.ForEach(MassGenerator((long)(0.7 * PAIRS_COUNT)), parallelOptions, pair => { - testKeys1.Add(key); - } - if (key % 117 == 0) + var key = pair.Item1; + var val = pair.Item2; + storePart.Store(key, val); + if (key % 717 == 0) + { + testKeys1.Add(key); + } + if (key % 117 == 0) + { + testKeys2.Add(key); + } + Keys.Add(key); + }); + if (storePart.TotalRecords != insertCount) { - testKeys2.Add(key); + Log.Error($"Count of stored record no equals expected. Recorded: {storePart.TotalRecords}. Expected: {insertCount}. Unique keys: {Keys.Count}"); } - Keys.Add(key); - }); - - if (storePart.TotalRecords != insertCount) - { - Log.Error($"Count of stored record no equals expected. Recorded: {storePart.TotalRecords}. Expected: {insertCount}. Unique keys: {Keys.Count}"); + sw.Stop(); + Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms"); + sw.Restart(); + storePart.CompleteAdding(); + await storePart.Compress(); + sw.Stop(); + Log.Info($"Compress: {sw.ElapsedMilliseconds}ms"); + sw.Restart(); + storePart.RebuildIndex(); } - - sw.Stop(); - Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms"); - sw.Restart(); - storePart.CompleteAdding(); - storePart.Compress(); - sw.Stop(); - Log.Info($"Compress: {sw.ElapsedMilliseconds}ms"); - sw.Restart(); - storePart.RebuildIndex(); sw.Stop(); Log.Info($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); Log.Info("Start merge test"); sw.Restart(); - var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data)); - - Parallel.ForEach(MassGenerator((long)(0.3 * PAIRS_COUNT)), parallelOptions, pair => + using (var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data))) { - var key = pair.Item1; - var val = pair.Item2; - merger.Store(key, val); - Keys.Add(key); - }); + Parallel.ForEach(MassGenerator((long)(0.3 * PAIRS_COUNT)), parallelOptions, pair => + { + var key = pair.Item1; + var val = pair.Item2; + merger.Store(key, val); + Keys.Add(key); + }); - if (merger.TotalRecords != ((long)(0.3 * PAIRS_COUNT))) - { - Log.Error($"Count of stored record no equals expected. Recorded: {merger.TotalRecords}. Expected: {((long)(0.3 * PAIRS_COUNT))}"); - } + if (merger.TotalRecords != ((long)(0.3 * PAIRS_COUNT))) + { + Log.Error($"Count of stored record no equals expected. Recorded: {merger.TotalRecords}. Expected: {((long)(0.3 * PAIRS_COUNT))}"); + } - Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {PAIRS_COUNT}. Unique keys: {Keys.Count}"); - merger.Compress(); // auto reindex + Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {PAIRS_COUNT}. Unique keys: {Keys.Count}"); + merger.Compress(); // auto reindex + } sw.Stop(); Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); @@ -300,10 +325,10 @@ namespace PartitionFileStorageTest ulong totalKeys = 0; var s = new HashSet(); - store.Bypass(meta, (k, v) => + await foreach (var kv in store.Bypass(meta)) { - s.Add(k); - }); + s.Add(kv.Key); + } Log.Info($"Keys count: {s.Count}"); using (var readPart = store.CreateAccessor(meta)) @@ -343,14 +368,14 @@ namespace PartitionFileStorageTest foreach (var key in testKeys2) { var result = readPart.Find(key); - totalData += (ulong)(result.Value?.Length ?? 0); + totalData += (ulong)(result.Result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys}/{Keys.Count} keys. {totalData} bytes"); totalData = 0; totalKeys = 0; Log.Info("Test #2 remove keys batch"); - readPart.RemoveKeys(testKeys2); + await readPart.RemoveKeys(testKeys2); foreach (var k in testKeys2) { Keys.TryRemove(k); @@ -359,7 +384,7 @@ namespace PartitionFileStorageTest foreach (var key in testKeys2) { var result = readPart.Find(key); - totalData += (ulong)(result.Value?.Length ?? 0); + totalData += (ulong)(result.Result.Value?.Length ?? 0); totalKeys++; } Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); @@ -367,9 +392,9 @@ namespace PartitionFileStorageTest totalKeys = 0; Log.Info("Iterator test"); - foreach (var e in readPart.Iterate()) + await foreach (var kv in readPart.Iterate()) { - totalData += (ulong)(e.Value?.Length ?? 0); + totalData += (ulong)(kv.Value?.Length ?? 0); totalKeys++; } } @@ -380,7 +405,12 @@ namespace PartitionFileStorageTest private static void FaultIndexTest(string filePath) { - var serializer = new StoreStandartSerializer(); + var serializer = new StoreSerializers( + async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteULongAsync(n), + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } }); // 1 build index var index = new Dictionary(); using (var reader = new MemoryStreamReader(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024))) @@ -390,11 +420,13 @@ namespace PartitionFileStorageTest { counter--; var pos = reader.Position; - serializer.KeyDeserializer.Invoke(reader, out var k); - serializer.ValueDeserializer.Invoke(reader, out var _); + + var kv = serializer.KeyDeserializer.Invoke(reader).Result; + var vv = serializer.ValueDeserializer.Invoke(reader).Result; + if (counter == 0) { - index[k] = pos; + index[kv.Value] = pos; counter = 16; } } @@ -407,12 +439,12 @@ namespace PartitionFileStorageTest var accessor = fileReader.GetAccessor(pair.Value); using (var reader = new MemoryStreamReader(accessor)) { - serializer.KeyDeserializer.Invoke(reader, out var k); - if (k != pair.Key) + var kv = serializer.KeyDeserializer.Invoke(reader).Result; + if (kv.Value != pair.Key) { Log.Warning("Broken index"); } - serializer.ValueDeserializer.Invoke(reader, out var _); + serializer.ValueDeserializer.Invoke(reader).Wait(); } } @@ -436,7 +468,12 @@ namespace PartitionFileStorageTest private static void FaultUncompressedReadTest(string filePath) { - var serializer = new StoreStandartSerializer(); + var serializer = new StoreSerializers( + async (w, n) => await w.WriteULongAsync(n), + async (w, n) => await w.WriteULongAsync(n), + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } }); // 1 build index var dict = new Dictionary>(); using (var reader = new MemoryStreamReader(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024))) @@ -445,17 +482,17 @@ namespace PartitionFileStorageTest { try { - serializer.KeyDeserializer.Invoke(reader, out var key); - if (false == dict.ContainsKey(key)) + var key = serializer.KeyDeserializer.Invoke(reader).Result; + if (false == dict.ContainsKey(key.Value)) { - dict[key] = new HashSet(); + dict[key.Value] = new HashSet(); } if (reader.EOS) { break; } - serializer.InputDeserializer.Invoke(reader, out var input); - dict[key].Add(input); + var input = serializer.InputDeserializer.Invoke(reader).Result; + dict[key.Value].Add(input.Value); } catch (Exception ex) { @@ -490,12 +527,17 @@ namespace PartitionFileStorageTest KeyComparer = (left, right) => string.Compare(left, right, true), ThreadSafeWriting = true }; - var store = new Store(options); + var store = new Store(options, new StoreSerializers( + async (w, n) => await w.WriteStringAsync(n), + async (w, n) => await w.WriteULongAsync(n), + async (r) => { try { return new DeserializeResult(true, await r.ReadStringAsync()); } catch { return new DeserializeResult(false, string.Empty); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadULongAsync()); } catch { return new DeserializeResult(false, 0); } }, + async (r) => { try { return new DeserializeResult(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult(false, new byte[0]); } })); var builder = store.CreateBuilder(meta); builder.Compress(); } - static void Main(string[] args) + static async Task Main(string[] args) { //FaultCompressionTest(@"F:\Desktop\DATKA\DNS", new StoreMetadata { Date = new DateTime(2023, 01, 20) }); @@ -558,22 +600,23 @@ namespace PartitionFileStorageTest } */ Log.Info("Start test"); - // FastTest(options); + FSUtils.CleanAndTestFolder(root); - FullStoreMultithreadTest(optionsMultiThread); + await FastTest(options); + FSUtils.CleanAndTestFolder(root); + await FullStoreMultithreadTest(optionsMultiThread); - /* - FSUtils.CleanAndTestFolder(root); - FullStoreTest(options, pairs); - */ - //TestParallelFileReadingMMF(); - /* - + + /*FSUtils.CleanAndTestFolder(root); + FullStoreTest(options, pairs);*/ + FSUtils.CleanAndTestFolder(root); - - */ + //TestParallelFileReadingMMF(); + + + Console.WriteLine("Completed"); Console.ReadKey(); } diff --git a/TestApp/Program.cs b/TestApp/Program.cs index 243d099..4ad4a61 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -1,21 +1,62 @@ using System; +using System.Linq; +using System.Reflection; +using ZeroLevel.Services.Invokation; +using ZeroLevel.Services.ObjectMapping; +using ZeroLevel.Services.Serialization; namespace TestApp { internal static class Program { - private static void Main(string[] args) + private class Wrapper { - AppDomain.CurrentDomain.AssemblyResolve += CurrentDomain_AssemblyResolve; - } + public string ReadId; + public string WriteId; + public IInvokeWrapper Invoker; - private static System.Reflection.Assembly CurrentDomain_AssemblyResolve(object sender, ResolveEventArgs e) - { - if (e.Name.StartsWith("Microsoft.Build.")) + public T Read(IBinaryReader reader) + { + return (T)Invoker.Invoke(reader, ReadId); + } + + public object ReadObject(IBinaryReader reader) + { + return Invoker.Invoke(reader, ReadId); + } + + public void Write(IBinaryWriter writer, T value) + { + Invoker.Invoke(writer, WriteId, new object[] { value }); + } + + public void WriteObject(IBinaryWriter writer, object value) { - // искать в локальной папке + Invoker.Invoke(writer, WriteId, new object[] { value }); } - return null; + } + + private static Func CreateArrayPredicate() + { + var typeArg = typeof(T).GetElementType(); + return mi => mi.Name.Equals("WriteArray", StringComparison.Ordinal) && + mi.GetParameters().First().ParameterType.GetElementType().IsAssignableFrom(typeArg); + } + + private static Func CreateCollectionPredicate() + { + var typeArg = typeof(T).GetGenericArguments().First(); + return mi => mi.Name.Equals("WriteCollection", StringComparison.Ordinal) && + mi.GetParameters().First().ParameterType.GetGenericArguments().First().IsAssignableFrom(typeArg); + } + + private static void Main(string[] args) + { + var wrapper = new Wrapper { Invoker = InvokeWrapper.Create() }; + var ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDateTimeArray").First(); + var WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreateArrayPredicate()).First(); + + Console.Write(WriteId); } } } \ No newline at end of file diff --git a/TestApp/TestApp.csproj b/TestApp/TestApp.csproj index 45fe2fd..0d41ae4 100644 --- a/TestApp/TestApp.csproj +++ b/TestApp/TestApp.csproj @@ -7,7 +7,7 @@ - + diff --git a/ZeroLevel.Discovery/ZeroLevel.Discovery.csproj b/ZeroLevel.Discovery/ZeroLevel.Discovery.csproj index d23cfc5..d0cfea3 100644 --- a/ZeroLevel.Discovery/ZeroLevel.Discovery.csproj +++ b/ZeroLevel.Discovery/ZeroLevel.Discovery.csproj @@ -7,7 +7,7 @@ - + diff --git a/ZeroLevel.HNSW/ZeroLevel.HNSW.csproj b/ZeroLevel.HNSW/ZeroLevel.HNSW.csproj index 34df9da..63d56ea 100644 --- a/ZeroLevel.HNSW/ZeroLevel.HNSW.csproj +++ b/ZeroLevel.HNSW/ZeroLevel.HNSW.csproj @@ -44,4 +44,8 @@ + + + + diff --git a/ZeroLevel.Qdrant/ZeroLevel.Qdrant.csproj b/ZeroLevel.Qdrant/ZeroLevel.Qdrant.csproj index ec569e0..a0521fd 100644 --- a/ZeroLevel.Qdrant/ZeroLevel.Qdrant.csproj +++ b/ZeroLevel.Qdrant/ZeroLevel.Qdrant.csproj @@ -18,7 +18,7 @@ - + diff --git a/ZeroLevel.UnitTests/SerializationTests.cs b/ZeroLevel.UnitTests/SerializationTests.cs index 4f2b1fa..8f15365 100644 --- a/ZeroLevel.UnitTests/SerializationTests.cs +++ b/ZeroLevel.UnitTests/SerializationTests.cs @@ -337,9 +337,9 @@ namespace ZeroLevel.Serialization [Fact] public void SerializeCollectionDateTime() { - MakeCollectionTest(null); - MakeCollectionTest(new DateTime[] { }); - MakeCollectionTest(new DateTime[] { DateTime.Now, DateTime.UtcNow, DateTime.Today, DateTime.Now.AddYears(2000), DateTime.MinValue, DateTime.MaxValue }); + MakeCollectionTest(null); + MakeCollectionTest(new DateTime?[] { }); + MakeCollectionTest(new DateTime?[] { DateTime.Now, DateTime.UtcNow, DateTime.Today, DateTime.Now.AddYears(2000), null, DateTime.MinValue, DateTime.MaxValue }); } [Fact] diff --git a/ZeroLevel.UnitTests/ZeroLevel.UnitTests.csproj b/ZeroLevel.UnitTests/ZeroLevel.UnitTests.csproj index ca24844..990ceec 100644 --- a/ZeroLevel.UnitTests/ZeroLevel.UnitTests.csproj +++ b/ZeroLevel.UnitTests/ZeroLevel.UnitTests.csproj @@ -9,9 +9,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/ZeroLevel/Services/Memory/IViewAccessor.cs b/ZeroLevel/Services/Memory/IViewAccessor.cs index 9223405..d8d53f4 100644 --- a/ZeroLevel/Services/Memory/IViewAccessor.cs +++ b/ZeroLevel/Services/Memory/IViewAccessor.cs @@ -1,16 +1,18 @@ using System; +using System.Threading.Tasks; namespace ZeroLevel.Services.Memory { public interface IViewAccessor : IDisposable { + bool IsMemoryStream { get; } /// /// End of view /// bool EOV { get; } long Position { get; } - byte[] ReadBuffer(int count); + Task ReadBuffer(int count); bool CheckOutOfRange(int offset); void Seek(long offset); } diff --git a/ZeroLevel/Services/Memory/MMFViewAccessor.cs b/ZeroLevel/Services/Memory/MMFViewAccessor.cs index 2d86277..bd17fe9 100644 --- a/ZeroLevel/Services/Memory/MMFViewAccessor.cs +++ b/ZeroLevel/Services/Memory/MMFViewAccessor.cs @@ -1,6 +1,8 @@ using System; using System.IO; using System.IO.MemoryMappedFiles; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; namespace ZeroLevel.Services.Memory { @@ -20,29 +22,27 @@ namespace ZeroLevel.Services.Memory public long Position => _absoluteOffset + _accessor.Position; - public bool CheckOutOfRange(int offset) - { - return offset < 0 || (_accessor.Position + offset) > _accessor.Length; - } + public bool IsMemoryStream => false; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool CheckOutOfRange(int offset) => offset < 0 || (_accessor.Position + offset) > _accessor.Length; public void Dispose() { _accessor?.Dispose(); } - public byte[] ReadBuffer(int count) + public async Task ReadBuffer(int count) { if (count == 0) return null; var buffer = new byte[count]; - var readedCount = _accessor.Read(buffer, 0, count); + var readedCount = await _accessor.ReadAsync(buffer, 0, count); if (count != readedCount) throw new InvalidOperationException($"The stream returned less data ({count} bytes) than expected ({readedCount} bytes)"); return buffer; } - public void Seek(long offset) - { - _accessor.Seek(offset, SeekOrigin.Begin); - } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Seek(long offset) => _accessor.Seek(offset, SeekOrigin.Begin); } } diff --git a/ZeroLevel/Services/Memory/StreamVewAccessor.cs b/ZeroLevel/Services/Memory/StreamVewAccessor.cs index 3558b67..ade51cf 100644 --- a/ZeroLevel/Services/Memory/StreamVewAccessor.cs +++ b/ZeroLevel/Services/Memory/StreamVewAccessor.cs @@ -1,5 +1,7 @@ using System; using System.IO; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; namespace ZeroLevel.Services.Memory { @@ -16,16 +18,16 @@ namespace ZeroLevel.Services.Memory public long Position => _stream.Position; - public bool CheckOutOfRange(int offset) - { - return offset < 0 || (_stream.Position + offset) > _stream.Length; - } + public bool IsMemoryStream => _stream is MemoryStream; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool CheckOutOfRange(int offset) => offset < 0 || (_stream.Position + offset) > _stream.Length; - public byte[] ReadBuffer(int count) + public async Task ReadBuffer(int count) { if (count == 0) return null; var buffer = new byte[count]; - var readedCount = _stream.Read(buffer, 0, count); + var readedCount = await _stream.ReadAsync(buffer, 0, count); if (count != readedCount) throw new InvalidOperationException($"The stream returned less data ({count} bytes) than expected ({readedCount} bytes)"); return buffer; @@ -36,9 +38,7 @@ namespace ZeroLevel.Services.Memory _stream.Dispose(); } - public void Seek(long offset) - { - _stream.Seek(offset, SeekOrigin.Begin); - } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Seek(long offset) => _stream.Seek(offset, SeekOrigin.Begin); } } diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs deleted file mode 100644 index 595f82a..0000000 --- a/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace ZeroLevel.Services.PartitionStorage -{ - /*TODO IN FUTURE*/ - internal struct ValueIndex - { - public TValue Value { get; set; } - public long Offset { get; set; } - } -} diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs index 6121dc4..1fcbaec 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using ZeroLevel.Services.PartitionStorage.Interfaces; namespace ZeroLevel.Services.PartitionStorage @@ -28,15 +29,15 @@ namespace ZeroLevel.Services.PartitionStorage /// /// Performs a search for data in the repository /// - StoreSearchResult Search(StoreSearchRequest searchRequest); + Task> Search(StoreSearchRequest searchRequest); /// /// bypass all key value by meta /// - void Bypass(TMeta meta, Action handler); + IAsyncEnumerable> Bypass(TMeta meta); /// /// true - if key exists /// - bool Exists(TMeta meta, TKey key); + Task Exists(TMeta meta, TKey key); /// /// Deleting a partition /// diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs index 76cc7d1..d686f3b 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs @@ -1,4 +1,6 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; namespace ZeroLevel.Services.PartitionStorage { @@ -18,42 +20,42 @@ namespace ZeroLevel.Services.PartitionStorage /// /// Search in a partition for a specified key /// - StorePartitionKeyValueSearchResult Find(TKey key); + Task> Find(TKey key); /// /// Search in a partition for a specified keys /// - IEnumerable> Find(IEnumerable keys); + Task Find(IEnumerable keys, Action searchResultHandler); /// /// Iterating over all recorded data /// - IEnumerable> Iterate(); + IAsyncEnumerable> Iterate(); /// /// Iterating over all recorded data of the file with the specified key /// - IEnumerable> IterateKeyBacket(TKey key); + Task IterateKeyBacket(TKey key, Action kvHandler); /// /// Deleting the specified key and associated data /// /// Key /// true - automatically rebuild the index of the file from which data was deleted (default = false) - void RemoveKey(TKey key, bool autoReindex = false); + Task RemoveKey(TKey key, bool autoReindex = false); /// /// Deleting the specified keys and associated data /// /// Keys /// true - automatically rebuild the index of the file from which data was deleted (default = true) - void RemoveKeys(IEnumerable keys, bool autoReindex = true); + Task RemoveKeys(IEnumerable keys, bool autoReindex = true); /// /// Delete all keys with data except the specified key /// /// Key /// true - automatically rebuild the index of the file from which data was deleted (default = true) - void RemoveAllExceptKey(TKey key, bool autoReindex = true); + Task RemoveAllExceptKey(TKey key, bool autoReindex = true); /// /// Delete all keys with data other than the specified ones /// /// Keys /// true - automatically rebuild the index of the file from which data was deleted (default = true) - void RemoveAllExceptKeys(IEnumerable keys, bool autoReindex = true); + Task RemoveAllExceptKeys(IEnumerable keys, bool autoReindex = true); } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs index 2a86373..cf0b162 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading.Tasks; namespace ZeroLevel.Services.PartitionStorage { @@ -15,11 +16,11 @@ namespace ZeroLevel.Services.PartitionStorage { get; } - IEnumerable> Iterate(); + IAsyncEnumerable> Iterate(); /// /// Writing a key-value pair /// - void Store(TKey key, TInput value); + Task Store(TKey key, TInput value); /// /// Called after all key-value pairs are written to the partition /// @@ -27,7 +28,7 @@ namespace ZeroLevel.Services.PartitionStorage /// /// Performs compression/grouping of recorded data in a partition /// - void Compress(); + Task Compress(); /// /// Rebuilds indexes for data in a partition /// diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionMergeBuilder.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionMergeBuilder.cs index 95c72a0..9fe0e02 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionMergeBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionMergeBuilder.cs @@ -1,4 +1,6 @@ -namespace ZeroLevel.Services.PartitionStorage.Interfaces +using System.Threading.Tasks; + +namespace ZeroLevel.Services.PartitionStorage.Interfaces { /// /// Provides write operations in catalog partition @@ -16,10 +18,10 @@ /// /// Writing a key-value pair /// - void Store(TKey key, TInput value); + Task Store(TKey key, TInput value); /// /// Perform the conversion of the records from (TKey; TInput) to (TKey; TValue). Called after CompleteAdding /// - void Compress(); + Task Compress(); } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs index 6132807..ac34e98 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs @@ -1,14 +1,19 @@ using System; +using System.Threading.Tasks; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage.Interfaces { public interface IStoreSerializer { - Action KeySerializer { get; } - Action InputSerializer { get; } - TryDeserializeMethod KeyDeserializer { get; } - TryDeserializeMethod InputDeserializer { get; } - TryDeserializeMethod ValueDeserializer { get; } + Func KeySerializer { get; } + + Func InputSerializer { get; } + + Func>> KeyDeserializer { get; } + + Func>> InputDeserializer { get; } + + Func>> ValueDeserializer { get; } } } diff --git a/ZeroLevel/Services/PartitionStorage/KV.cs b/ZeroLevel/Services/PartitionStorage/KV.cs new file mode 100644 index 0000000..df51584 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/KV.cs @@ -0,0 +1,4 @@ +namespace ZeroLevel.Services.PartitionStorage +{ + public record KV(TKey Key, TValue Value); +} diff --git a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs index 08852cb..cbe32ef 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs @@ -13,7 +13,7 @@ namespace ZeroLevel.Services.PartitionStorage.Partition internal sealed class CompactKeyStorePartitionBuilder : BasePartition, IStorePartitionBuilder { - private readonly Action _storeMethod; + private readonly Func _storeMethod; private long _totalRecords = 0; @@ -39,9 +39,9 @@ namespace ZeroLevel.Services.PartitionStorage.Partition #region IStorePartitionBuilder - public void Store(TKey key, TInput value) + public async Task Store(TKey key, TInput value) { - _storeMethod.Invoke(key, value); + await _storeMethod.Invoke(key, value); Interlocked.Increment(ref _totalRecords); } @@ -50,18 +50,16 @@ namespace ZeroLevel.Services.PartitionStorage.Partition CloseWriteStreams(); } - public void Compress() + public async Task Compress() { var files = Directory.GetFiles(_catalog); if (files != null && files.Length > 0) { - Parallel.ForEach(files, file => CompressFile(file)); + await Parallel.ForEachAsync(files, async (file, ct) => await CompressFile(file)); } } - public IEnumerable> Iterate() + public async IAsyncEnumerable> Iterate() { - TKey key; - TInput input; var files = Directory.GetFiles(_catalog); if (files != null && files.Length > 0) { @@ -73,9 +71,13 @@ namespace ZeroLevel.Services.PartitionStorage.Partition { while (reader.EOS == false) { - if (Serializer.KeyDeserializer.Invoke(reader, out key) == false) break; - if (Serializer.InputDeserializer.Invoke(reader, out input) == false) break; - yield return new StorePartitionKeyValueSearchResult { Key = key, Value = input, Status = SearchResult.Success }; + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) break; + + var iv = await Serializer.InputDeserializer.Invoke(reader); + if (iv.Success == false) break; + + yield return new SearchResult { Key = kv.Value, Value = iv.Value, Success = true }; } } } @@ -86,17 +88,16 @@ namespace ZeroLevel.Services.PartitionStorage.Partition #endregion #region Private methods - private void StoreDirect(TKey key, TInput value) + private async Task StoreDirect(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); if (TryGetWriteStream(groupKey, out var stream)) { - Serializer.KeySerializer.Invoke(stream, key); - Thread.MemoryBarrier(); - Serializer.InputSerializer.Invoke(stream, value); + await Serializer.KeySerializer.Invoke(stream, key); + await Serializer.InputSerializer.Invoke(stream, value); } } - private void StoreDirectSafe(TKey key, TInput value) + private async Task StoreDirectSafe(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); bool lockTaken = false; @@ -105,9 +106,8 @@ namespace ZeroLevel.Services.PartitionStorage.Partition Monitor.Enter(stream, ref lockTaken); try { - Serializer.KeySerializer.Invoke(stream, key); - Thread.MemoryBarrier(); - Serializer.InputSerializer.Invoke(stream, value); + await Serializer.KeySerializer.Invoke(stream, key); + await Serializer.InputSerializer.Invoke(stream, value); } finally { @@ -119,10 +119,8 @@ namespace ZeroLevel.Services.PartitionStorage.Partition } } - internal void CompressFile(string file) + internal async Task CompressFile(string file) { - TKey key; - TInput input; var dict = new Dictionary>(); PhisicalFileAccessorCachee.LockFile(file); try @@ -131,23 +129,25 @@ namespace ZeroLevel.Services.PartitionStorage.Partition { while (reader.EOS == false) { - if (false == Serializer.KeyDeserializer.Invoke(reader, out key)) + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) { throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key."); } - if (false == dict.ContainsKey(key)) + if (false == dict.ContainsKey(kv.Value)) { - dict[key] = new HashSet(); + dict[kv.Value] = new HashSet(); } if (reader.EOS) { break; } - if (false == Serializer.InputDeserializer.Invoke(reader, out input)) + var iv = await Serializer.InputDeserializer.Invoke(reader); + if (iv.Success == false) { throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault input value."); } - dict[key].Add(input); + dict[kv.Value].Add(iv.Value); } } var tempFile = FSUtils.GetAppLocalTemporaryFile(); diff --git a/ZeroLevel/Services/PartitionStorage/Partition/InternalDirectHybridPartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/InternalDirectHybridPartition.cs deleted file mode 100644 index 57f4532..0000000 --- a/ZeroLevel/Services/PartitionStorage/Partition/InternalDirectHybridPartition.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace ZeroLevel.Services.PartitionStorage.Partition -{ - internal class InternalDirectHybridPartition - { - - } -} diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs index 42651ce..356869f 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Threading.Tasks; using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.PartitionStorage.Partition; using ZeroLevel.Services.Serialization; @@ -60,14 +61,14 @@ namespace ZeroLevel.Services.PartitionStorage /// public void DropData() => _temporaryAccessor.DropData(); public string GetCatalogPath() => _accessor.GetCatalogPath(); - public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value); + public async Task Store(TKey key, TInput value) => await _temporaryAccessor.Store(key, value); public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(), _temporaryAccessor.CountDataFiles()); /// /// Performs compression/grouping of recorded data in a partition /// - public void Compress() + public async Task Compress() { var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath()); @@ -88,7 +89,7 @@ namespace ZeroLevel.Services.PartitionStorage { foreach (var i in r.Value) { - _temporaryAccessor.Store(r.Key, i); + await _temporaryAccessor.Store(r.Key, i); } } } @@ -99,7 +100,7 @@ namespace ZeroLevel.Services.PartitionStorage // compress new file foreach (var file in newFiles) { - (_temporaryAccessor as StorePartitionBuilder) + await (_temporaryAccessor as StorePartitionBuilder) .CompressFile(file); } @@ -141,7 +142,7 @@ namespace ZeroLevel.Services.PartitionStorage #endregion #region Private methods - private IEnumerable>> + private IEnumerable>> IterateReadKeyInputs(string filePath) { if (File.Exists(filePath)) @@ -154,11 +155,11 @@ namespace ZeroLevel.Services.PartitionStorage var k = _keyDeserializer.Invoke(reader); var v = _valueDeserializer.Invoke(reader); var input = _decompress(v); - yield return new StorePartitionKeyValueSearchResult> + yield return new SearchResult> { Key = k, Value = input, - Status = SearchResult.Success + Success = true }; } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs index ebd92e2..368d561 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Threading.Tasks; using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.Memory; using ZeroLevel.Services.PartitionStorage.Interfaces; @@ -29,10 +30,8 @@ namespace ZeroLevel.Services.PartitionStorage #region IStorePartitionAccessor - public StorePartitionKeyValueSearchResult Find(TKey key) + public async Task> Find(TKey key) { - TKey k; - TValue v; IViewAccessor memoryAccessor; try { @@ -49,10 +48,10 @@ namespace ZeroLevel.Services.PartitionStorage catch (Exception ex) { Log.SystemError(ex, $"[StorePartitionAccessor.Find] Fault get IViewAccessor by key {(key == null ? string.Empty : key.ToString())}"); - return new StorePartitionKeyValueSearchResult + return new SearchResult { Key = key, - Status = SearchResult.FileLockedOrUnavaliable, + Success = false, Value = default }; } @@ -62,14 +61,18 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break; - if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break; - var c = _options.KeyComparer(key, k); - if (c == 0) return new StorePartitionKeyValueSearchResult + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) break; + + var vv = await Serializer.ValueDeserializer.Invoke(reader); + if(vv.Success == false) break; + + var c = _options.KeyComparer(key, kv.Value); + if (c == 0) return new SearchResult { Key = key, - Value = v, - Status = SearchResult.Success + Value = vv.Value, + Success = true }; if (c == -1) { @@ -78,14 +81,14 @@ namespace ZeroLevel.Services.PartitionStorage } } } - return new StorePartitionKeyValueSearchResult + return new SearchResult { Key = key, - Status = SearchResult.NotFound, + Success = false, Value = default }; } - public IEnumerable> Find(IEnumerable keys) + public async Task Find(IEnumerable keys, Action searchResultHandler) { var results = keys.Distinct() .GroupBy( @@ -93,18 +96,13 @@ namespace ZeroLevel.Services.PartitionStorage k => k, (key, g) => new { FileName = key, Keys = g.ToArray() }); foreach (var group in results) { - foreach (var r in Find(group.FileName, group.Keys)) - { - yield return r; - } + await Find(group.FileName, group.Keys, searchResultHandler); } } - public IEnumerable> Iterate() + public async IAsyncEnumerable> Iterate() { if (Directory.Exists(_catalog)) { - TKey k; - TValue v; var files = Directory.GetFiles(_catalog); if (files != null && files.Length > 0) { @@ -117,9 +115,13 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break; - if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break; - yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Status = SearchResult.Success }; + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) break; + + var vv = await Serializer.ValueDeserializer.Invoke(reader); + if (vv.Success == false) break; + + yield return new KV(kv.Value, vv.Value); } } } @@ -127,10 +129,8 @@ namespace ZeroLevel.Services.PartitionStorage } } } - public IEnumerable> IterateKeyBacket(TKey key) + public async Task IterateKeyBacket(TKey key, Action kvHandler) { - TKey k; - TValue v; var fileName = _options.GetFileName(key, _info); var filePath = Path.Combine(_catalog, fileName); if (File.Exists(filePath)) @@ -142,9 +142,13 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break; - if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break; - yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Status = SearchResult.Success }; + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) break; + + var vv = await Serializer.ValueDeserializer.Invoke(reader); + if (vv.Success == false) break; + + kvHandler.Invoke(kv.Value, vv.Value); } } } @@ -158,11 +162,11 @@ namespace ZeroLevel.Services.PartitionStorage Indexes.ResetCachee(); } } - public void RemoveAllExceptKey(TKey key, bool autoReindex = true) + public async Task RemoveAllExceptKey(TKey key, bool autoReindex = true) { - RemoveAllExceptKeys(new[] { key }, autoReindex); + await RemoveAllExceptKeys(new[] { key }, autoReindex); } - public void RemoveAllExceptKeys(IEnumerable keys, bool autoReindex = true) + public async Task RemoveAllExceptKeys(IEnumerable keys, bool autoReindex = true) { var results = keys.Distinct() .GroupBy( @@ -170,18 +174,18 @@ namespace ZeroLevel.Services.PartitionStorage k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() }); foreach (var group in results) { - RemoveKeyGroup(group.FileName, group.Keys, false, autoReindex); + await RemoveKeyGroup(group.FileName, group.Keys, false, autoReindex); if (_options.Index.Enabled) { Indexes.RemoveCacheeItem(group.FileName); } } } - public void RemoveKey(TKey key, bool autoReindex = false) + public async Task RemoveKey(TKey key, bool autoReindex = false) { - RemoveKeys(new[] { key }, autoReindex); + await RemoveKeys(new[] { key }, autoReindex); } - public void RemoveKeys(IEnumerable keys, bool autoReindex = true) + public async Task RemoveKeys(IEnumerable keys, bool autoReindex = true) { var results = keys.Distinct() .GroupBy( @@ -189,7 +193,7 @@ namespace ZeroLevel.Services.PartitionStorage k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() }); foreach (var group in results) { - RemoveKeyGroup(group.FileName, group.Keys, true, autoReindex); + await RemoveKeyGroup(group.FileName, group.Keys, true, autoReindex); if (_options.Index.Enabled) { Indexes.RemoveCacheeItem(group.FileName); @@ -200,8 +204,7 @@ namespace ZeroLevel.Services.PartitionStorage #region Private methods - private IEnumerable> Find(string fileName, - TKey[] keys) + private async Task Find(string fileName, TKey[] keys, Action searchResultHandler) { TKey k; TValue v; @@ -230,17 +233,16 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break; - if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break; - var c = _options.KeyComparer(searchKey, k); + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) break; + + var vv = await Serializer.ValueDeserializer.Invoke(reader); + if (vv.Success == false) break; + + var c = _options.KeyComparer(searchKey, kv.Value); if (c == 0) { - yield return new StorePartitionKeyValueSearchResult - { - Key = searchKey, - Value = v, - Status = SearchResult.Success - }; + searchResultHandler.Invoke(kv.Value, vv.Value); break; } else if (c == -1) @@ -263,17 +265,16 @@ namespace ZeroLevel.Services.PartitionStorage var keys_arr = keys.OrderBy(k => k).ToArray(); while (reader.EOS == false && index < keys_arr.Length) { - if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break; - if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break; - var c = _options.KeyComparer(keys_arr[index], k); + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) break; + + var vv = await Serializer.ValueDeserializer.Invoke(reader); + if (vv.Success == false) break; + + var c = _options.KeyComparer(keys_arr[index], kv.Value); if (c == 0) { - yield return new StorePartitionKeyValueSearchResult - { - Key = keys_arr[index], - Value = v, - Status = SearchResult.Success - }; + searchResultHandler.Invoke(kv.Value, vv.Value); index++; } else if (c == -1) @@ -283,7 +284,7 @@ namespace ZeroLevel.Services.PartitionStorage index++; if (index < keys_arr.Length) { - c = _options.KeyComparer(keys_arr[index], k); + c = _options.KeyComparer(keys_arr[index], kv.Value); } } while (index < keys_arr.Length && c == -1); } @@ -294,9 +295,8 @@ namespace ZeroLevel.Services.PartitionStorage } } - private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex) + private async Task RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex) { - TKey k; var filePath = Path.Combine(_catalog, fileName); if (File.Exists(filePath)) { @@ -325,18 +325,22 @@ namespace ZeroLevel.Services.PartitionStorage while (reader.EOS == false) { var startPosition = reader.Position; - if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) + + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) { Log.Error($"[StorePartitionAccessor.RemoveKeyGroup] Fault remove keys from file '{fileName}'. Incorrect file structure. Fault read key."); return; } - if (Serializer.ValueDeserializer.Invoke(reader, out var _) == false) + + var vv = await Serializer.ValueDeserializer.Invoke(reader); + if (vv.Success == false) { Log.Error($"[StorePartitionAccessor.RemoveKeyGroup] Fault remove keys from file '{fileName}'. Incorrect file structure. Fault read value."); return; } var endPosition = reader.Position; - var c = _options.KeyComparer(searchKey, k); + var c = _options.KeyComparer(searchKey, kv.Value); if (c == 0) { ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); @@ -362,18 +366,23 @@ namespace ZeroLevel.Services.PartitionStorage while (reader.EOS == false && index < keys_arr.Length) { var startPosition = reader.Position; - if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) + + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) { Log.Error($"[StorePartitionAccessor.RemoveKeyGroup] Fault remove keys from file '{fileName}'. Incorrect file structure. Fault read key."); return; } - if (Serializer.ValueDeserializer.Invoke(reader, out var _) == false) + + var vv = await Serializer.ValueDeserializer.Invoke(reader); + if (vv.Success == false) { Log.Error($"[StorePartitionAccessor.RemoveKeyGroup] Fault remove keys from file '{fileName}'. Incorrect file structure. Fault read value."); return; } + var endPosition = reader.Position; - var c = _options.KeyComparer(keys_arr[index], k); + var c = _options.KeyComparer(keys_arr[index], kv.Value); if (c == 0) { ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); @@ -386,7 +395,7 @@ namespace ZeroLevel.Services.PartitionStorage index++; if (index < keys_arr.Length) { - c = _options.KeyComparer(keys_arr[index], k); + c = _options.KeyComparer(keys_arr[index], kv.Value); } } while (index < keys_arr.Length && c == -1); } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index 9d04b11..edc2eaf 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -14,7 +14,7 @@ namespace ZeroLevel.Services.PartitionStorage internal sealed class StorePartitionBuilder : BasePartition, IStorePartitionBuilder { - private readonly Func _storeMethod; + private readonly Func> _storeMethod; private long _totalRecords = 0; @@ -40,9 +40,9 @@ namespace ZeroLevel.Services.PartitionStorage #region IStorePartitionBuilder - public void Store(TKey key, TInput value) + public async Task Store(TKey key, TInput value) { - if (_storeMethod.Invoke(key, value)) + if (await _storeMethod.Invoke(key, value)) { Interlocked.Increment(ref _totalRecords); } @@ -53,18 +53,16 @@ namespace ZeroLevel.Services.PartitionStorage CloseWriteStreams(); } - public void Compress() + public async Task Compress() { var files = Directory.GetFiles(_catalog); if (files != null && files.Length > 0) { - Parallel.ForEach(files, file => CompressFile(file)); + await Parallel.ForEachAsync(files, async(file, _) => await CompressFile(file)); } } - public IEnumerable> Iterate() + public async IAsyncEnumerable> Iterate() { - TKey key; - TInput val; var files = Directory.GetFiles(_catalog); if (files != null && files.Length > 0) { @@ -77,9 +75,13 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - if (Serializer.KeyDeserializer.Invoke(reader, out key) == false) break; - if (Serializer.InputDeserializer.Invoke(reader, out val) == false) break; - yield return new StorePartitionKeyValueSearchResult { Key = key, Value = val, Status = SearchResult.Success }; + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) break; + + var vv = await Serializer.InputDeserializer.Invoke(reader); + if (vv.Success == false) break; + + yield return new SearchResult { Key = kv.Value, Value = vv.Value, Success = true }; } } } @@ -90,14 +92,14 @@ namespace ZeroLevel.Services.PartitionStorage #endregion #region Private methods - private bool StoreDirect(TKey key, TInput value) + private async Task StoreDirect(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); if (TryGetWriteStream(groupKey, out var stream)) { - Serializer.KeySerializer.Invoke(stream, key); + await Serializer.KeySerializer.Invoke(stream, key); Thread.MemoryBarrier(); - Serializer.InputSerializer.Invoke(stream, value); + await Serializer.InputSerializer.Invoke(stream, value); return true; } else @@ -106,7 +108,7 @@ namespace ZeroLevel.Services.PartitionStorage } return false; } - private bool StoreDirectSafe(TKey key, TInput value) + private async Task StoreDirectSafe(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); bool lockTaken = false; @@ -115,9 +117,9 @@ namespace ZeroLevel.Services.PartitionStorage Monitor.Enter(stream, ref lockTaken); try { - Serializer.KeySerializer.Invoke(stream, key); + await Serializer.KeySerializer.Invoke(stream, key); Thread.MemoryBarrier(); - Serializer.InputSerializer.Invoke(stream, value); + await Serializer.InputSerializer.Invoke(stream, value); return true; } finally @@ -135,10 +137,8 @@ namespace ZeroLevel.Services.PartitionStorage return false; } - internal void CompressFile(string file) + internal async Task CompressFile(string file) { - TKey key; - TInput input; PhisicalFileAccessorCachee.LockFile(file); try { @@ -150,25 +150,27 @@ namespace ZeroLevel.Services.PartitionStorage { while (reader.EOS == false) { - if (Serializer.KeyDeserializer.Invoke(reader, out key) == false) + var kv = await Serializer.KeyDeserializer.Invoke(reader); + if (kv.Success == false) { throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key."); } - if (key != null) + if (kv.Value != null) { - if (false == dict.ContainsKey(key)) + if (false == dict.ContainsKey(kv.Value)) { - dict[key] = new HashSet(); + dict[kv.Value] = new HashSet(); } if (reader.EOS) { break; } - if (Serializer.InputDeserializer.Invoke(reader, out input) == false) + var iv = await Serializer.InputDeserializer.Invoke(reader); + if (iv.Success == false) { throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read input value."); } - dict[key].Add(input); + dict[kv.Value].Add(iv.Value); } else { diff --git a/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs index 1792b40..f85d15a 100644 --- a/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs +++ b/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs @@ -1,9 +1,9 @@ namespace ZeroLevel.Services.PartitionStorage { - public enum SearchResult + public class SearchResult { - Success, - NotFound, - FileLockedOrUnavaliable + public bool Success { get; set; } + public TKey Key { get; set; } + public TValue Value { get; set; } } } diff --git a/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs deleted file mode 100644 index 61ba3de..0000000 --- a/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace ZeroLevel.Services.PartitionStorage -{ - public class StorePartitionKeyValueSearchResult - { - public SearchResult Status { get; set; } - public TKey Key { get; set; } - public TValue Value { get; set; } - } -} diff --git a/ZeroLevel/Services/PartitionStorage/Search/StoreSearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/StoreSearchResult.cs index 7fe275a..850f4f6 100644 --- a/ZeroLevel/Services/PartitionStorage/Search/StoreSearchResult.cs +++ b/ZeroLevel/Services/PartitionStorage/Search/StoreSearchResult.cs @@ -4,6 +4,6 @@ namespace ZeroLevel.Services.PartitionStorage { public class StoreSearchResult { - public IDictionary>> Results { get; set; } + public IDictionary>> Results { get; set; } } } diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs index d834aa0..dbc8a6c 100644 --- a/ZeroLevel/Services/PartitionStorage/Store.cs +++ b/ZeroLevel/Services/PartitionStorage/Store.cs @@ -17,18 +17,12 @@ namespace ZeroLevel.Services.PartitionStorage private readonly PhisicalFileAccessorCachee _fileAccessorCachee; public Store(StoreOptions options, - IStoreSerializer serializer = null) + IStoreSerializer serializer) { if (options == null) throw new ArgumentNullException(nameof(options)); + if (serializer == null) throw new ArgumentNullException(nameof(serializer)); _options = options; - if (serializer == null) - { - _serializer = new StoreStandartSerializer(); - } - else - { - _serializer = serializer; - } + _serializer = serializer; if (Directory.Exists(_options.RootFolder) == false) { Directory.CreateDirectory(_options.RootFolder); @@ -76,10 +70,10 @@ namespace ZeroLevel.Services.PartitionStorage _fileAccessorCachee.DropAllIndexReaders(); } - public StoreSearchResult Search(StoreSearchRequest searchRequest) + public async Task> Search(StoreSearchRequest searchRequest) { var result = new StoreSearchResult(); - var results = new ConcurrentDictionary>>(); + var results = new ConcurrentDictionary>>(); if (searchRequest.PartitionSearchRequests?.Any() ?? false) { var partitionsSearchInfo = searchRequest @@ -89,16 +83,19 @@ namespace ZeroLevel.Services.PartitionStorage { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism }; - Parallel.ForEach(partitionsSearchInfo, options, (pair, _) => + await Parallel.ForEachAsync(partitionsSearchInfo, options, async (pair, _) => { var accessor = CreateAccessor(pair.Key); if (accessor != null) { using (accessor) { - results[pair.Key] = accessor - .Find(pair.Value) - .ToArray(); + var set = new List>(); + await foreach (var kv in accessor.Iterate()) + { + set.Add(new KV(kv.Key, kv.Value)); + } + results[pair.Key] = set; } } }); @@ -112,29 +109,30 @@ namespace ZeroLevel.Services.PartitionStorage _fileAccessorCachee.Dispose(); } - public void Bypass(TMeta meta, Action handler) + public async IAsyncEnumerable> Bypass(TMeta meta) { var accessor = CreateAccessor(meta); if (accessor != null) { using (accessor) { - foreach (var kv in accessor.Iterate()) + await foreach (var kv in accessor.Iterate()) { - handler.Invoke(kv.Key, kv.Value); + yield return kv; } } } } - public bool Exists(TMeta meta, TKey key) + public async Task Exists(TMeta meta, TKey key) { var accessor = CreateAccessor(meta); if (accessor != null) { using (accessor) { - return accessor.Find(key).Status == SearchResult.Success; + var info = await accessor.Find(key); + return info.Success; } } return false; diff --git a/ZeroLevel/Services/PartitionStorage/StoreSerializers.cs b/ZeroLevel/Services/PartitionStorage/StoreSerializers.cs new file mode 100644 index 0000000..b2eb16e --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/StoreSerializers.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading.Tasks; +using ZeroLevel.Services.PartitionStorage.Interfaces; +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Services.PartitionStorage +{ + public record DeserializeResult(bool Success, T Value); + + public delegate Task> TryDeserializeMethod(MemoryStreamReader reader); + + public sealed class StoreSerializers + : IStoreSerializer + { + private readonly Func _keySerializer; + private readonly Func _inputSerializer; + private readonly Func>> _keyDeserializer; + private readonly Func>> _inputDeserializer; + private readonly Func>> _valueDeserializer; + + public StoreSerializers(Func keySerializer, + Func inputSerializer, + Func>> keyDeserializer, + Func>> inputDeserializer, + Func>> valueDeserializer) + { + _keySerializer = keySerializer; + _inputSerializer = inputSerializer; + _keyDeserializer = keyDeserializer; + _inputDeserializer = inputDeserializer; + _valueDeserializer = valueDeserializer; + } + + public Func KeySerializer => _keySerializer; + + public Func InputSerializer => _inputSerializer; + + public Func>> KeyDeserializer => _keyDeserializer; + + public Func>> InputDeserializer => _inputDeserializer; + + public Func>> ValueDeserializer => _valueDeserializer; + } +} diff --git a/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs b/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs deleted file mode 100644 index 5dcbc68..0000000 --- a/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs +++ /dev/null @@ -1,39 +0,0 @@ -using System; -using ZeroLevel.Services.PartitionStorage.Interfaces; -using ZeroLevel.Services.Serialization; - -namespace ZeroLevel.Services.PartitionStorage -{ - - - // TODO INTERNAL - public sealed class StoreStandartSerializer - : IStoreSerializer - { - private readonly Action _keySerializer; - private readonly Action _inputSerializer; - private readonly TryDeserializeMethod _keyDeserializer; - private readonly TryDeserializeMethod _inputDeserializer; - private readonly TryDeserializeMethod _valueDeserializer; - - public StoreStandartSerializer() - { - _keySerializer = MessageSerializer.GetSerializer(); - _inputSerializer = MessageSerializer.GetSerializer(); - - _keyDeserializer = MessageSerializer.GetSafetyDeserializer(); - _inputDeserializer = MessageSerializer.GetSafetyDeserializer(); - _valueDeserializer = MessageSerializer.GetSafetyDeserializer(); - } - - public Action KeySerializer => _keySerializer; - - public Action InputSerializer => _inputSerializer; - - public TryDeserializeMethod KeyDeserializer => _keyDeserializer; - - public TryDeserializeMethod InputDeserializer => _inputDeserializer; - - public TryDeserializeMethod ValueDeserializer => _valueDeserializer; - } -} diff --git a/ZeroLevel/Services/PartitionStorage/UniformKeyValueStorage.cs b/ZeroLevel/Services/PartitionStorage/UniformKeyValueStorage.cs deleted file mode 100644 index cfc755b..0000000 --- a/ZeroLevel/Services/PartitionStorage/UniformKeyValueStorage.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace ZeroLevel.Services.PartitionStorage -{ - public sealed class UniformKeyValueStorage - { - - } -} diff --git a/ZeroLevel/Services/Serialization/IBinaryReader.cs b/ZeroLevel/Services/Serialization/IBinaryReader.cs index a5bd294..a45a048 100644 --- a/ZeroLevel/Services/Serialization/IBinaryReader.cs +++ b/ZeroLevel/Services/Serialization/IBinaryReader.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Net; +using System.Threading.Tasks; namespace ZeroLevel.Services.Serialization { @@ -57,7 +58,7 @@ namespace ZeroLevel.Services.Serialization IPAddress[] ReadIPArray(); IPEndPoint[] ReadIPEndPointArray(); Guid[] ReadGuidArray(); - DateTime[] ReadDateTimeArray(); + DateTime?[] ReadDateTimeArray(); Int64[] ReadInt64Array(); Int32[] ReadInt32Array(); UInt64[] ReadUInt64Array(); @@ -78,7 +79,7 @@ namespace ZeroLevel.Services.Serialization List ReadCollection() where T : IBinarySerializable, new(); List ReadStringCollection(); List ReadGuidCollection(); - List ReadDateTimeCollection(); + List ReadDateTimeCollection(); List ReadCharCollection(); List ReadInt64Collection(); List ReadInt32Collection(); @@ -104,7 +105,7 @@ namespace ZeroLevel.Services.Serialization IEnumerable ReadIPCollectionLazy(); IEnumerable ReadIPEndPointCollectionLazy(); IEnumerable ReadGuidCollectionLazy(); - IEnumerable ReadDateTimeCollectionLazy(); + IEnumerable ReadDateTimeCollectionLazy(); IEnumerable ReadInt64CollectionLazy(); IEnumerable ReadInt32CollectionLazy(); IEnumerable ReadUInt64CollectionLazy(); @@ -134,4 +135,86 @@ namespace ZeroLevel.Services.Serialization void SetPosition(long position); } + + public interface IBinaryReaderAsync + : IDisposable + { + Task ReadBooleanAsync(); + Task ReadCharAsync(); + Task ReadByteAsync(); + Task ReadBytesAsync(); + Task ReadDoubleAsync(); + Task ReadFloatAsync(); + Task ReadShortAsync(); + Task ReadUShortAsync(); + Task ReadInt32Async(); + Task ReadUInt32Async(); + Task ReadLongAsync(); + Task ReadULongAsync(); + Task ReadStringAsync(); + Task ReadGuidAsync(); + Task ReadDateTimeAsync(); + Task ReadTimeAsync(); + Task ReadDateAsync(); + Task ReadDecimalAsync(); + Task ReadTimeSpanAsync(); + Task ReadIPAsync(); + Task ReadIPEndpointAsync(); + + #region Extensions + + #region Arrays + Task ReadArrayAsync() where T : IAsyncBinarySerializable, new(); + Task ReadStringArrayAsync(); + Task ReadIPArrayAsync(); + Task ReadIPEndPointArrayAsync(); + Task ReadGuidArrayAsync(); + Task ReadDateTimeArrayAsync(); + Task ReadInt64ArrayAsync(); + Task ReadInt32ArrayAsync(); + Task ReadUInt64ArrayAsync(); + Task ReadUInt32ArrayAsync(); + Task ReadCharArrayAsync(); + Task ReadShortArrayAsync(); + Task ReadUShortArrayAsync(); + Task ReadFloatArrayAsync(); + Task ReadDoubleArrayAsync(); + Task ReadBooleanArrayAsync(); + Task ReadByteArrayAsync(); + Task ReadByteArrayArrayAsync(); + Task ReadDecimalArrayAsync(); + Task ReadTimeSpanArrayAsync(); + #endregion + + #region Collections + Task> ReadCollectionAsync() where T : IAsyncBinarySerializable, new(); + Task> ReadStringCollectionAsync(); + Task> ReadGuidCollectionAsync(); + Task> ReadDateTimeCollectionAsync(); + Task> ReadCharCollectionAsync(); + Task> ReadInt64CollectionAsync(); + Task> ReadInt32CollectionAsync(); + Task> ReadDoubleCollectionAsync(); + Task> ReadDecimalCollectionAsync(); + Task> ReadTimeSpanCollectionAsync(); + Task> ReadFloatCollectionAsync(); + Task> ReadBooleanCollectionAsync(); + Task> ReadByteCollectionAsync(); + Task> ReadByteArrayCollectionAsync(); + Task> ReadIPCollectionAsync(); + Task> ReadIPEndPointCollectionAsync(); + Task> ReadUInt64CollectionAsync(); + Task> ReadUInt32CollectionAsync(); + Task> ReadShortCollectionAsync(); + Task> ReadUShortCollectionAsync(); + #endregion + + Task ReadAsync() where T : IAsyncBinarySerializable; + Task ReadAsync(object arg) where T : IAsyncBinarySerializable; + Task ReadCompatibleAsync(); + Task> ReadDictionaryAsync(); + Task> ReadDictionaryAsConcurrentAsync(); + + #endregion Extensions + } } \ No newline at end of file diff --git a/ZeroLevel/Services/Serialization/IBinarySerializable.cs b/ZeroLevel/Services/Serialization/IBinarySerializable.cs index db5ab27..7e3a08d 100644 --- a/ZeroLevel/Services/Serialization/IBinarySerializable.cs +++ b/ZeroLevel/Services/Serialization/IBinarySerializable.cs @@ -1,4 +1,6 @@ -namespace ZeroLevel.Services.Serialization +using System.Threading.Tasks; + +namespace ZeroLevel.Services.Serialization { public interface IBinarySerializable { @@ -6,4 +8,11 @@ void Deserialize(IBinaryReader reader); } + + public interface IAsyncBinarySerializable + { + Task SerializeAsync(IBinaryWriter writer); + + Task DeserializeAsync(IBinaryReader reader); + } } \ No newline at end of file diff --git a/ZeroLevel/Services/Serialization/IBinaryWriter.cs b/ZeroLevel/Services/Serialization/IBinaryWriter.cs index d69bd08..44a5ace 100644 --- a/ZeroLevel/Services/Serialization/IBinaryWriter.cs +++ b/ZeroLevel/Services/Serialization/IBinaryWriter.cs @@ -3,6 +3,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Net; +using System.Threading.Tasks; namespace ZeroLevel.Services.Serialization { @@ -60,6 +61,7 @@ namespace ZeroLevel.Services.Serialization void WriteArray(IPEndPoint[] array); void WriteArray(Guid[] array); void WriteArray(DateTime[] array); + void WriteArray(DateTime?[] array); void WriteArray(UInt64[] array); void WriteArray(UInt32[] array); void WriteArray(char[] array); @@ -83,6 +85,7 @@ namespace ZeroLevel.Services.Serialization void WriteCollection(IEnumerable collection); void WriteCollection(IEnumerable collection); void WriteCollection(IEnumerable collection); + void WriteCollection(IEnumerable collection); void WriteCollection(IEnumerable collection); void WriteCollection(IEnumerable collection); void WriteCollection(IEnumerable collection); @@ -112,4 +115,110 @@ namespace ZeroLevel.Services.Serialization Stream Stream { get; } } + + public interface IAsyncBinaryWriter + : IDisposable + { + Task WriteCharAsync(char val); + + Task WriteBytesAsync(byte[] val); + + Task WriteShortAsync(short number); + + Task WriteUShortAsync(ushort number); + + Task WriteDoubleAsync(double val); + + Task WriteFloatAsync(float val); + + Task WriteInt32Async(Int32 number); + + Task WriteUInt32Async(UInt32 number); + + Task WriteLongAsync(Int64 number); + + Task WriteULongAsync(UInt64 number); + + Task WriteStringAsync(string line); + + Task WriteGuidAsync(Guid guid); + + Task WriteDateTimeAsync(DateTime? datetime); + + Task WriteTimeAsync(TimeOnly? time); + + Task WriteDateAsync(DateOnly? date); + + Task WriteDecimalAsync(Decimal number); + + Task WriteTimeSpanAsync(TimeSpan period); + + Task WriteIPAsync(IPAddress ip); + + Task WriteIPEndpointAsync(IPEndPoint endpoint); + + #region Extensions + + #region Arrays + Task WriteArrayAsync(T[] array) + where T : IAsyncBinarySerializable; + Task WriteArrayAsync(string[] array); + Task WriteArrayAsync(IPAddress[] array); + Task WriteArrayAsync(IPEndPoint[] array); + Task WriteArrayAsync(Guid[] array); + Task WriteArrayAsync(DateTime[] array); + Task WriteArrayAsync(DateTime?[] array); + Task WriteArrayAsync(UInt64[] array); + Task WriteArrayAsync(UInt32[] array); + Task WriteArrayAsync(char[] array); + Task WriteArrayAsync(short[] array); + Task WriteArrayAsync(ushort[] array); + Task WriteArrayAsync(Int64[] array); + Task WriteArrayAsync(Int32[] array); + Task WriteArrayAsync(float[] array); + Task WriteArrayAsync(Double[] array); + Task WriteArrayAsync(bool[] array); + Task WriteArrayAsync(byte[] array); + Task WriteArrayAsync(byte[][] array); + Task WriteArrayAsync(decimal[] array); + Task WriteArrayAsync(TimeSpan[] array); + #endregion + + #region Collections + Task WriteCollectionAsync(IEnumerable collection) + where T : IAsyncBinarySerializable; + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + Task WriteCollectionAsync(IEnumerable collection); + #endregion + + Task WriteDictionaryAsync(IDictionary collection); + Task WriteDictionaryAsync(ConcurrentDictionary collection); + + Task WriteAsync(T item) + where T : IAsyncBinarySerializable; + + Task WriteCompatibleAsync(T item); + + #endregion Extensions + + Stream Stream { get; } + } } \ No newline at end of file diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs index d943588..d4dd99b 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs @@ -3,8 +3,10 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Net; +using System.Runtime.CompilerServices; using System.Runtime.Serialization; using System.Text; +using System.Threading.Tasks; using ZeroLevel.Services.Extensions; using ZeroLevel.Services.Memory; @@ -13,7 +15,7 @@ namespace ZeroLevel.Services.Serialization /// /// A wrapper over a MemoryStream for reading, with a check for overflow /// - public sealed class MemoryStreamReader + public partial class MemoryStreamReader : IBinaryReader { private readonly IViewAccessor _accessor; @@ -124,10 +126,11 @@ namespace ZeroLevel.Services.Serialization public decimal ReadDecimal() { - var p1 = ReadInt32(); - var p2 = ReadInt32(); - var p3 = ReadInt32(); - var p4 = ReadInt32(); + var arr = ReadBuffer(16); + var p1 = BitConverter.ToInt32(arr, 0); + var p2 = BitConverter.ToInt32(arr, 4); + var p3 = BitConverter.ToInt32(arr, 8); + var p4 = BitConverter.ToInt32(arr, 12); return BitConverterExt.ToDecimal(new int[] { p1, p2, p3, p4 }); } @@ -190,7 +193,7 @@ namespace ZeroLevel.Services.Serialization { if (CheckOutOfRange(count)) throw new OutOfMemoryException("Array index out of bounds"); - var buffer = _accessor.ReadBuffer(count); + var buffer = _accessor.ReadBuffer(count).Result; if (_reverseByteOrder && count > 1) { byte b; @@ -213,7 +216,7 @@ namespace ZeroLevel.Services.Serialization } try { - buffer = _accessor.ReadBuffer(count); + buffer = _accessor.ReadBuffer(count).Result; if (_reverseByteOrder && count > 1) { byte b; @@ -287,16 +290,13 @@ namespace ZeroLevel.Services.Serialization /// /// Check if data reading is outside the stream /// - public bool CheckOutOfRange(int offset) - { - return _accessor.CheckOutOfRange(offset); - } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool CheckOutOfRange(int offset) => _accessor.CheckOutOfRange(offset); #region Extensions #region Collections - public List ReadCollection() - where T : IBinarySerializable, new() + private List ReadList(Func read) { int count = ReadInt32(); var collection = new List(count); @@ -304,805 +304,690 @@ namespace ZeroLevel.Services.Serialization { for (int i = 0; i < count; i++) { - var item = new T(); - item.Deserialize(this); - collection.Add(item); + collection.Add(read.Invoke()); } } return collection; } - public List ReadStringCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) + public List ReadCollection() + where T : IBinarySerializable, new() + => + ReadList(() => { - for (int i = 0; i < count; i++) - { - collection.Add(ReadString()); - } - } - return collection; - } + var item = new T(); + item.Deserialize(this); + return item; + }); - public List ReadIPCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadIP()); - } - } - return collection; - } + public List ReadStringCollection() => ReadList(ReadString); - public List ReadIPEndPointCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadIPEndpoint()); - } - } - return collection; - } + public List ReadIPCollection() => ReadList(ReadIP); - public List ReadGuidCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadGuid()); - } - } - return collection; - } + public List ReadIPEndPointCollection() => ReadList(ReadIPEndpoint); - public List ReadDateTimeCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadDateTime() ?? DateTime.MinValue); - } - } - return collection; - } + public List ReadGuidCollection() => ReadList(ReadGuid); - public List ReadInt64Collection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadLong()); - } - } - return collection; - } + public List ReadDateTimeCollection() => ReadList(ReadDateTime); - public List ReadInt32Collection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadInt32()); - } - } - return collection; - } + public List ReadInt64Collection() => ReadList(ReadLong); - public List ReadUInt64Collection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadULong()); - } - } - return collection; - } + public List ReadInt32Collection() => ReadList(ReadInt32); - public List ReadUInt32Collection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadUInt32()); - } - } - return collection; - } + public List ReadUInt64Collection() => ReadList(ReadULong); - public List ReadCharCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadChar()); - } - } - return collection; - } + public List ReadUInt32Collection() => ReadList(ReadUInt32); - public List ReadShortCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadShort()); - } - } - return collection; - } + public List ReadCharCollection() => ReadList(ReadChar); - public List ReadUShortCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadUShort()); - } - } - return collection; - } + public List ReadShortCollection() => ReadList(ReadShort); - public List ReadFloatCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadFloat()); - } - } - return collection; - } + public List ReadUShortCollection() => ReadList(ReadUShort); - public List ReadDoubleCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - collection.Add(ReadDouble()); - } - } - return collection; - } + public List ReadFloatCollection() => ReadList(ReadFloat); + + public List ReadDoubleCollection() => ReadList(ReadDouble); + + public List ReadBooleanCollection() => ReadList(ReadBoolean); + + public List ReadByteCollection() => ReadList(ReadByte); - public List ReadBooleanCollection() + public List ReadByteArrayCollection() => ReadList(ReadBytes); + + public List ReadDecimalCollection() => ReadList(ReadDecimal); + + public List ReadTimeSpanCollection() => ReadList(ReadTimeSpan); + #endregion + + #region Collections lazy + + private IEnumerable ReadEnumerable(Func read) { int count = ReadInt32(); - var collection = new List(count); if (count > 0) { for (int i = 0; i < count; i++) { - collection.Add(ReadBoolean()); + yield return read.Invoke(); } } - return collection; } - public List ReadByteCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) + public IEnumerable ReadCollectionLazy() + where T : IBinarySerializable, new() + => + ReadEnumerable(() => { - for (int i = 0; i < count; i++) - { - collection.Add(ReadByte()); - } - } - return collection; - } + var item = new T(); + item.Deserialize(this); + return item; + }); + + public IEnumerable ReadStringCollectionLazy() => ReadEnumerable(ReadString); + + public IEnumerable ReadIPCollectionLazy() => ReadEnumerable(ReadIP); + public IEnumerable ReadIPEndPointCollectionLazy() => ReadEnumerable(ReadIPEndpoint); + + public IEnumerable ReadGuidCollectionLazy() => ReadEnumerable(ReadGuid); + + public IEnumerable ReadDateTimeCollectionLazy() => ReadEnumerable(ReadDateTime); + + public IEnumerable ReadInt64CollectionLazy() => ReadEnumerable(ReadLong); + + public IEnumerable ReadInt32CollectionLazy() => ReadEnumerable(ReadInt32); + + public IEnumerable ReadUInt64CollectionLazy() => ReadEnumerable(ReadULong); + + public IEnumerable ReadUInt32CollectionLazy() => ReadEnumerable(ReadUInt32); + + public IEnumerable ReadCharCollectionLazy() => ReadEnumerable(ReadChar); + + public IEnumerable ReadShortCollectionLazy() => ReadEnumerable(ReadShort); + + public IEnumerable ReadUShortCollectionLazy() => ReadEnumerable(ReadUShort); + + public IEnumerable ReadFloatCollectionLazy() => ReadEnumerable(ReadFloat); + + public IEnumerable ReadDoubleCollectionLazy() => ReadEnumerable(ReadDouble); + + public IEnumerable ReadBooleanCollectionLazy() => ReadEnumerable(ReadBoolean); + + public IEnumerable ReadByteCollectionLazy() => ReadEnumerable(ReadByte); - public List ReadByteArrayCollection() + public IEnumerable ReadByteArrayCollectionLazy() => ReadEnumerable(ReadBytes); + + public IEnumerable ReadDecimalCollectionLazy() => ReadEnumerable(ReadDecimal); + + public IEnumerable ReadTimeSpanCollectionLazy() => ReadEnumerable(ReadTimeSpan); + #endregion + + #region Arrays + private T[] ReadArray(Func read) { int count = ReadInt32(); - var collection = new List(count); + var array = new T[count]; if (count > 0) { for (int i = 0; i < count; i++) { - collection.Add(ReadBytes()); + array[i] = read.Invoke(); } } - return collection; + return array; } - public List ReadDecimalCollection() - { - int count = ReadInt32(); - var collection = new List(count); - if (count > 0) + + public T[] ReadArray() + where T : IBinarySerializable, new() + => + ReadArray(() => { - for (int i = 0; i < count; i++) - { - collection.Add(ReadDecimal()); - } - } - return collection; - } + var item = new T(); + item.Deserialize(this); + return item; + }); + + public string[] ReadStringArray() => ReadArray(ReadString); + + public IPAddress[] ReadIPArray() => ReadArray(ReadIP); + + public IPEndPoint[] ReadIPEndPointArray() => ReadArray(ReadIPEndpoint); + + public Guid[] ReadGuidArray() => ReadArray(ReadGuid); + + public DateTime?[] ReadDateTimeArray() => ReadArray(ReadDateTime); + + public Int64[] ReadInt64Array() => ReadArray(ReadLong); + + public Int32[] ReadInt32Array() => ReadArray(ReadInt32); + + public UInt64[] ReadUInt64Array() => ReadArray(ReadULong); + + public UInt32[] ReadUInt32Array() => ReadArray(ReadUInt32); + + public char[] ReadCharArray() => ReadArray(ReadChar); + + public short[] ReadShortArray() => ReadArray(ReadShort); - public List ReadTimeSpanCollection() + public ushort[] ReadUShortArray() => ReadArray(ReadUShort); + + public float[] ReadFloatArray() => ReadArray(ReadFloat); + + public Double[] ReadDoubleArray() => ReadArray(ReadDouble); + + public bool[] ReadBooleanArray() => ReadArray(ReadBoolean); + + public byte[] ReadByteArray() => ReadBytes(); + + public byte[][] ReadByteArrayArray() => ReadArray(ReadBytes); + + public decimal[] ReadDecimalArray() => ReadArray(ReadDecimal); + + public TimeSpan[] ReadTimeSpanArray() => ReadArray(ReadTimeSpan); + #endregion + + public Dictionary ReadDictionary() { int count = ReadInt32(); - var collection = new List(count); + var collection = new Dictionary(count); if (count > 0) { + TKey key; + TValue value; for (int i = 0; i < count; i++) { - collection.Add(ReadTimeSpan()); + key = ReadCompatible(); + value = ReadCompatible(); + collection.Add(key, value); } } return collection; } - #endregion - #region Collections lazy - public IEnumerable ReadCollectionLazy() - where T : IBinarySerializable, new() + public ConcurrentDictionary ReadDictionaryAsConcurrent() { int count = ReadInt32(); + var collection = new ConcurrentDictionary(); if (count > 0) { + TKey key; + TValue value; for (int i = 0; i < count; i++) { - var item = new T(); - item.Deserialize(this); - yield return item; + key = ReadCompatible(); + value = ReadCompatible(); + collection.TryAdd(key, value); } } + return collection; } - public IEnumerable ReadStringCollectionLazy() + public T ReadCompatible() { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadString(); - } - } + return MessageSerializer.DeserializeCompatible(this); } - public IEnumerable ReadIPCollectionLazy() + public T Read() where T : IBinarySerializable { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadIP(); - } - } + byte type = ReadByte(); + if (type == 0) return default(T); + var item = (T)Activator.CreateInstance(); + item.Deserialize(this); + return item; } - public IEnumerable ReadIPEndPointCollectionLazy() + public bool TryReadByte(out byte b) { - int count = ReadInt32(); - if (count > 0) + if (TryReadBuffer(1, out var buffer)) { - for (int i = 0; i < count; i++) - { - yield return ReadIPEndpoint(); - } + b = buffer[0]; + return true; } + b = default; + return false; } - public IEnumerable ReadGuidCollectionLazy() + public bool TryRead(out T item) where T : IBinarySerializable { - int count = ReadInt32(); - if (count > 0) + if (TryReadByte(out var type)) { - for (int i = 0; i < count; i++) + if (type == 0) { - yield return ReadGuid(); + item = default(T); + return true; } - } - } - - public IEnumerable ReadDateTimeCollectionLazy() - { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) + try { - yield return ReadDateTime() ?? DateTime.MinValue; + var o = (IBinarySerializable)FormatterServices.GetUninitializedObject(typeof(T)); + o.Deserialize(this); + item = (T)o; + return true; } - } - } - - public IEnumerable ReadInt64CollectionLazy() - { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) + catch (Exception ex) { - yield return ReadLong(); + Log.SystemError(ex, "[MemoryStreamReader.TryRead]"); } } + item = default; + return false; } - public IEnumerable ReadInt32CollectionLazy() + public T Read(object arg) where T : IBinarySerializable { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadInt32(); - } - } + byte type = ReadByte(); + if (type == 0) return default(T); + var item = (T)Activator.CreateInstance(typeof(T), arg); + item.Deserialize(this); + return item; } + #endregion Extensions - public IEnumerable ReadUInt64CollectionLazy() + public void Dispose() { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadULong(); - } - } + _accessor.Dispose(); } + } - public IEnumerable ReadUInt32CollectionLazy() + public partial class MemoryStreamReader + : IBinaryReaderAsync + { + /// + /// Reading byte-package (read the size of the specified number of bytes, and then the packet itself read size) + /// + public async Task ReadBufferAsync(int count) { - int count = ReadInt32(); - if (count > 0) + if (CheckOutOfRange(count)) + throw new OutOfMemoryException("Array index out of bounds"); + var buffer = await _accessor.ReadBuffer(count); + if (_reverseByteOrder && count > 1) { - for (int i = 0; i < count; i++) + byte b; + for (int i = 0; i < (count >> 1); i++) { - yield return ReadUInt32(); + b = buffer[i]; + buffer[i] = buffer[count - i - 1]; + buffer[count - i - 1] = b; } } + return buffer; } - public IEnumerable ReadCharCollectionLazy() + public async Task TryReadBufferAsync(int count, byte[] buffer) { - int count = ReadInt32(); - if (count > 0) + if (CheckOutOfRange(count)) { - for (int i = 0; i < count; i++) - { - yield return ReadChar(); - } + buffer = null; + return false; } - } - - public IEnumerable ReadShortCollectionLazy() - { - int count = ReadInt32(); - if (count > 0) + try { - for (int i = 0; i < count; i++) + buffer = await _accessor.ReadBuffer(count); + if (_reverseByteOrder && count > 1) { - yield return ReadShort(); + byte b; + for (int i = 0; i < (count >> 1); i++) + { + b = buffer[i]; + buffer[i] = buffer[count - i - 1]; + buffer[count - i - 1] = b; + } } } - } - - public IEnumerable ReadUShortCollectionLazy() - { - int count = ReadInt32(); - if (count > 0) + catch (Exception ex) { - for (int i = 0; i < count; i++) - { - yield return ReadUShort(); - } + Log.SystemError(ex, $"[MemoryStreamReader.TryReadBufferAsync] Fault read {count} bytes"); + buffer = null; + return false; } + return true; } - public IEnumerable ReadFloatCollectionLazy() + /// + /// Flag reading + /// + public async Task ReadBooleanAsync() { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadFloat(); - } - } + return BitConverter.ToBoolean(new byte[1] { await ReadByteAsync() }, 0); } - public IEnumerable ReadDoubleCollectionLazy() + /// + /// Reading byte + /// + public async Task ReadByteAsync() { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadDouble(); - } - } + var buffer = await ReadBufferAsync(1); + return buffer[0]; } - public IEnumerable ReadBooleanCollectionLazy() + public async Task ReadCharAsync() { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadBoolean(); - } - } + var buffer = await ReadBufferAsync(2); + return BitConverter.ToChar(buffer, 0); } - public IEnumerable ReadByteCollectionLazy() + /// + /// Reading bytes + /// + /// + public async Task ReadBytesAsync() { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadByte(); - } - } + var length = BitConverter.ToInt32(await ReadBufferAsync(4), 0); + if (length == 0) return new byte[0]; + return ReadBuffer(length); } - public IEnumerable ReadByteArrayCollectionLazy() + public async Task ReadShortAsync() { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadBytes(); - } - } + var buffer = await ReadBufferAsync(2); + return BitConverter.ToInt16(buffer, 0); } - public IEnumerable ReadDecimalCollectionLazy() + public async Task ReadUShortAsync() { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadDecimal(); - } - } + var buffer = await ReadBufferAsync(2); + return BitConverter.ToUInt16(buffer, 0); } - public IEnumerable ReadTimeSpanCollectionLazy() + /// + /// Read 32-bit integer (4 bytes) + /// + public async Task ReadInt32Async() { - int count = ReadInt32(); - if (count > 0) - { - for (int i = 0; i < count; i++) - { - yield return ReadTimeSpan(); - } - } + var buffer = await ReadBufferAsync(4); + return BitConverter.ToInt32(buffer, 0); } - #endregion - #region Arrays - public T[] ReadArray() - where T : IBinarySerializable, new() + public async Task ReadUInt32Async() { - int count = ReadInt32(); - var array = new T[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - var item = new T(); - item.Deserialize(this); - array[i] = item; - } - } - return array; + var buffer = await ReadBufferAsync(4); + return BitConverter.ToUInt32(buffer, 0); } - public string[] ReadStringArray() + public async Task ReadDecimalAsync() { - int count = ReadInt32(); - var array = new string[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadString(); - } - } - return array; + var arr = await ReadBufferAsync(16); + var p1 = BitConverter.ToInt32(arr, 0); + var p2 = BitConverter.ToInt32(arr, 4); + var p3 = BitConverter.ToInt32(arr, 8); + var p4 = BitConverter.ToInt32(arr, 12); + return BitConverterExt.ToDecimal(new int[] { p1, p2, p3, p4 }); } - public IPAddress[] ReadIPArray() - { - int count = ReadInt32(); - var array = new IPAddress[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadIP(); - } - } - return array; + /// + /// Read integer 64-bit number (8 bytes) + /// + public async Task ReadLongAsync() + { + var buffer = await ReadBufferAsync(8); + return BitConverter.ToInt64(buffer, 0); } - public IPEndPoint[] ReadIPEndPointArray() + public async Task ReadULongAsync() { - int count = ReadInt32(); - var array = new IPEndPoint[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadIPEndpoint(); - } - } - return array; + var buffer = await ReadBufferAsync(8); + return BitConverter.ToUInt64(buffer, 0); } - public Guid[] ReadGuidArray() + public async Task ReadTimeSpanAsync() { - int count = ReadInt32(); - var array = new Guid[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadGuid(); - } - } - return array; + return new TimeSpan(await ReadLongAsync()); } - public DateTime[] ReadDateTimeArray() + public async Task ReadFloatAsync() { - int count = ReadInt32(); - var array = new DateTime[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = (ReadDateTime() ?? DateTime.MinValue); - } - } - return array; + var buffer = await ReadBufferAsync(4); + return BitConverter.ToSingle(buffer, 0); } - public Int64[] ReadInt64Array() + public async Task ReadDoubleAsync() { - int count = ReadInt32(); - var array = new Int64[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadLong(); - } - } - return array; + var buffer = await ReadBufferAsync(8); + return BitConverter.ToDouble(buffer, 0); } - public Int32[] ReadInt32Array() + /// + /// Read string (4 bytes per length + Length bytes) + /// + public async Task ReadStringAsync() { - int count = ReadInt32(); - var array = new Int32[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadInt32(); - } - } - return array; + var length = BitConverter.ToInt32(await ReadBufferAsync(4), 0); + if (length == 0) return null; + var buffer = await ReadBufferAsync(length); + return Encoding.UTF8.GetString(buffer); } - public UInt64[] ReadUInt64Array() + /// + /// Read GUID (16 bytes) + /// + public async Task ReadGuidAsync() { - int count = ReadInt32(); - var array = new UInt64[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadULong(); - } - } - return array; + var buffer = await ReadBufferAsync(16); + return new Guid(buffer); } - public UInt32[] ReadUInt32Array() + /// + /// Reading the datetime + /// + /// + public async Task ReadDateTimeAsync() { - int count = ReadInt32(); - var array = new UInt32[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadUInt32(); - } - } - return array; + var is_null = ReadByte(); + if (is_null == 0) return null; + var buffer = await ReadBufferAsync(8); + long deserialized = BitConverter.ToInt64(buffer, 0); + return DateTime.FromBinary(deserialized); } - - public char[] ReadCharArray() + public async Task ReadTimeAsync() { - int count = ReadInt32(); - var array = new char[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadChar(); - } - } - return array; + var is_null = await ReadByteAsync(); + if (is_null == 0) return null; + var ts = await ReadTimeSpanAsync(); + return TimeOnly.FromTimeSpan(ts); } - public short[] ReadShortArray() + public async Task ReadDateAsync() { - int count = ReadInt32(); - var array = new short[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadShort(); - } - } - return array; + var is_null = await ReadByteAsync(); + if (is_null == 0) return null; + var days = await ReadInt32Async(); + return DateOnly.FromDayNumber(days); } - - public ushort[] ReadUShortArray() + public async Task ReadIPAsync() { - int count = ReadInt32(); - var array = new ushort[count]; - if (count > 0) + var exists = await ReadByteAsync(); + if (exists == 1) { - for (int i = 0; i < count; i++) - { - array[i] = ReadUShort(); - } + var addr = await ReadBytesAsync(); + return new IPAddress(addr); } - return array; + return null; } - public float[] ReadFloatArray() + public async Task ReadIPEndpointAsync() { - int count = ReadInt32(); - var array = new float[count]; - if (count > 0) + var exists = await ReadByteAsync(); + if (exists == 1) { - for (int i = 0; i < count; i++) - { - array[i] = ReadFloat(); - } + var addr = await ReadIPAsync(); + var port = await ReadInt32Async(); + return new IPEndPoint(addr, port); } - return array; + return null; } - public Double[] ReadDoubleArray() + #region Extensions + + #region Collections + private async Task> ReadListAsync(Func> readAsync, Func read) { - int count = ReadInt32(); - var array = new Double[count]; + int count = await ReadInt32Async(); + var collection = new List(count); if (count > 0) { - for (int i = 0; i < count; i++) + if (_accessor.IsMemoryStream) + { + for (int i = 0; i < count; i++) + { + collection.Add(read.Invoke()); + } + } + else { - array[i] = ReadDouble(); + for (int i = 0; i < count; i++) + { + collection.Add(await readAsync.Invoke()); + } } } - return array; + return collection; } - public bool[] ReadBooleanArray() + public async Task> ReadCollectionAsync() + where T : IAsyncBinarySerializable, new() { - int count = ReadInt32(); - var array = new bool[count]; + int count = await ReadInt32Async(); + var collection = new List(count); if (count > 0) { for (int i = 0; i < count; i++) { - array[i] = ReadBoolean(); + var item = new T(); + await item.DeserializeAsync(this); + collection.Add(item); } } - return array; + return collection; } - public byte[] ReadByteArray() - { - return ReadBytes(); - } + public async Task> ReadStringCollectionAsync() => await ReadListAsync(ReadStringAsync, ReadString); + + public async Task> ReadIPCollectionAsync() => await ReadListAsync(ReadIPAsync, ReadIP); + + public async Task> ReadIPEndPointCollectionAsync() => await ReadListAsync(ReadIPEndpointAsync, ReadIPEndpoint); + + public async Task> ReadGuidCollectionAsync() => await ReadListAsync(ReadGuidAsync, ReadGuid); + + public async Task> ReadDateTimeCollectionAsync() => await ReadListAsync(ReadDateTimeAsync, ReadDateTime); + + public async Task> ReadInt64CollectionAsync() => await ReadListAsync(ReadLongAsync, ReadLong); + + public async Task> ReadInt32CollectionAsync() => await ReadListAsync(ReadInt32Async, ReadInt32); + + public async Task> ReadUInt64CollectionAsync() => await ReadListAsync(ReadULongAsync, ReadULong); + + public async Task> ReadUInt32CollectionAsync() => await ReadListAsync(ReadUInt32Async, ReadUInt32); + + public async Task> ReadCharCollectionAsync() => await ReadListAsync(ReadCharAsync, ReadChar); + + public async Task> ReadShortCollectionAsync() => await ReadListAsync(ReadShortAsync, ReadShort); + + public async Task> ReadUShortCollectionAsync() => await ReadListAsync(ReadUShortAsync, ReadUShort); + + public async Task> ReadFloatCollectionAsync() => await ReadListAsync(ReadFloatAsync, ReadFloat); + + public async Task> ReadDoubleCollectionAsync() => await ReadListAsync(ReadDoubleAsync, ReadDouble); + + public async Task> ReadBooleanCollectionAsync() => await ReadListAsync(ReadBooleanAsync, ReadBoolean); + + public async Task> ReadByteCollectionAsync() => await ReadListAsync(ReadByteAsync, ReadByte); + + public async Task> ReadByteArrayCollectionAsync() => await ReadListAsync(ReadBytesAsync, ReadBytes); + + public async Task> ReadDecimalCollectionAsync() => await ReadListAsync(ReadDecimalAsync, ReadDecimal); + + public async Task> ReadTimeSpanCollectionAsync() => await ReadListAsync(ReadTimeSpanAsync, ReadTimeSpan); - public byte[][] ReadByteArrayArray() + #endregion + + #region Arrays + private async Task ReadArrayAsync(Func> readAsync, Func read) { - int count = ReadInt32(); - var array = new byte[count][]; + int count = await ReadInt32Async(); + var array = new T[count]; if (count > 0) { - for (int i = 0; i < count; i++) + if (_accessor.IsMemoryStream) + { + for (int i = 0; i < count; i++) + { + array[i] = read.Invoke(); + } + } + else { - array[i] = ReadBytes(); + for (int i = 0; i < count; i++) + { + array[i] = await readAsync.Invoke(); + } } } return array; } - - public decimal[] ReadDecimalArray() + public async Task ReadArrayAsync() + where T : IAsyncBinarySerializable, new() { int count = ReadInt32(); - var array = new decimal[count]; + var array = new T[count]; if (count > 0) { for (int i = 0; i < count; i++) { - array[i] = ReadDecimal(); + var item = new T(); + await item.DeserializeAsync(this); + array[i] = item; } } return array; } - public TimeSpan[] ReadTimeSpanArray() + public async Task ReadStringArrayAsync() => await ReadArrayAsync(ReadStringAsync, ReadString); + + public async Task ReadIPArrayAsync() => await ReadArrayAsync(ReadIPAsync, ReadIP); + + public async Task ReadIPEndPointArrayAsync() => await ReadArrayAsync(ReadIPEndpointAsync, ReadIPEndpoint); + + public async Task ReadGuidArrayAsync() => await ReadArrayAsync(ReadGuidAsync, ReadGuid); + + public async Task ReadDateTimeArrayAsync() => await ReadArrayAsync(ReadDateTimeAsync, ReadDateTime); + + public async Task ReadInt64ArrayAsync() => await ReadArrayAsync(ReadLongAsync, ReadLong); + + public async Task ReadInt32ArrayAsync() => await ReadArrayAsync(ReadInt32Async, ReadInt32); + + public async Task ReadUInt64ArrayAsync() => await ReadArrayAsync(ReadULongAsync, ReadULong); + + public async Task ReadUInt32ArrayAsync() => await ReadArrayAsync(ReadUInt32Async, ReadUInt32); + + public async Task ReadCharArrayAsync() => await ReadArrayAsync(ReadCharAsync, ReadChar); + + public async Task ReadShortArrayAsync() => await ReadArrayAsync(ReadShortAsync, ReadShort); + + public async Task ReadUShortArrayAsync() => await ReadArrayAsync(ReadUShortAsync, ReadUShort); + + public async Task ReadFloatArrayAsync() => await ReadArrayAsync(ReadFloatAsync, ReadFloat); + + public async Task ReadDoubleArrayAsync() => await ReadArrayAsync(ReadDoubleAsync, ReadDouble); + + public async Task ReadBooleanArrayAsync() => await ReadArrayAsync(ReadBooleanAsync, ReadBoolean); + + public async Task ReadByteArrayAsync() { - int count = ReadInt32(); - var array = new TimeSpan[count]; - if (count > 0) + if (_accessor.IsMemoryStream) { - for (int i = 0; i < count; i++) - { - array[i] = ReadTimeSpan(); - } + return ReadBytes(); } - return array; + return await ReadBytesAsync(); } - #endregion + public async Task ReadByteArrayArrayAsync() => await ReadArrayAsync(ReadBytesAsync, ReadBytes); + public async Task ReadDecimalArrayAsync() => await ReadArrayAsync(ReadDecimalAsync, ReadDecimal); - public Dictionary ReadDictionary() + public async Task ReadTimeSpanArrayAsync() => await ReadArrayAsync(ReadTimeSpanAsync, ReadTimeSpan); + + #endregion + + public async Task> ReadDictionaryAsync() { int count = ReadInt32(); var collection = new Dictionary(count); @@ -1112,15 +997,15 @@ namespace ZeroLevel.Services.Serialization TValue value; for (int i = 0; i < count; i++) { - key = ReadCompatible(); - value = ReadCompatible(); + key = await ReadCompatibleAsync(); + value = await ReadCompatibleAsync(); collection.Add(key, value); } } return collection; } - public ConcurrentDictionary ReadDictionaryAsConcurrent() + public async Task> ReadDictionaryAsConcurrentAsync() { int count = ReadInt32(); var collection = new ConcurrentDictionary(); @@ -1130,77 +1015,37 @@ namespace ZeroLevel.Services.Serialization TValue value; for (int i = 0; i < count; i++) { - key = ReadCompatible(); - value = ReadCompatible(); + key = await ReadCompatibleAsync(); + value = await ReadCompatibleAsync(); collection.TryAdd(key, value); } } return collection; } - public T ReadCompatible() + public async Task ReadCompatibleAsync() { - return MessageSerializer.DeserializeCompatible(this); + return await MessageSerializer.DeserializeCompatibleAsync(this); } - public T Read() where T : IBinarySerializable + public async Task ReadAsync() where T : IAsyncBinarySerializable { - byte type = ReadByte(); + byte type = await ReadByteAsync(); if (type == 0) return default(T); var item = (T)Activator.CreateInstance(); - item.Deserialize(this); + await item.DeserializeAsync(this); return item; } - public bool TryReadByte(out byte b) - { - if (TryReadBuffer(1, out var buffer)) - { - b = buffer[0]; - return true; - } - b = default; - return false; - } - - public bool TryRead(out T item) where T : IBinarySerializable - { - if (TryReadByte(out var type)) - { - if (type == 0) - { - item = default(T); - return true; - } - try - { - var o = (IBinarySerializable)FormatterServices.GetUninitializedObject(typeof(T)); - o.Deserialize(this); - item = (T)o; - return true; - } - catch (Exception ex) - { - Log.SystemError(ex, "[MemoryStreamReader.TryRead]"); - } - } - item = default; - return false; - } - - public T Read(object arg) where T : IBinarySerializable + public async Task ReadAsync(object arg) where T : IAsyncBinarySerializable { byte type = ReadByte(); if (type == 0) return default(T); var item = (T)Activator.CreateInstance(typeof(T), arg); - item.Deserialize(this); + await item.DeserializeAsync(this); return item; } #endregion Extensions - - public void Dispose() - { - _accessor.Dispose(); - } } + } \ No newline at end of file diff --git a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs index c5a8060..46006a5 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs @@ -4,7 +4,9 @@ using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; +using System.Runtime.CompilerServices; using System.Text; +using System.Threading.Tasks; using ZeroLevel.Services.Extensions; namespace ZeroLevel.Services.Serialization @@ -12,17 +14,33 @@ namespace ZeroLevel.Services.Serialization /// /// Wrapper over memorystream for writing /// - public sealed class MemoryStreamWriter : + public partial class MemoryStreamWriter : IBinaryWriter { - public Stream Stream + private const byte ZERO = 0; + private const byte ONE = 1; + private long _saved_stream_position = -1; + private const int BATCH_MEMORY_SIZE_LIMIT = 1024 * 1024; // 1Mb + private void MockCount() { - get + _saved_stream_position = this._stream.Position; + WriteInt32(0); // count mock + } + + private void UpdateCount(int count) + { + if (_saved_stream_position != -1) { - return _stream; + var current_position = this._stream.Position; + this._stream.Position = _saved_stream_position; + WriteInt32(count); + this._stream.Position = current_position; + _saved_stream_position = -1; } } + public Stream Stream => _stream; + private readonly Stream _stream; public MemoryStreamWriter() @@ -40,25 +58,33 @@ namespace ZeroLevel.Services.Serialization _stream = writer._stream; } - /// - /// Record a boolean value (1 byte) - /// - public void WriteBoolean(bool val) - { - _stream.WriteByte(BitConverter.GetBytes(val)[0]); - } - - /// - /// Write byte (1 byte) - /// - public void WriteByte(byte val) - { - _stream.WriteByte(val); - } - - /// - /// Write char (2 bytes) - /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteBoolean(bool val) => _stream.WriteByte(val ? ONE : ZERO); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteByte(byte val) => _stream.WriteByte(val); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteShort(short number) => _stream.Write(BitConverter.GetBytes(number), 0, 2); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteUShort(ushort number) => _stream.Write(BitConverter.GetBytes(number), 0, 2); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteInt32(Int32 number) => _stream.Write(BitConverter.GetBytes(number), 0, 4); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteUInt32(UInt32 number) => _stream.Write(BitConverter.GetBytes(number), 0, 4); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteLong(Int64 number) => _stream.Write(BitConverter.GetBytes(number), 0, 8); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteULong(UInt64 number) => _stream.Write(BitConverter.GetBytes(number), 0, 8); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteTimeSpan(TimeSpan period) => WriteLong(period.Ticks); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteDecimal(Decimal number) => _stream.Write(BitConverterExt.GetBytes(number), 0, 16); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteDouble(double val) => _stream.Write(BitConverter.GetBytes(val), 0, 8); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteFloat(float val) => _stream.Write(BitConverter.GetBytes(val), 0, 4); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteGuid(Guid guid) => _stream.Write(guid.ToByteArray(), 0, 16); + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void WriteChar(char val) { var data = BitConverter.GetBytes(val); @@ -81,66 +107,6 @@ namespace ZeroLevel.Services.Serialization _stream.Write(val, 0, val.Length); } } - - /// - /// Record a 32-bit integer (4 bytes) - /// - public void WriteShort(short number) - { - _stream.Write(BitConverter.GetBytes(number), 0, 2); - } - - public void WriteUShort(ushort number) - { - _stream.Write(BitConverter.GetBytes(number), 0, 2); - } - - /// - /// Record a 32-bit integer (4 bytes) - /// - public void WriteInt32(Int32 number) - { - _stream.Write(BitConverter.GetBytes(number), 0, 4); - } - - public void WriteUInt32(UInt32 number) - { - _stream.Write(BitConverter.GetBytes(number), 0, 4); - } - - /// - /// Record an integer 64-bit number (8 bytes) - /// - public void WriteLong(Int64 number) - { - _stream.Write(BitConverter.GetBytes(number), 0, 8); - } - - public void WriteULong(UInt64 number) - { - _stream.Write(BitConverter.GetBytes(number), 0, 8); - } - - public void WriteTimeSpan(TimeSpan period) - { - WriteLong(period.Ticks); - } - - public void WriteDecimal(Decimal number) - { - _stream.Write(BitConverterExt.GetBytes(number), 0, 16); - } - - public void WriteDouble(double val) - { - _stream.Write(BitConverter.GetBytes(val), 0, 8); - } - - public void WriteFloat(float val) - { - _stream.Write(BitConverter.GetBytes(val), 0, 4); - } - /// /// Write string (4 bytes long + Length bytes) /// @@ -158,14 +124,6 @@ namespace ZeroLevel.Services.Serialization } } - /// - /// GUID record (16 bytes) - /// - public void WriteGuid(Guid guid) - { - _stream.Write(guid.ToByteArray(), 0, 16); - } - /// /// Record the datetime /// @@ -313,491 +271,871 @@ namespace ZeroLevel.Services.Serialization } } - public void WriteCollection(IEnumerable collection) + public void WriteCollection(IEnumerable collection, Action writeAction) { - WriteInt32(collection?.Count() ?? 0); if (collection != null) { + MockCount(); + int count = 0; + foreach (var item in collection) { - WriteString(item); + writeAction.Invoke(item); + count++; } + + UpdateCount(count); + } + else + { + WriteInt32(0); } } - public void WriteCollection(IEnumerable collection) + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteString(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteIP(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteIPEndpoint(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteGuid(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteDateTime(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteDateTime(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteULong(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteUInt32(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteChar(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteShort(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteUShort(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteLong(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteInt32(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteFloat(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteDouble(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteBoolean(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteByte(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteBytes(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteDecimal(s)); + + public void WriteCollection(IEnumerable collection) => WriteCollection(collection, s => WriteTimeSpan(s)); + #endregion + + #region Arrays + public void WriteArray(T[] array) + where T : IBinarySerializable { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + WriteInt32(array?.Length ?? 0); + if (array != null) { - foreach (var item in collection) + for (int i = 0; i < array.Length; i++) { - WriteIP(item); + array[i].Serialize(this); } } } - public void WriteCollection(IEnumerable collection) + public void WriteArray(T[] array, Action writeAction) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + if (array != null) { - foreach (var item in collection) + WriteInt32(array.Length); + for (int i = 0; i < array.Length; i++) { - WriteIPEndpoint(item); + writeAction.Invoke(array[i]); } } + else + { + WriteInt32(0); + } } - public void WriteCollection(IEnumerable collection) + public void WriteArray(string[] array) => WriteArray(array, WriteString); + + public void WriteArray(IPAddress[] array) => WriteArray(array, WriteIP); + + public void WriteArray(IPEndPoint[] array) => WriteArray(array, WriteIPEndpoint); + + public void WriteArray(Guid[] array) => WriteArray(array, WriteGuid); + + public void WriteArray(DateTime[] array) => WriteArray(array, dt => WriteDateTime(dt)); + + public void WriteArray(DateTime?[] array) => WriteArray(array, WriteDateTime); + + public void WriteArray(UInt64[] array) => WriteArray(array, WriteULong); + + public void WriteArray(UInt32[] array) => WriteArray(array, WriteUInt32); + + public void WriteArray(char[] array) => WriteArray(array, WriteChar); + + public void WriteArray(short[] array) => WriteArray(array, WriteShort); + + public void WriteArray(ushort[] array) => WriteArray(array, WriteUShort); + + public void WriteArray(Int64[] array) => WriteArray(array, WriteLong); + + public void WriteArray(Int32[] array) => WriteArray(array, WriteInt32); + + public void WriteArray(float[] array) => WriteArray(array, WriteFloat); + + public void WriteArray(Double[] array) => WriteArray(array, WriteDouble); + + public void WriteArray(bool[] array) => WriteArray(array, WriteBoolean); + + public void WriteArray(byte[] array) => WriteArray(array, WriteByte); + + public void WriteArray(byte[][] array) => WriteArray(array, WriteBytes); + + public void WriteArray(decimal[] array) => WriteArray(array, WriteDecimal); + + public void WriteArray(TimeSpan[] array) => WriteArray(array, WriteTimeSpan); + #endregion + + public void WriteCompatible(T item) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) - { - foreach (var item in collection) - { - WriteGuid(item); - } - } + var buffer = MessageSerializer.SerializeCompatible(item); + _stream.Write(buffer, 0, buffer.Length); } - public void WriteCollection(IEnumerable collection) + public void Write(T item) + where T : IBinarySerializable { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + if (item != null) { - foreach (var item in collection) - { - WriteDateTime(item); - } + WriteByte(1); + item.Serialize(this); + } + else + { + WriteByte(0); } } - public void WriteCollection(IEnumerable collection) + public void WriteDictionary(IDictionary collection) { WriteInt32(collection?.Count() ?? 0); if (collection != null) { foreach (var item in collection) { - WriteULong(item); + WriteCompatible(item.Key); + WriteCompatible(item.Value); } } } - public void WriteCollection(IEnumerable collection) + public void WriteDictionary(ConcurrentDictionary collection) { WriteInt32(collection?.Count() ?? 0); if (collection != null) { foreach (var item in collection) { - WriteUInt32(item); + WriteCompatible(item.Key); + WriteCompatible(item.Value); } } } - public void WriteCollection(IEnumerable collection) + #endregion Extension + } + + /// + /// Async methods + /// + public partial class MemoryStreamWriter : + IAsyncBinaryWriter + { + /// + /// Write char (2 bytes) + /// + public async Task WriteCharAsync(char val) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) - { - foreach (var item in collection) - { - WriteChar(item); - } - } + var data = BitConverter.GetBytes(val); + await _stream.WriteAsync(data, 0, 2); } - public void WriteCollection(IEnumerable collection) + /// + /// Write array bytes + /// + /// + public async Task WriteBytesAsync(byte[] val) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + if (val == null) { - foreach (var item in collection) - { - WriteShort(item); - } + await WriteInt32Async(0); + } + else + { + await WriteInt32Async(val.Length); + await _stream.WriteAsync(val, 0, val.Length); } } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteShortAsync(short number) => await _stream.WriteAsync(BitConverter.GetBytes(number), 0, 2); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteUShortAsync(ushort number) => await _stream.WriteAsync(BitConverter.GetBytes(number), 0, 2); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteInt32Async(Int32 number) => await _stream.WriteAsync(BitConverter.GetBytes(number), 0, 4); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteUInt32Async(UInt32 number) => await _stream.WriteAsync(BitConverter.GetBytes(number), 0, 4); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteLongAsync(Int64 number) => await _stream.WriteAsync(BitConverter.GetBytes(number), 0, 8); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteULongAsync(UInt64 number) => await _stream.WriteAsync(BitConverter.GetBytes(number), 0, 8); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteTimeSpanAsync(TimeSpan period) => await WriteLongAsync(period.Ticks); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteDecimalAsync(Decimal number) => await _stream.WriteAsync(BitConverterExt.GetBytes(number), 0, 16); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteDoubleAsync(double val)=> await _stream.WriteAsync(BitConverter.GetBytes(val), 0, 8); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteFloatAsync(float val) => await _stream.WriteAsync(BitConverter.GetBytes(val), 0, 4); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public async Task WriteGuidAsync(Guid guid) => await _stream.WriteAsync(guid.ToByteArray(), 0, 16); - public void WriteCollection(IEnumerable collection) + /// + /// Write string (4 bytes long + Length bytes) + /// + public async Task WriteStringAsync(string line) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + if (line == null) { - foreach (var item in collection) - { - WriteUShort(item); - } + await WriteInt32Async(0); + } + else + { + var buffer = Encoding.UTF8.GetBytes(line); + await WriteInt32Async(buffer.Length); + await _stream.WriteAsync(buffer, 0, buffer.Length); } } - public void WriteCollection(IEnumerable collection) + + + /// + /// Record the datetime + /// + /// + public async Task WriteDateTimeAsync(DateTime? datetime) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + if (datetime == null) { - foreach (var item in collection) - { - WriteLong(item); - } + WriteByte(0); + } + else + { + WriteByte(1); + long serialized = datetime.Value.ToBinary(); + byte[] data = BitConverter.GetBytes(serialized); + await _stream.WriteAsync(data, 0, 8); } } - public void WriteCollection(IEnumerable collection) + public async Task WriteTimeAsync(TimeOnly? time) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + if (time == null) { - foreach (var item in collection) - { - WriteInt32(item); - } + WriteByte(0); + } + else + { + WriteByte(1); + var ts = time.Value.ToTimeSpan(); + await WriteTimeSpanAsync(ts); } } - public void WriteCollection(IEnumerable collection) + public async Task WriteDateAsync(DateOnly? date) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + if (date == null) { - foreach (var item in collection) - { - WriteFloat(item); - } + WriteByte(0); + } + else + { + WriteByte(1); + var days = date.Value.DayNumber; + await WriteInt32Async(days); } } - public void WriteCollection(IEnumerable collection) + public async Task WriteIPAsync(IPAddress ip) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + if (ip == null) { - foreach (var item in collection) - { - WriteDouble(item); - } + WriteByte(0); + } + else + { + WriteByte(1); + await WriteBytesAsync(ip.GetAddressBytes()); } } - public void WriteCollection(IEnumerable collection) + public async Task WriteIPEndpointAsync(IPEndPoint endpoint) { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + if (endpoint == null) { - foreach (var item in collection) - { - WriteBoolean(item); - } + WriteByte(0); + } + else + { + WriteByte(1); + await WriteIPAsync(endpoint.Address); + await WriteInt32Async(endpoint.Port); } } - public void WriteCollection(IEnumerable collection) + public async Task CompleteAsync() { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + return (_stream as MemoryStream)?.ToArray() ?? (await ReadToEndAsync(_stream)); + } + + private static async Task ReadToEndAsync(System.IO.Stream stream) + { + long originalPosition = 0; + if (stream.CanSeek) { - foreach (var item in collection) + originalPosition = stream.Position; + stream.Position = 0; + } + try + { + byte[] readBuffer = new byte[4096]; + int totalBytesRead = 0; + int bytesRead; + while ((bytesRead = await stream.ReadAsync(readBuffer, totalBytesRead, readBuffer.Length - totalBytesRead)) > 0) { - WriteByte(item); + totalBytesRead += bytesRead; + if (totalBytesRead == readBuffer.Length) + { + int nextByte = stream.ReadByte(); + if (nextByte != -1) + { + byte[] temp = new byte[readBuffer.Length * 2]; + Buffer.BlockCopy(readBuffer, 0, temp, 0, readBuffer.Length); + Buffer.SetByte(temp, totalBytesRead, (byte)nextByte); + readBuffer = temp; + totalBytesRead++; + } + } + } + byte[] buffer = readBuffer; + if (readBuffer.Length != totalBytesRead) + { + buffer = new byte[totalBytesRead]; + Buffer.BlockCopy(readBuffer, 0, buffer, 0, totalBytesRead); } + return buffer; } - } - - public void WriteCollection(IEnumerable collection) - { - WriteInt32(collection?.Count() ?? 0); - if (collection != null) + finally { - foreach (var item in collection) + if (stream.CanSeek) { - WriteBytes(item); + stream.Position = originalPosition; } } } - public void WriteCollection(IEnumerable collection) + public async Task DisposeAsync() + { + await _stream.FlushAsync(); + await _stream.DisposeAsync(); + } + + #region Extension + + #region Collections + + /// + /// Increase writing by batches + /// + private async Task OptimizedWriteCollectionByChunksAsync(IEnumerable collection, Action saveAction, int chunk_size) { - WriteInt32(collection?.Count() ?? 0); if (collection != null) { - foreach (var item in collection) + MockCount(); + int count = 0; + if (_stream is MemoryStream) { - WriteDecimal(item); + foreach (var item in collection) + { + saveAction.Invoke(this, item); + count++; + } } + else + { + using (var ms = new MemoryStream()) + { + using (var writer = new MemoryStreamWriter(ms)) + { + foreach (var items in collection.Chunkify(chunk_size)) + { + foreach (var item in items) + { + saveAction.Invoke(writer, item); + count++; + } + await WriteBytesAsync(writer.Complete()); + writer.Stream.Position = 0; + } + } + } + } + UpdateCount(count); + } + else + { + WriteInt32(0); } } - public void WriteCollection(IEnumerable collection) + public async Task WriteCollectionAsync(IEnumerable collection) + where T : IAsyncBinarySerializable { - WriteInt32(collection?.Count() ?? 0); if (collection != null) { + MockCount(); + int count = 0; foreach (var item in collection) { - WriteTimeSpan(item); + await item.SerializeAsync(this); + count++; } + UpdateCount(count); } - } - #endregion - - #region Arrays - public void WriteArray(T[] array) - where T : IBinarySerializable - { - WriteInt32(array?.Length ?? 0); - if (array != null) + else { - for (int i = 0; i < array.Length; i++) - { - array[i].Serialize(this); - } + WriteInt32(0); } } - public void WriteArray(string[] array) + public async Task WriteCollectionAsync(IEnumerable collection) { - WriteInt32(array?.Length ?? 0); - if (array != null) + if (collection != null) { - for (int i = 0; i < array.Length; i++) + MockCount(); + int count = 0; + if (collection != null) { - WriteString(array[i]); + foreach (var item in collection) + { + await WriteStringAsync(item); + count++; + } } + UpdateCount(count); } - } - - public void WriteArray(IPAddress[] array) - { - WriteInt32(array?.Length ?? 0); - if (array != null) + else { - for (int i = 0; i < array.Length; i++) - { - WriteIP(array[i]); - } + WriteInt32(0); } } - public void WriteArray(IPEndPoint[] array) + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteIP(i), BATCH_MEMORY_SIZE_LIMIT / 5); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteIPEndpoint(i), BATCH_MEMORY_SIZE_LIMIT / 9); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteGuid(i), BATCH_MEMORY_SIZE_LIMIT / 16); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDateTime(i), BATCH_MEMORY_SIZE_LIMIT / 9); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDateTime(i), BATCH_MEMORY_SIZE_LIMIT / 9); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteULong(i), BATCH_MEMORY_SIZE_LIMIT / 8); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteUInt32(i), BATCH_MEMORY_SIZE_LIMIT / 4); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteChar(i), BATCH_MEMORY_SIZE_LIMIT / 2); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteShort(i), BATCH_MEMORY_SIZE_LIMIT / 2); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteUShort(i), BATCH_MEMORY_SIZE_LIMIT / 2); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteLong(i), BATCH_MEMORY_SIZE_LIMIT / 8); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteInt32(i), BATCH_MEMORY_SIZE_LIMIT / 4); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteFloat(i), BATCH_MEMORY_SIZE_LIMIT / 4); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDouble(i), BATCH_MEMORY_SIZE_LIMIT / 8); + + public async Task WriteCollectionAsync(IEnumerable collection) { - WriteInt32(array?.Length ?? 0); - if (array != null) + if (collection != null) { - for (int i = 0; i < array.Length; i++) + MockCount(); + + int count = 0; + if (_stream is MemoryStream) { - WriteIPEndpoint(array[i]); + foreach (var item in collection) + { + WriteBoolean(item); + count++; + } } - } - } - - public void WriteArray(Guid[] array) - { - WriteInt32(array?.Length ?? 0); - if (array != null) - { - for (int i = 0; i < array.Length; i++) + else { - WriteGuid(array[i]); + var buffer = new byte[BATCH_MEMORY_SIZE_LIMIT]; + int index = 0; + foreach (var b in collection) + { + buffer[index] = b ? ONE : ZERO; + index++; + if (index == BATCH_MEMORY_SIZE_LIMIT) + { + await _stream.WriteAsync(buffer, 0, buffer.Length); + index = 0; + } + count++; + } + if (index != 0) + { + _stream.Write(buffer, 0, index); + } } - } - } - public void WriteArray(DateTime[] array) - { - WriteInt32(array?.Length ?? 0); - if (array != null) + UpdateCount(count); + } + else { - for (int i = 0; i < array.Length; i++) - { - WriteDateTime(array[i]); - } + WriteInt32(0); } } - public void WriteArray(UInt64[] array) + public async Task WriteCollectionAsync(IEnumerable collection) { - WriteInt32(array?.Length ?? 0); - if (array != null) + if (collection != null) { - for (int i = 0; i < array.Length; i++) + MockCount(); + + int count = 0; + if (_stream is MemoryStream) { - WriteULong(array[i]); + foreach (var item in collection) + { + WriteByte(item); + count++; + } } - } - } - - public void WriteArray(UInt32[] array) - { - WriteInt32(array?.Length ?? 0); - if (array != null) - { - for (int i = 0; i < array.Length; i++) + else { - WriteUInt32(array[i]); + var buffer = new byte[BATCH_MEMORY_SIZE_LIMIT]; + int index = 0; + foreach (var b in collection) + { + buffer[index] = b; + index++; + if (index == BATCH_MEMORY_SIZE_LIMIT) + { + await _stream.WriteAsync(buffer, 0, buffer.Length); + index = 0; + } + count++; + } + if (index != 0) + { + _stream.Write(buffer, 0, index); + } } - } - } - public void WriteArray(char[] array) - { - WriteInt32(array?.Length ?? 0); - if (array != null) + UpdateCount(count); + } + else { - for (int i = 0; i < array.Length; i++) - { - WriteChar(array[i]); - } + WriteInt32(0); } } - public void WriteArray(short[] array) + public async Task WriteCollectionAsync(IEnumerable collection) { - WriteInt32(array?.Length ?? 0); - if (array != null) + if (collection != null) { - for (int i = 0; i < array.Length; i++) + MockCount(); + + int count = 0; + if (_stream is MemoryStream) { - WriteShort(array[i]); + foreach (var item in collection) + { + WriteBytes(item); + count++; + } } - } - } - - public void WriteArray(ushort[] array) - { - WriteInt32(array?.Length ?? 0); - if (array != null) - { - for (int i = 0; i < array.Length; i++) + else { - WriteUShort(array[i]); + foreach (var b in collection) + { + await WriteBytesAsync(b); + count++; + } } + UpdateCount(count); } - } - - public void WriteArray(Int64[] array) - { - WriteInt32(array?.Length ?? 0); - if (array != null) + else { - for (int i = 0; i < array.Length; i++) - { - WriteLong(array[i]); - } + WriteInt32(0); } } - public void WriteArray(Int32[] array) + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDecimal(i), BATCH_MEMORY_SIZE_LIMIT / 16); + + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteTimeSpan(i), BATCH_MEMORY_SIZE_LIMIT / 16); + #endregion + + #region Arrays + + /// + /// Increase writing by batches + /// + private async Task OptimizedWriteArrayByChunksAsync(T[] array, Action saveAction, int chunk_size) { - WriteInt32(array?.Length ?? 0); if (array != null) { - for (int i = 0; i < array.Length; i++) + WriteInt32(array.Length); + + if (_stream is MemoryStream) { - WriteInt32(array[i]); + for (int i = 0; i < array.Length; i++) + { + saveAction.Invoke(this, array[i]); + } } - } - } - - public void WriteArray(float[] array) - { - WriteInt32(array?.Length ?? 0); - if (array != null) - { - for (int i = 0; i < array.Length; i++) + else { - WriteFloat(array[i]); + using (var ms = new MemoryStream()) + { + using (var writer = new MemoryStreamWriter(ms)) + { + for (int i = 0; i < array.Length; i += chunk_size) + { + for (int j = 0; j < chunk_size && (i + j) < array.Length; j++) + { + saveAction.Invoke(writer, array[i + j]); + } + await WriteBytesAsync(writer.Complete()); + writer.Stream.Position = 0; + } + } + } } } - } - - public void WriteArray(Double[] array) - { - WriteInt32(array?.Length ?? 0); - if (array != null) + else { - for (int i = 0; i < array.Length; i++) - { - WriteDouble(array[i]); - } + WriteInt32(0); } } - public void WriteArray(bool[] array) + public async Task WriteArrayAsync(T[] array) + where T : IAsyncBinarySerializable { - WriteInt32(array?.Length ?? 0); if (array != null) { + await WriteInt32Async(array.Length); for (int i = 0; i < array.Length; i++) { - WriteBoolean(array[i]); + await array[i].SerializeAsync(this); } } + else + { + WriteInt32(0); + } } - public void WriteArray(byte[] array) + public async Task WriteArrayAsync(string[] array) { - WriteInt32(array?.Length ?? 0); if (array != null) { - for (int i = 0; i < array.Length; i++) + if (_stream is MemoryStream) { - WriteByte(array[i]); + await WriteInt32Async(array.Length); + for (int i = 0; i < array.Length; i++) + { + WriteString(array[i]); + } } + else + { + await WriteInt32Async(array.Length); + for (int i = 0; i < array.Length; i++) + { + await WriteStringAsync(array[i]); + } + } + } + else + { + WriteInt32(0); } } - public void WriteArray(byte[][] array) + public async Task WriteArrayAsync(IPAddress[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteIP(i), BATCH_MEMORY_SIZE_LIMIT / 5); + + public async Task WriteArrayAsync(IPEndPoint[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteIPEndpoint(i), BATCH_MEMORY_SIZE_LIMIT / 9); + + public async Task WriteArrayAsync(Guid[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteGuid(i), BATCH_MEMORY_SIZE_LIMIT / 16); + + public async Task WriteArrayAsync(DateTime[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteDateTime(i), BATCH_MEMORY_SIZE_LIMIT / 9); + + public async Task WriteArrayAsync(DateTime?[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteDateTime(i), BATCH_MEMORY_SIZE_LIMIT / 9); + + public async Task WriteArrayAsync(UInt64[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteULong(i), BATCH_MEMORY_SIZE_LIMIT / 8); + + public async Task WriteArrayAsync(UInt32[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteUInt32(i), BATCH_MEMORY_SIZE_LIMIT / 4); + + public async Task WriteArrayAsync(char[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteChar(i), BATCH_MEMORY_SIZE_LIMIT / 2); + + public async Task WriteArrayAsync(short[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteShort(i), BATCH_MEMORY_SIZE_LIMIT / 2); + + public async Task WriteArrayAsync(ushort[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteUShort(i), BATCH_MEMORY_SIZE_LIMIT / 2); + + public async Task WriteArrayAsync(Int64[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteLong(i), BATCH_MEMORY_SIZE_LIMIT / 8); + + public async Task WriteArrayAsync(Int32[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteInt32(i), BATCH_MEMORY_SIZE_LIMIT / 4); + + public async Task WriteArrayAsync(float[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteFloat(i), BATCH_MEMORY_SIZE_LIMIT / 4); + + public async Task WriteArrayAsync(Double[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteDouble(i), BATCH_MEMORY_SIZE_LIMIT / 8); + + public async Task WriteArrayAsync(bool[] array) { - WriteInt32(array?.Length ?? 0); if (array != null) { - for (int i = 0; i < array.Length; i++) + WriteInt32(array.Length); + + if (_stream is MemoryStream) { - WriteBytes(array[i]); + for (int i = 0; i < array.Length; i++) + { + WriteBoolean(array[i]); + } + } + else + { + var buffer = new byte[BATCH_MEMORY_SIZE_LIMIT]; + using (var ms = new MemoryStream()) + { + using (var writer = new MemoryStreamWriter(ms)) + { + for (int i = 0; i < array.Length; i += BATCH_MEMORY_SIZE_LIMIT) + { + for (int j = 0; j < BATCH_MEMORY_SIZE_LIMIT && (i + j) < array.Length; j++) + { + buffer[j] = array[i + j] ? ONE : ZERO; + } + await WriteBytesAsync(writer.Complete()); + writer.Stream.Position = 0; + } + } + } } } + else + { + WriteInt32(0); + } } - public void WriteArray(decimal[] array) + public async Task WriteArrayAsync(byte[] array) { - WriteInt32(array?.Length ?? 0); if (array != null) { - for (int i = 0; i < array.Length; i++) + WriteInt32(array.Length); + + if (_stream is MemoryStream) { - WriteDecimal(array[i]); + for (int i = 0; i < array.Length; i++) + { + WriteByte(array[i]); + } + } + else + { + var buffer = new byte[BATCH_MEMORY_SIZE_LIMIT]; + using (var ms = new MemoryStream()) + { + using (var writer = new MemoryStreamWriter(ms)) + { + for (int i = 0; i < array.Length; i += BATCH_MEMORY_SIZE_LIMIT) + { + for (int j = 0; j < BATCH_MEMORY_SIZE_LIMIT && (i + j) < array.Length; j++) + { + buffer[j] = array[i + j]; + } + await WriteBytesAsync(writer.Complete()); + writer.Stream.Position = 0; + } + } + } } } + else + { + WriteInt32(0); + } } - public void WriteArray(TimeSpan[] array) + public async Task WriteArrayAsync(byte[][] array) { - WriteInt32(array?.Length ?? 0); if (array != null) { - for (int i = 0; i < array.Length; i++) + WriteInt32(array.Length); + if (_stream is MemoryStream) { - WriteTimeSpan(array[i]); + for (int i = 0; i < array.Length; i++) + { + WriteBytes(array[i]); + } } + else + { + for (int i = 0; i < array.Length; i++) + { + await WriteBytesAsync(array[i]); + } + } + } + else + { + WriteInt32(0); } } + + public async Task WriteArrayAsync(decimal[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteDecimal(i), BATCH_MEMORY_SIZE_LIMIT / 16); + + public async Task WriteArrayAsync(TimeSpan[] array) => await OptimizedWriteArrayByChunksAsync(array, (w, i) => w.WriteTimeSpan(i), BATCH_MEMORY_SIZE_LIMIT / 8); #endregion - public void WriteCompatible(T item) + public async Task WriteCompatibleAsync(T item) { var buffer = MessageSerializer.SerializeCompatible(item); - _stream.Write(buffer, 0, buffer.Length); + await _stream.WriteAsync(buffer, 0, buffer.Length); } - public void Write(T item) - where T : IBinarySerializable + public async Task WriteAsync(T item) + where T : IAsyncBinarySerializable { if (item != null) { WriteByte(1); - item.Serialize(this); + await item.SerializeAsync(this); } else { @@ -805,30 +1143,38 @@ namespace ZeroLevel.Services.Serialization } } - public void WriteDictionary(IDictionary collection) + public async Task WriteDictionaryAsync(IDictionary collection) { - WriteInt32(collection?.Count() ?? 0); if (collection != null) { + WriteInt32(collection.Count); foreach (var item in collection) { - WriteCompatible(item.Key); - WriteCompatible(item.Value); + await WriteCompatibleAsync(item.Key); + await WriteCompatibleAsync(item.Value); } } + else + { + WriteInt32(0); + } } - public void WriteDictionary(ConcurrentDictionary collection) + public async Task WriteDictionaryAsync(ConcurrentDictionary collection) { - WriteInt32(collection?.Count() ?? 0); if (collection != null) { + WriteInt32(collection.Count); foreach (var item in collection) { - WriteCompatible(item.Key); - WriteCompatible(item.Value); + await WriteCompatibleAsync(item.Key); + await WriteCompatibleAsync(item.Value); } } + else + { + WriteInt32(0); + } } #endregion Extension diff --git a/ZeroLevel/Services/Serialization/MessageSerializer.cs b/ZeroLevel/Services/Serialization/MessageSerializer.cs index 4cd16d0..b8637af 100644 --- a/ZeroLevel/Services/Serialization/MessageSerializer.cs +++ b/ZeroLevel/Services/Serialization/MessageSerializer.cs @@ -2,10 +2,10 @@ using System.Collections.Generic; using System.IO; using System.Runtime.Serialization; +using System.Threading.Tasks; namespace ZeroLevel.Services.Serialization { - public delegate bool TryDeserializeMethod(MemoryStreamReader reader, out T output); public static class MessageSerializer { public static byte[] Serialize(T obj) @@ -85,16 +85,7 @@ namespace ZeroLevel.Services.Serialization } return false; } - - public static TryDeserializeMethod GetSafetyDeserializer() - { - if (typeof(IBinarySerializable).IsAssignableFrom(typeof(T))) - { - return TryObjectDeserialize; - } - return TryPrimitiveTypeDeserialize; - } - + public static byte[] SerializeCompatible(object obj) { if (null == obj) @@ -248,6 +239,16 @@ namespace ZeroLevel.Services.Serialization } return PrimitiveTypeSerializer.Deserialize(reader); } + public static async Task DeserializeCompatibleAsync(IBinaryReader reader) + { + if (typeof(IAsyncBinarySerializable).IsAssignableFrom(typeof(T))) + { + var direct = (IAsyncBinarySerializable)Activator.CreateInstance(); + await direct.DeserializeAsync(reader); + return (T)direct; + } + return PrimitiveTypeSerializer.Deserialize(reader); + } public static object DeserializeCompatible(Type type, byte[] data) { diff --git a/ZeroLevel/Services/Serialization/PrimitiveTypeSerializer.cs b/ZeroLevel/Services/Serialization/PrimitiveTypeSerializer.cs index e7bb090..287bb5b 100644 --- a/ZeroLevel/Services/Serialization/PrimitiveTypeSerializer.cs +++ b/ZeroLevel/Services/Serialization/PrimitiveTypeSerializer.cs @@ -3,7 +3,6 @@ using System.Collections.Generic; using System.Linq; using System.Net; using System.Reflection; -using System.Runtime.CompilerServices; using ZeroLevel.Services.Invokation; using ZeroLevel.Services.Reflection; @@ -83,6 +82,7 @@ namespace ZeroLevel.Services.Serialization _cachee.Add(typeof(ushort[]), Create()); _cachee.Add(typeof(Decimal[]), Create()); _cachee.Add(typeof(DateTime[]), Create()); + _cachee.Add(typeof(DateTime?[]), Create()); _cachee.Add(typeof(Guid[]), Create()); _cachee.Add(typeof(String[]), Create()); _cachee.Add(typeof(TimeSpan[]), Create()); @@ -103,6 +103,7 @@ namespace ZeroLevel.Services.Serialization _cachee.Add(typeof(IEnumerable), Create>()); _cachee.Add(typeof(IEnumerable), Create>()); _cachee.Add(typeof(IEnumerable), Create>()); + _cachee.Add(typeof(IEnumerable), Create>()); _cachee.Add(typeof(IEnumerable), Create>()); _cachee.Add(typeof(IEnumerable), Create>()); _cachee.Add(typeof(IEnumerable), Create>()); @@ -122,6 +123,7 @@ namespace ZeroLevel.Services.Serialization _arrayTypesCachee.Add(typeof(ushort), typeof(ushort[])); _arrayTypesCachee.Add(typeof(Decimal), typeof(Decimal[])); _arrayTypesCachee.Add(typeof(DateTime), typeof(DateTime[])); + _arrayTypesCachee.Add(typeof(DateTime?), typeof(DateTime?[])); _arrayTypesCachee.Add(typeof(Guid), typeof(Guid[])); _arrayTypesCachee.Add(typeof(String), typeof(String[])); _arrayTypesCachee.Add(typeof(TimeSpan), typeof(TimeSpan[])); @@ -142,6 +144,7 @@ namespace ZeroLevel.Services.Serialization _enumTypesCachee.Add(typeof(ushort), typeof(IEnumerable)); _enumTypesCachee.Add(typeof(Decimal), typeof(IEnumerable)); _enumTypesCachee.Add(typeof(DateTime), typeof(IEnumerable)); + _enumTypesCachee.Add(typeof(DateTime?), typeof(IEnumerable)); _enumTypesCachee.Add(typeof(Guid), typeof(IEnumerable)); _enumTypesCachee.Add(typeof(String), typeof(IEnumerable)); _enumTypesCachee.Add(typeof(TimeSpan), typeof(IEnumerable)); @@ -286,6 +289,11 @@ namespace ZeroLevel.Services.Serialization wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDateTimeArray").First(); wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreateArrayPredicate()).First(); } + else if (type == typeof(DateTime?[])) + { + wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDateTimeArray").First(); + wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreateArrayPredicate()).First(); + } else if (type == typeof(Double[])) { wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDoubleArray").First(); @@ -384,6 +392,11 @@ namespace ZeroLevel.Services.Serialization wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDateTimeCollection").First(); wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreateCollectionPredicate()).First(); } + else if (type == typeof(IEnumerable)) + { + wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDateTimeCollection").First(); + wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreateCollectionPredicate()).First(); + } else if (type == typeof(IEnumerable)) { wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDoubleCollection").First();