BigFileReaderUpdate

pull/3/head
Ogoun 2 years ago
parent a996f36abe
commit e536eb5328

@ -1,6 +1,7 @@
using ZeroLevel; using ZeroLevel;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
using ZeroLevel.Services.Storages; using ZeroLevel.Services.Storages;
using ZeroLevel.Services.Storages.PartitionFileSystemStorage;
namespace PartitionFileStorageTest namespace PartitionFileStorageTest
{ {
@ -53,7 +54,7 @@ namespace PartitionFileStorageTest
class DataConverter class DataConverter
: IPartitionDataConverter<Record> : IPartitionDataConverter<Record>
{ {
public IEnumerable<Record> ConvertFromStorage(Stream stream) public IEnumerable<Record> ReadFromStorage(Stream stream)
{ {
var reader = new MemoryStreamReader(stream); var reader = new MemoryStreamReader(stream);
while (reader.EOS == false) while (reader.EOS == false)
@ -62,7 +63,7 @@ namespace PartitionFileStorageTest
} }
} }
public void ConvertToStorage(Stream stream, IEnumerable<Record> data) public void WriteToStorage(Stream stream, IEnumerable<Record> data)
{ {
var writer = new MemoryStreamWriter(stream); var writer = new MemoryStreamWriter(stream);
foreach (var record in data) foreach (var record in data)

@ -67,7 +67,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AppContainer", "AutoLoader\
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PartitionTest", "PartitionTest", "{BAD88A91-1AFA-48A8-8D39-4846A65B4167}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PartitionTest", "PartitionTest", "{BAD88A91-1AFA-48A8-8D39-4846A65B4167}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{442569A3-E126-4A11-B9DD-2DFA5BF76B0F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BigFileParserTest", "Tests\BigFileParserTest\BigFileParserTest.csproj", "{E7526771-86D5-4311-A284-05D3FEFC7B75}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -331,6 +335,18 @@ Global
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.Build.0 = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.Build.0 = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.ActiveCfg = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.ActiveCfg = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.Build.0 = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.Build.0 = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.ActiveCfg = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.Build.0 = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.ActiveCfg = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.Build.0 = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.Build.0 = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.ActiveCfg = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.Build.0 = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.ActiveCfg = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -346,6 +362,7 @@ Global
{2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133} {2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133}
{9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4} {9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4}
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9} = {BAD88A91-1AFA-48A8-8D39-4846A65B4167} {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9} = {BAD88A91-1AFA-48A8-8D39-4846A65B4167}
{E7526771-86D5-4311-A284-05D3FEFC7B75} = {442569A3-E126-4A11-B9DD-2DFA5BF76B0F}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB} SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB}

@ -31,7 +31,7 @@ namespace ZeroLevel.Services.FileSystem
} }
public IEnumerable<T[]> ReadBatches(int batchSize) public IEnumerable<T[]> ReadBatches(int batchSize, bool skipNull = false)
{ {
var buffer = new T[batchSize]; var buffer = new T[batchSize];
var buffer_index = 0; var buffer_index = 0;
@ -44,13 +44,15 @@ namespace ZeroLevel.Services.FileSystem
string line; string line;
while ((line = sr.ReadLine()) != null) while ((line = sr.ReadLine()) != null)
{ {
buffer[buffer_index] = _parser.Invoke(line); var value = _parser.Invoke(line);
if (skipNull && value == null) continue;
buffer[buffer_index] = value;
buffer_index++; buffer_index++;
if (buffer_index >= batchSize) if (buffer_index >= batchSize)
{ {
buffer_index = 0;
Thread.MemoryBarrier(); Thread.MemoryBarrier();
yield return buffer; yield return buffer;
buffer_index = 0;
} }
} }
} }

@ -1,4 +1,5 @@
using System.Collections.Generic; using System;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
@ -6,7 +7,7 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
public interface IPartitionFileStorage<TKey, TRecord> public interface IPartitionFileStorage<TKey, TRecord>
{ {
Task WriteAsync(TKey key, IEnumerable<TRecord> records); Task WriteAsync(TKey key, IEnumerable<TRecord> records);
Task<IEnumerable<TRecord>> CollectAsync(IEnumerable<TKey> keys); Task<IEnumerable<TRecord>> CollectAsync(IEnumerable<TKey> keys, Func<TRecord, bool> filter = null);
void Drop(TKey key); void Drop(TKey key);
} }
} }

@ -37,8 +37,9 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
FSUtils.RemoveFolder(path, 3, 500); FSUtils.RemoveFolder(path, 3, 500);
} }
public async Task<IEnumerable<TRecord>> CollectAsync(IEnumerable<TKey> keys) public async Task<IEnumerable<TRecord>> CollectAsync(IEnumerable<TKey> keys, Func<TRecord, bool> filter = null)
{ {
if (filter == null) filter = (_) => true;
var pathes = keys.Safe().Select(k => GetDataPath(k)); var pathes = keys.Safe().Select(k => GetDataPath(k));
var files = pathes.Safe().SelectMany(p => Directory.GetFiles(p)).Where(n => n.StartsWith("__") == false); var files = pathes.Safe().SelectMany(p => Directory.GetFiles(p)).Where(n => n.StartsWith("__") == false);
var set = new ConcurrentBag<TRecord>(); var set = new ConcurrentBag<TRecord>();
@ -51,7 +52,10 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
{ {
foreach (var item in _options.DataConverter.ReadFromStorage(stream)) foreach (var item in _options.DataConverter.ReadFromStorage(stream))
{ {
set.Add(item); if (filter(item))
{
set.Add(item);
}
} }
} }
}); });
@ -68,13 +72,14 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
} }
#region Private members #region Private members
private ConcurrentDictionary<string, int> _processingPath = new ConcurrentDictionary<string, int>();
private void MergeDataFiles() private void MergeDataFiles()
{ {
var folders = new Stack<string>(); var folders = new Stack<string>();
folders.Push(_options.RootFolder); folders.Push(_options.RootFolder);
while (folders.Count > 0) while (folders.Count > 0)
{ {
var dir = folders.Pop(); var dir = folders.Pop();
MergeFolder(dir); MergeFolder(dir);
foreach (var subdir in Directory.GetDirectories(dir, "*.*", SearchOption.TopDirectoryOnly)) foreach (var subdir in Directory.GetDirectories(dir, "*.*", SearchOption.TopDirectoryOnly))
{ {
@ -85,6 +90,11 @@ namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
private void MergeFolder(string path) private void MergeFolder(string path)
{ {
var v = _processingPath.GetOrAdd(path, 0);
if (v != 0) // каталог обрабатывается в настоящий момент
{
return;
}
var files = Directory.GetFiles(path); var files = Directory.GetFiles(path);
if (files != null && files.Length > 1) if (files != null && files.Length > 1)
{ {

@ -6,16 +6,16 @@
</Description> </Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>3.3.6.7</AssemblyVersion> <AssemblyVersion>3.3.6.8</AssemblyVersion>
<PackageReleaseNotes>PartitionFileSystemStorage</PackageReleaseNotes> <PackageReleaseNotes>BigFileReader update</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2022</Copyright> <Copyright>Copyright Ogoun 2022</Copyright>
<PackageLicenseUrl></PackageLicenseUrl> <PackageLicenseUrl></PackageLicenseUrl>
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<Version>3.3.6.7</Version> <Version>3.3.6.8</Version>
<FileVersion>3.3.6.7</FileVersion> <FileVersion>3.3.6.8</FileVersion>
<Platforms>AnyCPU;x64;x86</Platforms> <Platforms>AnyCPU;x64;x86</Platforms>
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
<DebugType>full</DebugType> <DebugType>full</DebugType>

Loading…
Cancel
Save

Powered by TurnKey Linux.