diff --git a/PartitionFileStorageTest/Compressor.cs b/PartitionFileStorageTest/Compressor.cs
new file mode 100644
index 0000000..cbdf771
--- /dev/null
+++ b/PartitionFileStorageTest/Compressor.cs
@@ -0,0 +1,74 @@
+namespace PartitionFileStorageTest
+{
/// <summary>
/// Delta + 7-bit varint compression for sequences of ulongs (e.g. sorted phone
/// numbers): each stored value is the difference from the previous one, written
/// as a variable-length 7-bit encoded integer (LEB128-style).
/// </summary>
public static class Compressor
{
    /// <summary>
    /// Packs a sequence of numbers into a byte array.
    /// Values are delta-encoded against <paramref name="last"/>, which on return
    /// holds the final element of the sequence (useful for chunked encoding).
    /// Deltas use unsigned wrap-around, so the round-trip with
    /// <see cref="DecodeBytesContent"/> is exact even for unsorted input,
    /// but the output is only compact when the input is ascending.
    /// </summary>
    public static byte[] GetEncodedBytes(IEnumerable<ulong> list, ref ulong last)
    {
        using (var memoryStream = new MemoryStream())
        {
            foreach (var current in list)
            {
                var value = current - last;
                memoryStream.Write7BitEncodedULong(value);
                last = current;
            }
            return memoryStream.ToArray();
        }
    }

    /// <summary>
    /// Decodes a byte array produced by <see cref="GetEncodedBytes"/> back into
    /// the original sequence. Enumeration is deferred (iterator method).
    /// </summary>
    public static IEnumerable<ulong> DecodeBytesContent(byte[] bytes)
    {
        ulong last = 0;
        using (var memoryStream = new MemoryStream(bytes))
        {
            while (memoryStream.Position != memoryStream.Length)
            {
                var value = memoryStream.Read7BitEncodedULong();
                var current = last + value;

                yield return current;

                last = current;
            }
        }
    }

    /// <summary>
    /// Writes a ulong in 7-bit chunks, lowest bits first; the high bit of each
    /// byte flags a continuation. Zero is written as a single 0x00 byte.
    /// </summary>
    public static void Write7BitEncodedULong(this MemoryStream writer, ulong value)
    {
        var first = true;
        while (first || value > 0)
        {
            first = false;
            var lower7bits = (byte)(value & 0x7f);
            value >>= 7;
            if (value > 0)
                lower7bits |= 128;
            writer.WriteByte(lower7bits);
        }
    }

    /// <summary>
    /// Reads one value written by <see cref="Write7BitEncodedULong"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException">reader is null.</exception>
    /// <exception cref="EndOfStreamException">
    /// The stream ends inside a value. (Fix: the original cast ReadByte()'s -1
    /// EOF marker to 0xFF, which kept the continuation bit set and spun forever
    /// on truncated input.)
    /// </exception>
    public static ulong Read7BitEncodedULong(this MemoryStream reader)
    {
        if (reader == null)
            throw new ArgumentNullException(nameof(reader));

        var more = true;
        ulong value = 0;
        var shift = 0;
        while (more)
        {
            var read = reader.ReadByte();
            if (read < 0)
                throw new EndOfStreamException("Unexpected end of stream inside a 7-bit encoded value");
            ulong lower7bits = (byte)read;
            more = (lower7bits & 128) != 0;
            value |= (lower7bits & 0x7f) << shift;
            shift += 7;
        }
        return value;
    }
}
+}
diff --git a/PartitionFileStorageTest/MsisdnHelper.cs b/PartitionFileStorageTest/MsisdnHelper.cs
new file mode 100644
index 0000000..b705f70
--- /dev/null
+++ b/PartitionFileStorageTest/MsisdnHelper.cs
@@ -0,0 +1,101 @@
+namespace PartitionFileStorageTest
+{
/// <summary>
/// A Russian MSISDN split into its first significant digit (the one right after
/// the leading "7") and the remaining nine digits.
/// </summary>
public struct MsisdnParts
{
    public int FirstDigit { get; }
    public int OtherDigits { get; }

    public MsisdnParts(int firstDigit, int otherDigits)
    {
        FirstDigit = firstDigit;
        OtherDigits = otherDigits;
    }

    // Same "(first,other)" shape as before, built without interpolation.
    public override string ToString() => string.Concat("(", FirstDigit, ",", OtherDigits, ")");
}
/// <summary>
/// Parsing and packing helpers for Russian MSISDNs ("7" followed by 10 digits).
/// </summary>
public static class MsisdnHelper
{
    /// <summary>
    /// Splits a Russian MSISDN (70_000_000_000..79_999_999_999) into the first
    /// digit after the leading "7" and the remaining nine digits.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">msisdn is outside the Russian numbering range.</exception>
    public static MsisdnParts SplitParts(this ulong msisdn)
    {
        // Only Russian numbering is supported ("7" and 10 digits),
        // i.e. values from 70_000_000_000 to 79_999_999_999.
        if (msisdn < 70_000_000_000 || msisdn > 79_999_999_999)
            // fix: the original threw ArgumentException with the parameter name as
            // the *message*; ArgumentOutOfRangeException (a subclass, so existing
            // catch clauses still match) is the precise type and carries the value.
            throw new ArgumentOutOfRangeException(nameof(msisdn), msisdn, "Expected a Russian MSISDN: 7 followed by 10 digits");

        var firstDigit = (int)((msisdn / 1_000_000_000L) % 10);
        var otherDigits = (int)(msisdn % 1_000_000_000L);

        return new MsisdnParts(firstDigit, otherDigits);
    }

    /// <summary>
    /// Inverse of <see cref="SplitParts"/>: rebuilds the full number from its parts.
    /// </summary>
    public static ulong CombineParts(int firstDigit, int otherDigits)
    {
        return (ulong)(70_000_000_000L + firstDigit * 1_000_000_000L + otherDigits);
    }

    /// <summary>
    /// Lazily yields the lines that parse as Russian MSISDNs; unparseable lines are skipped.
    /// </summary>
    public static IEnumerable<ulong> ParseMsisdns(this IEnumerable<string> lines)
    {
        foreach (var line in lines)
        {
            if (line.TryParseMsisdn(out var msisdn))
            {
                yield return msisdn;
            }
        }
    }

    /// <summary>
    /// Tries to parse a string as a Russian MSISDN ("7" and 10 digits).
    /// Accepted formats: "+71234567890", "71234567890", "1234567890".
    /// An "8XXXXXXXXXX" trunk form is deliberately rejected here (normalization
    /// of "8" to "7" happens upstream in CallRecordParser.ReadNumber).
    /// </summary>
    /// <param name="source">Raw text, may contain surrounding whitespace.</param>
    /// <param name="msisdn">The parsed number (always with the leading 7), or 0 on failure.</param>
    /// <returns>true when the text is a valid Russian MSISDN.</returns>
    public static bool TryParseMsisdn(this string source, out ulong msisdn)
    {
        msisdn = 0;
        // fix: the original dereferenced source without a null check (NRE on null input)
        if (string.IsNullOrEmpty(source)) return false;

        var line = source.Trim();
        var length = line.Length;

        if (length < 10 || length > 12) return false;

        var start = 0;
        if (length == 12) // "+71234567890"
        {
            if (line[0] != '+' || line[1] != '7') return false;
            start = 2;
        }
        else if (length == 11) // "71234567890" ("81234567890" is rejected)
        {
            if (line[0] != '7') return false;
            start = 1;
        }
        // length == 10: bare "1234567890", start stays 0 and the leading 7 is implied

        // Accumulate the digits onto the implicit country code "7".
        ulong number = 7;

        for (var i = start; i < length; i++)
        {
            var c = line[i];
            if (c < '0' || c > '9') return false;
            number = number * 10 + (ulong)(c - '0');
        }

        msisdn = number;
        return true;
    }
}
+}
diff --git a/PartitionFileStorageTest/PartitionFileStorageTest.csproj b/PartitionFileStorageTest/PartitionFileStorageTest.csproj
index 68e186f..575110a 100644
--- a/PartitionFileStorageTest/PartitionFileStorageTest.csproj
+++ b/PartitionFileStorageTest/PartitionFileStorageTest.csproj
@@ -7,6 +7,10 @@
enable
+
+
+
+
diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs
index 02e5050..e8ea52e 100644
--- a/PartitionFileStorageTest/Program.cs
+++ b/PartitionFileStorageTest/Program.cs
@@ -1,141 +1,231 @@
-using ZeroLevel;
+using System.Diagnostics;
+using System.Text;
+using ZeroLevel.Services.PartitionStorage;
using ZeroLevel.Services.Serialization;
-using ZeroLevel.Services.Storages;
-using ZeroLevel.Services.Storages.PartitionFileSystemStorage;
namespace PartitionFileStorageTest
{
- internal class Program
/// <summary>
/// Parses a TSV line "msisdn&lt;TAB&gt;numbers" from the source CDR file into a CallRecord.
/// Not thread-safe: a single StringBuilder is reused between calls on purpose
/// (avoids per-line allocations); use one parser instance per thread.
/// </summary>
public class CallRecordParser
{
    // Characters that legitimately appear inside a written phone number and are skipped.
    private static readonly HashSet<char> _partsOfNumbers = new HashSet<char> { '*', '#', '+', '(', ')', '-' };
    // Scratch buffer reused across calls (the reason the parser is not thread-safe).
    private readonly StringBuilder sb = new StringBuilder();
    private const string NO_VAL = null;

    /// <summary>
    /// Extracts one phone number from free-form text: keeps digits (dropping
    /// leading zeros), skips whitespace and number punctuation, rejects any other
    /// character. Normalizes 11-digit "8XXXXXXXXXX" to "7XXXXXXXXXX". Returns null
    /// unless the result is a short number (3-4 digits) or a full number (&gt;10 digits).
    /// </summary>
    private string ReadNumber(string line)
    {
        sb.Clear();
        var started = false;
        foreach (var ch in line)
        {
            if (char.IsDigit(ch))
            {
                if (started)
                {
                    sb.Append(ch);
                }
                else if (ch != '0') // skip leading zeros before the first significant digit
                {
                    sb.Append(ch);
                    started = true;
                }
            }
            else if (char.IsWhiteSpace(ch) || _partsOfNumbers.Contains(ch)) continue;
            else return NO_VAL;
        }
        if (sb.Length == 11 && sb[0] == '8') sb[0] = '7';
        if (sb.Length == 3 || sb.Length == 4 || sb.Length > 10)
            return sb.ToString();
        return NO_VAL;
    }

    /// <summary>
    /// Splits an STX (0x02) separated list of raw numbers and collects the
    /// distinct parseable ones.
    /// </summary>
    private HashSet<string> ReadNumbers(string line)
    {
        var result = new HashSet<string>();
        if (string.IsNullOrWhiteSpace(line) == false)
        {
            char STX = (char)0x0002;
            // Split never returns an empty array, so iterate directly
            // (the original's values.Length > 0 check was redundant).
            foreach (var val in line.Split(STX))
            {
                var number = ReadNumber(val);
                if (number != null)
                {
                    result.Add(number);
                }
            }
        }
        return result;
    }

    /// <summary>
    /// Parses one source-file line: "msisdn\tSTX-separated-numbers".
    /// Returns null when the line is malformed or yields no usable numbers.
    /// </summary>
    public CallRecord Parse(string line)
    {
        var parts = line.Split('\t');

        if (parts.Length != 2) return null;

        var msisdn = ReadNumber(parts[0].Trim());

        if (string.IsNullOrWhiteSpace(msisdn) == false)
        {
            // ReadNumbers never returns null, so only the count matters.
            var numbers = ReadNumbers(parts[1]);
            if (numbers.Count > 0)
            {
                return new CallRecord
                {
                    Msisdn = msisdn,
                    Msisdns = numbers
                };
            }
        }
        return null;
    }
}

/// <summary>
/// One parsed CDR row: the subscriber number and the distinct counterpart numbers.
/// </summary>
public class CallRecord
{
    public string Msisdn;
    public HashSet<string> Msisdns;
}
// NOTE(review): generic type arguments below were lost in extraction and have been
// reconstructed from usage (key = ulong, input = ulong, value = byte[], meta = Metadata)
// — confirm against the ZeroLevel.Services.PartitionStorage API before merging.
internal class Program
{
    // Partition metadata: which day and which call direction a record belongs to.
    private class Metadata
    {
        public DateTime Date { get; set; }
        public bool Incoming { get; set; }
    }

    /// <summary>
    /// Reads the TSV CDR file at <paramref name="source"/> and writes two partition
    /// stores (incoming and outcoming) under <paramref name="root"/>.
    /// </summary>
    private static void BuildStore(string source, string root)
    {
        var options = new IStoreOptions<ulong, ulong, byte[], Metadata>
        {
            RootFolder = root,
            FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 1000).ToString()),
            MergeFunction = list =>
            {
                ulong s = 0;
                return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
            },
            // NOTE(review): TestReading names these partitions "timestamp" — confirm the
            // name does not affect the on-disk layout, otherwise reads may miss data.
            Partitions = new List<StoreCatalogPartition<Metadata>>
            {
                new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd")),
                new StoreCatalogPartition<Metadata>("Date", m => m.Incoming ? "incoming" : "outcoming")
            },
            // fix: the original comparator here was inverted (descending) while
            // TestReading and MergeFunction's OrderBy use ascending order — the
            // store would be written and searched with inconsistent key ordering.
            KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1
        };
        var store = new Store<ulong, ulong, byte[], Metadata>(options);

        var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true });
        var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false });
        var parser = new CallRecordParser();
        var sw = Stopwatch.StartNew();
        using (FileStream fs = File.Open(source, FileMode.Open, FileAccess.Read, FileShare.None))
        using (BufferedStream bs = new BufferedStream(fs, 1024 * 1024 * 64))
        using (StreamReader sr = new StreamReader(bs))
        {
            string line;
            while ((line = sr.ReadLine()) != null)
            {
                var record = parser.Parse(line);
                // fix: redundant `record?.Msisdn ?? string.Empty` removed — record
                // is already known to be non-null here.
                if (record == null) continue;
                if (!string.IsNullOrWhiteSpace(record.Msisdn) && ulong.TryParse(record.Msisdn, out var n))
                {
                    foreach (var ctn in record.Msisdns.ParseMsisdns())
                    {
                        storeIncoming.Store(n, ctn);
                        storeOutcoming.Store(ctn, n);
                    }
                }
            }
        }
        sw.Stop();
        Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms");
        sw.Restart();
        storeIncoming.CompleteStoreAndRebuild();
        storeOutcoming.CompleteStoreAndRebuild();
        sw.Stop();
        Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms");
    }

    /// <summary>
    /// Reopens the stores written by <see cref="BuildStore"/> and runs a sample
    /// search over both directions, printing the decoded counterpart numbers.
    /// </summary>
    private static void TestReading(string source, string root)
    {
        var options = new IStoreOptions<ulong, ulong, byte[], Metadata>
        {
            RootFolder = root,
            FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 1000).ToString()),
            MergeFunction = list =>
            {
                ulong s = 0;
                return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
            },
            // NOTE(review): BuildStore names these partitions "Date" — see note there.
            Partitions = new List<StoreCatalogPartition<Metadata>>
            {
                new StoreCatalogPartition<Metadata>("timestamp", m => m.Date.ToString("yyyyMMdd")),
                new StoreCatalogPartition<Metadata>("timestamp", m => m.Incoming ? "incoming" : "outcoming")
            },
            KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1
        };
        var store = new Store<ulong, ulong, byte[], Metadata>(options);
        var request = new StoreSearchRequest<ulong, Metadata>
        {
            PartitionSearchRequests = new List<PartitionSearchRequest<ulong, Metadata>>
            {
                new PartitionSearchRequest<ulong, Metadata>
                {
                    Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true },
                    Keys = new ulong[] { 79645090604 }
                },
                new PartitionSearchRequest<ulong, Metadata>
                {
                    Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false },
                    Keys = new ulong[] { 79645090604 }
                }
            }
        };

        var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true });
        Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}");
        var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false });
        Console.WriteLine($"Outcoming data files: {storeOutcoming.CountDataFiles()}");
        var sw = Stopwatch.StartNew();
        // Blocking on .Result is acceptable in this synchronous console harness.
        var result = store.Search(request).Result;
        foreach (var r in result.Results)
        {
            Console.WriteLine($"Incoming: {r.Key.Incoming}");
            foreach (var mr in r.Value)
            {
                Console.WriteLine($"\tKey: {mr.Key}. Sucess: {mr.Found}");
                if (mr.Found && mr.Value.Length > 0)
                {
                    var ctns = Compressor.DecodeBytesContent(mr.Value);
                    Console.WriteLine($"\t\t{string.Join(';', ctns)}");
                }
            }
        }
        sw.Stop();
        Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms");
    }

    static void Main(string[] args)
    {
        // Hard-coded local paths: this is a manual test harness, not production code.
        var root = @"H:\temp";
        var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt";
        BuildStore(source, root);
        TestReading(source, root);
        Console.ReadKey();
    }
}
}
\ No newline at end of file
diff --git a/TestApp/Program.cs b/TestApp/Program.cs
index b4b4a9b..4042c1f 100644
--- a/TestApp/Program.cs
+++ b/TestApp/Program.cs
@@ -1,11 +1,15 @@
-namespace TestApp
+using System;
+using System.Collections.Concurrent;
+
+namespace TestApp
{
internal static class Program
{
    private static void Main(string[] args)
    {
        // Ad-hoc scratch check of ConcurrentDictionary.GetOrAdd.
        // NOTE(review): the dictionary's type arguments were lost in extraction;
        // <string, int> is inferred from the "sss"/1 usage — confirm.
        var test = new ConcurrentDictionary<string, int>();
        var v = test.GetOrAdd("sss", 1);
        Console.ReadKey();
    }
}
}
\ No newline at end of file
diff --git a/ZeroLevel.HNSW/Services/LinksSet.cs b/ZeroLevel.HNSW/Services/LinksSet.cs
index d431975..2ded5f4 100644
--- a/ZeroLevel.HNSW/Services/LinksSet.cs
+++ b/ZeroLevel.HNSW/Services/LinksSet.cs
@@ -1,5 +1,4 @@
-using System;
-using System.Collections.Concurrent;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using ZeroLevel.Services.Serialization;
diff --git a/ZeroLevel.sln b/ZeroLevel.sln
index e4dd8c7..58bceed 100644
--- a/ZeroLevel.sln
+++ b/ZeroLevel.sln
@@ -69,10 +69,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PartitionTest", "PartitionT
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}"
EndProject
-Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{442569A3-E126-4A11-B9DD-2DFA5BF76B0F}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BigFileParserTest", "Tests\BigFileParserTest\BigFileParserTest.csproj", "{E7526771-86D5-4311-A284-05D3FEFC7B75}"
-EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -335,18 +331,6 @@ Global
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.Build.0 = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.ActiveCfg = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.Build.0 = Release|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.ActiveCfg = Debug|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x64.Build.0 = Debug|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.ActiveCfg = Debug|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Debug|x86.Build.0 = Debug|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|Any CPU.Build.0 = Release|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.ActiveCfg = Release|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x64.Build.0 = Release|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.ActiveCfg = Release|Any CPU
- {E7526771-86D5-4311-A284-05D3FEFC7B75}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -362,7 +346,6 @@ Global
{2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133}
{9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4}
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9} = {BAD88A91-1AFA-48A8-8D39-4846A65B4167}
- {E7526771-86D5-4311-A284-05D3FEFC7B75} = {442569A3-E126-4A11-B9DD-2DFA5BF76B0F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB}
diff --git a/ZeroLevel/Services/Collections/Condensator.cs b/ZeroLevel/Services/Collections/Condensator.cs
new file mode 100644
index 0000000..3a3b285
--- /dev/null
+++ b/ZeroLevel/Services/Collections/Condensator.cs
@@ -0,0 +1,62 @@
+using System;
+
+namespace ZeroLevel.Services.Collections
+{
+ ///
+ /// Collects data while there is capacity and invokes an action after that (batch processing)
+ ///
+ ///
/// <summary>
/// Collects items until the internal buffer is full, then invokes an action on
/// the batch (batch processing). The action receives the buffer and the number
/// of valid items in it. Not thread-safe.
/// (NOTE(review): the file is named Condensator.cs but the class is Capacitor —
/// consider aligning the file name.)
/// </summary>
/// <typeparam name="T">Type of the collected items.</typeparam>
public sealed class Capacitor<T>
    : IDisposable
{
    private int _index = -1;
    private int _count = 0;
    private readonly T[] _buffer;
    private readonly Action<T[], int> _dischargeAction;

    /// Number of items currently buffered and not yet discharged.
    public int Count => _count;

    /// <param name="volume">Batch size; values below 1 fall back to 16.</param>
    /// <param name="dischargeAction">Callback invoked with (buffer, itemCount) on each discharge.</param>
    /// <exception cref="ArgumentNullException">dischargeAction is null.</exception>
    public Capacitor(int volume, Action<T[], int> dischargeAction)
    {
        if (dischargeAction == null) throw new ArgumentNullException(nameof(dischargeAction));
        if (volume < 1) volume = 16;
        _buffer = new T[volume];
        _dischargeAction = dischargeAction;
    }

    /// <summary>
    /// Adds an item; when the buffer is already full the pending batch is
    /// discharged first, then the new item starts the next batch.
    /// </summary>
    public void Add(T val)
    {
        _index++;
        if (_index >= _buffer.Length)
        {
            _dischargeAction.Invoke(_buffer, _buffer.Length);
            _index = 0;
            _count = 0;
        }
        _buffer[_index] = val;
        _count++;
    }

    /// <summary>
    /// Flushes the pending (partial) batch, if any.
    /// </summary>
    public void Discharge()
    {
        if (_count > 0)
        {
            _dischargeAction.Invoke(_buffer, _count);
            // fix: the original did not reset the state here, so a manual
            // Discharge() followed by Dispose() (or further Adds) re-emitted
            // the same items a second time.
            _index = -1;
            _count = 0;
        }
    }

    /// <summary>
    /// Flushes any remaining items; discharge failures are logged, not thrown
    /// (throwing from Dispose is avoided deliberately).
    /// </summary>
    public void Dispose()
    {
        if (_count > 0)
        {
            try
            {
                Discharge();
            }
            catch (Exception ex)
            {
                Log.Error(ex, $"[Capacitor.Dispose] Fault discharge in dispose method");
            }
        }
    }
}
+}
diff --git a/ZeroLevel/Services/FileSystem/BigFileParser.cs b/ZeroLevel/Services/FileSystem/BigFileParser.cs
index 67a50d0..3512a81 100644
--- a/ZeroLevel/Services/FileSystem/BigFileParser.cs
+++ b/ZeroLevel/Services/FileSystem/BigFileParser.cs
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
-using System.Threading;
namespace ZeroLevel.Services.FileSystem
{
@@ -33,7 +32,7 @@ namespace ZeroLevel.Services.FileSystem
public IEnumerable ReadBatches(int batchSize, bool skipNull = false)
{
- var buffer = new T[batchSize];
+ T[] buffer;
var buffer_index = 0;
using (FileStream fs = File.Open(_filePath, FileMode.Open, FileAccess.Read, FileShare.None))
{
@@ -42,6 +41,7 @@ namespace ZeroLevel.Services.FileSystem
using (StreamReader sr = new StreamReader(bs))
{
string line;
+ buffer = new T[batchSize];
while ((line = sr.ReadLine()) != null)
{
var value = _parser.Invoke(line);
@@ -49,8 +49,7 @@ namespace ZeroLevel.Services.FileSystem
buffer[buffer_index] = value;
buffer_index++;
if (buffer_index >= batchSize)
- {
- Thread.MemoryBarrier();
+ {
yield return buffer;
buffer_index = 0;
}
@@ -69,7 +68,7 @@ namespace ZeroLevel.Services.FileSystem
}
}
- public IEnumerable Read(int batchSize)
+ public IEnumerable Read()
{
using (FileStream fs = File.Open(_filePath, FileMode.Open, FileAccess.Read, FileShare.None))
{
diff --git a/ZeroLevel/Services/HashFunctions/XXHash32.cs b/ZeroLevel/Services/HashFunctions/XXHash32.cs
new file mode 100644
index 0000000..ddb3771
--- /dev/null
+++ b/ZeroLevel/Services/HashFunctions/XXHash32.cs
@@ -0,0 +1,547 @@
+using System;
+using System.Security.Cryptography;
+
+namespace ZeroLevel.Services.HashFunctions
+{
+ //see details: https://github.com/Cyan4973/xxHash/blob/dev/doc/xxhash_spec.md
+ ///
+ /// Represents the class which provides a implementation of the xxHash32 algorithm.
+ ///
+ ///
//see details: https://github.com/Cyan4973/xxHash/blob/dev/doc/xxhash_spec.md
/// <summary>
/// Implementation of the xxHash32 algorithm as a <see cref="HashAlgorithm"/>.
/// (Generic arguments of the Func fields were lost in extraction and are
/// restored here from their usage: (byte[], int) -> uint and uint -> uint.)
/// </summary>
public sealed class XXHash32 : HashAlgorithm
{
    private const uint PRIME32_1 = 2654435761U;
    private const uint PRIME32_2 = 2246822519U;
    private const uint PRIME32_3 = 3266489917U;
    private const uint PRIME32_4 = 668265263U;
    private const uint PRIME32_5 = 374761393U;

    // Reads 4 bytes at an offset as a little-endian uint (endianness-aware).
    private static readonly Func<byte[], int, uint> FuncGetLittleEndianUInt32;
    // Converts the finished hash to canonical byte order for GetBytes.
    private static readonly Func<uint, uint> FuncGetFinalHashUInt32;

    private uint _Seed32;

    // The four lane accumulators of the xxHash32 round loop.
    private uint _ACC32_1;
    private uint _ACC32_2;
    private uint _ACC32_3;
    private uint _ACC32_4;

    private uint _Hash32;

    private int _RemainingLength;
    private long _TotalLength = 0;
    private int _CurrentIndex;
    private byte[] _CurrentArray;

    static XXHash32()
    {
        if (BitConverter.IsLittleEndian)
        {
            FuncGetLittleEndianUInt32 = new Func<byte[], int, uint>((x, i) =>
            {
                unsafe
                {
                    fixed (byte* array = x)
                    {
                        return *(uint*)(array + i);
                    }
                }
            });
            FuncGetFinalHashUInt32 = new Func<uint, uint>(i => (i & 0x000000FFU) << 24 | (i & 0x0000FF00U) << 8 | (i & 0x00FF0000U) >> 8 | (i & 0xFF000000U) >> 24);
        }
        else
        {
            FuncGetLittleEndianUInt32 = new Func<byte[], int, uint>((x, i) =>
            {
                unsafe
                {
                    fixed (byte* array = x)
                    {
                        return (uint)(array[i++] | (array[i++] << 8) | (array[i++] << 16) | (array[i] << 24));
                    }
                }
            });
            FuncGetFinalHashUInt32 = new Func<uint, uint>(i => i);
        }
    }

    /// <summary>
    /// Creates an instance of the <see cref="XXHash32"/> class with the default seed (0).
    /// </summary>
    public new static XXHash32 Create() => new XXHash32();

    /// <summary>
    /// Not supported: xxHash has no named-algorithm registry.
    /// </summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public new static XXHash32 Create(string algName) => throw new NotSupportedException("This method is not be supported.");

    /// <summary>
    /// Initializes a new instance with the default seed (0).
    /// </summary>
    public XXHash32() => Initialize(0);

    /// <summary>
    /// Initializes a new instance with the specified seed.
    /// </summary>
    /// <param name="seed">Seed used for xxHash32 computing.</param>
    public XXHash32(uint seed) => Initialize(seed);

    /// <summary>
    /// Gets the computed hash as a uint.
    /// </summary>
    /// <exception cref="InvalidOperationException">Hash computation has not yet completed.</exception>
    public uint HashUInt32 => State == 0 ? _Hash32 : throw new InvalidOperationException("Hash computation has not yet completed.");

    /// <summary>
    /// Gets or sets the seed; changing it resets the accumulators.
    /// </summary>
    /// <exception cref="InvalidOperationException">Hash computation has not yet completed.</exception>
    public uint Seed
    {
        get => _Seed32;
        set
        {
            if (value != _Seed32)
            {
                if (State != 0) throw new InvalidOperationException("Hash computation has not yet completed.");
                _Seed32 = value;
                Initialize();
            }
        }
    }

    /// <summary>
    /// Resets the four lane accumulators for a new hash computation.
    /// </summary>
    public override void Initialize()
    {
        _ACC32_1 = _Seed32 + PRIME32_1 + PRIME32_2;
        _ACC32_2 = _Seed32 + PRIME32_2;
        _ACC32_3 = _Seed32 + 0;
        _ACC32_4 = _Seed32 - PRIME32_1;
    }

    /// <summary>
    /// Consumes input 16 bytes at a time through the four-lane round function.
    /// NOTE(review): the tail bookkeeping (_RemainingLength/_CurrentArray) assumes
    /// the remainder arrives in the final HashCore call and that ibStart is 0
    /// (size == cbSize). That holds for ComputeHash(byte[]); confirm before using
    /// this type with incremental TransformBlock calls.
    /// </summary>
    protected override void HashCore(byte[] array, int ibStart, int cbSize)
    {
        if (State != 1) State = 1;
        var size = cbSize - ibStart;
        _RemainingLength = size & 15;
        if (cbSize >= 16)
        {
            var limit = size - _RemainingLength;
            do
            {
                _ACC32_1 = Round32(_ACC32_1, FuncGetLittleEndianUInt32(array, ibStart));
                ibStart += 4;
                _ACC32_2 = Round32(_ACC32_2, FuncGetLittleEndianUInt32(array, ibStart));
                ibStart += 4;
                _ACC32_3 = Round32(_ACC32_3, FuncGetLittleEndianUInt32(array, ibStart));
                ibStart += 4;
                _ACC32_4 = Round32(_ACC32_4, FuncGetLittleEndianUInt32(array, ibStart));
                ibStart += 4;
            } while (ibStart < limit);
        }
        _TotalLength += cbSize;

        if (_RemainingLength != 0)
        {
            _CurrentArray = array;
            _CurrentIndex = ibStart;
        }
    }

    /// <summary>
    /// Merges the accumulators, folds in the tail bytes, and applies the final
    /// avalanche, per the xxHash32 specification.
    /// </summary>
    protected override byte[] HashFinal()
    {
        if (_TotalLength >= 16)
        {
#if HIGHER_VERSIONS
            _Hash32 = RotateLeft32_1(_ACC32_1) + RotateLeft32_7(_ACC32_2) + RotateLeft32_12(_ACC32_3) + RotateLeft32_18(_ACC32_4);
#else
            _Hash32 = RotateLeft32(_ACC32_1, 1) + RotateLeft32(_ACC32_2, 7) + RotateLeft32(_ACC32_3, 12) + RotateLeft32(_ACC32_4, 18);
#endif
        }
        else
        {
            // Inputs shorter than one full stripe skip the lanes entirely.
            _Hash32 = _Seed32 + PRIME32_5;
        }

        _Hash32 += (uint)_TotalLength;

        while (_RemainingLength >= 4)
        {
#if HIGHER_VERSIONS
            _Hash32 = RotateLeft32_17(_Hash32 + FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME32_3) * PRIME32_4;
#else
            _Hash32 = RotateLeft32(_Hash32 + FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME32_3, 17) * PRIME32_4;
#endif
            _CurrentIndex += 4;
            _RemainingLength -= 4;
        }
        unsafe
        {
            fixed (byte* arrayPtr = _CurrentArray)
            {
                while (_RemainingLength-- >= 1)
                {
#if HIGHER_VERSIONS
                    _Hash32 = RotateLeft32_11(_Hash32 + arrayPtr[_CurrentIndex++] * PRIME32_5) * PRIME32_1;
#else
                    _Hash32 = RotateLeft32(_Hash32 + arrayPtr[_CurrentIndex++] * PRIME32_5, 11) * PRIME32_1;
#endif
                }
            }
        }
        // Final avalanche.
        _Hash32 = (_Hash32 ^ (_Hash32 >> 15)) * PRIME32_2;
        _Hash32 = (_Hash32 ^ (_Hash32 >> 13)) * PRIME32_3;
        _Hash32 ^= _Hash32 >> 16;

        _TotalLength = State = 0;

        return BitConverter.GetBytes(FuncGetFinalHashUInt32(_Hash32));
    }

#if HIGHER_VERSIONS
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static uint Round32(uint input, uint value) => RotateLeft32_13(input + (value * PRIME32_2)) * PRIME32_1;

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static uint RotateLeft32_1(uint value) => (value << 1) | (value >> 31); //_ACC32_1
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static uint RotateLeft32_7(uint value) => (value << 7) | (value >> 25); //_ACC32_2
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static uint RotateLeft32_11(uint value) => (value << 11) | (value >> 21);
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static uint RotateLeft32_12(uint value) => (value << 12) | (value >> 20);// _ACC32_3
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static uint RotateLeft32_13(uint value) => (value << 13) | (value >> 19);
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static uint RotateLeft32_17(uint value) => (value << 17) | (value >> 15);
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static uint RotateLeft32_18(uint value) => (value << 18) | (value >> 14); //_ACC32_4
#else
    private static uint Round32(uint input, uint value) => RotateLeft32(input + (value * PRIME32_2), 13) * PRIME32_1;

    private static uint RotateLeft32(uint value, int count) => (value << count) | (value >> (32 - count));
#endif
    // Shared constructor body: record the seed and reset the accumulators.
    private void Initialize(uint seed)
    {
        HashSizeValue = 32;
        _Seed32 = seed;
        Initialize();
    }

}
+
+ ///
+ /// Represents the class which provides a implementation of the xxHash64 algorithm.
+ ///
+ ///
+ public sealed class XXHash64 : HashAlgorithm
+ {
+ private const ulong PRIME64_1 = 11400714785074694791UL;
+ private const ulong PRIME64_2 = 14029467366897019727UL;
+ private const ulong PRIME64_3 = 1609587929392839161UL;
+ private const ulong PRIME64_4 = 9650029242287828579UL;
+ private const ulong PRIME64_5 = 2870177450012600261UL;
+
+ private static readonly Func FuncGetLittleEndianUInt32;
+ private static readonly Func FuncGetLittleEndianUInt64;
+ private static readonly Func FuncGetFinalHashUInt64;
+
+ private ulong _Seed64;
+
+ private ulong _ACC64_1;
+ private ulong _ACC64_2;
+ private ulong _ACC64_3;
+ private ulong _ACC64_4;
+ private ulong _Hash64;
+
+ private int _RemainingLength;
+ private long _TotalLength;
+ private int _CurrentIndex;
+ private byte[] _CurrentArray;
+
+
+
+ static XXHash64()
+ {
+ if (BitConverter.IsLittleEndian)
+ {
+ FuncGetLittleEndianUInt32 = new Func((x, i) =>
+ {
+ unsafe
+ {
+ fixed (byte* array = x)
+ {
+ return *(uint*)(array + i);
+ }
+ }
+ });
+ FuncGetLittleEndianUInt64 = new Func((x, i) =>
+ {
+ unsafe
+ {
+ fixed (byte* array = x)
+ {
+ return *(ulong*)(array + i);
+ }
+ }
+ });
+ FuncGetFinalHashUInt64 = new Func(i => (i & 0x00000000000000FFUL) << 56 | (i & 0x000000000000FF00UL) << 40 | (i & 0x0000000000FF0000UL) << 24 | (i & 0x00000000FF000000UL) << 8 | (i & 0x000000FF00000000UL) >> 8 | (i & 0x0000FF0000000000UL) >> 24 | (i & 0x00FF000000000000UL) >> 40 | (i & 0xFF00000000000000UL) >> 56);
+ }
+ else
+ {
+ FuncGetLittleEndianUInt32 = new Func((x, i) =>
+ {
+ unsafe
+ {
+ fixed (byte* array = x)
+ {
+ return (uint)(array[i++] | (array[i++] << 8) | (array[i++] << 16) | (array[i] << 24));
+ }
+ }
+ });
+ FuncGetLittleEndianUInt64 = new Func((x, i) =>
+ {
+ unsafe
+ {
+ fixed (byte* array = x)
+ {
+ return array[i++] | ((ulong)array[i++] << 8) | ((ulong)array[i++] << 16) | ((ulong)array[i++] << 24) | ((ulong)array[i++] << 32) | ((ulong)array[i++] << 40) | ((ulong)array[i++] << 48) | ((ulong)array[i] << 56);
+ }
+ }
+ });
+ FuncGetFinalHashUInt64 = new Func(i => i);
+ }
+ }
+
+ ///
+ /// Creates an instance of class by default seed(0).
+ ///
+ ///
+ public new static XXHash64 Create() => new XXHash64();
+
+ ///
+ /// Creates an instance of the specified implementation of XXHash64 algorithm.
+ /// This method always throws .
+ ///
+ /// The hash algorithm implementation to use.
+ /// This method always throws .
+ /// This method is not be supported.
+ public new static XXHash64 Create(string algName) => throw new NotSupportedException("This method is not be supported.");
+
+ ///
+ /// Initializes a new instance of the class by default seed(0).
+ ///
+ public XXHash64() => Initialize(0);
+
+
+ ///
+ /// Initializes a new instance of the class, and sets the to the specified value.
+ ///
+ /// Represent the seed to be used for xxHash64 computing.
+ public XXHash64(uint seed) => Initialize(seed);
+
+
+ ///
+ /// Gets the value of the computed hash code.
+ ///
+ /// Computation has not yet completed.
+ public ulong HashUInt64 => State == 0 ? _Hash64 : throw new InvalidOperationException("Computation has not yet completed.");
+
+ ///
+ /// Gets or sets the value of seed used by xxHash64 algorithm.
+ ///
+ /// Computation has not yet completed.
+ public ulong Seed
+ {
+ get => _Seed64;
+ set
+ {
+ if (value != _Seed64)
+ {
+ if (State != 0) throw new InvalidOperationException("Computation has not yet completed.");
+ _Seed64 = value;
+ Initialize();
+ }
+ }
+ }
+
+
+ ///
+ /// Initializes this instance for new hash computing.
+ ///
+ public override void Initialize()
+ {
+ _ACC64_1 = _Seed64 + PRIME64_1 + PRIME64_2;
+ _ACC64_2 = _Seed64 + PRIME64_2;
+ _ACC64_3 = _Seed64 + 0;
+ _ACC64_4 = _Seed64 - PRIME64_1;
+ }
+
+ ///
+ /// Routes data written to the object into the hash algorithm for computing the hash.
+ ///
+ /// The input to compute the hash code for.
+ /// The offset into the byte array from which to begin using data.
+ /// The number of bytes in the byte array to use as data.
+ protected override void HashCore(byte[] array, int ibStart, int cbSize)
+ {
+ if (State != 1) State = 1;
+ var size = cbSize - ibStart;
+ _RemainingLength = size & 31;
+ if (cbSize >= 32)
+ {
+ var limit = size - _RemainingLength;
+ do
+ {
+ _ACC64_1 = Round64(_ACC64_1, FuncGetLittleEndianUInt64(array, ibStart));
+ ibStart += 8;
+ _ACC64_2 = Round64(_ACC64_2, FuncGetLittleEndianUInt64(array, ibStart));
+ ibStart += 8;
+ _ACC64_3 = Round64(_ACC64_3, FuncGetLittleEndianUInt64(array, ibStart));
+ ibStart += 8;
+ _ACC64_4 = Round64(_ACC64_4, FuncGetLittleEndianUInt64(array, ibStart));
+ ibStart += 8;
+ } while (ibStart < limit);
+ }
+ _TotalLength += cbSize;
+ if (_RemainingLength != 0)
+ {
+ _CurrentArray = array;
+ _CurrentIndex = ibStart;
+ }
+ }
+
+ ///
+ /// Finalizes the hash computation after the last data is processed by the cryptographic stream object.
+ ///
+ /// The computed hash code.
+ protected override byte[] HashFinal()
+ {
+ if (_TotalLength >= 32)
+ {
+#if HIGHER_VERSIONS
+ _Hash64 = RotateLeft64_1(_ACC64_1) + RotateLeft64_7(_ACC64_2) + RotateLeft64_12(_ACC64_3) + RotateLeft64_18(_ACC64_4);
+#else
+
+ _Hash64 = RotateLeft64(_ACC64_1, 1) + RotateLeft64(_ACC64_2, 7) + RotateLeft64(_ACC64_3, 12) + RotateLeft64(_ACC64_4, 18);
+#endif
+ _Hash64 = MergeRound64(_Hash64, _ACC64_1);
+ _Hash64 = MergeRound64(_Hash64, _ACC64_2);
+ _Hash64 = MergeRound64(_Hash64, _ACC64_3);
+ _Hash64 = MergeRound64(_Hash64, _ACC64_4);
+ }
+ else
+ {
+ _Hash64 = _Seed64 + PRIME64_5;
+ }
+
+ _Hash64 += (ulong)_TotalLength;
+
+ while (_RemainingLength >= 8)
+ {
+#if HIGHER_VERSIONS
+ _Hash64 = RotateLeft64_27(_Hash64 ^ Round64(0, FuncGetLittleEndianUInt64(_CurrentArray, _CurrentIndex))) * PRIME64_1 + PRIME64_4;
+#else
+ _Hash64 = RotateLeft64(_Hash64 ^ Round64(0, FuncGetLittleEndianUInt64(_CurrentArray, _CurrentIndex)), 27) * PRIME64_1 + PRIME64_4;
+#endif
+ _CurrentIndex += 8;
+ _RemainingLength -= 8;
+ }
+
+ while (_RemainingLength >= 4)
+ {
+#if HIGHER_VERSIONS
+ _Hash64 = RotateLeft64_23(_Hash64 ^ (FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME64_1)) * PRIME64_2 + PRIME64_3;
+#else
+ _Hash64 = RotateLeft64(_Hash64 ^ (FuncGetLittleEndianUInt32(_CurrentArray, _CurrentIndex) * PRIME64_1), 23) * PRIME64_2 + PRIME64_3;
+#endif
+ _CurrentIndex += 4;
+ _RemainingLength -= 4;
+ }
+
+ unsafe
+ {
+ fixed (byte* arrayPtr = _CurrentArray)
+ {
+ while (_RemainingLength-- >= 1)
+ {
+#if HIGHER_VERSIONS
+ _Hash64 = RotateLeft64_11(_Hash64 ^ (arrayPtr[_CurrentIndex++] * PRIME64_5)) * PRIME64_1;
+#else
+ _Hash64 = RotateLeft64(_Hash64 ^ (arrayPtr[_CurrentIndex++] * PRIME64_5), 11) * PRIME64_1;
+#endif
+ }
+ }
+ }
+
+ _Hash64 = (_Hash64 ^ (_Hash64 >> 33)) * PRIME64_2;
+ _Hash64 = (_Hash64 ^ (_Hash64 >> 29)) * PRIME64_3;
+ _Hash64 ^= _Hash64 >> 32;
+
+ _TotalLength = State = 0;
+ return BitConverter.GetBytes(FuncGetFinalHashUInt64(_Hash64));
+ }
+
+#if HIGHER_VERSIONS
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong MergeRound64(ulong input, ulong value) => (input ^ Round64(0, value)) * PRIME64_1 + PRIME64_4;
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong Round64(ulong input, ulong value) => RotateLeft64_31(input + (value * PRIME64_2)) * PRIME64_1;
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong RotateLeft64_1(ulong value) => (value << 1) | (value >> 63); // _ACC64_1
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong RotateLeft64_7(ulong value) => (value << 7) | (value >> 57); // _ACC64_2
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong RotateLeft64_11(ulong value) => (value << 11) | (value >> 53);
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong RotateLeft64_12(ulong value) => (value << 12) | (value >> 52);// _ACC64_3
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong RotateLeft64_18(ulong value) => (value << 18) | (value >> 46); // _ACC64_4
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong RotateLeft64_23(ulong value) => (value << 23) | (value >> 41);
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong RotateLeft64_27(ulong value) => (value << 27) | (value >> 37);
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static ulong RotateLeft64_31(ulong value) => (value << 31) | (value >> 33);
+#else
+ private static ulong MergeRound64(ulong input, ulong value) => (input ^ Round64(0, value)) * PRIME64_1 + PRIME64_4;
+
+ private static ulong Round64(ulong input, ulong value) => RotateLeft64(input + (value * PRIME64_2), 31) * PRIME64_1;
+
+ private static ulong RotateLeft64(ulong value, int count) => (value << count) | (value >> (64 - count));
+#endif
+
+
+ private void Initialize(ulong seed)
+ {
+ HashSizeValue = 64;
+ _Seed64 = seed;
+ Initialize();
+ }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/IStore.cs b/ZeroLevel/Services/PartitionStorage/IStore.cs
new file mode 100644
index 0000000..ca95813
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/IStore.cs
@@ -0,0 +1,18 @@
+using System.Threading.Tasks;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ ///
+ /// Partition store interface
+ ///
+ /// Record key
+ /// The value that is written in the stream
+ /// Value after compression of TInput values by duplicate keys (TInput list or similar)
+ /// Meta information for partition search
+ public interface IStore
+ {
+ IStorePartitionAccessor CreateAccessor(TMeta info);
+
+ Task> Search(StoreSearchRequest searchRequest);
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/IStoreOptions.cs b/ZeroLevel/Services/PartitionStorage/IStoreOptions.cs
new file mode 100644
index 0000000..2ca81fc
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/IStoreOptions.cs
@@ -0,0 +1,63 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using ZeroLevel.Services.FileSystem;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ ///
+ /// Options
+ ///
+ /// Record key
+ /// The value that is written in the stream
+ /// Value after compression of TInput values by duplicate keys (TInput list or similar)
+ /// Meta information for partition search
+ public class IStoreOptions
+ {
+ ///
+ /// Method for key comparison
+ ///
+ public Func KeyComparer { get; set; }
+
+ ///
+ /// Storage root directory
+ ///
+ public string RootFolder { get; set; }
+ ///
+ /// Maximum degree of parallelism
+ ///
+ public int MaxDegreeOfParallelism { get; set; } = Environment.ProcessorCount / 2;
+ ///
+ /// Function for translating a list of TInput into one TValue
+ ///
+ public Func, TValue> MergeFunction { get; set; }
+ ///
+ /// List of partitions for accessing the catalog
+ ///
+ public List> Partitions { get; set; } = new List>();
+ ///
+ /// File Partition
+ ///
+ public StoreFilePartition FilePartition { get; set; }
+
+ internal string GetFileName(TKey key, TMeta info)
+ {
+ return FilePartition.PathExtractor(key, info);
+ }
+ internal string GetCatalogPath(TMeta info)
+ {
+ var path = RootFolder;
+ foreach (var partition in Partitions)
+ {
+ var pathPart = partition.PathExtractor(info);
+ pathPart = FSUtils.FileNameCorrection(pathPart);
+ if (string.IsNullOrWhiteSpace(pathPart))
+ {
+ throw new Exception($"Partition '{partition.Name}' did not return a name for the path part");
+ }
+ path = Path.Combine(path, pathPart);
+ }
+ return path;
+ }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs
new file mode 100644
index 0000000..42315f0
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs
@@ -0,0 +1,41 @@
+using System;
+using System.Collections.Generic;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ ///
+ /// Provides read-write operations in catalog partition
+ ///
+ /// Key type
+ /// Type of one input value
+ /// Type of records aggregate
+ public interface IStorePartitionAccessor
+ : IDisposable
+ {
+ ///
+ /// Save one record
+ ///
+ void Store(TKey key, TInput value);
+ ///
+ /// Complete the recording and perform the conversion of the records from
+ /// (TKey; TInput) to (TKey; TValue)
+ ///
+ void CompleteStoreAndRebuild();
+ ///
+ /// Find in catalog partition by key
+ ///
+ StorePartitionKeyValueSearchResult Find(TKey key);
+ ///
+ /// Find in catalog partition by keys
+ ///
+ IEnumerable> Find(IEnumerable keys);
+ ///
+ /// Has any files
+ ///
+ int CountDataFiles();
+ ///
+ /// Remove all files
+ ///
+ void DropData();
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs
new file mode 100644
index 0000000..906a921
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/Store.cs
@@ -0,0 +1,51 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ public class Store :
+ IStore
+ {
+ private readonly IStoreOptions _options;
+ public Store(IStoreOptions options)
+ {
+ if (options == null) throw new ArgumentNullException(nameof(options));
+ _options = options;
+ if (Directory.Exists(_options.RootFolder) == false)
+ {
+ Directory.CreateDirectory(_options.RootFolder);
+ }
+ }
+
+
+
+ public IStorePartitionAccessor CreateAccessor(TMeta info)
+ {
+ return new StorePartitionAccessor(_options, info);
+ }
+
+ public async Task> Search(StoreSearchRequest searchRequest)
+ {
+ var result = new StoreSearchResult();
+ var results = new ConcurrentDictionary>>();
+ if (searchRequest.PartitionSearchRequests?.Any() ?? false)
+ {
+ var partitionsSearchInfo = searchRequest.PartitionSearchRequests.ToDictionary(r => r.Info, r => r.Keys);
+ var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism };
+ await Parallel.ForEachAsync(partitionsSearchInfo, options, async (pair, _) =>
+ {
+ using (var accessor = CreateAccessor(pair.Key))
+ {
+ results[pair.Key] = accessor.Find(pair.Value).ToArray();
+ }
+ });
+ }
+ result.Results = results;
+ return result;
+ }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/StoreCatalogPartition.cs b/ZeroLevel/Services/PartitionStorage/StoreCatalogPartition.cs
new file mode 100644
index 0000000..8db142a
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/StoreCatalogPartition.cs
@@ -0,0 +1,16 @@
+using System;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ public class StoreCatalogPartition
+ {
+ public string Name { get; }
+ public Func PathExtractor { get; }
+
+ public StoreCatalogPartition(string name, Func pathExtractor)
+ {
+ Name = name;
+ PathExtractor = pathExtractor;
+ }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/StoreFilePartition.cs b/ZeroLevel/Services/PartitionStorage/StoreFilePartition.cs
new file mode 100644
index 0000000..8cd8384
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/StoreFilePartition.cs
@@ -0,0 +1,16 @@
+using System;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ public class StoreFilePartition
+ {
+ public string Name { get; }
+ public Func PathExtractor { get; }
+
+ public StoreFilePartition(string name, Func pathExtractor)
+ {
+ Name = name;
+ PathExtractor = pathExtractor;
+ }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs
new file mode 100644
index 0000000..abcaaae
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs
@@ -0,0 +1,147 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+using ZeroLevel.Services.FileSystem;
+using ZeroLevel.Services.Serialization;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ public class StorePartitionAccessor
+ : IStorePartitionAccessor
+ {
+ private readonly IStoreOptions _options;
+ private readonly string _catalog;
+ private readonly TMeta _info;
+
+ public string Catalog { get { return _catalog; } }
+ public StorePartitionAccessor(IStoreOptions options, TMeta info)
+ {
+ if (options == null) throw new ArgumentNullException(nameof(options));
+ _info = info;
+ _options = options;
+ _catalog = _options.GetCatalogPath(info);
+ if (Directory.Exists(_catalog) == false)
+ {
+ Directory.CreateDirectory(_catalog);
+ }
+ }
+
+ public StorePartitionKeyValueSearchResult Find(TKey key)
+ {
+ var fileName = _options.GetFileName(key, _info);
+ using (var reader = GetReadStream(fileName))
+ {
+ while (reader.EOS == false)
+ {
+ var k = reader.ReadCompatible();
+ var v = reader.ReadCompatible();
+ var c = _options.KeyComparer(key, k);
+ if (c == 0) return new StorePartitionKeyValueSearchResult { Key = key, Value = v, Found = true };
+ if (c == -1) break;
+ }
+ }
+ return new StorePartitionKeyValueSearchResult { Key = key, Found = false, Value = default };
+ }
+
+ public IEnumerable> Find(IEnumerable keys)
+ {
+ foreach (var key in keys)
+ {
+ yield return Find(key);
+ }
+ }
+
+ public void CompleteStoreAndRebuild()
+ {
+ // Close all write streams
+ foreach (var s in _writeStreams)
+ {
+ try
+ {
+ s.Value.Dispose();
+ }
+ catch { }
+ }
+ var files = Directory.GetFiles(_catalog);
+ if (files != null && files.Length > 1)
+ {
+ Parallel.ForEach(files, file => CompressFile(file));
+ }
+ }
+
+ private void CompressFile(string file)
+ {
+ var dict = new Dictionary>();
+ using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
+ {
+ while (reader.EOS == false)
+ {
+ TKey k = reader.ReadCompatible();
+ TInput v = reader.ReadCompatible();
+ if (false == dict.ContainsKey(k))
+ {
+ dict[k] = new HashSet();
+ }
+ dict[k].Add(v);
+ }
+ }
+ var tempPath = Path.GetTempPath();
+ var tempFile = Path.Combine(tempPath, Path.GetTempFileName());
+ using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
+ {
+ // sort for search acceleration
+ foreach (var pair in dict.OrderBy(p => p.Key))
+ {
+ var v = _options.MergeFunction(pair.Value);
+ writer.SerializeCompatible(pair.Key);
+ writer.SerializeCompatible(v);
+ }
+ }
+ File.Delete(file);
+ File.Move(tempFile, file, true);
+ }
+
+ public void Store(TKey key, TInput value)
+ {
+ var fileName = _options.GetFileName(key, _info);
+ var stream = GetWriteStream(fileName);
+ stream.SerializeCompatible(key);
+ stream.SerializeCompatible(value);
+ }
+
+ private readonly ConcurrentDictionary _writeStreams = new ConcurrentDictionary();
+ private MemoryStreamWriter GetWriteStream(string fileName)
+ {
+ return _writeStreams.GetOrAdd(fileName, k =>
+ {
+ var filePath = Path.Combine(_catalog, k);
+ var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
+ return new MemoryStreamWriter(stream);
+ });
+ }
+ private MemoryStreamReader GetReadStream(string fileName)
+ {
+ var filePath = Path.Combine(_catalog, fileName);
+ var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
+ return new MemoryStreamReader(stream);
+ }
+
+ public void Dispose()
+ {
+ }
+
+ public int CountDataFiles()
+ {
+ var files = Directory.GetFiles(_catalog);
+ return files?.Length ?? 0;
+ }
+
+ public void DropData()
+ {
+ FSUtils.CleanAndTestFolder(_catalog);
+ }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/StorePartitionKeyValueSearchResult.cs b/ZeroLevel/Services/PartitionStorage/StorePartitionKeyValueSearchResult.cs
new file mode 100644
index 0000000..a65544d
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/StorePartitionKeyValueSearchResult.cs
@@ -0,0 +1,9 @@
+namespace ZeroLevel.Services.PartitionStorage
+{
+ public class StorePartitionKeyValueSearchResult
+ {
+ public bool Found { get; set; }
+ public TKey Key { get; set; }
+ public TValue Value { get; set; }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/StoreSearchRequest.cs b/ZeroLevel/Services/PartitionStorage/StoreSearchRequest.cs
new file mode 100644
index 0000000..efa5935
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/StoreSearchRequest.cs
@@ -0,0 +1,14 @@
+using System.Collections.Generic;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ public class PartitionSearchRequest
+ {
+ public TMeta Info { get; set; }
+ public IEnumerable Keys { get; set; }
+ }
+ public class StoreSearchRequest
+ {
+ public IEnumerable> PartitionSearchRequests { get; set; }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/StoreSearchResult.cs b/ZeroLevel/Services/PartitionStorage/StoreSearchResult.cs
new file mode 100644
index 0000000..7fe275a
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/StoreSearchResult.cs
@@ -0,0 +1,9 @@
+using System.Collections.Generic;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ public class StoreSearchResult
+ {
+ public IDictionary>> Results { get; set; }
+ }
+}
diff --git a/ZeroLevel/Services/Pools/ObjectPool.cs b/ZeroLevel/Services/Pools/ObjectPool.cs
index 33dbf86..bf36d31 100644
--- a/ZeroLevel/Services/Pools/ObjectPool.cs
+++ b/ZeroLevel/Services/Pools/ObjectPool.cs
@@ -1,9 +1,4 @@
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Threading;
-
-namespace ZeroLevel.Services.Pools
+namespace ZeroLevel.Services.Pools
{
/*
public enum LoadingMode { Eager, Lazy, LazyExpanding };
diff --git a/ZeroLevel/Services/Serialization/MessageSerializer.cs b/ZeroLevel/Services/Serialization/MessageSerializer.cs
index 2bea9c0..5289183 100644
--- a/ZeroLevel/Services/Serialization/MessageSerializer.cs
+++ b/ZeroLevel/Services/Serialization/MessageSerializer.cs
@@ -51,6 +51,19 @@ namespace ZeroLevel.Services.Serialization
}
}
+ public static void SerializeCompatible(this MemoryStreamWriter writer, object obj)
+ {
+ var direct_seriazlizable = (obj as IBinarySerializable);
+ if (direct_seriazlizable != null)
+ {
+ direct_seriazlizable.Serialize(writer);
+ }
+ else
+ {
+ PrimitiveTypeSerializer.Serialize(writer, obj);
+ }
+ }
+
public static byte[] SerializeCompatible(T obj)
{
if (null == obj)
diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionDataConverter.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionDataConverter.cs
deleted file mode 100644
index bf2fbb4..0000000
--- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionDataConverter.cs
+++ /dev/null
@@ -1,11 +0,0 @@
-using System.Collections.Generic;
-using System.IO;
-
-namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
-{
- public interface IPartitionDataConverter
- {
- IEnumerable ReadFromStorage(Stream stream);
- void WriteToStorage(Stream stream, IEnumerable data);
- }
-}
diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs
deleted file mode 100644
index add1f64..0000000
--- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/IPartitionFileStorage.cs
+++ /dev/null
@@ -1,13 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-
-namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
-{
- public interface IPartitionFileStorage
- {
- Task WriteAsync(TKey key, IEnumerable records);
- Task> CollectAsync(IEnumerable keys, Func filter = null);
- void Drop(TKey key);
- }
-}
diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/Partition.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/Partition.cs
deleted file mode 100644
index 4b11984..0000000
--- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/Partition.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using System;
-
-namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
-{
- ///
- /// Make part of full file path
- ///
- public class Partition
- {
- public Partition(string name, Func pathExtractor)
- {
- Name = name;
- PathExtractor = pathExtractor;
- }
- public Func PathExtractor { get; }
- public string Name { get; }
- }
-}
diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs
deleted file mode 100644
index 87d1c83..0000000
--- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorage.cs
+++ /dev/null
@@ -1,157 +0,0 @@
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.IO;
-using System.IO.Compression;
-using System.Linq;
-using System.Threading.Tasks;
-using ZeroLevel.Services.FileSystem;
-
-namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
-{
- public class PartitionFileSystemStorage
- : IPartitionFileStorage
- {
-
- private readonly PartitionFileSystemStorageOptions _options;
- public PartitionFileSystemStorage(PartitionFileSystemStorageOptions options)
- {
- if (options.RootFolder == null)
- throw new ArgumentNullException(nameof(options.RootFolder));
- if (options.DataConverter == null)
- throw new ArgumentNullException(nameof(options.DataConverter));
- if (!Directory.Exists(options.RootFolder))
- {
- Directory.CreateDirectory(options.RootFolder);
- }
- _options = options;
- if (options.MergeFiles)
- {
- Sheduller.RemindEvery(TimeSpan.FromMinutes(_options.MergeFrequencyInMinutes), MergeDataFiles);
- }
- }
-
- public void Drop(TKey key)
- {
- var path = GetDataPath(key);
- FSUtils.RemoveFolder(path, 3, 500);
- }
-
- public async Task> CollectAsync(IEnumerable keys, Func filter = null)
- {
- if (filter == null) filter = (_) => true;
- var pathes = keys.Safe().Select(k => GetDataPath(k));
- var files = pathes.Safe().SelectMany(p => Directory.GetFiles(p)).Where(n => n.StartsWith("__") == false);
- var set = new ConcurrentBag();
- if (files.Any())
- {
- var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism };
- await Parallel.ForEachAsync(files, options, async (file, _) =>
- {
- using (var stream = CreateReadStream(file))
- {
- foreach (var item in _options.DataConverter.ReadFromStorage(stream))
- {
- if (filter(item))
- {
- set.Add(item);
- }
- }
- }
- });
- }
- return set;
- }
- public async Task WriteAsync(TKey key, IEnumerable records)
- {
- using (var stream = CreateWriteStream(key))
- {
- _options.DataConverter.WriteToStorage(stream, records);
- await stream.FlushAsync();
- }
- }
-
- #region Private members
- private ConcurrentDictionary _processingPath = new ConcurrentDictionary();
- private void MergeDataFiles()
- {
- var folders = new Stack();
- folders.Push(_options.RootFolder);
- while (folders.Count > 0)
- {
- var dir = folders.Pop();
- MergeFolder(dir);
- foreach (var subdir in Directory.GetDirectories(dir, "*.*", SearchOption.TopDirectoryOnly))
- {
- folders.Push(subdir);
- }
- }
- }
-
- private void MergeFolder(string path)
- {
- var v = _processingPath.GetOrAdd(path, 0);
- if (v != 0) // каталог обрабатывается в настоящий момент
- {
- return;
- }
- var files = Directory.GetFiles(path);
- if (files != null && files.Length > 1)
- {
- // TODO
- }
- }
-
- private string GetDataFilePath(string path)
- {
- return Path.Combine(path, Guid.NewGuid().ToString());
- }
- private string GetDataPath(TKey key)
- {
- var path = _options.RootFolder;
- foreach (var partition in _options.Partitions)
- {
- var pathPart = partition.PathExtractor(key);
- pathPart = FSUtils.FileNameCorrection(pathPart);
- if (string.IsNullOrWhiteSpace(pathPart))
- {
- throw new Exception($"Partition '{partition.Name}' not return name of part of path");
- }
- path = Path.Combine(path, pathPart);
- }
- return path;
- }
- private Stream CreateWriteStream(TKey key)
- {
- var path = GetDataPath(key);
- if (!Directory.Exists(path))
- {
- Directory.CreateDirectory(path);
- }
- var fullPath = GetDataFilePath(path);
- var stream = File.OpenWrite(fullPath);
- if (_options.UseCompression)
- {
- return new GZipStream(stream, CompressionMode.Compress, false);
- }
- return stream;
- }
-
- private Stream CreateReadStream(string path)
- {
- var stream = File.OpenRead(path);
- if (_options.UseCompression)
- {
- var ms = new MemoryStream();
- using (var compressed = new GZipStream(stream, CompressionMode.Decompress, false))
- {
- compressed.CopyTo(ms);
- }
- ms.Position = 0;
- return ms;
- }
- return stream;
- }
- #endregion
- }
-}
diff --git a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorageOptions.cs b/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorageOptions.cs
deleted file mode 100644
index ac8ad5f..0000000
--- a/ZeroLevel/Services/Storages/PartitionFileSystemStorage/PartitionFileSystemStorageOptions.cs
+++ /dev/null
@@ -1,16 +0,0 @@
-using System;
-using System.Collections.Generic;
-
-namespace ZeroLevel.Services.Storages.PartitionFileSystemStorage
-{
- public class PartitionFileSystemStorageOptions
- {
- public string RootFolder { get; set; }
- public int MaxDegreeOfParallelism { get; set; } = Environment.ProcessorCount / 2;
- public bool MergeFiles { get; set; } = false;
- public int MergeFrequencyInMinutes { get; set; } = 180;
- public bool UseCompression { get; set; } = false;
- public IPartitionDataConverter DataConverter { get; set; }
- public List> Partitions { get; set; } = new List>();
- }
-}
diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj
index 850c564..bbde94c 100644
--- a/ZeroLevel/ZeroLevel.csproj
+++ b/ZeroLevel/ZeroLevel.csproj
@@ -6,16 +6,16 @@
ogoun
ogoun
- 3.3.6.8
- BigFileReader update
+ 3.3.7.1
+ PartitionStorage new methods
https://github.com/ogoun/Zero/wiki
Copyright Ogoun 2022
https://github.com/ogoun/Zero
git
- 3.3.6.8
- 3.3.6.8
+ 3.3.7.1
+ 3.3.7.1
AnyCPU;x64;x86
zero.png
full
@@ -67,7 +67,7 @@
-
+
True