Partition storage indexes

pull/4/head
Ogoun 2 years ago
parent 75f620d57d
commit 0b26d2e486

@ -1,7 +1,6 @@
using System.Diagnostics; using System.Diagnostics;
using System.Text; using System.Text;
using ZeroLevel.Services.PartitionStorage; using ZeroLevel.Services.PartitionStorage;
using ZeroLevel.Services.Serialization;
namespace PartitionFileStorageTest namespace PartitionFileStorageTest
{ {
@ -92,6 +91,7 @@ namespace PartitionFileStorageTest
public string Msisdn; public string Msisdn;
public HashSet<string> Msisdns; public HashSet<string> Msisdns;
} }
internal class Program internal class Program
{ {
private class Metadata private class Metadata
@ -116,8 +116,9 @@ namespace PartitionFileStorageTest
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd")), new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd")),
new StoreCatalogPartition<Metadata>("Date", m => m.Incoming ? "incoming" : "outcoming") new StoreCatalogPartition<Metadata>("Date", m => m.Incoming ? "incoming" : "outcoming")
}, },
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? 1 : -1 KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
}; };
options.Index.Enabled = true;
var store = new Store<ulong, ulong, byte[], Metadata>(options); var store = new Store<ulong, ulong, byte[], Metadata>(options);
@ -157,12 +158,22 @@ namespace PartitionFileStorageTest
storeOutcoming.CompleteStoreAndRebuild(); storeOutcoming.CompleteStoreAndRebuild();
sw.Stop(); sw.Stop();
Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms"); Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storeIncoming.RebuildIndex();
storeOutcoming.RebuildIndex();
sw.Stop();
Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
} }
private static void TestReading(string source, string root) private static void TestReading(string source, string root)
{ {
var options = new IStoreOptions<ulong, ulong, byte[], Metadata> var options = new IStoreOptions<ulong, ulong, byte[], Metadata>
{ {
Index = new IndexOptions
{
Enabled = false,
FileIndexCount = 64
},
RootFolder = root, RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 1000).ToString()), FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 1000).ToString()),
MergeFunction = list => MergeFunction = list =>
@ -185,16 +196,15 @@ namespace PartitionFileStorageTest
new PartitionSearchRequest<ulong, Metadata> new PartitionSearchRequest<ulong, Metadata>
{ {
Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }, Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true },
Keys = new ulong[] { 79645090604 } Keys = new ulong[] { 79645090604, 79645100604, 79643090604 }
}, },
new PartitionSearchRequest<ulong, Metadata> new PartitionSearchRequest<ulong, Metadata>
{ {
Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }, Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false },
Keys = new ulong[] { 79645090604 } Keys = new ulong[] { 79645090604, 79645100604, 79643090604 }
} }
} }
}; };
var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }); var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true });
Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}"); Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}");
var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }); var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false });
@ -219,12 +229,77 @@ namespace PartitionFileStorageTest
Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms"); Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms");
} }
private struct KeyIndex<TKey>
{
public TKey Key { get; set; }
public long Offset { get; set; }
}
static KeyIndex<long>[] Generate(int count)
{
var arr = new KeyIndex<long>[count];
for (int i = 0; i < count; i++)
{
arr[i] = new KeyIndex<long> { Key = i * 3, Offset = i * 17 };
}
return arr;
}
private static KeyIndex<long> BinarySearchInIndex(KeyIndex<long>[] index,
long key,
Func<long, long, int> keyComparer,
ref int position)
{
if (index == null || index.Length == 0)
{
return new KeyIndex<long> { Key = key, Offset = 0 };
}
int left = position;
int right = index.Length - 1;
int mid = 0;
long test;
while (left <= right)
{
mid = (int)Math.Floor((right + left) / 2d);
test = index[mid].Key;
var c = keyComparer(test, key);
if (c < 0)
{
left = mid + 1;
}
else if (c > 0)
{
right = mid - 1;
}
else
{
position = mid;
return index[mid];
}
}
position = mid;
return index[mid];
}
static void Main(string[] args) static void Main(string[] args)
{ {
var root = @"H:\temp"; var root = @"H:\temp";
var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt"; var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt";
BuildStore(source, root); //BuildStore(source, root);
TestReading(source, root); TestReading(source, root);
/*
Func<long, long, int> keyComparer =
(left, right) =>
(left == right) ? 0 : (left < right) ? -1 : 1;
var indexes = Generate(77);
int position = 0;
for (long i = 65; i < 700; i++)
{
var ind = BinarySearchInIndex(indexes, i, keyComparer, ref position);
Console.WriteLine($"{i}: {ind.Offset}. [{ind.Key}]");
}
*/
Console.ReadKey(); Console.ReadKey();
} }
} }

@ -65,9 +65,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AutoLoader", "AutoLoader",
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AppContainer", "AutoLoader\AppContainer\AppContainer.csproj", "{9DE345EA-955B-41A8-93AF-277C0B5A9AC5}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AppContainer", "AutoLoader\AppContainer\AppContainer.csproj", "{9DE345EA-955B-41A8-93AF-277C0B5A9AC5}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PartitionTest", "PartitionTest", "{BAD88A91-1AFA-48A8-8D39-4846A65B4167}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{6BCAB578-52F0-48F7-903E-B1B284F2D5AE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionFileStorageTest", "PartitionFileStorageTest\PartitionFileStorageTest.csproj", "{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -319,18 +317,18 @@ Global
{9DE345EA-955B-41A8-93AF-277C0B5A9AC5}.Release|x64.Build.0 = Release|Any CPU {9DE345EA-955B-41A8-93AF-277C0B5A9AC5}.Release|x64.Build.0 = Release|Any CPU
{9DE345EA-955B-41A8-93AF-277C0B5A9AC5}.Release|x86.ActiveCfg = Release|Any CPU {9DE345EA-955B-41A8-93AF-277C0B5A9AC5}.Release|x86.ActiveCfg = Release|Any CPU
{9DE345EA-955B-41A8-93AF-277C0B5A9AC5}.Release|x86.Build.0 = Release|Any CPU {9DE345EA-955B-41A8-93AF-277C0B5A9AC5}.Release|x86.Build.0 = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|Any CPU.Build.0 = Debug|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|x64.ActiveCfg = Debug|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Debug|x64.ActiveCfg = Debug|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|x64.Build.0 = Debug|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Debug|x64.Build.0 = Debug|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|x86.ActiveCfg = Debug|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Debug|x86.ActiveCfg = Debug|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Debug|x86.Build.0 = Debug|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Debug|x86.Build.0 = Debug|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|Any CPU.ActiveCfg = Release|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|Any CPU.Build.0 = Release|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Release|Any CPU.Build.0 = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.ActiveCfg = Release|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Release|x64.ActiveCfg = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x64.Build.0 = Release|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Release|x64.Build.0 = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.ActiveCfg = Release|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Release|x86.ActiveCfg = Release|Any CPU
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9}.Release|x86.Build.0 = Release|Any CPU {6BCAB578-52F0-48F7-903E-B1B284F2D5AE}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -345,7 +343,6 @@ Global
{F70842E7-9A1D-4CC4-9F55-0953AEC9C7C8} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15} {F70842E7-9A1D-4CC4-9F55-0953AEC9C7C8} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15}
{2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133} {2C33D5A3-6CD4-4AAA-A716-B3CD65036E25} = {D5207A5A-2F27-4992-9BA5-0BDCFC59F133}
{9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4} {9DE345EA-955B-41A8-93AF-277C0B5A9AC5} = {2EF83101-63BC-4397-A005-A747189143D4}
{EB85B2E0-03EE-4F71-B6E9-B6D6B34524B9} = {BAD88A91-1AFA-48A8-8D39-4846A65B4167}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB} SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB}

@ -5,6 +5,12 @@ using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
public class IndexOptions
{
public bool Enabled { get; set; }
public int FileIndexCount { get; set; } = 64;
}
/// <summary> /// <summary>
/// Options /// Options
/// </summary> /// </summary>
@ -26,7 +32,7 @@ namespace ZeroLevel.Services.PartitionStorage
/// <summary> /// <summary>
/// Maximum degree of parallelis /// Maximum degree of parallelis
/// </summary> /// </summary>
public int MaxDegreeOfParallelism { get; set; } = Environment.ProcessorCount / 2; public int MaxDegreeOfParallelism { get; set; } = 64;
/// <summary> /// <summary>
/// Function for translating a list of TInput into one TValue /// Function for translating a list of TInput into one TValue
/// </summary> /// </summary>
@ -40,6 +46,8 @@ namespace ZeroLevel.Services.PartitionStorage
/// </summary> /// </summary>
public StoreFilePartition<TKey, TMeta> FilePartition { get; set; } public StoreFilePartition<TKey, TMeta> FilePartition { get; set; }
public IndexOptions Index { get; set; } = new IndexOptions { Enabled = false, FileIndexCount = 64 };
internal string GetFileName(TKey key, TMeta info) internal string GetFileName(TKey key, TMeta info)
{ {
return FilePartition.PathExtractor(key, info); return FilePartition.PathExtractor(key, info);

@ -22,6 +22,10 @@ namespace ZeroLevel.Services.PartitionStorage
/// </summary> /// </summary>
void CompleteStoreAndRebuild(); void CompleteStoreAndRebuild();
/// <summary> /// <summary>
/// Rebuild indexes
/// </summary>
void RebuildIndex();
/// <summary>
/// Find in catalog partition by key /// Find in catalog partition by key
/// </summary> /// </summary>
StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key); StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key);
@ -29,6 +33,8 @@ namespace ZeroLevel.Services.PartitionStorage
/// Find in catalog partition by keys /// Find in catalog partition by keys
/// </summary> /// </summary>
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys); IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys);
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate();
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key);
/// <summary> /// <summary>
/// Has any files /// Has any files
/// </summary> /// </summary>

@ -0,0 +1,10 @@
using System.Collections.Generic;
namespace ZeroLevel.Services.PartitionStorage
{
internal interface IStorePartitionIndex<TKey>
{
KeyIndex<TKey> GetOffset(TKey key);
KeyIndex<TKey>[] GetOffset(TKey[] keys, bool inOneGroup);
}
}

@ -35,7 +35,8 @@ namespace ZeroLevel.Services.PartitionStorage
if (searchRequest.PartitionSearchRequests?.Any() ?? false) if (searchRequest.PartitionSearchRequests?.Any() ?? false)
{ {
var partitionsSearchInfo = searchRequest.PartitionSearchRequests.ToDictionary(r => r.Info, r => r.Keys); var partitionsSearchInfo = searchRequest.PartitionSearchRequests.ToDictionary(r => r.Info, r => r.Keys);
var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism }; //var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism };
var options = new ParallelOptions { MaxDegreeOfParallelism = 1 };
await Parallel.ForEachAsync(partitionsSearchInfo, options, async (pair, _) => await Parallel.ForEachAsync(partitionsSearchInfo, options, async (pair, _) =>
{ {
using (var accessor = CreateAccessor(pair.Key)) using (var accessor = CreateAccessor(pair.Key))

@ -12,6 +12,8 @@ namespace ZeroLevel.Services.PartitionStorage
public class StorePartitionAccessor<TKey, TInput, TValue, TMeta> public class StorePartitionAccessor<TKey, TInput, TValue, TMeta>
: IStorePartitionAccessor<TKey, TInput, TValue> : IStorePartitionAccessor<TKey, TInput, TValue>
{ {
private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams
= new ConcurrentDictionary<string, MemoryStreamWriter>();
private readonly IStoreOptions<TKey, TInput, TValue, TMeta> _options; private readonly IStoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly string _catalog; private readonly string _catalog;
private readonly TMeta _info; private readonly TMeta _info;
@ -32,6 +34,8 @@ namespace ZeroLevel.Services.PartitionStorage
public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key) public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
{ {
var fileName = _options.GetFileName(key, _info); var fileName = _options.GetFileName(key, _info);
if (File.Exists(Path.Combine(_catalog, fileName)))
{
using (var reader = GetReadStream(fileName)) using (var reader = GetReadStream(fileName))
{ {
while (reader.EOS == false) while (reader.EOS == false)
@ -39,18 +43,117 @@ namespace ZeroLevel.Services.PartitionStorage
var k = reader.ReadCompatible<TKey>(); var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>(); var v = reader.ReadCompatible<TValue>();
var c = _options.KeyComparer(key, k); var c = _options.KeyComparer(key, k);
if (c == 0) return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = key, Value = v, Found = true }; if (c == 0) return new StorePartitionKeyValueSearchResult<TKey, TValue>
if (c == -1) break; {
Key = key,
Value = v,
Found = true
};
if (c == -1)
{
break;
}
}
}
}
return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = key,
Found = false,
Value = default
};
}
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName,
TKey[] keys)
{
if (File.Exists(Path.Combine(_catalog, fileName)))
{
if (_options.Index.Enabled)
{
var index = new StorePartitionIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
var offsets = index.GetOffset(keys, true);
using (var reader = GetReadStream(fileName))
{
for (int i = 0; i < keys.Length; i++)
{
var searchKey = keys[i];
var off = offsets[i];
reader.Stream.Seek(off.Offset, SeekOrigin.Begin);
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = searchKey,
Value = v,
Found = true
};
break;
}
else if (c == -1)
{
break;
}
}
}
}
}
else
{
using (var reader = GetReadStream(fileName))
{
int index = 0;
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{
yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = keys_arr[index],
Value = v,
Found = true
};
index++;
}
else if (c == -1)
{
do
{
index++;
if (index < keys_arr.Length)
{
c = _options.KeyComparer(keys_arr[index], k);
}
} while (index < keys_arr.Length && c == -1);
}
}
}
} }
} }
return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = key, Found = false, Value = default };
} }
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys) public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys)
{ {
foreach (var key in keys) var results = keys
.GroupBy(
k => _options.GetFileName(k, _info),
k => k, (key, g) => new { FileName = key, Keys = g.ToArray() });
foreach (var group in results)
{ {
yield return Find(key); foreach (var r in Find(group.FileName, group.Keys))
{
yield return r;
}
} }
} }
@ -111,8 +214,6 @@ namespace ZeroLevel.Services.PartitionStorage
stream.SerializeCompatible(key); stream.SerializeCompatible(key);
stream.SerializeCompatible(value); stream.SerializeCompatible(value);
} }
private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams = new ConcurrentDictionary<string, MemoryStreamWriter>();
private MemoryStreamWriter GetWriteStream(string fileName) private MemoryStreamWriter GetWriteStream(string fileName)
{ {
return _writeStreams.GetOrAdd(fileName, k => return _writeStreams.GetOrAdd(fileName, k =>
@ -128,20 +229,96 @@ namespace ZeroLevel.Services.PartitionStorage
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
return new MemoryStreamReader(stream); return new MemoryStreamReader(stream);
} }
public void Dispose()
{
}
public int CountDataFiles() public int CountDataFiles()
{ {
var files = Directory.GetFiles(_catalog); var files = Directory.GetFiles(_catalog);
return files?.Length ?? 0; return files?.Length ?? 0;
} }
public void DropData() public void DropData()
{ {
FSUtils.CleanAndTestFolder(_catalog); FSUtils.CleanAndTestFolder(_catalog);
} }
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate()
{
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 1)
{
foreach (var file in files)
{
using (var reader = GetReadStream(Path.GetFileName(file)))
{
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Found = true };
}
}
}
}
}
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key)
{
var fileName = _options.GetFileName(key, _info);
if (File.Exists(Path.Combine(_catalog, fileName)))
{
using (var reader = GetReadStream(fileName))
{
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Found = true };
}
}
}
}
public void RebuildIndex()
{
if (_options.Index.Enabled)
{
var indexFolder = Path.Combine(_catalog, "__indexes__");
FSUtils.CleanAndTestFolder(indexFolder);
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 1)
{
var dict = new Dictionary<TKey, long>();
foreach (var file in files)
{
dict.Clear();
using (var reader = GetReadStream(Path.GetFileName(file)))
{
while (reader.EOS == false)
{
var pos = reader.Stream.Position;
var k = reader.ReadCompatible<TKey>();
dict[k] = pos;
reader.ReadCompatible<TValue>();
}
}
if (dict.Count > _options.Index.FileIndexCount * 8)
{
var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero);
var index_file = Path.Combine(indexFolder, Path.GetFileName(file));
var d_arr = dict.OrderBy(p => p.Key).ToArray();
using (var writer = new MemoryStreamWriter(
new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{
for (int i = 0; i < _options.Index.FileIndexCount; i++)
{
var pair = d_arr[i * step];
writer.WriteCompatible(pair.Key);
writer.WriteLong(pair.Value);
}
}
}
}
}
}
}
public void Dispose()
{
}
} }
} }

@ -0,0 +1,130 @@
using System;
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
internal struct KeyIndex<TKey>
{
public TKey Key { get; set; }
public long Offset { get; set; }
}
internal class StorePartitionIndex<TKey, TMeta>
: IStorePartitionIndex<TKey>
{
private readonly Dictionary<string, KeyIndex<TKey>[]> _indexCachee
= new Dictionary<string, KeyIndex<TKey>[]>(1024);
private readonly StoreFilePartition<TKey, TMeta> _filePartition;
private readonly Func<TKey, TKey, int> _keyComparer;
private readonly string _indexFolder;
private readonly bool _indexExists = false;
private readonly TMeta _meta;
public StorePartitionIndex(string partitionFolder, TMeta meta,
StoreFilePartition<TKey, TMeta> filePartition,
Func<TKey, TKey, int> keyComparer)
{
_indexFolder = Path.Combine(partitionFolder, "__indexes__");
_indexExists = Directory.Exists(_indexFolder);
_meta = meta;
_keyComparer = keyComparer;
_filePartition = filePartition;
}
public KeyIndex<TKey> GetOffset(TKey key)
{
if (_indexExists)
{
var index = GetFileIndex(key);
int pos = 0;
if (index != null && index.Length > 0)
{
return BinarySearchInIndex(index, key, ref pos);
}
}
return new KeyIndex<TKey> { Key = key, Offset = 0 };
}
public KeyIndex<TKey>[] GetOffset(TKey[] keys, bool inOneGroup)
{
var result = new KeyIndex<TKey>[keys.Length];
int position = 0;
if (inOneGroup)
{
var index = GetFileIndex(keys[0]);
for (int i = 0; i < keys.Length; i++)
{
result[i] = BinarySearchInIndex(index, keys[i], ref position);
}
}
else
{
for (int i = 0; i < keys.Length; i++)
{
var index = GetFileIndex(keys[i]);
result[i] = BinarySearchInIndex(index, keys[i], ref position);
}
}
return result;
}
private KeyIndex<TKey> BinarySearchInIndex(KeyIndex<TKey>[] index, TKey key, ref int position)
{
if (index == null || index.Length == 0)
{
return new KeyIndex<TKey> { Key = key, Offset = 0 };
}
int left = position;
int right = index.Length - 1;
int mid = 0;
TKey test;
while (left <= right)
{
mid = (int)Math.Floor((right + left) / 2d);
test = index[mid].Key;
var c = _keyComparer(test, key);
if (c < 0)
{
left = mid + 1;
}
else if (c > 0)
{
right = mid - 1;
}
else
{
break;
}
}
position = mid;
while (_keyComparer(index[position].Key, key) > 0 && position > 0) position--;
return index[position];
}
private KeyIndex<TKey>[] GetFileIndex(TKey key)
{
var indexName = _filePartition.PathExtractor.Invoke(key, _meta);
if (_indexCachee.TryGetValue(indexName, out var index)) return index;
var file = Path.Combine(_indexFolder, indexName);
if (File.Exists(file))
{
var list = new List<KeyIndex<TKey>>();
using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None)))
{
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var o = reader.ReadLong();
list.Add(new KeyIndex<TKey> { Key = k, Offset = o });
}
}
_indexCachee[indexName] = list.ToArray();
return _indexCachee[indexName];
}
return null;
}
}
}

@ -6,16 +6,16 @@
</Description> </Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>3.3.7.1</AssemblyVersion> <AssemblyVersion>3.3.7.3</AssemblyVersion>
<PackageReleaseNotes>PartitionStorage new methods</PackageReleaseNotes> <PackageReleaseNotes>PartitionStorage append indexes</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2022</Copyright> <Copyright>Copyright Ogoun 2022</Copyright>
<PackageLicenseUrl></PackageLicenseUrl> <PackageLicenseUrl></PackageLicenseUrl>
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<Version>3.3.7.1</Version> <Version>3.3.7.3</Version>
<FileVersion>3.3.7.1</FileVersion> <FileVersion>3.3.7.3</FileVersion>
<Platforms>AnyCPU;x64;x86</Platforms> <Platforms>AnyCPU;x64;x86</Platforms>
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
<DebugType>full</DebugType> <DebugType>full</DebugType>

Loading…
Cancel
Save

Powered by TurnKey Linux.