diff --git a/PartitionFileStorageTest/PartitionFileStorageTest.csproj b/PartitionFileStorageTest/PartitionFileStorageTest.csproj
index 575110a..f8b057f 100644
--- a/PartitionFileStorageTest/PartitionFileStorageTest.csproj
+++ b/PartitionFileStorageTest/PartitionFileStorageTest.csproj
@@ -15,4 +15,4 @@
-
+
\ No newline at end of file
diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs
index 7193a37..3c502e0 100644
--- a/PartitionFileStorageTest/Program.cs
+++ b/PartitionFileStorageTest/Program.cs
@@ -27,7 +27,11 @@ namespace PartitionFileStorageTest
var r = new Random(Environment.TickCount);
var options = new StoreOptions
{
- Index = new IndexOptions { Enabled = true, FileIndexCount = 64 },
+ Index = new IndexOptions
+ {
+ Enabled = true,
+ StepType = IndexStepType.AbsoluteCount,
+ StepValue = 64 },
RootFolder = root,
FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
@@ -81,7 +85,12 @@ namespace PartitionFileStorageTest
var r = new Random(Environment.TickCount);
var options = new StoreOptions
{
- Index = new IndexOptions { Enabled = true, FileIndexCount = 64 },
+ Index = new IndexOptions
+ {
+ Enabled = true,
+ StepType = IndexStepType.Step,
+ StepValue = 1
+ },
RootFolder = root,
FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
@@ -99,7 +108,7 @@ namespace PartitionFileStorageTest
var store = new Store(options);
var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
-
+
/*Log.Info("Fill start");
for (int i = 0; i < 10000000; i++)
{
@@ -141,7 +150,7 @@ namespace PartitionFileStorageTest
//Console.ReadKey();
FSUtils.CleanAndTestFolder(root);*/
-
+
var sw = new Stopwatch();
sw.Start();
@@ -179,7 +188,7 @@ namespace PartitionFileStorageTest
sw.Stop();
Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms");
sw.Restart();
- storePart.CompleteAdding();
+ storePart.CompleteAdding();
storePart.Compress();
sw.Stop();
Log.Info($"Compress: {sw.ElapsedMilliseconds}ms");
@@ -309,62 +318,5 @@ namespace PartitionFileStorageTest
//TestRangeCompressionAndInversion();
Console.ReadKey();
}
-
- private static void TestRangeCompressionAndInversion()
- {
- var list = new List();
- list.Add(new FilePositionRange { Start = 5, End = 12 });
- list.Add(new FilePositionRange { Start = 16, End = 21 });
- RangeCompression(list);
- foreach (var r in list)
- {
- Console.WriteLine($"{r.Start}: {r.End}");
- }
- Console.WriteLine("Invert ranges");
- var inverted = RangeInversion(list, 21);
- foreach (var r in inverted)
- {
- Console.WriteLine($"{r.Start}: {r.End}");
- }
- }
-
- private static void RangeCompression(List ranges)
- {
- for (var i = 0; i < ranges.Count - 1; i++)
- {
- var current = ranges[i];
- var next = ranges[i + 1];
- if (current.End == next.Start)
- {
- current.End = next.End;
- ranges.RemoveAt(i + 1);
- i--;
- }
- }
- }
-
- private static List RangeInversion(List ranges, long length)
- {
- if ((ranges?.Count ?? 0) == 0) return new List { new FilePositionRange { Start = 0, End = length } };
- var inverted = new List();
- var current = new FilePositionRange { Start = 0, End = ranges[0].Start };
- for (var i = 0; i < ranges.Count; i++)
- {
- current.End = ranges[i].Start;
- if (current.Start != current.End)
- {
- inverted.Add(new FilePositionRange { Start = current.Start, End = current.End });
- }
- current.Start = ranges[i].End;
- }
- if (current.End != length)
- {
- if (current.Start != length)
- {
- inverted.Add(new FilePositionRange { Start = current.Start, End = length });
- }
- }
- return inverted;
- }
}
}
\ No newline at end of file
diff --git a/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs b/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs
index 77e6493..03c3f6e 100644
--- a/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs
+++ b/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs
@@ -1,6 +1,6 @@
namespace ZeroLevel.Services.PartitionStorage
{
- public class FilePositionRange
+ internal sealed class FilePositionRange
{
public long Start;
public long End;
diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs
new file mode 100644
index 0000000..6930764
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs
@@ -0,0 +1,164 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using ZeroLevel.Services.FileSystem;
+using ZeroLevel.Services.Serialization;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ ///
+ /// Responsible for building index files
+ ///
+ internal sealed class IndexBuilder
+ {
+ private const string INDEX_SUBFOLDER_NAME = "__indexes__";
+ private readonly IndexStepType _indexType;
+ private readonly string _indexCatalog;
+ private readonly string _dataCatalog;
+ private readonly int _stepValue;
+ private readonly Func _keyDeserializer;
+ private readonly Func _valueDeserializer;
+ public IndexBuilder(IndexStepType indexType, int stepValue, string dataCatalog)
+ {
+ _dataCatalog = dataCatalog;
+ _indexCatalog = Path.Combine(dataCatalog, INDEX_SUBFOLDER_NAME);
+ _indexType = indexType;
+ _stepValue = stepValue;
+ _keyDeserializer = MessageSerializer.GetDeserializer();
+ _valueDeserializer = MessageSerializer.GetDeserializer();
+ }
+ ///
+ /// Rebuild indexes for all files
+ ///
+ internal void RebuildIndex()
+ {
+ FSUtils.CleanAndTestFolder(_indexCatalog);
+ var files = Directory.GetFiles(_dataCatalog);
+ if (files != null && files.Length > 0)
+ {
+ foreach (var file in files)
+ {
+ RebuildFileIndex(file);
+ }
+ }
+ }
+ ///
+ /// Rebuild index for the specified file
+ ///
+ internal void RebuildFileIndex(string file)
+ {
+ if (_indexType == IndexStepType.AbsoluteCount)
+ {
+ RebuildFileIndexWithAbsoluteCountIndexes(file);
+ }
+ else
+ {
+ RebuildFileIndexWithSteps(file);
+ }
+ }
+ ///
+ /// Delete the index for the specified file
+ ///
+ internal void DropFileIndex(string file)
+ {
+ var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
+ if (File.Exists(index_file))
+ {
+ File.Delete(index_file);
+ }
+ }
+ ///
+ /// Rebuild index with specified number of steps for specified file
+ ///
+ private void RebuildFileIndexWithAbsoluteCountIndexes(string file)
+ {
+ if (false == Directory.Exists(_indexCatalog))
+ {
+ Directory.CreateDirectory(_indexCatalog);
+ }
+ var dict = new Dictionary();
+ if (TryGetReadStream(file, out var reader))
+ {
+ using (reader)
+ {
+ while (reader.EOS == false)
+ {
+ var pos = reader.Position;
+ var k = _keyDeserializer.Invoke(reader);
+ dict[k] = pos;
+ _valueDeserializer.Invoke(reader);
+ }
+ }
+ }
+ if (dict.Count > _stepValue)
+ {
+ var step = (int)Math.Round(((float)dict.Count / (float)_stepValue), MidpointRounding.ToZero);
+ var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
+ var d_arr = dict.OrderBy(p => p.Key).ToArray();
+ using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
+ {
+ for (int i = 0; i < _stepValue; i++)
+ {
+ var pair = d_arr[i * step];
+ writer.WriteCompatible(pair.Key);
+ writer.WriteLong(pair.Value);
+ }
+ }
+ }
+ }
+ ///
+ /// Rebuild index with specified step for keys
+ ///
+ private void RebuildFileIndexWithSteps(string file)
+ {
+ if (false == Directory.Exists(_indexCatalog))
+ {
+ Directory.CreateDirectory(_indexCatalog);
+ }
+ if (TryGetReadStream(file, out var reader))
+ {
+ using (reader)
+ {
+ var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
+ using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
+ {
+ var counter = _stepValue;
+ while (reader.EOS == false)
+ {
+ counter--;
+ var pos = reader.Position;
+ var k = _keyDeserializer.Invoke(reader);
+ _valueDeserializer.Invoke(reader);
+ if (counter == 0)
+ {
+ writer.WriteCompatible(k);
+ writer.WriteLong(pos);
+ counter = _stepValue;
+ }
+ }
+ }
+ }
+ }
+ }
+ ///
+ /// Attempting to open a file for reading
+ ///
+ private bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
+ {
+ try
+ {
+ var filePath = Path.Combine(_dataCatalog, fileName);
+ var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
+ reader = new MemoryStreamReader(stream);
+ return true;
+ }
+ catch (Exception ex)
+ {
+ Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]");
+ }
+ reader = null;
+ return false;
+ }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs
index 55ee461..a52091a 100644
--- a/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs
+++ b/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs
@@ -5,7 +5,7 @@ using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
- internal class StorePartitionSparseIndex
+ internal sealed class StorePartitionSparseIndex
: IStorePartitionIndex
{
private readonly Dictionary[]> _indexCachee
@@ -101,7 +101,7 @@ namespace ZeroLevel.Services.PartitionStorage
private KeyIndex[] GetFileIndex(TKey key)
{
- var indexName = _filePartition.PathExtractor.Invoke(key, _meta);
+ var indexName = _filePartition.FileNameExtractor.Invoke(key, _meta);
if (_indexCachee.TryGetValue(indexName, out var index)) return index;
var file = Path.Combine(_indexFolder, indexName);
diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs
index 3865a90..76cc7d1 100644
--- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs
+++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs
@@ -12,23 +12,48 @@ namespace ZeroLevel.Services.PartitionStorage
: IStorePartitionBase
{
///
- /// Rebuild indexes
+ /// Rebuilds indexes for data in a partition
///
void RebuildIndex();
///
- /// Find in catalog partition by key
+ /// Search in a partition for a specified key
///
StorePartitionKeyValueSearchResult Find(TKey key);
///
- /// Find in catalog partition by keys
+ /// Search in a partition for a specified keys
///
IEnumerable> Find(IEnumerable keys);
+ ///
+ /// Iterating over all recorded data
+ ///
IEnumerable> Iterate();
+ ///
+ /// Iterating over all recorded data of the file with the specified key
+ ///
IEnumerable> IterateKeyBacket(TKey key);
-
+ ///
+ /// Deleting the specified key and associated data
+ ///
+ /// Key
+ /// true - automatically rebuild the index of the file from which data was deleted (default = false)
void RemoveKey(TKey key, bool autoReindex = false);
+ ///
+ /// Deleting the specified keys and associated data
+ ///
+ /// Keys
+ /// true - automatically rebuild the index of the file from which data was deleted (default = true)
void RemoveKeys(IEnumerable keys, bool autoReindex = true);
- void RemoveAllExceptKey(TKey key, bool autoReindex = false);
+ ///
+ /// Delete all keys with data except the specified key
+ ///
+ /// Key
+ /// true - automatically rebuild the index of the file from which data was deleted (default = true)
+ void RemoveAllExceptKey(TKey key, bool autoReindex = true);
+ ///
+ /// Delete all keys with data other than the specified ones
+ ///
+ /// Keys
+ /// true - automatically rebuild the index of the file from which data was deleted (default = true)
void RemoveAllExceptKeys(IEnumerable keys, bool autoReindex = true);
}
}
diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs
index 57a9a8a..2310a23 100644
--- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs
+++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs
@@ -9,7 +9,7 @@ namespace ZeroLevel.Services.PartitionStorage
/// Type of one input value
/// Type of records aggregate
public interface IStorePartitionBuilder
- : IStorePartitionBase
+ : IStorePartitionBase
{
IEnumerable> Iterate();
///
@@ -21,11 +21,11 @@ namespace ZeroLevel.Services.PartitionStorage
///
void CompleteAdding();
///
- /// Perform the conversion of the records from (TKey; TInput) to (TKey; TValue). Called after CompleteAdding
+ /// Performs compression/grouping of recorded data in a partition
///
void Compress();
///
- /// Rebuilds index files. Only for compressed data.
+ /// Rebuilds indexes for data in a partition
///
void RebuildIndex();
}
diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs
new file mode 100644
index 0000000..453092d
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreSerializer.cs
@@ -0,0 +1,14 @@
+using System;
+using ZeroLevel.Services.Serialization;
+
+namespace ZeroLevel.Services.PartitionStorage.Interfaces
+{
+ public interface IStoreSerializer
+ {
+ Action KeySerializer { get; }
+ Action InputSerializer { get; }
+ Func KeyDeserializer { get; }
+ Func InputDeserializer { get; }
+ Func ValueDeserializer { get; }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs
index e1830fc..300ba51 100644
--- a/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs
+++ b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs
@@ -1,8 +1,16 @@
namespace ZeroLevel.Services.PartitionStorage
{
+ public enum IndexStepType
+ {
+ AbsoluteCount,
+ Step
+ }
+
public class IndexOptions
{
public bool Enabled { get; set; }
- public int FileIndexCount { get; set; } = 64;
+
+ public IndexStepType StepType { get; set; } = IndexStepType.AbsoluteCount;
+ public int StepValue { get; set; } = 64;
}
}
diff --git a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs
index 9311230..88afd78 100644
--- a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs
+++ b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs
@@ -44,12 +44,13 @@ namespace ZeroLevel.Services.PartitionStorage
public IndexOptions Index { get; set; } = new IndexOptions
{
Enabled = false,
- FileIndexCount = 64
+ StepValue = 64,
+ StepType = IndexStepType.AbsoluteCount
};
internal string GetFileName(TKey key, TMeta info)
{
- return FilePartition.PathExtractor(key, info);
+ return FilePartition.FileNameExtractor(key, info);
}
internal string GetCatalogPath(TMeta info)
{
@@ -74,7 +75,8 @@ namespace ZeroLevel.Services.PartitionStorage
Index = new IndexOptions
{
Enabled = this.Index.Enabled,
- FileIndexCount = this.Index.FileIndexCount
+ StepValue = 64,
+ StepType = IndexStepType.AbsoluteCount
},
FilePartition = this.FilePartition,
KeyComparer = this.KeyComparer,
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs
new file mode 100644
index 0000000..ae91785
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs
@@ -0,0 +1,139 @@
+using System.IO;
+using System;
+using ZeroLevel.Services.Serialization;
+using System.Collections.Concurrent;
+using ZeroLevel.Services.FileSystem;
+using ZeroLevel.Services.PartitionStorage.Interfaces;
+
+namespace ZeroLevel.Services.PartitionStorage.Partition
+{
+ ///
+ /// General operations with a partition
+ ///
+ internal abstract class BasePartition
+ : IStorePartitionBase
+ {
+ public string Catalog { get { return _catalog; } }
+
+ protected readonly TMeta _info;
+ protected readonly string _catalog;
+ protected IStoreSerializer Serializer { get; }
+ protected readonly StoreOptions _options;
+
+ private readonly IndexBuilder _indexBuilder;
+ private readonly ConcurrentDictionary _writeStreams = new ConcurrentDictionary();
+
+ internal BasePartition(StoreOptions options,
+ TMeta info,
+ IStoreSerializer serializer)
+ {
+ _options = options;
+ _info = info;
+ _catalog = _options.GetCatalogPath(info);
+ if (Directory.Exists(_catalog) == false)
+ {
+ Directory.CreateDirectory(_catalog);
+ }
+ _indexBuilder = _options.Index.Enabled ? new IndexBuilder(_options.Index.StepType, _options.Index.StepValue, _catalog) : null;
+ Serializer = serializer;
+ }
+
+ #region IStorePartitionBase
+ public int CountDataFiles() => Directory.Exists(_catalog) ? (Directory.GetFiles(_catalog)?.Length ?? 0) : 0;
+ public string GetCatalogPath() => _catalog;
+ public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
+ public void Dispose()
+ {
+ CloseWriteStreams();
+ }
+ #endregion
+
+ ///
+ /// Rebuild indexes for all files
+ ///
+ protected void RebuildIndexes()
+ {
+ if (_options.Index.Enabled)
+ {
+ _indexBuilder.RebuildIndex();
+ }
+ }
+ ///
+ /// Rebuild index for the specified file
+ ///
+ internal void RebuildFileIndex(string file)
+ {
+ if (_options.Index.Enabled)
+ {
+ _indexBuilder.RebuildFileIndex(file);
+ }
+ }
+ ///
+ /// Delete the index for the specified file
+ ///
+ internal void DropFileIndex(string file)
+ {
+ if (_options.Index.Enabled)
+ {
+ _indexBuilder.DropFileIndex(file);
+ }
+ }
+ ///
+ /// Close all streams for writing
+ ///
+ protected void CloseWriteStreams()
+ {
+ foreach (var s in _writeStreams)
+ {
+ try
+ {
+ s.Value.Stream.Flush();
+ s.Value.Dispose();
+ }
+ catch { }
+ }
+ _writeStreams.Clear();
+ }
+ ///
+ /// Attempting to open a file for writing
+ ///
+ protected bool TryGetWriteStream(string fileName, out MemoryStreamWriter writer)
+ {
+ try
+ {
+ writer = _writeStreams.GetOrAdd(fileName, k =>
+ {
+ var filePath = Path.Combine(_catalog, k);
+ var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
+ return new MemoryStreamWriter(stream);
+ });
+ return true;
+ }
+ catch (Exception ex)
+ {
+ Log.SystemError(ex, "[StorePartitionBuilder.TryGetWriteStream]");
+ }
+ writer = null;
+ return false;
+ }
+ ///
+ /// Attempting to open a file for reading
+ ///
+ protected bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
+ {
+ try
+ {
+ var filePath = Path.Combine(_catalog, fileName);
+ var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
+ reader = new MemoryStreamReader(stream);
+ return true;
+ }
+ catch (Exception ex)
+ {
+ Log.SystemError(ex, "[StorePartitionBuilder.TryGetReadStream]");
+ }
+ reader = null;
+ return false;
+ }
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs
index 8db142a..e5c99a3 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs
@@ -2,9 +2,18 @@
namespace ZeroLevel.Services.PartitionStorage
{
+ ///
+ /// Partition, contains the method of forming the path
+ ///
public class StoreCatalogPartition
{
+ ///
+ /// Name of partition, just for info
+ ///
public string Name { get; }
+ ///
+ /// Path generator
+ ///
public Func PathExtractor { get; }
public StoreCatalogPartition(string name, Func pathExtractor)
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs
index 8cd8384..4eda3c4 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs
@@ -2,15 +2,24 @@
namespace ZeroLevel.Services.PartitionStorage
{
+ ///
+ /// File partition, contains the method of forming the path
+ ///
public class StoreFilePartition
{
+ ///
+ /// Name of partition, just for info
+ ///
public string Name { get; }
- public Func PathExtractor { get; }
+ ///
+ /// File name generator
+ ///
+ public Func FileNameExtractor { get; }
public StoreFilePartition(string name, Func pathExtractor)
{
Name = name;
- PathExtractor = pathExtractor;
+ FileNameExtractor = pathExtractor;
}
}
}
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs
index ae80c44..5ccdb03 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs
@@ -3,17 +3,15 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using ZeroLevel.Services.PartitionStorage.Interfaces;
+using ZeroLevel.Services.PartitionStorage.Partition;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
///
- /// For writing new values in exist partition
- ///
- /// ORDER: Store -> CompleteAddingAndCompress -> RebuildIndex
- ///
+ /// Performs merging of new data with existing data in the partition
///
- public class StoreMergePartitionAccessor
+ internal sealed class StoreMergePartitionAccessor
: IStorePartitionMergeBuilder
{
private readonly Func> _decompress;
@@ -30,16 +28,19 @@ namespace ZeroLevel.Services.PartitionStorage
/// Write catalog
///
private readonly IStorePartitionBuilder _temporaryAccessor;
+
public StoreMergePartitionAccessor(StoreOptions options,
- TMeta info, Func> decompress)
+ TMeta info,
+ Func> decompress,
+ IStoreSerializer serializer)
{
if (decompress == null) throw new ArgumentNullException(nameof(decompress));
_decompress = decompress;
- _accessor = new StorePartitionAccessor(options, info);
+ _accessor = new StorePartitionAccessor(options, info, serializer);
_temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString());
var tempOptions = options.Clone();
tempOptions.RootFolder = _temporaryFolder;
- _temporaryAccessor = new StorePartitionBuilder(tempOptions, info);
+ _temporaryAccessor = new StorePartitionBuilder(tempOptions, info, serializer);
_keyDeserializer = MessageSerializer.GetDeserializer();
_valueDeserializer = MessageSerializer.GetDeserializer();
@@ -54,6 +55,10 @@ namespace ZeroLevel.Services.PartitionStorage
public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value);
public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(),
_temporaryAccessor.CountDataFiles());
+
+ ///
+ /// Performs compression/grouping of recorded data in a partition
+ ///
public void Compress()
{
var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath());
@@ -102,8 +107,7 @@ namespace ZeroLevel.Services.PartitionStorage
File.Move(file, Path.Combine(folder, name), true);
// 3. Rebuil index
- (_accessor as StorePartitionAccessor)
- .RebuildFileIndex(file);
+ (_accessor as BasePartition).RebuildFileIndex(name);
}
}
// remove temporary files
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs
index 06b2d27..863cf1a 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs
@@ -2,41 +2,23 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
-using ZeroLevel.Services.FileSystem;
-using ZeroLevel.Services.Serialization;
+using ZeroLevel.Services.PartitionStorage.Interfaces;
+using ZeroLevel.Services.PartitionStorage.Partition;
namespace ZeroLevel.Services.PartitionStorage
{
- public class StorePartitionAccessor
- : IStorePartitionAccessor
+ internal sealed class StorePartitionAccessor
+ : BasePartition, IStorePartitionAccessor
{
- private readonly StoreOptions _options;
- private readonly string _catalog;
- private readonly string _indexCatalog;
- private readonly TMeta _info;
-
- private readonly Func _keyDeserializer;
- private readonly Func _valueDeserializer;
-
- public string Catalog { get { return _catalog; } }
- public StorePartitionAccessor(StoreOptions options, TMeta info)
+ public StorePartitionAccessor(StoreOptions options,
+ TMeta info,
+ IStoreSerializer serializer)
+ : base(options, info, serializer)
{
if (options == null) throw new ArgumentNullException(nameof(options));
- _info = info;
- _options = options;
- _catalog = _options.GetCatalogPath(info);
- if (_options.Index.Enabled)
- {
- _indexCatalog = Path.Combine(_catalog, "__indexes__");
- }
- _keyDeserializer = MessageSerializer.GetDeserializer();
- _valueDeserializer = MessageSerializer.GetDeserializer();
}
- #region API methods
- public int CountDataFiles() => Directory.Exists(_catalog) ? (Directory.GetFiles(_catalog)?.Length ?? 0) : 0;
- public string GetCatalogPath() => _catalog;
- public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
+ #region IStorePartitionAccessor
public StorePartitionKeyValueSearchResult Find(TKey key)
{
var fileName = _options.GetFileName(key, _info);
@@ -59,8 +41,8 @@ namespace ZeroLevel.Services.PartitionStorage
}
while (reader.EOS == false)
{
- var k = _keyDeserializer.Invoke(reader);
- var v = _valueDeserializer.Invoke(reader);
+ var k = Serializer.KeyDeserializer.Invoke(reader);
+ var v = Serializer.ValueDeserializer.Invoke(reader);
var c = _options.KeyComparer(key, k);
if (c == 0) return new StorePartitionKeyValueSearchResult
{
@@ -119,8 +101,8 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
- var k = _keyDeserializer.Invoke(reader);
- var v = _valueDeserializer.Invoke(reader);
+ var k = Serializer.KeyDeserializer.Invoke(reader);
+ var v = Serializer.ValueDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Status = SearchResult.Success };
}
}
@@ -139,30 +121,16 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
- var k = _keyDeserializer.Invoke(reader);
- var v = _valueDeserializer.Invoke(reader);
+ var k = Serializer.KeyDeserializer.Invoke(reader);
+ var v = Serializer.ValueDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Status = SearchResult.Success };
}
}
}
}
}
- public void RebuildIndex()
- {
- if (_options.Index.Enabled)
- {
- FSUtils.CleanAndTestFolder(_indexCatalog);
- var files = Directory.GetFiles(_catalog);
- if (files != null && files.Length > 0)
- {
- foreach (var file in files)
- {
- RebuildFileIndex(file);
- }
- }
- }
- }
- public void RemoveAllExceptKey(TKey key, bool autoReindex = false)
+ public void RebuildIndex() => RebuildIndexes();
+ public void RemoveAllExceptKey(TKey key, bool autoReindex = true)
{
RemoveAllExceptKeys(new[] { key }, autoReindex);
}
@@ -194,59 +162,6 @@ namespace ZeroLevel.Services.PartitionStorage
}
#endregion
- #region Internal methods
- internal void DropFileIndex(string file)
- {
- if (_options.Index.Enabled)
- {
- var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
- if (File.Exists(index_file))
- {
- File.Delete(index_file);
- }
- }
- }
- internal void RebuildFileIndex(string file)
- {
- if (_options.Index.Enabled)
- {
- if (false == Directory.Exists(_indexCatalog))
- {
- Directory.CreateDirectory(_indexCatalog);
- }
- var dict = new Dictionary();
- if (TryGetReadStream(file, out var reader))
- {
- using (reader)
- {
- while (reader.EOS == false)
- {
- var pos = reader.Position;
- var k = _keyDeserializer.Invoke(reader);
- dict[k] = pos;
- _valueDeserializer.Invoke(reader);
- }
- }
- }
- if (dict.Count > _options.Index.FileIndexCount * 8)
- {
- var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero);
- var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
- var d_arr = dict.OrderBy(p => p.Key).ToArray();
- using (var writer = new MemoryStreamWriter(
- new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
- {
- for (int i = 0; i < _options.Index.FileIndexCount; i++)
- {
- var pair = d_arr[i * step];
- writer.WriteCompatible(pair.Key);
- writer.WriteLong(pair.Value);
- }
- }
- }
- }
- }
- #endregion
#region Private methods
private IEnumerable> Find(string fileName,
@@ -269,8 +184,8 @@ namespace ZeroLevel.Services.PartitionStorage
reader.Seek(off.Offset, SeekOrigin.Begin);
while (reader.EOS == false)
{
- var k = _keyDeserializer.Invoke(reader);
- var v = _valueDeserializer.Invoke(reader);
+ var k = Serializer.KeyDeserializer.Invoke(reader);
+ var v = Serializer.ValueDeserializer.Invoke(reader);
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
@@ -301,8 +216,8 @@ namespace ZeroLevel.Services.PartitionStorage
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{
- var k = _keyDeserializer.Invoke(reader);
- var v = _valueDeserializer.Invoke(reader);
+ var k = Serializer.KeyDeserializer.Invoke(reader);
+ var v = Serializer.ValueDeserializer.Invoke(reader);
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{
@@ -355,8 +270,8 @@ namespace ZeroLevel.Services.PartitionStorage
while (reader.EOS == false)
{
var startPosition = reader.Position;
- var k = _keyDeserializer.Invoke(reader);
- _valueDeserializer.Invoke(reader);
+ var k = Serializer.KeyDeserializer.Invoke(reader);
+ Serializer.ValueDeserializer.Invoke(reader);
var endPosition = reader.Position;
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
@@ -383,8 +298,8 @@ namespace ZeroLevel.Services.PartitionStorage
while (reader.EOS == false && index < keys_arr.Length)
{
var startPosition = reader.Position;
- var k = _keyDeserializer.Invoke(reader);
- _valueDeserializer.Invoke(reader);
+ var k = Serializer.KeyDeserializer.Invoke(reader);
+ Serializer.ValueDeserializer.Invoke(reader);
var endPosition = reader.Position;
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
@@ -447,22 +362,6 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
- private bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
- {
- try
- {
- var filePath = Path.Combine(_catalog, fileName);
- var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
- reader = new MemoryStreamReader(stream);
- return true;
- }
- catch (Exception ex)
- {
- Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]");
- }
- reader = null;
- return false;
- }
#endregion
#region Static
@@ -514,9 +413,5 @@ namespace ZeroLevel.Services.PartitionStorage
target.Write(buffer, 0, buffer.Length);
}
#endregion
-
- public void Dispose()
- {
- }
}
}
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs
index 16c1876..2b477b8 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs
@@ -1,77 +1,40 @@
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
-using ZeroLevel.Services.FileSystem;
+using ZeroLevel.Services.PartitionStorage.Interfaces;
+using ZeroLevel.Services.PartitionStorage.Partition;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
- public class StorePartitionBuilder
- : IStorePartitionBuilder
+ internal sealed class StorePartitionBuilder
+ : BasePartition, IStorePartitionBuilder
{
- private readonly ConcurrentDictionary _writeStreams
- = new ConcurrentDictionary();
-
- private readonly StoreOptions _options;
- private readonly string _catalog;
- private readonly TMeta _info;
- private readonly Action _keySerializer;
- private readonly Action _inputSerializer;
-
- private readonly Func _keyDeserializer;
- private readonly Func _inputDeserializer;
- private readonly Func _valueDeserializer;
- public string Catalog { get { return _catalog; } }
- public StorePartitionBuilder(StoreOptions options, TMeta info)
+ public StorePartitionBuilder(StoreOptions options,
+ TMeta info,
+ IStoreSerializer serializer)
+ : base(options, info, serializer)
{
if (options == null) throw new ArgumentNullException(nameof(options));
- _info = info;
- _options = options;
- _catalog = _options.GetCatalogPath(info);
- if (Directory.Exists(_catalog) == false)
- {
- Directory.CreateDirectory(_catalog);
- }
-
- _keySerializer = MessageSerializer.GetSerializer();
- _inputSerializer = MessageSerializer.GetSerializer();
-
- _keyDeserializer = MessageSerializer.GetDeserializer();
- _inputDeserializer = MessageSerializer.GetDeserializer();
- _valueDeserializer = MessageSerializer.GetDeserializer();
}
- #region API methods
- public int CountDataFiles() => Directory.Exists(_catalog) ? (Directory.GetFiles(_catalog)?.Length ?? 0) : 0;
- public string GetCatalogPath() => _catalog;
- public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
+ #region IStorePartitionBuilder
public void Store(TKey key, TInput value)
{
var fileName = _options.GetFileName(key, _info);
if (TryGetWriteStream(fileName, out var stream))
{
- _keySerializer.Invoke(stream, key);
+ Serializer.KeySerializer.Invoke(stream, key);
Thread.MemoryBarrier();
- _inputSerializer.Invoke(stream, value);
+ Serializer.InputSerializer.Invoke(stream, value);
}
}
public void CompleteAdding()
{
- // Close all write streams
- foreach (var s in _writeStreams)
- {
- try
- {
- s.Value.Stream.Flush();
- s.Value.Dispose();
- }
- catch { }
- }
- _writeStreams.Clear();
+ CloseWriteStreams();
}
public void Compress()
{
@@ -94,8 +57,8 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
- var key = _keyDeserializer.Invoke(reader);
- var val = _inputDeserializer.Invoke(reader);
+ var key = Serializer.KeyDeserializer.Invoke(reader);
+ var val = Serializer.InputDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult { Key = key, Value = val, Status = SearchResult.Success };
}
}
@@ -103,53 +66,7 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
}
- public void RebuildIndex()
- {
- if (_options.Index.Enabled)
- {
- var indexFolder = Path.Combine(_catalog, "__indexes__");
- FSUtils.CleanAndTestFolder(indexFolder);
- var files = Directory.GetFiles(_catalog);
- if (files != null && files.Length > 0)
- {
- var dict = new Dictionary();
- foreach (var file in files)
- {
- if (TryGetReadStream(file, out var reader))
- {
- using (reader)
- {
- while (reader.EOS == false)
- {
- var pos = reader.Stream.Position;
- var key = _keyDeserializer.Invoke(reader);
- dict[key] = pos;
- if (reader.EOS) break;
- _valueDeserializer.Invoke(reader);
- }
- }
- if (dict.Count > _options.Index.FileIndexCount * 8)
- {
- var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero);
- var index_file = Path.Combine(indexFolder, Path.GetFileName(file));
- var d_arr = dict.OrderBy(p => p.Key).ToArray();
- using (var writer = new MemoryStreamWriter(
- new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
- {
- for (int i = 0; i < _options.Index.FileIndexCount; i++)
- {
- var pair = d_arr[i * step];
- writer.WriteCompatible(pair.Key);
- writer.WriteLong(pair.Value);
- }
- }
- }
- }
- dict.Clear();
- }
- }
- }
- }
+ public void RebuildIndex() => RebuildIndexes();
#endregion
#region Private methods
@@ -160,7 +77,7 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
- var key = _keyDeserializer.Invoke(reader);
+ var key = Serializer.KeyDeserializer.Invoke(reader);
if (false == dict.ContainsKey(key))
{
dict[key] = new HashSet();
@@ -169,7 +86,7 @@ namespace ZeroLevel.Services.PartitionStorage
{
break;
}
- var input = _inputDeserializer.Invoke(reader);
+ var input = Serializer.InputDeserializer.Invoke(reader);
dict[key].Add(input);
}
}
@@ -188,45 +105,6 @@ namespace ZeroLevel.Services.PartitionStorage
File.Delete(file);
File.Move(tempFile, file, true);
}
- private bool TryGetWriteStream(string fileName, out MemoryStreamWriter writer)
- {
- try
- {
- writer = _writeStreams.GetOrAdd(fileName, k =>
- {
- var filePath = Path.Combine(_catalog, k);
- var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
- return new MemoryStreamWriter(stream);
- });
- return true;
- }
- catch (Exception ex)
- {
- Log.SystemError(ex, "[StorePartitionBuilder.TryGetWriteStream]");
- }
- writer = null;
- return false;
- }
- private bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
- {
- try
- {
- var filePath = Path.Combine(_catalog, fileName);
- var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
- reader = new MemoryStreamReader(stream);
- return true;
- }
- catch (Exception ex)
- {
- Log.SystemError(ex, "[StorePartitionBuilder.TryGetReadStream]");
- }
- reader = null;
- return false;
- }
#endregion
-
- public void Dispose()
- {
- }
}
}
diff --git a/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs
new file mode 100644
index 0000000..2dbc840
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/Search/SearchResult.cs
@@ -0,0 +1,9 @@
+namespace ZeroLevel.Services.PartitionStorage
+{
+ public enum SearchResult
+ {
+ Success,
+ NotFound,
+ FileLocked
+ }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs
index 5af1dc0..61ba3de 100644
--- a/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs
+++ b/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs
@@ -1,12 +1,5 @@
namespace ZeroLevel.Services.PartitionStorage
{
- public enum SearchResult
- {
- Success,
- NotFound,
- FileLocked
- }
-
public class StorePartitionKeyValueSearchResult
{
public SearchResult Status { get; set; }
diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs
index 2246da3..ce7c1ba 100644
--- a/ZeroLevel/Services/PartitionStorage/Store.cs
+++ b/ZeroLevel/Services/PartitionStorage/Store.cs
@@ -13,10 +13,20 @@ namespace ZeroLevel.Services.PartitionStorage
IStore
{
private readonly StoreOptions _options;
- public Store(StoreOptions options)
+ private readonly IStoreSerializer _serializer;
+ public Store(StoreOptions options,
+ IStoreSerializer serializer = null)
{
if (options == null) throw new ArgumentNullException(nameof(options));
_options = options;
+ if (serializer == null)
+ {
+ _serializer = new StoreStandartSerializer();
+ }
+ else
+ {
+ _serializer = serializer;
+ }
if (Directory.Exists(_options.RootFolder) == false)
{
Directory.CreateDirectory(_options.RootFolder);
@@ -32,17 +42,17 @@ namespace ZeroLevel.Services.PartitionStorage
public IStorePartitionAccessor CreateAccessor(TMeta info)
{
- return new StorePartitionAccessor(_options, info);
+ return new StorePartitionAccessor(_options, info, _serializer);
}
public IStorePartitionBuilder CreateBuilder(TMeta info)
{
- return new StorePartitionBuilder(_options, info);
+ return new StorePartitionBuilder(_options, info, _serializer);
}
public IStorePartitionMergeBuilder CreateMergeAccessor(TMeta info, Func> decompressor)
{
- return new StoreMergePartitionAccessor(_options, info, decompressor);
+ return new StoreMergePartitionAccessor(_options, info, decompressor, _serializer);
}
public async Task> Search(StoreSearchRequest searchRequest)
diff --git a/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs b/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs
new file mode 100644
index 0000000..7f72869
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs
@@ -0,0 +1,35 @@
+using System;
+using ZeroLevel.Services.PartitionStorage.Interfaces;
+using ZeroLevel.Services.Serialization;
+
+namespace ZeroLevel.Services.PartitionStorage
+{
+ internal sealed class StoreStandartSerializer
+ : IStoreSerializer
+ {
+ private readonly Action _keySerializer;
+ private readonly Action _inputSerializer;
+ private readonly Func _keyDeserializer;
+ private readonly Func _inputDeserializer;
+ private readonly Func _valueDeserializer;
+
+ public StoreStandartSerializer()
+ {
+ _keySerializer = MessageSerializer.GetSerializer();
+ _inputSerializer = MessageSerializer.GetSerializer();
+ _keyDeserializer = MessageSerializer.GetDeserializer();
+ _inputDeserializer = MessageSerializer.GetDeserializer();
+ _valueDeserializer = MessageSerializer.GetDeserializer();
+ }
+
+ public Action KeySerializer => _keySerializer;
+
+ public Action InputSerializer => _inputSerializer;
+
+ public Func KeyDeserializer => _keyDeserializer;
+
+ public Func InputDeserializer => _inputDeserializer;
+
+ public Func ValueDeserializer => _valueDeserializer;
+ }
+}