From 829d39d8540f7d3d57fc0a8bc760b4311cfa6ba6 Mon Sep 17 00:00:00 2001 From: Ogoun Date: Tue, 25 Oct 2022 13:04:46 +0300 Subject: [PATCH] PartitionFileStorage --- .../PartitionFileStorageTest.csproj | 14 ++ PartitionFileStorageTest/Program.cs | 140 ++++++++++++ ZeroLevel.NN/ZeroLevel.NN.csproj | 3 +- ZeroLevel.sln | 17 ++ ZeroLevel/Services/IL.cs | 214 ++++++++++++++++++ .../Services/Storages/PartitionFileStorage.cs | 184 +++++++++++++++ 6 files changed, 570 insertions(+), 2 deletions(-) create mode 100644 PartitionFileStorageTest/PartitionFileStorageTest.csproj create mode 100644 PartitionFileStorageTest/Program.cs create mode 100644 ZeroLevel/Services/Storages/PartitionFileStorage.cs diff --git a/PartitionFileStorageTest/PartitionFileStorageTest.csproj b/PartitionFileStorageTest/PartitionFileStorageTest.csproj new file mode 100644 index 0000000..68e186f --- /dev/null +++ b/PartitionFileStorageTest/PartitionFileStorageTest.csproj @@ -0,0 +1,14 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs new file mode 100644 index 0000000..783bb87 --- /dev/null +++ b/PartitionFileStorageTest/Program.cs @@ -0,0 +1,140 @@ +using ZeroLevel; +using ZeroLevel.Services.Serialization; +using ZeroLevel.Services.Storages; + +namespace PartitionFileStorageTest +{ + internal class Program + { + public class PartitionKey + { + public DateTime Date { get; set; } + public ulong Ctn { get; set; } + } + + public class Record + : IBinarySerializable + { + public string[] Hosts { get; set; } + + public void Deserialize(IBinaryReader reader) + { + this.Hosts = reader.ReadStringArray(); + } + + public void Serialize(IBinaryWriter writer) + { + writer.WriteArray(this.Hosts); + } + } + + static Record GenerateRecord() + { + var record = new Record(); + var rnd = Random.Shared; + var count = rnd.Next(400); + record.Hosts = new string[count]; + for (int i = 0; i < count; i++) + { + 
record.Hosts[i] = Guid.NewGuid().ToString(); + } + return record; + } + + static PartitionKey GenerateKey() + { + var key = new PartitionKey(); + var rnd = Random.Shared; + key.Ctn = (ulong)rnd.Next(1000); + key.Date = DateTime.Now.AddDays(-rnd.Next(30)).Date; + return key; + } + + class DataConverter + : IPartitionDataConverter + { + public IEnumerable ConvertFromStorage(Stream stream) + { + var reader = new MemoryStreamReader(stream); + while (reader.EOS == false) + { + yield return reader.Read(); + } + } + + public void ConvertToStorage(Stream stream, IEnumerable data) + { + var writer = new MemoryStreamWriter(stream); + foreach (var record in data) + { + writer.Write(record); + } + } + } + + private static int COUNT_NUMBERS = ulong.MaxValue.ToString().Length; + static void Main(string[] args) + { + var testDict = new Dictionary>>(); + var options = new PartitionFileStorageOptions + { + MaxDegreeOfParallelism = 1, + DataConverter = new DataConverter(), + UseCompression = true, + MergeFiles = false, + RootFolder = Path.Combine(Configuration.BaseDirectory, "root") + }; + options.Partitions.Add(new Partition("data", p => p.Date.ToString("yyyyMMdd"))); + options.Partitions.Add(new Partition("ctn", p => p.Ctn.ToString().PadLeft(COUNT_NUMBERS, '0'))); + var storage = new PartitionFileStorage(options); + + for (int i = 0; i < 50000; i++) + { + if (i % 100 == 0) + Console.WriteLine(i); + var key = GenerateKey(); + var record = GenerateRecord(); + if (testDict.ContainsKey(key.Ctn) == false) + { + testDict[key.Ctn] = new Dictionary>(); + } + if (testDict[key.Ctn].ContainsKey(key.Date) == false) + { + testDict[key.Ctn][key.Date] = new List(); + } + testDict[key.Ctn][key.Date].Add(record); + storage.WriteAsync(key, new[] { record }).Wait(); + } + foreach (var cpair in testDict) + { + foreach (var dpair in cpair.Value) + { + var key = new PartitionKey { Ctn = cpair.Key, Date = dpair.Key }; + var data = storage.CollectAsync(new[] { key 
}).Result.ToArray(); + var testData = dpair.Value; + + if (data.Length != testData.Count) + { + Console.WriteLine($"[{key.Date.ToString("yyyyMMdd")}] {key.Ctn} Wrong count. Expected: {testData.Count}. Got: {data.Length}"); + } + else + { + var datahosts = data.SelectMany(r => r.Hosts).OrderBy(s => s).ToArray(); + var testhosts = testData.SelectMany(r => r.Hosts).OrderBy(s => s).ToArray(); + if (datahosts.Length != testhosts.Length) + { + Console.WriteLine($"[{key.Date.ToString("yyyyMMdd")}] {key.Ctn}. Records not equals. Different hosts count"); + } + for (int i = 0; i < Math.Min(datahosts.Length, testhosts.Length); i++) + { + if (string.Compare(datahosts[i], testhosts[i], StringComparison.Ordinal) != 0) + { + Console.WriteLine($"[{key.Date.ToString("yyyyMMdd")}] {key.Ctn}. Records not equals. Different hosts"); + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/ZeroLevel.NN/ZeroLevel.NN.csproj b/ZeroLevel.NN/ZeroLevel.NN.csproj index dc33965..397a3a7 100644 --- a/ZeroLevel.NN/ZeroLevel.NN.csproj +++ b/ZeroLevel.NN/ZeroLevel.NN.csproj @@ -36,13 +36,12 @@ - + - diff --git a/ZeroLevel.sln b/ZeroLevel.sln index 837e078..ed5fc5c 100644 --- a/ZeroLevel.sln +++ b/ZeroLevel.sln @@ -65,6 +65,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AutoLoader", "AutoLoader", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AppContainer", "AutoLoader\AppContainer\AppContainer.csproj", "{9DE345EA-955B-41A8-93AF-277C0B5A9AC5}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PartitionTest", "PartitionTest", "{BAD88A91-1AFA-48A8-8D39-4846A65B4167}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -315,6 +319,18 @@ Global {9DE345EA-955B-41A8-93AF-277C0B5A9AC5}.Release|x64.Build.0 = 
Release|Any CPU 
{9DE345EA-955B-41A8-93AF-277C0B5A9AC5}.Release|x86.ActiveCfg = Release|Any CPU {9DE345EA-955B-41A8-93AF-277C0B5A9AC5}.Release|x86.Build.0 = Release|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|x64.ActiveCfg = Debug|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|x64.Build.0 = Debug|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|x86.ActiveCfg = Debug|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|x86.Build.0 = Debug|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|Any CPU.Build.0 = Release|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.ActiveCfg = Release|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.Build.0 = Release|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.ActiveCfg = Release|Any CPU + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -329,6 +345,7 @@ Global {F70842E7-9A1D-4CC4-9F55-0953AEC9C7C8} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15} {2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133} {9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4} + {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9} = {BAD88A91-1AFA-48A8-8D39-4846A65B4167} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB} diff --git a/ZeroLevel/Services/IL.cs b/ZeroLevel/Services/IL.cs index c3c5832..e2612ac 100644 --- a/ZeroLevel/Services/IL.cs +++ b/ZeroLevel/Services/IL.cs @@ -311,4 +311,218 @@ namespace ZeroLevel.Services } #endregion } + + public delegate object RemoteProcedureCallHandler(string contractName, string methodName, string 
returnTypeFullName, params object[] args); + + internal static class DynamicProxyGenerator + { + #region Private Fields + private readonly static IDictionary _builders = new Dictionary(); + private readonly static IDictionary _types = new Dictionary(); + private readonly static object _lockObject = new object(); + #endregion + /* + #region Public Methods + // Note that calling this method will cause any further + // attempts to generate an interface to fail + internal static void Save() + { + foreach (var builder in _builders.Select(b => b.Value)) + { + var ass = (AssemblyBuilder)builder.Assembly; + try + { + ass.Save(ass.GetName().Name + ".dll"); + } + catch { } + } + } + #endregion + */ + #region Private Methods + /// + /// Creates an instance of a proxy class for an interface + /// + /// Interface + /// Method call handler + /// Proxy class instance + internal static T CreateInterfaceInstance(RemoteProcedureCallHandler rpcHandler) + { + var destType = GenerateInterfaceType(rpcHandler); + return (T)Activator.CreateInstance(destType); + } + /// + /// Validates the preconditions + /// + /// Interface type + /// Remote call handler method + private static void Validate(Type sourceType, MethodInfo mi) + { + if (!sourceType.IsInterface) + throw new ArgumentException("Type T is not an interface", "T"); + if ((mi.Attributes & MethodAttributes.Public) != MethodAttributes.Public) + throw new ArgumentException("Method must be public.", "getter"); + } + /// + /// Gets the dynamic module builder + /// + /// Interface type + /// ModuleBuilder + private static ModuleBuilder CreateModuleBuilder(Type sourceType) + { + var orginalAssemblyName = sourceType.Assembly.GetName().Name; + ModuleBuilder moduleBuilder; + if (!_builders.TryGetValue(orginalAssemblyName, out moduleBuilder)) + { + var newAssemblyName = new AssemblyName(Guid.NewGuid() + "." 
+ orginalAssemblyName); + var dynamicAssembly = AssemblyBuilder.DefineDynamicAssembly(newAssemblyName, AssemblyBuilderAccess.RunAndCollect); + moduleBuilder = dynamicAssembly.DefineDynamicModule(newAssemblyName.Name); + _builders.Add(orginalAssemblyName, moduleBuilder); + } + return moduleBuilder; + } + /// + /// Builds the list of interfaces the proxy class must implement + /// The list contains the given interface and every interface it inherits from + /// + /// Interface type + /// List of interfaces + internal static List GetDistinctInterfaces(Type sourceType) + { + var interfaces = new List(); + IEnumerable subList = new[] { sourceType }; + while (subList.Count() != 0) + { + interfaces.AddRange(subList); + subList = subList.SelectMany(i => i.GetInterfaces()); + } + return interfaces.Distinct().ToList(); + } + /// + /// Adds a new method to the proxy type + /// + /// Type builder + /// Method description + /// Proxy method + /// Contract the method belongs to (interface type) + private static void AppendMethodToProxy(TypeBuilder typeBuilder, MethodInfo method, RemoteProcedureCallHandler handler, string contractName) + { + string methodName = method.Name; + Type retType = method.ReturnType; + bool hasReturnValue = retType != typeof(void); + + var newMethod = typeBuilder.DefineMethod(methodName, + method.Attributes ^ MethodAttributes.Abstract, + method.CallingConvention, + retType, + method.ReturnParameter.GetRequiredCustomModifiers(), + method.ReturnParameter.GetOptionalCustomModifiers(), + method.GetParameters().Select(p => p.ParameterType).ToArray(), + method.GetParameters().Select(p => p.GetRequiredCustomModifiers()).ToArray(), + method.GetParameters().Select(p => p.GetOptionalCustomModifiers()).ToArray() + ); + + var il = newMethod.GetILGenerator(); + + /* Type exType = typeof(Exception); + ConstructorInfo exCtorInfo = exType.GetConstructor(new Type[] { typeof(string) }); + MethodInfo exToStrMI = exType.GetMethod("ToString"); + 
MethodInfo writeLineMI = typeof(Console).GetMethod("WriteLine", new Type[] { typeof(string), typeof(object) }); + LocalBuilder tmp2 = il.DeclareLocal(exType);*/ + + ParameterInfo[] parameters = method.GetParameters(); + // Array for the original method's arguments + LocalBuilder argArray = il.DeclareLocal(typeof(object[])); + il.Emit(OpCodes.Ldc_I4, parameters.Length); + il.Emit(OpCodes.Newarr, typeof(object)); + il.Emit(OpCodes.Stloc, argArray); + // Fill the array + for (int i = 0; i < parameters.Length; i++) + { + ParameterInfo info = parameters[i]; + il.Emit(OpCodes.Ldloc, argArray); + il.Emit(OpCodes.Ldc_I4, i); + il.Emit(OpCodes.Ldarg, (short)(i + 1)); + if (info.ParameterType.IsPrimitive || info.ParameterType.IsValueType) + il.Emit(OpCodes.Box, info.ParameterType); + il.Emit(OpCodes.Stelem_Ref); + } + // Arguments of the proxy method + il.Emit(OpCodes.Ldstr, contractName); + il.Emit(OpCodes.Ldstr, methodName); + if (hasReturnValue) + { + il.Emit(OpCodes.Ldstr, retType.FullName); + } + else + { + il.Emit(OpCodes.Ldstr, typeof(void).FullName); + } + il.Emit(OpCodes.Ldloc, argArray); + // Call the proxy method + + // Label exBlock = il.BeginExceptionBlock(); + il.EmitCall(OpCodes.Call, handler.GetMethodInfo(), null); + /* + il.Emit(OpCodes.Stloc_S, tmp2); + il.Emit(OpCodes.Ldstr, "Caught {0}"); + il.Emit(OpCodes.Ldloc_S, tmp2); + il.EmitCall(OpCodes.Callvirt, exToStrMI, null); + il.EmitCall(OpCodes.Call, writeLineMI, null); + il.Emit(OpCodes.Ldc_I4_M1); + il.EndExceptionBlock(); + */ + // Return the result + if (hasReturnValue) + { + if (retType.IsValueType) + il.Emit(OpCodes.Unbox_Any, retType); + } + else + { + il.Emit(OpCodes.Pop); + } + il.Emit(OpCodes.Ret); + } + /// + /// Creates the proxy type for an interface + /// + /// Interface + /// Proxy method that handles calls to the real method + /// The created type + private static Type GenerateInterfaceType(RemoteProcedureCallHandler rpcHandler) + { + var sourceType = typeof(T); + Type newType; + if 
(_types.TryGetValue(sourceType, out newType)) + return newType; + + string sourceContractFullName = sourceType.FullName; + // Make sure the same interface isn't implemented twice + lock (_lockObject) + { + if (_types.TryGetValue(sourceType, out newType)) + return newType; + // Validation + Validate(sourceType, rpcHandler.Method); + // Module and Assembly Creation + var moduleBuilder = CreateModuleBuilder(sourceType); + var assemblyName = moduleBuilder.Assembly.GetName(); + // Create the TypeBuilder + var typeBuilder = moduleBuilder.DefineType(sourceType.FullName, TypeAttributes.Public | TypeAttributes.Class, typeof(object), new[] { sourceType }); + // Enumerate interface inheritance hierarchy + var interfaces = GetDistinctInterfaces(sourceType); + // Create the methods + foreach (var method in interfaces.SelectMany(i => i.GetMethods())) + { + AppendMethodToProxy(typeBuilder, method, rpcHandler, sourceContractFullName); + } + // Create and return the defined type + newType = typeBuilder.CreateType(); + _types.Add(sourceType, newType); + return newType; + } + } + #endregion + } } diff --git a/ZeroLevel/Services/Storages/PartitionFileStorage.cs b/ZeroLevel/Services/Storages/PartitionFileStorage.cs new file mode 100644 index 0000000..1808626 --- /dev/null +++ b/ZeroLevel/Services/Storages/PartitionFileStorage.cs @@ -0,0 +1,184 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Linq; +using System.Threading.Tasks; +using ZeroLevel.Services.FileSystem; + +namespace ZeroLevel.Services.Storages +{ + public class Partition + { + public Partition(string name, Func pathExtractor) + { + Name = name; + PathExtractor = pathExtractor; + } + public Func PathExtractor { get; } + public string Name { get; } + } + /// + /// Write data to partitional data storage + /// + public interface IPartitionFileStorage + { + Task WriteAsync(TKey key, IEnumerable records); + Task> 
CollectAsync(IEnumerable keys); + void Drop(TKey key); + } + + public interface IPartitionDataConverter + { + IEnumerable ConvertFromStorage(Stream stream); + void ConvertToStorage(Stream stream, IEnumerable data); + } + + public class PartitionFileStorageOptions + { + public string RootFolder { get; set; } + public int MaxDegreeOfParallelism { get; set; } = Math.Max(1, Environment.ProcessorCount / 2); + public bool MergeFiles { get; set; } = false; + public int MergeFrequencyInMinutes { get; set; } = 180; + public bool UseCompression { get; set; } = false; + public IPartitionDataConverter DataConverter { get; set; } + public List> Partitions { get; set; } = new List>(); + } + + public class PartitionFileStorage + : IPartitionFileStorage + { + + private readonly PartitionFileStorageOptions _options; + public PartitionFileStorage(PartitionFileStorageOptions options) + { + if (options.RootFolder == null) + throw new ArgumentNullException(nameof(options.RootFolder)); + if (options.DataConverter == null) + throw new ArgumentNullException(nameof(options.DataConverter)); + if (!Directory.Exists(options.RootFolder)) + { + Directory.CreateDirectory(options.RootFolder); + } + _options = options; + if (options.MergeFiles) + { + Sheduller.RemindEvery(TimeSpan.FromMinutes(_options.MergeFrequencyInMinutes), MergeDataFiles); + } + } + + public void Drop(TKey key) + { + var path = GetDataPath(key); + FSUtils.RemoveFolder(path, 3, 500); + } + + public async Task> CollectAsync(IEnumerable keys) + { + var pathes = keys.Safe().Select(k => GetDataPath(k)); + var files = pathes.Safe().Where(p => Directory.Exists(p)).SelectMany(p => Directory.GetFiles(p)).Where(n => Path.GetFileName(n).StartsWith("__", StringComparison.Ordinal) == false).ToArray(); + var set = new ConcurrentBag(); + if (files.Any()) + { + var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism }; + await Parallel.ForEachAsync(files, options, async (file, _) => + { + using (var stream = 
CreateReadStream(file)) + { + foreach (var item in 
_options.DataConverter.ConvertFromStorage(stream)) + { + set.Add(item); + } + } + }); + } + return set; + } + public async Task WriteAsync(TKey key, IEnumerable records) + { + using (var stream = CreateWriteStream(key)) + { + _options.DataConverter.ConvertToStorage(stream, records); + await stream.FlushAsync(); + } + } + + #region Private members + private void MergeDataFiles() + { + var folders = new Stack(); + folders.Push(_options.RootFolder); + while (folders.Count > 0) + { + var dir = folders.Pop(); + MergeFolder(dir); + foreach (var subdir in Directory.GetDirectories(dir, "*.*", SearchOption.TopDirectoryOnly)) + { + folders.Push(subdir); + } + } + } + + private void MergeFolder(string path) + { + var files = Directory.GetFiles(path); + if (files != null && files.Length > 1) + { + + } + } + + private string GetDataFilePath(string path) + { + return Path.Combine(path, Guid.NewGuid().ToString()); + } + private string GetDataPath(TKey key) + { + var path = _options.RootFolder; + foreach (var partition in _options.Partitions) + { + var pathPart = partition.PathExtractor(key); + pathPart = FSUtils.FileNameCorrection(pathPart); + if (string.IsNullOrWhiteSpace(pathPart)) + { + throw new Exception($"Partition '{partition.Name}' not return name of part of path"); + } + path = Path.Combine(path, pathPart); + } + return path; + } + private Stream CreateWriteStream(TKey key) + { + var path = GetDataPath(key); + if (!Directory.Exists(path)) + { + Directory.CreateDirectory(path); + } + var fullPath = GetDataFilePath(path); + var stream = File.OpenWrite(fullPath); + if (_options.UseCompression) + { + return new GZipStream(stream, CompressionMode.Compress, false); + } + return stream; + } + + private Stream CreateReadStream(string path) + { + var stream = File.OpenRead(path); + if (_options.UseCompression) + { + var ms = new MemoryStream(); + using (var compressed = new GZipStream(stream, CompressionMode.Decompress, false)) + { + compressed.CopyTo(ms); + } + ms.Position = 0; 
+ return ms; + } + return stream; + } + #endregion + } +}