diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs index 1fcbaec..90da852 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs @@ -29,7 +29,7 @@ namespace ZeroLevel.Services.PartitionStorage /// /// Performs a search for data in the repository /// - Task> Search(StoreSearchRequest searchRequest); + IAsyncEnumerable> Search(StoreSearchRequest searchRequest); /// /// bypass all key value by meta /// diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs index d686f3b..464a005 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs @@ -24,7 +24,8 @@ namespace ZeroLevel.Services.PartitionStorage /// /// Search in a partition for a specified keys /// - Task Find(IEnumerable keys, Action searchResultHandler); + IAsyncEnumerable> Find(IEnumerable keys); + /// /// Iterating over all recorded data /// diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs index 368d561..e250b03 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs @@ -88,7 +88,8 @@ namespace ZeroLevel.Services.PartitionStorage Value = default }; } - public async Task Find(IEnumerable keys, Action searchResultHandler) + + public async IAsyncEnumerable> Find(IEnumerable keys) { var results = keys.Distinct() .GroupBy( @@ -96,9 +97,13 @@ namespace ZeroLevel.Services.PartitionStorage k => k, (key, g) => new { FileName = key, Keys = g.ToArray() }); foreach (var group in results) { - await Find(group.FileName, group.Keys, searchResultHandler); + await foreach (var kv in Find(group.FileName, group.Keys)) + { + yield return kv; + } } } + public async IAsyncEnumerable> Iterate() { if (Directory.Exists(_catalog)) @@ -204,10 +209,8 @@ namespace ZeroLevel.Services.PartitionStorage #region Private methods - private async Task Find(string fileName, TKey[] keys, Action searchResultHandler) + private async IAsyncEnumerable> Find(string fileName, TKey[] keys) { - TKey k; - TValue v; var filePath = Path.Combine(_catalog, fileName); if (File.Exists(filePath)) { @@ -242,7 +245,7 @@ namespace ZeroLevel.Services.PartitionStorage var c = _options.KeyComparer(searchKey, kv.Value); if (c == 0) { - searchResultHandler.Invoke(kv.Value, vv.Value); + yield return new KV(kv.Value, vv.Value); break; } else if (c == -1) @@ -274,7 +277,7 @@ namespace ZeroLevel.Services.PartitionStorage var c = _options.KeyComparer(keys_arr[index], kv.Value); if (c == 0) { - searchResultHandler.Invoke(kv.Value, vv.Value); + yield return new KV(kv.Value, vv.Value); index++; } else if (c == -1) diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs index dbc8a6c..37877d7 100644 --- a/ZeroLevel/Services/PartitionStorage/Store.cs +++ b/ZeroLevel/Services/PartitionStorage/Store.cs @@ -70,38 +70,29 @@ namespace ZeroLevel.Services.PartitionStorage _fileAccessorCachee.DropAllIndexReaders(); } - public async Task> Search(StoreSearchRequest searchRequest) + public async IAsyncEnumerable> Search(StoreSearchRequest searchRequest) { - var result = new StoreSearchResult(); - var results = new ConcurrentDictionary>>(); if (searchRequest.PartitionSearchRequests?.Any() ?? false) { var partitionsSearchInfo = searchRequest .PartitionSearchRequests .ToDictionary(r => r.Info, r => r.Keys); - var options = new ParallelOptions - { - MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism - }; - await Parallel.ForEachAsync(partitionsSearchInfo, options, async (pair, _) => + foreach(var pair in partitionsSearchInfo) { var accessor = CreateAccessor(pair.Key); if (accessor != null) { using (accessor) { - var set = new List>(); - await foreach (var kv in accessor.Iterate()) + var set = new ConcurrentBag>(); + await foreach (var kv in accessor.Find(pair.Value)) { - set.Add(new KV(kv.Key, kv.Value)); + yield return kv; } - results[pair.Key] = set; } } - }); + } } - result.Results = results; - return result; } public void Dispose()