Safe open streams

pull/4/head
Ogoun 2 years ago
parent b3b40ade33
commit 7157eaa9ff

@ -126,13 +126,12 @@ namespace ZeroLevel.Services.PartitionStorage
var k = _keyDeserializer.Invoke(reader); var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader); var v = _valueDeserializer.Invoke(reader);
var input = _decompress(v); var input = _decompress(v);
yield return yield return new StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>
new StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>> {
{ Key = k,
Key = k, Value = input,
Value = input, Status = SearchResult.Success
Found = true };
};
} }
} }
} }

@ -49,34 +49,46 @@ namespace ZeroLevel.Services.PartitionStorage
var offset = index.GetOffset(key); var offset = index.GetOffset(key);
startOffset = offset.Offset; startOffset = offset.Offset;
} }
using (var reader = GetReadStream(fileName)) if (TryGetReadStream(fileName, out var reader))
{ {
if (startOffset > 0) using (reader)
{ {
reader.Seek(startOffset, SeekOrigin.Begin); if (startOffset > 0)
}
while (reader.EOS == false)
{
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var c = _options.KeyComparer(key, k);
if (c == 0) return new StorePartitionKeyValueSearchResult<TKey, TValue>
{ {
Key = key, reader.Seek(startOffset, SeekOrigin.Begin);
Value = v, }
Found = true while (reader.EOS == false)
};
if (c == -1)
{ {
break; var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var c = _options.KeyComparer(key, k);
if (c == 0) return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = key,
Value = v,
Status = SearchResult.Success
};
if (c == -1)
{
break;
}
} }
} }
} }
else
{
return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = key,
Status = SearchResult.FileLocked,
Value = default
};
}
} }
return new StorePartitionKeyValueSearchResult<TKey, TValue> return new StorePartitionKeyValueSearchResult<TKey, TValue>
{ {
Key = key, Key = key,
Found = false, Status = SearchResult.NotFound,
Value = default Value = default
}; };
} }
@ -101,13 +113,16 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
foreach (var file in files) foreach (var file in files)
{ {
using (var reader = GetReadStream(Path.GetFileName(file))) if (TryGetReadStream(file, out var reader))
{ {
while (reader.EOS == false) using (reader)
{ {
var k = _keyDeserializer.Invoke(reader); while (reader.EOS == false)
var v = _valueDeserializer.Invoke(reader); {
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Found = true }; var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Status = SearchResult.Success };
}
} }
} }
} }
@ -118,13 +133,16 @@ namespace ZeroLevel.Services.PartitionStorage
var fileName = _options.GetFileName(key, _info); var fileName = _options.GetFileName(key, _info);
if (File.Exists(Path.Combine(_catalog, fileName))) if (File.Exists(Path.Combine(_catalog, fileName)))
{ {
using (var reader = GetReadStream(fileName)) if (TryGetReadStream(fileName, out var reader))
{ {
while (reader.EOS == false) using (reader)
{ {
var k = _keyDeserializer.Invoke(reader); while (reader.EOS == false)
var v = _valueDeserializer.Invoke(reader); {
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Found = true }; var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Status = SearchResult.Success };
}
} }
} }
} }
@ -197,14 +215,17 @@ namespace ZeroLevel.Services.PartitionStorage
Directory.CreateDirectory(_indexCatalog); Directory.CreateDirectory(_indexCatalog);
} }
var dict = new Dictionary<TKey, long>(); var dict = new Dictionary<TKey, long>();
using (var reader = GetReadStream(Path.GetFileName(file))) if (TryGetReadStream(file, out var reader))
{ {
while (reader.EOS == false) using (reader)
{ {
var pos = reader.Position; while (reader.EOS == false)
var k = _keyDeserializer.Invoke(reader); {
dict[k] = pos; var pos = reader.Position;
_valueDeserializer.Invoke(reader); var k = _keyDeserializer.Invoke(reader);
dict[k] = pos;
_valueDeserializer.Invoke(reader);
}
} }
} }
if (dict.Count > _options.Index.FileIndexCount * 8) if (dict.Count > _options.Index.FileIndexCount * 8)
@ -237,31 +258,34 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
var index = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer); var index = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
var offsets = index.GetOffset(keys, true); var offsets = index.GetOffset(keys, true);
using (var reader = GetReadStream(fileName)) if (TryGetReadStream(fileName, out var reader))
{ {
for (int i = 0; i < keys.Length; i++) using (reader)
{ {
var searchKey = keys[i]; for (int i = 0; i < keys.Length; i++)
var off = offsets[i];
reader.Seek(off.Offset, SeekOrigin.Begin);
while (reader.EOS == false)
{ {
var k = _keyDeserializer.Invoke(reader); var searchKey = keys[i];
var v = _valueDeserializer.Invoke(reader); var off = offsets[i];
var c = _options.KeyComparer(searchKey, k); reader.Seek(off.Offset, SeekOrigin.Begin);
if (c == 0) while (reader.EOS == false)
{ {
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{ {
Key = searchKey, yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
Value = v, {
Found = true Key = searchKey,
}; Value = v,
break; Status = SearchResult.Success
} };
else if (c == -1) break;
{ }
break; else if (c == -1)
{
break;
}
} }
} }
} }
@ -269,35 +293,38 @@ namespace ZeroLevel.Services.PartitionStorage
} }
else else
{ {
using (var reader = GetReadStream(fileName)) if (TryGetReadStream(fileName, out var reader))
{ {
int index = 0; using (reader)
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{ {
var k = _keyDeserializer.Invoke(reader); int index = 0;
var v = _valueDeserializer.Invoke(reader); var keys_arr = keys.OrderBy(k => k).ToArray();
var c = _options.KeyComparer(keys_arr[index], k); while (reader.EOS == false && index < keys_arr.Length)
if (c == 0)
{ {
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> var k = _keyDeserializer.Invoke(reader);
{ var v = _valueDeserializer.Invoke(reader);
Key = keys_arr[index], var c = _options.KeyComparer(keys_arr[index], k);
Value = v, if (c == 0)
Found = true
};
index++;
}
else if (c == -1)
{
do
{ {
yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = keys_arr[index],
Value = v,
Status = SearchResult.Success
};
index++; index++;
if (index < keys_arr.Length) }
else if (c == -1)
{
do
{ {
c = _options.KeyComparer(keys_arr[index], k); index++;
} if (index < keys_arr.Length)
} while (index < keys_arr.Length && c == -1); {
c = _options.KeyComparer(keys_arr[index], k);
}
} while (index < keys_arr.Length && c == -1);
}
} }
} }
} }
@ -316,27 +343,30 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
var index = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer); var index = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
var offsets = index.GetOffset(keys, true); var offsets = index.GetOffset(keys, true);
using (var reader = GetReadStream(fileName)) if (TryGetReadStream(fileName, out var reader))
{ {
for (int i = 0; i < keys.Length; i++) using (reader)
{ {
var searchKey = keys[i]; for (int i = 0; i < keys.Length; i++)
var off = offsets[i];
reader.Seek(off.Offset, SeekOrigin.Begin);
while (reader.EOS == false)
{ {
var startPosition = reader.Position; var searchKey = keys[i];
var k = _keyDeserializer.Invoke(reader); var off = offsets[i];
_valueDeserializer.Invoke(reader); reader.Seek(off.Offset, SeekOrigin.Begin);
var endPosition = reader.Position; while (reader.EOS == false)
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition });
}
else if (c == -1)
{ {
break; var startPosition = reader.Position;
var k = _keyDeserializer.Invoke(reader);
_valueDeserializer.Invoke(reader);
var endPosition = reader.Position;
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition });
}
else if (c == -1)
{
break;
}
} }
} }
} }
@ -344,32 +374,35 @@ namespace ZeroLevel.Services.PartitionStorage
} }
else else
{ {
using (var reader = GetReadStream(fileName)) if (TryGetReadStream(fileName, out var reader))
{ {
int index = 0; using (reader)
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{ {
var startPosition = reader.Position; int index = 0;
var k = _keyDeserializer.Invoke(reader); var keys_arr = keys.OrderBy(k => k).ToArray();
_valueDeserializer.Invoke(reader); while (reader.EOS == false && index < keys_arr.Length)
var endPosition = reader.Position;
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{
ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition });
index++;
}
else if (c == -1)
{ {
do var startPosition = reader.Position;
var k = _keyDeserializer.Invoke(reader);
_valueDeserializer.Invoke(reader);
var endPosition = reader.Position;
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{ {
ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition });
index++; index++;
if (index < keys_arr.Length) }
else if (c == -1)
{
do
{ {
c = _options.KeyComparer(keys_arr[index], k); index++;
} if (index < keys_arr.Length)
} while (index < keys_arr.Length && c == -1); {
c = _options.KeyComparer(keys_arr[index], k);
}
} while (index < keys_arr.Length && c == -1);
}
} }
} }
} }
@ -414,11 +447,21 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
private MemoryStreamReader GetReadStream(string fileName) private bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
{ {
var filePath = Path.Combine(_catalog, fileName); try
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); {
return new MemoryStreamReader(stream); var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
reader = new MemoryStreamReader(stream);
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]");
}
reader = null;
return false;
} }
#endregion #endregion

@ -52,10 +52,12 @@ namespace ZeroLevel.Services.PartitionStorage
public void Store(TKey key, TInput value) public void Store(TKey key, TInput value)
{ {
var fileName = _options.GetFileName(key, _info); var fileName = _options.GetFileName(key, _info);
var stream = GetWriteStream(fileName); if (TryGetWriteStream(fileName, out var stream))
_keySerializer.Invoke(stream, key); {
Thread.MemoryBarrier(); _keySerializer.Invoke(stream, key);
_inputSerializer.Invoke(stream, value); Thread.MemoryBarrier();
_inputSerializer.Invoke(stream, value);
}
} }
public void CompleteAdding() public void CompleteAdding()
{ {
@ -86,18 +88,16 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
foreach (var file in files) foreach (var file in files)
{ {
using (var reader = GetReadStream(Path.GetFileName(file))) if (TryGetReadStream(file, out var reader))
{ {
while (reader.EOS == false) using (reader)
{ {
var key = _keyDeserializer.Invoke(reader); while (reader.EOS == false)
if (reader.EOS)
{ {
yield return new StorePartitionKeyValueSearchResult<TKey, TInput> { Key = key, Value = default, Found = true }; var key = _keyDeserializer.Invoke(reader);
break; var val = _inputDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TInput> { Key = key, Value = val, Status = SearchResult.Success };
} }
var val = _inputDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TInput> { Key = key, Value = val, Found = true };
} }
} }
} }
@ -115,34 +115,37 @@ namespace ZeroLevel.Services.PartitionStorage
var dict = new Dictionary<TKey, long>(); var dict = new Dictionary<TKey, long>();
foreach (var file in files) foreach (var file in files)
{ {
dict.Clear(); if (TryGetReadStream(file, out var reader))
using (var reader = GetReadStream(Path.GetFileName(file)))
{ {
while (reader.EOS == false) using (reader)
{ {
var pos = reader.Stream.Position; while (reader.EOS == false)
var key = _keyDeserializer.Invoke(reader); {
dict[key] = pos; var pos = reader.Stream.Position;
if (reader.EOS) break; var key = _keyDeserializer.Invoke(reader);
_valueDeserializer.Invoke(reader); dict[key] = pos;
if (reader.EOS) break;
_valueDeserializer.Invoke(reader);
}
} }
} if (dict.Count > _options.Index.FileIndexCount * 8)
if (dict.Count > _options.Index.FileIndexCount * 8)
{
var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero);
var index_file = Path.Combine(indexFolder, Path.GetFileName(file));
var d_arr = dict.OrderBy(p => p.Key).ToArray();
using (var writer = new MemoryStreamWriter(
new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{ {
for (int i = 0; i < _options.Index.FileIndexCount; i++) var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero);
var index_file = Path.Combine(indexFolder, Path.GetFileName(file));
var d_arr = dict.OrderBy(p => p.Key).ToArray();
using (var writer = new MemoryStreamWriter(
new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{ {
var pair = d_arr[i * step]; for (int i = 0; i < _options.Index.FileIndexCount; i++)
writer.WriteCompatible(pair.Key); {
writer.WriteLong(pair.Value); var pair = d_arr[i * step];
writer.WriteCompatible(pair.Key);
writer.WriteLong(pair.Value);
}
} }
} }
} }
dict.Clear();
} }
} }
} }
@ -185,20 +188,40 @@ namespace ZeroLevel.Services.PartitionStorage
File.Delete(file); File.Delete(file);
File.Move(tempFile, file, true); File.Move(tempFile, file, true);
} }
private MemoryStreamWriter GetWriteStream(string fileName) private bool TryGetWriteStream(string fileName, out MemoryStreamWriter writer)
{ {
return _writeStreams.GetOrAdd(fileName, k => try
{
writer = _writeStreams.GetOrAdd(fileName, k =>
{
var filePath = Path.Combine(_catalog, k);
var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
return new MemoryStreamWriter(stream);
});
return true;
}
catch (Exception ex)
{ {
var filePath = Path.Combine(_catalog, k); Log.SystemError(ex, "[StorePartitionBuilder.TryGetWriteStream]");
var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); }
return new MemoryStreamWriter(stream); writer = null;
}); return false;
} }
private MemoryStreamReader GetReadStream(string fileName) private bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
{ {
var filePath = Path.Combine(_catalog, fileName); try
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024); {
return new MemoryStreamReader(stream); var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
reader = new MemoryStreamReader(stream);
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionBuilder.TryGetReadStream]");
}
reader = null;
return false;
} }
#endregion #endregion

@ -1,8 +1,15 @@
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
public enum SearchResult
{
Success,
NotFound,
FileLocked
}
public class StorePartitionKeyValueSearchResult<TKey, TValue> public class StorePartitionKeyValueSearchResult<TKey, TValue>
{ {
public bool Found { get; set; } public SearchResult Status { get; set; }
public TKey Key { get; set; } public TKey Key { get; set; }
public TValue Value { get; set; } public TValue Value { get; set; }
} }

@ -6,7 +6,7 @@
</Description> </Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>3.3.7.9</AssemblyVersion> <AssemblyVersion>3.3.8.1</AssemblyVersion>
<PackageReleaseNotes>PartitionStorage optimizations</PackageReleaseNotes> <PackageReleaseNotes>PartitionStorage optimizations</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2022</Copyright> <Copyright>Copyright Ogoun 2022</Copyright>
@ -14,8 +14,8 @@
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<Version>3.3.7.9</Version> <Version>3.3.8.1</Version>
<FileVersion>3.3.7.9</FileVersion> <FileVersion>3.3.8.1</FileVersion>
<Platforms>AnyCPU;x64;x86</Platforms> <Platforms>AnyCPU;x64;x86</Platforms>
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
<DebugType>full</DebugType> <DebugType>full</DebugType>

Loading…
Cancel
Save

Powered by TurnKey Linux.