using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Responsible for building index files
/// </summary>
internal sealed class IndexBuilder
{
    // Name of the subfolder (inside the data catalog) that holds the index files.
    private const string INDEX_SUBFOLDER_NAME = "__indexes__";

    // Selects the rebuild strategy: AbsoluteCount writes a fixed number of entries,
    // any other value writes an entry for every _stepValue-th record.
    private readonly IndexStepType _indexType;
    // Full path of the index subfolder; one index file per data file, same file name.
    private readonly string _indexCatalog;
    // Folder that contains the data files to be indexed.
    private readonly string _dataCatalog;
    // Number of index entries (AbsoluteCount mode) or record stride (step mode).
    private readonly int _stepValue;
    // Reads the next key from a data reader.
    private readonly Func _keyDeserializer;
    // Reads the next value from a data reader; used only to skip past the value.
    private readonly Func _valueDeserializer;
    private readonly PhisicalFileAccessorCachee _phisicalFileAccessorCachee;

    /// <summary>
    /// Creates a builder that stores indexes in the "__indexes__" subfolder of <paramref name="dataCatalog"/>.
    /// </summary>
    /// <param name="indexType">Index layout strategy.</param>
    /// <param name="stepValue">Entry count (AbsoluteCount mode) or record stride (step mode).</param>
    /// <param name="dataCatalog">Folder with the data files.</param>
    /// <param name="phisicalFileAccessorCachee">Cache that is told to drop an index reader before its file is deleted.</param>
    public IndexBuilder(IndexStepType indexType, int stepValue, string dataCatalog, PhisicalFileAccessorCachee phisicalFileAccessorCachee)
    {
        _dataCatalog = dataCatalog;
        _indexCatalog = Path.Combine(dataCatalog, INDEX_SUBFOLDER_NAME);
        _indexType = indexType;
        _stepValue = stepValue;
        _keyDeserializer = MessageSerializer.GetDeserializer();
        _valueDeserializer = MessageSerializer.GetDeserializer();
        _phisicalFileAccessorCachee = phisicalFileAccessorCachee;
    }

    /// <summary>
    /// Rebuild indexes for all files
    /// </summary>
    internal void RebuildIndex()
    {
        // GetFiles(path) never returns null (an empty array means "no files") and only
        // lists the top directory, so files inside the index subfolder are not visited.
        foreach (var file in Directory.GetFiles(_dataCatalog))
        {
            RebuildFileIndex(Path.GetFileName(file));
        }
    }

    /// <summary>
    /// Rebuild index for the specified file
    /// </summary>
    /// <param name="file">Data file name, relative to the data catalog.</param>
    internal void RebuildFileIndex(string file)
    {
        if (_indexType == IndexStepType.AbsoluteCount)
        {
            RebuildFileIndexWithAbsoluteCountIndexes(file);
        }
        else
        {
            RebuildFileIndexWithSteps(file);
        }
    }

    /// <summary>
    /// Delete the index for the specified file
    /// </summary>
    /// <param name="file">Data file name or path; only the file-name part is used.</param>
    internal void DropFileIndex(string file)
    {
        var index_file = GetIndexFilePath(file);
        // The cache is notified before the delete.
        // NOTE(review): presumably this releases an open reader on the index file - confirm.
        _phisicalFileAccessorCachee.DropIndexReader(index_file);
        if (File.Exists(index_file))
        {
            File.Delete(index_file);
        }
    }

    /// <summary>
    /// Rebuild index with specified number of steps for specified file
    /// </summary>
    /// <remarks>
    /// Any existing index for the file is always removed. A new one is written only when the
    /// step value is positive and the file holds more distinct keys than the step value.
    /// </remarks>
    private void RebuildFileIndexWithAbsoluteCountIndexes(string file)
    {
        // CreateDirectory is a no-op when the folder already exists.
        Directory.CreateDirectory(_indexCatalog);

        // key -> offset of the record in the data file; a repeated key keeps its last offset.
        var dict = new Dictionary();
        using (var reader = OpenDataReader(file))
        {
            while (reader.EOS == false)
            {
                var pos = reader.Position;
                var k = _keyDeserializer.Invoke(reader);
                dict[k] = pos;
                // The value is read only to advance the reader to the next record.
                _valueDeserializer.Invoke(reader);
            }
        }

        // Drop the old index unconditionally: if the file no longer qualifies for an index,
        // a leftover index would still carry offsets from the previous file contents.
        DropFileIndex(file);

        if (_stepValue <= 0 || dict.Count <= _stepValue)
        {
            return;
        }

        // Integer division truncates toward zero exactly and avoids float precision loss on
        // large counts. Since Count > _stepValue > 0, step >= 1 and (i * step) < Count below.
        var step = dict.Count / _stepValue;
        var d_arr = dict.OrderBy(p => p.Key).ToArray();
        using (var writer = new MemoryStreamWriter(new FileStream(GetIndexFilePath(file), FileMode.Create, FileAccess.Write, FileShare.None)))
        {
            for (int i = 0; i < _stepValue; i++)
            {
                var pair = d_arr[i * step];
                writer.WriteCompatible(pair.Key);
                writer.WriteLong(pair.Value);
            }
        }
    }

    /// <summary>
    /// Rebuild index with specified step for keys
    /// </summary>
    /// <remarks>
    /// Writes an entry for the first record and then for every _stepValue-th record after it,
    /// in the order the records appear in the data file.
    /// </remarks>
    private void RebuildFileIndexWithSteps(string file)
    {
        // CreateDirectory is a no-op when the folder already exists.
        Directory.CreateDirectory(_indexCatalog);

        using (var reader = OpenDataReader(file))
        {
            DropFileIndex(file);
            using (var writer = new MemoryStreamWriter(new FileStream(GetIndexFilePath(file), FileMode.Create, FileAccess.Write, FileShare.None)))
            {
                // Starts at 1 so that the very first record is always indexed.
                var counter = 1;
                while (reader.EOS == false)
                {
                    counter--;
                    var pos = reader.Position;
                    var k = _keyDeserializer.Invoke(reader);
                    // The value is read only to advance the reader to the next record.
                    _valueDeserializer.Invoke(reader);
                    if (counter == 0)
                    {
                        writer.WriteCompatible(k);
                        writer.WriteLong(pos);
                        counter = _stepValue;
                    }
                }
            }
        }
    }

    // Maps a data file name (or path) to the path of its index file in the index catalog.
    private string GetIndexFilePath(string file)
    {
        return Path.Combine(_indexCatalog, Path.GetFileName(file));
    }

    // Opens the data file from the data catalog for exclusive, read-only sequential access.
    private MemoryStreamReader OpenDataReader(string file)
    {
        return new MemoryStreamReader(new FileStream(Path.Combine(_dataCatalog, file), FileMode.Open, FileAccess.Read, FileShare.None));
    }
}
}