diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index 83ff367..d78393d 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -102,10 +102,11 @@ namespace PartitionFileStorageTest private static void BuildStore(string source, string root) { - var options = new IStoreOptions + var options = new StoreOptions { + Index = new IndexOptions { Enabled = true, FileIndexCount = 256 }, RootFolder = root, - FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 1000).ToString()), + FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 512).ToString()), MergeFunction = list => { ulong s = 0; @@ -118,12 +119,11 @@ namespace PartitionFileStorageTest }, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, }; - options.Index.Enabled = true; var store = new Store(options); - var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }); - var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }); + var storeIncoming = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }); + var storeOutcoming = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }); var parser = new CallRecordParser(); var sw = new Stopwatch(); sw.Start(); @@ -165,17 +165,13 @@ namespace PartitionFileStorageTest Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); } - private static void TestReading(string source, string root) + private static void TestReading(string root) { - var options = new IStoreOptions + var options = new StoreOptions { - Index = new IndexOptions - { - Enabled = false, - FileIndexCount = 64 - }, + Index = new IndexOptions { Enabled = true, FileIndexCount = 256 }, RootFolder = root, - FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 1000).ToString()), + FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 512).ToString()), MergeFunction = list => { ulong s = 0; @@ -183,10 +179,10 @@ namespace PartitionFileStorageTest }, Partitions = new List> { - new StoreCatalogPartition("timestamp", m => m.Date.ToString("yyyyMMdd")), - new StoreCatalogPartition("timestamp", m => m.Incoming ? "incoming" : "outcoming") + new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")), + new StoreCatalogPartition("Date", m => m.Incoming ? "incoming" : "outcoming") }, - KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1 + KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, }; var store = new Store(options); var request = new StoreSearchRequest @@ -245,61 +241,12 @@ namespace PartitionFileStorageTest return arr; } - private static KeyIndex BinarySearchInIndex(KeyIndex[] index, - long key, - Func keyComparer, - ref int position) - { - if (index == null || index.Length == 0) - { - return new KeyIndex { Key = key, Offset = 0 }; - } - int left = position; - int right = index.Length - 1; - int mid = 0; - long test; - while (left <= right) - { - mid = (int)Math.Floor((right + left) / 2d); - test = index[mid].Key; - var c = keyComparer(test, key); - if (c < 0) - { - left = mid + 1; - } - else if (c > 0) - { - right = mid - 1; - } - else - { - position = mid; - return index[mid]; - } - } - position = mid; - return index[mid]; - } - static void Main(string[] args) { var root = @"H:\temp"; var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt"; //BuildStore(source, root); - TestReading(source, root); - - /* - Func keyComparer = - (left, right) => - (left == right) ? 0 : (left < right) ? -1 : 1; - var indexes = Generate(77); - int position = 0; - for (long i = 65; i < 700; i++) - { - var ind = BinarySearchInIndex(indexes, i, keyComparer, ref position); - Console.WriteLine($"{i}: {ind.Offset}. [{ind.Key}]"); - } - */ + TestReading(root); Console.ReadKey(); } } diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/KeyIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/KeyIndex.cs new file mode 100644 index 0000000..7afd3e2 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Indexes/KeyIndex.cs @@ -0,0 +1,8 @@ +namespace ZeroLevel.Services.PartitionStorage +{ + internal struct KeyIndex + { + public TKey Key { get; set; } + public long Offset { get; set; } + } +} diff --git a/ZeroLevel/Services/PartitionStorage/StorePartitionIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs similarity index 94% rename from ZeroLevel/Services/PartitionStorage/StorePartitionIndex.cs rename to ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs index d4a4c17..9049701 100644 --- a/ZeroLevel/Services/PartitionStorage/StorePartitionIndex.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs @@ -5,13 +5,7 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage { - internal struct KeyIndex - { - public TKey Key { get; set; } - public long Offset { get; set; } - } - - internal class StorePartitionIndex + internal class StorePartitionSparseIndex : IStorePartitionIndex { private readonly Dictionary[]> _indexCachee @@ -22,7 +16,7 @@ namespace ZeroLevel.Services.PartitionStorage private readonly string _indexFolder; private readonly bool _indexExists = false; private readonly TMeta _meta; - public StorePartitionIndex(string partitionFolder, TMeta meta, + public StorePartitionSparseIndex(string partitionFolder, TMeta meta, StoreFilePartition filePartition, Func keyComparer) { diff --git a/ZeroLevel/Services/PartitionStorage/IStore.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs similarity index 79% rename from ZeroLevel/Services/PartitionStorage/IStore.cs rename to ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs index 100a028..884edc8 100644 --- a/ZeroLevel/Services/PartitionStorage/IStore.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs @@ -13,9 +13,11 @@ namespace ZeroLevel.Services.PartitionStorage /// Meta information for partition search public interface IStore { - IStorePartitionAccessor CreateAccessor(TMeta info); + IStorePartitionBuilder CreateBuilder(TMeta info); + + IStorePartitionBuilder CreateMergeAccessor(TMeta info, Func> decompressor); - IStorePartitionAccessor CreateMergeAccessor(TMeta info, Func> decompressor); + IStorePartitionAccessor CreateAccessor(TMeta info); Task> Search(StoreSearchRequest searchRequest); } diff --git a/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs similarity index 54% rename from ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs rename to ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs index 7eb04ed..3a19b76 100644 --- a/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs @@ -1,28 +1,16 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; namespace ZeroLevel.Services.PartitionStorage { /// - /// Provides read-write operations in catalog partition + /// Provides read/reindex operations in catalog partition /// /// Key type /// Type of one input value /// Type of records aggregate public interface IStorePartitionAccessor - : IDisposable + : IStorePartitionBase { - string GetCatalogPath(); - /// - /// Has any files - /// - int CountDataFiles(); - /// - /// Remove all files - /// - void DropData(); - - #region API !only after data compression! /// /// Rebuild indexes /// @@ -37,18 +25,5 @@ namespace ZeroLevel.Services.PartitionStorage IEnumerable> Find(IEnumerable keys); IEnumerable> Iterate(); IEnumerable> IterateKeyBacket(TKey key); - #endregion - - #region API !only before data compression! - /// - /// Save one record - /// - void Store(TKey key, TInput value); - /// - /// Complete the recording and perform the conversion of the records from - /// (TKey; TInput) to (TKey; TValue) - /// - void CompleteAddingAndCompress(); - #endregion } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBase.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBase.cs new file mode 100644 index 0000000..a24c544 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBase.cs @@ -0,0 +1,24 @@ +using System; + +namespace ZeroLevel.Services.PartitionStorage +{ + /// + /// Provides common operations in catalog partition + /// + /// Key type + /// Type of one input value + /// Type of records aggregate + public interface IStorePartitionBase + : IDisposable + { + string GetCatalogPath(); + /// + /// Has any files + /// + int CountDataFiles(); + /// + /// Remove all files + /// + void DropData(); + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs new file mode 100644 index 0000000..26d6379 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs @@ -0,0 +1,23 @@ +namespace ZeroLevel.Services.PartitionStorage +{ + /// + /// Provides write operations in catalog partition + /// + /// Key type + /// Type of one input value + /// Type of records aggregate + public interface IStorePartitionBuilder + : IStorePartitionBase + { + /// + /// Save one record + /// + void Store(TKey key, TInput value); + /// + /// Complete the recording and perform the conversion of the records from + /// (TKey; TInput) to (TKey; TValue) + /// + void CompleteAddingAndCompress(); + void RebuildIndex(); + } +} diff --git a/ZeroLevel/Services/PartitionStorage/IStorePartitionIndex.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionIndex.cs similarity index 67% rename from ZeroLevel/Services/PartitionStorage/IStorePartitionIndex.cs rename to ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionIndex.cs index bf4eb35..8ff9f9c 100644 --- a/ZeroLevel/Services/PartitionStorage/IStorePartitionIndex.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionIndex.cs @@ -1,6 +1,4 @@ -using System.Collections.Generic; - -namespace ZeroLevel.Services.PartitionStorage +namespace ZeroLevel.Services.PartitionStorage { internal interface IStorePartitionIndex { diff --git a/ZeroLevel/Services/PartitionStorage/Options/CacheOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/CacheOptions.cs new file mode 100644 index 0000000..6e22ec8 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Options/CacheOptions.cs @@ -0,0 +1,17 @@ +namespace ZeroLevel.Services.PartitionStorage +{ + public class CacheOptions + { + public bool UsePersistentCache { get; set; } + + public string PersistentCacheFolder { get; set; } = "cachee"; + + public int PersistentCacheRemoveTimeoutInSeconds { get; set; } = 3600; + + public bool UseMemoryCache { get; set; } + + public int MemoryCacheLimitInMb { get; set; } = 1024; + + public int MemoryCacheRemoveTimeoutInSeconds { get; set; } = 900; + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs new file mode 100644 index 0000000..e1830fc --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs @@ -0,0 +1,8 @@ +namespace ZeroLevel.Services.PartitionStorage +{ + public class IndexOptions + { + public bool Enabled { get; set; } + public int FileIndexCount { get; set; } = 64; + } +} diff --git a/ZeroLevel/Services/PartitionStorage/IStoreOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs similarity index 73% rename from ZeroLevel/Services/PartitionStorage/IStoreOptions.cs rename to ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs index 189a636..f97104a 100644 --- a/ZeroLevel/Services/PartitionStorage/IStoreOptions.cs +++ b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs @@ -6,12 +6,6 @@ using ZeroLevel.Services.FileSystem; namespace ZeroLevel.Services.PartitionStorage { - public class IndexOptions - { - public bool Enabled { get; set; } - public int FileIndexCount { get; set; } = 64; - } - /// /// Options /// @@ -19,7 +13,7 @@ namespace ZeroLevel.Services.PartitionStorage /// The value that is written in the stream /// Value after compression of TInput values by duplicate keys (TInput list or similar) /// Meta information for partition search - public class IStoreOptions + public class StoreOptions { /// /// Method for key comparison @@ -47,7 +41,17 @@ namespace ZeroLevel.Services.PartitionStorage /// public StoreFilePartition FilePartition { get; set; } - public IndexOptions Index { get; set; } = new IndexOptions { Enabled = false, FileIndexCount = 64 }; + public IndexOptions Index { get; set; } = new IndexOptions + { + Enabled = false, + FileIndexCount = 64 + }; + + public CacheOptions Cache { get; set; } = new CacheOptions + { + UseMemoryCache = false, + UsePersistentCache = false + }; internal string GetFileName(TKey key, TMeta info) { @@ -69,9 +73,9 @@ namespace ZeroLevel.Services.PartitionStorage return path; } - public IStoreOptions Clone() + public StoreOptions Clone() { - var options = new IStoreOptions + var options = new StoreOptions { Index = new IndexOptions { @@ -85,7 +89,16 @@ namespace ZeroLevel.Services.PartitionStorage Partitions = this.Partitions .Select(p => new StoreCatalogPartition(p.Name, p.PathExtractor)) .ToList(), - RootFolder = this.RootFolder + RootFolder = this.RootFolder, + Cache = new CacheOptions + { + MemoryCacheLimitInMb = this.Cache.MemoryCacheLimitInMb, + MemoryCacheRemoveTimeoutInSeconds = this.Cache.MemoryCacheRemoveTimeoutInSeconds, + PersistentCacheFolder = this.Cache.PersistentCacheFolder, + PersistentCacheRemoveTimeoutInSeconds = this.Cache.PersistentCacheRemoveTimeoutInSeconds, + UseMemoryCache = this.Cache.UseMemoryCache, + UsePersistentCache = this.Cache.UsePersistentCache + } }; return options; } diff --git a/ZeroLevel/Services/PartitionStorage/StoreCatalogPartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs similarity index 100% rename from ZeroLevel/Services/PartitionStorage/StoreCatalogPartition.cs rename to ZeroLevel/Services/PartitionStorage/Partition/StoreCatalogPartition.cs diff --git a/ZeroLevel/Services/PartitionStorage/StoreFilePartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs similarity index 100% rename from ZeroLevel/Services/PartitionStorage/StoreFilePartition.cs rename to ZeroLevel/Services/PartitionStorage/Partition/StoreFilePartition.cs diff --git a/ZeroLevel/Services/PartitionStorage/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs similarity index 81% rename from ZeroLevel/Services/PartitionStorage/StoreMergePartitionAccessor.cs rename to ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs index 15dbb55..fba4650 100644 --- a/ZeroLevel/Services/PartitionStorage/StoreMergePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs @@ -13,7 +13,7 @@ namespace ZeroLevel.Services.PartitionStorage /// /// public class StoreMergePartitionAccessor - : IStorePartitionAccessor + : IStorePartitionBuilder { private readonly Func> _decompress; /// @@ -23,8 +23,8 @@ namespace ZeroLevel.Services.PartitionStorage /// /// Write catalog /// - private readonly IStorePartitionAccessor _temporaryAccessor; - public StoreMergePartitionAccessor(IStoreOptions options, + private readonly IStorePartitionBuilder _temporaryAccessor; + public StoreMergePartitionAccessor(StoreOptions options, TMeta info, Func> decompress) { if (decompress == null) throw new ArgumentNullException(nameof(decompress)); @@ -33,7 +33,7 @@ namespace ZeroLevel.Services.PartitionStorage var tempCatalog = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString()); var tempOptions = options.Clone(); tempOptions.RootFolder = tempCatalog; - _temporaryAccessor = new StorePartitionAccessor(tempOptions, info); + _temporaryAccessor = new StorePartitionBuilder(tempOptions, info); } private IEnumerable>> @@ -86,7 +86,7 @@ namespace ZeroLevel.Services.PartitionStorage } } // compress new file - (_temporaryAccessor as StorePartitionAccessor) + (_temporaryAccessor as StorePartitionBuilder) .CompressFile(file); // replace old file by new @@ -98,16 +98,6 @@ namespace ZeroLevel.Services.PartitionStorage Directory.Delete(_temporaryAccessor.GetCatalogPath(), true); } - public StorePartitionKeyValueSearchResult Find(TKey key) - => _accessor.Find(key); - - public IEnumerable> Find(IEnumerable keys) - => _accessor.Find(keys); - public IEnumerable> Iterate() - => _accessor.Iterate(); - public IEnumerable> IterateKeyBacket(TKey key) - => _accessor.IterateKeyBacket(key); - /// /// Deletes only new entries. Existing entries remain unchanged. /// diff --git a/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs similarity index 74% rename from ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs rename to ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs index aa8db87..e6da0b8 100644 --- a/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs @@ -1,9 +1,7 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; -using System.Threading.Tasks; using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.Serialization; @@ -12,30 +10,23 @@ namespace ZeroLevel.Services.PartitionStorage public class StorePartitionAccessor : IStorePartitionAccessor { - private readonly ConcurrentDictionary _writeStreams - = new ConcurrentDictionary(); - private readonly IStoreOptions _options; + private readonly StoreOptions _options; private readonly string _catalog; private readonly TMeta _info; public string Catalog { get { return _catalog; } } - public StorePartitionAccessor(IStoreOptions options, TMeta info) + public StorePartitionAccessor(StoreOptions options, TMeta info) { if (options == null) throw new ArgumentNullException(nameof(options)); _info = info; _options = options; _catalog = _options.GetCatalogPath(info); - if (Directory.Exists(_catalog) == false) - { - Directory.CreateDirectory(_catalog); - } } public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 0; public string GetCatalogPath() => _catalog; public void DropData() => FSUtils.CleanAndTestFolder(_catalog); - #region API !only after data compression! public StorePartitionKeyValueSearchResult Find(TKey key) { var fileName = _options.GetFileName(key, _info); @@ -44,7 +35,7 @@ namespace ZeroLevel.Services.PartitionStorage long startOffset = 0; if (_options.Index.Enabled) { - var index = new StorePartitionIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); + var index = new StorePartitionSparseIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); var offset = index.GetOffset(key); startOffset = offset.Offset; } @@ -171,35 +162,6 @@ namespace ZeroLevel.Services.PartitionStorage } } } - #endregion - - - #region API !only before data compression! - public void Store(TKey key, TInput value) - { - var fileName = _options.GetFileName(key, _info); - var stream = GetWriteStream(fileName); - stream.SerializeCompatible(key); - stream.SerializeCompatible(value); - } - public void CompleteAddingAndCompress() - { - // Close all write streams - foreach (var s in _writeStreams) - { - try - { - s.Value.Dispose(); - } - catch { } - } - var files = Directory.GetFiles(_catalog); - if (files != null && files.Length > 1) - { - Parallel.ForEach(files, file => CompressFile(file)); - } - } - #endregion #region Private methods private IEnumerable> Find(string fileName, @@ -209,7 +171,7 @@ namespace ZeroLevel.Services.PartitionStorage { if (_options.Index.Enabled) { - var index = new StorePartitionIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); + var index = new StorePartitionSparseIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); var offsets = index.GetOffset(keys, true); using (var reader = GetReadStream(fileName)) { @@ -278,46 +240,7 @@ namespace ZeroLevel.Services.PartitionStorage } } } - internal void CompressFile(string file) - { - var dict = new Dictionary>(); - using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024))) - { - while (reader.EOS == false) - { - TKey k = reader.ReadCompatible(); - TInput v = reader.ReadCompatible(); - if (false == dict.ContainsKey(k)) - { - dict[k] = new HashSet(); - } - dict[k].Add(v); - } - } - var tempPath = Path.GetTempPath(); - var tempFile = Path.Combine(tempPath, Path.GetTempFileName()); - using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) - { - // sort for search acceleration - foreach (var pair in dict.OrderBy(p => p.Key)) - { - var v = _options.MergeFunction(pair.Value); - writer.SerializeCompatible(pair.Key); - writer.SerializeCompatible(v); - } - } - File.Delete(file); - File.Move(tempFile, file, true); - } - private MemoryStreamWriter GetWriteStream(string fileName) - { - return _writeStreams.GetOrAdd(fileName, k => - { - var filePath = Path.Combine(_catalog, k); - var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); - return new MemoryStreamWriter(stream); - }); - } + private MemoryStreamReader GetReadStream(string fileName) { var filePath = Path.Combine(_catalog, fileName); @@ -328,7 +251,5 @@ namespace ZeroLevel.Services.PartitionStorage public void Dispose() { } - - } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs new file mode 100644 index 0000000..9323aec --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -0,0 +1,160 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Services.PartitionStorage +{ + public class StorePartitionBuilder + : IStorePartitionBuilder + { + private readonly ConcurrentDictionary _writeStreams + = new ConcurrentDictionary(); + private readonly StoreOptions _options; + private readonly string _catalog; + private readonly TMeta _info; + + public string Catalog { get { return _catalog; } } + public StorePartitionBuilder(StoreOptions options, TMeta info) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + _info = info; + _options = options; + _catalog = _options.GetCatalogPath(info); + if (Directory.Exists(_catalog) == false) + { + Directory.CreateDirectory(_catalog); + } + } + + public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 0; + public string GetCatalogPath() => _catalog; + public void DropData() => FSUtils.CleanAndTestFolder(_catalog); + + public void Store(TKey key, TInput value) + { + var fileName = _options.GetFileName(key, _info); + var stream = GetWriteStream(fileName); + stream.SerializeCompatible(key); + stream.SerializeCompatible(value); + } + public void CompleteAddingAndCompress() + { + // Close all write streams + foreach (var s in _writeStreams) + { + try + { + s.Value.Dispose(); + } + catch { } + } + var files = Directory.GetFiles(_catalog); + if (files != null && files.Length > 1) + { + Parallel.ForEach(files, file => CompressFile(file)); + } + } + public void RebuildIndex() + { + if (_options.Index.Enabled) + { + var indexFolder = Path.Combine(_catalog, "__indexes__"); + FSUtils.CleanAndTestFolder(indexFolder); + var files = Directory.GetFiles(_catalog); + if (files != null && files.Length > 1) + { + var dict = new Dictionary(); + foreach (var file in files) + { + dict.Clear(); + using (var reader = GetReadStream(Path.GetFileName(file))) + { + while (reader.EOS == false) + { + var pos = reader.Stream.Position; + var k = reader.ReadCompatible(); + dict[k] = pos; + reader.ReadCompatible(); + } + } + if (dict.Count > _options.Index.FileIndexCount * 8) + { + var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero); + var index_file = Path.Combine(indexFolder, Path.GetFileName(file)); + var d_arr = dict.OrderBy(p => p.Key).ToArray(); + using (var writer = new MemoryStreamWriter( + new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) + { + for (int i = 0; i < _options.Index.FileIndexCount; i++) + { + var pair = d_arr[i * step]; + writer.WriteCompatible(pair.Key); + writer.WriteLong(pair.Value); + } + } + } + } + } + } + } + + #region Private methods + internal void CompressFile(string file) + { + var dict = new Dictionary>(); + using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024))) + { + while (reader.EOS == false) + { + TKey k = reader.ReadCompatible(); + TInput v = reader.ReadCompatible(); + if (false == dict.ContainsKey(k)) + { + dict[k] = new HashSet(); + } + dict[k].Add(v); + } + } + var tempPath = Path.GetTempPath(); + var tempFile = Path.Combine(tempPath, Path.GetTempFileName()); + using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) + { + // sort for search acceleration + foreach (var pair in dict.OrderBy(p => p.Key)) + { + var v = _options.MergeFunction(pair.Value); + writer.SerializeCompatible(pair.Key); + writer.SerializeCompatible(v); + } + } + File.Delete(file); + File.Move(tempFile, file, true); + } + private MemoryStreamWriter GetWriteStream(string fileName) + { + return _writeStreams.GetOrAdd(fileName, k => + { + var filePath = Path.Combine(_catalog, k); + var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); + return new MemoryStreamWriter(stream); + }); + } + private MemoryStreamReader GetReadStream(string fileName) + { + var filePath = Path.Combine(_catalog, fileName); + var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); + return new MemoryStreamReader(stream); + } + #endregion + public void Dispose() + { + } + + + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Search/PartitionSearchRequest.cs b/ZeroLevel/Services/PartitionStorage/Search/PartitionSearchRequest.cs new file mode 100644 index 0000000..74dafb4 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Search/PartitionSearchRequest.cs @@ -0,0 +1,10 @@ +using System.Collections.Generic; + +namespace ZeroLevel.Services.PartitionStorage +{ + public class PartitionSearchRequest + { + public TMeta Info { get; set; } + public IEnumerable Keys { get; set; } + } +} diff --git a/ZeroLevel/Services/PartitionStorage/StorePartitionKeyValueSearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs similarity index 100% rename from ZeroLevel/Services/PartitionStorage/StorePartitionKeyValueSearchResult.cs rename to ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs diff --git a/ZeroLevel/Services/PartitionStorage/StoreSearchRequest.cs b/ZeroLevel/Services/PartitionStorage/Search/StoreSearchRequest.cs similarity index 61% rename from ZeroLevel/Services/PartitionStorage/StoreSearchRequest.cs rename to ZeroLevel/Services/PartitionStorage/Search/StoreSearchRequest.cs index efa5935..8753085 100644 --- a/ZeroLevel/Services/PartitionStorage/StoreSearchRequest.cs +++ b/ZeroLevel/Services/PartitionStorage/Search/StoreSearchRequest.cs @@ -2,11 +2,6 @@ namespace ZeroLevel.Services.PartitionStorage { - public class PartitionSearchRequest - { - public TMeta Info { get; set; } - public IEnumerable Keys { get; set; } - } public class StoreSearchRequest { public IEnumerable> PartitionSearchRequests { get; set; } diff --git a/ZeroLevel/Services/PartitionStorage/StoreSearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/StoreSearchResult.cs similarity index 100% rename from ZeroLevel/Services/PartitionStorage/StoreSearchResult.cs rename to ZeroLevel/Services/PartitionStorage/Search/StoreSearchResult.cs diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs index 58aadbb..9bc6494 100644 --- a/ZeroLevel/Services/PartitionStorage/Store.cs +++ b/ZeroLevel/Services/PartitionStorage/Store.cs @@ -10,8 +10,8 @@ namespace ZeroLevel.Services.PartitionStorage public class Store : IStore { - private readonly IStoreOptions _options; - public Store(IStoreOptions options) + private readonly StoreOptions _options; + public Store(StoreOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); _options = options; @@ -26,8 +26,12 @@ namespace ZeroLevel.Services.PartitionStorage return new StorePartitionAccessor(_options, info); } - public IStorePartitionAccessor CreateMergeAccessor(TMeta info - , Func> decompressor) + public IStorePartitionBuilder CreateBuilder(TMeta info) + { + return new StorePartitionBuilder(_options, info); + } + + public IStorePartitionBuilder CreateMergeAccessor(TMeta info, Func> decompressor) { return new StoreMergePartitionAccessor(_options, info, decompressor); }