PartitionStorage

pull/3/head
Ogoun 2 years ago
parent e536eb5328
commit 75f620d57d

@ -0,0 +1,74 @@
namespace PartitionFileStorageTest
{
/// <summary>
/// Delta + variable-length codec for sequences of unsigned 64-bit numbers.
/// Each number is stored as the difference from the previous one, and each
/// difference is written in groups of 7 bits, least significant group first,
/// with the high bit of a byte set when more groups follow.
/// </summary>
public static class Compressor
{
    /// <summary>
    /// Packs a sequence of numbers into a byte array.
    /// </summary>
    /// <param name="list">Numbers to encode, in the order they must be decoded.</param>
    /// <param name="last">
    /// Value the first delta is computed against; on return holds the last encoded number.
    /// <see cref="DecodeBytesContent"/> always starts from 0, so pass 0 for data that
    /// will be read back with it.
    /// </param>
    /// <returns>The encoded bytes (empty for an empty sequence).</returns>
    /// <exception cref="ArgumentNullException"><paramref name="list"/> is null.</exception>
    public static byte[] GetEncodedBytes(IEnumerable<ulong> list, ref ulong last)
    {
        if (list == null)
            throw new ArgumentNullException(nameof(list));
        using (var memoryStream = new MemoryStream())
        {
            foreach (var current in list)
            {
                // Unsigned wrap-around on a descending step is intentional:
                // the decoder's addition wraps back to the original value.
                memoryStream.Write7BitEncodedULong(unchecked(current - last));
                last = current;
            }
            return memoryStream.ToArray();
        }
    }

    /// <summary>
    /// Lazily decodes a byte array produced by <see cref="GetEncodedBytes"/> (started from 0).
    /// </summary>
    /// <param name="bytes">Encoded content.</param>
    /// <returns>The decoded numbers in their original order.</returns>
    public static IEnumerable<ulong> DecodeBytesContent(byte[] bytes)
    {
        ulong last = 0;
        using (var memoryStream = new MemoryStream(bytes))
        {
            while (memoryStream.Position != memoryStream.Length)
            {
                var current = unchecked(last + memoryStream.Read7BitEncodedULong());
                yield return current;
                last = current;
            }
        }
    }

    /// <summary>
    /// Writes <paramref name="value"/> as 1 to 10 bytes, 7 payload bits per byte.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="writer"/> is null.</exception>
    public static void Write7BitEncodedULong(this MemoryStream writer, ulong value)
    {
        if (writer == null)
            throw new ArgumentNullException(nameof(writer));
        // do/while: zero must still produce a single 0x00 byte.
        do
        {
            var lower7bits = (byte)(value & 0x7f);
            value >>= 7;
            if (value > 0)
                lower7bits |= 128;
            writer.WriteByte(lower7bits);
        } while (value > 0);
    }

    /// <summary>
    /// Reads one value written by <see cref="Write7BitEncodedULong"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="reader"/> is null.</exception>
    /// <exception cref="EndOfStreamException">The stream ends in the middle of a value.</exception>
    /// <exception cref="FormatException">The value occupies more than 10 bytes.</exception>
    public static ulong Read7BitEncodedULong(this MemoryStream reader)
    {
        if (reader == null)
            throw new ArgumentNullException(nameof(reader));
        ulong value = 0;
        var shift = 0;
        while (true)
        {
            var next = reader.ReadByte();
            // ReadByte returns -1 at the end of the stream; casting it to a byte would
            // yield 0xFF (continuation bit set) and the loop would never terminate.
            if (next < 0)
                throw new EndOfStreamException("Unexpected end of stream inside a 7-bit encoded value.");
            // A ulong needs at most 10 groups (shifts 0..63); a larger shift would wrap.
            if (shift > 63)
                throw new FormatException("7-bit encoded value is too long for a 64-bit number.");
            value |= ((ulong)next & 0x7f) << shift;
            if ((next & 128) == 0)
                return value;
            shift += 7;
        }
    }
}
}

@ -0,0 +1,101 @@
namespace PartitionFileStorageTest
{
    /// <summary>
    /// A Russian MSISDN split into the digit that follows the leading "7"
    /// and the remaining nine digits.
    /// </summary>
    public readonly struct MsisdnParts
    {
        /// <summary>Digit immediately after the leading "7".</summary>
        public int FirstDigit { get; }

        /// <summary>The last nine digits as a number.</summary>
        public int OtherDigits { get; }

        public MsisdnParts(int firstDigit, int otherDigits)
        {
            FirstDigit = firstDigit;
            OtherDigits = otherDigits;
        }

        public override string ToString() => $"({FirstDigit},{OtherDigits})";
    }

    /// <summary>
    /// Parsing and splitting helpers for Russian phone numbers ("7" followed by 10 digits).
    /// </summary>
    public static class MsisdnHelper
    {
        private const ulong MinMsisdn = 70_000_000_000UL;
        private const ulong MaxMsisdn = 79_999_999_999UL;

        /// <summary>
        /// Splits a number into the digit after the leading "7" and the last nine digits.
        /// </summary>
        /// <param name="msisdn">A number in the range 70_000_000_000..79_999_999_999.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="msisdn"/> is outside the Russian numbering range.
        /// </exception>
        public static MsisdnParts SplitParts(this ulong msisdn)
        {
            // Only Russian numbering is supported ("7" and 10 digits).
            if (msisdn < MinMsisdn || msisdn > MaxMsisdn)
            {
                throw new ArgumentOutOfRangeException(nameof(msisdn), msisdn,
                    "Expected a number between 70000000000 and 79999999999.");
            }
            var firstDigit = (int)((msisdn / 1_000_000_000UL) % 10);
            var otherDigits = (int)(msisdn % 1_000_000_000UL);
            return new MsisdnParts(firstDigit, otherDigits);
        }

        /// <summary>
        /// Inverse of <see cref="SplitParts"/>: rebuilds the full number from its parts.
        /// The arguments are not validated.
        /// </summary>
        public static ulong CombineParts(int firstDigit, int otherDigits)
        {
            return (ulong)(70_000_000_000L + firstDigit * 1_000_000_000L + otherDigits);
        }

        /// <summary>
        /// Lazily parses every line and yields only the ones that are valid numbers.
        /// </summary>
        public static IEnumerable<ulong> ParseMsisdns(this IEnumerable<string> lines)
        {
            foreach (var line in lines)
            {
                if (line.TryParseMsisdn(out var msisdn))
                {
                    yield return msisdn;
                }
            }
        }

        /// <summary>
        /// Parses a Russian phone number ("7" and 10 digits) into a <see cref="ulong"/>.
        /// Accepted formats: "+71234567890", "71234567890", "1234567890".
        /// </summary>
        /// <param name="source">Text to parse; surrounding whitespace is ignored.</param>
        /// <param name="msisdn">The number as 7XXXXXXXXXX on success, otherwise 0.</param>
        /// <returns>true when <paramref name="source"/> is in one of the accepted formats.</returns>
        public static bool TryParseMsisdn(this string source, out ulong msisdn)
        {
            msisdn = 0;
            // A Try-method must not throw on missing input.
            if (source == null) return false;

            var line = source.Trim();
            var length = line.Length;
            if (length < 10 || length > 12) return false;

            var start = 0; // 10 characters: "1234567890", no prefix to skip
            if (length == 12) // "+71234567890"
            {
                if (line[0] != '+' || line[1] != '7') return false;
                start = 2;
            }
            else if (length == 11) // "71234567890" (an "8" prefix is not accepted)
            {
                if (line[0] != '7') return false;
                start = 1;
            }

            // The country code is implied; the remaining ten characters must be ASCII digits.
            ulong number = 7;
            for (var i = start; i < length; i++)
            {
                var c = line[i];
                if (c < '0' || c > '9')
                {
                    return false;
                }
                number = number * 10 + (ulong)(c - '0');
            }
            msisdn = number;
            return true;
        }
    }
}

@ -7,6 +7,10 @@
<Nullable>enable</Nullable> <Nullable>enable</Nullable>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.22" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\ZeroLevel\ZeroLevel.csproj" /> <ProjectReference Include="..\ZeroLevel\ZeroLevel.csproj" />
</ItemGroup> </ItemGroup>

@ -1,141 +1,231 @@
using ZeroLevel; using System.Diagnostics;
using System.Text;
using ZeroLevel.Services.PartitionStorage;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
using ZeroLevel.Services.Storages;
using ZeroLevel.Services.Storages.PartitionFileSystemStorage;
namespace PartitionFileStorageTest namespace PartitionFileStorageTest
{ {
internal class Program public class CallRecordParser
{
public class PartitionKey
{ {
public DateTime Date { get; set; } private static HashSet<char> _partsOfNumbers = new HashSet<char> { '*', '#', '+', '(', ')', '-' };
public ulong Ctn { get; set; } private StringBuilder sb = new StringBuilder();
} private const string NO_VAL = null;
public class Record private string ReadNumber(string line)
: IBinarySerializable
{ {
public string[] Hosts { get; set; } sb.Clear();
var started = false;
public void Deserialize(IBinaryReader reader) foreach (var ch in line)
{
if (char.IsDigit(ch))
{ {
this.Hosts = reader.ReadStringArray(); if (started)
{
sb.Append(ch);
} }
else if (ch != '0')
public void Serialize(IBinaryWriter writer)
{ {
writer.WriteArray(this.Hosts); sb.Append(ch);
started = true;
} }
} }
else if (char.IsWhiteSpace(ch) || _partsOfNumbers.Contains(ch)) continue;
static Record GenerateRecord() else return NO_VAL;
}
if (sb.Length == 11 && sb[0] == '8') sb[0] = '7';
if (sb.Length == 3 || sb.Length == 4 || sb.Length > 10)
return sb.ToString();
return NO_VAL;
}
private HashSet<string> ReadNumbers(string line)
{
var result = new HashSet<string>();
if (string.IsNullOrWhiteSpace(line) == false)
{ {
var record = new Record(); char STX = (char)0x0002;
var rnd = new Random((int)Environment.TickCount); var values = line.Split(STX);
var count = rnd.Next(400); if (values.Length > 0)
record.Hosts = new string[count];
for (int i = 0; i < count; i++)
{ {
record.Hosts[i] = Guid.NewGuid().ToString(); foreach (var val in values)
{
var number = ReadNumber(val);
if (number != null)
{
result.Add(number);
}
} }
return record;
} }
static PartitionKey GenerateKey()
{
var key = new PartitionKey();
var rnd = new Random((int)Environment.TickCount);
key.Ctn = (ulong)rnd.Next(1000);
key.Date = DateTime.Now.AddDays(-rnd.Next(30)).Date;
return key;
} }
return result;
}
/// <summary>
/// Парсинг строки исходного файла
/// </summary>
/// <param name="line"></param>
/// <returns></returns>
public CallRecord Parse(string line)
{
var parts = line.Split('\t');
class DataConverter if (parts.Length != 2) return null;
: IPartitionDataConverter<Record>
var msisdn = ReadNumber(parts[0].Trim());
if (string.IsNullOrWhiteSpace(msisdn) == false)
{ {
public IEnumerable<Record> ReadFromStorage(Stream stream) var numbers = ReadNumbers(parts[1]);
if (numbers != null && numbers.Count > 0)
{ {
var reader = new MemoryStreamReader(stream); return new CallRecord
while (reader.EOS == false)
{ {
yield return reader.Read<Record>(); Msisdn = msisdn,
Msisdns = numbers
};
}
}
return null;
} }
} }
public void WriteToStorage(Stream stream, IEnumerable<Record> data) public class CallRecord
{ {
var writer = new MemoryStreamWriter(stream); public string Msisdn;
foreach (var record in data) public HashSet<string> Msisdns;
{
writer.Write<Record>(record);
}
} }
internal class Program
{
private class Metadata
{
public DateTime Date { get; set; }
public bool Incoming { get; set; }
} }
private static int COUNT_NUMBERS = ulong.MaxValue.ToString().Length; private static void BuildStore(string source, string root)
static void Main(string[] args) {
var options = new IStoreOptions<ulong, ulong, byte[], Metadata>
{
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 1000).ToString()),
MergeFunction = list =>
{ {
var testDict = new Dictionary<ulong, Dictionary<DateTime, List<Record>>>(); ulong s = 0;
var options = new PartitionFileSystemStorageOptions<PartitionKey, Record> return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{ {
MaxDegreeOfParallelism = 1, new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd")),
DataConverter = new DataConverter(), new StoreCatalogPartition<Metadata>("Date", m => m.Incoming ? "incoming" : "outcoming")
UseCompression = true, },
MergeFiles = false, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? 1 : -1
RootFolder = Path.Combine(Configuration.BaseDirectory, "root")
}; };
options.Partitions.Add(new Partition<PartitionKey>("data", p => p.Date.ToString("yyyyMMdd"))); var store = new Store<ulong, ulong, byte[], Metadata>(options);
options.Partitions.Add(new Partition<PartitionKey>("ctn", p => p.Ctn.ToString().PadLeft(COUNT_NUMBERS, '0')));
var storage = new PartitionFileSystemStorage<PartitionKey, Record>(options);
for (int i = 0; i < 50000; i++)
var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true });
var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false });
var parser = new CallRecordParser();
var sw = new Stopwatch();
sw.Start();
using (FileStream fs = File.Open(source, FileMode.Open, FileAccess.Read, FileShare.None))
{ {
if (i % 100 == 0) using (BufferedStream bs = new BufferedStream(fs, 1024 * 1024 * 64))
Console.WriteLine(i);
var key = GenerateKey();
var record = GenerateRecord();
if (testDict.ContainsKey(key.Ctn) == false)
{ {
testDict[key.Ctn] = new Dictionary<DateTime, List<Record>>(); using (StreamReader sr = new StreamReader(bs))
} {
if (testDict[key.Ctn].ContainsKey(key.Date) == false) string line;
while ((line = sr.ReadLine()) != null)
{
var record = parser.Parse(line);
if (record == null) continue;
if (!string.IsNullOrWhiteSpace(record?.Msisdn ?? string.Empty) && ulong.TryParse(record.Msisdn, out var n))
{ {
testDict[key.Ctn][key.Date] = new List<Record>(); var ctns = record.Msisdns.ParseMsisdns().ToArray();
foreach (var ctn in ctns)
{
storeIncoming.Store(n, ctn);
storeOutcoming.Store(ctn, n);
}
}
}
} }
testDict[key.Ctn][key.Date].Add(record);
storage.WriteAsync(key, new[] { record }).Wait();
} }
foreach (var cpair in testDict) }
sw.Stop();
Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storeIncoming.CompleteStoreAndRebuild();
storeOutcoming.CompleteStoreAndRebuild();
sw.Stop();
Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms");
}
private static void TestReading(string source, string root)
{ {
foreach (var dpair in cpair.Value) var options = new IStoreOptions<ulong, ulong, byte[], Metadata>
{ {
var key = new PartitionKey { Ctn = cpair.Key, Date = dpair.Key }; RootFolder = root,
var data = storage.CollectAsync(new[] { key }).Result.ToArray(); FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 1000).ToString()),
var testData = dpair.Value; MergeFunction = list =>
if (data.Length != testData.Count)
{ {
Console.WriteLine($"[{key.Date.ToString("yyyyMMdd")}] {key.Ctn} Wrong count. Expected: {testData.Count}. Got: {data.Length}"); ulong s = 0;
} return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
else },
Partitions = new List<StoreCatalogPartition<Metadata>>
{ {
var datahosts = data.SelectMany(r => r.Hosts).OrderBy(s => s).ToArray(); new StoreCatalogPartition<Metadata>("timestamp", m => m.Date.ToString("yyyyMMdd")),
var testhosts = testData.SelectMany(r => r.Hosts).OrderBy(s => s).ToArray(); new StoreCatalogPartition<Metadata>("timestamp", m => m.Incoming ? "incoming" : "outcoming")
if (datahosts.Length != testhosts.Length) },
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1
};
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var request = new StoreSearchRequest<ulong, Metadata>
{ {
Console.WriteLine($"[{key.Date.ToString("yyyyMMdd")}] {key.Ctn}. Records not equals. Different hosts count"); PartitionSearchRequests = new List<PartitionSearchRequest<ulong, Metadata>>
}
for (int i = 0; i < datahosts.Length; i++)
{ {
if (string.Compare(datahosts[i], testhosts[i], StringComparison.Ordinal) != 0) new PartitionSearchRequest<ulong, Metadata>
{ {
Console.WriteLine($"[{key.Date.ToString("yyyyMMdd")}] {key.Ctn}. Records not equals. Different hosts"); Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true },
Keys = new ulong[] { 79645090604 }
},
new PartitionSearchRequest<ulong, Metadata>
{
Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false },
Keys = new ulong[] { 79645090604 }
}
} }
};
var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true });
Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}");
var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false });
Console.WriteLine($"Outcoming data files: {storeOutcoming.CountDataFiles()}");
var sw = new Stopwatch();
sw.Start();
var result = store.Search(request).Result;
foreach (var r in result.Results)
{
Console.WriteLine($"Incoming: {r.Key.Incoming}");
foreach (var mr in r.Value)
{
Console.WriteLine($"\tKey: {mr.Key}. Sucess: {mr.Found}");
if (mr.Found && mr.Value.Length > 0)
{
var ctns = Compressor.DecodeBytesContent(mr.Value);
Console.WriteLine($"\t\t{string.Join(';', ctns)}");
} }
} }
} }
sw.Stop();
Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms");
} }
static void Main(string[] args)
{
var root = @"H:\temp";
var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt";
BuildStore(source, root);
TestReading(source, root);
Console.ReadKey();
} }
} }
} }

@ -1,11 +1,15 @@
namespace TestApp using System;
using System.Collections.Concurrent;
namespace TestApp
{ {
internal static class Program internal static class Program
{ {
private static void Main(string[] args) private static void Main(string[] args)
{ {
var detector = new PersonDetector(); var test = new ConcurrentDictionary<string, int>();
var predictions = detector.Detect(@"E:\Desktop\test\1.JPG"); var v = test.GetOrAdd("sss", 1);
Console.ReadKey();
} }
} }
} }

@ -1,5 +1,4 @@
using System; using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;

@ -69,10 +69,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PartitionTest", "PartitionT
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{442569A3-E126-4A11-B9DD-2DFA5BF76B0F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BigFileParserTest", "Tests\BigFileParserTest\BigFileParserTest.csproj", "{E7526771-86D5-4311-A284-05D3FEFC7B75}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -335,18 +331,6 @@ Global
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.Build.0 = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.Build.0 = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.ActiveCfg = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.ActiveCfg = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.Build.0 = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.Build.0 = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.ActiveCfg = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.Build.0 = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.ActiveCfg = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.Build.0 = Debug|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.Build.0 = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.ActiveCfg = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.Build.0 = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.ActiveCfg = Release|Any CPU
{E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -362,7 +346,6 @@ Global
{2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133} {2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133}
{9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4} {9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4}
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9} = {BAD88A91-1AFA-48A8-8D39-4846A65B4167} {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9} = {BAD88A91-1AFA-48A8-8D39-4846A65B4167}
{E7526771-86D5-4311-A284-05D3FEFC7B75} = {442569A3-E126-4A11-B9DD-2DFA5BF76B0F}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB} SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB}

@ -0,0 +1,62 @@
using System;
namespace ZeroLevel.Services.Collections
{
/// <summary>
/// Collects data while there is capacity and invokes an action after that (batch processing).
/// </summary>
/// <remarks>
/// The buffer array handed to the discharge action is reused for subsequent items,
/// so the action must finish with (or copy) the data before it returns.
/// </remarks>
/// <typeparam name="T">Type of the collected items.</typeparam>
public sealed class Capacitor<T>
    : IDisposable
{
    private int _index = -1;
    private int _count = 0;
    private readonly T[] _buffer;
    private readonly Action<T[], int> _dischargeAction;

    /// <summary>Number of items currently waiting in the buffer.</summary>
    public int Count => _count;

    /// <summary>
    /// Creates a capacitor.
    /// </summary>
    /// <param name="volume">Buffer capacity; values below 1 fall back to 16.</param>
    /// <param name="dischargeAction">
    /// Receives the buffer and the number of valid items at its start.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="dischargeAction"/> is null.</exception>
    public Capacitor(int volume, Action<T[], int> dischargeAction)
    {
        if (volume < 1) volume = 16;
        if (dischargeAction == null) throw new ArgumentNullException(nameof(dischargeAction));
        _buffer = new T[volume];
        _dischargeAction = dischargeAction;
    }

    /// <summary>
    /// Adds an item. When the buffer is already full, the full buffer is discharged first
    /// and the item becomes the first element of the next batch.
    /// </summary>
    public void Add(T val)
    {
        _index++;
        if (_index >= _buffer.Length)
        {
            _dischargeAction.Invoke(_buffer, _buffer.Length);
            _index = 0;
            _count = 0;
        }
        _buffer[_index] = val;
        _count++;
    }

    /// <summary>
    /// Hands the buffered items (if any) to the discharge action and empties the buffer.
    /// </summary>
    public void Discharge()
    {
        if (_count > 0)
        {
            _dischargeAction.Invoke(_buffer, _count);
            // Reset only after a successful call, so the same items are not delivered
            // again by a later Discharge/Dispose and the next Add starts at slot 0.
            _index = -1;
            _count = 0;
        }
    }

    /// <summary>
    /// Discharges any remaining items; a failure of the discharge action is logged, not thrown.
    /// </summary>
    public void Dispose()
    {
        if (_count > 0)
        {
            try
            {
                Discharge();
            }
            catch (Exception ex)
            {
                Log.Error(ex, $"[Capacitor.Dispose] Fault discharge in dispose method");
            }
        }
    }
}
}

@ -1,7 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Threading;
namespace ZeroLevel.Services.FileSystem namespace ZeroLevel.Services.FileSystem
{ {
@ -33,7 +32,7 @@ namespace ZeroLevel.Services.FileSystem
public IEnumerable<T[]> ReadBatches(int batchSize, bool skipNull = false) public IEnumerable<T[]> ReadBatches(int batchSize, bool skipNull = false)
{ {
var buffer = new T[batchSize]; T[] buffer;
var buffer_index = 0; var buffer_index = 0;
using (FileStream fs = File.Open(_filePath, FileMode.Open, FileAccess.Read, FileShare.None)) using (FileStream fs = File.Open(_filePath, FileMode.Open, FileAccess.Read, FileShare.None))
{ {
@ -42,6 +41,7 @@ namespace ZeroLevel.Services.FileSystem
using (StreamReader sr = new StreamReader(bs)) using (StreamReader sr = new StreamReader(bs))
{ {
string line; string line;
buffer = new T[batchSize];
while ((line = sr.ReadLine()) != null) while ((line = sr.ReadLine()) != null)
{ {
var value = _parser.Invoke(line); var value = _parser.Invoke(line);
@ -50,7 +50,6 @@ namespace ZeroLevel.Services.FileSystem
buffer_index++; buffer_index++;
if (buffer_index >= batchSize) if (buffer_index >= batchSize)
{ {
Thread.MemoryBarrier();
yield return buffer; yield return buffer;
buffer_index = 0; buffer_index = 0;
} }
@ -69,7 +68,7 @@ namespace ZeroLevel.Services.FileSystem
} }
} }
public IEnumerable<T> Read(int batchSize) public IEnumerable<T> Read()
{ {
using (FileStream fs = File.Open(_filePath, FileMode.Open, FileAccess.Read, FileShare.None)) using (FileStream fs = File.Open(_filePath, FileMode.Open, FileAccess.Read, FileShare.None))
{ {

@ -0,0 +1,547 @@
using System;
using System.Security.Cryptography;
namespace ZeroLevel.Services.HashFunctions
{
//see details: https://github.com/Cyan4973/xxHash/blob/dev/doc/xxhash_spec.md
/// <summary>
/// Represents the class which provides an implementation of the xxHash32 algorithm.
/// </summary>
/// <remarks>
/// Input may be supplied in any number of chunks and at any offset: bytes that do not
/// fill a complete 16-byte stripe are copied into an internal buffer until more data
/// arrives or the hash is finalized.
/// </remarks>
///<threadsafety static="true" instance="false"/>
public sealed class XXHash32 : HashAlgorithm
{
    private const uint PRIME32_1 = 2654435761U;
    private const uint PRIME32_2 = 2246822519U;
    private const uint PRIME32_3 = 3266489917U;
    private const uint PRIME32_4 = 668265263U;
    private const uint PRIME32_5 = 374761393U;

    /// <summary>Number of bytes consumed by one round over the four accumulators.</summary>
    private const int STRIPE_SIZE = 16;

    /// <summary>Holds input bytes that do not yet form a complete stripe.</summary>
    private readonly byte[] _Tail = new byte[STRIPE_SIZE];

    private uint _Seed32;
    private uint _ACC32_1;
    private uint _ACC32_2;
    private uint _ACC32_3;
    private uint _ACC32_4;
    private uint _Hash32;

    /// <summary>Number of valid bytes currently stored in <see cref="_Tail"/>.</summary>
    private int _RemainingLength;
    private long _TotalLength;

    /// <summary>
    /// Creates an instance of <see cref="XXHash32"/> class by default seed(0).
    /// </summary>
    /// <returns></returns>
    public new static XXHash32 Create() => new XXHash32();

    /// <summary>
    /// Creates an instance of the specified implementation of XXHash32 algorithm.
    /// <para>This method always throws <see cref="NotSupportedException"/>. </para>
    /// </summary>
    /// <param name="algName">The hash algorithm implementation to use.</param>
    /// <returns>This method always throws <see cref="NotSupportedException"/>. </returns>
    /// <exception cref="NotSupportedException">This method is not be supported.</exception>
    public new static XXHash32 Create(string algName) => throw new NotSupportedException("This method is not be supported.");

    /// <summary>
    /// Initializes a new instance of the <see cref="XXHash32"/> class by default seed(0).
    /// </summary>
    public XXHash32() => Initialize(0);

    /// <summary>
    /// Initializes a new instance of the <see cref="XXHash32"/> class, and sets the <see cref="Seed"/> to the specified value.
    /// </summary>
    /// <param name="seed">Represent the seed to be used for xxHash32 computing.</param>
    public XXHash32(uint seed) => Initialize(seed);

    /// <summary>
    /// Gets the <see cref="uint"/> value of the computed hash code.
    /// </summary>
    /// <exception cref="InvalidOperationException">Hash computation has not yet completed.</exception>
    public uint HashUInt32 => State == 0 ? _Hash32 : throw new InvalidOperationException("Hash computation has not yet completed.");

    /// <summary>
    /// Gets or sets the value of seed used by xxHash32 algorithm.
    /// </summary>
    /// <exception cref="InvalidOperationException">Hash computation has not yet completed.</exception>
    public uint Seed
    {
        get => _Seed32;
        set
        {
            if (value != _Seed32)
            {
                if (State != 0) throw new InvalidOperationException("Hash computation has not yet completed.");
                _Seed32 = value;
                Initialize();
            }
        }
    }

    /// <summary>
    /// Initializes this instance for new hash computing.
    /// </summary>
    public override void Initialize()
    {
        // The algorithm relies on 32-bit wrap-around arithmetic.
        unchecked
        {
            _ACC32_1 = _Seed32 + PRIME32_1 + PRIME32_2;
            _ACC32_2 = _Seed32 + PRIME32_2;
            _ACC32_3 = _Seed32 + 0;
            _ACC32_4 = _Seed32 - PRIME32_1;
        }
        _TotalLength = 0;
        _RemainingLength = 0;
    }

    /// <summary>
    /// Routes data written to the object into the hash algorithm for computing the hash.
    /// </summary>
    /// <param name="array">The input to compute the hash code for.</param>
    /// <param name="ibStart">The offset into the byte array from which to begin using data.</param>
    /// <param name="cbSize">The number of bytes in the byte array to use as data.</param>
    protected override void HashCore(byte[] array, int ibStart, int cbSize)
    {
        if (State != 1) State = 1;
        if (cbSize <= 0) return;

        _TotalLength += cbSize;
        // cbSize is a count, so the end of the data is ibStart + cbSize.
        var end = ibStart + cbSize;

        // First complete a stripe left unfinished by a previous call.
        if (_RemainingLength > 0)
        {
            var take = Math.Min(STRIPE_SIZE - _RemainingLength, cbSize);
            Buffer.BlockCopy(array, ibStart, _Tail, _RemainingLength, take);
            _RemainingLength += take;
            ibStart += take;
            if (_RemainingLength < STRIPE_SIZE) return;
            ConsumeStripe(_Tail, 0);
            _RemainingLength = 0;
        }

        while (end - ibStart >= STRIPE_SIZE)
        {
            ConsumeStripe(array, ibStart);
            ibStart += STRIPE_SIZE;
        }

        // Copy the leftover bytes: the caller may reuse or clear its buffer
        // before the hash is finalized.
        _RemainingLength = end - ibStart;
        if (_RemainingLength > 0)
        {
            Buffer.BlockCopy(array, ibStart, _Tail, 0, _RemainingLength);
        }
    }

    /// <summary>
    /// Finalizes the hash computation after the last data is processed by the cryptographic stream object.
    /// </summary>
    /// <returns>The computed hash code, most significant byte first.</returns>
    protected override byte[] HashFinal()
    {
        unchecked
        {
            if (_TotalLength >= STRIPE_SIZE)
            {
                _Hash32 = RotateLeft32(_ACC32_1, 1) + RotateLeft32(_ACC32_2, 7) + RotateLeft32(_ACC32_3, 12) + RotateLeft32(_ACC32_4, 18);
            }
            else
            {
                _Hash32 = _Seed32 + PRIME32_5;
            }
            _Hash32 += (uint)_TotalLength;

            var index = 0;
            // Remaining input: 4-byte lanes first, then single bytes.
            for (; _RemainingLength - index >= 4; index += 4)
            {
                _Hash32 = RotateLeft32(_Hash32 + ReadLittleEndianUInt32(_Tail, index) * PRIME32_3, 17) * PRIME32_4;
            }
            for (; index < _RemainingLength; index++)
            {
                _Hash32 = RotateLeft32(_Hash32 + _Tail[index] * PRIME32_5, 11) * PRIME32_1;
            }

            // Final avalanche.
            _Hash32 = (_Hash32 ^ (_Hash32 >> 15)) * PRIME32_2;
            _Hash32 = (_Hash32 ^ (_Hash32 >> 13)) * PRIME32_3;
            _Hash32 ^= _Hash32 >> 16;

            _TotalLength = State = 0;
            _RemainingLength = 0;
            return new[]
            {
                (byte)(_Hash32 >> 24),
                (byte)(_Hash32 >> 16),
                (byte)(_Hash32 >> 8),
                (byte)_Hash32
            };
        }
    }

    /// <summary>Feeds one 16-byte stripe starting at <paramref name="offset"/> into the accumulators.</summary>
    private void ConsumeStripe(byte[] data, int offset)
    {
        _ACC32_1 = Round32(_ACC32_1, ReadLittleEndianUInt32(data, offset));
        _ACC32_2 = Round32(_ACC32_2, ReadLittleEndianUInt32(data, offset + 4));
        _ACC32_3 = Round32(_ACC32_3, ReadLittleEndianUInt32(data, offset + 8));
        _ACC32_4 = Round32(_ACC32_4, ReadLittleEndianUInt32(data, offset + 12));
    }

    /// <summary>Reads four bytes as a little-endian value, independent of the host byte order.</summary>
    private static uint ReadLittleEndianUInt32(byte[] data, int offset) =>
        data[offset] | ((uint)data[offset + 1] << 8) | ((uint)data[offset + 2] << 16) | ((uint)data[offset + 3] << 24);

    private static uint Round32(uint input, uint value) => unchecked(RotateLeft32(input + (value * PRIME32_2), 13) * PRIME32_1);

    private static uint RotateLeft32(uint value, int count) => (value << count) | (value >> (32 - count));

    private void Initialize(uint seed)
    {
        HashSizeValue = 32;
        _Seed32 = seed;
        Initialize();
    }
}
/// <summary>
/// Represents the class which provides a implementation of the xxHash64 algorithm.
/// </summary>
/// <threadsafety static="true" instance="false"/>
public sealed class XXHash64 : HashAlgorithm
{
private const ulong PRIME64_1 = 11400714785074694791UL;
private const ulong PRIME64_2 = 14029467366897019727UL;
private const ulong PRIME64_3 = 1609587929392839161UL;
private const ulong PRIME64_4 = 9650029242287828579UL;
private const ulong PRIME64_5 = 2870177450012600261UL;
private static readonly Func<byte[], int, uint> FuncGetLittleEndianUInt32;
private static readonly Func<byte[], int, ulong> FuncGetLittleEndianUInt64;
private static readonly Func<ulong, ulong> FuncGetFinalHashUInt64;
private ulong _Seed64;
private ulong _ACC64_1;
private ulong _ACC64_2;
private ulong _ACC64_3;
private ulong _ACC64_4;
private ulong _Hash64;
private int _RemainingLength;
private long _TotalLength;
private int _CurrentIndex;
private byte[] _CurrentArray;
static XXHash64()
{
if (BitConverter.IsLittleEndian)
{
FuncGetLittleEndianUInt32 = new Func<byte[], int, uint>((x, i) =>
{
unsafe
{
fixed (byte* array = x)
{
return *(uint*)(array + i);
}
}
});
FuncGetLittleEndianUInt64 = new Func<byte[], int, ulong>((x, i) =>
{
unsafe
{
fixed (byte* array = x)
{
return *(ulong*)(array + i);
}
}
});
FuncGetFinalHashUInt64 = new Func<ulong, ulong>(i => (i & 0x00000000000000FFUL) << 56 | (i & 0x000000000000FF00UL) << 40 | (i & 0x0000000000FF0000UL) << 24 | (i & 0x00000000FF000000UL) << 8 | (i & 0x000000FF00000000UL) >> 8 | (i & 0x0000FF0000000000UL) >> 24 | (i & 0x00FF000000000000UL) >> 40 | (i & 0xFF00000000000000UL) >> 56);
}
else
{
FuncGetLittleEndianUInt32 = new Func<byte[], int, uint>((x, i) =>
{
unsafe
{
fixed (byte* array = x)
{
return (uint)(array[i++] | (array[i++] << 8) | (array[i++] << 16) | (array[i] << 24));
}
}
});
FuncGetLittleEndianUInt64 = new Func<byte[], int, ulong>((x, i) =>
{
unsafe
{
fixed (byte* array = x)
{
return array[i++] | ((ulong)array[i++] << 8) | ((ulong)array[i++] << 16) | ((ulong)array[i++] << 24) | ((ulong)array[i++] << 32) | ((ulong)array[i++] << 40) | ((ulong)array[i++] << 48) | ((ulong)array[i] << 56);
}
}
});
FuncGetFinalHashUInt64 = new Func<ulong, ulong>(i => i);
}
}
/// <summary>
/// Creates an instance of <see cref="XXHash64"/> class by default seed(0).
/// </summary>
/// <returns></returns>
public new static XXHash64 Create() => new XXHash64();
/// <summary>
/// Creates an instance of the specified implementation of XXHash64 algorithm.
/// <para>This method always throws <see cref="NotSupportedException"/>. </para>
/// </summary>
/// <param name="algName">The hash algorithm implementation to use.</param>
/// <returns>This method always throws <see cref="NotSupportedException"/>. </returns>
/// <exception cref="NotSupportedException">This method is not be supported.</exception>
public new static XXHash64 Create(string algName) => throw new NotSupportedException("This method is not be supported.");
/// <summary>
/// Initializes a new instance of the <see cref="XXHash64"/> class by default seed(0).
/// </summary>
public XXHash64() => Initialize(0);
/// <summary>
/// Initializes a new instance of the <see cref="XXHash64"/> class, and sets the <see cref="Seed"/> to the specified value.
/// </summary>
/// <param name="seed">Represent the seed to be used for xxHash64 computing.</param>
public XXHash64(uint seed) => Initialize(seed);
/// <summary>
/// Gets the <see cref="ulong"/> value of the computed hash code.
/// </summary>
/// <exception cref="InvalidOperationException">Computation has not yet completed.</exception>
public ulong HashUInt64 => State == 0 ? _Hash64 : throw new InvalidOperationException("Computation has not yet completed.");
/// <summary>
/// Gets or sets the seed used by the xxHash64 algorithm.
/// Assigning a different value re-initializes the accumulators; assigning the current value is a no-op.
/// </summary>
/// <exception cref="InvalidOperationException">The seed is being changed while a computation is in progress.</exception>
public ulong Seed
{
    get
    {
        return _Seed64;
    }
    set
    {
        // Nothing to do when the seed does not actually change.
        if (value == _Seed64)
        {
            return;
        }
        if (State != 0)
        {
            throw new InvalidOperationException("Computation has not yet completed.");
        }
        _Seed64 = value;
        Initialize();
    }
}
/// <summary>
/// Resets the four accumulators from the current seed so a new hash computation can start.
/// </summary>
public override void Initialize()
{
    var seed = _Seed64;
    _ACC64_1 = seed + PRIME64_1 + PRIME64_2;
    _ACC64_2 = seed + PRIME64_2;
    _ACC64_3 = seed;
    _ACC64_4 = seed - PRIME64_1;
}
/// <summary>
/// Routes data written to the object into the hash algorithm for computing the hash.
/// </summary>
/// <remarks>
/// Complete 32-byte stripes are folded into the four accumulators here. The trailing
/// bytes (<paramref name="cbSize"/> modulo 32) are not copied: only the array reference and
/// the index of the first unread byte are remembered, and they are consumed by <see cref="HashFinal"/>.
/// NOTE(review): the tail recorded by an earlier call is overwritten by the next call, so this
/// looks like it expects the whole input in a single call — confirm against callers.
/// </remarks>
/// <param name="array">The input to compute the hash code for.</param>
/// <param name="ibStart">The offset into the byte array from which to begin using data.</param>
/// <param name="cbSize">The number of bytes in the byte array to use as data.</param>
protected override void HashCore(byte[] array, int ibStart, int cbSize)
{
    if (State != 1) State = 1;

    // cbSize is already a byte count (not an end index), so it must not be reduced by ibStart.
    _RemainingLength = cbSize & 31;

    if (cbSize >= 32)
    {
        // Absolute index just past the last complete 32-byte stripe of this segment.
        var limit = ibStart + (cbSize - _RemainingLength);
        do
        {
            _ACC64_1 = Round64(_ACC64_1, FuncGetLittleEndianUInt64(array, ibStart));
            ibStart += 8;
            _ACC64_2 = Round64(_ACC64_2, FuncGetLittleEndianUInt64(array, ibStart));
            ibStart += 8;
            _ACC64_3 = Round64(_ACC64_3, FuncGetLittleEndianUInt64(array, ibStart));
            ibStart += 8;
            _ACC64_4 = Round64(_ACC64_4, FuncGetLittleEndianUInt64(array, ibStart));
            ibStart += 8;
        } while (ibStart < limit);
    }

    _TotalLength += cbSize;

    if (_RemainingLength != 0)
    {
        // Remember where the unread tail starts; HashFinal reads it from here.
        _CurrentArray = array;
        _CurrentIndex = ibStart;
    }
}
/// <summary>
/// Finalizes the hash computation after the last data is processed by the cryptographic stream object.
/// Merges the accumulators (or starts from the seed for short inputs), consumes the unread tail
/// remembered by HashCore, applies the final mixing steps and resets the length/state for the next run.
/// </summary>
/// <returns>The computed hash code as bytes produced by <see cref="BitConverter.GetBytes(ulong)"/>.</returns>
protected override byte[] HashFinal()
{
// At least one full 32-byte stripe was processed: combine the four accumulators.
if (_TotalLength >= 32)
{
#if HIGHER_VERSIONS
_Hash64 = RotateLeft64_1(_ACC64_1) + RotateLeft64_7(_ACC64_2) + RotateLeft64_12(_ACC64_3) + RotateLeft64_18(_ACC64_4);
#else
_Hash64 = RotateLeft64(_ACC64_1, 1) + RotateLeft64(_ACC64_2, 7) + RotateLeft64(_ACC64_3, 12) + RotateLeft64(_ACC64_4, 18);
#endif
_Hash64 = MergeRound64(_Hash64, _ACC64_1);
_Hash64 = MergeRound64(_Hash64, _ACC64_2);
_Hash64 = MergeRound64(_Hash64, _ACC64_3);
_Hash64 = MergeRound64(_Hash64, _ACC64_4);
}
else
{
// Short input: the accumulators were never used, start from the seed instead.
_Hash64 = _Seed64 + PRIME64_5;
}
// The total input length is mixed into the hash.
_Hash64 += (ulong)_TotalLength;
// Consume the unread tail in 8-byte steps...
while (_RemainingLength >= 8)
{
#if HIGHER_VERSIONS
_Hash64 = RotateLeft64_27(_Hash64 ^ Round64(0, FuncGetLittleEndianUInt64(_CurrentArray, _CurrentIndex))) * PRIME64_1 + PRIME64_4;
#else
_Hash64 = RotateLeft64(_Hash64 ^ Round64(0, FuncGetLittleEndianUInt64(_CurrentArray, _CurrentIndex)), 27) * PRIME64_1 + PRIME64_4;
#endif
_CurrentIndex += 8;
_RemainingLength -= 8;
}
// ...then in 4-byte steps (fewer than 8 bytes are left here, so this runs at most once)...
while (_RemainingLength >= 4)
{
#if HIGHER_VERSIONS
_Hash64 = RotateLeft64_23(_Hash64 ^ (FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME64_1)) * PRIME64_2 + PRIME64_3;
#else
_Hash64 = RotateLeft64(_Hash64 ^ (FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME64_1), 23) * PRIME64_2 + PRIME64_3;
#endif
_CurrentIndex += 4;
_RemainingLength -= 4;
}
// ...and finally byte by byte through a pinned pointer.
unsafe
{
fixed (byte* arrayPtr = _CurrentArray)
{
// Post-decrement: _RemainingLength ends at -1 once the loop exits.
while (_RemainingLength-- >= 1)
{
#if HIGHER_VERSIONS
_Hash64 = RotateLeft64_11(_Hash64 ^ (arrayPtr[_CurrentIndex++] * PRIME64_5)) * PRIME64_1;
#else
_Hash64 = RotateLeft64(_Hash64 ^ (arrayPtr[_CurrentIndex++] * PRIME64_5), 11) * PRIME64_1;
#endif
}
}
}
// Final mixing: three xor-shift steps, the first two followed by a multiplication.
_Hash64 = (_Hash64 ^ (_Hash64 >> 33)) * PRIME64_2;
_Hash64 = (_Hash64 ^ (_Hash64 >> 29)) * PRIME64_3;
_Hash64 ^= _Hash64 >> 32;
// Reset the running length and mark the computation as finished (State 0).
_TotalLength = State = 0;
return BitConverter.GetBytes(FuncGetFinalHashUInt64(_Hash64));
}
#if HIGHER_VERSIONS
/// <summary>Folds one accumulator value into the running hash during finalization.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong MergeRound64(ulong input, ulong value)
{
    var folded = input ^ Round64(0, value);
    return folded * PRIME64_1 + PRIME64_4;
}

/// <summary>One accumulator round: add the scaled lane, rotate left by 31, multiply by PRIME64_1.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong Round64(ulong input, ulong value)
{
    var mixed = input + (value * PRIME64_2);
    return RotateLeft64_31(mixed) * PRIME64_1;
}

/// <summary>Rotates left by 1 bit (used for _ACC64_1).</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong RotateLeft64_1(ulong value)
{
    return (value << 1) | (value >> 63);
}

/// <summary>Rotates left by 7 bits (used for _ACC64_2).</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong RotateLeft64_7(ulong value)
{
    return (value << 7) | (value >> 57);
}

/// <summary>Rotates left by 11 bits.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong RotateLeft64_11(ulong value)
{
    return (value << 11) | (value >> 53);
}

/// <summary>Rotates left by 12 bits (used for _ACC64_3).</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong RotateLeft64_12(ulong value)
{
    return (value << 12) | (value >> 52);
}

/// <summary>Rotates left by 18 bits (used for _ACC64_4).</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong RotateLeft64_18(ulong value)
{
    return (value << 18) | (value >> 46);
}

/// <summary>Rotates left by 23 bits.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong RotateLeft64_23(ulong value)
{
    return (value << 23) | (value >> 41);
}

/// <summary>Rotates left by 27 bits.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong RotateLeft64_27(ulong value)
{
    return (value << 27) | (value >> 37);
}

/// <summary>Rotates left by 31 bits.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static ulong RotateLeft64_31(ulong value)
{
    return (value << 31) | (value >> 33);
}
#else
/// <summary>Folds one accumulator value into the running hash during finalization.</summary>
private static ulong MergeRound64(ulong input, ulong value)
{
    var folded = input ^ Round64(0, value);
    return folded * PRIME64_1 + PRIME64_4;
}

/// <summary>One accumulator round: add the scaled lane, rotate left by 31, multiply by PRIME64_1.</summary>
private static ulong Round64(ulong input, ulong value)
{
    var mixed = input + (value * PRIME64_2);
    return RotateLeft64(mixed, 31) * PRIME64_1;
}

/// <summary>Rotates <paramref name="value"/> left by <paramref name="count"/> bits.</summary>
private static ulong RotateLeft64(ulong value, int count)
{
    return (value << count) | (value >> (64 - count));
}
#endif
/// <summary>
/// Shared constructor logic: stores the seed, declares a 64-bit hash size and resets the accumulators.
/// </summary>
/// <param name="seed">Seed to start the computation from.</param>
private void Initialize(ulong seed)
{
    _Seed64 = seed;
    HashSizeValue = 64;
    Initialize();
}
}
}

@ -0,0 +1,18 @@
using System.Threading.Tasks;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Partition store interface
/// </summary>
/// <typeparam name="TKey">Record key</typeparam>
/// <typeparam name="TInput">The value that is written in the stream</typeparam>
/// <typeparam name="TValue">Value after compression of TInput values by duplicate keys (TInput list or similar)</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
public interface IStore<TKey, TInput, TValue, TMeta>
{
/// <summary>
/// Creates an accessor for the partition described by <paramref name="info"/>.
/// The accessor is disposable and should be disposed by the caller.
/// </summary>
/// <param name="info">Meta information that identifies the partition</param>
IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info);
/// <summary>
/// Looks up keys in the partitions listed in the request.
/// </summary>
/// <param name="searchRequest">Per-partition lists of keys to look up</param>
/// <returns>Search results grouped by partition meta information</returns>
Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest);
}
}

@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Options of a partitioned store: root folder, partitioning rules, key comparison and merge function.
/// </summary>
/// <typeparam name="TKey">Record key</typeparam>
/// <typeparam name="TInput">The value that is written in the stream</typeparam>
/// <typeparam name="TValue">Value after compression of TInput values by duplicate keys (TInput list or similar)</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
public class IStoreOptions<TKey, TInput, TValue, TMeta>
{
    /// <summary>
    /// Method for key comparison
    /// </summary>
    public Func<TKey, TKey, int> KeyComparer { get; set; }

    /// <summary>
    /// Storage root directory
    /// </summary>
    public string RootFolder { get; set; }

    /// <summary>
    /// Maximum degree of parallelism.
    /// Defaults to half of the logical processors but never less than 1: on a single-core host
    /// the plain division yields 0, which <see cref="System.Threading.Tasks.ParallelOptions"/> rejects.
    /// </summary>
    public int MaxDegreeOfParallelism { get; set; } = Math.Max(1, Environment.ProcessorCount / 2);

    /// <summary>
    /// Function for translating a list of TInput into one TValue
    /// </summary>
    public Func<IEnumerable<TInput>, TValue> MergeFunction { get; set; }

    /// <summary>
    /// List of partitions for accessing the catalog
    /// </summary>
    public List<StoreCatalogPartition<TMeta>> Partitions { get; set; } = new List<StoreCatalogPartition<TMeta>>();

    /// <summary>
    /// File Partition
    /// </summary>
    public StoreFilePartition<TKey, TMeta> FilePartition { get; set; }

    /// <summary>
    /// Returns the data file name for a key, as produced by the <see cref="FilePartition"/> path extractor.
    /// </summary>
    internal string GetFileName(TKey key, TMeta info)
    {
        return FilePartition.PathExtractor(key, info);
    }

    /// <summary>
    /// Builds the catalog directory path for the given meta information by appending one
    /// corrected path segment per entry of <see cref="Partitions"/> to <see cref="RootFolder"/>.
    /// </summary>
    /// <exception cref="Exception">A partition produced an empty path segment.</exception>
    internal string GetCatalogPath(TMeta info)
    {
        var path = RootFolder;
        foreach (var partition in Partitions)
        {
            var pathPart = partition.PathExtractor(info);
            pathPart = FSUtils.FileNameCorrection(pathPart);
            if (string.IsNullOrWhiteSpace(pathPart))
            {
                throw new Exception($"Partition '{partition.Name}' not return name of part of path");
            }
            path = Path.Combine(path, pathPart);
        }
        return path;
    }
}
}

@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Provides read-write operations in catalog partition
/// </summary>
/// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TInput">Type of one input value</typeparam>
/// <typeparam name="TValue">Type of records aggregate</typeparam>
public interface IStorePartitionAccessor<TKey, TInput, TValue>
: IDisposable
{
/// <summary>
/// Save one record
/// </summary>
/// <param name="key">Record key</param>
/// <param name="value">Single input value stored under the key</param>
void Store(TKey key, TInput value);
/// <summary>
/// Complete the recording and perform the conversion of the records from
/// (TKey; TInput) to (TKey; TValue)
/// </summary>
void CompleteStoreAndRebuild();
/// <summary>
/// Find in catalog partition by key
/// </summary>
/// <param name="key">Key to look up</param>
/// <returns>Search result; its Found flag tells whether the key was present</returns>
StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key);
/// <summary>
/// Find in catalog partition by keys
/// </summary>
/// <param name="keys">Keys to look up</param>
/// <returns>One search result per requested key</returns>
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys);
/// <summary>
/// Returns the number of data files in the catalog partition
/// </summary>
int CountDataFiles();
/// <summary>
/// Remove all files
/// </summary>
void DropData();
}
}

@ -0,0 +1,51 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Default <see cref="IStore{TKey, TInput, TValue, TMeta}"/> implementation on top of the file system.
/// </summary>
/// <typeparam name="TKey">Record key</typeparam>
/// <typeparam name="TInput">The value that is written in the stream</typeparam>
/// <typeparam name="TValue">Value after compression of TInput values by duplicate keys</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
public class Store<TKey, TInput, TValue, TMeta> :
    IStore<TKey, TInput, TValue, TMeta>
{
    private readonly IStoreOptions<TKey, TInput, TValue, TMeta> _options;

    /// <summary>
    /// Creates the store and makes sure the root folder exists.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public Store(IStoreOptions<TKey, TInput, TValue, TMeta> options)
    {
        if (options == null) throw new ArgumentNullException(nameof(options));
        _options = options;
        if (Directory.Exists(_options.RootFolder) == false)
        {
            Directory.CreateDirectory(_options.RootFolder);
        }
    }

    /// <summary>
    /// Creates an accessor for the partition described by <paramref name="info"/>.
    /// </summary>
    public IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info)
    {
        return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info);
    }

    /// <summary>
    /// Looks up the requested keys partition by partition, processing partitions in parallel
    /// (bounded by the configured maximum degree of parallelism).
    /// </summary>
    /// <param name="searchRequest">Per-partition lists of keys to look up.</param>
    /// <returns>Results grouped by partition meta information; empty when the request lists no partitions.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="searchRequest"/> is null.</exception>
    public async Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest)
    {
        if (searchRequest == null) throw new ArgumentNullException(nameof(searchRequest));

        var results = new ConcurrentDictionary<TMeta, IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>>>();
        if (searchRequest.PartitionSearchRequests?.Any() ?? false)
        {
            var partitionsSearchInfo = searchRequest.PartitionSearchRequests.ToDictionary(r => r.Info, r => r.Keys);
            var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism };
            // The body is purely synchronous, so it returns a completed ValueTask
            // instead of being an 'async' lambda with nothing to await.
            await Parallel.ForEachAsync(partitionsSearchInfo, options, (pair, _) =>
            {
                using (var accessor = CreateAccessor(pair.Key))
                {
                    // A partition request without keys yields an empty result instead of failing.
                    var keys = pair.Value ?? Enumerable.Empty<TKey>();
                    results[pair.Key] = accessor.Find(keys).ToArray();
                }
                return ValueTask.CompletedTask;
            });
        }
        return new StoreSearchResult<TKey, TValue, TMeta> { Results = results };
    }
}
}

@ -0,0 +1,16 @@
using System;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// One level of the catalog path: a named rule that turns partition meta information into a path segment.
/// </summary>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
public class StoreCatalogPartition<TMeta>
{
/// <summary>
/// Partition name (used in error messages when the extractor returns an empty segment)
/// </summary>
public string Name { get; }
/// <summary>
/// Function that produces this partition's path segment from the meta information
/// </summary>
public Func<TMeta, string> PathExtractor { get; }
/// <summary>
/// Creates a catalog partition rule
/// </summary>
/// <param name="name">Partition name</param>
/// <param name="pathExtractor">Function producing the path segment</param>
public StoreCatalogPartition(string name, Func<TMeta, string> pathExtractor)
{
Name = name;
PathExtractor = pathExtractor;
}
}
}

@ -0,0 +1,16 @@
using System;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Named rule that chooses the data file for a record from its key and the partition meta information.
/// </summary>
/// <typeparam name="TKey">Record key</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
public class StoreFilePartition<TKey, TMeta>
{
/// <summary>
/// Partition name
/// </summary>
public string Name { get; }
/// <summary>
/// Function that produces the data file name from the key and the meta information
/// </summary>
public Func<TKey, TMeta, string> PathExtractor { get; }
/// <summary>
/// Creates a file partition rule
/// </summary>
/// <param name="name">Partition name</param>
/// <param name="pathExtractor">Function producing the data file name</param>
public StoreFilePartition(string name, Func<TKey, TMeta, string> pathExtractor)
{
Name = name;
PathExtractor = pathExtractor;
}
}
}

@ -0,0 +1,147 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// File-system implementation of <see cref="IStorePartitionAccessor{TKey, TInput, TValue}"/> for one catalog partition.
/// Records are appended to per-key data files via <see cref="Store"/>; <see cref="CompleteStoreAndRebuild"/>
/// then rewrites every file as key-sorted (TKey; TValue) pairs that <see cref="Find(TKey)"/> scans.
/// </summary>
/// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TInput">Type of one input value</typeparam>
/// <typeparam name="TValue">Type of records aggregate</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
public class StorePartitionAccessor<TKey, TInput, TValue, TMeta>
    : IStorePartitionAccessor<TKey, TInput, TValue>
{
    private readonly IStoreOptions<TKey, TInput, TValue, TMeta> _options;
    private readonly string _catalog;
    private readonly TMeta _info;

    // Open append writers, one per data file name.
    private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams = new ConcurrentDictionary<string, MemoryStreamWriter>();

    /// <summary>
    /// Directory of this catalog partition
    /// </summary>
    public string Catalog { get { return _catalog; } }

    /// <summary>
    /// Resolves the catalog directory for <paramref name="info"/> and creates it when missing.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public StorePartitionAccessor(IStoreOptions<TKey, TInput, TValue, TMeta> options, TMeta info)
    {
        if (options == null) throw new ArgumentNullException(nameof(options));
        _info = info;
        _options = options;
        _catalog = _options.GetCatalogPath(info);
        if (Directory.Exists(_catalog) == false)
        {
            Directory.CreateDirectory(_catalog);
        }
    }

    /// <summary>
    /// Find in catalog partition by key
    /// </summary>
    /// <returns>
    /// A result with Found = true and the stored value, or Found = false when the key
    /// (or its data file) does not exist.
    /// </returns>
    public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
    {
        var fileName = _options.GetFileName(key, _info);
        // A key whose data file was never written is simply "not found".
        if (File.Exists(Path.Combine(_catalog, fileName)))
        {
            using (var reader = GetReadStream(fileName))
            {
                while (reader.EOS == false)
                {
                    var k = reader.ReadCompatible<TKey>();
                    var v = reader.ReadCompatible<TValue>();
                    var c = _options.KeyComparer(key, k);
                    if (c == 0) return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = key, Value = v, Found = true };
                    // Records are sorted by key: once the wanted key is smaller than the current one
                    // it cannot appear later. Any negative value counts, not just -1.
                    if (c < 0) break;
                }
            }
        }
        return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = key, Found = false, Value = default };
    }

    /// <summary>
    /// Find in catalog partition by keys
    /// </summary>
    public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys)
    {
        foreach (var key in keys)
        {
            yield return Find(key);
        }
    }

    /// <summary>
    /// Complete the recording and perform the conversion of the records from
    /// (TKey; TInput) to (TKey; TValue)
    /// </summary>
    public void CompleteStoreAndRebuild()
    {
        // Close all write streams
        CloseWriteStreams();
        var files = Directory.GetFiles(_catalog);
        // A single data file has to be rebuilt as well.
        if (files != null && files.Length > 0)
        {
            Parallel.ForEach(files, file => CompressFile(file));
        }
    }

    /// <summary>
    /// Rewrites one data file: groups the raw (key; input) records by key, merges every group
    /// with the configured merge function and writes the pairs back sorted by key.
    /// </summary>
    private void CompressFile(string file)
    {
        var dict = new Dictionary<TKey, HashSet<TInput>>();
        using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
        {
            while (reader.EOS == false)
            {
                TKey k = reader.ReadCompatible<TKey>();
                TInput v = reader.ReadCompatible<TInput>();
                if (dict.TryGetValue(k, out var values) == false)
                {
                    values = new HashSet<TInput>();
                    dict[k] = values;
                }
                values.Add(v);
            }
        }
        // GetTempFileName already returns a full path to a newly created file.
        var tempFile = Path.GetTempFileName();
        using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
        {
            // sort for search acceleration; the order must match the comparer Find uses for its early exit
            foreach (var pair in dict.OrderBy(p => p.Key, GetKeyOrderComparer()))
            {
                var v = _options.MergeFunction(pair.Value);
                writer.SerializeCompatible(pair.Key);
                writer.SerializeCompatible(v);
            }
        }
        // Overwriting move: the original file stays in place until the replacement is ready.
        File.Move(tempFile, file, true);
    }

    /// <summary>
    /// Comparer used to order keys inside a rebuilt file: the configured key comparer,
    /// or the default comparer when none is configured.
    /// </summary>
    private IComparer<TKey> GetKeyOrderComparer()
    {
        var compare = _options.KeyComparer;
        if (compare == null)
        {
            return Comparer<TKey>.Default;
        }
        return Comparer<TKey>.Create((x, y) => compare(x, y));
    }

    /// <summary>
    /// Save one record
    /// </summary>
    public void Store(TKey key, TInput value)
    {
        var fileName = _options.GetFileName(key, _info);
        var stream = GetWriteStream(fileName);
        stream.SerializeCompatible(key);
        stream.SerializeCompatible(value);
    }

    /// <summary>
    /// Returns the cached append writer for the data file, opening it on first use.
    /// </summary>
    private MemoryStreamWriter GetWriteStream(string fileName)
    {
        return _writeStreams.GetOrAdd(fileName, k =>
        {
            var filePath = Path.Combine(_catalog, k);
            var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
            return new MemoryStreamWriter(stream);
        });
    }

    /// <summary>
    /// Opens a reader over the data file with the given name.
    /// </summary>
    private MemoryStreamReader GetReadStream(string fileName)
    {
        var filePath = Path.Combine(_catalog, fileName);
        var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
        return new MemoryStreamReader(stream);
    }

    /// <summary>
    /// Disposes and forgets every cached writer, so a later Store call reopens its file
    /// instead of writing to a disposed stream.
    /// </summary>
    private void CloseWriteStreams()
    {
        foreach (var fileName in _writeStreams.Keys)
        {
            if (_writeStreams.TryRemove(fileName, out var writer))
            {
                try
                {
                    writer.Dispose();
                }
                catch
                {
                    // Best effort, as before: a failing writer must not prevent closing the others.
                }
            }
        }
    }

    /// <summary>
    /// Releases the write streams that are still open.
    /// </summary>
    public void Dispose()
    {
        CloseWriteStreams();
    }

    /// <summary>
    /// Returns the number of files in the catalog directory
    /// </summary>
    public int CountDataFiles()
    {
        var files = Directory.GetFiles(_catalog);
        return files?.Length ?? 0;
    }

    /// <summary>
    /// Remove all files
    /// </summary>
    public void DropData()
    {
        // Release our own file handles first so the files can actually be removed.
        CloseWriteStreams();
        FSUtils.CleanAndTestFolder(_catalog);
    }
}
}

@ -0,0 +1,9 @@
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Result of looking up a single key in a catalog partition
/// </summary>
/// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TValue">Type of records aggregate</typeparam>
public class StorePartitionKeyValueSearchResult<TKey, TValue>
{
/// <summary>
/// True when the key was found
/// </summary>
public bool Found { get; set; }
/// <summary>
/// The key that was looked up
/// </summary>
public TKey Key { get; set; }
/// <summary>
/// The value stored for the key
/// </summary>
public TValue Value { get; set; }
}
}

@ -0,0 +1,14 @@
using System.Collections.Generic;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Keys to look up inside one partition
/// </summary>
/// <typeparam name="TKey">Record key</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
public class PartitionSearchRequest<TKey, TMeta>
{
/// <summary>
/// Meta information that identifies the partition
/// </summary>
public TMeta Info { get; set; }
/// <summary>
/// Keys to look up in that partition
/// </summary>
public IEnumerable<TKey> Keys { get; set; }
}
/// <summary>
/// Search request spanning several partitions
/// </summary>
/// <typeparam name="TKey">Record key</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
public class StoreSearchRequest<TKey, TMeta>
{
/// <summary>
/// One entry per partition to search
/// </summary>
public IEnumerable<PartitionSearchRequest<TKey, TMeta>> PartitionSearchRequests { get; set; }
}
}

@ -0,0 +1,9 @@
using System.Collections.Generic;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Result of a store-wide search
/// </summary>
/// <typeparam name="TKey">Record key</typeparam>
/// <typeparam name="TValue">Type of records aggregate</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
public class StoreSearchResult<TKey, TValue, TMeta>
{
/// <summary>
/// Per-key search results grouped by the meta information of the partition they came from
/// </summary>
public IDictionary<TMeta, IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>>> Results { get; set; }
}
}

@ -1,9 +1,4 @@
using System; namespace ZeroLevel.Services.Pools
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
namespace ZeroLevel.Services.Pools
{ {
/* /*
public enum LoadingMode { Eager, Lazy, LazyExpanding }; public enum LoadingMode { Eager, Lazy, LazyExpanding };

@ -51,6 +51,19 @@ namespace ZeroLevel.Services.Serialization
} }
} }
/// <summary>
/// Writes <paramref name="obj"/> to the writer: objects implementing <see cref="IBinarySerializable"/>
/// serialize themselves, everything else goes through <see cref="PrimitiveTypeSerializer"/>.
/// </summary>
/// <param name="writer">Target writer</param>
/// <param name="obj">Object to write</param>
public static void SerializeCompatible(this MemoryStreamWriter writer, object obj)
{
    if (obj is IBinarySerializable selfSerializing)
    {
        selfSerializing.Serialize(writer);
        return;
    }
    PrimitiveTypeSerializer.Serialize(writer, obj);
}
public static byte[] SerializeCompatible<T>(T obj) public static byte[] SerializeCompatible<T>(T obj)
{ {
if (null == obj) if (null == obj)

@ -1,11 +0,0 @@
using System.Collections.Generic;
using System.IO;
namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
{
/// <summary>
/// Converts records to and from their stored stream representation
/// </summary>
/// <typeparam name="TRecord">Record type</typeparam>
public interface IPartitionDataConverter<TRecord>
{
/// <summary>
/// Reads records from the given stream
/// </summary>
IEnumerable<TRecord> ReadFromStorage(Stream stream);
/// <summary>
/// Writes the given records to the stream
/// </summary>
void WriteToStorage(Stream stream, IEnumerable<TRecord> data);
}
}

@ -1,13 +0,0 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
{
/// <summary>
/// Record storage partitioned by key
/// </summary>
/// <typeparam name="TKey">Key that selects the partition</typeparam>
/// <typeparam name="TRecord">Record type</typeparam>
public interface IPartitionFileStorage<TKey, TRecord>
{
/// <summary>
/// Writes records to the partition selected by the key
/// </summary>
Task WriteAsync(TKey key, IEnumerable<TRecord> records);
/// <summary>
/// Collects records from the partitions selected by the keys, optionally keeping only
/// those accepted by <paramref name="filter"/>
/// </summary>
Task<IEnumerable<TRecord>> CollectAsync(IEnumerable<TKey> keys, Func<TRecord, bool> filter = null);
/// <summary>
/// Removes the partition selected by the key
/// </summary>
void Drop(TKey key);
}
}

@ -1,18 +0,0 @@
using System;
namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
{
/// <summary>
/// Make part of full file path
/// </summary>
/// <typeparam name="TKey">Key the path part is derived from</typeparam>
public class Partition<TKey>
{
/// <summary>
/// Creates a partition rule
/// </summary>
/// <param name="name">Partition name</param>
/// <param name="pathExtractor">Function producing the path part from a key</param>
public Partition(string name, Func<TKey, string> pathExtractor)
{
Name = name;
PathExtractor = pathExtractor;
}
/// <summary>
/// Function producing this partition's path part from a key
/// </summary>
public Func<TKey, string> PathExtractor { get; }
/// <summary>
/// Partition name
/// </summary>
public string Name { get; }
}
}

@ -1,157 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
{
/// <summary>
/// File-system storage that places records into directories derived from the key by the configured
/// partitions. Every write creates a new data file inside the partition directory.
/// </summary>
/// <typeparam name="TKey">Key that selects the partition</typeparam>
/// <typeparam name="TRecord">Record type</typeparam>
public class PartitionFileSystemStorage<TKey, TRecord>
    : IPartitionFileStorage<TKey, TRecord>
{
    private readonly PartitionFileSystemStorageOptions<TKey, TRecord> _options;

    /// <summary>
    /// Creates the storage, makes sure the root folder exists and, when enabled, schedules periodic merging.
    /// </summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/>, its RootFolder or its DataConverter is null.
    /// </exception>
    public PartitionFileSystemStorage(PartitionFileSystemStorageOptions<TKey, TRecord> options)
    {
        if (options == null)
            throw new ArgumentNullException(nameof(options));
        if (options.RootFolder == null)
            throw new ArgumentNullException(nameof(options.RootFolder));
        if (options.DataConverter == null)
            throw new ArgumentNullException(nameof(options.DataConverter));
        if (!Directory.Exists(options.RootFolder))
        {
            Directory.CreateDirectory(options.RootFolder);
        }
        _options = options;
        if (options.MergeFiles)
        {
            Sheduller.RemindEvery(TimeSpan.FromMinutes(_options.MergeFrequencyInMinutes), MergeDataFiles);
        }
    }

    /// <summary>
    /// Removes the partition directory selected by the key.
    /// </summary>
    public void Drop(TKey key)
    {
        var path = GetDataPath(key);
        FSUtils.RemoveFolder(path, 3, 500);
    }

    /// <summary>
    /// Reads every data file of the partitions selected by <paramref name="keys"/> and returns
    /// the records accepted by <paramref name="filter"/> (all records when no filter is given).
    /// Files whose name starts with "__" are skipped; partitions without a directory contribute nothing.
    /// </summary>
    public async Task<IEnumerable<TRecord>> CollectAsync(IEnumerable<TKey> keys, Func<TRecord, bool> filter = null)
    {
        if (filter == null) filter = (_) => true;
        // Materialized once: the query touches the disk and must not be evaluated twice.
        var files = keys.Safe()
            .Select(k => GetDataPath(k))
            .Where(p => Directory.Exists(p))
            .SelectMany(p => Directory.GetFiles(p))
            // GetFiles returns full paths, so the "__" prefix has to be tested on the file name.
            .Where(n => Path.GetFileName(n).StartsWith("__", StringComparison.Ordinal) == false)
            .ToArray();
        var set = new ConcurrentBag<TRecord>();
        if (files.Length > 0)
        {
            var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism };
            // The body is synchronous, so it returns a completed ValueTask instead of being 'async'.
            await Parallel.ForEachAsync(files, options, (file, _) =>
            {
                using (var stream = CreateReadStream(file))
                {
                    foreach (var item in _options.DataConverter.ReadFromStorage(stream))
                    {
                        if (filter(item))
                        {
                            set.Add(item);
                        }
                    }
                }
                return ValueTask.CompletedTask;
            });
        }
        return set;
    }

    /// <summary>
    /// Writes the records into a new data file of the partition selected by the key.
    /// </summary>
    public async Task WriteAsync(TKey key, IEnumerable<TRecord> records)
    {
        using (var stream = CreateWriteStream(key))
        {
            _options.DataConverter.WriteToStorage(stream, records);
            await stream.FlushAsync();
        }
    }

    #region Private members
    private ConcurrentDictionary<string, int> _processingPath = new ConcurrentDictionary<string, int>();

    /// <summary>
    /// Walks the whole directory tree under the root folder and merges every directory.
    /// </summary>
    private void MergeDataFiles()
    {
        var folders = new Stack<string>();
        folders.Push(_options.RootFolder);
        while (folders.Count > 0)
        {
            var dir = folders.Pop();
            MergeFolder(dir);
            foreach (var subdir in Directory.GetDirectories(dir, "*.*", SearchOption.TopDirectoryOnly))
            {
                folders.Push(subdir);
            }
        }
    }

    /// <summary>
    /// Merges the data files of one directory. The merge itself is not implemented yet.
    /// </summary>
    private void MergeFolder(string path)
    {
        var v = _processingPath.GetOrAdd(path, 0);
        if (v != 0) // the directory is being processed right now
        {
            return;
        }
        var files = Directory.GetFiles(path);
        if (files != null && files.Length > 1)
        {
            // TODO
        }
    }

    /// <summary>
    /// Returns a new unique data file path inside the given directory.
    /// </summary>
    private string GetDataFilePath(string path)
    {
        return Path.Combine(path, Guid.NewGuid().ToString());
    }

    /// <summary>
    /// Builds the partition directory path for the key by appending one corrected
    /// path part per configured partition to the root folder.
    /// </summary>
    /// <exception cref="Exception">A partition produced an empty path part.</exception>
    private string GetDataPath(TKey key)
    {
        var path = _options.RootFolder;
        foreach (var partition in _options.Partitions)
        {
            var pathPart = partition.PathExtractor(key);
            pathPart = FSUtils.FileNameCorrection(pathPart);
            if (string.IsNullOrWhiteSpace(pathPart))
            {
                throw new Exception($"Partition '{partition.Name}' not return name of part of path");
            }
            path = Path.Combine(path, pathPart);
        }
        return path;
    }

    /// <summary>
    /// Opens a write stream over a new data file of the key's partition, creating the directory
    /// when needed; the stream is wrapped in a GZip compressor when compression is enabled.
    /// </summary>
    private Stream CreateWriteStream(TKey key)
    {
        var path = GetDataPath(key);
        if (!Directory.Exists(path))
        {
            Directory.CreateDirectory(path);
        }
        var fullPath = GetDataFilePath(path);
        var stream = File.OpenWrite(fullPath);
        if (_options.UseCompression)
        {
            return new GZipStream(stream, CompressionMode.Compress, false);
        }
        return stream;
    }

    /// <summary>
    /// Opens a data file for reading. With compression enabled the file is fully
    /// decompressed into memory and the in-memory stream is returned.
    /// </summary>
    private Stream CreateReadStream(string path)
    {
        var stream = File.OpenRead(path);
        if (_options.UseCompression)
        {
            var ms = new MemoryStream();
            // leaveOpen is false, so disposing the GZip stream also closes the file stream.
            using (var compressed = new GZipStream(stream, CompressionMode.Decompress, false))
            {
                compressed.CopyTo(ms);
            }
            ms.Position = 0;
            return ms;
        }
        return stream;
    }
    #endregion
}
}

@ -1,16 +0,0 @@
using System;
using System.Collections.Generic;
namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
{
/// <summary>
/// Options of <see cref="PartitionFileSystemStorage{TKey, TRecord}"/>.
/// </summary>
/// <typeparam name="TKey">Key that selects the partition</typeparam>
/// <typeparam name="TRecord">Record type</typeparam>
public class PartitionFileSystemStorageOptions<TKey, TRecord>
{
    /// <summary>
    /// Storage root directory
    /// </summary>
    public string RootFolder { get; set; }

    /// <summary>
    /// Maximum degree of parallelism when reading data files.
    /// Defaults to half of the logical processors but never less than 1: on a single-core host
    /// the plain division yields 0, which <see cref="System.Threading.Tasks.ParallelOptions"/> rejects.
    /// </summary>
    public int MaxDegreeOfParallelism { get; set; } = Math.Max(1, Environment.ProcessorCount / 2);

    /// <summary>
    /// Enables the periodic merge of data files
    /// </summary>
    public bool MergeFiles { get; set; } = false;

    /// <summary>
    /// Interval between merge runs, in minutes
    /// </summary>
    public int MergeFrequencyInMinutes { get; set; } = 180;

    /// <summary>
    /// Store data files GZip-compressed
    /// </summary>
    public bool UseCompression { get; set; } = false;

    /// <summary>
    /// Converter between records and their stream representation
    /// </summary>
    public IPartitionDataConverter<TRecord> DataConverter { get; set; }

    /// <summary>
    /// Partitions that build the directory path from a key
    /// </summary>
    public List<Partition<TKey>> Partitions { get; set; } = new List<Partition<TKey>>();
}
}

@ -6,16 +6,16 @@
</Description> </Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>3.3.6.8</AssemblyVersion> <AssemblyVersion>3.3.7.1</AssemblyVersion>
<PackageReleaseNotes>BigFileReader update</PackageReleaseNotes> <PackageReleaseNotes>PartitionStorage new methods</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2022</Copyright> <Copyright>Copyright Ogoun 2022</Copyright>
<PackageLicenseUrl></PackageLicenseUrl> <PackageLicenseUrl></PackageLicenseUrl>
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<Version>3.3.6.8</Version> <Version>3.3.7.1</Version>
<FileVersion>3.3.6.8</FileVersion> <FileVersion>3.3.7.1</FileVersion>
<Platforms>AnyCPU;x64;x86</Platforms> <Platforms>AnyCPU;x64;x86</Platforms>
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
<DebugType>full</DebugType> <DebugType>full</DebugType>
@ -67,7 +67,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<None Include="..\zero.png"> <None Update="zero.png">
<Pack>True</Pack> <Pack>True</Pack>
<PackagePath></PackagePath> <PackagePath></PackagePath>
</None> </None>

Loading…
Cancel
Save

Powered by TurnKey Linux.