Async rebuild index

pull/4/head
Ogoun 2 years ago
parent 3606fe27f7
commit 6eaa0c19a8

@ -128,7 +128,7 @@ namespace PartitionFileStorageTest
sw.Stop();
Log.Info($"Compress: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart.RebuildIndex();
await storePart.RebuildIndex();
sw.Stop();
Log.Info($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
@ -167,7 +167,7 @@ namespace PartitionFileStorageTest
await readPart.RemoveKey(testKeys1[i], false);
}
sw.Restart();
readPart.RebuildIndex();
await readPart.RebuildIndex();
sw.Stop();
Log.Info($"Rebuild indexes after remove: {sw.ElapsedMilliseconds}ms");
Log.Info("Test #1 reading after remove");
@ -296,7 +296,7 @@ namespace PartitionFileStorageTest
sw.Stop();
Log.Info($"Compress: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart.RebuildIndex();
await storePart.RebuildIndex();
}
sw.Stop();
Log.Info($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");

@ -2,6 +2,8 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
@ -16,27 +18,30 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly string _indexCatalog;
private readonly string _dataCatalog;
private readonly int _stepValue;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
private readonly IStoreKVSerializer<TKey, TValue> Serializer;
private readonly PhisicalFileAccessorCachee _phisicalFileAccessorCachee;
public IndexBuilder(IndexStepType indexType, int stepValue, string dataCatalog, PhisicalFileAccessorCachee phisicalFileAccessorCachee)
public IndexBuilder(IndexStepType indexType,
int stepValue,
string dataCatalog,
PhisicalFileAccessorCachee phisicalFileAccessorCachee,
IStoreKVSerializer<TKey, TValue> serializer)
{
_dataCatalog = dataCatalog;
_indexCatalog = Path.Combine(dataCatalog, INDEX_SUBFOLDER_NAME);
_indexType = indexType;
_stepValue = stepValue;
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
Serializer = serializer;
_phisicalFileAccessorCachee = phisicalFileAccessorCachee;
}
/// <summary>
/// Rebuild indexes for all files
/// </summary>
internal void RebuildIndex()
internal async Task RebuildIndex()
{
var files = Directory.GetFiles(_dataCatalog);
if (files != null && files.Length > 0)
{
foreach (var file in files)
{
RebuildFileIndex(Path.GetFileName(file));
@ -46,15 +51,15 @@ namespace ZeroLevel.Services.PartitionStorage
/// <summary>
/// Rebuild index for the specified file
/// </summary>
internal void RebuildFileIndex(string file)
internal async Task RebuildFileIndex(string file)
{
if (_indexType == IndexStepType.AbsoluteCount)
{
RebuildFileIndexWithAbsoluteCountIndexes(file);
await RebuildFileIndexWithAbsoluteCountIndexes(file);
}
else
{
RebuildFileIndexWithSteps(file);
await RebuildFileIndexWithSteps(file);
}
}
@ -81,7 +86,7 @@ namespace ZeroLevel.Services.PartitionStorage
/// <summary>
/// Rebuild index with specified number of steps for specified file
/// </summary>
private void RebuildFileIndexWithAbsoluteCountIndexes(string file)
private async Task RebuildFileIndexWithAbsoluteCountIndexes(string file)
{
if (false == Directory.Exists(_indexCatalog))
{
@ -93,9 +98,9 @@ namespace ZeroLevel.Services.PartitionStorage
while (reader.EOS == false)
{
var pos = reader.Position;
var k = _keyDeserializer.Invoke(reader);
dict[k] = pos;
_valueDeserializer.Invoke(reader);
var k = await Serializer.KeyDeserializer.Invoke(reader);
dict[k.Value] = pos;
await Serializer.ValueDeserializer.Invoke(reader);
}
}
if (dict.Count > _stepValue)
@ -130,7 +135,7 @@ namespace ZeroLevel.Services.PartitionStorage
/// <summary>
/// Rebuild index with specified step for keys
/// </summary>
private void RebuildFileIndexWithSteps(string file)
private async Task RebuildFileIndexWithSteps(string file)
{
if (false == Directory.Exists(_indexCatalog))
{
@ -153,8 +158,8 @@ namespace ZeroLevel.Services.PartitionStorage
{
counter--;
var pos = reader.Position;
var k = _keyDeserializer.Invoke(reader);
_valueDeserializer.Invoke(reader);
var k = await Serializer.KeyDeserializer.Invoke(reader);
await Serializer.ValueDeserializer.Invoke(reader);
if (counter == 0)
{
writer.WriteCompatible(k);

@ -16,7 +16,7 @@ namespace ZeroLevel.Services.PartitionStorage
/// <summary>
/// Rebuilds indexes for data in a partition
/// </summary>
void RebuildIndex();
Task RebuildIndex();
/// <summary>
/// Search in a partition for a specified key
/// </summary>

@ -32,6 +32,6 @@ namespace ZeroLevel.Services.PartitionStorage
/// <summary>
/// Rebuilds indexes for data in a partition
/// </summary>
void RebuildIndex();
Task RebuildIndex();
}
}

@ -4,18 +4,22 @@ using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage.Interfaces
{
public interface IStoreSerializer<TKey, TInput, TValue>
public interface IStoreKVSerializer<TKey, TValue>
{
Func<MemoryStreamWriter, TKey, Task> KeySerializer { get; }
Func<MemoryStreamWriter, TInput, Task> InputSerializer { get; }
Func<MemoryStreamWriter, TValue, Task> ValueSerializer { get; }
Func<MemoryStreamReader, Task<DeserializeResult<TKey>>> KeyDeserializer { get; }
Func<MemoryStreamReader, Task<DeserializeResult<TInput>>> InputDeserializer { get; }
Func<MemoryStreamReader, Task<DeserializeResult<TValue>>> ValueDeserializer { get; }
}
public interface IStoreSerializer<TKey, TInput, TValue>
: IStoreKVSerializer<TKey, TValue>
{
Func<MemoryStreamWriter, TInput, Task> InputSerializer { get; }
Func<MemoryStreamReader, Task<DeserializeResult<TInput>>> InputDeserializer { get; }
}
}

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Memory;
using ZeroLevel.Services.PartitionStorage.Interfaces;
@ -40,7 +41,7 @@ namespace ZeroLevel.Services.PartitionStorage.Partition
Directory.CreateDirectory(_catalog);
}
_phisicalFileAccessor = fileAccessorCachee;
_indexBuilder = _options.Index.Enabled ? new IndexBuilder<TKey, TValue>(_options.Index.StepType, _options.Index.StepValue, _catalog, fileAccessorCachee) : null;
_indexBuilder = _options.Index.Enabled ? new IndexBuilder<TKey, TValue>(_options.Index.StepType, _options.Index.StepValue, _catalog, fileAccessorCachee, Serializer) : null;
Serializer = serializer;
}
@ -60,21 +61,21 @@ namespace ZeroLevel.Services.PartitionStorage.Partition
/// <summary>
/// Rebuild indexes for all files
/// </summary>
protected void RebuildIndexes()
protected async Task RebuildIndexes()
{
if (_options.Index.Enabled)
{
_indexBuilder.RebuildIndex();
await _indexBuilder.RebuildIndex();
}
}
/// <summary>
/// Rebuild index for the specified file
/// </summary>
internal void RebuildFileIndex(string file)
internal async Task RebuildFileIndex(string file)
{
if (_options.Index.Enabled)
{
_indexBuilder.RebuildFileIndex(file);
await _indexBuilder.RebuildFileIndex(file);
}
}
/// <summary>

@ -84,7 +84,7 @@ namespace ZeroLevel.Services.PartitionStorage.Partition
}
}
}
public void RebuildIndex() => RebuildIndexes();
public async Task RebuildIndex() => await RebuildIndexes();
#endregion
#region Private methods

@ -132,7 +132,7 @@ namespace ZeroLevel.Services.PartitionStorage
}
// 3. Rebuil index
(_accessor as BasePartition<TKey, TInput, TValue, TMeta>).RebuildFileIndex(name);
await (_accessor as BasePartition<TKey, TInput, TValue, TMeta>).RebuildFileIndex(name);
}
}
// remove temporary files

@ -159,9 +159,9 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
}
public void RebuildIndex()
public async Task RebuildIndex()
{
RebuildIndexes();
await RebuildIndexes();
if (_options.Index.Enabled)
{
Indexes.ResetCachee();
@ -442,7 +442,7 @@ namespace ZeroLevel.Services.PartitionStorage
// Rebuild index if needs
if (_options.Index.Enabled && autoReindex)
{
RebuildFileIndex(filePath);
await RebuildFileIndex(filePath);
}
}
}

@ -88,7 +88,7 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
}
public void RebuildIndex() => RebuildIndexes();
public async Task RebuildIndex() => await RebuildIndexes();
#endregion
#region Private methods

@ -6,7 +6,7 @@
</Description>
<Authors>ogoun</Authors>
<Company>ogoun</Company>
<AssemblyVersion>3.4.0.1</AssemblyVersion>
<AssemblyVersion>3.4.0.2</AssemblyVersion>
<PackageReleaseNotes>KVDB ia async now</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2023</Copyright>
@ -14,8 +14,8 @@
<PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<Version>3.4.0.1</Version>
<FileVersion>3.4.0.1</FileVersion>
<Version>3.4.0.2</Version>
<FileVersion>3.4.0.2</FileVersion>
<Platforms>AnyCPU;x64;x86</Platforms>
<PackageIcon>zero.png</PackageIcon>
<DebugType>full</DebugType>

Loading…
Cancel
Save

Powered by TurnKey Linux.