using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.PartitionStorage.Partition;
using ZeroLevel.Services.Serialization;

namespace ZeroLevel.Services.PartitionStorage
{
    /// <summary>
    /// Performs merging of new data with existing data in the partition.
    /// New records are staged into a temporary catalog; <see cref="Compress"/>
    /// folds the pre-existing records into the staged files, compresses them,
    /// and replaces the original data files (re-building their indexes).
    /// </summary>
    internal sealed class StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>
        : IStorePartitionMergeBuilder<TKey, TInput, TValue>
    {
        // Unpacks a stored (compressed/grouped) TValue back into the sequence
        // of TInput records it was composed from. Supplied by the caller.
        private readonly Func<TValue, IEnumerable<TInput>> _decompress;

        /// <summary>
        /// Existing compressed catalog (read side of the merge).
        /// </summary>
        private readonly IStorePartitionAccessor<TKey, TInput, TValue> _accessor;

        // Unique sub-folder of the target catalog used to stage new records.
        private readonly string _temporaryFolder;
        private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
        private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;

        /// <summary>
        /// Number of records staged so far (only the new, not-yet-merged data).
        /// </summary>
        public long TotalRecords
        {
            get { return _temporaryAccessor.TotalRecords; }
        }

        /// <summary>
        /// Write catalog (staging area for incoming records).
        /// </summary>
        private readonly IStorePartitionBuilder<TKey, TInput, TValue> _temporaryAccessor;

        /// <summary>
        /// Creates a merge builder over an existing partition.
        /// </summary>
        /// <param name="options">Store configuration; cloned so the staging builder
        /// can be re-rooted at the temporary folder without affecting the caller.</param>
        /// <param name="info">Partition metadata.</param>
        /// <param name="decompress">Maps a stored value back to its input records. Required.</param>
        /// <param name="serializer">Key/input/value serializer shared by both catalogs.</param>
        /// <exception cref="ArgumentNullException">If <paramref name="decompress"/> is null.</exception>
        public StoreMergePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options,
            TMeta info,
            Func<TValue, IEnumerable<TInput>> decompress,
            IStoreSerializer<TKey, TInput, TValue> serializer)
        {
            if (decompress == null) throw new ArgumentNullException(nameof(decompress));
            _decompress = decompress;
            _accessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(options, info, serializer);
            // Stage inside the target catalog so the final File.Move is a
            // same-volume rename rather than a cross-volume copy.
            _temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString());
            var tempOptions = options.Clone();
            tempOptions.RootFolder = _temporaryFolder;
            _temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info, serializer);
            _keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
            _valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
        }

        #region API methods
        /// <summary>
        /// Deletes only new (staged) entries. Existing entries remain unchanged.
        /// </summary>
        public void DropData() => _temporaryAccessor.DropData();

        public string GetCatalogPath() => _accessor.GetCatalogPath();

        /// <summary>
        /// Stages a new record for merging.
        /// </summary>
        public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value);

        public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(),
            _temporaryAccessor.CountDataFiles());

        /// <summary>
        /// Performs compression/grouping of recorded data in a partition:
        /// appends existing records into the staged files, compresses them,
        /// then replaces the originals file-by-file and rebuilds their indexes.
        /// Finally removes the temporary staging folder.
        /// </summary>
        public void Compress()
        {
            var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath());
            if (newFiles != null && newFiles.Length > 0)
            {
                var folder = _accessor.GetCatalogPath();
                var existsFiles = Directory.GetFiles(folder)
                    ?.ToDictionary(f => Path.GetFileName(f), f => f);
                foreach (var file in newFiles)
                {
                    var name = Path.GetFileName(file);
                    // If a datafile with the same key range already exists,
                    // fold its records into the staged file before compression.
                    if (existsFiles.TryGetValue(name, out var existingPath))
                    {
                        // append all records from existing file to new
                        foreach (var r in IterateReadKeyInputs(existingPath))
                        {
                            foreach (var i in r.Value)
                            {
                                _temporaryAccessor.Store(r.Key, i);
                            }
                        }
                    }
                }
                _temporaryAccessor.CompleteAdding();
                // Compress each staged file in place.
                foreach (var file in newFiles)
                {
                    (_temporaryAccessor as StorePartitionBuilder<TKey, TInput, TValue, TMeta>)
                        .CompressFile(file);
                }
                // Replace old files with the merged ones.
                foreach (var file in newFiles)
                {
                    // 1. Remove the stale index file
                    (_accessor as StorePartitionAccessor<TKey, TInput, TValue, TMeta>)
                        .DropFileIndex(file);
                    // 2. Replace the source data file (overwrite; same volume, so a rename)
                    var name = Path.GetFileName(file);
                    File.Move(file, Path.Combine(folder, name), true);
                    // 3. Rebuild the index for the replaced file
                    (_accessor as BasePartition<TKey, TInput, TValue, TMeta>).RebuildFileIndex(name);
                }
            }
            // remove temporary files
            _temporaryAccessor.DropData();
            Directory.Delete(_temporaryFolder, true);
        }
        #endregion

        #region Private methods
        /// <summary>
        /// Streams (key, decompressed inputs) pairs out of an existing data file.
        /// Yields nothing if the file does not exist.
        /// </summary>
        private IEnumerable<StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>>
            IterateReadKeyInputs(string filePath)
        {
            if (File.Exists(filePath))
            {
                // NOTE(review): the FileStream is assumed to be owned and disposed
                // by MemoryStreamReader — confirm; otherwise it leaks on early exit.
                var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read,
                    FileShare.ReadWrite, 4096 * 1024);
                using (var reader = new MemoryStreamReader(stream))
                {
                    while (reader.EOS == false)
                    {
                        var k = _keyDeserializer.Invoke(reader);
                        var v = _valueDeserializer.Invoke(reader);
                        var input = _decompress(v);
                        yield return new StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>
                        {
                            Key = k,
                            Value = input,
                            Status = SearchResult.Success
                        };
                    }
                }
            }
        }
        #endregion

        public void Dispose()
        {
            _accessor.Dispose();
            _temporaryAccessor.Dispose();
        }
    }
}