Partition storage

Append MMF
pull/4/head
Ogoun 2 years ago
parent 295498ae04
commit 691b40691d

@ -1,8 +1,9 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Text;
using ZeroLevel; using ZeroLevel;
using ZeroLevel.Collections;
using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Memory;
using ZeroLevel.Services.PartitionStorage; using ZeroLevel.Services.PartitionStorage;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
@ -11,20 +12,18 @@ namespace PartitionFileStorageTest
internal class Program internal class Program
{ {
// const int PAIRS_COUNT = 200_000_000; // const int PAIRS_COUNT = 200_000_000;
const int PAIRS_COUNT = 2000_000; const long PAIRS_COUNT = 100_000_000;
private class Metadata private class Metadata
{ {
public DateTime Date { get; set; } public DateTime Date { get; set; }
} }
const ulong num_base = 79770000000;
private static ulong Generate(Random r) private static ulong Generate(Random r)
{ {
var num = new StringBuilder(); return num_base + (uint)r.Next(999999);
num.Append("79");
num.Append(r.Next(99).ToString("D2"));
num.Append(r.Next(9999).ToString("D7"));
return ulong.Parse(num.ToString());
} }
private static void FastTest(StoreOptions<ulong, ulong, byte[], Metadata> options) private static void FastTest(StoreOptions<ulong, ulong, byte[], Metadata> options)
@ -210,11 +209,20 @@ namespace PartitionFileStorageTest
} }
} }
} }
store.Dispose();
Log.Info("Completed"); Log.Info("Completed");
} }
private static void FullStoreMultithreadTest(StoreOptions<ulong, ulong, byte[], Metadata> options, private static IEnumerable<(ulong, ulong)> MassGenerator(long count)
List<(ulong, ulong)> pairs) {
var r = new Random(Environment.TickCount);
for (long i = 0; i < count; i++)
{
yield return (Generate(r), Generate(r));
}
}
private static void FullStoreMultithreadTest(StoreOptions<ulong, ulong, byte[], Metadata> options)
{ {
var meta = new Metadata { Date = new DateTime(2022, 11, 08) }; var meta = new Metadata { Date = new DateTime(2022, 11, 08) };
var r = new Random(Environment.TickCount); var r = new Random(Environment.TickCount);
@ -222,12 +230,13 @@ namespace PartitionFileStorageTest
var storePart = store.CreateBuilder(meta); var storePart = store.CreateBuilder(meta);
var sw = new Stopwatch(); var sw = new Stopwatch();
sw.Start(); sw.Start();
var insertCount = (int)(0.7 * pairs.Count); var insertCount = (long)(0.7 * PAIRS_COUNT);
var testKeys1 = new ConcurrentBag<ulong>(); var testKeys1 = new ConcurrentBag<ulong>();
var testKeys2 = new ConcurrentBag<ulong>(); var testKeys2 = new ConcurrentBag<ulong>();
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 24 }; var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 24 };
var Keys = new ConcurrentHashSet<ulong>();
Parallel.ForEach(pairs.Take(insertCount).ToArray(), parallelOptions, pair => Parallel.ForEach(MassGenerator((long)(0.7 * PAIRS_COUNT)), parallelOptions, pair =>
{ {
var key = pair.Item1; var key = pair.Item1;
var val = pair.Item2; var val = pair.Item2;
@ -240,11 +249,12 @@ namespace PartitionFileStorageTest
{ {
testKeys2.Add(key); testKeys2.Add(key);
} }
Keys.Add(key);
}); });
if (storePart.TotalRecords != insertCount) if (storePart.TotalRecords != insertCount)
{ {
Log.Error($"Count of stored record no equals expected. Recorded: {storePart.TotalRecords}. Expected: {insertCount}"); Log.Error($"Count of stored record no equals expected. Recorded: {storePart.TotalRecords}. Expected: {insertCount}. Unique keys: {Keys.Count}");
} }
sw.Stop(); sw.Stop();
@ -263,27 +273,29 @@ namespace PartitionFileStorageTest
sw.Restart(); sw.Restart();
var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data)); var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data));
Parallel.ForEach(pairs.Skip(insertCount).ToArray(), parallelOptions, pair => Parallel.ForEach(MassGenerator((long)(0.3 * PAIRS_COUNT)), parallelOptions, pair =>
{ {
var key = pair.Item1; var key = pair.Item1;
var val = pair.Item2; var val = pair.Item2;
merger.Store(key, val); merger.Store(key, val);
Keys.Add(key);
}); });
if (merger.TotalRecords != (pairs.Count - insertCount)) if (merger.TotalRecords != ((long)(0.3 * PAIRS_COUNT)))
{ {
Log.Error($"Count of stored record no equals expected. Recorded: {merger.TotalRecords}. Expected: {(pairs.Count - insertCount)}"); Log.Error($"Count of stored record no equals expected. Recorded: {merger.TotalRecords}. Expected: {((long)(0.3 * PAIRS_COUNT))}");
} }
Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {pairs.Count}"); Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {PAIRS_COUNT}. Unique keys: {Keys.Count}");
merger.Compress(); // auto reindex merger.Compress(); // auto reindex
sw.Stop(); sw.Stop();
Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms");
Log.Info("Test #1 reading");
var readPart = store.CreateAccessor(meta);
ulong totalData = 0; ulong totalData = 0;
ulong totalKeys = 0; ulong totalKeys = 0;
var readPart = store.CreateAccessor(meta);
/*
Log.Info("Test #1 reading");
foreach (var key in testKeys1) foreach (var key in testKeys1)
{ {
var result = readPart.Find(key); var result = readPart.Find(key);
@ -312,6 +324,7 @@ namespace PartitionFileStorageTest
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0; totalData = 0;
totalKeys = 0; totalKeys = 0;
*/
Log.Info("Test #2 reading"); Log.Info("Test #2 reading");
foreach (var key in testKeys2) foreach (var key in testKeys2)
{ {
@ -319,11 +332,15 @@ namespace PartitionFileStorageTest
totalData += (ulong)(result.Value?.Length ?? 0); totalData += (ulong)(result.Value?.Length ?? 0);
totalKeys++; totalKeys++;
} }
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); Log.Info($"\t\tFound: {totalKeys}/{Keys.Count} keys. {totalData} bytes");
totalData = 0; totalData = 0;
totalKeys = 0; totalKeys = 0;
Log.Info("Test #2 remove keys batch"); Log.Info("Test #2 remove keys batch");
readPart.RemoveKeys(testKeys2); readPart.RemoveKeys(testKeys2);
foreach (var k in testKeys2)
{
Keys.TryRemove(k);
}
Log.Info("Test #2 reading after remove"); Log.Info("Test #2 reading after remove");
foreach (var key in testKeys2) foreach (var key in testKeys2)
{ {
@ -341,7 +358,8 @@ namespace PartitionFileStorageTest
totalData += (ulong)(e.Value?.Length ?? 0); totalData += (ulong)(e.Value?.Length ?? 0);
totalKeys++; totalKeys++;
} }
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes"); Log.Info($"\t\tFound: {totalKeys}/{Keys.Count} keys. {totalData} bytes");
store.Dispose();
Log.Info("Completed"); Log.Info("Completed");
} }
@ -368,6 +386,23 @@ namespace PartitionFileStorageTest
} }
// 2 Test index // 2 Test index
var fileReader = new ParallelFileReader(filePath);
foreach (var pair in index)
{
var accessor = fileReader.GetAccessor(pair.Value);
using (var reader = new MemoryStreamReader(accessor))
{
var k = serializer.KeyDeserializer.Invoke(reader);
if (k != pair.Key)
{
Log.Warning("Broken index");
}
var v = serializer.ValueDeserializer.Invoke(reader);
}
}
/*
using (var reader = new MemoryStreamReader(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024))) using (var reader = new MemoryStreamReader(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024)))
{ {
foreach (var pair in index) foreach (var pair in index)
@ -381,6 +416,7 @@ namespace PartitionFileStorageTest
var v = serializer.ValueDeserializer.Invoke(reader); var v = serializer.ValueDeserializer.Invoke(reader);
} }
} }
*/
} }
private static void FaultUncompressedReadTest(string filePath) private static void FaultUncompressedReadTest(string filePath)
@ -416,13 +452,6 @@ namespace PartitionFileStorageTest
static void Main(string[] args) static void Main(string[] args)
{ {
/*FaultIndexTest(@"H:\temp\85");
return;*/
/*
FaultUncompressedReadTest(@"H:\temp\107");
return;
*/
var root = @"H:\temp"; var root = @"H:\temp";
var options = new StoreOptions<ulong, ulong, byte[], Metadata> var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{ {
@ -430,7 +459,7 @@ namespace PartitionFileStorageTest
{ {
Enabled = true, Enabled = true,
StepType = IndexStepType.Step, StepType = IndexStepType.Step,
StepValue = 16, StepValue = 32,
EnableIndexInMemoryCachee = true EnableIndexInMemoryCachee = true
}, },
RootFolder = root, RootFolder = root,
@ -453,7 +482,7 @@ namespace PartitionFileStorageTest
{ {
Enabled = true, Enabled = true,
StepType = IndexStepType.Step, StepType = IndexStepType.Step,
StepValue = 16, StepValue = 32,
EnableIndexInMemoryCachee = true EnableIndexInMemoryCachee = true
}, },
RootFolder = root, RootFolder = root,
@ -468,24 +497,30 @@ namespace PartitionFileStorageTest
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd")) new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
}, },
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
ThreadSafeWriting = true ThreadSafeWriting = true,
MaxDegreeOfParallelism = 16
}; };
Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug); Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug);
Log.Info("Start"); /*
var pairs = new List<(ulong, ulong)>(PAIRS_COUNT); var pairs = new List<(ulong, ulong)>(PAIRS_COUNT);
var r = new Random(Environment.TickCount); var r = new Random(Environment.TickCount);
Log.Info("Start create dataset");
for (int i = 0; i < PAIRS_COUNT; i++) for (int i = 0; i < PAIRS_COUNT; i++)
{ {
pairs.Add((Generate(r), Generate(r))); pairs.Add((Generate(r), Generate(r)));
} }
*/
Log.Info("Start test");
// FastTest(options); // FastTest(options);
/* FSUtils.CleanAndTestFolder(root); FSUtils.CleanAndTestFolder(root);
FullStoreMultithreadTest(optionsMultiThread, pairs);*/ FullStoreMultithreadTest(optionsMultiThread);
/*
FSUtils.CleanAndTestFolder(root); FSUtils.CleanAndTestFolder(root);
FullStoreTest(options, pairs); FullStoreTest(options, pairs);
*/
//TestParallelFileReadingMMF();
/* /*
@ -494,5 +529,83 @@ namespace PartitionFileStorageTest
*/ */
Console.ReadKey(); Console.ReadKey();
} }
static void TestParallelFileReading()
{
var path = @"C:\Users\Ogoun\Downloads\Lego_super_hero.iso";
var threads = new List<Thread>();
for (int i = 0; i < 100; i++)
{
var k = i;
var reader = GetReadStream(path);
var t = new Thread(() => PartReader(reader, 1000000 + k * 1000));
t.IsBackground = true;
threads.Add(t);
}
foreach (var t in threads)
{
t.Start();
}
}
static void TestParallelFileReadingMMF()
{
var filereader = new ParallelFileReader(@"C:\Users\Ogoun\Downloads\Lego_super_hero.iso");
var threads = new List<Thread>();
for (int i = 0; i < 100; i++)
{
var k = i;
var t = new Thread(() => PartReaderMMF(filereader.GetAccessor(1000000 + k * 1000)));
t.IsBackground = true;
threads.Add(t);
}
foreach (var t in threads)
{
t.Start();
}
}
static MemoryStreamReader GetReadStream(string filePath)
{
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
return new MemoryStreamReader(stream);
}
static void PartReader(MemoryStreamReader reader, long offset)
{
var count = 0;
using (reader)
{
reader.SetPosition(offset);
for (int i = 0; i < 1000000; i++)
{
if (reader.ReadByte() == 127) count++;
}
}
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId}: {count}");
}
static void PartReaderMMF(IViewAccessor accessor)
{
var count = 0;
var lastPosition = accessor.Position;
using (var reader = new MemoryStreamReader(accessor))
{
for (int i = 0; i < 1000000; i++)
{
if (reader.ReadByte() == 127) count++;
if (lastPosition > reader.Position)
{
// Test for correct absolute position
Console.WriteLine("Fock!");
}
lastPosition = reader.Position;
}
}
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId}: {count}");
}
} }
} }

@ -0,0 +1,137 @@
using System;
using System.Collections.Generic;
using ZeroLevel.Services.Shedulling;
namespace ZeroLevel.Services.Cache
{
internal sealed class TimerCachee<T>
: IDisposable
{
private sealed class CacheeItem<T>
{
public T Value { get; set; }
public DateTime LastAcessTime { get; set; }
}
private readonly IDictionary<string, CacheeItem<T>> _cachee;
private readonly object _cacheeLock = new object();
private readonly ISheduller _sheduller;
private readonly Func<string, T> _factory;
private readonly Action<T> _onDisposeAction;
private readonly TimeSpan _expirationPeriod;
public TimerCachee(TimeSpan expirationPeriod, Func<string, T> factory, Action<T> onDisposeAction, int capacity = 512)
{
_factory = factory;
_onDisposeAction = onDisposeAction;
_sheduller = Sheduller.Create();
_cachee = new Dictionary<string, CacheeItem<T>>(capacity);
_expirationPeriod = expirationPeriod;
var ts = TimeSpan.FromSeconds(60);
if (ts.TotalSeconds > expirationPeriod.TotalSeconds)
{
ts = expirationPeriod;
}
_sheduller.RemindEvery(ts, _ => CheckAndCleacnCachee());
}
public T Get(string key)
{
lock (_cacheeLock)
{
if (_cachee.TryGetValue(key, out var v))
{
v.LastAcessTime = DateTime.UtcNow;
return v.Value;
}
}
var obj = _factory.Invoke(key);
var item = new CacheeItem<T> { Value = obj, LastAcessTime = DateTime.UtcNow };
lock (_cacheeLock)
{
_cachee[key] = item;
}
return obj;
}
public void Drop(string key)
{
lock (_cacheeLock)
{
if (_cachee.TryGetValue(key, out var v))
{
try
{
if (_onDisposeAction != null)
{
_onDisposeAction.Invoke(v.Value);
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[TimerCachee.Drop] Key '{key}'. Fault dispose.");
}
_cachee.Remove(key);
}
}
}
private void CheckAndCleacnCachee()
{
lock (_cacheeLock)
{
var keysToRemove = new List<string>(_cachee.Count);
foreach (var pair in _cachee)
{
if ((DateTime.UtcNow - pair.Value.LastAcessTime) > _expirationPeriod)
{
keysToRemove.Add(pair.Key);
}
}
foreach (var key in keysToRemove)
{
try
{
if (_onDisposeAction != null)
{
_onDisposeAction.Invoke(_cachee[key].Value);
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[TimerCachee.CheckAndCleacnCachee] Key '{key}'");
}
_cachee.Remove(key);
}
}
}
public void DropAll()
{
lock (_cacheeLock)
{
foreach (var pair in _cachee)
{
try
{
if (_onDisposeAction != null)
{
_onDisposeAction.Invoke(pair.Value.Value);
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[TimerCachee.DropAll] Key '{pair.Key}'");
}
}
_cachee.Clear();
}
}
public void Dispose()
{
_sheduller?.Clean();
_sheduller?.Dispose();
DropAll();
}
}
}

@ -0,0 +1,41 @@
using System;
using System.IO;
using System.IO.MemoryMappedFiles;
using ZeroLevel.Services.Memory;
namespace ZeroLevel.Services.FileSystem
{
public sealed class ParallelFileReader
: IDisposable
{
private MemoryMappedFile _mmf;
private readonly long _fileLength;
public long FileLength => _fileLength;
public ParallelFileReader(string filePath)
{
_fileLength = new FileInfo(filePath).Length;
_mmf = MemoryMappedFile.CreateFromFile(filePath, FileMode.Open, Guid.NewGuid().ToString(), 0, MemoryMappedFileAccess.Read);
}
public IViewAccessor GetAccessor(long offset)
{
var length = _fileLength - offset;
return new MMFViewAccessor(_mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read), offset);
}
public IViewAccessor GetAccessor(long offset, int length)
{
if ((offset + length) > _fileLength)
{
throw new OutOfMemoryException($"Offset + Length ({offset + length}) more than file length ({_fileLength})");
}
return new MMFViewAccessor(_mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read), offset);
}
public void Dispose()
{
_mmf?.Dispose();
}
}
}

@ -0,0 +1,17 @@
using System;
namespace ZeroLevel.Services.Memory
{
public interface IViewAccessor
: IDisposable
{
/// <summary>
/// End of view
/// </summary>
bool EOV { get; }
long Position { get; }
byte[] ReadBuffer(int count);
bool CheckOutOfRange(int offset);
void Seek(long offset);
}
}

@ -0,0 +1,48 @@
using System;
using System.IO;
using System.IO.MemoryMappedFiles;
namespace ZeroLevel.Services.Memory
{
internal class MMFViewAccessor
: IViewAccessor
{
private readonly MemoryMappedViewStream _accessor;
private readonly long _absoluteOffset;
public MMFViewAccessor(MemoryMappedViewStream accessor, long offset)
{
_accessor = accessor;
_absoluteOffset = offset;
}
public bool EOV => _accessor.Position >= _accessor.Length;
public long Position => _absoluteOffset + _accessor.Position;
public bool CheckOutOfRange(int offset)
{
return offset < 0 || (_accessor.Position + offset) > _accessor.Length;
}
public void Dispose()
{
_accessor?.Dispose();
}
public byte[] ReadBuffer(int count)
{
if (count == 0) return null;
var buffer = new byte[count];
var readedCount = _accessor.Read(buffer, 0, count);
if (count != readedCount)
throw new InvalidOperationException($"The stream returned less data ({count} bytes) than expected ({readedCount} bytes)");
return buffer;
}
public void Seek(long offset)
{
_accessor.Seek(offset, SeekOrigin.Begin);
}
}
}

@ -0,0 +1,44 @@
using System;
using System.IO;
namespace ZeroLevel.Services.Memory
{
internal sealed class StreamVewAccessor
: IViewAccessor
{
private readonly Stream _stream;
public StreamVewAccessor(Stream stream)
{
_stream = stream;
}
public bool EOV => _stream.Position >= _stream.Length;
public long Position => _stream.Position;
public bool CheckOutOfRange(int offset)
{
return offset < 0 || (_stream.Position + offset) > _stream.Length;
}
public byte[] ReadBuffer(int count)
{
if (count == 0) return null;
var buffer = new byte[count];
var readedCount = _stream.Read(buffer, 0, count);
if (count != readedCount)
throw new InvalidOperationException($"The stream returned less data ({count} bytes) than expected ({readedCount} bytes)");
return buffer;
}
public void Dispose()
{
_stream.Dispose();
}
public void Seek(long offset)
{
_stream.Seek(offset, SeekOrigin.Begin);
}
}
}

@ -2,7 +2,6 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
@ -19,7 +18,8 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly int _stepValue; private readonly int _stepValue;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer; private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer; private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
public IndexBuilder(IndexStepType indexType, int stepValue, string dataCatalog) private readonly PhisicalFileAccessorCachee _phisicalFileAccessorCachee;
public IndexBuilder(IndexStepType indexType, int stepValue, string dataCatalog, PhisicalFileAccessorCachee phisicalFileAccessorCachee)
{ {
_dataCatalog = dataCatalog; _dataCatalog = dataCatalog;
_indexCatalog = Path.Combine(dataCatalog, INDEX_SUBFOLDER_NAME); _indexCatalog = Path.Combine(dataCatalog, INDEX_SUBFOLDER_NAME);
@ -27,19 +27,19 @@ namespace ZeroLevel.Services.PartitionStorage
_stepValue = stepValue; _stepValue = stepValue;
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>(); _keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>(); _valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
_phisicalFileAccessorCachee = phisicalFileAccessorCachee;
} }
/// <summary> /// <summary>
/// Rebuild indexes for all files /// Rebuild indexes for all files
/// </summary> /// </summary>
internal void RebuildIndex() internal void RebuildIndex()
{ {
FSUtils.CleanAndTestFolder(_indexCatalog);
var files = Directory.GetFiles(_dataCatalog); var files = Directory.GetFiles(_dataCatalog);
if (files != null && files.Length > 0) if (files != null && files.Length > 0)
{ {
foreach (var file in files) foreach (var file in files)
{ {
RebuildFileIndex(file); RebuildFileIndex(Path.GetFileName(file));
} }
} }
} }
@ -63,6 +63,7 @@ namespace ZeroLevel.Services.PartitionStorage
internal void DropFileIndex(string file) internal void DropFileIndex(string file)
{ {
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
_phisicalFileAccessorCachee.DropIndexReader(index_file);
if (File.Exists(index_file)) if (File.Exists(index_file))
{ {
File.Delete(index_file); File.Delete(index_file);
@ -78,9 +79,7 @@ namespace ZeroLevel.Services.PartitionStorage
Directory.CreateDirectory(_indexCatalog); Directory.CreateDirectory(_indexCatalog);
} }
var dict = new Dictionary<TKey, long>(); var dict = new Dictionary<TKey, long>();
if (TryGetReadStream(file, out var reader)) using (var reader = new MemoryStreamReader(new FileStream(Path.Combine(_dataCatalog, file), FileMode.Open, FileAccess.Read, FileShare.None)))
{
using (reader)
{ {
while (reader.EOS == false) while (reader.EOS == false)
{ {
@ -90,11 +89,11 @@ namespace ZeroLevel.Services.PartitionStorage
_valueDeserializer.Invoke(reader); _valueDeserializer.Invoke(reader);
} }
} }
}
if (dict.Count > _stepValue) if (dict.Count > _stepValue)
{ {
var step = (int)Math.Round(dict.Count / (float)_stepValue, MidpointRounding.ToZero); var step = (int)Math.Round(dict.Count / (float)_stepValue, MidpointRounding.ToZero);
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
DropFileIndex(index_file);
var d_arr = dict.OrderBy(p => p.Key).ToArray(); var d_arr = dict.OrderBy(p => p.Key).ToArray();
using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{ {
@ -116,11 +115,10 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
Directory.CreateDirectory(_indexCatalog); Directory.CreateDirectory(_indexCatalog);
} }
if (TryGetReadStream(file, out var reader)) using (var reader = new MemoryStreamReader(new FileStream(Path.Combine(_dataCatalog, file), FileMode.Open, FileAccess.Read, FileShare.None)))
{
using (reader)
{ {
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
DropFileIndex(index_file);
using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{ {
var counter = 1; var counter = 1;
@ -141,24 +139,4 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
} }
/// <summary>
/// Attempting to open a file for reading
/// </summary>
private bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
{
try
{
var filePath = Path.Combine(_dataCatalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
reader = new MemoryStreamReader(stream);
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionAccessor.TryGetReadStream]");
}
reader = null;
return false;
}
}
} }

@ -4,5 +4,6 @@
{ {
public TKey Key { get; set; } public TKey Key { get; set; }
public long Offset { get; set; } public long Offset { get; set; }
public int Length { get; set; }
} }
} }

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using ZeroLevel.Services.Memory;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
@ -15,13 +16,13 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly bool _enableIndexInMemoryCachee; private readonly bool _enableIndexInMemoryCachee;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer; private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly TMeta _meta; private readonly TMeta _meta;
private readonly PhisicalFileAccessorCachee _phisicalFileAccessorCachee;
private readonly Dictionary<string, KeyIndex<TKey>[]> _indexCachee = null;
public StorePartitionSparseIndex(string partitionFolder, TMeta meta, public StorePartitionSparseIndex(string partitionFolder, TMeta meta,
StoreFilePartition<TKey, TMeta> filePartition, StoreFilePartition<TKey, TMeta> filePartition,
Func<TKey, TKey, int> keyComparer, Func<TKey, TKey, int> keyComparer,
bool enableIndexInMemoryCachee) bool enableIndexInMemoryCachee,
PhisicalFileAccessorCachee phisicalFileAccessorCachee)
{ {
_indexFolder = Path.Combine(partitionFolder, "__indexes__"); _indexFolder = Path.Combine(partitionFolder, "__indexes__");
_indexExists = Directory.Exists(_indexFolder); _indexExists = Directory.Exists(_indexFolder);
@ -32,7 +33,7 @@ namespace ZeroLevel.Services.PartitionStorage
_enableIndexInMemoryCachee = enableIndexInMemoryCachee; _enableIndexInMemoryCachee = enableIndexInMemoryCachee;
if (_enableIndexInMemoryCachee) if (_enableIndexInMemoryCachee)
{ {
_indexCachee = new Dictionary<string, KeyIndex<TKey>[]>(1024); _phisicalFileAccessorCachee = phisicalFileAccessorCachee;
} }
} }
@ -110,46 +111,26 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
if (_enableIndexInMemoryCachee) if (_enableIndexInMemoryCachee)
{ {
lock (_casheupdatelock) _phisicalFileAccessorCachee.DropAllIndexReaders();
{
_indexCachee.Clear();
}
} }
} }
public void RemoveCacheeItem(string name) public void RemoveCacheeItem(string name)
{ {
var file = Path.Combine(_indexFolder, name);
if (_enableIndexInMemoryCachee) if (_enableIndexInMemoryCachee)
{ {
lock (_casheupdatelock) _phisicalFileAccessorCachee.DropIndexReader(file);
{
_indexCachee.Remove(name);
}
} }
} }
private readonly object _casheupdatelock = new object();
private KeyIndex<TKey>[] GetFileIndex(TKey key) private KeyIndex<TKey>[] GetFileIndex(TKey key)
{ {
var indexName = _filePartition.FileNameExtractor.Invoke(key, _meta); var indexName = _filePartition.FileNameExtractor.Invoke(key, _meta);
var filePath = Path.Combine(_indexFolder, indexName);
try try
{ {
if (_enableIndexInMemoryCachee) return ReadIndexesFromIndexFile(filePath);
{
if (_indexCachee.TryGetValue(indexName, out var index))
{
return index;
}
lock (_casheupdatelock)
{
_indexCachee[indexName] = ReadIndexesFromIndexFile(indexName);
return _indexCachee[indexName];
}
}
else
{
return ReadIndexesFromIndexFile(indexName);
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -158,19 +139,35 @@ namespace ZeroLevel.Services.PartitionStorage
return null; return null;
} }
private KeyIndex<TKey>[] ReadIndexesFromIndexFile(string indexName) private KeyIndex<TKey>[] ReadIndexesFromIndexFile(string filePath)
{ {
var file = Path.Combine(_indexFolder, indexName); if (File.Exists(filePath))
if (File.Exists(file))
{ {
var list = new List<KeyIndex<TKey>>(); var list = new List<KeyIndex<TKey>>();
using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None))) var accessor = _enableIndexInMemoryCachee ? _phisicalFileAccessorCachee.GetIndexAccessor(filePath, 0) : new StreamVewAccessor(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.None));
using (var reader = new MemoryStreamReader(accessor))
{
var index = new KeyIndex<TKey>();
if (reader.EOS == false)
{ {
index.Key = _keyDeserializer.Invoke(reader);
}
if (reader.EOS == false)
{
index.Offset = reader.ReadLong();
}
while (reader.EOS == false) while (reader.EOS == false)
{ {
var k = _keyDeserializer.Invoke(reader); var k = _keyDeserializer.Invoke(reader);
var o = reader.ReadLong(); var o = reader.ReadLong();
list.Add(new KeyIndex<TKey> { Key = k, Offset = o }); index.Length = (int)(o - index.Offset);
list.Add(index);
index = new KeyIndex<TKey> { Key = k, Offset = o };
}
if (index.Offset > 0)
{
index.Length = 0;
list.Add(index);
} }
} }
return list.ToArray(); return list.ToArray();

@ -1,5 +1,6 @@
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
/*TODO IN FUTURE*/
internal struct ValueIndex<TValue> internal struct ValueIndex<TValue>
{ {
public TValue Value { get; set; } public TValue Value { get; set; }

@ -43,6 +43,10 @@ namespace ZeroLevel.Services.PartitionStorage
/// Uses a thread-safe mechanism for writing to files during multi-threaded writes /// Uses a thread-safe mechanism for writing to files during multi-threaded writes
/// </summary> /// </summary>
public bool ThreadSafeWriting { get; set; } = false; public bool ThreadSafeWriting { get; set; } = false;
/// <summary>
/// Period before memory mapped file was closed, after last access time
/// </summary>
public TimeSpan PhisicalFileAccessorExpirationPeriod { get; set; } = TimeSpan.FromMinutes(30);
public IndexOptions Index { get; set; } = new IndexOptions public IndexOptions Index { get; set; } = new IndexOptions
{ {

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.IO; using System.IO;
using System.Threading; using System.Threading;
using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Memory;
using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
@ -24,9 +25,12 @@ namespace ZeroLevel.Services.PartitionStorage.Partition
private readonly IndexBuilder<TKey, TValue> _indexBuilder; private readonly IndexBuilder<TKey, TValue> _indexBuilder;
private readonly Dictionary<string, MemoryStreamWriter> _writeStreams = new Dictionary<string, MemoryStreamWriter>(); private readonly Dictionary<string, MemoryStreamWriter> _writeStreams = new Dictionary<string, MemoryStreamWriter>();
private readonly PhisicalFileAccessorCachee _phisicalFileAccessor;
protected PhisicalFileAccessorCachee PhisicalFileAccessorCachee => _phisicalFileAccessor;
internal BasePartition(StoreOptions<TKey, TInput, TValue, TMeta> options, internal BasePartition(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info, TMeta info,
IStoreSerializer<TKey, TInput, TValue> serializer) IStoreSerializer<TKey, TInput, TValue> serializer, PhisicalFileAccessorCachee fileAccessorCachee)
{ {
_options = options; _options = options;
_info = info; _info = info;
@ -35,7 +39,8 @@ namespace ZeroLevel.Services.PartitionStorage.Partition
{ {
Directory.CreateDirectory(_catalog); Directory.CreateDirectory(_catalog);
} }
_indexBuilder = _options.Index.Enabled ? new IndexBuilder<TKey, TValue>(_options.Index.StepType, _options.Index.StepValue, _catalog) : null; _phisicalFileAccessor = fileAccessorCachee;
_indexBuilder = _options.Index.Enabled ? new IndexBuilder<TKey, TValue>(_options.Index.StepType, _options.Index.StepValue, _catalog, fileAccessorCachee) : null;
Serializer = serializer; Serializer = serializer;
} }
@ -155,5 +160,41 @@ namespace ZeroLevel.Services.PartitionStorage.Partition
reader = null; reader = null;
return false; return false;
} }
protected IViewAccessor GetViewAccessor(TKey key, long offset)
{
var fileName = _options.GetFileName(key, _info);
var filePath = Path.Combine(_catalog, fileName);
return GetViewAccessor(filePath, offset);
}
protected IViewAccessor GetViewAccessor(TKey key, long offset, int length)
{
var fileName = _options.GetFileName(key, _info);
var filePath = Path.Combine(_catalog, fileName);
return GetViewAccessor(filePath, offset, length);
}
protected IViewAccessor GetViewAccessor(string filePath, long offset)
{
try
{
return PhisicalFileAccessorCachee.GetDataAccessor(filePath, offset);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[StorePartitionAccessor.GetViewAccessor] '{filePath}'");
}
return null;
}
protected IViewAccessor GetViewAccessor(string filePath, long offset, int length)
{
try
{
return PhisicalFileAccessorCachee.GetDataAccessor(filePath, offset, length);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[StorePartitionAccessor.GetViewAccessor] '{filePath}'");
}
return null;
}
} }
} }

@ -2,7 +2,6 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.FileSystem;
@ -22,8 +21,9 @@ namespace ZeroLevel.Services.PartitionStorage.Partition
public CompactKeyStorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options, public CompactKeyStorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info, TMeta info,
IStoreSerializer<TKey, TInput, TValue> serializer) IStoreSerializer<TKey, TInput, TValue> serializer,
: base(options, info, serializer) PhisicalFileAccessorCachee fileAccessorCachee)
: base(options, info, serializer, fileAccessorCachee)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
if (options.ThreadSafeWriting) if (options.ThreadSafeWriting)

@ -19,7 +19,7 @@ namespace ZeroLevel.Services.PartitionStorage
/// Exists compressed catalog /// Exists compressed catalog
/// </summary> /// </summary>
private readonly IStorePartitionAccessor<TKey, TInput, TValue> _accessor; private readonly IStorePartitionAccessor<TKey, TInput, TValue> _accessor;
private readonly PhisicalFileAccessorCachee _phisicalFileAccessor;
private readonly string _temporaryFolder; private readonly string _temporaryFolder;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer; private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer; private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
@ -37,15 +37,18 @@ namespace ZeroLevel.Services.PartitionStorage
public StoreMergePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options, public StoreMergePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info, TMeta info,
Func<TValue, IEnumerable<TInput>> decompress, Func<TValue, IEnumerable<TInput>> decompress,
IStoreSerializer<TKey, TInput, TValue> serializer) IStoreSerializer<TKey, TInput, TValue> serializer,
PhisicalFileAccessorCachee cachee)
{ {
if (decompress == null) throw new ArgumentNullException(nameof(decompress)); if (decompress == null) throw new ArgumentNullException(nameof(decompress));
_decompress = decompress; _decompress = decompress;
_accessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(options, info, serializer); _accessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(options, info, serializer, cachee);
_temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString()); _temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString());
var tempOptions = options.Clone(); var tempOptions = options.Clone();
tempOptions.RootFolder = _temporaryFolder; tempOptions.RootFolder = _temporaryFolder;
_temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info, serializer); _temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info, serializer, cachee);
_phisicalFileAccessor = cachee;
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>(); _keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>(); _valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
@ -103,13 +106,21 @@ namespace ZeroLevel.Services.PartitionStorage
// replace old file by new // replace old file by new
foreach (var file in newFiles) foreach (var file in newFiles)
{ {
_phisicalFileAccessor.DropDataReader(file);
// 1. Remove index file // 1. Remove index file
(_accessor as StorePartitionAccessor<TKey, TInput, TValue, TMeta>) (_accessor as StorePartitionAccessor<TKey, TInput, TValue, TMeta>)
.DropFileIndex(file); .DropFileIndex(file);
// 2. Replace source // 2. Replace source
var name = Path.GetFileName(file); var name = Path.GetFileName(file);
File.Move(file, Path.Combine(folder, name), true); var updateFilePath = Path.Combine(folder, name);
if (File.Exists(updateFilePath))
{
_phisicalFileAccessor.DropDataReader(updateFilePath);
File.Delete(updateFilePath);
}
File.Move(file, updateFilePath, true);
// 3. Rebuil index // 3. Rebuil index
(_accessor as BasePartition<TKey, TInput, TValue, TMeta>).RebuildFileIndex(name); (_accessor as BasePartition<TKey, TInput, TValue, TMeta>).RebuildFileIndex(name);

@ -3,8 +3,10 @@ using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Memory;
using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.PartitionStorage.Partition; using ZeroLevel.Services.PartitionStorage.Partition;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
@ -12,39 +14,37 @@ namespace ZeroLevel.Services.PartitionStorage
: BasePartition<TKey, TInput, TValue, TMeta>, IStorePartitionAccessor<TKey, TInput, TValue> : BasePartition<TKey, TInput, TValue, TMeta>, IStorePartitionAccessor<TKey, TInput, TValue>
{ {
private readonly StorePartitionSparseIndex<TKey, TMeta> Indexes; private readonly StorePartitionSparseIndex<TKey, TMeta> Indexes;
public StorePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options, public StorePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info, TMeta info,
IStoreSerializer<TKey, TInput, TValue> serializer) IStoreSerializer<TKey, TInput, TValue> serializer,
: base(options, info, serializer) PhisicalFileAccessorCachee phisicalFileAccessor)
: base(options, info, serializer, phisicalFileAccessor)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
if (options.Index.Enabled) if (options.Index.Enabled)
{ {
Indexes = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, options.FilePartition, options.KeyComparer, options.Index.EnableIndexInMemoryCachee); Indexes = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, options.FilePartition, options.KeyComparer, options.Index.EnableIndexInMemoryCachee, phisicalFileAccessor);
} }
} }
#region IStorePartitionAccessor #region IStorePartitionAccessor
public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key) public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
{ {
var fileName = _options.GetFileName(key, _info); IViewAccessor memoryAccessor;
if (File.Exists(Path.Combine(_catalog, fileName)))
{
long startOffset = 0;
if (_options.Index.Enabled) if (_options.Index.Enabled)
{ {
var offset = Indexes.GetOffset(key); var offset = Indexes.GetOffset(key);
startOffset = offset.Offset; memoryAccessor = offset.Length > 0 ? GetViewAccessor(key, offset.Offset, offset.Length) : GetViewAccessor(key, offset.Offset);
} }
if (TryGetReadStream(fileName, out var reader)) else
{ {
using (reader) memoryAccessor = GetViewAccessor(key, 0);
}
if (memoryAccessor != null)
{ {
if (startOffset > 0) using (var reader = new MemoryStreamReader(memoryAccessor))
{ {
reader.Seek(startOffset, SeekOrigin.Begin);
}
while (reader.EOS == false) while (reader.EOS == false)
{ {
var k = Serializer.KeyDeserializer.Invoke(reader); var k = Serializer.KeyDeserializer.Invoke(reader);
@ -63,16 +63,6 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
} }
else
{
return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = key,
Status = SearchResult.FileLocked,
Value = default
};
}
}
return new StorePartitionKeyValueSearchResult<TKey, TValue> return new StorePartitionKeyValueSearchResult<TKey, TValue>
{ {
Key = key, Key = key,
@ -101,9 +91,10 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
foreach (var file in files) foreach (var file in files)
{ {
if (TryGetReadStream(file, out var reader)) var accessor = PhisicalFileAccessorCachee.GetDataAccessor(file, 0);
if (accessor != null)
{ {
using (reader) using (var reader = new MemoryStreamReader(accessor))
{ {
while (reader.EOS == false) while (reader.EOS == false)
{ {
@ -119,11 +110,13 @@ namespace ZeroLevel.Services.PartitionStorage
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key) public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key)
{ {
var fileName = _options.GetFileName(key, _info); var fileName = _options.GetFileName(key, _info);
if (File.Exists(Path.Combine(_catalog, fileName))) var filePath = Path.Combine(_catalog, fileName);
if (File.Exists(filePath))
{ {
if (TryGetReadStream(fileName, out var reader)) var accessor = PhisicalFileAccessorCachee.GetDataAccessor(filePath, 0);
if (accessor != null)
{ {
using (reader) using (var reader = new MemoryStreamReader(accessor))
{ {
while (reader.EOS == false) while (reader.EOS == false)
{ {
@ -188,20 +181,27 @@ namespace ZeroLevel.Services.PartitionStorage
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName, private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName,
TKey[] keys) TKey[] keys)
{ {
if (File.Exists(Path.Combine(_catalog, fileName))) var filePath = Path.Combine(_catalog, fileName);
{
if (_options.Index.Enabled) if (_options.Index.Enabled)
{ {
var offsets = Indexes.GetOffset(keys, true); var offsets = Indexes.GetOffset(keys, true);
if (TryGetReadStream(fileName, out var reader))
{
using (reader)
{
for (int i = 0; i < keys.Length; i++) for (int i = 0; i < keys.Length; i++)
{ {
var searchKey = keys[i]; var searchKey = keys[i];
var off = offsets[i]; var offset = offsets[i];
reader.Seek(off.Offset, SeekOrigin.Begin); IViewAccessor memoryAccessor;
if (offset.Length > 0)
{
memoryAccessor = GetViewAccessor(filePath, offset.Offset, offset.Length);
}
else
{
memoryAccessor = GetViewAccessor(filePath, offset.Offset);
}
if (memoryAccessor != null)
{
using (var reader = new MemoryStreamReader(memoryAccessor))
{
while (reader.EOS == false) while (reader.EOS == false)
{ {
var k = Serializer.KeyDeserializer.Invoke(reader); var k = Serializer.KeyDeserializer.Invoke(reader);
@ -228,9 +228,10 @@ namespace ZeroLevel.Services.PartitionStorage
} }
else else
{ {
if (TryGetReadStream(fileName, out var reader)) var memoryAccessor = GetViewAccessor(filePath, 0);
if (memoryAccessor != null)
{ {
using (reader) using (var reader = new MemoryStreamReader(memoryAccessor))
{ {
int index = 0; int index = 0;
var keys_arr = keys.OrderBy(k => k).ToArray(); var keys_arr = keys.OrderBy(k => k).ToArray();
@ -265,27 +266,32 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
} }
}
private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex) private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex)
{ {
var filePath = Path.Combine(_catalog, fileName); var filePath = Path.Combine(_catalog, fileName);
if (File.Exists(filePath))
{
// 1. Find ranges // 1. Find ranges
var ranges = new List<FilePositionRange>(); var ranges = new List<FilePositionRange>();
if (_options.Index.Enabled && autoReindex) if (_options.Index.Enabled && autoReindex)
{ {
var offsets = Indexes.GetOffset(keys, true); var offsets = Indexes.GetOffset(keys, true);
if (TryGetReadStream(fileName, out var reader))
{
using (reader)
{
for (int i = 0; i < keys.Length; i++) for (int i = 0; i < keys.Length; i++)
{ {
var searchKey = keys[i]; var searchKey = keys[i];
var off = offsets[i]; var offset = offsets[i];
reader.Seek(off.Offset, SeekOrigin.Begin); IViewAccessor memoryAccessor;
if (offset.Length > 0)
{
memoryAccessor = GetViewAccessor(filePath, offset.Offset, offset.Length);
}
else
{
memoryAccessor = GetViewAccessor(filePath, offset.Offset);
}
if (memoryAccessor != null)
{
using (var reader = new MemoryStreamReader(memoryAccessor))
{
while (reader.EOS == false) while (reader.EOS == false)
{ {
var startPosition = reader.Position; var startPosition = reader.Position;
@ -308,9 +314,10 @@ namespace ZeroLevel.Services.PartitionStorage
} }
else else
{ {
if (TryGetReadStream(fileName, out var reader)) var memoryAccessor = GetViewAccessor(filePath, 0);
if (memoryAccessor != null)
{ {
using (reader) using (var reader = new MemoryStreamReader(memoryAccessor))
{ {
int index = 0; int index = 0;
var keys_arr = keys.OrderBy(k => k).ToArray(); var keys_arr = keys.OrderBy(k => k).ToArray();
@ -370,6 +377,8 @@ namespace ZeroLevel.Services.PartitionStorage
} }
// 3. Replace from temporary to original // 3. Replace from temporary to original
PhisicalFileAccessorCachee.DropDataReader(filePath);
File.Delete(filePath);
File.Move(tempFile, filePath, true); File.Move(tempFile, filePath, true);
// Rebuild index if needs // Rebuild index if needs
@ -378,7 +387,6 @@ namespace ZeroLevel.Services.PartitionStorage
RebuildFileIndex(filePath); RebuildFileIndex(filePath);
} }
} }
}
#endregion #endregion

@ -22,8 +22,9 @@ namespace ZeroLevel.Services.PartitionStorage
public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options, public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info, TMeta info,
IStoreSerializer<TKey, TInput, TValue> serializer) IStoreSerializer<TKey, TInput, TValue> serializer,
: base(options, info, serializer) PhisicalFileAccessorCachee fileAccessorCachee)
: base(options, info, serializer, fileAccessorCachee)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
if (options.ThreadSafeWriting) if (options.ThreadSafeWriting)
@ -67,9 +68,10 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
foreach (var file in files) foreach (var file in files)
{ {
if (TryGetReadStream(file, out var reader)) var accessor = GetViewAccessor(file, 0);
if (accessor != null)
{ {
using (reader) using (var reader = new MemoryStreamReader(accessor))
{ {
while (reader.EOS == false) while (reader.EOS == false)
{ {
@ -134,7 +136,10 @@ namespace ZeroLevel.Services.PartitionStorage
internal void CompressFile(string file) internal void CompressFile(string file)
{ {
var dict = new Dictionary<TKey, HashSet<TInput>>(); var dict = new Dictionary<TKey, HashSet<TInput>>();
using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024))) var accessor = PhisicalFileAccessorCachee.GetDataAccessor(file, 0);
if (accessor != null)
{
using (var reader = new MemoryStreamReader(accessor))
{ {
while (reader.EOS == false) while (reader.EOS == false)
{ {
@ -151,6 +156,7 @@ namespace ZeroLevel.Services.PartitionStorage
dict[key].Add(input); dict[key].Add(input);
} }
} }
}
var tempFile = FSUtils.GetAppLocalTemporaryFile(); var tempFile = FSUtils.GetAppLocalTemporaryFile();
using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
{ {
@ -163,6 +169,7 @@ namespace ZeroLevel.Services.PartitionStorage
writer.SerializeCompatible(v); writer.SerializeCompatible(v);
} }
} }
PhisicalFileAccessorCachee.DropDataReader(file);
File.Delete(file); File.Delete(file);
File.Move(tempFile, file, true); File.Move(tempFile, file, true);
} }

@ -0,0 +1,121 @@
using System;
using System.IO;
using ZeroLevel.Services.Cache;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Memory;
namespace ZeroLevel.Services.PartitionStorage
{
internal sealed class PhisicalFileAccessorCachee
: IDisposable
{
private readonly TimerCachee<ParallelFileReader> _indexReadersCachee;
private readonly TimerCachee<ParallelFileReader> _dataReadersCachee;
public PhisicalFileAccessorCachee(TimeSpan dataExpirationPeriod, TimeSpan indexExpirationPeriod)
{
_dataReadersCachee = new TimerCachee<ParallelFileReader>(dataExpirationPeriod, s => new ParallelFileReader(s), i => i.Dispose(), 8192);
_indexReadersCachee = new TimerCachee<ParallelFileReader>(indexExpirationPeriod, s => new ParallelFileReader(s), i => i.Dispose(), 8192);
}
#region DATA
public void DropDataReader(string filePath)
{
_dataReadersCachee.Drop(filePath);
}
private ParallelFileReader GetDataReader(string filePath)
{
if (File.Exists(filePath) == false)
throw new FileNotFoundException(filePath);
return _dataReadersCachee.Get(filePath);
}
public IViewAccessor GetDataAccessor(string filePath, long offset)
{
var reader = GetDataReader(filePath);
try
{
return reader.GetAccessor(offset);
}
catch (ObjectDisposedException)
{
_dataReadersCachee.Drop(filePath);
reader = _dataReadersCachee.Get(filePath);
}
return reader.GetAccessor(offset);
}
public IViewAccessor GetDataAccessor(string filePath, long offset, int length)
{
var reader = GetDataReader(filePath);
try
{
return reader.GetAccessor(offset, length);
}
catch (ObjectDisposedException)
{
_dataReadersCachee.Drop(filePath);
reader = _dataReadersCachee.Get(filePath);
}
return reader.GetAccessor(offset, length);
}
public void DropAllDataReaders()
{
_dataReadersCachee.DropAll();
}
#endregion
#region Indexes
public void DropIndexReader(string filePath)
{
_indexReadersCachee.Drop(filePath);
}
private ParallelFileReader GetIndexReader(string filePath)
{
if (File.Exists(filePath) == false)
throw new FileNotFoundException(filePath);
return _indexReadersCachee.Get(filePath);
}
public IViewAccessor GetIndexAccessor(string filePath, long offset)
{
var reader = GetIndexReader(filePath);
try
{
return reader.GetAccessor(offset);
}
catch (ObjectDisposedException)
{
_indexReadersCachee.Drop(filePath);
reader = _indexReadersCachee.Get(filePath);
}
return reader.GetAccessor(offset);
}
public IViewAccessor GetIndexAccessor(string filePath, long offset, int length)
{
var reader = GetIndexReader(filePath);
try
{
return reader.GetAccessor(offset, length);
}
catch (ObjectDisposedException)
{
_indexReadersCachee.Drop(filePath);
reader = _indexReadersCachee.Get(filePath);
}
return reader.GetAccessor(offset, length);
}
public void DropAllIndexReaders()
{
_indexReadersCachee.DropAll();
}
#endregion
public void Dispose()
{
_dataReadersCachee.Dispose();
_indexReadersCachee.Dispose();
}
}
}

@ -4,6 +4,6 @@
{ {
Success, Success,
NotFound, NotFound,
FileLocked FileLockedOrUnavaliable
} }
} }

@ -10,10 +10,12 @@ using ZeroLevel.Services.PartitionStorage.Interfaces;
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
public class Store<TKey, TInput, TValue, TMeta> : public class Store<TKey, TInput, TValue, TMeta> :
IStore<TKey, TInput, TValue, TMeta> IStore<TKey, TInput, TValue, TMeta>, IDisposable
{ {
private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options; private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly IStoreSerializer<TKey, TInput, TValue> _serializer; private readonly IStoreSerializer<TKey, TInput, TValue> _serializer;
private readonly PhisicalFileAccessorCachee _fileAccessorCachee;
public Store(StoreOptions<TKey, TInput, TValue, TMeta> options, public Store(StoreOptions<TKey, TInput, TValue, TMeta> options,
IStoreSerializer<TKey, TInput, TValue> serializer = null) IStoreSerializer<TKey, TInput, TValue> serializer = null)
{ {
@ -31,6 +33,7 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
Directory.CreateDirectory(_options.RootFolder); Directory.CreateDirectory(_options.RootFolder);
} }
_fileAccessorCachee = new PhisicalFileAccessorCachee(options.PhisicalFileAccessorExpirationPeriod, TimeSpan.FromHours(2));
} }
public void RemovePartition(TMeta info) public void RemovePartition(TMeta info)
@ -42,17 +45,17 @@ namespace ZeroLevel.Services.PartitionStorage
public IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info) public IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info)
{ {
return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, _serializer); return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, _serializer, _fileAccessorCachee);
} }
public IStorePartitionBuilder<TKey, TInput, TValue> CreateBuilder(TMeta info) public IStorePartitionBuilder<TKey, TInput, TValue> CreateBuilder(TMeta info)
{ {
return new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(_options, info, _serializer); return new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(_options, info, _serializer, _fileAccessorCachee);
} }
public IStorePartitionMergeBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor) public IStorePartitionMergeBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor)
{ {
return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor, _serializer); return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor, _serializer, _fileAccessorCachee);
} }
public async Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest) public async Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest)
@ -81,5 +84,10 @@ namespace ZeroLevel.Services.PartitionStorage
result.Results = results; result.Results = results;
return result; return result;
} }
public void Dispose()
{
_fileAccessorCachee.Dispose();
}
} }
} }

@ -1,7 +1,6 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Net; using System.Net;
namespace ZeroLevel.Services.Serialization namespace ZeroLevel.Services.Serialization
@ -133,6 +132,6 @@ namespace ZeroLevel.Services.Serialization
#endregion Extensions #endregion Extensions
Stream Stream { get; } void SetPosition(long position);
} }
} }

@ -5,6 +5,7 @@ using System.IO;
using System.Net; using System.Net;
using System.Text; using System.Text;
using ZeroLevel.Services.Extensions; using ZeroLevel.Services.Extensions;
using ZeroLevel.Services.Memory;
namespace ZeroLevel.Services.Serialization namespace ZeroLevel.Services.Serialization
{ {
@ -14,7 +15,7 @@ namespace ZeroLevel.Services.Serialization
public sealed class MemoryStreamReader public sealed class MemoryStreamReader
: IBinaryReader : IBinaryReader
{ {
private readonly Stream _stream; private readonly IViewAccessor _accessor;
private bool _reverseByteOrder = false; private bool _reverseByteOrder = false;
public void ReverseByteOrder(bool use_reverse_byte_order) public void ReverseByteOrder(bool use_reverse_byte_order)
@ -25,43 +26,45 @@ namespace ZeroLevel.Services.Serialization
/// <summary> /// <summary>
/// End of stream /// End of stream
/// </summary> /// </summary>
public bool EOS => _stream.Position >= _stream.Length; public bool EOS => _accessor.EOV;
public MemoryStreamReader(byte[] data) public MemoryStreamReader(byte[] data)
{ {
if (data == null) if (data == null)
throw new ArgumentNullException(nameof(data)); throw new ArgumentNullException(nameof(data));
_stream = new MemoryStream(data); _accessor = new StreamVewAccessor(new MemoryStream(data));
} }
public MemoryStreamReader(Stream stream) public MemoryStreamReader(Stream stream)
{ {
if (stream == null) if (stream == null)
throw new ArgumentNullException(nameof(stream)); throw new ArgumentNullException(nameof(stream));
_stream = stream; _accessor = new StreamVewAccessor(stream);
} }
public MemoryStreamReader(MemoryStreamReader reader) public MemoryStreamReader(MemoryStreamReader reader)
{ {
if (reader == null) if (reader == null)
throw new ArgumentNullException(nameof(reader)); throw new ArgumentNullException(nameof(reader));
_stream = reader._stream; _accessor = reader._accessor;
} }
public void Seek(long offset, SeekOrigin origin) public MemoryStreamReader(IViewAccessor accessor)
{ {
_stream.Seek(offset, origin); if (accessor == null)
throw new ArgumentNullException(nameof(accessor));
_accessor = accessor;
} }
public long Position => _stream.Position; public long Position => _accessor.Position;
public void SetPosition(long position) => _accessor.Seek(position);
/// <summary> /// <summary>
/// Flag reading /// Flag reading
/// </summary> /// </summary>
public bool ReadBoolean() public bool ReadBoolean()
{ {
if (CheckOutOfRange(1))
throw new OutOfMemoryException("Array index out of bounds");
return BitConverter.ToBoolean(new byte[1] { ReadByte() }, 0); return BitConverter.ToBoolean(new byte[1] { ReadByte() }, 0);
} }
@ -70,15 +73,12 @@ namespace ZeroLevel.Services.Serialization
/// </summary> /// </summary>
public byte ReadByte() public byte ReadByte()
{ {
if (CheckOutOfRange(1)) var buffer = ReadBuffer(1);
throw new OutOfMemoryException("Array index out of bounds"); return buffer[0];
return (byte)_stream.ReadByte();
} }
public char ReadChar() public char ReadChar()
{ {
if (CheckOutOfRange(2))
throw new OutOfMemoryException("Array index out of bounds");
var buffer = ReadBuffer(2); var buffer = ReadBuffer(2);
return BitConverter.ToChar(buffer, 0); return BitConverter.ToChar(buffer, 0);
} }
@ -187,13 +187,9 @@ namespace ZeroLevel.Services.Serialization
/// </summary> /// </summary>
public byte[] ReadBuffer(int count) public byte[] ReadBuffer(int count)
{ {
if (count == 0) return null;
if (CheckOutOfRange(count)) if (CheckOutOfRange(count))
throw new OutOfMemoryException("Array index out of bounds"); throw new OutOfMemoryException("Array index out of bounds");
var buffer = new byte[count]; var buffer = _accessor.ReadBuffer(count);
var readedCount = _stream.Read(buffer, 0, count);
if (count != readedCount)
throw new InvalidOperationException($"The stream returned less data ({count} bytes) than expected ({readedCount} bytes)");
if (_reverseByteOrder && count > 1) if (_reverseByteOrder && count > 1)
{ {
byte b; byte b;
@ -262,7 +258,7 @@ namespace ZeroLevel.Services.Serialization
/// </summary> /// </summary>
public bool CheckOutOfRange(int offset) public bool CheckOutOfRange(int offset)
{ {
return offset < 0 || (_stream.Position + offset) > _stream.Length; return _accessor.CheckOutOfRange(offset);
} }
#region Extensions #region Extensions
@ -1137,10 +1133,7 @@ namespace ZeroLevel.Services.Serialization
public void Dispose() public void Dispose()
{ {
_stream.Dispose(); _accessor.Dispose();
} }
public Stream Stream => _stream;
} }
} }

@ -6,16 +6,16 @@
</Description> </Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>3.3.8.7</AssemblyVersion> <AssemblyVersion>3.3.8.8</AssemblyVersion>
<PackageReleaseNotes>Fix build configuration from commandf line args.</PackageReleaseNotes> <PackageReleaseNotes>MMF for partition storage</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2022</Copyright> <Copyright>Copyright Ogoun 2023</Copyright>
<PackageLicenseUrl></PackageLicenseUrl> <PackageLicenseUrl></PackageLicenseUrl>
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<Version>3.3.8.7</Version> <Version>3.3.8.8</Version>
<FileVersion>3.3.8.7</FileVersion> <FileVersion>3.3.8.8</FileVersion>
<Platforms>AnyCPU;x64;x86</Platforms> <Platforms>AnyCPU;x64;x86</Platforms>
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
<DebugType>full</DebugType> <DebugType>full</DebugType>

Loading…
Cancel
Save

Powered by TurnKey Linux.