From cebcb9feb21008cb7c0fca73c69bf8a7cd209162 Mon Sep 17 00:00:00 2001 From: Ogoun Date: Fri, 20 Jan 2023 18:34:38 +0300 Subject: [PATCH] Fix concurrent work. Partition storage --- .../PartitionStorage/Indexes/IndexBuilder.cs | 76 ++++++++++----- .../CompactKeyStorePartitionBuilder.cs | 60 +++++++----- .../Partition/StoreMergePartitionAccessor.cs | 16 +++- .../Partition/StorePartitionBuilder.cs | 75 ++++++++------- .../PhisicalFileAccessorCachee.cs | 95 ++++++++++++------- ZeroLevel/ZeroLevel.csproj | 8 +- 6 files changed, 209 insertions(+), 121 deletions(-) diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs index 3ecb613..b53ea3b 100644 --- a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs @@ -57,18 +57,27 @@ namespace ZeroLevel.Services.PartitionStorage RebuildFileIndexWithSteps(file); } } + /// /// Delete the index for the specified file /// internal void DropFileIndex(string file) { var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); - _phisicalFileAccessorCachee.DropIndexReader(index_file); - if (File.Exists(index_file)) + _phisicalFileAccessorCachee.LockFile(index_file); + try + { + if (File.Exists(index_file)) + { + File.Delete(index_file); + } + } + finally { - File.Delete(index_file); + _phisicalFileAccessorCachee.UnlockFile(index_file); } } + /// /// Rebuild index with specified number of steps for specified file /// @@ -93,17 +102,29 @@ namespace ZeroLevel.Services.PartitionStorage { var step = (int)Math.Round(dict.Count / (float)_stepValue, MidpointRounding.ToZero); var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); - DropFileIndex(index_file); - var d_arr = dict.OrderBy(p => p.Key).ToArray(); - using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) + + _phisicalFileAccessorCachee.LockFile(index_file); + if (File.Exists(index_file)) + { + File.Delete(index_file); + } + try { - for (int i = 0; i < _stepValue; i++) + var d_arr = dict.OrderBy(p => p.Key).ToArray(); + using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) { - var pair = d_arr[i * step]; - writer.WriteCompatible(pair.Key); - writer.WriteLong(pair.Value); + for (int i = 0; i < _stepValue; i++) + { + var pair = d_arr[i * step]; + writer.WriteCompatible(pair.Key); + writer.WriteLong(pair.Value); + } } } + finally + { + _phisicalFileAccessorCachee.UnlockFile(index_file); + } } } /// @@ -118,24 +139,35 @@ namespace ZeroLevel.Services.PartitionStorage using (var reader = new MemoryStreamReader(new FileStream(Path.Combine(_dataCatalog, file), FileMode.Open, FileAccess.Read, FileShare.None))) { var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); - DropFileIndex(index_file); - using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) + _phisicalFileAccessorCachee.LockFile(index_file); + if (File.Exists(index_file)) { - var counter = 1; - while (reader.EOS == false) + File.Delete(index_file); + } + try + { + using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) { - counter--; - var pos = reader.Position; - var k = _keyDeserializer.Invoke(reader); - _valueDeserializer.Invoke(reader); - if (counter == 0) + var counter = 1; + while (reader.EOS == false) { - writer.WriteCompatible(k); - writer.WriteLong(pos); - counter = _stepValue; + counter--; + var pos = reader.Position; + var k = _keyDeserializer.Invoke(reader); + _valueDeserializer.Invoke(reader); + if (counter == 0) + { + writer.WriteCompatible(k); + writer.WriteLong(pos); + counter = _stepValue; + } } } } + finally + { + _phisicalFileAccessorCachee.UnlockFile(index_file); + } } } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs index 7f5c2fc..cf34413 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs @@ -124,42 +124,50 @@ namespace ZeroLevel.Services.PartitionStorage.Partition TKey key; TInput input; var dict = new Dictionary>(); - using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024))) + PhisicalFileAccessorCachee.LockFile(file); + try { - while (reader.EOS == false) + using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024))) { - if (false == Serializer.KeyDeserializer.Invoke(reader, out key)) + while (reader.EOS == false) { - throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key."); - } - if (false == dict.ContainsKey(key)) - { - dict[key] = new HashSet(); - } - if (reader.EOS) - { - break; + if (false == Serializer.KeyDeserializer.Invoke(reader, out key)) + { + throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key."); + } + if (false == dict.ContainsKey(key)) + { + dict[key] = new HashSet(); + } + if (reader.EOS) + { + break; + } + if (false == Serializer.InputDeserializer.Invoke(reader, out input)) + { + throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault input value."); + } + dict[key].Add(input); } - if (false == Serializer.InputDeserializer.Invoke(reader, out input)) + } + var tempFile = FSUtils.GetAppLocalTemporaryFile(); + using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) + { + // sort for search acceleration + foreach (var pair in dict.OrderBy(p => p.Key)) { - throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault input value."); + var v = _options.MergeFunction(pair.Value); + writer.SerializeCompatible(pair.Key); + writer.SerializeCompatible(v); } - dict[key].Add(input); } + File.Delete(file); + File.Move(tempFile, file, true); } - var tempFile = FSUtils.GetAppLocalTemporaryFile(); - using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) + finally { - // sort for search acceleration - foreach (var pair in dict.OrderBy(p => p.Key)) - { - var v = _options.MergeFunction(pair.Value); - writer.SerializeCompatible(pair.Key); - writer.SerializeCompatible(v); - } + PhisicalFileAccessorCachee.UnlockFile(file); } - File.Delete(file); - File.Move(tempFile, file, true); } #endregion } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs index 7de90c7..42651ce 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs @@ -115,12 +115,20 @@ namespace ZeroLevel.Services.PartitionStorage // 2. Replace source var name = Path.GetFileName(file); var updateFilePath = Path.Combine(folder, name); - if (File.Exists(updateFilePath)) + + _phisicalFileAccessor.LockFile(updateFilePath); + try + { + if (File.Exists(updateFilePath)) + { + File.Delete(updateFilePath); + } + File.Move(file, updateFilePath, true); + } + finally { - _phisicalFileAccessor.DropDataReader(updateFilePath); - File.Delete(updateFilePath); + _phisicalFileAccessor.UnlockFile(updateFilePath); } - File.Move(file, updateFilePath, true); // 3. Rebuil index (_accessor as BasePartition).RebuildFileIndex(name); diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index ec11b9f..eb15b6e 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.Memory; using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.PartitionStorage.Partition; using ZeroLevel.Services.Serialization; @@ -139,49 +140,57 @@ namespace ZeroLevel.Services.PartitionStorage { TKey key; TInput input; - var dict = new Dictionary>(); - var accessor = PhisicalFileAccessorCachee.GetDataAccessor(file, 0); - if (accessor != null) + PhisicalFileAccessorCachee.LockFile(file); + try { - using (var reader = new MemoryStreamReader(accessor)) + var dict = new Dictionary>(); + var accessor = new StreamVewAccessor(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 1024 * 1024 * 32)); + if (accessor != null) { - while (reader.EOS == false) + using (var reader = new MemoryStreamReader(accessor)) { - if (Serializer.KeyDeserializer.Invoke(reader, out key) == false) + while (reader.EOS == false) { - throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key."); - } - if (false == dict.ContainsKey(key)) - { - dict[key] = new HashSet(); - } - if (reader.EOS) - { - break; - } - if (Serializer.InputDeserializer.Invoke(reader, out input) == false) - { - throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read input value."); + if (Serializer.KeyDeserializer.Invoke(reader, out key) == false) + { + throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key."); + } + if (false == dict.ContainsKey(key)) + { + dict[key] = new HashSet(); + } + if (reader.EOS) + { + break; + } + if (Serializer.InputDeserializer.Invoke(reader, out input) == false) + { + throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read input value."); + } + dict[key].Add(input); } - dict[key].Add(input); } } - } - var tempFile = FSUtils.GetAppLocalTemporaryFile(); - using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) - { - // sort for search acceleration - foreach (var pair in dict.OrderBy(p => p.Key)) + + var tempFile = FSUtils.GetAppLocalTemporaryFile(); + using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) { - var v = _options.MergeFunction(pair.Value); - writer.SerializeCompatible(pair.Key); - Thread.MemoryBarrier(); - writer.SerializeCompatible(v); + // sort for search acceleration + foreach (var pair in dict.OrderBy(p => p.Key)) + { + var v = _options.MergeFunction(pair.Value); + writer.SerializeCompatible(pair.Key); + Thread.MemoryBarrier(); + writer.SerializeCompatible(v); + } } + File.Delete(file); + File.Move(tempFile, file, true); + } + finally + { + PhisicalFileAccessorCachee.UnlockFile(file); } - PhisicalFileAccessorCachee.DropDataReader(file); - File.Delete(file); - File.Move(tempFile, file, true); } #endregion } diff --git a/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs b/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs index 493a55f..6b277ad 100644 --- a/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs +++ b/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.IO; using ZeroLevel.Services.Cache; using ZeroLevel.Services.FileSystem; @@ -12,6 +13,8 @@ namespace ZeroLevel.Services.PartitionStorage private readonly TimerCachee _indexReadersCachee; private readonly TimerCachee _dataReadersCachee; + private readonly HashSet _lockedFiles = new HashSet(); + public PhisicalFileAccessorCachee(TimeSpan dataExpirationPeriod, TimeSpan indexExpirationPeriod) { _dataReadersCachee = new TimerCachee(dataExpirationPeriod, s => new ParallelFileReader(s), i => i.Dispose(), 8192); @@ -32,32 +35,40 @@ namespace ZeroLevel.Services.PartitionStorage } public IViewAccessor GetDataAccessor(string filePath, long offset) { - var reader = GetDataReader(filePath); - try + if (false == _lockedFiles.Contains(filePath)) { + var reader = GetDataReader(filePath); + try + { + return reader.GetAccessor(offset); + } + catch (ObjectDisposedException) + { + _dataReadersCachee.Drop(filePath); + reader = _dataReadersCachee.Get(filePath); + } return reader.GetAccessor(offset); } - catch (ObjectDisposedException) - { - _dataReadersCachee.Drop(filePath); - reader = _dataReadersCachee.Get(filePath); - } - return reader.GetAccessor(offset); + return null; } public IViewAccessor GetDataAccessor(string filePath, long offset, int length) { - var reader = GetDataReader(filePath); - try + if (false == _lockedFiles.Contains(filePath)) { + var reader = GetDataReader(filePath); + try + { + return reader.GetAccessor(offset, length); + } + catch (ObjectDisposedException) + { + _dataReadersCachee.Drop(filePath); + reader = _dataReadersCachee.Get(filePath); + } return reader.GetAccessor(offset, length); } - catch (ObjectDisposedException) - { - _dataReadersCachee.Drop(filePath); - reader = _dataReadersCachee.Get(filePath); - } - return reader.GetAccessor(offset, length); + return null; } public void DropAllDataReaders() { @@ -79,32 +90,40 @@ namespace ZeroLevel.Services.PartitionStorage } public IViewAccessor GetIndexAccessor(string filePath, long offset) { - var reader = GetIndexReader(filePath); - try + if (false == _lockedFiles.Contains(filePath)) { + var reader = GetIndexReader(filePath); + try + { + return reader.GetAccessor(offset); + } + catch (ObjectDisposedException) + { + _indexReadersCachee.Drop(filePath); + reader = _indexReadersCachee.Get(filePath); + } return reader.GetAccessor(offset); } - catch (ObjectDisposedException) - { - _indexReadersCachee.Drop(filePath); - reader = _indexReadersCachee.Get(filePath); - } - return reader.GetAccessor(offset); + return null; } public IViewAccessor GetIndexAccessor(string filePath, long offset, int length) { - var reader = GetIndexReader(filePath); - try + if (false == _lockedFiles.Contains(filePath)) { + var reader = GetIndexReader(filePath); + try + { + return reader.GetAccessor(offset, length); + } + catch (ObjectDisposedException) + { + _indexReadersCachee.Drop(filePath); + reader = _indexReadersCachee.Get(filePath); + } return reader.GetAccessor(offset, length); } - catch (ObjectDisposedException) - { - _indexReadersCachee.Drop(filePath); - reader = _indexReadersCachee.Get(filePath); - } - return reader.GetAccessor(offset, length); + return null; } public void DropAllIndexReaders() { @@ -112,6 +131,18 @@ namespace ZeroLevel.Services.PartitionStorage } #endregion + public void LockFile(string filePath) + { + _lockedFiles.Add(filePath); + DropDataReader(filePath); + DropIndexReader(filePath); + } + + public void UnlockFile(string filePath) + { + _lockedFiles.Remove(filePath); + } + public void Dispose() { _dataReadersCachee.Dispose(); diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index 3a731ce..2717624 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -6,16 +6,16 @@ ogoun ogoun - 3.3.8.9 - Partition storage. Suppress exception when find invoke + 3.3.9.0 + Partition storage. Fix concurrent work https://github.com/ogoun/Zero/wiki Copyright Ogoun 2023 https://github.com/ogoun/Zero git - 3.3.8.9 - 3.3.8.9 + 3.3.9.0 + 3.3.9.0 AnyCPU;x64;x86 zero.png full