Partition storage refactoring

New index type
pull/4/head
Ogoun 2 years ago
parent 7157eaa9ff
commit fa86b82d56

@ -15,4 +15,4 @@
<ProjectReference Include="..\ZeroLevel\ZeroLevel.csproj" />
</ItemGroup>
</Project>
</Project>

@ -27,7 +27,11 @@ namespace PartitionFileStorageTest
var r = new Random(Environment.TickCount);
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{
Index = new IndexOptions { Enabled = true, FileIndexCount = 64 },
Index = new IndexOptions
{
Enabled = true,
StepType = IndexStepType.AbsoluteCount,
StepValue = 64 },
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
@ -81,7 +85,12 @@ namespace PartitionFileStorageTest
var r = new Random(Environment.TickCount);
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{
Index = new IndexOptions { Enabled = true, FileIndexCount = 64 },
Index = new IndexOptions
{
Enabled = true,
StepType = IndexStepType.Step,
StepValue = 1
},
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
@ -99,7 +108,7 @@ namespace PartitionFileStorageTest
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
/*Log.Info("Fill start");
for (int i = 0; i < 10000000; i++)
{
@ -141,7 +150,7 @@ namespace PartitionFileStorageTest
//Console.ReadKey();
FSUtils.CleanAndTestFolder(root);*/
var sw = new Stopwatch();
sw.Start();
@ -179,7 +188,7 @@ namespace PartitionFileStorageTest
sw.Stop();
Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart.CompleteAdding();
storePart.CompleteAdding();
storePart.Compress();
sw.Stop();
Log.Info($"Compress: {sw.ElapsedMilliseconds}ms");
@ -309,62 +318,5 @@ namespace PartitionFileStorageTest
//TestRangeCompressionAndInversion();
Console.ReadKey();
}
private static void TestRangeCompressionAndInversion()
{
var list = new List<FilePositionRange>();
list.Add(new FilePositionRange { Start = 5, End = 12 });
list.Add(new FilePositionRange { Start = 16, End = 21 });
RangeCompression(list);
foreach (var r in list)
{
Console.WriteLine($"{r.Start}: {r.End}");
}
Console.WriteLine("Invert ranges");
var inverted = RangeInversion(list, 21);
foreach (var r in inverted)
{
Console.WriteLine($"{r.Start}: {r.End}");
}
}
private static void RangeCompression(List<FilePositionRange> ranges)
{
for (var i = 0; i < ranges.Count - 1; i++)
{
var current = ranges[i];
var next = ranges[i + 1];
if (current.End == next.Start)
{
current.End = next.End;
ranges.RemoveAt(i + 1);
i--;
}
}
}
private static List<FilePositionRange> RangeInversion(List<FilePositionRange> ranges, long length)
{
if ((ranges?.Count ?? 0) == 0) return new List<FilePositionRange> { new FilePositionRange { Start = 0, End = length } };
var inverted = new List<FilePositionRange>();
var current = new FilePositionRange { Start = 0, End = ranges[0].Start };
for (var i = 0; i < ranges.Count; i++)
{
current.End = ranges[i].Start;
if (current.Start != current.End)
{
inverted.Add(new FilePositionRange { Start = current.Start, End = current.End });
}
current.Start = ranges[i].End;
}
if (current.End != length)
{
if (current.Start != length)
{
inverted.Add(new FilePositionRange { Start = current.Start, End = length });
}
}
return inverted;
}
}
}

@ -1,6 +1,6 @@
namespace ZeroLevel.Services.PartitionStorage
{
public class FilePositionRange
internal sealed class FilePositionRange
{
public long Start;
public long End;

@ -0,0 +1,164 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Responsible for building index files
/// </summary>
internal sealed class IndexBuilder<TKey, TValue>
{
private const string INDEX_SUBFOLDER_NAME = "__indexes__";
private readonly IndexStepType _indexType;
private readonly string _indexCatalog;
private readonly string _dataCatalog;
private readonly int _stepValue;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
public IndexBuilder(IndexStepType indexType, int stepValue, string dataCatalog)
{
_dataCatalog = dataCatalog;
_indexCatalog = Path.Combine(dataCatalog, INDEX_SUBFOLDER_NAME);
_indexType = indexType;
_stepValue = stepValue;
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
}
/// <summary>
/// Rebuild indexes for all files
/// </summary>
internal void RebuildIndex()
{
FSUtils.CleanAndTestFolder(_indexCatalog);
var files = Directory.GetFiles(_dataCatalog);
if (files != null && files.Length > 0)
{
foreach (var file in files)
{
RebuildFileIndex(file);
}
}
}
/// <summary>
/// Rebuild index for the specified file
/// </summary>
internal void RebuildFileIndex(string file)
{
if (_indexType == IndexStepType.AbsoluteCount)
{
RebuildFileIndexWithAbsoluteCountIndexes(file);
}
else
{
RebuildFileIndexWithSteps(file);
}
}
/// <summary>
/// Delete the index for the specified file
/// </summary>
internal void DropFileIndex(string file)
{
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
if (File.Exists(index_file))
{
File.Delete(index_file);
}
}
/// <summary>
/// Rebuild index with specified number of steps for specified file
/// </summary>
private void RebuildFileIndexWithAbsoluteCountIndexes(string file)
{
if (false == Directory.Exists(_indexCatalog))
{
Directory.CreateDirectory(_indexCatalog);
}
var dict = new Dictionary<TKey, long>();
if (TryGetReadStream(file, out var reader))
{
using (reader)
{
while (reader.EOS == false)
{
var pos = reader.Position;
var k = _keyDeserializer.Invoke(reader);
dict[k] = pos;
_valueDeserializer.Invoke(reader);
}
}
}
if (dict.Count > _stepValue)
{
var step = (int)Math.Round(((float)dict.Count / (float)_stepValue), MidpointRounding.ToZero);
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
var d_arr = dict.OrderBy(p => p.Key).ToArray();
using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{
for (int i = 0; i < _stepValue; i++)
{
var pair = d_arr[i * step];
writer.WriteCompatible(pair.Key);
writer.WriteLong(pair.Value);
}
}
}
}
/// <summary>
/// Rebuild index with specified step for keys
/// </summary>
private void RebuildFileIndexWithSteps(string file)
{
if (false == Directory.Exists(_indexCatalog))
{
Directory.CreateDirectory(_indexCatalog);
}
if (TryGetReadStream(file, out var reader))
{
using (reader)
{
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{
var counter = _stepValue;
while (reader.EOS == false)
{
counter--;
var pos = reader.Position;
var k = _keyDeserializer.Invoke(reader);
_valueDeserializer.Invoke(reader);
if (counter == 0)
{
writer.WriteCompatible(k);
writer.WriteLong(pos);
counter = _stepValue;
}
}
}
}
}
}
/// <summary>
/// Attempting to open a file for reading
/// </summary>
private bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
{
try
{
var filePath = Path.Combine(_dataCatalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
reader = new MemoryStreamReader(stream);
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]");
}
reader = null;
return false;
}
}
}

@ -5,7 +5,7 @@ using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
internal class StorePartitionSparseIndex<TKey, TMeta>
internal sealed class StorePartitionSparseIndex<TKey, TMeta>
: IStorePartitionIndex<TKey>
{
private readonly Dictionary<string, KeyIndex<TKey>[]> _indexCachee
@ -101,7 +101,7 @@ namespace ZeroLevel.Services.PartitionStorage
private KeyIndex<TKey>[] GetFileIndex(TKey key)
{
var indexName = _filePartition.PathExtractor.Invoke(key, _meta);
var indexName = _filePartition.FileNameExtractor.Invoke(key, _meta);
if (_indexCachee.TryGetValue(indexName, out var index)) return index;
var file = Path.Combine(_indexFolder, indexName);

@ -12,23 +12,48 @@ namespace ZeroLevel.Services.PartitionStorage
: IStorePartitionBase<TKey, TInput, TValue>
{
/// <summary>
/// Rebuild indexes
/// Rebuilds indexes for data in a partition
/// </summary>
void RebuildIndex();
/// <summary>
/// Find in catalog partition by key
/// Search in a partition for a specified key
/// </summary>
StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key);
/// <summary>
/// Find in catalog partition by keys
/// Search in a partition for a specified keys
/// </summary>
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys);
/// <summary>
/// Iterating over all recorded data
/// </summary>
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate();
/// <summary>
/// Iterating over all recorded data of the file with the specified key
/// </summary>
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key);
/// <summary>
/// Deleting the specified key and associated data
/// </summary>
/// <param name="key">Key</param>
/// <param name="autoReindex">true - automatically rebuild the index of the file from which data was deleted (default = false)</param>
void RemoveKey(TKey key, bool autoReindex = false);
/// <summary>
/// Deleting the specified keys and associated data
/// </summary>
/// <param name="keys">Keys</param>
/// <param name="autoReindex">true - automatically rebuild the index of the file from which data was deleted (default = true)</param>
void RemoveKeys(IEnumerable<TKey> keys, bool autoReindex = true);
void RemoveAllExceptKey(TKey key, bool autoReindex = false);
/// <summary>
/// Delete all keys with data except the specified key
/// </summary>
/// <param name="key">Key</param>
/// <param name="autoReindex">true - automatically rebuild the index of the file from which data was deleted (default = true)</param>
void RemoveAllExceptKey(TKey key, bool autoReindex = true);
/// <summary>
/// Delete all keys with data other than the specified ones
/// </summary>
/// <param name="keys">Keys</param>
/// <param name="autoReindex">true - automatically rebuild the index of the file from which data was deleted (default = true)</param>
void RemoveAllExceptKeys(IEnumerable<TKey> keys, bool autoReindex = true);
}
}

@ -9,7 +9,7 @@ namespace ZeroLevel.Services.PartitionStorage
/// <typeparam name="TInput">Type of one input value</typeparam>
/// <typeparam name="TValue">Type of records aggregate</typeparam>
public interface IStorePartitionBuilder<TKey, TInput, TValue>
: IStorePartitionBase<TKey, TInput, TValue>
: IStorePartitionBase<TKey, TInput, TValue>
{
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TInput>> Iterate();
/// <summary>
@ -21,11 +21,11 @@ namespace ZeroLevel.Services.PartitionStorage
/// </summary>
void CompleteAdding();
/// <summary>
/// Perform the conversion of the records from (TKey; TInput) to (TKey; TValue). Called after CompleteAdding
/// Performs compression/grouping of recorded data in a partition
/// </summary>
void Compress();
/// <summary>
/// Rebuilds index files. Only for compressed data.
/// Rebuilds indexes for data in a partition
/// </summary>
void RebuildIndex();
}

@ -0,0 +1,14 @@
using System;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage.Interfaces
{
public interface IStoreSerializer<TKey, TInput, TValue>
{
Action<MemoryStreamWriter, TKey> KeySerializer { get; }
Action<MemoryStreamWriter, TInput> InputSerializer { get; }
Func<MemoryStreamReader, TKey> KeyDeserializer { get; }
Func<MemoryStreamReader, TInput> InputDeserializer { get; }
Func<MemoryStreamReader, TValue> ValueDeserializer { get; }
}
}

@ -1,8 +1,16 @@
namespace ZeroLevel.Services.PartitionStorage
{
public enum IndexStepType
{
AbsoluteCount,
Step
}
public class IndexOptions
{
public bool Enabled { get; set; }
public int FileIndexCount { get; set; } = 64;
public IndexStepType StepType { get; set; } = IndexStepType.AbsoluteCount;
public int StepValue { get; set; } = 64;
}
}

@ -44,12 +44,13 @@ namespace ZeroLevel.Services.PartitionStorage
public IndexOptions Index { get; set; } = new IndexOptions
{
Enabled = false,
FileIndexCount = 64
StepValue = 64,
StepType = IndexStepType.AbsoluteCount
};
internal string GetFileName(TKey key, TMeta info)
{
return FilePartition.PathExtractor(key, info);
return FilePartition.FileNameExtractor(key, info);
}
internal string GetCatalogPath(TMeta info)
{
@ -74,7 +75,8 @@ namespace ZeroLevel.Services.PartitionStorage
Index = new IndexOptions
{
Enabled = this.Index.Enabled,
FileIndexCount = this.Index.FileIndexCount
StepValue = 64,
StepType = IndexStepType.AbsoluteCount
},
FilePartition = this.FilePartition,
KeyComparer = this.KeyComparer,

@ -0,0 +1,139 @@
using System.IO;
using System;
using ZeroLevel.Services.Serialization;
using System.Collections.Concurrent;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.PartitionStorage.Interfaces;
namespace ZeroLevel.Services.PartitionStorage.Partition
{
/// <summary>
/// General operations with a partition
/// </summary>
internal abstract class BasePartition<TKey, TInput, TValue, TMeta>
: IStorePartitionBase<TKey, TInput, TValue>
{
public string Catalog { get { return _catalog; } }
protected readonly TMeta _info;
protected readonly string _catalog;
protected IStoreSerializer<TKey, TInput, TValue> Serializer { get; }
protected readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly IndexBuilder<TKey, TValue> _indexBuilder;
private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams = new ConcurrentDictionary<string, MemoryStreamWriter>();
internal BasePartition(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info,
IStoreSerializer<TKey, TInput, TValue> serializer)
{
_options = options;
_info = info;
_catalog = _options.GetCatalogPath(info);
if (Directory.Exists(_catalog) == false)
{
Directory.CreateDirectory(_catalog);
}
_indexBuilder = _options.Index.Enabled ? new IndexBuilder<TKey, TValue>(_options.Index.StepType, _options.Index.StepValue, _catalog) : null;
Serializer = serializer;
}
#region IStorePartitionBase
public int CountDataFiles() => Directory.Exists(_catalog) ? (Directory.GetFiles(_catalog)?.Length ?? 0) : 0;
public string GetCatalogPath() => _catalog;
public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
public void Dispose()
{
CloseWriteStreams();
}
#endregion
/// <summary>
/// Rebuild indexes for all files
/// </summary>
protected void RebuildIndexes()
{
if (_options.Index.Enabled)
{
_indexBuilder.RebuildIndex();
}
}
/// <summary>
/// Rebuild index for the specified file
/// </summary>
internal void RebuildFileIndex(string file)
{
if (_options.Index.Enabled)
{
_indexBuilder.RebuildFileIndex(file);
}
}
/// <summary>
/// Delete the index for the specified file
/// </summary>
internal void DropFileIndex(string file)
{
if (_options.Index.Enabled)
{
_indexBuilder.DropFileIndex(file);
}
}
/// <summary>
/// Close all streams for writing
/// </summary>
protected void CloseWriteStreams()
{
foreach (var s in _writeStreams)
{
try
{
s.Value.Stream.Flush();
s.Value.Dispose();
}
catch { }
}
_writeStreams.Clear();
}
/// <summary>
/// Attempting to open a file for writing
/// </summary>
protected bool TryGetWriteStream(string fileName, out MemoryStreamWriter writer)
{
try
{
writer = _writeStreams.GetOrAdd(fileName, k =>
{
var filePath = Path.Combine(_catalog, k);
var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
return new MemoryStreamWriter(stream);
});
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionBuilder.TryGetWriteStream]");
}
writer = null;
return false;
}
/// <summary>
/// Attempting to open a file for reading
/// </summary>
protected bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
{
try
{
var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
reader = new MemoryStreamReader(stream);
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionBuilder.TryGetReadStream]");
}
reader = null;
return false;
}
}
}

@ -2,9 +2,18 @@
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Partition, contains the method of forming the path
/// </summary>
public class StoreCatalogPartition<TMeta>
{
/// <summary>
/// Name of partition, just for info
/// </summary>
public string Name { get; }
/// <summary>
/// Path generator
/// </summary>
public Func<TMeta, string> PathExtractor { get; }
public StoreCatalogPartition(string name, Func<TMeta, string> pathExtractor)

@ -2,15 +2,24 @@
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// File partition, contains the method of forming the path
/// </summary>
public class StoreFilePartition<TKey, TMeta>
{
/// <summary>
/// Name of partition, just for info
/// </summary>
public string Name { get; }
public Func<TKey, TMeta, string> PathExtractor { get; }
/// <summary>
/// File name generator
/// </summary>
public Func<TKey, TMeta, string> FileNameExtractor { get; }
public StoreFilePartition(string name, Func<TKey, TMeta, string> pathExtractor)
{
Name = name;
PathExtractor = pathExtractor;
FileNameExtractor = pathExtractor;
}
}
}

@ -3,17 +3,15 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.PartitionStorage.Partition;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// For writing new values in exist partition
///
/// ORDER: Store -> CompleteAddingAndCompress -> RebuildIndex
///
/// Performs merging of new data with existing data in the partition
/// </summary>
public class StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>
internal sealed class StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>
: IStorePartitionMergeBuilder<TKey, TInput, TValue>
{
private readonly Func<TValue, IEnumerable<TInput>> _decompress;
@ -30,16 +28,19 @@ namespace ZeroLevel.Services.PartitionStorage
/// Write catalog
/// </summary>
private readonly IStorePartitionBuilder<TKey, TInput, TValue> _temporaryAccessor;
public StoreMergePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info, Func<TValue, IEnumerable<TInput>> decompress)
TMeta info,
Func<TValue, IEnumerable<TInput>> decompress,
IStoreSerializer<TKey, TInput, TValue> serializer)
{
if (decompress == null) throw new ArgumentNullException(nameof(decompress));
_decompress = decompress;
_accessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(options, info);
_accessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(options, info, serializer);
_temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString());
var tempOptions = options.Clone();
tempOptions.RootFolder = _temporaryFolder;
_temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info);
_temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info, serializer);
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
@ -54,6 +55,10 @@ namespace ZeroLevel.Services.PartitionStorage
public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value);
public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(),
_temporaryAccessor.CountDataFiles());
/// <summary>
/// Performs compression/grouping of recorded data in a partition
/// </summary>
public void Compress()
{
var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath());
@ -102,8 +107,7 @@ namespace ZeroLevel.Services.PartitionStorage
File.Move(file, Path.Combine(folder, name), true);
// 3. Rebuil index
(_accessor as StorePartitionAccessor<TKey, TInput, TValue, TMeta>)
.RebuildFileIndex(file);
(_accessor as BasePartition<TKey, TInput, TValue, TMeta>).RebuildFileIndex(name);
}
}
// remove temporary files

@ -2,41 +2,23 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Serialization;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.PartitionStorage.Partition;
namespace ZeroLevel.Services.PartitionStorage
{
public class StorePartitionAccessor<TKey, TInput, TValue, TMeta>
: IStorePartitionAccessor<TKey, TInput, TValue>
internal sealed class StorePartitionAccessor<TKey, TInput, TValue, TMeta>
: BasePartition<TKey, TInput, TValue, TMeta>, IStorePartitionAccessor<TKey, TInput, TValue>
{
private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly string _catalog;
private readonly string _indexCatalog;
private readonly TMeta _info;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
public string Catalog { get { return _catalog; } }
public StorePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options, TMeta info)
public StorePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info,
IStoreSerializer<TKey, TInput, TValue> serializer)
: base(options, info, serializer)
{
if (options == null) throw new ArgumentNullException(nameof(options));
_info = info;
_options = options;
_catalog = _options.GetCatalogPath(info);
if (_options.Index.Enabled)
{
_indexCatalog = Path.Combine(_catalog, "__indexes__");
}
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
}
#region API methods
public int CountDataFiles() => Directory.Exists(_catalog) ? (Directory.GetFiles(_catalog)?.Length ?? 0) : 0;
public string GetCatalogPath() => _catalog;
public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
#region IStorePartitionAccessor
public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
{
var fileName = _options.GetFileName(key, _info);
@ -59,8 +41,8 @@ namespace ZeroLevel.Services.PartitionStorage
}
while (reader.EOS == false)
{
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var k = Serializer.KeyDeserializer.Invoke(reader);
var v = Serializer.ValueDeserializer.Invoke(reader);
var c = _options.KeyComparer(key, k);
if (c == 0) return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
@ -119,8 +101,8 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var k = Serializer.KeyDeserializer.Invoke(reader);
var v = Serializer.ValueDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Status = SearchResult.Success };
}
}
@ -139,30 +121,16 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var k = Serializer.KeyDeserializer.Invoke(reader);
var v = Serializer.ValueDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Status = SearchResult.Success };
}
}
}
}
}
public void RebuildIndex()
{
if (_options.Index.Enabled)
{
FSUtils.CleanAndTestFolder(_indexCatalog);
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 0)
{
foreach (var file in files)
{
RebuildFileIndex(file);
}
}
}
}
public void RemoveAllExceptKey(TKey key, bool autoReindex = false)
public void RebuildIndex() => RebuildIndexes();
public void RemoveAllExceptKey(TKey key, bool autoReindex = true)
{
RemoveAllExceptKeys(new[] { key }, autoReindex);
}
@ -194,59 +162,6 @@ namespace ZeroLevel.Services.PartitionStorage
}
#endregion
#region Internal methods
internal void DropFileIndex(string file)
{
if (_options.Index.Enabled)
{
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
if (File.Exists(index_file))
{
File.Delete(index_file);
}
}
}
internal void RebuildFileIndex(string file)
{
if (_options.Index.Enabled)
{
if (false == Directory.Exists(_indexCatalog))
{
Directory.CreateDirectory(_indexCatalog);
}
var dict = new Dictionary<TKey, long>();
if (TryGetReadStream(file, out var reader))
{
using (reader)
{
while (reader.EOS == false)
{
var pos = reader.Position;
var k = _keyDeserializer.Invoke(reader);
dict[k] = pos;
_valueDeserializer.Invoke(reader);
}
}
}
if (dict.Count > _options.Index.FileIndexCount * 8)
{
var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero);
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
var d_arr = dict.OrderBy(p => p.Key).ToArray();
using (var writer = new MemoryStreamWriter(
new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{
for (int i = 0; i < _options.Index.FileIndexCount; i++)
{
var pair = d_arr[i * step];
writer.WriteCompatible(pair.Key);
writer.WriteLong(pair.Value);
}
}
}
}
}
#endregion
#region Private methods
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName,
@ -269,8 +184,8 @@ namespace ZeroLevel.Services.PartitionStorage
reader.Seek(off.Offset, SeekOrigin.Begin);
while (reader.EOS == false)
{
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var k = Serializer.KeyDeserializer.Invoke(reader);
var v = Serializer.ValueDeserializer.Invoke(reader);
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
@ -301,8 +216,8 @@ namespace ZeroLevel.Services.PartitionStorage
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var k = Serializer.KeyDeserializer.Invoke(reader);
var v = Serializer.ValueDeserializer.Invoke(reader);
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{
@ -355,8 +270,8 @@ namespace ZeroLevel.Services.PartitionStorage
while (reader.EOS == false)
{
var startPosition = reader.Position;
var k = _keyDeserializer.Invoke(reader);
_valueDeserializer.Invoke(reader);
var k = Serializer.KeyDeserializer.Invoke(reader);
Serializer.ValueDeserializer.Invoke(reader);
var endPosition = reader.Position;
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
@ -383,8 +298,8 @@ namespace ZeroLevel.Services.PartitionStorage
while (reader.EOS == false && index < keys_arr.Length)
{
var startPosition = reader.Position;
var k = _keyDeserializer.Invoke(reader);
_valueDeserializer.Invoke(reader);
var k = Serializer.KeyDeserializer.Invoke(reader);
Serializer.ValueDeserializer.Invoke(reader);
var endPosition = reader.Position;
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
@ -447,22 +362,6 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
private bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
{
try
{
var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
reader = new MemoryStreamReader(stream);
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]");
}
reader = null;
return false;
}
#endregion
#region Static
@ -514,9 +413,5 @@ namespace ZeroLevel.Services.PartitionStorage
target.Write(buffer, 0, buffer.Length);
}
#endregion
public void Dispose()
{
}
}
}

@ -1,77 +1,40 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.PartitionStorage.Partition;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
public class StorePartitionBuilder<TKey, TInput, TValue, TMeta>
: IStorePartitionBuilder<TKey, TInput, TValue>
internal sealed class StorePartitionBuilder<TKey, TInput, TValue, TMeta>
: BasePartition<TKey, TInput, TValue, TMeta>, IStorePartitionBuilder<TKey, TInput, TValue>
{
private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams
= new ConcurrentDictionary<string, MemoryStreamWriter>();
private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly string _catalog;
private readonly TMeta _info;
private readonly Action<MemoryStreamWriter, TKey> _keySerializer;
private readonly Action<MemoryStreamWriter, TInput> _inputSerializer;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TInput> _inputDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
public string Catalog { get { return _catalog; } }
public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options, TMeta info)
public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info,
IStoreSerializer<TKey, TInput, TValue> serializer)
: base(options, info, serializer)
{
if (options == null) throw new ArgumentNullException(nameof(options));
_info = info;
_options = options;
_catalog = _options.GetCatalogPath(info);
if (Directory.Exists(_catalog) == false)
{
Directory.CreateDirectory(_catalog);
}
_keySerializer = MessageSerializer.GetSerializer<TKey>();
_inputSerializer = MessageSerializer.GetSerializer<TInput>();
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_inputDeserializer = MessageSerializer.GetDeserializer<TInput>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
}
#region API methods
public int CountDataFiles() => Directory.Exists(_catalog) ? (Directory.GetFiles(_catalog)?.Length ?? 0) : 0;
public string GetCatalogPath() => _catalog;
public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
#region IStorePartitionBuilder
public void Store(TKey key, TInput value)
{
var fileName = _options.GetFileName(key, _info);
if (TryGetWriteStream(fileName, out var stream))
{
_keySerializer.Invoke(stream, key);
Serializer.KeySerializer.Invoke(stream, key);
Thread.MemoryBarrier();
_inputSerializer.Invoke(stream, value);
Serializer.InputSerializer.Invoke(stream, value);
}
}
public void CompleteAdding()
{
// Close all write streams
foreach (var s in _writeStreams)
{
try
{
s.Value.Stream.Flush();
s.Value.Dispose();
}
catch { }
}
_writeStreams.Clear();
CloseWriteStreams();
}
public void Compress()
{
@ -94,8 +57,8 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
var key = _keyDeserializer.Invoke(reader);
var val = _inputDeserializer.Invoke(reader);
var key = Serializer.KeyDeserializer.Invoke(reader);
var val = Serializer.InputDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TInput> { Key = key, Value = val, Status = SearchResult.Success };
}
}
@ -103,53 +66,7 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
}
public void RebuildIndex()
{
if (_options.Index.Enabled)
{
var indexFolder = Path.Combine(_catalog, "__indexes__");
FSUtils.CleanAndTestFolder(indexFolder);
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 0)
{
var dict = new Dictionary<TKey, long>();
foreach (var file in files)
{
if (TryGetReadStream(file, out var reader))
{
using (reader)
{
while (reader.EOS == false)
{
var pos = reader.Stream.Position;
var key = _keyDeserializer.Invoke(reader);
dict[key] = pos;
if (reader.EOS) break;
_valueDeserializer.Invoke(reader);
}
}
if (dict.Count > _options.Index.FileIndexCount * 8)
{
var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero);
var index_file = Path.Combine(indexFolder, Path.GetFileName(file));
var d_arr = dict.OrderBy(p => p.Key).ToArray();
using (var writer = new MemoryStreamWriter(
new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{
for (int i = 0; i < _options.Index.FileIndexCount; i++)
{
var pair = d_arr[i * step];
writer.WriteCompatible(pair.Key);
writer.WriteLong(pair.Value);
}
}
}
}
dict.Clear();
}
}
}
}
public void RebuildIndex() => RebuildIndexes();
#endregion
#region Private methods
@ -160,7 +77,7 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
var key = _keyDeserializer.Invoke(reader);
var key = Serializer.KeyDeserializer.Invoke(reader);
if (false == dict.ContainsKey(key))
{
dict[key] = new HashSet<TInput>();
@ -169,7 +86,7 @@ namespace ZeroLevel.Services.PartitionStorage
{
break;
}
var input = _inputDeserializer.Invoke(reader);
var input = Serializer.InputDeserializer.Invoke(reader);
dict[key].Add(input);
}
}
@ -188,45 +105,6 @@ namespace ZeroLevel.Services.PartitionStorage
File.Delete(file);
File.Move(tempFile, file, true);
}
private bool TryGetWriteStream(string fileName, out MemoryStreamWriter writer)
{
try
{
writer = _writeStreams.GetOrAdd(fileName, k =>
{
var filePath = Path.Combine(_catalog, k);
var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
return new MemoryStreamWriter(stream);
});
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionBuilder.TryGetWriteStream]");
}
writer = null;
return false;
}
private bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
{
try
{
var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
reader = new MemoryStreamReader(stream);
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionBuilder.TryGetReadStream]");
}
reader = null;
return false;
}
#endregion
public void Dispose()
{
}
}
}

@ -0,0 +1,9 @@
namespace ZeroLevel.Services.PartitionStorage
{
public enum SearchResult
{
Success,
NotFound,
FileLocked
}
}

@ -1,12 +1,5 @@
namespace ZeroLevel.Services.PartitionStorage
{
public enum SearchResult
{
Success,
NotFound,
FileLocked
}
public class StorePartitionKeyValueSearchResult<TKey, TValue>
{
public SearchResult Status { get; set; }

@ -13,10 +13,20 @@ namespace ZeroLevel.Services.PartitionStorage
IStore<TKey, TInput, TValue, TMeta>
{
private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
public Store(StoreOptions<TKey, TInput, TValue, TMeta> options)
private readonly IStoreSerializer<TKey, TInput, TValue> _serializer;
public Store(StoreOptions<TKey, TInput, TValue, TMeta> options,
IStoreSerializer<TKey, TInput, TValue> serializer = null)
{
if (options == null) throw new ArgumentNullException(nameof(options));
_options = options;
if (serializer == null)
{
_serializer = new StoreStandartSerializer<TKey, TInput, TValue>();
}
else
{
_serializer = serializer;
}
if (Directory.Exists(_options.RootFolder) == false)
{
Directory.CreateDirectory(_options.RootFolder);
@ -32,17 +42,17 @@ namespace ZeroLevel.Services.PartitionStorage
public IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info)
{
return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info);
return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, _serializer);
}
public IStorePartitionBuilder<TKey, TInput, TValue> CreateBuilder(TMeta info)
{
return new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(_options, info);
return new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(_options, info, _serializer);
}
public IStorePartitionMergeBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor)
{
return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor);
return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor, _serializer);
}
public async Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest)

@ -0,0 +1,35 @@
using System;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
internal sealed class StoreStandartSerializer<TKey, TInput, TValue>
: IStoreSerializer<TKey, TInput, TValue>
{
private readonly Action<MemoryStreamWriter, TKey> _keySerializer;
private readonly Action<MemoryStreamWriter, TInput> _inputSerializer;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TInput> _inputDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
public StoreStandartSerializer()
{
_keySerializer = MessageSerializer.GetSerializer<TKey>();
_inputSerializer = MessageSerializer.GetSerializer<TInput>();
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_inputDeserializer = MessageSerializer.GetDeserializer<TInput>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
}
public Action<MemoryStreamWriter, TKey> KeySerializer => _keySerializer;
public Action<MemoryStreamWriter, TInput> InputSerializer => _inputSerializer;
public Func<MemoryStreamReader, TKey> KeyDeserializer => _keyDeserializer;
public Func<MemoryStreamReader, TInput> InputDeserializer => _inputDeserializer;
public Func<MemoryStreamReader, TValue> ValueDeserializer => _valueDeserializer;
}
}
Loading…
Cancel
Save

Powered by TurnKey Linux.