diff --git a/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs index 0c7a068..85b2646 100644 --- a/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/IStorePartitionAccessor.cs @@ -12,6 +12,7 @@ namespace ZeroLevel.Services.PartitionStorage public interface IStorePartitionAccessor : IDisposable { + string GetCatalogPath(); /// /// Save one record /// diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs index a2dabe7..7c50be5 100644 --- a/ZeroLevel/Services/PartitionStorage/Store.cs +++ b/ZeroLevel/Services/PartitionStorage/Store.cs @@ -32,13 +32,20 @@ namespace ZeroLevel.Services.PartitionStorage var results = new ConcurrentDictionary>>(); if (searchRequest.PartitionSearchRequests?.Any() ?? false) { - var partitionsSearchInfo = searchRequest.PartitionSearchRequests.ToDictionary(r => r.Info, r => r.Keys); - var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism }; + var partitionsSearchInfo = searchRequest + .PartitionSearchRequests + .ToDictionary(r => r.Info, r => r.Keys); + var options = new ParallelOptions + { + MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism + }; await Parallel.ForEachAsync(partitionsSearchInfo, options, async (pair, _) => { using (var accessor = CreateAccessor(pair.Key)) { - results[pair.Key] = accessor.Find(pair.Value).ToArray(); + results[pair.Key] = accessor + .Find(pair.Value) + .ToArray(); } }); } diff --git a/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs index d1f348e..5679dea 100644 --- a/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/StorePartitionAccessor.cs @@ -30,14 +30,29 @@ namespace ZeroLevel.Services.PartitionStorage Directory.CreateDirectory(_catalog); } } - + #region API + public string GetCatalogPath() + { + return _catalog; + } public StorePartitionKeyValueSearchResult Find(TKey key) { var fileName = _options.GetFileName(key, _info); if (File.Exists(Path.Combine(_catalog, fileName))) { + long startOffset = 0; + if (_options.Index.Enabled) + { + var index = new StorePartitionIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); + var offset = index.GetOffset(key); + startOffset = offset.Offset; + } using (var reader = GetReadStream(fileName)) { + if (startOffset > 0) + { + reader.Stream.Seek(startOffset, SeekOrigin.Begin); + } while (reader.EOS == false) { var k = reader.ReadCompatible(); @@ -63,91 +78,12 @@ namespace ZeroLevel.Services.PartitionStorage Value = default }; } - - private IEnumerable> Find(string fileName, - TKey[] keys) - { - if (File.Exists(Path.Combine(_catalog, fileName))) - { - if (_options.Index.Enabled) - { - var index = new StorePartitionIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); - var offsets = index.GetOffset(keys, true); - using (var reader = GetReadStream(fileName)) - { - for (int i = 0; i < keys.Length; i++) - { - var searchKey = keys[i]; - var off = offsets[i]; - - reader.Stream.Seek(off.Offset, SeekOrigin.Begin); - while (reader.EOS == false) - { - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); - var c = _options.KeyComparer(searchKey, k); - if (c == 0) - { - yield return new StorePartitionKeyValueSearchResult - { - Key = searchKey, - Value = v, - Found = true - }; - break; - } - else if (c == -1) - { - break; - } - } - } - } - } - else - { - using (var reader = GetReadStream(fileName)) - { - int index = 0; - var keys_arr = keys.OrderBy(k => k).ToArray(); - while (reader.EOS == false && index < keys_arr.Length) - { - var k = reader.ReadCompatible(); - var v = reader.ReadCompatible(); - var c = _options.KeyComparer(keys_arr[index], k); - if (c == 0) - { - yield return new StorePartitionKeyValueSearchResult - { - Key = keys_arr[index], - Value = v, - Found = true - }; - index++; - } - else if (c == -1) - { - do - { - index++; - if (index < keys_arr.Length) - { - c = _options.KeyComparer(keys_arr[index], k); - } - } while (index < keys_arr.Length && c == -1); - } - } - } - } - } - } - public IEnumerable> Find(IEnumerable keys) { var results = keys .GroupBy( - k => _options.GetFileName(k, _info), - k => k, (key, g) => new { FileName = key, Keys = g.ToArray() }); + k => _options.GetFileName(k, _info), + k => k, (key, g) => new { FileName = key, Keys = g.ToArray() }); foreach (var group in results) { foreach (var r in Find(group.FileName, group.Keys)) @@ -156,7 +92,6 @@ namespace ZeroLevel.Services.PartitionStorage } } } - public void CompleteStoreAndRebuild() { // Close all write streams @@ -174,39 +109,6 @@ namespace ZeroLevel.Services.PartitionStorage Parallel.ForEach(files, file => CompressFile(file)); } } - - private void CompressFile(string file) - { - var dict = new Dictionary>(); - using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024))) - { - while (reader.EOS == false) - { - TKey k = reader.ReadCompatible(); - TInput v = reader.ReadCompatible(); - if (false == dict.ContainsKey(k)) - { - dict[k] = new HashSet(); - } - dict[k].Add(v); - } - } - var tempPath = Path.GetTempPath(); - var tempFile = Path.Combine(tempPath, Path.GetTempFileName()); - using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) - { - // sort for search acceleration - foreach (var pair in dict.OrderBy(p => p.Key)) - { - var v = _options.MergeFunction(pair.Value); - writer.SerializeCompatible(pair.Key); - writer.SerializeCompatible(v); - } - } - File.Delete(file); - File.Move(tempFile, file, true); - } - public void Store(TKey key, TInput value) { var fileName = _options.GetFileName(key, _info); @@ -214,21 +116,6 @@ namespace ZeroLevel.Services.PartitionStorage stream.SerializeCompatible(key); stream.SerializeCompatible(value); } - private MemoryStreamWriter GetWriteStream(string fileName) - { - return _writeStreams.GetOrAdd(fileName, k => - { - var filePath = Path.Combine(_catalog, k); - var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); - return new MemoryStreamWriter(stream); - }); - } - private MemoryStreamReader GetReadStream(string fileName) - { - var filePath = Path.Combine(_catalog, fileName); - var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); - return new MemoryStreamReader(stream); - } public int CountDataFiles() { var files = Directory.GetFiles(_catalog); @@ -316,9 +203,136 @@ namespace ZeroLevel.Services.PartitionStorage } } } + #endregion + #region Private methods + private IEnumerable> Find(string fileName, + TKey[] keys) + { + if (File.Exists(Path.Combine(_catalog, fileName))) + { + if (_options.Index.Enabled) + { + var index = new StorePartitionIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); + var offsets = index.GetOffset(keys, true); + using (var reader = GetReadStream(fileName)) + { + for (int i = 0; i < keys.Length; i++) + { + var searchKey = keys[i]; + var off = offsets[i]; + reader.Stream.Seek(off.Offset, SeekOrigin.Begin); + while (reader.EOS == false) + { + var k = reader.ReadCompatible(); + var v = reader.ReadCompatible(); + var c = _options.KeyComparer(searchKey, k); + if (c == 0) + { + yield return new StorePartitionKeyValueSearchResult + { + Key = searchKey, + Value = v, + Found = true + }; + break; + } + else if (c == -1) + { + break; + } + } + } + } + } + else + { + using (var reader = GetReadStream(fileName)) + { + int index = 0; + var keys_arr = keys.OrderBy(k => k).ToArray(); + while (reader.EOS == false && index < keys_arr.Length) + { + var k = reader.ReadCompatible(); + var v = reader.ReadCompatible(); + var c = _options.KeyComparer(keys_arr[index], k); + if (c == 0) + { + yield return new StorePartitionKeyValueSearchResult + { + Key = keys_arr[index], + Value = v, + Found = true + }; + index++; + } + else if (c == -1) + { + do + { + index++; + if (index < keys_arr.Length) + { + c = _options.KeyComparer(keys_arr[index], k); + } + } while (index < keys_arr.Length && c == -1); + } + } + } + } + } + } + private void CompressFile(string file) + { + var dict = new Dictionary>(); + using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024))) + { + while (reader.EOS == false) + { + TKey k = reader.ReadCompatible(); + TInput v = reader.ReadCompatible(); + if (false == dict.ContainsKey(k)) + { + dict[k] = new HashSet(); + } + dict[k].Add(v); + } + } + var tempPath = Path.GetTempPath(); + var tempFile = Path.Combine(tempPath, Path.GetTempFileName()); + using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) + { + // sort for search acceleration + foreach (var pair in dict.OrderBy(p => p.Key)) + { + var v = _options.MergeFunction(pair.Value); + writer.SerializeCompatible(pair.Key); + writer.SerializeCompatible(v); + } + } + File.Delete(file); + File.Move(tempFile, file, true); + } + private MemoryStreamWriter GetWriteStream(string fileName) + { + return _writeStreams.GetOrAdd(fileName, k => + { + var filePath = Path.Combine(_catalog, k); + var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); + return new MemoryStreamWriter(stream); + }); + } + private MemoryStreamReader GetReadStream(string fileName) + { + var filePath = Path.Combine(_catalog, fileName); + var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); + return new MemoryStreamReader(stream); + } + #endregion public void Dispose() { } + + } } diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index 8ed9d5d..b3653c0 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -6,16 +6,16 @@ ogoun ogoun - 3.3.7.5 - PartitionStorage fix + 3.3.7.6 + PartitionStorage update indexes https://github.com/ogoun/Zero/wiki Copyright Ogoun 2022 https://github.com/ogoun/Zero git - 3.3.7.5 - 3.3.7.5 + 3.3.7.6 + 3.3.7.6 AnyCPU;x64;x86 zero.png full