From 7157eaa9ff8aaff478c9ee40a82776a3e9e432b8 Mon Sep 17 00:00:00 2001 From: Ogoun Date: Sat, 19 Nov 2022 20:07:56 +0300 Subject: [PATCH] Safe open streams --- .../Partition/StoreMergePartitionAccessor.cs | 13 +- .../Partition/StorePartitionAccessor.cs | 277 ++++++++++-------- .../Partition/StorePartitionBuilder.cs | 107 ++++--- .../StorePartitionKeyValueSearchResult.cs | 9 +- ZeroLevel/ZeroLevel.csproj | 6 +- 5 files changed, 242 insertions(+), 170 deletions(-) diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs index d1a3a2b..ae80c44 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs @@ -126,13 +126,12 @@ namespace ZeroLevel.Services.PartitionStorage var k = _keyDeserializer.Invoke(reader); var v = _valueDeserializer.Invoke(reader); var input = _decompress(v); - yield return - new StorePartitionKeyValueSearchResult> - { - Key = k, - Value = input, - Found = true - }; + yield return new StorePartitionKeyValueSearchResult> + { + Key = k, + Value = input, + Status = SearchResult.Success + }; } } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs index 0b9add4..06b2d27 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs @@ -49,34 +49,46 @@ namespace ZeroLevel.Services.PartitionStorage var offset = index.GetOffset(key); startOffset = offset.Offset; } - using (var reader = GetReadStream(fileName)) + if (TryGetReadStream(fileName, out var reader)) { - if (startOffset > 0) + using (reader) { - reader.Seek(startOffset, SeekOrigin.Begin); - } - while (reader.EOS == false) - { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); - var c = _options.KeyComparer(key, k); - if (c == 0) return new StorePartitionKeyValueSearchResult + if (startOffset > 0) { - Key = key, - Value = v, - Found = true - }; - if (c == -1) + reader.Seek(startOffset, SeekOrigin.Begin); + } + while (reader.EOS == false) { - break; + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); + var c = _options.KeyComparer(key, k); + if (c == 0) return new StorePartitionKeyValueSearchResult + { + Key = key, + Value = v, + Status = SearchResult.Success + }; + if (c == -1) + { + break; + } } } } + else + { + return new StorePartitionKeyValueSearchResult + { + Key = key, + Status = SearchResult.FileLocked, + Value = default + }; + } } return new StorePartitionKeyValueSearchResult { Key = key, - Found = false, + Status = SearchResult.NotFound, Value = default }; } @@ -101,13 +113,16 @@ namespace ZeroLevel.Services.PartitionStorage { foreach (var file in files) { - using (var reader = GetReadStream(Path.GetFileName(file))) + if (TryGetReadStream(file, out var reader)) { - while (reader.EOS == false) + using (reader) { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); - yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Found = true }; + while (reader.EOS == false) + { + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); + yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Status = SearchResult.Success }; + } } } } @@ -118,13 +133,16 @@ namespace ZeroLevel.Services.PartitionStorage var fileName = _options.GetFileName(key, _info); if (File.Exists(Path.Combine(_catalog, fileName))) { - using (var reader = GetReadStream(fileName)) + if (TryGetReadStream(fileName, out var reader)) { - while (reader.EOS == false) + using (reader) { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); - yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Found = true }; + while (reader.EOS == false) + { + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); + yield return new StorePartitionKeyValueSearchResult { Key = k, Value = v, Status = SearchResult.Success }; + } } } } @@ -197,14 +215,17 @@ namespace ZeroLevel.Services.PartitionStorage Directory.CreateDirectory(_indexCatalog); } var dict = new Dictionary(); - using (var reader = GetReadStream(Path.GetFileName(file))) + if (TryGetReadStream(file, out var reader)) { - while (reader.EOS == false) + using (reader) { - var pos = reader.Position; - var k = _keyDeserializer.Invoke(reader); - dict[k] = pos; - _valueDeserializer.Invoke(reader); + while (reader.EOS == false) + { + var pos = reader.Position; + var k = _keyDeserializer.Invoke(reader); + dict[k] = pos; + _valueDeserializer.Invoke(reader); + } } } if (dict.Count > _options.Index.FileIndexCount * 8) @@ -237,31 +258,34 @@ namespace ZeroLevel.Services.PartitionStorage { var index = new StorePartitionSparseIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); var offsets = index.GetOffset(keys, true); - using (var reader = GetReadStream(fileName)) + if (TryGetReadStream(fileName, out var reader)) { - for (int i = 0; i < keys.Length; i++) + using (reader) { - var searchKey = keys[i]; - var off = offsets[i]; - reader.Seek(off.Offset, SeekOrigin.Begin); - while (reader.EOS == false) + for (int i = 0; i < keys.Length; i++) { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); - var c = _options.KeyComparer(searchKey, k); - if (c == 0) + var searchKey = keys[i]; + var off = offsets[i]; + reader.Seek(off.Offset, SeekOrigin.Begin); + while (reader.EOS == false) { - yield return new StorePartitionKeyValueSearchResult + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); + var c = _options.KeyComparer(searchKey, k); + if (c == 0) { - Key = searchKey, - Value = v, - Found = true - }; - break; - } - else if (c == -1) - { - break; + yield return new StorePartitionKeyValueSearchResult + { + Key = searchKey, + Value = v, + Status = SearchResult.Success + }; + break; + } + else if (c == -1) + { + break; + } } } } @@ -269,35 +293,38 @@ namespace ZeroLevel.Services.PartitionStorage } else { - using (var reader = GetReadStream(fileName)) + if (TryGetReadStream(fileName, out var reader)) { - int index = 0; - var keys_arr = keys.OrderBy(k => k).ToArray(); - while (reader.EOS == false && index < keys_arr.Length) + using (reader) { - var k = _keyDeserializer.Invoke(reader); - var v = _valueDeserializer.Invoke(reader); - var c = _options.KeyComparer(keys_arr[index], k); - if (c == 0) + int index = 0; + var keys_arr = keys.OrderBy(k => k).ToArray(); + while (reader.EOS == false && index < keys_arr.Length) { - yield return new StorePartitionKeyValueSearchResult - { - Key = keys_arr[index], - Value = v, - Found = true - }; - index++; - } - else if (c == -1) - { - do + var k = _keyDeserializer.Invoke(reader); + var v = _valueDeserializer.Invoke(reader); + var c = _options.KeyComparer(keys_arr[index], k); + if (c == 0) { + yield return new StorePartitionKeyValueSearchResult + { + Key = keys_arr[index], + Value = v, + Status = SearchResult.Success + }; index++; - if (index < keys_arr.Length) + } + else if (c == -1) + { + do { - c = _options.KeyComparer(keys_arr[index], k); - } - } while (index < keys_arr.Length && c == -1); + index++; + if (index < keys_arr.Length) + { + c = _options.KeyComparer(keys_arr[index], k); + } + } while (index < keys_arr.Length && c == -1); + } } } } @@ -316,27 +343,30 @@ namespace ZeroLevel.Services.PartitionStorage { var index = new StorePartitionSparseIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); var offsets = index.GetOffset(keys, true); - using (var reader = GetReadStream(fileName)) + if (TryGetReadStream(fileName, out var reader)) { - for (int i = 0; i < keys.Length; i++) + using (reader) { - var searchKey = keys[i]; - var off = offsets[i]; - reader.Seek(off.Offset, SeekOrigin.Begin); - while (reader.EOS == false) + for (int i = 0; i < keys.Length; i++) { - var startPosition = reader.Position; - var k = _keyDeserializer.Invoke(reader); - _valueDeserializer.Invoke(reader); - var endPosition = reader.Position; - var c = _options.KeyComparer(searchKey, k); - if (c == 0) - { - ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); - } - else if (c == -1) + var searchKey = keys[i]; + var off = offsets[i]; + reader.Seek(off.Offset, SeekOrigin.Begin); + while (reader.EOS == false) { - break; + var startPosition = reader.Position; + var k = _keyDeserializer.Invoke(reader); + _valueDeserializer.Invoke(reader); + var endPosition = reader.Position; + var c = _options.KeyComparer(searchKey, k); + if (c == 0) + { + ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); + } + else if (c == -1) + { + break; + } } } } @@ -344,32 +374,35 @@ namespace ZeroLevel.Services.PartitionStorage } else { - using (var reader = GetReadStream(fileName)) + if (TryGetReadStream(fileName, out var reader)) { - int index = 0; - var keys_arr = keys.OrderBy(k => k).ToArray(); - while (reader.EOS == false && index < keys_arr.Length) + using (reader) { - var startPosition = reader.Position; - var k = _keyDeserializer.Invoke(reader); - _valueDeserializer.Invoke(reader); - var endPosition = reader.Position; - var c = _options.KeyComparer(keys_arr[index], k); - if (c == 0) - { - ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); - index++; - } - else if (c == -1) + int index = 0; + var keys_arr = keys.OrderBy(k => k).ToArray(); + while (reader.EOS == false && index < keys_arr.Length) { - do + var startPosition = reader.Position; + var k = _keyDeserializer.Invoke(reader); + _valueDeserializer.Invoke(reader); + var endPosition = reader.Position; + var c = _options.KeyComparer(keys_arr[index], k); + if (c == 0) { + ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); index++; - if (index < keys_arr.Length) + } + else if (c == -1) + { + do { - c = _options.KeyComparer(keys_arr[index], k); - } - } while (index < keys_arr.Length && c == -1); + index++; + if (index < keys_arr.Length) + { + c = _options.KeyComparer(keys_arr[index], k); + } + } while (index < keys_arr.Length && c == -1); + } } } } @@ -414,11 +447,21 @@ namespace ZeroLevel.Services.PartitionStorage } } - private MemoryStreamReader GetReadStream(string fileName) + private bool TryGetReadStream(string fileName, out MemoryStreamReader reader) { - var filePath = Path.Combine(_catalog, fileName); - var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); - return new MemoryStreamReader(stream); + try + { + var filePath = Path.Combine(_catalog, fileName); + var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); + reader = new MemoryStreamReader(stream); + return true; + } + catch (Exception ex) + { + Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]"); + } + reader = null; + return false; } #endregion diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index 182a8ff..16c1876 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -52,10 +52,12 @@ namespace ZeroLevel.Services.PartitionStorage public void Store(TKey key, TInput value) { var fileName = _options.GetFileName(key, _info); - var stream = GetWriteStream(fileName); - _keySerializer.Invoke(stream, key); - Thread.MemoryBarrier(); - _inputSerializer.Invoke(stream, value); + if (TryGetWriteStream(fileName, out var stream)) + { + _keySerializer.Invoke(stream, key); + Thread.MemoryBarrier(); + _inputSerializer.Invoke(stream, value); + } } public void CompleteAdding() { @@ -86,18 +88,16 @@ namespace ZeroLevel.Services.PartitionStorage { foreach (var file in files) { - using (var reader = GetReadStream(Path.GetFileName(file))) + if (TryGetReadStream(file, out var reader)) { - while (reader.EOS == false) + using (reader) { - var key = _keyDeserializer.Invoke(reader); - if (reader.EOS) + while (reader.EOS == false) { - yield return new StorePartitionKeyValueSearchResult { Key = key, Value = default, Found = true }; - break; + var key = _keyDeserializer.Invoke(reader); + var val = _inputDeserializer.Invoke(reader); + yield return new StorePartitionKeyValueSearchResult { Key = key, Value = val, Status = SearchResult.Success }; } - var val = _inputDeserializer.Invoke(reader); - yield return new StorePartitionKeyValueSearchResult { Key = key, Value = val, Found = true }; } } } @@ -115,34 +115,37 @@ namespace ZeroLevel.Services.PartitionStorage var dict = new Dictionary(); foreach (var file in files) { - dict.Clear(); - using (var reader = GetReadStream(Path.GetFileName(file))) + if (TryGetReadStream(file, out var reader)) { - while (reader.EOS == false) + using (reader) { - var pos = reader.Stream.Position; - var key = _keyDeserializer.Invoke(reader); - dict[key] = pos; - if (reader.EOS) break; - _valueDeserializer.Invoke(reader); + while (reader.EOS == false) + { + var pos = reader.Stream.Position; + var key = _keyDeserializer.Invoke(reader); + dict[key] = pos; + if (reader.EOS) break; + _valueDeserializer.Invoke(reader); + } } - } - if (dict.Count > _options.Index.FileIndexCount * 8) - { - var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero); - var index_file = Path.Combine(indexFolder, Path.GetFileName(file)); - var d_arr = dict.OrderBy(p => p.Key).ToArray(); - using (var writer = new MemoryStreamWriter( - new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) + if (dict.Count > _options.Index.FileIndexCount * 8) { - for (int i = 0; i < _options.Index.FileIndexCount; i++) + var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero); + var index_file = Path.Combine(indexFolder, Path.GetFileName(file)); + var d_arr = dict.OrderBy(p => p.Key).ToArray(); + using (var writer = new MemoryStreamWriter( + new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) { - var pair = d_arr[i * step]; - writer.WriteCompatible(pair.Key); - writer.WriteLong(pair.Value); + for (int i = 0; i < _options.Index.FileIndexCount; i++) + { + var pair = d_arr[i * step]; + writer.WriteCompatible(pair.Key); + writer.WriteLong(pair.Value); + } } } } + dict.Clear(); } } } @@ -185,20 +188,40 @@ namespace ZeroLevel.Services.PartitionStorage File.Delete(file); File.Move(tempFile, file, true); } - private MemoryStreamWriter GetWriteStream(string fileName) + private bool TryGetWriteStream(string fileName, out MemoryStreamWriter writer) { - return _writeStreams.GetOrAdd(fileName, k => + try + { + writer = _writeStreams.GetOrAdd(fileName, k => + { + var filePath = Path.Combine(_catalog, k); + var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); + return new MemoryStreamWriter(stream); + }); + return true; + } + catch (Exception ex) { - var filePath = Path.Combine(_catalog, k); - var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); - return new MemoryStreamWriter(stream); - }); + Log.SystemError(ex, "[StorePartitionBuilder.TryGetWriteStream]"); + } + writer = null; + return false; } - private MemoryStreamReader GetReadStream(string fileName) + private bool TryGetReadStream(string fileName, out MemoryStreamReader reader) { - var filePath = Path.Combine(_catalog, fileName); - var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); - return new MemoryStreamReader(stream); + try + { + var filePath = Path.Combine(_catalog, fileName); + var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); + reader = new MemoryStreamReader(stream); + return true; + } + catch (Exception ex) + { + Log.SystemError(ex, "[StorePartitionBuilder.TryGetReadStream]"); + } + reader = null; + return false; } #endregion diff --git a/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs b/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs index a65544d..5af1dc0 100644 --- a/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs +++ b/ZeroLevel/Services/PartitionStorage/Search/StorePartitionKeyValueSearchResult.cs @@ -1,8 +1,15 @@ namespace ZeroLevel.Services.PartitionStorage { + public enum SearchResult + { + Success, + NotFound, + FileLocked + } + public class StorePartitionKeyValueSearchResult { - public bool Found { get; set; } + public SearchResult Status { get; set; } public TKey Key { get; set; } public TValue Value { get; set; } } diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index b673b0a..6e97b7d 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -6,7 +6,7 @@ ogoun ogoun - 3.3.7.9 + 3.3.8.1 PartitionStorage optimizations https://github.com/ogoun/Zero/wiki Copyright Ogoun 2022 @@ -14,8 +14,8 @@ https://github.com/ogoun/Zero git - 3.3.7.9 - 3.3.7.9 + 3.3.8.1 + 3.3.8.1 AnyCPU;x64;x86 zero.png full