diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index 107d21d..02e5050 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -1,6 +1,7 @@ using ZeroLevel; using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Storages; +using ZeroLevel.Services.Storages.PartitionFileSystemStorage; namespace PartitionFileStorageTest { @@ -53,7 +54,7 @@ namespace PartitionFileStorageTest class DataConverter : IPartitionDataConverter { - public IEnumerable ConvertFromStorage(Stream stream) + public IEnumerable ReadFromStorage(Stream stream) { var reader = new MemoryStreamReader(stream); while (reader.EOS == false) @@ -62,7 +63,7 @@ namespace PartitionFileStorageTest } } - public void ConvertToStorage(Stream stream, IEnumerable data) + public void WriteToStorage(Stream stream, IEnumerable data) { var writer = new MemoryStreamWriter(stream); foreach (var record in data) diff --git a/ZeroLevel.sln b/ZeroLevel.sln index ed5fc5c..e4dd8c7 100644 --- a/ZeroLevel.sln +++ b/ZeroLevel.sln @@ -67,7 +67,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AppContainer", "AutoLoader\ EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PartitionTest", "PartitionTest", "{BAD88A91-1AFA-48A8-8D39-4846A65B4167}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{442569A3-E126-4A11-B9DD-2DFA5BF76B0F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BigFileParserTest", "Tests\BigFileParserTest\BigFileParserTest.csproj", "{E7526771-86D5-4311-A284-05D3FEFC7B75}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -331,6 +335,18 @@ Global {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.Build.0 = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.ActiveCfg = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.Build.0 = Release|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.ActiveCfg = Debug|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.Build.0 = Debug|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.ActiveCfg = Debug|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.Build.0 = Debug|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.Build.0 = Release|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.ActiveCfg = Release|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.Build.0 = Release|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.ActiveCfg = Release|Any CPU + {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -346,6 +362,7 @@ Global {2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133} {9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4} {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9} = {BAD88A91-1AFA-48A8-8D39-4846A65B4167} + {E7526771-86D5-4311-A284-05D3FEFC7B75} = {442569A3-E126-4A11-B9DD-2DFA5BF76B0F} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB} diff --git a/ZeroLevel/Services/FileSystem/BigFileParser.cs b/ZeroLevel/Services/FileSystem/BigFileParser.cs index 4c451ba..67a50d0 100644 --- a/ZeroLevel/Services/FileSystem/BigFileParser.cs +++ b/ZeroLevel/Services/FileSystem/BigFileParser.cs @@ -31,7 +31,7 @@ namespace ZeroLevel.Services.FileSystem } - public IEnumerable ReadBatches(int batchSize) + public IEnumerable ReadBatches(int batchSize, bool skipNull = false) { var buffer = new T[batchSize]; var buffer_index = 0; @@ -44,13 +44,15 @@ namespace ZeroLevel.Services.FileSystem string line; while ((line = sr.ReadLine()) != null) { - buffer[buffer_index] = _parser.Invoke(line); + var value = _parser.Invoke(line); + if (skipNull && value == null) continue; + buffer[buffer_index] = value; buffer_index++; if (buffer_index >= batchSize) - { - buffer_index = 0; + { Thread.MemoryBarrier(); yield return buffer; + buffer_index = 0; } } } diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs index 25adbfe..add1f64 100644 --- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs +++ b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage @@ -6,7 +7,7 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage public interface IPartitionFileStorage { Task WriteAsync(TKey key, IEnumerable records); - Task> CollectAsync(IEnumerable keys); + Task> CollectAsync(IEnumerable keys, Func filter = null); void Drop(TKey key); } } diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs index f4c75fc..87d1c83 100644 --- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs +++ b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs @@ -37,8 +37,9 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage FSUtils.RemoveFolder(path, 3, 500); } - public async Task> CollectAsync(IEnumerable keys) + public async Task> CollectAsync(IEnumerable keys, Func filter = null) { + if (filter == null) filter = (_) => true; var pathes = keys.Safe().Select(k => GetDataPath(k)); var files = pathes.Safe().SelectMany(p => Directory.GetFiles(p)).Where(n => n.StartsWith("__") == false); var set = new ConcurrentBag(); @@ -51,7 +52,10 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage { foreach (var item in _options.DataConverter.ReadFromStorage(stream)) { - set.Add(item); + if (filter(item)) + { + set.Add(item); + } } } }); @@ -68,13 +72,14 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage } #region Private members + private ConcurrentDictionary _processingPath = new ConcurrentDictionary(); private void MergeDataFiles() { var folders = new Stack(); folders.Push(_options.RootFolder); while (folders.Count > 0) { - var dir = folders.Pop(); + var dir = folders.Pop(); MergeFolder(dir); foreach (var subdir in Directory.GetDirectories(dir, "*.*", SearchOption.TopDirectoryOnly)) { @@ -85,6 +90,11 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage private void MergeFolder(string path) { + var v = _processingPath.GetOrAdd(path, 0); + if (v != 0) // каталог обрабатывается в настоящий момент + { + return; + } var files = Directory.GetFiles(path); if (files != null && files.Length > 1) { diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index 1dbfa7c..850c564 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -6,16 +6,16 @@ ogoun ogoun - 3.3.6.7 - PartitionFileSystemStorage + 3.3.6.8 + BigFileReader update https://github.com/ogoun/Zero/wiki Copyright Ogoun 2022 https://github.com/ogoun/Zero git - 3.3.6.7 - 3.3.6.7 + 3.3.6.8 + 3.3.6.8 AnyCPU;x64;x86 zero.png full