using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.PartitionStorage.Interfaces;

namespace ZeroLevel.Services.PartitionStorage
{
    /// <summary>
    /// Entry point to a partitioned store. Hands out accessors, builders and merge
    /// accessors for individual partitions (each identified by a TMeta value) and
    /// owns the file accessor cache that is passed to all of them.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type-argument lists appear to be missing from this
    /// copy of the file (for example the "Func&gt; decompressor" parameter and the
    /// "IAsyncEnumerable&gt;" return types below), so type arguments are deliberately
    /// not described here. Confirm against the canonical source.
    /// </remarks>
    public class Store : IStore, IDisposable
    {
        // Store configuration; read here for RootFolder, GetCatalogPath(info) and
        // PhisicalFileAccessorExpirationPeriod.
        private readonly StoreOptions _options;
        // Serializer forwarded unchanged to every accessor/builder created by this store.
        private readonly IStoreSerializer _serializer;
        // Cache of file accessors shared by all accessors/builders created by this store;
        // disposed in Dispose().
        private readonly PhisicalFileAccessorCachee _fileAccessorCachee;

        /// <summary>
        /// Creates the store, making sure the root folder exists and creating the
        /// file accessor cache.
        /// </summary>
        /// <param name="options">Store configuration; must not be <c>null</c>.</param>
        /// <param name="serializer">Serializer handed to created accessors and builders; must not be <c>null</c>.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="options"/> or <paramref name="serializer"/> is <c>null</c>.
        /// </exception>
        public Store(StoreOptions options, IStoreSerializer serializer)
        {
            if (options == null) throw new ArgumentNullException(nameof(options));
            if (serializer == null) throw new ArgumentNullException(nameof(serializer));
            _options = options;
            _serializer = serializer;
            // Create the root folder on first use so later partition operations have a place to write.
            if (Directory.Exists(_options.RootFolder) == false)
            {
                Directory.CreateDirectory(_options.RootFolder);
            }
            // NOTE(review): the meaning of the second argument (2 hours) is defined by
            // PhisicalFileAccessorCachee, which is not visible here - confirm before changing.
            _fileAccessorCachee = new PhisicalFileAccessorCachee(options.PhisicalFileAccessorExpirationPeriod, TimeSpan.FromHours(2));
        }

        /// <summary>
        /// Drops the data of the partition identified by <paramref name="info"/> and
        /// removes its catalog folder. Does nothing when <c>CreateAccessor</c> returns
        /// <c>null</c>, i.e. when the partition catalog does not exist.
        /// </summary>
        /// <param name="info">Metadata identifying the partition to remove.</param>
        public void RemovePartition(TMeta info)
        {
            var partition = CreateAccessor(info);
            if (partition != null)
            {
                string path;
                using (partition)
                {
                    // Capture the catalog path while the accessor is still alive.
                    path = partition.GetCatalogPath();
                    partition.DropData();
                }
                // The folder is removed only after the accessor has been disposed.
                FSUtils.RemoveFolder(path);
            }
        }

        /// <summary>
        /// Creates an accessor for an existing partition.
        /// </summary>
        /// <param name="info">Metadata identifying the partition.</param>
        /// <returns>
        /// A new accessor, or <c>null</c> when the catalog directory resolved by
        /// <c>_options.GetCatalogPath(info)</c> does not exist. Callers must check for <c>null</c>.
        /// </returns>
        public IStorePartitionAccessor CreateAccessor(TMeta info)
        {
            if (false == Directory.Exists(_options.GetCatalogPath(info)))
            {
                return null;
            }
            return new StorePartitionAccessor(_options, info, _serializer, _fileAccessorCachee);
        }

        /// <summary>
        /// Creates a builder for the partition identified by <paramref name="info"/>.
        /// Unlike <c>CreateAccessor</c>, no directory existence check is made here.
        /// </summary>
        /// <param name="info">Metadata identifying the partition.</param>
        /// <returns>A new partition builder.</returns>
        public IStorePartitionBuilder CreateBuilder(TMeta info)
        {
            return new StorePartitionBuilder(_options, info, _serializer, _fileAccessorCachee);
        }

        /// <summary>
        /// Creates a merge accessor for the partition identified by <paramref name="info"/>.
        /// No directory existence check is made here.
        /// </summary>
        /// <param name="info">Metadata identifying the partition.</param>
        /// <param name="decompressor">
        /// Delegate forwarded to the merge accessor.
        /// NOTE(review): presumably expands a stored value back into its inputs - its
        /// signature is not visible in this copy, confirm.
        /// </param>
        /// <returns>A new merge accessor.</returns>
        public IStorePartitionMergeBuilder CreateMergeAccessor(TMeta info, Func> decompressor)
        {
            return new StoreMergePartitionAccessor(_options, info, decompressor, _serializer, _fileAccessorCachee);
        }

        /// <summary>
        /// Drops all data readers and all index readers held by the file accessor cache.
        /// </summary>
        public void DropCache()
        {
            _fileAccessorCachee.DropAllDataReaders();
            _fileAccessorCachee.DropAllIndexReaders();
        }

        /// <summary>
        /// Looks up the requested keys in each requested partition and streams every
        /// match together with the metadata of the partition it was found in.
        /// </summary>
        /// <param name="searchRequest">
        /// Request whose <c>PartitionSearchRequests</c> pair a partition (<c>Info</c>)
        /// with the keys to look up (<c>Keys</c>).
        /// NOTE(review): <paramref name="searchRequest"/> itself is not null-checked.
        /// </param>
        /// <returns>
        /// An asynchronous sequence of results; empty when <c>PartitionSearchRequests</c>
        /// is <c>null</c> or has no elements. Partitions for which <c>CreateAccessor</c>
        /// returns <c>null</c> are skipped.
        /// </returns>
        public async IAsyncEnumerable> Search(StoreSearchRequest searchRequest)
        {
            // A null or empty request list yields nothing.
            if (searchRequest.PartitionSearchRequests?.Any() ??
false)
            {
                // NOTE(review): ToDictionary throws ArgumentException on duplicate keys, so two
                // requests with an equal Info would fail here - confirm callers never send duplicates.
                var partitionsSearchInfo = searchRequest
                    .PartitionSearchRequests
                    .ToDictionary(r => r.Info, r => r.Keys);
                foreach(var pair in partitionsSearchInfo)
                {
                    var accessor = CreateAccessor(pair.Key);
                    if (accessor != null)
                    {
                        // The accessor is disposed when this partition has been fully
                        // enumerated or the consumer stops enumerating.
                        using (accessor)
                        {
                            await foreach (var kv in accessor.Find(pair.Value))
                            {
                                // Attach the partition metadata to each found key/value.
                                yield return new KVM(kv.Key, kv.Value, pair.Key);
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Disposes the file accessor cache owned by this store.
        /// </summary>
        public void Dispose()
        {
            _fileAccessorCachee.Dispose();
        }

        /// <summary>
        /// Streams everything returned by the accessor's <c>Iterate()</c> for the
        /// partition identified by <paramref name="meta"/>.
        /// </summary>
        /// <param name="meta">Metadata identifying the partition.</param>
        /// <returns>
        /// An asynchronous sequence of the iterated items; empty when
        /// <c>CreateAccessor</c> returns <c>null</c>.
        /// </returns>
        public async IAsyncEnumerable> Bypass(TMeta meta)
        {
            var accessor = CreateAccessor(meta);
            if (accessor != null)
            {
                using (accessor)
                {
                    await foreach (var kv in accessor.Iterate())
                    {
                        yield return kv;
                    }
                }
            }
        }

        /// <summary>
        /// Streams everything returned by the accessor's <c>IterateKeys()</c> for the
        /// partition identified by <paramref name="meta"/>.
        /// </summary>
        /// <param name="meta">Metadata identifying the partition.</param>
        /// <returns>
        /// An asynchronous sequence of the iterated keys; empty when
        /// <c>CreateAccessor</c> returns <c>null</c>.
        /// </returns>
        public async IAsyncEnumerable BypassKeys(TMeta meta)
        {
            var accessor = CreateAccessor(meta);
            if (accessor != null)
            {
                using (accessor)
                {
                    await foreach (var kv in accessor.IterateKeys())
                    {
                        yield return kv;
                    }
                }
            }
        }

        /// <summary>
        /// Checks whether <paramref name="key"/> can be found in the partition
        /// identified by <paramref name="meta"/>.
        /// </summary>
        /// <param name="meta">Metadata identifying the partition.</param>
        /// <param name="key">Key to look up.</param>
        /// <returns>
        /// The <c>Success</c> flag of the accessor's <c>Find(key)</c> result, or
        /// <c>false</c> when <c>CreateAccessor</c> returns <c>null</c>.
        /// </returns>
        public async Task Exists(TMeta meta, TKey key)
        {
            var accessor = CreateAccessor(meta);
            if (accessor != null)
            {
                using (accessor)
                {
                    var info = await accessor.Find(key);
                    return info.Success;
                }
            }
            // No catalog for this partition, so the key cannot exist.
            return false;
        }
    }
}