using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.PartitionStorage.Partition;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
///
/// Performs merging of new data with existing data in the partition
///
internal sealed class StoreMergePartitionAccessor
: IStorePartitionMergeBuilder
{
private readonly Func> _decompress;
///
/// Exists compressed catalog
///
private readonly IStorePartitionAccessor _accessor;
private readonly PhisicalFileAccessorCachee _phisicalFileAccessor;
private readonly string _temporaryFolder;
private readonly Func _keyDeserializer;
private readonly Func _valueDeserializer;
public long TotalRecords
{
get { return _temporaryAccessor.TotalRecords; }
}
///
/// Write catalog
///
private readonly IStorePartitionBuilder _temporaryAccessor;
public StoreMergePartitionAccessor(StoreOptions options,
TMeta info,
Func> decompress,
IStoreSerializer serializer,
PhisicalFileAccessorCachee cachee)
{
if (decompress == null) throw new ArgumentNullException(nameof(decompress));
_decompress = decompress;
_accessor = new StorePartitionAccessor(options, info, serializer, cachee);
_temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString());
var tempOptions = options.Clone();
tempOptions.RootFolder = _temporaryFolder;
_temporaryAccessor = new StorePartitionBuilder(tempOptions, info, serializer, cachee);
_phisicalFileAccessor = cachee;
_keyDeserializer = MessageSerializer.GetDeserializer();
_valueDeserializer = MessageSerializer.GetDeserializer();
}
#region API methods
///
/// Deletes only new entries. Existing entries remain unchanged.
///
public void DropData() => _temporaryAccessor.DropData();
public string GetCatalogPath() => _accessor.GetCatalogPath();
public async Task Store(TKey key, TInput value) => await _temporaryAccessor.Store(key, value);
public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(),
_temporaryAccessor.CountDataFiles());
///
/// Performs compression/grouping of recorded data in a partition
///
public async Task Compress()
{
var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath());
if (newFiles != null && newFiles.Length > 0)
{
var folder = _accessor.GetCatalogPath();
var existsFiles = Directory.GetFiles(folder)
?.ToDictionary(f => Path.GetFileName(f), f => f);
foreach (var file in newFiles)
{
var name = Path.GetFileName(file);
// if datafile by key exists
if (existsFiles.ContainsKey(name))
{
// append all records from existing file to new
foreach (var r in IterateReadKeyInputs(existsFiles[name]))
{
foreach (var i in r.Value)
{
await _temporaryAccessor.Store(r.Key, i);
}
}
}
}
_temporaryAccessor.CompleteAdding();
// compress new file
foreach (var file in newFiles)
{
await (_temporaryAccessor as StorePartitionBuilder)
.CompressFile(file);
}
// replace old file by new
foreach (var file in newFiles)
{
_phisicalFileAccessor.DropDataReader(file);
// 1. Remove index file
(_accessor as StorePartitionAccessor)
.DropFileIndex(file);
// 2. Replace source
var name = Path.GetFileName(file);
var updateFilePath = Path.Combine(folder, name);
_phisicalFileAccessor.LockFile(updateFilePath);
try
{
if (File.Exists(updateFilePath))
{
File.Delete(updateFilePath);
}
File.Move(file, updateFilePath);
}
finally
{
_phisicalFileAccessor.UnlockFile(updateFilePath);
}
// 3. Rebuil index
await (_accessor as BasePartition).RebuildFileIndex(name);
}
}
// remove temporary files
_temporaryAccessor.DropData();
Directory.Delete(_temporaryFolder, true);
}
#endregion
#region Private methods
private IEnumerable>>
IterateReadKeyInputs(string filePath)
{
if (File.Exists(filePath))
{
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
using (var reader = new MemoryStreamReader(stream))
{
while (reader.EOS == false)
{
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var input = _decompress(v);
yield return new SearchResult>
{
Key = k,
Value = input,
Success = true
};
}
}
}
else
{
Log.SystemError($"File not found '{filePath}'");
}
}
#endregion
public void Dispose()
{
_accessor.Dispose();
_temporaryAccessor.Dispose();
}
}
}