Fix concurrent work. Partition storage

pull/4/head
Ogoun 2 years ago
parent f4e014b0e5
commit cebcb9feb2
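
The commit applies one guard pattern everywhere partition storage rewrites a data or index file: call LockFile on the accessor cache (which records the path in a _lockedFiles set and drops any cached readers), perform the delete/rewrite inside a try block, and release the path with UnlockFile in a finally block; while a path is locked, GetDataAccessor and GetIndexAccessor return null instead of handing out a reader. Below is a minimal, self-contained sketch of that pattern. FileLockRegistry, TryOpenRead and RewriteUnderLock are hypothetical names invented for the sketch, and the lock around the HashSet is the sketch's own addition, not something the commit itself does.

using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical, simplified version of the lock-guard pattern introduced in this commit.
public sealed class FileLockRegistry
{
    private readonly HashSet<string> _lockedFiles = new HashSet<string>();
    private readonly object _sync = new object();

    public void Lock(string path) { lock (_sync) _lockedFiles.Add(path); }
    public void Unlock(string path) { lock (_sync) _lockedFiles.Remove(path); }
    public bool IsLocked(string path) { lock (_sync) return _lockedFiles.Contains(path); }

    // Readers consult the registry and back off (null) while a rewrite is in progress.
    public FileStream TryOpenRead(string path)
        => IsLocked(path) ? null : new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read);
}

public static class Example
{
    // Rewrite a file while readers are held off, always releasing the lock in finally.
    public static void RewriteUnderLock(FileLockRegistry registry, string path, byte[] newContent)
    {
        registry.Lock(path);
        try
        {
            if (File.Exists(path)) File.Delete(path);
            File.WriteAllBytes(path, newContent);
        }
        finally
        {
            registry.Unlock(path);
        }
    }

    public static void Main()
    {
        var registry = new FileLockRegistry();
        var path = Path.Combine(Path.GetTempPath(), "partition.data");
        RewriteUnderLock(registry, path, new byte[] { 1, 2, 3 });
        using (var stream = registry.TryOpenRead(path))
        {
            Console.WriteLine(stream == null ? "file is locked" : $"read {stream.Length} bytes");
        }
    }
}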

@@ -57,18 +57,27 @@ namespace ZeroLevel.Services.PartitionStorage
                 RebuildFileIndexWithSteps(file);
             }
         }
         /// <summary>
         /// Delete the index for the specified file
         /// </summary>
         internal void DropFileIndex(string file)
         {
             var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
-            _phisicalFileAccessorCachee.DropIndexReader(index_file);
-            if (File.Exists(index_file))
-            {
-                File.Delete(index_file);
-            }
+            _phisicalFileAccessorCachee.LockFile(index_file);
+            try
+            {
+                if (File.Exists(index_file))
+                {
+                    File.Delete(index_file);
+                }
+            }
+            finally
+            {
+                _phisicalFileAccessorCachee.UnlockFile(index_file);
+            }
         }
         /// <summary>
         /// Rebuild index with specified number of steps for specified file
         /// </summary>
@@ -93,17 +102,29 @@ namespace ZeroLevel.Services.PartitionStorage
             {
                 var step = (int)Math.Round(dict.Count / (float)_stepValue, MidpointRounding.ToZero);
                 var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
-                DropFileIndex(index_file);
-                var d_arr = dict.OrderBy(p => p.Key).ToArray();
-                using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
-                {
-                    for (int i = 0; i < _stepValue; i++)
-                    {
-                        var pair = d_arr[i * step];
-                        writer.WriteCompatible(pair.Key);
-                        writer.WriteLong(pair.Value);
-                    }
-                }
+                _phisicalFileAccessorCachee.LockFile(index_file);
+                if (File.Exists(index_file))
+                {
+                    File.Delete(index_file);
+                }
+                try
+                {
+                    var d_arr = dict.OrderBy(p => p.Key).ToArray();
+                    using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
+                    {
+                        for (int i = 0; i < _stepValue; i++)
+                        {
+                            var pair = d_arr[i * step];
+                            writer.WriteCompatible(pair.Key);
+                            writer.WriteLong(pair.Value);
+                        }
+                    }
+                }
+                finally
+                {
+                    _phisicalFileAccessorCachee.UnlockFile(index_file);
+                }
             }
         }
         /// <summary>
@@ -118,24 +139,35 @@ namespace ZeroLevel.Services.PartitionStorage
             using (var reader = new MemoryStreamReader(new FileStream(Path.Combine(_dataCatalog, file), FileMode.Open, FileAccess.Read, FileShare.None)))
             {
                 var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
-                DropFileIndex(index_file);
-                using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
-                {
-                    var counter = 1;
-                    while (reader.EOS == false)
-                    {
-                        counter--;
-                        var pos = reader.Position;
-                        var k = _keyDeserializer.Invoke(reader);
-                        _valueDeserializer.Invoke(reader);
-                        if (counter == 0)
-                        {
-                            writer.WriteCompatible(k);
-                            writer.WriteLong(pos);
-                            counter = _stepValue;
-                        }
-                    }
-                }
+                _phisicalFileAccessorCachee.LockFile(index_file);
+                if (File.Exists(index_file))
+                {
+                    File.Delete(index_file);
+                }
+                try
+                {
+                    using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
+                    {
+                        var counter = 1;
+                        while (reader.EOS == false)
+                        {
+                            counter--;
+                            var pos = reader.Position;
+                            var k = _keyDeserializer.Invoke(reader);
+                            _valueDeserializer.Invoke(reader);
+                            if (counter == 0)
+                            {
+                                writer.WriteCompatible(k);
+                                writer.WriteLong(pos);
+                                counter = _stepValue;
+                            }
+                        }
+                    }
+                }
+                finally
+                {
+                    _phisicalFileAccessorCachee.UnlockFile(index_file);
+                }
             }
         }
     }

@@ -124,42 +124,50 @@ namespace ZeroLevel.Services.PartitionStorage.Partition
             TKey key;
             TInput input;
             var dict = new Dictionary<TKey, HashSet<TInput>>();
-            using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
-            {
-                while (reader.EOS == false)
-                {
-                    if (false == Serializer.KeyDeserializer.Invoke(reader, out key))
-                    {
-                        throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key.");
-                    }
-                    if (false == dict.ContainsKey(key))
-                    {
-                        dict[key] = new HashSet<TInput>();
-                    }
-                    if (reader.EOS)
-                    {
-                        break;
-                    }
-                    if (false == Serializer.InputDeserializer.Invoke(reader, out input))
-                    {
-                        throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault input value.");
-                    }
-                    dict[key].Add(input);
-                }
-            }
-            var tempFile = FSUtils.GetAppLocalTemporaryFile();
-            using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
-            {
-                // sort for search acceleration
-                foreach (var pair in dict.OrderBy(p => p.Key))
-                {
-                    var v = _options.MergeFunction(pair.Value);
-                    writer.SerializeCompatible(pair.Key);
-                    writer.SerializeCompatible(v);
-                }
-            }
-            File.Delete(file);
-            File.Move(tempFile, file, true);
+            PhisicalFileAccessorCachee.LockFile(file);
+            try
+            {
+                using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
+                {
+                    while (reader.EOS == false)
+                    {
+                        if (false == Serializer.KeyDeserializer.Invoke(reader, out key))
+                        {
+                            throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key.");
+                        }
+                        if (false == dict.ContainsKey(key))
+                        {
+                            dict[key] = new HashSet<TInput>();
+                        }
+                        if (reader.EOS)
+                        {
+                            break;
+                        }
+                        if (false == Serializer.InputDeserializer.Invoke(reader, out input))
+                        {
+                            throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault input value.");
+                        }
+                        dict[key].Add(input);
+                    }
+                }
+                var tempFile = FSUtils.GetAppLocalTemporaryFile();
+                using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
+                {
+                    // sort for search acceleration
+                    foreach (var pair in dict.OrderBy(p => p.Key))
+                    {
+                        var v = _options.MergeFunction(pair.Value);
+                        writer.SerializeCompatible(pair.Key);
+                        writer.SerializeCompatible(v);
+                    }
+                }
+                File.Delete(file);
+                File.Move(tempFile, file, true);
+            }
+            finally
+            {
+                PhisicalFileAccessorCachee.UnlockFile(file);
+            }
         }
         #endregion
     }

@@ -115,12 +115,20 @@ namespace ZeroLevel.Services.PartitionStorage
             // 2. Replace source
             var name = Path.GetFileName(file);
             var updateFilePath = Path.Combine(folder, name);
-            if (File.Exists(updateFilePath))
-            {
-                _phisicalFileAccessor.DropDataReader(updateFilePath);
-                File.Delete(updateFilePath);
-            }
-            File.Move(file, updateFilePath, true);
+            _phisicalFileAccessor.LockFile(updateFilePath);
+            try
+            {
+                if (File.Exists(updateFilePath))
+                {
+                    File.Delete(updateFilePath);
+                }
+                File.Move(file, updateFilePath, true);
+            }
+            finally
+            {
+                _phisicalFileAccessor.UnlockFile(updateFilePath);
+            }
             // 3. Rebuil index
             (_accessor as BasePartition<TKey, TInput, TValue, TMeta>).RebuildFileIndex(name);

@@ -5,6 +5,7 @@ using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 using ZeroLevel.Services.FileSystem;
+using ZeroLevel.Services.Memory;
 using ZeroLevel.Services.PartitionStorage.Interfaces;
 using ZeroLevel.Services.PartitionStorage.Partition;
 using ZeroLevel.Services.Serialization;
@@ -139,49 +140,57 @@ namespace ZeroLevel.Services.PartitionStorage
         {
             TKey key;
             TInput input;
-            var dict = new Dictionary<TKey, HashSet<TInput>>();
-            var accessor = PhisicalFileAccessorCachee.GetDataAccessor(file, 0);
-            if (accessor != null)
-            {
-                using (var reader = new MemoryStreamReader(accessor))
-                {
-                    while (reader.EOS == false)
-                    {
-                        if (Serializer.KeyDeserializer.Invoke(reader, out key) == false)
-                        {
-                            throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key.");
-                        }
-                        if (false == dict.ContainsKey(key))
-                        {
-                            dict[key] = new HashSet<TInput>();
-                        }
-                        if (reader.EOS)
-                        {
-                            break;
-                        }
-                        if (Serializer.InputDeserializer.Invoke(reader, out input) == false)
-                        {
-                            throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read input value.");
-                        }
-                        dict[key].Add(input);
-                    }
-                }
-            }
-            var tempFile = FSUtils.GetAppLocalTemporaryFile();
-            using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
-            {
-                // sort for search acceleration
-                foreach (var pair in dict.OrderBy(p => p.Key))
-                {
-                    var v = _options.MergeFunction(pair.Value);
-                    writer.SerializeCompatible(pair.Key);
-                    Thread.MemoryBarrier();
-                    writer.SerializeCompatible(v);
-                }
-            }
-            PhisicalFileAccessorCachee.DropDataReader(file);
-            File.Delete(file);
-            File.Move(tempFile, file, true);
+            PhisicalFileAccessorCachee.LockFile(file);
+            try
+            {
+                var dict = new Dictionary<TKey, HashSet<TInput>>();
+                var accessor = new StreamVewAccessor(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 1024 * 1024 * 32));
+                if (accessor != null)
+                {
+                    using (var reader = new MemoryStreamReader(accessor))
+                    {
+                        while (reader.EOS == false)
+                        {
+                            if (Serializer.KeyDeserializer.Invoke(reader, out key) == false)
+                            {
+                                throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key.");
+                            }
+                            if (false == dict.ContainsKey(key))
+                            {
+                                dict[key] = new HashSet<TInput>();
+                            }
+                            if (reader.EOS)
+                            {
+                                break;
+                            }
+                            if (Serializer.InputDeserializer.Invoke(reader, out input) == false)
+                            {
+                                throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read input value.");
+                            }
+                            dict[key].Add(input);
+                        }
+                    }
+                }
+                var tempFile = FSUtils.GetAppLocalTemporaryFile();
+                using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
+                {
+                    // sort for search acceleration
+                    foreach (var pair in dict.OrderBy(p => p.Key))
+                    {
+                        var v = _options.MergeFunction(pair.Value);
+                        writer.SerializeCompatible(pair.Key);
+                        Thread.MemoryBarrier();
+                        writer.SerializeCompatible(v);
+                    }
+                }
+                File.Delete(file);
+                File.Move(tempFile, file, true);
+            }
+            finally
+            {
+                PhisicalFileAccessorCachee.UnlockFile(file);
+            }
         }
         #endregion
     }
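Both CompressFile variants above follow the same flow under the lock: read every key/input record from the source file, group the inputs per key in a Dictionary<TKey, HashSet<TInput>>, collapse each group with _options.MergeFunction, write the merged pairs to a temporary file sorted by key, then swap the temporary file over the original before unlocking. The sketch below isolates just the group-merge-sort step; MergeByKey and the tuple record shape are simplified stand-ins invented for the example, not types from ZeroLevel.

using System;
using System.Collections.Generic;
using System.Linq;

public static class CompressSketch
{
    // Groups raw (key, input) records, merges each group, and returns pairs sorted by key,
    // mirroring the shape of CompressFile in the diff above (hypothetical simplified types).
    public static List<KeyValuePair<TKey, TValue>> MergeByKey<TKey, TInput, TValue>(
        IEnumerable<(TKey Key, TInput Input)> records,
        Func<IEnumerable<TInput>, TValue> mergeFunction)
        where TKey : IComparable<TKey>
    {
        var dict = new Dictionary<TKey, HashSet<TInput>>();
        foreach (var (key, input) in records)
        {
            if (!dict.TryGetValue(key, out var set))
            {
                set = new HashSet<TInput>();
                dict[key] = set;
            }
            set.Add(input);
        }
        // sort for search acceleration, as in the original
        return dict.OrderBy(p => p.Key)
                   .Select(p => new KeyValuePair<TKey, TValue>(p.Key, mergeFunction(p.Value)))
                   .ToList();
    }

    public static void Main()
    {
        var records = new[] { ("b", 2), ("a", 1), ("a", 3) };
        var merged = MergeByKey(records, inputs => inputs.Sum());
        foreach (var pair in merged)
            Console.WriteLine($"{pair.Key} -> {pair.Value}"); // a -> 4, b -> 2
    }
}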

@@ -1,4 +1,5 @@
 using System;
+using System.Collections.Generic;
 using System.IO;
 using ZeroLevel.Services.Cache;
 using ZeroLevel.Services.FileSystem;
@@ -12,6 +13,8 @@ namespace ZeroLevel.Services.PartitionStorage
         private readonly TimerCachee<ParallelFileReader> _indexReadersCachee;
         private readonly TimerCachee<ParallelFileReader> _dataReadersCachee;
+        private readonly HashSet<string> _lockedFiles = new HashSet<string>();
+
         public PhisicalFileAccessorCachee(TimeSpan dataExpirationPeriod, TimeSpan indexExpirationPeriod)
         {
             _dataReadersCachee = new TimerCachee<ParallelFileReader>(dataExpirationPeriod, s => new ParallelFileReader(s), i => i.Dispose(), 8192);
@@ -32,32 +35,40 @@ namespace ZeroLevel.Services.PartitionStorage
         }
         public IViewAccessor GetDataAccessor(string filePath, long offset)
         {
-            var reader = GetDataReader(filePath);
-            try
-            {
-                return reader.GetAccessor(offset);
-            }
-            catch (ObjectDisposedException)
-            {
-                _dataReadersCachee.Drop(filePath);
-                reader = _dataReadersCachee.Get(filePath);
-            }
-            return reader.GetAccessor(offset);
+            if (false == _lockedFiles.Contains(filePath))
+            {
+                var reader = GetDataReader(filePath);
+                try
+                {
+                    return reader.GetAccessor(offset);
+                }
+                catch (ObjectDisposedException)
+                {
+                    _dataReadersCachee.Drop(filePath);
+                    reader = _dataReadersCachee.Get(filePath);
+                }
+                return reader.GetAccessor(offset);
+            }
+            return null;
         }
         public IViewAccessor GetDataAccessor(string filePath, long offset, int length)
         {
-            var reader = GetDataReader(filePath);
-            try
-            {
-                return reader.GetAccessor(offset, length);
-            }
-            catch (ObjectDisposedException)
-            {
-                _dataReadersCachee.Drop(filePath);
-                reader = _dataReadersCachee.Get(filePath);
-            }
-            return reader.GetAccessor(offset, length);
+            if (false == _lockedFiles.Contains(filePath))
+            {
+                var reader = GetDataReader(filePath);
+                try
+                {
+                    return reader.GetAccessor(offset, length);
+                }
+                catch (ObjectDisposedException)
+                {
+                    _dataReadersCachee.Drop(filePath);
+                    reader = _dataReadersCachee.Get(filePath);
+                }
+                return reader.GetAccessor(offset, length);
+            }
+            return null;
         }
         public void DropAllDataReaders()
         {
@@ -79,32 +90,40 @@ namespace ZeroLevel.Services.PartitionStorage
         }
         public IViewAccessor GetIndexAccessor(string filePath, long offset)
         {
-            var reader = GetIndexReader(filePath);
-            try
-            {
-                return reader.GetAccessor(offset);
-            }
-            catch (ObjectDisposedException)
-            {
-                _indexReadersCachee.Drop(filePath);
-                reader = _indexReadersCachee.Get(filePath);
-            }
-            return reader.GetAccessor(offset);
+            if (false == _lockedFiles.Contains(filePath))
+            {
+                var reader = GetIndexReader(filePath);
+                try
+                {
+                    return reader.GetAccessor(offset);
+                }
+                catch (ObjectDisposedException)
+                {
+                    _indexReadersCachee.Drop(filePath);
+                    reader = _indexReadersCachee.Get(filePath);
+                }
+                return reader.GetAccessor(offset);
+            }
+            return null;
         }
         public IViewAccessor GetIndexAccessor(string filePath, long offset, int length)
         {
-            var reader = GetIndexReader(filePath);
-            try
-            {
-                return reader.GetAccessor(offset, length);
-            }
-            catch (ObjectDisposedException)
-            {
-                _indexReadersCachee.Drop(filePath);
-                reader = _indexReadersCachee.Get(filePath);
-            }
-            return reader.GetAccessor(offset, length);
+            if (false == _lockedFiles.Contains(filePath))
+            {
+                var reader = GetIndexReader(filePath);
+                try
+                {
+                    return reader.GetAccessor(offset, length);
+                }
+                catch (ObjectDisposedException)
+                {
+                    _indexReadersCachee.Drop(filePath);
+                    reader = _indexReadersCachee.Get(filePath);
+                }
+                return reader.GetAccessor(offset, length);
+            }
+            return null;
         }
         public void DropAllIndexReaders()
         {
@@ -112,6 +131,18 @@ namespace ZeroLevel.Services.PartitionStorage
         }
         #endregion

+        public void LockFile(string filePath)
+        {
+            _lockedFiles.Add(filePath);
+            DropDataReader(filePath);
+            DropIndexReader(filePath);
+        }
+
+        public void UnlockFile(string filePath)
+        {
+            _lockedFiles.Remove(filePath);
+        }
+
         public void Dispose()
         {
             _dataReadersCachee.Dispose();
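After this change a caller that asks the cache for an accessor on a locked path gets null back rather than an exception, so reading code is expected to treat null as "partition is being rewritten, try later". The sketch below shows one hedged way a caller could handle that; the IViewAccessor and IDataAccessorSource declarations are local stubs standing in for the ZeroLevel types, and GetDataAccessorOrWait with its retry loop is an illustrative invention, only the null-on-locked contract comes from the diff above.

using System;
using System.Threading;

// Minimal stubs standing in for the ZeroLevel types referenced in the diff.
public interface IViewAccessor : IDisposable { }

public interface IDataAccessorSource
{
    // Mirrors the shape of PhisicalFileAccessorCachee.GetDataAccessor: null means the file is locked.
    IViewAccessor GetDataAccessor(string filePath, long offset);
}

public static class AccessorRetry
{
    // Illustrative retry loop: back off briefly while the file is locked for a rewrite.
    public static IViewAccessor GetDataAccessorOrWait(
        IDataAccessorSource cache, string filePath, long offset,
        int maxAttempts = 10, int delayMs = 20)
    {
        for (var attempt = 0; attempt < maxAttempts; attempt++)
        {
            var accessor = cache.GetDataAccessor(filePath, offset);
            if (accessor != null) return accessor;   // file is not locked, reader obtained
            Thread.Sleep(delayMs);                   // file is being rewritten, try again shortly
        }
        return null;                                 // still locked, let the caller decide
    }
}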

@@ -6,16 +6,16 @@
     </Description>
     <Authors>ogoun</Authors>
     <Company>ogoun</Company>
-    <AssemblyVersion>3.3.8.9</AssemblyVersion>
-    <PackageReleaseNotes>Partition storage. Suppress exception when find invoke</PackageReleaseNotes>
+    <AssemblyVersion>3.3.9.0</AssemblyVersion>
+    <PackageReleaseNotes>Partition storage. Fix concurrent work</PackageReleaseNotes>
     <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
     <Copyright>Copyright Ogoun 2023</Copyright>
     <PackageLicenseUrl></PackageLicenseUrl>
     <PackageIconUrl></PackageIconUrl>
     <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
     <RepositoryType>git</RepositoryType>
-    <Version>3.3.8.9</Version>
-    <FileVersion>3.3.8.9</FileVersion>
+    <Version>3.3.9.0</Version>
+    <FileVersion>3.3.9.0</FileVersion>
     <Platforms>AnyCPU;x64;x86</Platforms>
     <PackageIcon>zero.png</PackageIcon>
     <DebugType>full</DebugType>
