diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs
index 3ecb613..b53ea3b 100644
--- a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs
+++ b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs
@@ -57,18 +57,27 @@ namespace ZeroLevel.Services.PartitionStorage
RebuildFileIndexWithSteps(file);
}
}
+
///
/// Delete the index for the specified file
///
internal void DropFileIndex(string file)
{
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
- _phisicalFileAccessorCachee.DropIndexReader(index_file);
- if (File.Exists(index_file))
+ _phisicalFileAccessorCachee.LockFile(index_file);
+ try
+ {
+ if (File.Exists(index_file))
+ {
+ File.Delete(index_file);
+ }
+ }
+ finally
{
- File.Delete(index_file);
+ _phisicalFileAccessorCachee.UnlockFile(index_file);
}
}
+
///
/// Rebuild index with specified number of steps for specified file
///
@@ -93,17 +102,29 @@ namespace ZeroLevel.Services.PartitionStorage
{
var step = (int)Math.Round(dict.Count / (float)_stepValue, MidpointRounding.ToZero);
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
- DropFileIndex(index_file);
- var d_arr = dict.OrderBy(p => p.Key).ToArray();
- using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
+
+ _phisicalFileAccessorCachee.LockFile(index_file);
+ if (File.Exists(index_file))
+ {
+ File.Delete(index_file);
+ }
+ try
{
- for (int i = 0; i < _stepValue; i++)
+ var d_arr = dict.OrderBy(p => p.Key).ToArray();
+ using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{
- var pair = d_arr[i * step];
- writer.WriteCompatible(pair.Key);
- writer.WriteLong(pair.Value);
+ for (int i = 0; i < _stepValue; i++)
+ {
+ var pair = d_arr[i * step];
+ writer.WriteCompatible(pair.Key);
+ writer.WriteLong(pair.Value);
+ }
}
}
+ finally
+ {
+ _phisicalFileAccessorCachee.UnlockFile(index_file);
+ }
}
}
///
@@ -118,24 +139,35 @@ namespace ZeroLevel.Services.PartitionStorage
using (var reader = new MemoryStreamReader(new FileStream(Path.Combine(_dataCatalog, file), FileMode.Open, FileAccess.Read, FileShare.None)))
{
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
- DropFileIndex(index_file);
- using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
+ _phisicalFileAccessorCachee.LockFile(index_file);
+ if (File.Exists(index_file))
{
- var counter = 1;
- while (reader.EOS == false)
+ File.Delete(index_file);
+ }
+ try
+ {
+ using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{
- counter--;
- var pos = reader.Position;
- var k = _keyDeserializer.Invoke(reader);
- _valueDeserializer.Invoke(reader);
- if (counter == 0)
+ var counter = 1;
+ while (reader.EOS == false)
{
- writer.WriteCompatible(k);
- writer.WriteLong(pos);
- counter = _stepValue;
+ counter--;
+ var pos = reader.Position;
+ var k = _keyDeserializer.Invoke(reader);
+ _valueDeserializer.Invoke(reader);
+ if (counter == 0)
+ {
+ writer.WriteCompatible(k);
+ writer.WriteLong(pos);
+ counter = _stepValue;
+ }
}
}
}
+ finally
+ {
+ _phisicalFileAccessorCachee.UnlockFile(index_file);
+ }
}
}
}
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs
index 7f5c2fc..cf34413 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs
@@ -124,42 +124,50 @@ namespace ZeroLevel.Services.PartitionStorage.Partition
TKey key;
TInput input;
var dict = new Dictionary>();
- using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
+ PhisicalFileAccessorCachee.LockFile(file);
+ try
{
- while (reader.EOS == false)
+ using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
{
- if (false == Serializer.KeyDeserializer.Invoke(reader, out key))
+ while (reader.EOS == false)
{
- throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key.");
- }
- if (false == dict.ContainsKey(key))
- {
- dict[key] = new HashSet();
- }
- if (reader.EOS)
- {
- break;
+ if (false == Serializer.KeyDeserializer.Invoke(reader, out key))
+ {
+ throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key.");
+ }
+ if (false == dict.ContainsKey(key))
+ {
+ dict[key] = new HashSet();
+ }
+ if (reader.EOS)
+ {
+ break;
+ }
+ if (false == Serializer.InputDeserializer.Invoke(reader, out input))
+ {
+ throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault input value.");
+ }
+ dict[key].Add(input);
}
- if (false == Serializer.InputDeserializer.Invoke(reader, out input))
+ }
+ var tempFile = FSUtils.GetAppLocalTemporaryFile();
+ using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
+ {
+ // sort for search acceleration
+ foreach (var pair in dict.OrderBy(p => p.Key))
{
- throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault input value.");
+ var v = _options.MergeFunction(pair.Value);
+ writer.SerializeCompatible(pair.Key);
+ writer.SerializeCompatible(v);
}
- dict[key].Add(input);
}
+ File.Delete(file);
+ File.Move(tempFile, file, true);
}
- var tempFile = FSUtils.GetAppLocalTemporaryFile();
- using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
+ finally
{
- // sort for search acceleration
- foreach (var pair in dict.OrderBy(p => p.Key))
- {
- var v = _options.MergeFunction(pair.Value);
- writer.SerializeCompatible(pair.Key);
- writer.SerializeCompatible(v);
- }
+ PhisicalFileAccessorCachee.UnlockFile(file);
}
- File.Delete(file);
- File.Move(tempFile, file, true);
}
#endregion
}
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs
index 7de90c7..42651ce 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs
@@ -115,12 +115,20 @@ namespace ZeroLevel.Services.PartitionStorage
// 2. Replace source
var name = Path.GetFileName(file);
var updateFilePath = Path.Combine(folder, name);
- if (File.Exists(updateFilePath))
+
+ _phisicalFileAccessor.LockFile(updateFilePath);
+ try
+ {
+ if (File.Exists(updateFilePath))
+ {
+ File.Delete(updateFilePath);
+ }
+ File.Move(file, updateFilePath, true);
+ }
+ finally
{
- _phisicalFileAccessor.DropDataReader(updateFilePath);
- File.Delete(updateFilePath);
+ _phisicalFileAccessor.UnlockFile(updateFilePath);
}
- File.Move(file, updateFilePath, true);
// 3. Rebuil index
(_accessor as BasePartition).RebuildFileIndex(name);
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs
index ec11b9f..eb15b6e 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs
@@ -5,6 +5,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
+using ZeroLevel.Services.Memory;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.PartitionStorage.Partition;
using ZeroLevel.Services.Serialization;
@@ -139,49 +140,57 @@ namespace ZeroLevel.Services.PartitionStorage
{
TKey key;
TInput input;
- var dict = new Dictionary>();
- var accessor = PhisicalFileAccessorCachee.GetDataAccessor(file, 0);
- if (accessor != null)
+ PhisicalFileAccessorCachee.LockFile(file);
+ try
{
- using (var reader = new MemoryStreamReader(accessor))
+ var dict = new Dictionary>();
+ var accessor = new StreamVewAccessor(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 1024 * 1024 * 32));
+ if (accessor != null)
{
- while (reader.EOS == false)
+ using (var reader = new MemoryStreamReader(accessor))
{
- if (Serializer.KeyDeserializer.Invoke(reader, out key) == false)
+ while (reader.EOS == false)
{
- throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key.");
- }
- if (false == dict.ContainsKey(key))
- {
- dict[key] = new HashSet();
- }
- if (reader.EOS)
- {
- break;
- }
- if (Serializer.InputDeserializer.Invoke(reader, out input) == false)
- {
- throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read input value.");
+ if (Serializer.KeyDeserializer.Invoke(reader, out key) == false)
+ {
+ throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key.");
+ }
+ if (false == dict.ContainsKey(key))
+ {
+ dict[key] = new HashSet();
+ }
+ if (reader.EOS)
+ {
+ break;
+ }
+ if (Serializer.InputDeserializer.Invoke(reader, out input) == false)
+ {
+ throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read input value.");
+ }
+ dict[key].Add(input);
}
- dict[key].Add(input);
}
}
- }
- var tempFile = FSUtils.GetAppLocalTemporaryFile();
- using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
- {
- // sort for search acceleration
- foreach (var pair in dict.OrderBy(p => p.Key))
+
+ var tempFile = FSUtils.GetAppLocalTemporaryFile();
+ using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
{
- var v = _options.MergeFunction(pair.Value);
- writer.SerializeCompatible(pair.Key);
- Thread.MemoryBarrier();
- writer.SerializeCompatible(v);
+ // sort for search acceleration
+ foreach (var pair in dict.OrderBy(p => p.Key))
+ {
+ var v = _options.MergeFunction(pair.Value);
+ writer.SerializeCompatible(pair.Key);
+ Thread.MemoryBarrier();
+ writer.SerializeCompatible(v);
+ }
}
+ File.Delete(file);
+ File.Move(tempFile, file, true);
+ }
+ finally
+ {
+ PhisicalFileAccessorCachee.UnlockFile(file);
}
- PhisicalFileAccessorCachee.DropDataReader(file);
- File.Delete(file);
- File.Move(tempFile, file, true);
}
#endregion
}
diff --git a/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs b/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs
index 493a55f..6b277ad 100644
--- a/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs
+++ b/ZeroLevel/Services/PartitionStorage/PhisicalFileAccessorCachee.cs
@@ -1,4 +1,5 @@
using System;
+using System.Collections.Generic;
using System.IO;
using ZeroLevel.Services.Cache;
using ZeroLevel.Services.FileSystem;
@@ -12,6 +13,8 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly TimerCachee _indexReadersCachee;
private readonly TimerCachee _dataReadersCachee;
+ private readonly HashSet _lockedFiles = new HashSet();
+
public PhisicalFileAccessorCachee(TimeSpan dataExpirationPeriod, TimeSpan indexExpirationPeriod)
{
_dataReadersCachee = new TimerCachee(dataExpirationPeriod, s => new ParallelFileReader(s), i => i.Dispose(), 8192);
@@ -32,32 +35,40 @@ namespace ZeroLevel.Services.PartitionStorage
}
public IViewAccessor GetDataAccessor(string filePath, long offset)
{
- var reader = GetDataReader(filePath);
- try
+ if (false == _lockedFiles.Contains(filePath))
{
+ var reader = GetDataReader(filePath);
+ try
+ {
+ return reader.GetAccessor(offset);
+ }
+ catch (ObjectDisposedException)
+ {
+ _dataReadersCachee.Drop(filePath);
+ reader = _dataReadersCachee.Get(filePath);
+ }
return reader.GetAccessor(offset);
}
- catch (ObjectDisposedException)
- {
- _dataReadersCachee.Drop(filePath);
- reader = _dataReadersCachee.Get(filePath);
- }
- return reader.GetAccessor(offset);
+ return null;
}
public IViewAccessor GetDataAccessor(string filePath, long offset, int length)
{
- var reader = GetDataReader(filePath);
- try
+ if (false == _lockedFiles.Contains(filePath))
{
+ var reader = GetDataReader(filePath);
+ try
+ {
+ return reader.GetAccessor(offset, length);
+ }
+ catch (ObjectDisposedException)
+ {
+ _dataReadersCachee.Drop(filePath);
+ reader = _dataReadersCachee.Get(filePath);
+ }
return reader.GetAccessor(offset, length);
}
- catch (ObjectDisposedException)
- {
- _dataReadersCachee.Drop(filePath);
- reader = _dataReadersCachee.Get(filePath);
- }
- return reader.GetAccessor(offset, length);
+ return null;
}
public void DropAllDataReaders()
{
@@ -79,32 +90,40 @@ namespace ZeroLevel.Services.PartitionStorage
}
public IViewAccessor GetIndexAccessor(string filePath, long offset)
{
- var reader = GetIndexReader(filePath);
- try
+ if (false == _lockedFiles.Contains(filePath))
{
+ var reader = GetIndexReader(filePath);
+ try
+ {
+ return reader.GetAccessor(offset);
+ }
+ catch (ObjectDisposedException)
+ {
+ _indexReadersCachee.Drop(filePath);
+ reader = _indexReadersCachee.Get(filePath);
+ }
return reader.GetAccessor(offset);
}
- catch (ObjectDisposedException)
- {
- _indexReadersCachee.Drop(filePath);
- reader = _indexReadersCachee.Get(filePath);
- }
- return reader.GetAccessor(offset);
+ return null;
}
public IViewAccessor GetIndexAccessor(string filePath, long offset, int length)
{
- var reader = GetIndexReader(filePath);
- try
+ if (false == _lockedFiles.Contains(filePath))
{
+ var reader = GetIndexReader(filePath);
+ try
+ {
+ return reader.GetAccessor(offset, length);
+ }
+ catch (ObjectDisposedException)
+ {
+ _indexReadersCachee.Drop(filePath);
+ reader = _indexReadersCachee.Get(filePath);
+ }
return reader.GetAccessor(offset, length);
}
- catch (ObjectDisposedException)
- {
- _indexReadersCachee.Drop(filePath);
- reader = _indexReadersCachee.Get(filePath);
- }
- return reader.GetAccessor(offset, length);
+ return null;
}
public void DropAllIndexReaders()
{
@@ -112,6 +131,18 @@ namespace ZeroLevel.Services.PartitionStorage
}
#endregion
+ public void LockFile(string filePath)
+ {
+ _lockedFiles.Add(filePath);
+ DropDataReader(filePath);
+ DropIndexReader(filePath);
+ }
+
+ public void UnlockFile(string filePath)
+ {
+ _lockedFiles.Remove(filePath);
+ }
+
public void Dispose()
{
_dataReadersCachee.Dispose();
diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj
index 3a731ce..2717624 100644
--- a/ZeroLevel/ZeroLevel.csproj
+++ b/ZeroLevel/ZeroLevel.csproj
@@ -6,16 +6,16 @@
ogoun
ogoun
- 3.3.8.9
- Partition storage. Suppress exception when find invoke
+ 3.3.9.0
+ Partition storage. Fix concurrent work
https://github.com/ogoun/Zero/wiki
Copyright Ogoun 2023
https://github.com/ogoun/Zero
git
- 3.3.8.9
- 3.3.8.9
+ 3.3.9.0
+ 3.3.9.0
AnyCPU;x64;x86
zero.png
full