PartitionStorage

TotalRecords, count of recorded and merged values
pull/4/head
Ogoun 2 years ago
parent 298e30345d
commit 045b393a95

@ -9,7 +9,8 @@ namespace PartitionFileStorageTest
{ {
internal class Program internal class Program
{ {
const int PAIRS_COUNT = 200_000_000; // const int PAIRS_COUNT = 200_000_000;
const int PAIRS_COUNT = 200_000;
private class Metadata private class Metadata
{ {
@ -75,7 +76,6 @@ namespace PartitionFileStorageTest
var testKeys1 = new List<ulong>(); var testKeys1 = new List<ulong>();
var testKeys2 = new List<ulong>(); var testKeys2 = new List<ulong>();
var testData = new Dictionary<ulong, HashSet<ulong>>(); var testData = new Dictionary<ulong, HashSet<ulong>>();
var total = 0L;
var insertCount = (int)(0.7 * pairs.Count); var insertCount = (int)(0.7 * pairs.Count);
@ -86,7 +86,6 @@ namespace PartitionFileStorageTest
if (testData.ContainsKey(key) == false) testData[key] = new HashSet<ulong>(); if (testData.ContainsKey(key) == false) testData[key] = new HashSet<ulong>();
testData[key].Add(val); testData[key].Add(val);
storePart.Store(key, val); storePart.Store(key, val);
total++;
if (key % 717 == 0) if (key % 717 == 0)
{ {
testKeys1.Add(key); testKeys1.Add(key);
@ -98,7 +97,7 @@ namespace PartitionFileStorageTest
} }
sw.Stop(); sw.Stop();
Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms"); Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms. Records writed: {storePart.TotalRecords}");
sw.Restart(); sw.Restart();
storePart.CompleteAdding(); storePart.CompleteAdding();
storePart.Compress(); storePart.Compress();
@ -116,12 +115,11 @@ namespace PartitionFileStorageTest
{ {
var key = pairs[i].Item1; var key = pairs[i].Item1;
var val = pairs[i].Item2; var val = pairs[i].Item2;
total++;
if (testData.ContainsKey(key) == false) testData[key] = new HashSet<ulong>(); if (testData.ContainsKey(key) == false) testData[key] = new HashSet<ulong>();
testData[key].Add(val); testData[key].Add(val);
merger.Store(key, val); merger.Store(key, val);
} }
Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {total}"); Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. New records merged: {merger.TotalRecords}");
merger.Compress(); // auto reindex merger.Compress(); // auto reindex
sw.Stop(); sw.Stop();
Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms");
@ -396,8 +394,10 @@ namespace PartitionFileStorageTest
// FastTest(options); // FastTest(options);
FSUtils.CleanAndTestFolder(root); FSUtils.CleanAndTestFolder(root);
FullStoreTest(options, pairs); FullStoreTest(options, pairs);
/*
FSUtils.CleanAndTestFolder(root); FSUtils.CleanAndTestFolder(root);
FullStoreMultithreadTest(optionsMultiThread, pairs); FullStoreMultithreadTest(optionsMultiThread, pairs);
*/
Console.ReadKey(); Console.ReadKey();
} }
} }

@ -11,6 +11,10 @@ namespace ZeroLevel.Services.PartitionStorage
public interface IStorePartitionBuilder<TKey, TInput, TValue> public interface IStorePartitionBuilder<TKey, TInput, TValue>
: IStorePartitionBase<TKey, TInput, TValue> : IStorePartitionBase<TKey, TInput, TValue>
{ {
long TotalRecords
{
get;
}
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TInput>> Iterate(); IEnumerable<StorePartitionKeyValueSearchResult<TKey, TInput>> Iterate();
/// <summary> /// <summary>
/// Writing a key-value pair /// Writing a key-value pair

@ -9,6 +9,10 @@
public interface IStorePartitionMergeBuilder<TKey, TInput, TValue> public interface IStorePartitionMergeBuilder<TKey, TInput, TValue>
: IStorePartitionBase<TKey, TInput, TValue> : IStorePartitionBase<TKey, TInput, TValue>
{ {
long TotalRecords
{
get;
}
/// <summary> /// <summary>
/// Writing a key-value pair /// Writing a key-value pair
/// </summary> /// </summary>

@ -24,6 +24,11 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer; private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer; private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
public long TotalRecords
{
get { return _temporaryAccessor.TotalRecords; }
}
/// <summary> /// <summary>
/// Write catalog /// Write catalog
/// </summary> /// </summary>

@ -2,7 +2,6 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.PartitionStorage.Interfaces;
@ -16,6 +15,10 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
private readonly Action<TKey, TInput> _storeMethod; private readonly Action<TKey, TInput> _storeMethod;
private long _totalRecords = 0;
public long TotalRecords { get { return _totalRecords; } }
public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options, public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info, TMeta info,
IStoreSerializer<TKey, TInput, TValue> serializer) IStoreSerializer<TKey, TInput, TValue> serializer)
@ -38,11 +41,14 @@ namespace ZeroLevel.Services.PartitionStorage
public void Store(TKey key, TInput value) public void Store(TKey key, TInput value)
{ {
_storeMethod.Invoke(key, value); _storeMethod.Invoke(key, value);
Interlocked.Increment(ref _totalRecords);
} }
public void CompleteAdding() public void CompleteAdding()
{ {
CloseWriteStreams(); CloseWriteStreams();
} }
public void Compress() public void Compress()
{ {
var files = Directory.GetFiles(_catalog); var files = Directory.GetFiles(_catalog);

@ -6,7 +6,7 @@
</Description> </Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>3.3.8.3</AssemblyVersion> <AssemblyVersion>3.3.8.4</AssemblyVersion>
<PackageReleaseNotes>PartitionStorage. ThreadSafeWriting option</PackageReleaseNotes> <PackageReleaseNotes>PartitionStorage. ThreadSafeWriting option</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2022</Copyright> <Copyright>Copyright Ogoun 2022</Copyright>
@ -14,8 +14,8 @@
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<Version>3.3.8.3</Version> <Version>3.3.8.4</Version>
<FileVersion>3.3.8.3</FileVersion> <FileVersion>3.3.8.4</FileVersion>
<Platforms>AnyCPU;x64;x86</Platforms> <Platforms>AnyCPU;x64;x86</Platforms>
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
<DebugType>full</DebugType> <DebugType>full</DebugType>

Loading…
Cancel
Save

Powered by TurnKey Linux.