mirror of https://github.com/ogoun/Zero.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
50 lines
2.0 KiB
50 lines
2.0 KiB
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace ZeroLevel.Services.PartitionStorage
|
|
{
|
|
/// <summary>
/// Entry point of a partitioned store: hands out partition accessors and
/// runs key look-ups over several partitions in parallel.
/// </summary>
public class Store<TKey, TInput, TValue, TMeta> :
    IStore<TKey, TInput, TValue, TMeta>
{
    private readonly IStoreOptions<TKey, TInput, TValue, TMeta> _options;

    /// <summary>
    /// Creates the store and makes sure the configured root folder exists.
    /// </summary>
    /// <param name="options">Store settings; <c>RootFolder</c> is read here and
    /// <c>MaxDegreeOfParallelism</c> is read by <see cref="Search"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public Store(IStoreOptions<TKey, TInput, TValue, TMeta> options)
    {
        if (options == null) throw new ArgumentNullException(nameof(options));
        _options = options;
        // Directory.CreateDirectory does nothing when the directory already exists,
        // so a separate Directory.Exists pre-check is unnecessary.
        Directory.CreateDirectory(_options.RootFolder);
    }

    /// <summary>
    /// Creates an accessor for the partition described by <paramref name="info"/>.
    /// </summary>
    /// <param name="info">Metadata identifying the partition.</param>
    /// <returns>A new <c>StorePartitionAccessor</c> built from the store options and <paramref name="info"/>.</returns>
    public IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info)
    {
        return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info);
    }

    /// <summary>
    /// Looks up the requested keys in every partition named by the request,
    /// processing the partitions in parallel.
    /// </summary>
    /// <param name="searchRequest">Per-partition key lists to look up.</param>
    /// <returns>
    /// A result whose <c>Results</c> maps each requested partition's metadata to what
    /// that partition's accessor returned from <c>Find</c>; the map is empty when the
    /// request contains no partition requests.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="searchRequest"/> is null.</exception>
    public async Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest)
    {
        // Same guard style as the constructor; without it a null request fails with NullReferenceException.
        if (searchRequest == null) throw new ArgumentNullException(nameof(searchRequest));

        // Concurrent map because partitions are filled in from parallel workers.
        var results = new ConcurrentDictionary<TMeta, IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>>>();
        if (searchRequest.PartitionSearchRequests?.Any() ?? false)
        {
            var partitionsSearchInfo = searchRequest.PartitionSearchRequests.ToDictionary(r => r.Info, r => r.Keys);
            var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism };
            await Parallel.ForEachAsync(partitionsSearchInfo, options, (pair, _) =>
            {
                using (var accessor = CreateAccessor(pair.Key))
                {
                    // Materialize before the accessor is disposed so the stored sequence stays usable.
                    results[pair.Key] = accessor.Find(pair.Value).ToArray();
                }
                // The body is synchronous, so return a completed ValueTask rather than
                // paying for an async state machine that never awaits.
                return ValueTask.CompletedTask;
            });
        }

        return new StoreSearchResult<TKey, TValue, TMeta> { Results = results };
    }
}
|
|
}
|