using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.PartitionStorage.Interfaces;

namespace ZeroLevel.Services.PartitionStorage
{
    // NOTE(review): the generic type-argument lists in this file look like they were
    // lost (TMeta is used but never declared, and several signatures contain
    // unbalanced '>' tokens). They are left exactly as found - restore them from
    // the original source before compiling.

    /// <summary>
    /// Entry point of the partition storage: creates accessors, builders and merge
    /// accessors for individual partitions, removes partitions, and runs a search
    /// over several partitions in parallel. All objects it creates share one
    /// file accessor cache, which is disposed together with the store.
    /// </summary>
    public class Store : IStore, IDisposable
    {
        private readonly StoreOptions _options;
        private readonly IStoreSerializer _serializer;
        private readonly PhisicalFileAccessorCachee _fileAccessorCachee;

        /// <summary>
        /// Creates the store and makes sure its root folder exists.
        /// </summary>
        /// <param name="options">Store settings; must not be null.</param>
        /// <param name="serializer">
        /// Serializer handed to every accessor/builder. When null, a
        /// <c>StoreStandartSerializer</c> is used.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
        public Store(StoreOptions options, IStoreSerializer serializer = null)
        {
            _options = options ?? throw new ArgumentNullException(nameof(options));
            _serializer = serializer ?? new StoreStandartSerializer();

            if (!Directory.Exists(_options.RootFolder))
            {
                Directory.CreateDirectory(_options.RootFolder);
            }

            // NOTE(review): the second argument is a fixed two-hour period; its exact
            // meaning is defined by PhisicalFileAccessorCachee - confirm there.
            _fileAccessorCachee = new PhisicalFileAccessorCachee(
                options.PhisicalFileAccessorExpirationPeriod,
                TimeSpan.FromHours(2));
        }

        /// <summary>
        /// Drops the data of the partition described by <paramref name="info"/> and
        /// removes its catalog folder.
        /// </summary>
        /// <param name="info">Metadata identifying the partition.</param>
        public void RemovePartition(TMeta info)
        {
            // The accessor is disposable (Search wraps it in a using block as well),
            // so it is released here instead of being left undisposed.
            using (var partition = CreateAccessor(info))
            {
                partition.DropData();
                FSUtils.RemoveFolder(partition.GetCatalogPath());
            }
        }

        /// <summary>
        /// Creates an accessor for the partition described by <paramref name="info"/>.
        /// The caller owns the returned object.
        /// </summary>
        public IStorePartitionAccessor CreateAccessor(TMeta info)
        {
            return new StorePartitionAccessor(_options, info, _serializer, _fileAccessorCachee);
        }

        /// <summary>
        /// Creates a builder for the partition described by <paramref name="info"/>.
        /// </summary>
        public IStorePartitionBuilder CreateBuilder(TMeta info)
        {
            return new StorePartitionBuilder(_options, info, _serializer, _fileAccessorCachee);
        }

        /// <summary>
        /// Creates a merge accessor for the partition described by <paramref name="info"/>.
        /// </summary>
        /// <param name="info">Metadata identifying the partition.</param>
        /// <param name="decompressor">Delegate passed through to the merge accessor.</param>
        public IStorePartitionMergeBuilder CreateMergeAccessor(TMeta info, Func> decompressor)
        {
            return new StoreMergePartitionAccessor(_options, info, decompressor, _serializer, _fileAccessorCachee);
        }

        /// <summary>
        /// Drops all data readers and all index readers held by the file accessor cache.
        /// </summary>
        public void DropCache()
        {
            _fileAccessorCachee.DropAllDataReaders();
            _fileAccessorCachee.DropAllIndexReaders();
        }

        /// <summary>
        /// Looks up the requested keys in each requested partition, processing
        /// partitions in parallel (bounded by the configured degree of parallelism).
        /// </summary>
        /// <param name="searchRequest">
        /// Request listing the partitions and the keys to find in each of them.
        /// </param>
        /// <returns>
        /// A result whose <c>Results</c> maps each partition's metadata to the entries
        /// found in it; empty when the request contains no partition requests.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="searchRequest"/> is null.</exception>
        public async Task> Search(StoreSearchRequest searchRequest)
        {
            if (searchRequest == null)
            {
                throw new ArgumentNullException(nameof(searchRequest));
            }

            var result = new StoreSearchResult();
            var results = new ConcurrentDictionary>>();

            if (searchRequest.PartitionSearchRequests?.Any() ?? false)
            {
                // One dictionary entry per partition: metadata -> keys to look up.
                var partitionsSearchInfo = searchRequest
                    .PartitionSearchRequests
                    .ToDictionary(r => r.Info, r => r.Keys);

                var options = new ParallelOptions
                {
                    MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism
                };

                await Parallel.ForEachAsync(partitionsSearchInfo, options, async (pair, _) =>
                {
                    using (var accessor = CreateAccessor(pair.Key))
                    {
                        // ToArray materializes the matches before the accessor is disposed.
                        results[pair.Key] = accessor
                            .Find(pair.Value)
                            .ToArray();
                    }
                });
            }

            result.Results = results;
            return result;
        }

        /// <summary>
        /// Disposes the shared file accessor cache.
        /// </summary>
        public void Dispose()
        {
            _fileAccessorCachee.Dispose();
        }
    }
}