using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ZeroLevel.Services.Serialization;

namespace ZeroLevel.Services.PartitionStorage
{
    /// <summary>
    /// Writes new values into an already existing partition.
    ///
    /// ORDER: Store -> CompleteAddingAndCompress -> RebuildIndex
    /// </summary>
    /// <remarks>
    /// New records are first collected in a temporary catalog and are merged with the
    /// records of the existing catalog when <c>CompleteAddingAndCompress</c> is called.
    /// NOTE(review): the generic argument lists in this file were reconstructed from usage
    /// (TKey, TInput, TValue, TMeta); confirm them against the declarations of
    /// IStorePartitionBuilder, IStorePartitionAccessor, StoreOptions, StorePartitionAccessor,
    /// StorePartitionBuilder and StorePartitionKeyValueSearchResult.
    /// </remarks>
    public class StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>
        : IStorePartitionBuilder<TKey, TInput, TValue>
    {
        /// <summary>
        /// Size, in bytes, of the buffer used when reading an existing data file.
        /// </summary>
        private const int ReadBufferSize = 4096 * 1024;

        /// <summary>
        /// Turns a stored (compressed) value back into the sequence of inputs it was built from.
        /// </summary>
        private readonly Func<TValue, IEnumerable<TInput>> _decompress;

        /// <summary>
        /// Accessor over the existing compressed catalog.
        /// </summary>
        private readonly IStorePartitionAccessor<TKey, TInput, TValue> _accessor;

        /// <summary>
        /// Root folder of the temporary catalog; a GUID-named subfolder of the existing catalog path.
        /// </summary>
        private readonly string _temporaryFolder;

        /// <summary>
        /// Builder over the temporary catalog that receives all new writes.
        /// </summary>
        private readonly IStorePartitionBuilder<TKey, TInput, TValue> _temporaryAccessor;

        /// <summary>
        /// Creates an accessor over the existing partition and a builder over a fresh temporary catalog.
        /// </summary>
        /// <param name="options">Options of the existing store; a clone with a replaced root folder is used for the temporary catalog.</param>
        /// <param name="info">Partition meta information, passed to both the accessor and the temporary builder.</param>
        /// <param name="decompress">Function that expands a stored value into its inputs.</param>
        /// <exception cref="ArgumentNullException"><paramref name="decompress"/> is null.</exception>
        public StoreMergePartitionAccessor(
            StoreOptions<TKey, TInput, TValue, TMeta> options,
            TMeta info,
            Func<TValue, IEnumerable<TInput>> decompress)
        {
            _decompress = decompress ?? throw new ArgumentNullException(nameof(decompress));
            _accessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(options, info);
            _temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString());

            // The temporary builder works on a copy of the options so that the original root folder is untouched.
            var tempOptions = options.Clone();
            tempOptions.RootFolder = _temporaryFolder;
            _temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info);
        }

        /// <summary>
        /// Lazily reads every key/value pair of a data file and decompresses each value into its inputs.
        /// </summary>
        /// <param name="filePath">Path of the data file to read.</param>
        /// <returns>
        /// One result per record in the file, with <c>Found</c> set to true;
        /// an empty sequence when the file does not exist.
        /// </returns>
        private IEnumerable<StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>> IterateReadKeyInputs(string filePath)
        {
            if (!File.Exists(filePath))
            {
                yield break;
            }

            // The stream is owned here so that it is released even if the reader
            // cannot be constructed or does not dispose the stream it wraps.
            using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, ReadBufferSize))
            using (var reader = new MemoryStreamReader(stream))
            {
                while (reader.EOS == false)
                {
                    var key = reader.ReadCompatible<TKey>();
                    var value = reader.ReadCompatible<TValue>();
                    yield return new StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>
                    {
                        Key = key,
                        Value = _decompress(value),
                        Found = true
                    };
                }
            }
        }

        /// <summary>
        /// Merges the records of the existing data files into the newly written files,
        /// compresses the new files, moves them over the existing ones and removes the temporary catalog.
        /// </summary>
        public void CompleteAddingAndCompress()
        {
            // Directory.GetFiles never returns null, so only the length has to be checked.
            var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath());
            if (newFiles.Length > 0)
            {
                var folder = _accessor.GetCatalogPath();
                var existsFiles = Directory.GetFiles(folder)
                    .ToDictionary(f => Path.GetFileName(f), f => f);

                foreach (var file in newFiles)
                {
                    // If a data file with the same name already exists,
                    // append all of its records to the new file.
                    if (existsFiles.TryGetValue(Path.GetFileName(file), out var existingFile))
                    {
                        foreach (var record in IterateReadKeyInputs(existingFile))
                        {
                            foreach (var input in record.Value)
                            {
                                _temporaryAccessor.Store(record.Key, input);
                            }
                        }
                    }
                }

                // The field is always assigned a StorePartitionBuilder in the constructor;
                // a direct cast fails loudly instead of producing a null reference.
                var builder = (StorePartitionBuilder<TKey, TInput, TValue, TMeta>)_temporaryAccessor;
                builder.CloseStreams();

                // Compress the new files.
                foreach (var file in newFiles)
                {
                    builder.CompressFile(file);
                }

                // Replace the old files with the new ones.
                foreach (var file in newFiles)
                {
                    File.Move(file, Path.Combine(folder, Path.GetFileName(file)), true);
                }
            }

            // Remove temporary files.
            _temporaryAccessor.DropData();
            Directory.Delete(_temporaryFolder, true);
        }

        /// <summary>
        /// Deletes only new entries. Existing entries remain unchanged.
        /// </summary>
        public void DropData() => _temporaryAccessor.DropData();

        /// <summary>
        /// Returns the catalog path of the existing partition.
        /// </summary>
        public string GetCatalogPath() => _accessor.GetCatalogPath();

        /// <summary>
        /// Rebuilds the index through the accessor of the existing partition.
        /// </summary>
        public void RebuildIndex() => _accessor.RebuildIndex();

        /// <summary>
        /// Stores a new key/value pair in the temporary catalog.
        /// </summary>
        /// <param name="key">Record key.</param>
        /// <param name="value">Record value.</param>
        public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value);

        /// <summary>
        /// Returns the larger of the data file counts of the existing and the temporary catalog.
        /// </summary>
        public int CountDataFiles() =>
            Math.Max(_accessor.CountDataFiles(), _temporaryAccessor.CountDataFiles());

        /// <summary>
        /// Disposes the accessor of the existing partition and the temporary builder.
        /// </summary>
        public void Dispose()
        {
            _accessor.Dispose();
            _temporaryAccessor.Dispose();
        }
    }
}