From 75f620d57d87e1b81479870cb1ac151302a11fe0 Mon Sep 17 00:00:00 2001 From: Ogoun Date: Sat, 12 Nov 2022 22:32:32 +0300 Subject: [PATCH] PartitionStorage --- PartitionFileStorageTest/Compressor.cs | 74 +++ PartitionFileStorageTest/MsisdnHelper.cs | 101 ++++ .../PartitionFileStorageTest.csproj | 4 + PartitionFileStorageTest/Program.cs | 284 +++++---- TestApp/Program.cs | 10 +- ZeroLevel.HNSW/Services/LinksSet.cs | 3 +- ZeroLevel.sln | 17 - ZeroLevel/Services/Collections/Condensator.cs | 62 ++ .../Services/FileSystem/BigFileParser.cs | 9 +- ZeroLevel/Services/HashFunctions/XXHash32.cs | 547 ++++++++++++++++++ ZeroLevel/Services/PartitionStorage/IStore.cs | 18 + .../PartitionStorage/IStoreOptions.cs | 63 ++ .../IStorePartitionAccessor.cs | 41 ++ ZeroLevel/Services/PartitionStorage/Store.cs | 51 ++ .../PartitionStorage/StoreCatalogPartition.cs | 16 + .../PartitionStorage/StoreFilePartition.cs | 16 + .../StorePartitionAccessor.cs | 147 +++++ .../StorePartitionKeyValueSearchResult.cs | 9 + .../PartitionStorage/StoreSearchRequest.cs | 14 + .../PartitionStorage/StoreSearchResult.cs | 9 + ZeroLevel/Services/Pools/ObjectPool.cs | 7 +- .../Serialization/MessageSerializer.cs | 13 + .../IPartitionDataConverter.cs | 11 - .../IPartitionFileStorage.cs | 13 - .../PartitionFileSystemStorage/Partition.cs | 18 - .../PartitionFileSystemStorage.cs | 157 ----- .../PartitionFileSystemStorageOptions.cs | 16 - ZeroLevel/ZeroLevel.csproj | 10 +- 28 files changed, 1390 insertions(+), 350 deletions(-) create mode 100644 PartitionFileStorageTest/Compressor.cs create mode 100644 PartitionFileStorageTest/MsisdnHelper.cs create mode 100644 ZeroLevel/Services/Collections/Condensator.cs create mode 100644 ZeroLevel/Services/HashFunctions/XXHash32.cs create mode 100644 ZeroLevel/Services/PartitionStorage/IStore.cs create mode 100644 ZeroLevel/Services/PartitionStorage/IStoreOptions.cs create mode 100644 ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs create mode 100644 
ZeroLevel/Services/PartitionStorage/Store.cs create mode 100644 ZeroLevel/Services/PartitionStorage/StoreCatalogPartition.cs create mode 100644 ZeroLevel/Services/PartitionStorage/StoreFilePartition.cs create mode 100644 ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs create mode 100644 ZeroLevel/Services/PartitionStorage/StorePartitionKeyValueSearchResult.cs create mode 100644 ZeroLevel/Services/PartitionStorage/StoreSearchRequest.cs create mode 100644 ZeroLevel/Services/PartitionStorage/StoreSearchResult.cs delete mode 100644 ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionDataConverter.cs delete mode 100644 ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs delete mode 100644 ZeroLevel/Services/Storages/PartitionFileSystemStorage/Partition.cs delete mode 100644 ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs delete mode 100644 ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorageOptions.cs diff --git a/PartitionFileStorageTest/Compressor.cs b/PartitionFileStorageTest/Compressor.cs new file mode 100644 index 0000000..cbdf771 --- /dev/null +++ b/PartitionFileStorageTest/Compressor.cs @@ -0,0 +1,74 @@ +namespace PartitionFileStorageTest +{ + public static class Compressor + { + /// + /// Упаковка набора чисел в массив байтов + /// + public static byte[] GetEncodedBytes(IEnumerable list, ref ulong last) + { + byte[] segmentsBytes; + using (var memoryStream = new MemoryStream()) + { + foreach (var current in list) + { + var value = current - last; + memoryStream.Write7BitEncodedULong(value); + last = current; + } + segmentsBytes = memoryStream.ToArray(); + } + return segmentsBytes; + } + + public static IEnumerable DecodeBytesContent(byte[] bytes) + { + ulong last = 0; + using (var memoryStream = new MemoryStream(bytes)) + { + while (memoryStream.Position != memoryStream.Length) + { + var value = memoryStream.Read7BitEncodedULong(); + var 
current = last + value; + + yield return current; + + last = current; + } + } + } + + + public static void Write7BitEncodedULong(this MemoryStream writer, ulong value) + { + var first = true; + while (first || value > 0) + { + first = false; + var lower7bits = (byte)(value & 0x7f); + value >>= 7; + if (value > 0) + lower7bits |= 128; + writer.WriteByte(lower7bits); + } + } + + public static ulong Read7BitEncodedULong(this MemoryStream reader) + { + if (reader == null) + throw new ArgumentNullException(nameof(reader)); + + var more = true; + ulong value = 0; + var shift = 0; + while (more) + { + ulong lower7bits = (byte)reader.ReadByte(); + more = (lower7bits & 128) != 0; + value |= (lower7bits & 0x7f) << shift; + shift += 7; + } + return value; + } + } +} diff --git a/PartitionFileStorageTest/MsisdnHelper.cs b/PartitionFileStorageTest/MsisdnHelper.cs new file mode 100644 index 0000000..b705f70 --- /dev/null +++ b/PartitionFileStorageTest/MsisdnHelper.cs @@ -0,0 +1,101 @@ +namespace PartitionFileStorageTest +{ + public struct MsisdnParts + { + public int FirstDigit { get; } + public int OtherDigits { get; } + + public MsisdnParts(int firstDigit, int otherDigits) + { + FirstDigit = firstDigit; + OtherDigits = otherDigits; + } + + public override string ToString() => $"({FirstDigit},{OtherDigits})"; + } + public static class MsisdnHelper + { + public static MsisdnParts SplitParts(this ulong msisdn) + { + //расчитываем только на номера российской нумерации ("7" и 10 цифр) + //это числа от 70_000_000_000 до 79_999_999_999 + + if (msisdn < 70_000_000_000 || msisdn > 79_999_999_999) throw new ArgumentException(nameof(msisdn)); + + var firstDigit = (int)((msisdn / 1_000_000_000L) % 10); + var otherDigits = (int)(msisdn % 1_000_000_000L); + + return new MsisdnParts(firstDigit, otherDigits); + } + + public static ulong CombineParts(int firstDigit, int otherDigits) + { + return (ulong)(70_000_000_000L + firstDigit * 1_000_000_000L + otherDigits); + } + + public static 
IEnumerable ParseMsisdns(this IEnumerable lines) + { + foreach (var line in lines) + { + ulong msisdn; + if (line.TryParseMsisdn(out msisdn)) + { + yield return msisdn; + } + } + } + + /// + /// возвращаются только номера российской нумерации ("7" и 10 цифр) в виде long + /// + /// + /// + /// + public static bool TryParseMsisdn(this string source, out ulong msisdn) + { + var line = source.Trim(); + var length = line.Length; + + msisdn = 0; + + //допустимы форматы номеров "+71234567890", "71234567890", "1234567890" + if (length < 10 || length > 12) return false; + + var start = 0; + if (length == 12) //"+71234567890" + { + if (line[0] != '+' || line[1] != '7') return false; + start = 2; + } + if (length == 11) //"71234567890" и "81234567890" + { + if (line[0] != '7') return false; + start = 1; + } + /* + else if (length == 10) //"1234567890" + { + start = 0; + } + */ + + ulong number = 7; + + for (var i = start; i < length; i++) + { + var c = line[i]; + if ('0' <= c && c <= '9') + { + number = number * 10 + (ulong)(c - '0'); + } + else + { + return false; + } + } + + msisdn = number; + return true; + } + } +} diff --git a/PartitionFileStorageTest/PartitionFileStorageTest.csproj b/PartitionFileStorageTest/PartitionFileStorageTest.csproj index 68e186f..575110a 100644 --- a/PartitionFileStorageTest/PartitionFileStorageTest.csproj +++ b/PartitionFileStorageTest/PartitionFileStorageTest.csproj @@ -7,6 +7,10 @@ enable + + + + diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index 02e5050..e8ea52e 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -1,141 +1,231 @@ -using ZeroLevel; +using System.Diagnostics; +using System.Text; +using ZeroLevel.Services.PartitionStorage; using ZeroLevel.Services.Serialization; -using ZeroLevel.Services.Storages; -using ZeroLevel.Services.Storages.PartitionFileSystemStorage; namespace PartitionFileStorageTest { - internal class Program + public class 
CallRecordParser { - public class PartitionKey - { - public DateTime Date { get; set; } - public ulong Ctn { get; set; } - } + private static HashSet _partsOfNumbers = new HashSet { '*', '#', '+', '(', ')', '-' }; + private StringBuilder sb = new StringBuilder(); + private const string NO_VAL = null; - public class Record - : IBinarySerializable + private string ReadNumber(string line) { - public string[] Hosts { get; set; } - - public void Deserialize(IBinaryReader reader) + sb.Clear(); + var started = false; + foreach (var ch in line) { - this.Hosts = reader.ReadStringArray(); + if (char.IsDigit(ch)) + { + if (started) + { + sb.Append(ch); + } + else if (ch != '0') + { + sb.Append(ch); + started = true; + } + } + else if (char.IsWhiteSpace(ch) || _partsOfNumbers.Contains(ch)) continue; + else return NO_VAL; } - - public void Serialize(IBinaryWriter writer) + if (sb.Length == 11 && sb[0] == '8') sb[0] = '7'; + if (sb.Length == 3 || sb.Length == 4 || sb.Length > 10) + return sb.ToString(); + return NO_VAL; + } + private HashSet ReadNumbers(string line) + { + var result = new HashSet(); + if (string.IsNullOrWhiteSpace(line) == false) { - writer.WriteArray(this.Hosts); + char STX = (char)0x0002; + var values = line.Split(STX); + if (values.Length > 0) + { + foreach (var val in values) + { + var number = ReadNumber(val); + if (number != null) + { + result.Add(number); + } + } + } } + return result; } - - static Record GenerateRecord() + /// + /// Парсинг строки исходного файла + /// + /// + /// + public CallRecord Parse(string line) { - var record = new Record(); - var rnd = new Random((int)Environment.TickCount); - var count = rnd.Next(400); - record.Hosts = new string[count]; - for (int i = 0; i < count; i++) + var parts = line.Split('\t'); + + if (parts.Length != 2) return null; + + var msisdn = ReadNumber(parts[0].Trim()); + + if (string.IsNullOrWhiteSpace(msisdn) == false) { - record.Hosts[i] = Guid.NewGuid().ToString(); + var numbers = ReadNumbers(parts[1]); + 
if (numbers != null && numbers.Count > 0) + { + return new CallRecord + { + Msisdn = msisdn, + Msisdns = numbers + }; + } } - return record; + return null; } + } - static PartitionKey GenerateKey() + public class CallRecord + { + public string Msisdn; + public HashSet Msisdns; + } + internal class Program + { + private class Metadata { - var key = new PartitionKey(); - var rnd = new Random((int)Environment.TickCount); - key.Ctn = (ulong)rnd.Next(1000); - key.Date = DateTime.Now.AddDays(-rnd.Next(30)).Date; - return key; + public DateTime Date { get; set; } + public bool Incoming { get; set; } } - class DataConverter - : IPartitionDataConverter + private static void BuildStore(string source, string root) { - public IEnumerable ReadFromStorage(Stream stream) + var options = new IStoreOptions { - var reader = new MemoryStreamReader(stream); - while (reader.EOS == false) + RootFolder = root, + FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 1000).ToString()), + MergeFunction = list => { - yield return reader.Read(); - } - } - - public void WriteToStorage(Stream stream, IEnumerable data) + ulong s = 0; + return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); + }, + Partitions = new List> + { + new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")), + new StoreCatalogPartition("Date", m => m.Incoming ? "incoming" : "outcoming") + }, + KeyComparer = (left, right) => left == right ? 0 : (left < right) ? 
1 : -1 + }; + var store = new Store(options); + + + var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }); + var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }); + var parser = new CallRecordParser(); + var sw = new Stopwatch(); + sw.Start(); + using (FileStream fs = File.Open(source, FileMode.Open, FileAccess.Read, FileShare.None)) { - var writer = new MemoryStreamWriter(stream); - foreach (var record in data) + using (BufferedStream bs = new BufferedStream(fs, 1024 * 1024 * 64)) { - writer.Write(record); + using (StreamReader sr = new StreamReader(bs)) + { + string line; + while ((line = sr.ReadLine()) != null) + { + var record = parser.Parse(line); + if (record == null) continue; + if (!string.IsNullOrWhiteSpace(record?.Msisdn ?? string.Empty) && ulong.TryParse(record.Msisdn, out var n)) + { + var ctns = record.Msisdns.ParseMsisdns().ToArray(); + foreach (var ctn in ctns) + { + storeIncoming.Store(n, ctn); + storeOutcoming.Store(ctn, n); + } + } + } + } } } + sw.Stop(); + Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms"); + sw.Restart(); + storeIncoming.CompleteStoreAndRebuild(); + storeOutcoming.CompleteStoreAndRebuild(); + sw.Stop(); + Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms"); } - private static int COUNT_NUMBERS = ulong.MaxValue.ToString().Length; - static void Main(string[] args) + private static void TestReading(string source, string root) { - var testDict = new Dictionary>>(); - var options = new PartitionFileSystemStorageOptions - { - MaxDegreeOfParallelism = 1, - DataConverter = new DataConverter(), - UseCompression = true, - MergeFiles = false, - RootFolder = Path.Combine(Configuration.BaseDirectory, "root") - }; - options.Partitions.Add(new Partition("data", p => p.Date.ToString("yyyyMMdd"))); - options.Partitions.Add(new Partition("ctn", p => p.Ctn.ToString().PadLeft(COUNT_NUMBERS, '0'))); - 
var storage = new PartitionFileSystemStorage(options); - - for (int i = 0; i < 50000; i++) + var options = new IStoreOptions { - if (i % 100 == 0) - Console.WriteLine(i); - var key = GenerateKey(); - var record = GenerateRecord(); - if (testDict.ContainsKey(key.Ctn) == false) + RootFolder = root, + FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 1000).ToString()), + MergeFunction = list => { - testDict[key.Ctn] = new Dictionary>(); - } - if (testDict[key.Ctn].ContainsKey(key.Date) == false) + ulong s = 0; + return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); + }, + Partitions = new List> { - testDict[key.Ctn][key.Date] = new List(); - } - testDict[key.Ctn][key.Date].Add(record); - storage.WriteAsync(key, new[] { record }).Wait(); - } - foreach (var cpair in testDict) + new StoreCatalogPartition("timestamp", m => m.Date.ToString("yyyyMMdd")), + new StoreCatalogPartition("timestamp", m => m.Incoming ? "incoming" : "outcoming") + }, + KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1 + }; + var store = new Store(options); + var request = new StoreSearchRequest { - foreach (var dpair in cpair.Value) + PartitionSearchRequests = new List> { - var key = new PartitionKey { Ctn = cpair.Key, Date = dpair.Key }; - var data = storage.CollectAsync(new[] { key }).Result.ToArray(); - var testData = dpair.Value; - - if (data.Length != testData.Count) + new PartitionSearchRequest + { + Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }, + Keys = new ulong[] { 79645090604 } + }, + new PartitionSearchRequest { - Console.WriteLine($"[{key.Date.ToString("yyyyMMdd")}] {key.Ctn} Wrong count. Expected: {testData.Count}. 
Got: {data.Length}"); + Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }, + Keys = new ulong[] { 79645090604 } } - else + } + }; + + var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }); + Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}"); + var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }); + Console.WriteLine($"Outcoming data files: {storeOutcoming.CountDataFiles()}"); + var sw = new Stopwatch(); + sw.Start(); + var result = store.Search(request).Result; + foreach (var r in result.Results) + { + Console.WriteLine($"Incoming: {r.Key.Incoming}"); + foreach (var mr in r.Value) + { + Console.WriteLine($"\tKey: {mr.Key}. Sucess: {mr.Found}"); + if (mr.Found && mr.Value.Length > 0) { - var datahosts = data.SelectMany(r => r.Hosts).OrderBy(s => s).ToArray(); - var testhosts = testData.SelectMany(r => r.Hosts).OrderBy(s => s).ToArray(); - if (datahosts.Length != testhosts.Length) - { - Console.WriteLine($"[{key.Date.ToString("yyyyMMdd")}] {key.Ctn}. Records not equals. Different hosts count"); - } - for (int i = 0; i < datahosts.Length; i++) - { - if (string.Compare(datahosts[i], testhosts[i], StringComparison.Ordinal) != 0) - { - Console.WriteLine($"[{key.Date.ToString("yyyyMMdd")}] {key.Ctn}. Records not equals. 
Different hosts"); - } - } + var ctns = Compressor.DecodeBytesContent(mr.Value); + Console.WriteLine($"\t\t{string.Join(';', ctns)}"); } } } + sw.Stop(); + Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms"); + } + + static void Main(string[] args) + { + var root = @"H:\temp"; + var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt"; + BuildStore(source, root); + TestReading(source, root); + Console.ReadKey(); } } } \ No newline at end of file diff --git a/TestApp/Program.cs b/TestApp/Program.cs index b4b4a9b..4042c1f 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -1,11 +1,15 @@ -namespace TestApp +using System; +using System.Collections.Concurrent; + +namespace TestApp { internal static class Program { private static void Main(string[] args) { - var detector = new PersonDetector(); - var predictions = detector.Detect(@"E:\Desktop\test\1.JPG"); + var test = new ConcurrentDictionary(); + var v = test.GetOrAdd("sss", 1); + Console.ReadKey(); } } } \ No newline at end of file diff --git a/ZeroLevel.HNSW/Services/LinksSet.cs b/ZeroLevel.HNSW/Services/LinksSet.cs index d431975..2ded5f4 100644 --- a/ZeroLevel.HNSW/Services/LinksSet.cs +++ b/ZeroLevel.HNSW/Services/LinksSet.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Concurrent; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using ZeroLevel.Services.Serialization; diff --git a/ZeroLevel.sln b/ZeroLevel.sln index e4dd8c7..58bceed 100644 --- a/ZeroLevel.sln +++ b/ZeroLevel.sln @@ -69,10 +69,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PartitionTest", "PartitionT EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{442569A3-E126-4A11-B9DD-2DFA5BF76B0F}" -EndProject 
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BigFileParserTest", "Tests\BigFileParserTest\BigFileParserTest.csproj", "{E7526771-86D5-4311-A284-05D3FEFC7B75}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -335,18 +331,6 @@ Global {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.Build.0 = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.ActiveCfg = Release|Any CPU {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.Build.0 = Release|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.Build.0 = Debug|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.ActiveCfg = Debug|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.Build.0 = Debug|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.ActiveCfg = Debug|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.Build.0 = Debug|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.ActiveCfg = Release|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.Build.0 = Release|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.ActiveCfg = Release|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.Build.0 = Release|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.ActiveCfg = Release|Any CPU - {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -362,7 +346,6 @@ Global {2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133} {9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4} {EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9} = {BAD88A91-1AFA-48A8-8D39-4846A65B4167} - {E7526771-86D5-4311-A284-05D3FEFC7B75} = {442569A3-E126-4A11-B9DD-2DFA5BF76B0F} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution 
SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB} diff --git a/ZeroLevel/Services/Collections/Condensator.cs b/ZeroLevel/Services/Collections/Condensator.cs new file mode 100644 index 0000000..3a3b285 --- /dev/null +++ b/ZeroLevel/Services/Collections/Condensator.cs @@ -0,0 +1,62 @@ +using System; + +namespace ZeroLevel.Services.Collections +{ + /// + /// Collects data while there is capacity and invokes an action after that (batch processing) + /// + /// + public sealed class Capacitor + : IDisposable + { + private int _index = -1; + private int _count = 0; + private readonly T[] _buffer; + private readonly Action _dischargeAction; + + public int Count => _count; + public Capacitor(int volume, Action dischargeAction) + { + if (volume < 1) volume = 16; + if (dischargeAction == null) throw new ArgumentNullException(nameof(dischargeAction)); + _buffer = new T[volume]; + _dischargeAction = dischargeAction; + } + public void Add(T val) + { + _index++; + if (_index >= _buffer.Length) + { + _dischargeAction.Invoke(_buffer, _buffer.Length); + _index = 0; + _count = 0; + } + _buffer[_index] = val; + _count++; + } + + public void Discharge() + { + if (_count > 0) + { + _dischargeAction.Invoke(_buffer, _count); + } + } + + + public void Dispose() + { + if (_count > 0) + { + try + { + Discharge(); + } + catch (Exception ex) + { + Log.Error(ex, $"[Capacitor.Dispose] Fault discharge in dispose method"); + } + } + } + } +} diff --git a/ZeroLevel/Services/FileSystem/BigFileParser.cs b/ZeroLevel/Services/FileSystem/BigFileParser.cs index 67a50d0..3512a81 100644 --- a/ZeroLevel/Services/FileSystem/BigFileParser.cs +++ b/ZeroLevel/Services/FileSystem/BigFileParser.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.IO; -using System.Threading; namespace ZeroLevel.Services.FileSystem { @@ -33,7 +32,7 @@ namespace ZeroLevel.Services.FileSystem public IEnumerable ReadBatches(int batchSize, bool skipNull = false) { - var buffer = new T[batchSize]; + T[] 
buffer; var buffer_index = 0; using (FileStream fs = File.Open(_filePath, FileMode.Open, FileAccess.Read, FileShare.None)) { @@ -42,6 +41,7 @@ namespace ZeroLevel.Services.FileSystem using (StreamReader sr = new StreamReader(bs)) { string line; + buffer = new T[batchSize]; while ((line = sr.ReadLine()) != null) { var value = _parser.Invoke(line); @@ -49,8 +49,7 @@ namespace ZeroLevel.Services.FileSystem buffer[buffer_index] = value; buffer_index++; if (buffer_index >= batchSize) - { - Thread.MemoryBarrier(); + { yield return buffer; buffer_index = 0; } @@ -69,7 +68,7 @@ namespace ZeroLevel.Services.FileSystem } } - public IEnumerable Read(int batchSize) + public IEnumerable Read() { using (FileStream fs = File.Open(_filePath, FileMode.Open, FileAccess.Read, FileShare.None)) { diff --git a/ZeroLevel/Services/HashFunctions/XXHash32.cs b/ZeroLevel/Services/HashFunctions/XXHash32.cs new file mode 100644 index 0000000..ddb3771 --- /dev/null +++ b/ZeroLevel/Services/HashFunctions/XXHash32.cs @@ -0,0 +1,547 @@ +using System; +using System.Security.Cryptography; + +namespace ZeroLevel.Services.HashFunctions +{ + //see details: https://github.com/Cyan4973/xxHash/blob/dev/doc/xxhash_spec.md + /// + /// Represents the class which provides a implementation of the xxHash32 algorithm. 
+ /// + /// + public sealed class XXHash32 : HashAlgorithm + { + private const uint PRIME32_1 = 2654435761U; + private const uint PRIME32_2 = 2246822519U; + private const uint PRIME32_3 = 3266489917U; + private const uint PRIME32_4 = 668265263U; + private const uint PRIME32_5 = 374761393U; + + private static readonly Func FuncGetLittleEndianUInt32; + private static readonly Func FuncGetFinalHashUInt32; + + private uint _Seed32; + + private uint _ACC32_1; + private uint _ACC32_2; + private uint _ACC32_3; + private uint _ACC32_4; + + private uint _Hash32; + + + private int _RemainingLength; + private long _TotalLength = 0; + private int _CurrentIndex; + private byte[] _CurrentArray; + + static XXHash32() + { + if (BitConverter.IsLittleEndian) + { + + FuncGetLittleEndianUInt32 = new Func((x, i) => + { + unsafe + { + fixed (byte* array = x) + { + return *(uint*)(array + i); + } + } + }); + FuncGetFinalHashUInt32 = new Func(i => (i & 0x000000FFU) << 24 | (i & 0x0000FF00U) << 8 | (i & 0x00FF0000U) >> 8 | (i & 0xFF000000U) >> 24); + } + else + { + FuncGetLittleEndianUInt32 = new Func((x, i) => + { + unsafe + { + fixed (byte* array = x) + { + return (uint)(array[i++] | (array[i++] << 8) | (array[i++] << 16) | (array[i] << 24)); + } + } + }); + FuncGetFinalHashUInt32 = new Func(i => i); + } + } + + /// + /// Creates an instance of class by default seed(0). + /// + /// + public new static XXHash32 Create() => new XXHash32(); + + /// + /// Creates an instance of the specified implementation of XXHash32 algorithm. + /// This method always throws . + /// + /// The hash algorithm implementation to use. + /// This method always throws . + /// This method is not be supported. + public new static XXHash32 Create(string algName) => throw new NotSupportedException("This method is not be supported."); + + /// + /// Initializes a new instance of the class by default seed(0). 
+ /// + public XXHash32() => Initialize(0); + + /// + /// Initializes a new instance of the class, and sets the to the specified value. + /// + /// Represent the seed to be used for xxHash32 computing. + public XXHash32(uint seed) => Initialize(seed); + + /// + /// Gets the value of the computed hash code. + /// + /// Hash computation has not yet completed. + public uint HashUInt32 => State == 0 ? _Hash32 : throw new InvalidOperationException("Hash computation has not yet completed."); + + /// + /// Gets or sets the value of seed used by xxHash32 algorithm. + /// + /// Hash computation has not yet completed. + public uint Seed + { + get => _Seed32; + set + { + + if (value != _Seed32) + { + if (State != 0) throw new InvalidOperationException("Hash computation has not yet completed."); + _Seed32 = value; + Initialize(); + } + } + } + + + /// + /// Initializes this instance for new hash computing. + /// + public override void Initialize() + { + _ACC32_1 = _Seed32 + PRIME32_1 + PRIME32_2; + _ACC32_2 = _Seed32 + PRIME32_2; + _ACC32_3 = _Seed32 + 0; + _ACC32_4 = _Seed32 - PRIME32_1; + } + + + + /// + /// Routes data written to the object into the hash algorithm for computing the hash. + /// + /// The input to compute the hash code for. + /// The offset into the byte array from which to begin using data. + /// The number of bytes in the byte array to use as data. 
+ protected override void HashCore(byte[] array, int ibStart, int cbSize) + { + if (State != 1) State = 1; + var size = cbSize - ibStart; + _RemainingLength = size & 15; + if (cbSize >= 16) + { + var limit = size - _RemainingLength; + do + { + _ACC32_1 = Round32(_ACC32_1, FuncGetLittleEndianUInt32(array, ibStart)); + ibStart += 4; + _ACC32_2 = Round32(_ACC32_2, FuncGetLittleEndianUInt32(array, ibStart)); + ibStart += 4; + _ACC32_3 = Round32(_ACC32_3, FuncGetLittleEndianUInt32(array, ibStart)); + ibStart += 4; + _ACC32_4 = Round32(_ACC32_4, FuncGetLittleEndianUInt32(array, ibStart)); + ibStart += 4; + } while (ibStart < limit); + } + _TotalLength += cbSize; + + if (_RemainingLength != 0) + { + _CurrentArray = array; + _CurrentIndex = ibStart; + } + } + + /// + /// Finalizes the hash computation after the last data is processed by the cryptographic stream object. + /// + /// The computed hash code. + protected override byte[] HashFinal() + { + if (_TotalLength >= 16) + { +#if HIGHER_VERSIONS + _Hash32 = RotateLeft32_1(_ACC32_1) + RotateLeft32_7(_ACC32_2) + RotateLeft32_12(_ACC32_3) + RotateLeft32_18(_ACC32_4); +#else + _Hash32 = RotateLeft32(_ACC32_1, 1) + RotateLeft32(_ACC32_2, 7) + RotateLeft32(_ACC32_3, 12) + RotateLeft32(_ACC32_4, 18); +#endif + } + else + { + _Hash32 = _Seed32 + PRIME32_5; + } + + _Hash32 += (uint)_TotalLength; + + while (_RemainingLength >= 4) + { +#if HIGHER_VERSIONS + _Hash32 = RotateLeft32_17(_Hash32 + FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME32_3) * PRIME32_4; +#else + _Hash32 = RotateLeft32(_Hash32 + FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME32_3, 17) * PRIME32_4; +#endif + _CurrentIndex += 4; + _RemainingLength -= 4; + } + unsafe + { + fixed (byte* arrayPtr = _CurrentArray) + { + while (_RemainingLength-- >= 1) + { +#if HIGHER_VERSIONS + _Hash32 = RotateLeft32_11(_Hash32 + arrayPtr[_CurrentIndex++] * PRIME32_5) * PRIME32_1; +#else + _Hash32 = RotateLeft32(_Hash32 + arrayPtr[_CurrentIndex++] * 
PRIME32_5, 11) * PRIME32_1; +#endif + } + } + } + _Hash32 = (_Hash32 ^ (_Hash32 >> 15)) * PRIME32_2; + _Hash32 = (_Hash32 ^ (_Hash32 >> 13)) * PRIME32_3; + _Hash32 ^= _Hash32 >> 16; + + _TotalLength = State = 0; + + return BitConverter.GetBytes(FuncGetFinalHashUInt32(_Hash32)); + } + +#if HIGHER_VERSIONS + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static uint Round32(uint input, uint value) => RotateLeft32_13(input + (value * PRIME32_2)) * PRIME32_1; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static uint RotateLeft32_1(uint value) => (value << 1) | (value >> 31); //_ACC32_1 + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static uint RotateLeft32_7(uint value) => (value << 7) | (value >> 25); //_ACC32_2 + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static uint RotateLeft32_11(uint value) => (value << 11) | (value >> 21); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static uint RotateLeft32_12(uint value) => (value << 12) | (value >> 20);// _ACC32_3 + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static uint RotateLeft32_13(uint value) => (value << 13) | (value >> 19); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static uint RotateLeft32_17(uint value) => (value << 17) | (value >> 15); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static uint RotateLeft32_18(uint value) => (value << 18) | (value >> 14); //_ACC32_4 +#else + private static uint Round32(uint input, uint value) => RotateLeft32(input + (value * PRIME32_2), 13) * PRIME32_1; + + private static uint RotateLeft32(uint value, int count) => (value << count) | (value >> (32 - count)); +#endif + private void Initialize(uint seed) + { + HashSizeValue = 32; + _Seed32 = seed; + Initialize(); + } + + } + + /// + /// Represents the class which provides a implementation of the xxHash64 algorithm. 
using System;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;

namespace ZeroLevel.Services.HashFunctions
{
    /// <summary>
    /// Implementation of the 64-bit xxHash algorithm (XXH64) on top of
    /// <see cref="HashAlgorithm"/>. Generic delegate signatures were lost to
    /// markup stripping in the original and are restored here.
    /// </summary>
    public sealed class XXHash64 : HashAlgorithm
    {
        private const ulong PRIME64_1 = 11400714785074694791UL;
        private const ulong PRIME64_2 = 14029467366897019727UL;
        private const ulong PRIME64_3 = 1609587929392839161UL;
        private const ulong PRIME64_4 = 9650029242287828579UL;
        private const ulong PRIME64_5 = 2870177450012600261UL;

        // Word readers selected once per process depending on platform byte order.
        private static readonly Func<byte[], int, uint> FuncGetLittleEndianUInt32;
        private static readonly Func<byte[], int, ulong> FuncGetLittleEndianUInt64;
        private static readonly Func<ulong, ulong> FuncGetFinalHashUInt64;

        private ulong _Seed64;

        private ulong _ACC64_1;
        private ulong _ACC64_2;
        private ulong _ACC64_3;
        private ulong _ACC64_4;
        private ulong _Hash64;

        private int _RemainingLength;
        private long _TotalLength;
        private int _CurrentIndex;
        private byte[] _CurrentArray;

        static XXHash64()
        {
            if (BitConverter.IsLittleEndian)
            {
                FuncGetLittleEndianUInt32 = new Func<byte[], int, uint>((x, i) =>
                {
                    unsafe
                    {
                        fixed (byte* array = x)
                        {
                            return *(uint*)(array + i);
                        }
                    }
                });
                FuncGetLittleEndianUInt64 = new Func<byte[], int, ulong>((x, i) =>
                {
                    unsafe
                    {
                        fixed (byte* array = x)
                        {
                            return *(ulong*)(array + i);
                        }
                    }
                });
                // Canonical xxHash output is big-endian, so byte-swap on LE hosts.
                FuncGetFinalHashUInt64 = new Func<ulong, ulong>(i =>
                    (i & 0x00000000000000FFUL) << 56 | (i & 0x000000000000FF00UL) << 40 |
                    (i & 0x0000000000FF0000UL) << 24 | (i & 0x00000000FF000000UL) << 8 |
                    (i & 0x000000FF00000000UL) >> 8 | (i & 0x0000FF0000000000UL) >> 24 |
                    (i & 0x00FF000000000000UL) >> 40 | (i & 0xFF00000000000000UL) >> 56);
            }
            else
            {
                FuncGetLittleEndianUInt32 = new Func<byte[], int, uint>((x, i) =>
                {
                    unsafe
                    {
                        fixed (byte* array = x)
                        {
                            return (uint)(array[i++] | (array[i++] << 8) | (array[i++] << 16) | (array[i] << 24));
                        }
                    }
                });
                FuncGetLittleEndianUInt64 = new Func<byte[], int, ulong>((x, i) =>
                {
                    unsafe
                    {
                        fixed (byte* array = x)
                        {
                            return array[i++] | ((ulong)array[i++] << 8) | ((ulong)array[i++] << 16) | ((ulong)array[i++] << 24) | ((ulong)array[i++] << 32) | ((ulong)array[i++] << 40) | ((ulong)array[i++] << 48) | ((ulong)array[i] << 56);
                        }
                    }
                });
                FuncGetFinalHashUInt64 = new Func<ulong, ulong>(i => i);
            }
        }

        /// <summary>
        /// Creates an instance of <see cref="XXHash64"/> class by default seed(0).
        /// </summary>
        public new static XXHash64 Create() => new XXHash64();

        /// <summary>
        /// Creates an instance of the specified implementation of XXHash64 algorithm.
        /// This method always throws <see cref="NotSupportedException"/>.
        /// </summary>
        /// <param name="algName">The hash algorithm implementation to use.</param>
        /// <exception cref="NotSupportedException">This method is not be supported.</exception>
        public new static XXHash64 Create(string algName) => throw new NotSupportedException("This method is not be supported.");

        /// <summary>
        /// Initializes a new instance of the <see cref="XXHash64"/> class by default seed(0).
        /// </summary>
        public XXHash64() => Initialize(0);

        /// <summary>
        /// Initializes a new instance of the <see cref="XXHash64"/> class with the specified seed.
        /// NOTE(review): the parameter is declared <c>uint</c> (likely copied from XXHash32)
        /// although the internal seed is <c>ulong</c>; kept for source compatibility.
        /// </summary>
        /// <param name="seed">Represent the seed to be used for xxHash64 computing.</param>
        public XXHash64(uint seed) => Initialize(seed);

        /// <summary>
        /// Gets the value of the computed hash code.
        /// </summary>
        /// <exception cref="InvalidOperationException">Computation has not yet completed.</exception>
        public ulong HashUInt64 => State == 0 ? _Hash64 : throw new InvalidOperationException("Computation has not yet completed.");

        /// <summary>
        /// Gets or sets the value of seed used by xxHash64 algorithm.
        /// </summary>
        /// <exception cref="InvalidOperationException">Computation has not yet completed.</exception>
        public ulong Seed
        {
            get => _Seed64;
            set
            {
                if (value != _Seed64)
                {
                    if (State != 0) throw new InvalidOperationException("Computation has not yet completed.");
                    _Seed64 = value;
                    Initialize();
                }
            }
        }

        /// <summary>
        /// Initializes this instance for new hash computing.
        /// </summary>
        public override void Initialize()
        {
            _ACC64_1 = _Seed64 + PRIME64_1 + PRIME64_2;
            _ACC64_2 = _Seed64 + PRIME64_2;
            _ACC64_3 = _Seed64 + 0;
            _ACC64_4 = _Seed64 - PRIME64_1;
        }

        /// <summary>
        /// Routes data written to the object into the hash algorithm for computing the hash.
        /// </summary>
        /// <param name="array">The input to compute the hash code for.</param>
        /// <param name="ibStart">The offset into the byte array from which to begin using data.</param>
        /// <param name="cbSize">The number of bytes in the byte array to use as data.</param>
        protected override void HashCore(byte[] array, int ibStart, int cbSize)
        {
            if (State != 1) State = 1;
            // NOTE(review): treats cbSize as if it were an end index relative to
            // ibStart. Correct for the common ComputeHash path (ibStart == 0),
            // but suspect for streamed/offset use — confirm before relying on it.
            var size = cbSize - ibStart;
            _RemainingLength = size & 31;
            if (cbSize >= 32)
            {
                var limit = size - _RemainingLength;
                do
                {
                    _ACC64_1 = Round64(_ACC64_1, FuncGetLittleEndianUInt64(array, ibStart));
                    ibStart += 8;
                    _ACC64_2 = Round64(_ACC64_2, FuncGetLittleEndianUInt64(array, ibStart));
                    ibStart += 8;
                    _ACC64_3 = Round64(_ACC64_3, FuncGetLittleEndianUInt64(array, ibStart));
                    ibStart += 8;
                    _ACC64_4 = Round64(_ACC64_4, FuncGetLittleEndianUInt64(array, ibStart));
                    ibStart += 8;
                } while (ibStart < limit);
            }
            _TotalLength += cbSize;
            if (_RemainingLength != 0)
            {
                _CurrentArray = array;
                _CurrentIndex = ibStart;
            }
        }

        /// <summary>
        /// Finalizes the hash computation after the last data is processed by the cryptographic stream object.
        /// </summary>
        /// <returns>The computed hash code.</returns>
        protected override byte[] HashFinal()
        {
            if (_TotalLength >= 32)
            {
#if HIGHER_VERSIONS
                _Hash64 = RotateLeft64_1(_ACC64_1) + RotateLeft64_7(_ACC64_2) + RotateLeft64_12(_ACC64_3) + RotateLeft64_18(_ACC64_4);
#else
                _Hash64 = RotateLeft64(_ACC64_1, 1) + RotateLeft64(_ACC64_2, 7) + RotateLeft64(_ACC64_3, 12) + RotateLeft64(_ACC64_4, 18);
#endif
                _Hash64 = MergeRound64(_Hash64, _ACC64_1);
                _Hash64 = MergeRound64(_Hash64, _ACC64_2);
                _Hash64 = MergeRound64(_Hash64, _ACC64_3);
                _Hash64 = MergeRound64(_Hash64, _ACC64_4);
            }
            else
            {
                // Input shorter than one 32-byte stripe: fold the seed directly.
                _Hash64 = _Seed64 + PRIME64_5;
            }

            _Hash64 += (ulong)_TotalLength;

            // Consume the 8-, 4- and 1-byte tails of the buffered remainder.
            while (_RemainingLength >= 8)
            {
#if HIGHER_VERSIONS
                _Hash64 = RotateLeft64_27(_Hash64 ^ Round64(0, FuncGetLittleEndianUInt64(_CurrentArray, _CurrentIndex))) * PRIME64_1 + PRIME64_4;
#else
                _Hash64 = RotateLeft64(_Hash64 ^ Round64(0, FuncGetLittleEndianUInt64(_CurrentArray, _CurrentIndex)), 27) * PRIME64_1 + PRIME64_4;
#endif
                _CurrentIndex += 8;
                _RemainingLength -= 8;
            }

            while (_RemainingLength >= 4)
            {
#if HIGHER_VERSIONS
                _Hash64 = RotateLeft64_23(_Hash64 ^ (FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME64_1)) * PRIME64_2 + PRIME64_3;
#else
                _Hash64 = RotateLeft64(_Hash64 ^ (FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME64_1), 23) * PRIME64_2 + PRIME64_3;
#endif
                _CurrentIndex += 4;
                _RemainingLength -= 4;
            }

            unsafe
            {
                // _CurrentArray may be null for empty input; the loop body is
                // then never entered, so the null fixed pointer is never used.
                fixed (byte* arrayPtr = _CurrentArray)
                {
                    while (_RemainingLength-- >= 1)
                    {
#if HIGHER_VERSIONS
                        _Hash64 = RotateLeft64_11(_Hash64 ^ (arrayPtr[_CurrentIndex++] * PRIME64_5)) * PRIME64_1;
#else
                        _Hash64 = RotateLeft64(_Hash64 ^ (arrayPtr[_CurrentIndex++] * PRIME64_5), 11) * PRIME64_1;
#endif
                    }
                }
            }

            // Final avalanche.
            _Hash64 = (_Hash64 ^ (_Hash64 >> 33)) * PRIME64_2;
            _Hash64 = (_Hash64 ^ (_Hash64 >> 29)) * PRIME64_3;
            _Hash64 ^= _Hash64 >> 32;

            _TotalLength = State = 0;
            return BitConverter.GetBytes(FuncGetFinalHashUInt64(_Hash64));
        }

#if HIGHER_VERSIONS
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong MergeRound64(ulong input, ulong value) => (input ^ Round64(0, value)) * PRIME64_1 + PRIME64_4;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong Round64(ulong input, ulong value) => RotateLeft64_31(input + (value * PRIME64_2)) * PRIME64_1;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong RotateLeft64_1(ulong value) => (value << 1) | (value >> 63); // _ACC64_1
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong RotateLeft64_7(ulong value) => (value << 7) | (value >> 57); // _ACC64_2
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong RotateLeft64_11(ulong value) => (value << 11) | (value >> 53);
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong RotateLeft64_12(ulong value) => (value << 12) | (value >> 52); // _ACC64_3
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong RotateLeft64_18(ulong value) => (value << 18) | (value >> 46); // _ACC64_4
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong RotateLeft64_23(ulong value) => (value << 23) | (value >> 41);
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong RotateLeft64_27(ulong value) => (value << 27) | (value >> 37);
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static ulong RotateLeft64_31(ulong value) => (value << 31) | (value >> 33);
#else
        private static ulong MergeRound64(ulong input, ulong value) => (input ^ Round64(0, value)) * PRIME64_1 + PRIME64_4;

        private static ulong Round64(ulong input, ulong value) => RotateLeft64(input + (value * PRIME64_2), 31) * PRIME64_1;

        private static ulong RotateLeft64(ulong value, int count) => (value << count) | (value >> (64 - count));
#endif

        // Common seed setup used by both constructors.
        private void Initialize(ulong seed)
        {
            HashSizeValue = 64;
            _Seed64 = seed;
            Initialize();
        }
    }
}
ZeroLevel.Services.FileSystem; + +namespace ZeroLevel.Services.PartitionStorage +{ + /// + /// Options + /// + /// Record key + /// The value that is written in the stream + /// Value after compression of TInput values by duplicate keys (TInput list or similar) + /// Meta information for partition search + public class IStoreOptions + { + /// + /// Method for key comparison + /// + public Func KeyComparer { get; set; } + + /// + /// Storage root directory + /// + public string RootFolder { get; set; } + /// + /// Maximum degree of parallelis + /// + public int MaxDegreeOfParallelism { get; set; } = Environment.ProcessorCount / 2; + /// + /// Function for translating a list of TInput into one TValue + /// + public Func, TValue> MergeFunction { get; set; } + /// + /// List of partitions for accessing the catalog + /// + public List> Partitions { get; set; } = new List>(); + /// + /// File Partition + /// + public StoreFilePartition FilePartition { get; set; } + + internal string GetFileName(TKey key, TMeta info) + { + return FilePartition.PathExtractor(key, info); + } + internal string GetCatalogPath(TMeta info) + { + var path = RootFolder; + foreach (var partition in Partitions) + { + var pathPart = partition.PathExtractor(info); + pathPart = FSUtils.FileNameCorrection(pathPart); + if (string.IsNullOrWhiteSpace(pathPart)) + { + throw new Exception($"Partition '{partition.Name}' not return name of part of path"); + } + path = Path.Combine(path, pathPart); + } + return path; + } + } +} diff --git a/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs new file mode 100644 index 0000000..42315f0 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; + +namespace ZeroLevel.Services.PartitionStorage +{ + /// + /// Provides read-write operations in catalog partition + /// + /// Key type + /// Type of one 
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace ZeroLevel.Services.PartitionStorage
{
    /// <summary>
    /// Provides read-write operations in catalog partition.
    /// </summary>
    /// <typeparam name="TKey">Key type</typeparam>
    /// <typeparam name="TInput">Type of one input value</typeparam>
    /// <typeparam name="TValue">Type of records aggregate</typeparam>
    public interface IStorePartitionAccessor<TKey, TInput, TValue>
        : IDisposable
    {
        /// <summary>
        /// Save one record.
        /// </summary>
        void Store(TKey key, TInput value);

        /// <summary>
        /// Complete the recording and perform the conversion of the records from
        /// (TKey; TInput) to (TKey; TValue).
        /// </summary>
        void CompleteStoreAndRebuild();

        /// <summary>
        /// Find in catalog partition by key.
        /// </summary>
        StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key);

        /// <summary>
        /// Find in catalog partition by keys.
        /// </summary>
        IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys);

        /// <summary>
        /// Number of data files in the partition.
        /// </summary>
        int CountDataFiles();

        /// <summary>
        /// Remove all files.
        /// </summary>
        void DropData();
    }

    /// <summary>
    /// Partition store entry point: creates partition accessors and runs
    /// parallel batch searches across partitions.
    /// </summary>
    public class Store<TKey, TInput, TValue, TMeta>
        : IStore<TKey, TInput, TValue, TMeta>
    {
        private readonly IStoreOptions<TKey, TInput, TValue, TMeta> _options;

        public Store(IStoreOptions<TKey, TInput, TValue, TMeta> options)
        {
            if (options == null) throw new ArgumentNullException(nameof(options));
            _options = options;
            if (Directory.Exists(_options.RootFolder) == false)
            {
                Directory.CreateDirectory(_options.RootFolder);
            }
        }

        public IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info)
        {
            return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info);
        }

        public async Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest)
        {
            var result = new StoreSearchResult<TKey, TValue, TMeta>();
            var results = new ConcurrentDictionary<TMeta, IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>>>();
            if (searchRequest.PartitionSearchRequests?.Any() ?? false)
            {
                var partitionsSearchInfo = searchRequest.PartitionSearchRequests
                    .ToDictionary(r => r.Info, r => r.Keys);
                var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism };
                // FIX: the body delegate was an 'async' lambda with no await
                // (CS1998); a synchronous ValueTask-returning lambda is equivalent.
                await Parallel.ForEachAsync(partitionsSearchInfo, options, (pair, _) =>
                {
                    using (var accessor = CreateAccessor(pair.Key))
                    {
                        // Materialize before the accessor is disposed.
                        results[pair.Key] = accessor.Find(pair.Value).ToArray();
                    }
                    return ValueTask.CompletedTask;
                });
            }
            result.Results = results;
            return result;
        }
    }

    /// <summary>
    /// Builds one directory level of a partition path from meta information.
    /// </summary>
    public class StoreCatalogPartition<TMeta>
    {
        public string Name { get; }
        public Func<TMeta, string> PathExtractor { get; }

        public StoreCatalogPartition(string name, Func<TMeta, string> pathExtractor)
        {
            Name = name;
            PathExtractor = pathExtractor;
        }
    }

    /// <summary>
    /// Builds the data file name from a record key and meta information.
    /// </summary>
    public class StoreFilePartition<TKey, TMeta>
    {
        public string Name { get; }
        public Func<TKey, TMeta, string> PathExtractor { get; }

        public StoreFilePartition(string name, Func<TKey, TMeta, string> pathExtractor)
        {
            Name = name;
            PathExtractor = pathExtractor;
        }
    }
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Serialization;

namespace ZeroLevel.Services.PartitionStorage
{
    /// <summary>
    /// Read/write access to one catalog partition on disk. Records are appended
    /// per data file via <see cref="Store"/>, then <see cref="CompleteStoreAndRebuild"/>
    /// groups them by key, merges duplicates to TValue and rewrites each file
    /// sorted by key so that <see cref="Find(TKey)"/> can stop scanning early.
    /// </summary>
    public class StorePartitionAccessor<TKey, TInput, TValue, TMeta>
        : IStorePartitionAccessor<TKey, TInput, TValue>
    {
        private readonly IStoreOptions<TKey, TInput, TValue, TMeta> _options;
        private readonly string _catalog;
        private readonly TMeta _info;
        private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams
            = new ConcurrentDictionary<string, MemoryStreamWriter>();

        /// <summary>Full path of the partition directory.</summary>
        public string Catalog { get { return _catalog; } }

        public StorePartitionAccessor(IStoreOptions<TKey, TInput, TValue, TMeta> options, TMeta info)
        {
            if (options == null) throw new ArgumentNullException(nameof(options));
            _info = info;
            _options = options;
            _catalog = _options.GetCatalogPath(info);
            if (Directory.Exists(_catalog) == false)
            {
                Directory.CreateDirectory(_catalog);
            }
        }

        public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
        {
            var fileName = _options.GetFileName(key, _info);
            using (var reader = GetReadStream(fileName))
            {
                while (reader.EOS == false)
                {
                    var k = reader.ReadCompatible<TKey>();
                    var v = reader.ReadCompatible<TValue>();
                    var c = _options.KeyComparer(key, k);
                    if (c == 0) return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = key, Value = v, Found = true };
                    // File is sorted by key: stop once we are past the target.
                    // FIX: was 'c == -1', which misses comparers returning any
                    // other negative value and degraded to a full-file scan.
                    if (c < 0) break;
                }
            }
            return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = key, Found = false, Value = default };
        }

        public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys)
        {
            foreach (var key in keys)
            {
                yield return Find(key);
            }
        }

        public void CompleteStoreAndRebuild()
        {
            // Close all write streams so the data files can be reopened below.
            foreach (var s in _writeStreams)
            {
                try
                {
                    s.Value.Dispose();
                }
                catch { /* best effort */ }
            }
            _writeStreams.Clear();
            var files = Directory.GetFiles(_catalog);
            // FIX: was 'files.Length > 1', which left a single-file partition
            // unsorted and unmerged, breaking the sorted-file assumption in Find.
            if (files != null && files.Length > 0)
            {
                Parallel.ForEach(files, file => CompressFile(file));
            }
        }

        public void Store(TKey key, TInput value)
        {
            var fileName = _options.GetFileName(key, _info);
            var stream = GetWriteStream(fileName);
            stream.SerializeCompatible(key);
            stream.SerializeCompatible(value);
        }

        public int CountDataFiles()
        {
            var files = Directory.GetFiles(_catalog);
            return files?.Length ?? 0;
        }

        public void DropData()
        {
            FSUtils.CleanAndTestFolder(_catalog);
        }

        public void Dispose()
        {
            // FIX: the original Dispose was empty and leaked any write streams
            // still open when CompleteStoreAndRebuild was never called.
            foreach (var s in _writeStreams)
            {
                try
                {
                    s.Value.Dispose();
                }
                catch { /* best effort */ }
            }
            _writeStreams.Clear();
        }

        // Groups a raw (TKey,TInput) file by key, merges each group to TValue
        // and atomically replaces the file with a key-sorted version.
        private void CompressFile(string file)
        {
            var dict = new Dictionary<TKey, HashSet<TInput>>();
            using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
            {
                while (reader.EOS == false)
                {
                    TKey k = reader.ReadCompatible<TKey>();
                    TInput v = reader.ReadCompatible<TInput>();
                    if (false == dict.ContainsKey(k))
                    {
                        dict[k] = new HashSet<TInput>();
                    }
                    dict[k].Add(v);
                }
            }
            // FIX: GetTempFileName already returns a full path; combining it with
            // GetTempPath was a no-op (Path.Combine discards the first argument
            // when the second is rooted).
            var tempFile = Path.GetTempFileName();
            using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
            {
                // Sort for search acceleration (Find relies on key order).
                // NOTE(review): sorting uses Comparer<TKey>.Default while Find
                // uses _options.KeyComparer — these two orderings must agree.
                foreach (var pair in dict.OrderBy(p => p.Key))
                {
                    var v = _options.MergeFunction(pair.Value);
                    writer.SerializeCompatible(pair.Key);
                    writer.SerializeCompatible(v);
                }
            }
            File.Delete(file);
            File.Move(tempFile, file, true);
        }

        private MemoryStreamWriter GetWriteStream(string fileName)
        {
            return _writeStreams.GetOrAdd(fileName, k =>
            {
                var filePath = Path.Combine(_catalog, k);
                var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
                return new MemoryStreamWriter(stream);
            });
        }

        private MemoryStreamReader GetReadStream(string fileName)
        {
            var filePath = Path.Combine(_catalog, fileName);
            var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
            return new MemoryStreamReader(stream);
        }
    }

    /// <summary>
    /// Single key/value search result within one partition.
    /// </summary>
    public class StorePartitionKeyValueSearchResult<TKey, TValue>
    {
        public bool Found { get; set; }
        public TKey Key { get; set; }
        public TValue Value { get; set; }
    }

    /// <summary>
    /// Keys to look up inside the partition identified by <see cref="Info"/>.
    /// </summary>
    public class PartitionSearchRequest<TKey, TMeta>
    {
        public TMeta Info { get; set; }
        public IEnumerable<TKey> Keys { get; set; }
    }

    /// <summary>
    /// Batch search request over several partitions.
    /// </summary>
    public class StoreSearchRequest<TKey, TMeta>
    {
        public IEnumerable<PartitionSearchRequest<TKey, TMeta>> PartitionSearchRequests { get; set; }
    }

    /// <summary>
    /// Search results grouped by partition meta.
    /// </summary>
    public class StoreSearchResult<TKey, TValue, TMeta>
    {
        public IDictionary<TMeta, IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>>> Results { get; set; }
    }
}
ZeroLevel.Services.Pools +namespace ZeroLevel.Services.Pools { /* public enum LoadingMode { Eager, Lazy, LazyExpanding }; diff --git a/ZeroLevel/Services/Serialization/MessageSerializer.cs b/ZeroLevel/Services/Serialization/MessageSerializer.cs index 2bea9c0..5289183 100644 --- a/ZeroLevel/Services/Serialization/MessageSerializer.cs +++ b/ZeroLevel/Services/Serialization/MessageSerializer.cs @@ -51,6 +51,19 @@ namespace ZeroLevel.Services.Serialization } } + public static void SerializeCompatible(this MemoryStreamWriter writer, object obj) + { + var direct_seriazlizable = (obj as IBinarySerializable); + if (direct_seriazlizable != null) + { + direct_seriazlizable.Serialize(writer); + } + else + { + PrimitiveTypeSerializer.Serialize(writer, obj); + } + } + public static byte[] SerializeCompatible(T obj) { if (null == obj) diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionDataConverter.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionDataConverter.cs deleted file mode 100644 index bf2fbb4..0000000 --- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionDataConverter.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Collections.Generic; -using System.IO; - -namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage -{ - public interface IPartitionDataConverter - { - IEnumerable ReadFromStorage(Stream stream); - void WriteToStorage(Stream stream, IEnumerable data); - } -} diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs deleted file mode 100644 index add1f64..0000000 --- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage -{ - public interface IPartitionFileStorage - { - 
Task WriteAsync(TKey key, IEnumerable records); - Task> CollectAsync(IEnumerable keys, Func filter = null); - void Drop(TKey key); - } -} diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/Partition.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/Partition.cs deleted file mode 100644 index 4b11984..0000000 --- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/Partition.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; - -namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage -{ - /// - /// Make part of full file path - /// - public class Partition - { - public Partition(string name, Func pathExtractor) - { - Name = name; - PathExtractor = pathExtractor; - } - public Func PathExtractor { get; } - public string Name { get; } - } -} diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs deleted file mode 100644 index 87d1c83..0000000 --- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs +++ /dev/null @@ -1,157 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.IO; -using System.IO.Compression; -using System.Linq; -using System.Threading.Tasks; -using ZeroLevel.Services.FileSystem; - -namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage -{ - public class PartitionFileSystemStorage - : IPartitionFileStorage - { - - private readonly PartitionFileSystemStorageOptions _options; - public PartitionFileSystemStorage(PartitionFileSystemStorageOptions options) - { - if (options.RootFolder == null) - throw new ArgumentNullException(nameof(options.RootFolder)); - if (options.DataConverter == null) - throw new ArgumentNullException(nameof(options.DataConverter)); - if (!Directory.Exists(options.RootFolder)) - { - Directory.CreateDirectory(options.RootFolder); - } - _options = options; - if 
(options.MergeFiles) - { - Sheduller.RemindEvery(TimeSpan.FromMinutes(_options.MergeFrequencyInMinutes), MergeDataFiles); - } - } - - public void Drop(TKey key) - { - var path = GetDataPath(key); - FSUtils.RemoveFolder(path, 3, 500); - } - - public async Task> CollectAsync(IEnumerable keys, Func filter = null) - { - if (filter == null) filter = (_) => true; - var pathes = keys.Safe().Select(k => GetDataPath(k)); - var files = pathes.Safe().SelectMany(p => Directory.GetFiles(p)).Where(n => n.StartsWith("__") == false); - var set = new ConcurrentBag(); - if (files.Any()) - { - var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism }; - await Parallel.ForEachAsync(files, options, async (file, _) => - { - using (var stream = CreateReadStream(file)) - { - foreach (var item in _options.DataConverter.ReadFromStorage(stream)) - { - if (filter(item)) - { - set.Add(item); - } - } - } - }); - } - return set; - } - public async Task WriteAsync(TKey key, IEnumerable records) - { - using (var stream = CreateWriteStream(key)) - { - _options.DataConverter.WriteToStorage(stream, records); - await stream.FlushAsync(); - } - } - - #region Private members - private ConcurrentDictionary _processingPath = new ConcurrentDictionary(); - private void MergeDataFiles() - { - var folders = new Stack(); - folders.Push(_options.RootFolder); - while (folders.Count > 0) - { - var dir = folders.Pop(); - MergeFolder(dir); - foreach (var subdir in Directory.GetDirectories(dir, "*.*", SearchOption.TopDirectoryOnly)) - { - folders.Push(subdir); - } - } - } - - private void MergeFolder(string path) - { - var v = _processingPath.GetOrAdd(path, 0); - if (v != 0) // каталог обрабатывается в настоящий момент - { - return; - } - var files = Directory.GetFiles(path); - if (files != null && files.Length > 1) - { - // TODO - } - } - - private string GetDataFilePath(string path) - { - return Path.Combine(path, Guid.NewGuid().ToString()); - } - private string 
GetDataPath(TKey key) - { - var path = _options.RootFolder; - foreach (var partition in _options.Partitions) - { - var pathPart = partition.PathExtractor(key); - pathPart = FSUtils.FileNameCorrection(pathPart); - if (string.IsNullOrWhiteSpace(pathPart)) - { - throw new Exception($"Partition '{partition.Name}' not return name of part of path"); - } - path = Path.Combine(path, pathPart); - } - return path; - } - private Stream CreateWriteStream(TKey key) - { - var path = GetDataPath(key); - if (!Directory.Exists(path)) - { - Directory.CreateDirectory(path); - } - var fullPath = GetDataFilePath(path); - var stream = File.OpenWrite(fullPath); - if (_options.UseCompression) - { - return new GZipStream(stream, CompressionMode.Compress, false); - } - return stream; - } - - private Stream CreateReadStream(string path) - { - var stream = File.OpenRead(path); - if (_options.UseCompression) - { - var ms = new MemoryStream(); - using (var compressed = new GZipStream(stream, CompressionMode.Decompress, false)) - { - compressed.CopyTo(ms); - } - ms.Position = 0; - return ms; - } - return stream; - } - #endregion - } -} diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorageOptions.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorageOptions.cs deleted file mode 100644 index ac8ad5f..0000000 --- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorageOptions.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using System.Collections.Generic; - -namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage -{ - public class PartitionFileSystemStorageOptions - { - public string RootFolder { get; set; } - public int MaxDegreeOfParallelism { get; set; } = Environment.ProcessorCount / 2; - public bool MergeFiles { get; set; } = false; - public int MergeFrequencyInMinutes { get; set; } = 180; - public bool UseCompression { get; set; } = false; - public IPartitionDataConverter 
DataConverter { get; set; } - public List> Partitions { get; set; } = new List>(); - } -} diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index 850c564..bbde94c 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -6,16 +6,16 @@ ogoun ogoun - 3.3.6.8 - BigFileReader update + 3.3.7.1 + PartitionStorage new methods https://github.com/ogoun/Zero/wiki Copyright Ogoun 2022 https://github.com/ogoun/Zero git - 3.3.6.8 - 3.3.6.8 + 3.3.7.1 + 3.3.7.1 AnyCPU;x64;x86 zero.png full @@ -67,7 +67,7 @@ - + True