using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Memory;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage.Partition
{
/// <summary>
/// General operations with a partition
/// </summary>
internal abstract class BasePartition
: IStorePartitionBase
{
/// <summary>
/// Path of the catalog (directory) that backs this partition.
/// </summary>
public string Catalog => _catalog;
/// <summary>
/// Partition metadata supplied at construction; passed to the store options
/// when resolving the catalog path and data file names.
/// </summary>
protected readonly TMeta _info;
/// <summary>
/// Catalog path computed once in the constructor from the store options and the metadata.
/// </summary>
protected readonly string _catalog;
/// <summary>
/// Semaphore (initial count 1) guarding creation of entries in the write stream cache.
/// It is only ever assigned here, hence readonly.
/// </summary>
private readonly SemaphoreSlim _writersLock = new SemaphoreSlim(1);
/// <summary>
/// Cache of open writers, keyed by the file name relative to the catalog.
/// </summary>
private readonly Dictionary _writeStreams = new Dictionary();
/// <summary>
/// Serializer supplied at construction; also handed to the index builder.
/// </summary>
protected IStoreSerializer Serializer { get; }
/// <summary>
/// Store options supplied at construction (catalog/file naming, index settings).
/// </summary>
protected readonly StoreOptions _options;
/// <summary>
/// Index builder; null when indexing is disabled in the options at construction time.
/// </summary>
private readonly IndexBuilder _indexBuilder;
/// <summary>
/// File accessor cache supplied at construction.
/// </summary>
private readonly PhisicalFileAccessorCachee _phisicalFileAccessor;
/// <summary>
/// Exposes the file accessor cache to derived partitions.
/// </summary>
protected PhisicalFileAccessorCachee PhisicalFileAccessorCachee => _phisicalFileAccessor;
/// <summary>
/// Stores the collaborators, resolves the partition catalog, makes sure the catalog
/// directory exists and creates the index builder when indexing is enabled.
/// </summary>
/// <param name="options">Store options used for path resolution and index settings.</param>
/// <param name="info">Partition metadata.</param>
/// <param name="serializer">Serializer exposed through <c>Serializer</c>.</param>
/// <param name="fileAccessorCachee">File accessor cache shared with the index builder.</param>
internal BasePartition(StoreOptions options,
    TMeta info,
    IStoreSerializer serializer, PhisicalFileAccessorCachee fileAccessorCachee)
{
    _options = options;
    _info = info;
    Serializer = serializer;
    _phisicalFileAccessor = fileAccessorCachee;

    _catalog = _options.GetCatalogPath(info);
    if (!Directory.Exists(_catalog))
    {
        Directory.CreateDirectory(_catalog);
    }

    if (_options.Index.Enabled)
    {
        _indexBuilder = new IndexBuilder(_options.Index.StepType, _options.Index.StepValue, _catalog, fileAccessorCachee, Serializer);
    }
    else
    {
        // Indexing disabled: index operations are skipped by the Enabled checks.
        _indexBuilder = null;
    }
}
#region IStorePartitionBase
/// <summary>
/// Returns the number of files directly inside the catalog, or 0 when the catalog
/// directory does not exist.
/// </summary>
public int CountDataFiles()
{
    if (!Directory.Exists(_catalog))
    {
        return 0;
    }
    var files = Directory.GetFiles(_catalog);
    return files == null ? 0 : files.Length;
}
/// <summary>
/// Returns the catalog path resolved in the constructor.
/// </summary>
public string GetCatalogPath()
{
    return _catalog;
}
/// <summary>
/// Drops the partition data by passing the catalog path to
/// <c>FSUtils.CleanAndTestFolder</c>.
/// </summary>
public void DropData()
{
    FSUtils.CleanAndTestFolder(_catalog);
}
/// <summary>
/// Closes all cached write streams and then lets the concrete partition release
/// its own resources.
/// </summary>
public void Dispose()
{
    try
    {
        CloseWriteStreams();
    }
    finally
    {
        // Release() must run even if closing the write streams fails,
        // otherwise the derived partition's resources would never be freed.
        Release();
    }
}
#endregion
/// <summary>
/// Releases resources owned by the concrete partition.
/// Invoked by <c>Dispose</c> after the write streams have been closed.
/// </summary>
public abstract void Release();
/// <summary>
/// Rebuilds the indexes through the index builder.
/// Does nothing when indexing is disabled in the store options.
/// </summary>
protected async Task RebuildIndexes()
{
    if (!_options.Index.Enabled)
    {
        return;
    }
    await _indexBuilder.RebuildIndex();
}
/// <summary>
/// Rebuilds the index of a single file through the index builder.
/// Does nothing when indexing is disabled in the store options.
/// </summary>
/// <param name="file">File whose index is rebuilt; passed to the index builder as is.</param>
internal async Task RebuildFileIndex(string file)
{
    if (!_options.Index.Enabled)
    {
        return;
    }
    await _indexBuilder.RebuildFileIndex(file);
}
/// <summary>
/// Deletes the index of a single file through the index builder.
/// Does nothing when indexing is disabled in the store options.
/// </summary>
/// <param name="file">File whose index is dropped; passed to the index builder as is.</param>
internal void DropFileIndex(string file)
{
    if (!_options.Index.Enabled)
    {
        return;
    }
    _indexBuilder.DropFileIndex(file);
}
/// <summary>
/// Flushes and disposes every cached write stream, then empties the cache.
/// </summary>
/// <remarks>
/// Closing is best-effort: a failure on one stream is logged and does not prevent
/// the remaining streams from being closed.
/// NOTE(review): the cache is enumerated without taking the writers lock; callers
/// presumably guarantee that no write is in flight - confirm.
/// </remarks>
protected void CloseWriteStreams()
{
    foreach (var s in _writeStreams)
    {
        try
        {
            try
            {
                s.Value.Stream.Flush();
            }
            finally
            {
                // Dispose even when the flush fails, so the file handle is always released.
                // The previous un-awaited DisposeAsync() call after Dispose() is dropped:
                // its outcome could never be observed and the writer is already disposed here.
                // NOTE(review): assumes Dispose() releases the underlying stream - confirm
                // against MemoryStreamWriter.
                s.Value.Dispose();
            }
        }
        catch (Exception ex)
        {
            // Report instead of silently swallowing; still continue with the other streams.
            Log.SystemError(ex, $"[BasePartition.CloseWriteStreams] '{s.Key}'");
        }
    }
    _writeStreams.Clear();
}
/// <summary>
/// Runs <paramref name="writeAction"/> against the cached writer of the given file,
/// opening the file for append and caching a writer on first use.
/// </summary>
/// <param name="fileName">File name relative to the partition catalog.</param>
/// <param name="writeAction">Action invoked with the writer of the file.</param>
protected async Task WriteStreamAction(string fileName, Func writeAction)
{
    var writer = await GetOrCreateWriterAsync(fileName);
    await writeAction.Invoke(writer);
}
/// <summary>
/// Same as <c>WriteStreamAction</c>, but the action is bracketed by the writer's
/// <c>WaitLockAsync</c> / <c>Release</c> pair; the release happens even if the action throws.
/// </summary>
/// <param name="fileName">File name relative to the partition catalog.</param>
/// <param name="writeAction">Action invoked with the writer of the file.</param>
protected async Task SafeWriteStreamAction(string fileName, Func writeAction)
{
    var writer = await GetOrCreateWriterAsync(fileName);
    await writer.WaitLockAsync();
    try
    {
        await writeAction.Invoke(writer);
    }
    finally
    {
        writer.Release();
    }
}
/// <summary>
/// Returns the cached writer for <paramref name="fileName"/>, creating and caching it when absent.
/// </summary>
/// <remarks>
/// The lookup is performed under the writers lock as well: the cache is a plain
/// dictionary, which must not be read while another caller is inserting into it.
/// </remarks>
private async Task<MemoryStreamWriter> GetOrCreateWriterAsync(string fileName)
{
    const int bufferSize = 4096 * 1024; // 4 MiB file stream buffer

    await _writersLock.WaitAsync();
    try
    {
        MemoryStreamWriter writer;
        if (_writeStreams.TryGetValue(fileName, out writer))
        {
            return writer;
        }

        var filePath = Path.Combine(_catalog, fileName);
        var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, bufferSize);
        try
        {
            writer = new MemoryStreamWriter(stream);
        }
        catch
        {
            // The file is opened exclusively; do not leave it locked if the writer cannot be built.
            stream.Dispose();
            throw;
        }
        _writeStreams[fileName] = writer;
        return writer;
    }
    finally
    {
        _writersLock.Release();
    }
}
/*
///
/// Attempting to open a file for writing
///
protected bool TryGetWriteStream(string fileName, out MemoryStreamWriter writer)
{
try
{
bool taken = false;
Monitor.Enter(_writeStreams, ref taken);
try
{
if (_writeStreams.TryGetValue(fileName, out var w))
{
writer = w;
return true;
}
else
{
var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
var new_w = new MemoryStreamWriter(stream);
_writeStreams[fileName] = new_w;
writer = new_w;
return true;
}
}
finally
{
if (taken)
{
Monitor.Exit(_writeStreams);
}
}
}
catch (Exception ex)
{
Log.SystemError(ex, "[StorePartitionBuilder.TryGetWriteStream]");
}
writer = null;
return false;
}
*/
/// <summary>
/// Attempts to open a file of the catalog for reading.
/// </summary>
/// <param name="fileName">File name relative to the partition catalog.</param>
/// <param name="reader">Reader over the opened file on success; null on failure.</param>
/// <returns>true when the file was opened; false when any exception occurred (the exception is logged).</returns>
protected bool TryGetReadStream(string fileName, out MemoryStreamReader reader)
{
    FileStream stream = null;
    try
    {
        var filePath = Path.Combine(_catalog, fileName);
        // FileShare.ReadWrite: the file may be opened while other handles on it exist.
        stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
        reader = new MemoryStreamReader(stream);
        return true;
    }
    catch (Exception ex)
    {
        Log.SystemError(ex, "[StorePartitionBuilder.TryGetReadStream]");
        // If the stream was opened but the reader could not be created,
        // close the stream so the handle is not leaked.
        stream?.Dispose();
    }
    reader = null;
    return false;
}
/// <summary>
/// Resolves the data file of <paramref name="key"/> inside the catalog and returns
/// a view accessor over it starting at <paramref name="offset"/>.
/// </summary>
/// <returns>The accessor, or null when it could not be obtained.</returns>
protected IViewAccessor GetViewAccessor(TKey key, long offset)
{
    string path = Path.Combine(_catalog, _options.GetFileName(key, _info));
    return GetViewAccessor(path, offset);
}
/// <summary>
/// Resolves the data file of <paramref name="key"/> inside the catalog and returns
/// a view accessor for the given <paramref name="offset"/> and <paramref name="length"/>.
/// </summary>
/// <returns>The accessor, or null when it could not be obtained.</returns>
protected IViewAccessor GetViewAccessor(TKey key, long offset, int length)
{
    string path = Path.Combine(_catalog, _options.GetFileName(key, _info));
    return GetViewAccessor(path, offset, length);
}
/// <summary>
/// Requests a data accessor for <paramref name="filePath"/> at <paramref name="offset"/>
/// from the file accessor cache.
/// </summary>
/// <returns>The accessor, or null when the cache threw (the exception is logged).</returns>
protected IViewAccessor GetViewAccessor(string filePath, long offset)
{
    try
    {
        return PhisicalFileAccessorCachee.GetDataAccessor(filePath, offset);
    }
    catch (Exception ex)
    {
        Log.SystemError(ex, $"[StorePartitionAccessor.GetViewAccessor] '{filePath}'");
        return null;
    }
}
/// <summary>
/// Requests a data accessor for <paramref name="filePath"/> with the given
/// <paramref name="offset"/> and <paramref name="length"/> from the file accessor cache.
/// </summary>
/// <returns>The accessor, or null when the cache threw (the exception is logged).</returns>
protected IViewAccessor GetViewAccessor(string filePath, long offset, int length)
{
    try
    {
        return PhisicalFileAccessorCachee.GetDataAccessor(filePath, offset, length);
    }
    catch (Exception ex)
    {
        Log.SystemError(ex, $"[StorePartitionAccessor.GetViewAccessor] '{filePath}'");
        return null;
    }
}
}
}