You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Zero/ZeroLevel/Services/PartitionStorage/Store.cs

144 lines
5.0 KiB

2 years ago
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.PartitionStorage.Interfaces;
2 years ago
namespace ZeroLevel.Services.PartitionStorage
{
public class Store<TKey, TInput, TValue, TMeta> :
IStore<TKey, TInput, TValue, TMeta>, IDisposable
2 years ago
{
private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly IStoreSerializer<TKey, TInput, TValue> _serializer;
private readonly PhisicalFileAccessorCachee _fileAccessorCachee;
public Store(StoreOptions<TKey, TInput, TValue, TMeta> options,
IStoreSerializer<TKey, TInput, TValue> serializer = null)
2 years ago
{
if (options == null) throw new ArgumentNullException(nameof(options));
_options = options;
if (serializer == null)
{
_serializer = new StoreStandartSerializer<TKey, TInput, TValue>();
}
else
{
_serializer = serializer;
}
2 years ago
if (Directory.Exists(_options.RootFolder) == false)
{
Directory.CreateDirectory(_options.RootFolder);
}
_fileAccessorCachee = new PhisicalFileAccessorCachee(options.PhisicalFileAccessorExpirationPeriod, TimeSpan.FromHours(2));
2 years ago
}
public void RemovePartition(TMeta info)
{
var partition = CreateAccessor(info);
if (partition != null)
{
string path;
using (partition)
{
path = partition.GetCatalogPath();
partition.DropData();
}
FSUtils.RemoveFolder(path);
}
}
2 years ago
public IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info)
{
if (false == Directory.Exists(_options.GetCatalogPath(info)))
{
return null;
}
return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, _serializer, _fileAccessorCachee);
2 years ago
}
public IStorePartitionBuilder<TKey, TInput, TValue> CreateBuilder(TMeta info)
{
return new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(_options, info, _serializer, _fileAccessorCachee);
}
public IStorePartitionMergeBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor)
{
return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor, _serializer, _fileAccessorCachee);
}
public void DropCache()
{
_fileAccessorCachee.DropAllDataReaders();
_fileAccessorCachee.DropAllIndexReaders();
}
1 year ago
public StoreSearchResult<TKey, TValue, TMeta> Search(StoreSearchRequest<TKey, TMeta> searchRequest)
2 years ago
{
var result = new StoreSearchResult<TKey, TValue, TMeta>();
var results = new ConcurrentDictionary<TMeta, IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>>>();
if (searchRequest.PartitionSearchRequests?.Any() ?? false)
{
var partitionsSearchInfo = searchRequest
.PartitionSearchRequests
.ToDictionary(r => r.Info, r => r.Keys);
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism
};
1 year ago
Parallel.ForEach(partitionsSearchInfo, options, (pair, _) =>
2 years ago
{
var accessor = CreateAccessor(pair.Key);
if (accessor != null)
2 years ago
{
using (accessor)
{
results[pair.Key] = accessor
.Find(pair.Value)
.ToArray();
}
2 years ago
}
});
}
result.Results = results;
return result;
}
public void Dispose()
{
_fileAccessorCachee.Dispose();
}
public void Bypass(TMeta meta, Action<TKey, TValue> handler)
{
var accessor = CreateAccessor(meta);
if (accessor != null)
{
using (accessor)
{
foreach (var kv in accessor.Iterate())
{
handler.Invoke(kv.Key, kv.Value);
}
}
}
}
public bool Exists(TMeta meta, TKey key)
{
var accessor = CreateAccessor(meta);
if (accessor != null)
{
using (accessor)
{
return accessor.Find(key).Status == SearchResult.Success;
}
}
return false;
}
2 years ago
}
}

Powered by TurnKey Linux.