Update PartitionStorage

ThreadSafeWriting option
pull/4/head
Ogoun 2 years ago
parent f35082fa59
commit 298e30345d

@ -8,7 +8,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.22" /> <PackageReference Include="Microsoft.FASTER.Core" Version="2.1.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

@ -1,4 +1,5 @@
using System.Diagnostics; using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text; using System.Text;
using ZeroLevel; using ZeroLevel;
using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.FileSystem;
@ -8,6 +9,8 @@ namespace PartitionFileStorageTest
{ {
internal class Program internal class Program
{ {
const int PAIRS_COUNT = 200_000_000;
private class Metadata private class Metadata
{ {
public DateTime Date { get; set; } public DateTime Date { get; set; }
@ -18,33 +21,13 @@ namespace PartitionFileStorageTest
var num = new StringBuilder(); var num = new StringBuilder();
num.Append("79"); num.Append("79");
num.Append(r.Next(99).ToString("D2")); num.Append(r.Next(99).ToString("D2"));
num.Append(r.Next(999).ToString("D7")); num.Append(r.Next(9999).ToString("D7"));
return ulong.Parse(num.ToString()); return ulong.Parse(num.ToString());
} }
private static void FastTest(string root) private static void FastTest(StoreOptions<ulong, ulong, byte[], Metadata> options)
{ {
var r = new Random(Environment.TickCount); var r = new Random(Environment.TickCount);
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{
Index = new IndexOptions
{
Enabled = true,
StepType = IndexStepType.AbsoluteCount,
StepValue = 64 },
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
{
ulong s = 0;
return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
};
var store = new Store<ulong, ulong, byte[], Metadata>(options); var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
@ -80,115 +63,44 @@ namespace PartitionFileStorageTest
} }
} }
private static void FullStoreTest(string root) private static void FullStoreTest(StoreOptions<ulong, ulong, byte[], Metadata> options,
List<(ulong, ulong)> pairs)
{ {
var meta = new Metadata { Date = new DateTime(2022, 11, 08) };
var r = new Random(Environment.TickCount); var r = new Random(Environment.TickCount);
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{
Index = new IndexOptions
{
Enabled = true,
StepType = IndexStepType.Step,
StepValue = 1
},
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
{
ulong s = 0;
return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
};
FSUtils.CleanAndTestFolder(root);
var store = new Store<ulong, ulong, byte[], Metadata>(options); var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); var storePart = store.CreateBuilder(meta);
/*Log.Info("Fill start");
for (int i = 0; i < 10000000; i++)
{
var s = Generate(r);
var count = r.Next(200);
for (int j = 0; j < count; j++)
{
var t = Generate(r);
storePart.Store(s, t);
}
}
storePart.CompleteAdding();
Log.Info("Fill complete");
long cnt = 0;
foreach (var p in storePart.Iterate())
{
if (p.Key % 2 == 0) cnt++;
}
Log.Info(cnt.ToString());
Log.Info("Fill test complete");
storePart.Compress();
Log.Info("Compress complete");
var reader = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
cnt = 0;
foreach (var p in reader.Iterate())
{
if (p.Key % 2 == 0) cnt++;
}
Log.Info(cnt.ToString());
Log.Info("Compress test complete");
storePart.DropData();
Log.Info("Complete#1");
//Console.ReadKey();
FSUtils.CleanAndTestFolder(root);*/
var sw = new Stopwatch(); var sw = new Stopwatch();
sw.Start(); sw.Start();
var testKeys1 = new List<ulong>(); var testKeys1 = new List<ulong>();
var testKeys2 = new List<ulong>(); var testKeys2 = new List<ulong>();
var testData = new Dictionary<ulong, HashSet<ulong>>(); var testData = new Dictionary<ulong, HashSet<ulong>>();
var total = 0L; var total = 0L;
for (int i = 0; i < 2000000; i++) var insertCount = (int)(0.7 * pairs.Count);
for (int i = 0; i < insertCount; i++)
{ {
var s = Generate(r); var key = pairs[i].Item1;
if (testData.ContainsKey(s) == false) testData[s] = new HashSet<ulong>(); var val = pairs[i].Item2;
var count = r.Next(300); if (testData.ContainsKey(key) == false) testData[key] = new HashSet<ulong>();
testData[key].Add(val);
storePart.Store(key, val);
total++; total++;
for (int j = 0; j < count; j++) if (key % 717 == 0)
{ {
total++; testKeys1.Add(key);
var t = Generate(r);
storePart.Store(s, t);
testData[s].Add(t);
} }
if (s % 177 == 0) if (key % 117 == 0)
{ {
testKeys1.Add(s); testKeys2.Add(key);
}
if (s % 223 == 0)
{
testKeys2.Add(s);
} }
} }
sw.Stop(); sw.Stop();
Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms"); Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms");
sw.Restart(); sw.Restart();
storePart.CompleteAdding(); storePart.CompleteAdding();
storePart.Compress(); storePart.Compress();
sw.Stop(); sw.Stop();
Log.Info($"Compress: {sw.ElapsedMilliseconds}ms"); Log.Info($"Compress: {sw.ElapsedMilliseconds}ms");
@ -199,20 +111,15 @@ namespace PartitionFileStorageTest
Log.Info("Start merge test"); Log.Info("Start merge test");
sw.Restart(); sw.Restart();
var merger = store.CreateMergeAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }, data => Compressor.DecodeBytesContent(data)); var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data));
for (int i = 0; i < 2300000; i++) for (int i = insertCount; i < pairs.Count; i++)
{ {
var key = pairs[i].Item1;
var val = pairs[i].Item2;
total++; total++;
var s = Generate(r); if (testData.ContainsKey(key) == false) testData[key] = new HashSet<ulong>();
if (testData.ContainsKey(s) == false) testData[s] = new HashSet<ulong>(); testData[key].Add(val);
var count = r.Next(300); merger.Store(key, val);
for (int j = 0; j < count; j++)
{
total++;
var t = Generate(r);
merger.Store(s, t);
testData[s].Add(t);
}
} }
Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {total}"); Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {total}");
merger.Compress(); // auto reindex merger.Compress(); // auto reindex
@ -220,7 +127,7 @@ namespace PartitionFileStorageTest
Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms");
Log.Info("Test #1 reading"); Log.Info("Test #1 reading");
var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); var readPart = store.CreateAccessor(meta);
ulong totalData = 0; ulong totalData = 0;
ulong totalKeys = 0; ulong totalKeys = 0;
foreach (var key in testKeys1) foreach (var key in testKeys1)
@ -307,15 +214,190 @@ namespace PartitionFileStorageTest
Log.Info("Completed"); Log.Info("Completed");
} }
private static void FullStoreMultithreadTest(StoreOptions<ulong, ulong, byte[], Metadata> options,
List<(ulong, ulong)> pairs)
{
var meta = new Metadata { Date = new DateTime(2022, 11, 08) };
var r = new Random(Environment.TickCount);
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storePart = store.CreateBuilder(meta);
var sw = new Stopwatch();
sw.Start();
var insertCount = (int)(0.7 * pairs.Count);
var testKeys1 = new ConcurrentBag<ulong>();
var testKeys2 = new ConcurrentBag<ulong>();
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 24 };
Parallel.ForEach(pairs.Take(insertCount).ToArray(), parallelOptions, pair =>
{
var key = pair.Item1;
var val = pair.Item2;
storePart.Store(key, val);
if (key % 717 == 0)
{
testKeys1.Add(key);
}
if (key % 117 == 0)
{
testKeys2.Add(key);
}
});
sw.Stop();
Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart.CompleteAdding();
storePart.Compress();
sw.Stop();
Log.Info($"Compress: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart.RebuildIndex();
sw.Stop();
Log.Info($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
Log.Info("Start merge test");
sw.Restart();
var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data));
Parallel.ForEach(pairs.Skip(insertCount).ToArray(), parallelOptions, pair =>
{
var key = pair.Item1;
var val = pair.Item2;
merger.Store(key, val);
});
Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {pairs.Count}");
merger.Compress(); // auto reindex
sw.Stop();
Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms");
Log.Info("Test #1 reading");
var readPart = store.CreateAccessor(meta);
ulong totalData = 0;
ulong totalKeys = 0;
foreach (var key in testKeys1)
{
var result = readPart.Find(key);
totalData += (ulong)(result.Value?.Length ?? 0);
totalKeys++;
}
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0;
totalKeys = 0;
Log.Info("Test #1 remove by keys");
foreach (var key in testKeys1)
{
readPart.RemoveKey(key, false);
}
sw.Restart();
readPart.RebuildIndex();
sw.Stop();
Log.Info($"Rebuild indexes after remove: {sw.ElapsedMilliseconds}ms");
Log.Info("Test #1 reading after remove");
foreach (var key in testKeys1)
{
var result = readPart.Find(key);
totalData += (ulong)(result.Value?.Length ?? 0);
totalKeys++;
}
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0;
totalKeys = 0;
Log.Info("Test #2 reading");
foreach (var key in testKeys2)
{
var result = readPart.Find(key);
totalData += (ulong)(result.Value?.Length ?? 0);
totalKeys++;
}
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0;
totalKeys = 0;
Log.Info("Test #2 remove keys batch");
readPart.RemoveKeys(testKeys2);
Log.Info("Test #2 reading after remove");
foreach (var key in testKeys2)
{
var result = readPart.Find(key);
totalData += (ulong)(result.Value?.Length ?? 0);
totalKeys++;
}
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0;
totalKeys = 0;
Log.Info("Iterator test");
foreach (var e in readPart.Iterate())
{
totalData += (ulong)(e.Value?.Length ?? 0);
totalKeys++;
}
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
Log.Info("Completed");
}
static void Main(string[] args) static void Main(string[] args)
{ {
var root = @"H:\temp";
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{
Index = new IndexOptions
{
Enabled = true,
StepType = IndexStepType.Step,
StepValue = 16
},
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
{
ulong s = 0;
return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
ThreadSafeWriting = false
};
var optionsMultiThread = new StoreOptions<ulong, ulong, byte[], Metadata>
{
Index = new IndexOptions
{
Enabled = true,
StepType = IndexStepType.Step,
StepValue = 16
},
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
{
ulong s = 0;
return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
ThreadSafeWriting = true
};
Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug); Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug);
var root = @"H:\temp"; Log.Info("Start");
//FastTest(root);
FullStoreTest(root); var pairs = new List<(ulong, ulong)>(PAIRS_COUNT);
//TestIterations(root); var r = new Random(Environment.TickCount);
//TestRangeCompressionAndInversion(); for (int i = 0; i < PAIRS_COUNT; i++)
{
pairs.Add((Generate(r), Generate(r)));
}
// FastTest(options);
FSUtils.CleanAndTestFolder(root);
FullStoreTest(options, pairs);
FSUtils.CleanAndTestFolder(root);
FullStoreMultithreadTest(optionsMultiThread, pairs);
Console.ReadKey(); Console.ReadKey();
} }
} }

@ -7,7 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

@ -7,7 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="System.Drawing.Common" Version="6.0.0" /> <PackageReference Include="System.Drawing.Common" Version="7.0.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

@ -7,7 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
<PackageReference Include="Topshelf" Version="4.3.0" /> <PackageReference Include="Topshelf" Version="4.3.0" />
</ItemGroup> </ItemGroup>

@ -21,14 +21,6 @@ namespace ZeroLevel.HNSW
/// </remarks> /// </remarks>
public static class CosineDistance public static class CosineDistance
{ {
/// <summary>
/// Calculates cosine distance without making any optimizations.
/// </summary>
/// <param name="u">Left vector.</param>
/// <param name="v">Right vector.</param>
/// <returns>Cosine distance between u and v.</returns>
/// <summary> /// <summary>
/// Calculates cosine distance with assumption that u and v are unit vectors. /// Calculates cosine distance with assumption that u and v are unit vectors.
/// </summary> /// </summary>

@ -5,7 +5,7 @@
<Platforms>AnyCPU;x64</Platforms> <Platforms>AnyCPU;x64</Platforms>
<PlatformTarget>x64</PlatformTarget> <PlatformTarget>x64</PlatformTarget>
<DebugType>full</DebugType> <DebugType>full</DebugType>
<Version>1.0.0.3</Version> <Version>1.0.0.4</Version>
<Company>ogoun</Company> <Company>ogoun</Company>
<Authors>Ogoun</Authors> <Authors>Ogoun</Authors>
<Copyright>Copyright Ogoun 2022</Copyright> <Copyright>Copyright Ogoun 2022</Copyright>
@ -13,7 +13,8 @@
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<PackageReleaseNotes>Fix search output.</PackageReleaseNotes> <PackageReleaseNotes></PackageReleaseNotes>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

@ -35,8 +35,8 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Aurigma.GraphicsMill.Core.x64" Version="10.6.31" /> <PackageReference Include="Aurigma.GraphicsMill.Core.x64" Version="11.0.11" />
<PackageReference Include="Microsoft.ML.OnnxRuntime.Managed" Version="1.13.0" /> <PackageReference Include="Microsoft.ML.OnnxRuntime.Managed" Version="1.13.1" />
<PackageReference Include="SixLabors.ImageSharp" Version="2.1.3" /> <PackageReference Include="SixLabors.ImageSharp" Version="2.1.3" />
<PackageReference Include="SixLabors.ImageSharp.Drawing" Version="1.0.0-beta14" /> <PackageReference Include="SixLabors.ImageSharp.Drawing" Version="1.0.0-beta14" />
</ItemGroup> </ItemGroup>

@ -18,7 +18,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

@ -26,8 +26,8 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="System.Data.Common" Version="4.3.0" /> <PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.3" /> <PackageReference Include="System.Data.SqlClient" Version="4.8.5" />
<PackageReference Include="System.Security.Permissions" Version="6.0.0" /> <PackageReference Include="System.Security.Permissions" Version="7.0.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

@ -9,7 +9,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
<PackageReference Include="xunit" Version="2.4.2" /> <PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5"> <PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>

@ -0,0 +1,83 @@
using System.Threading;
namespace ZeroLevel.DataStructures
{
/// <summary>
/// https://referencesource.microsoft.com/#System.Web/Util/SafeBitVector32.cs,b90a9ea209d602a4
/// </summary>
public struct SafeBit32Vector
{
private volatile int _data;
internal SafeBit32Vector(int data)
{
this._data = data;
}
internal bool this[int bit]
{
get
{
int data = _data;
return (data & bit) == bit;
}
set
{
for (; ; )
{
int oldData = _data;
int newData;
if (value)
{
newData = oldData | bit;
}
else
{
newData = oldData & ~bit;
}
#pragma warning disable 0420
int result = Interlocked.CompareExchange(ref _data, newData, oldData);
#pragma warning restore 0420
if (result == oldData)
{
break;
}
}
}
}
internal bool ChangeValue(int bit, bool value)
{
for (; ; )
{
int oldData = _data;
int newData;
if (value)
{
newData = oldData | bit;
}
else
{
newData = oldData & ~bit;
}
if (oldData == newData)
{
return false;
}
#pragma warning disable 0420
int result = Interlocked.CompareExchange(ref _data, newData, oldData);
#pragma warning restore 0420
if (result == oldData)
{
return true;
}
}
}
}
}

@ -15,11 +15,11 @@ namespace ZeroLevel.Services.Collections
private readonly Action<T[], int> _dischargeAction; private readonly Action<T[], int> _dischargeAction;
public int Count => _count; public int Count => _count;
public Capacitor(int volume, Action<T[], int> dischargeAction) public Capacitor(int dischargeValue, Action<T[], int> dischargeAction)
{ {
if (volume < 1) volume = 16; if (dischargeValue < 1) dischargeValue = 16;
if (dischargeAction == null) throw new ArgumentNullException(nameof(dischargeAction)); if (dischargeAction == null) throw new ArgumentNullException(nameof(dischargeAction));
_buffer = new T[volume]; _buffer = new T[dischargeValue];
_dischargeAction = dischargeAction; _dischargeAction = dischargeAction;
} }
public void Add(T val) public void Add(T val)

@ -93,7 +93,7 @@ namespace ZeroLevel.Services.PartitionStorage
} }
if (dict.Count > _stepValue) if (dict.Count > _stepValue)
{ {
var step = (int)Math.Round(((float)dict.Count / (float)_stepValue), MidpointRounding.ToZero); var step = (int)Math.Round(dict.Count / (float)_stepValue, MidpointRounding.ToZero);
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
var d_arr = dict.OrderBy(p => p.Key).ToArray(); var d_arr = dict.OrderBy(p => p.Key).ToArray();
using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
@ -123,7 +123,7 @@ namespace ZeroLevel.Services.PartitionStorage
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file)); var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None))) using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{ {
var counter = _stepValue; var counter = 1;
while (reader.EOS == false) while (reader.EOS == false)
{ {
counter--; counter--;

@ -0,0 +1,8 @@
namespace ZeroLevel.Services.PartitionStorage
{
internal struct ValueIndex<TValue>
{
public TValue Value { get; set; }
public long Offset { get; set; }
}
}

@ -9,7 +9,6 @@
public class IndexOptions public class IndexOptions
{ {
public bool Enabled { get; set; } public bool Enabled { get; set; }
public IndexStepType StepType { get; set; } = IndexStepType.AbsoluteCount; public IndexStepType StepType { get; set; } = IndexStepType.AbsoluteCount;
public int StepValue { get; set; } = 64; public int StepValue { get; set; } = 64;
} }

@ -19,7 +19,6 @@ namespace ZeroLevel.Services.PartitionStorage
/// Method for key comparison /// Method for key comparison
/// </summary> /// </summary>
public Func<TKey, TKey, int> KeyComparer { get; set; } public Func<TKey, TKey, int> KeyComparer { get; set; }
/// <summary> /// <summary>
/// Storage root directory /// Storage root directory
/// </summary> /// </summary>
@ -40,6 +39,10 @@ namespace ZeroLevel.Services.PartitionStorage
/// File Partition /// File Partition
/// </summary> /// </summary>
public StoreFilePartition<TKey, TMeta> FilePartition { get; set; } public StoreFilePartition<TKey, TMeta> FilePartition { get; set; }
/// <summary>
/// Uses a thread-safe mechanism for writing to files during multi-threaded writes
/// </summary>
public bool ThreadSafeWriting { get; set; } = false;
public IndexOptions Index { get; set; } = new IndexOptions public IndexOptions Index { get; set; } = new IndexOptions
{ {
@ -85,7 +88,8 @@ namespace ZeroLevel.Services.PartitionStorage
Partitions = this.Partitions Partitions = this.Partitions
.Select(p => new StoreCatalogPartition<TMeta>(p.Name, p.PathExtractor)) .Select(p => new StoreCatalogPartition<TMeta>(p.Name, p.PathExtractor))
.ToList(), .ToList(),
RootFolder = this.RootFolder RootFolder = this.RootFolder,
ThreadSafeWriting = this.ThreadSafeWriting
}; };
return options; return options;
} }

@ -1,9 +1,9 @@
using System.IO; using System;
using System;
using ZeroLevel.Services.Serialization;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.IO;
using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage.Partition namespace ZeroLevel.Services.PartitionStorage.Partition
{ {

@ -2,6 +2,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.PartitionStorage.Interfaces;
@ -13,24 +14,30 @@ namespace ZeroLevel.Services.PartitionStorage
internal sealed class StorePartitionBuilder<TKey, TInput, TValue, TMeta> internal sealed class StorePartitionBuilder<TKey, TInput, TValue, TMeta>
: BasePartition<TKey, TInput, TValue, TMeta>, IStorePartitionBuilder<TKey, TInput, TValue> : BasePartition<TKey, TInput, TValue, TMeta>, IStorePartitionBuilder<TKey, TInput, TValue>
{ {
public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options, private readonly Action<TKey, TInput> _storeMethod;
public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info, TMeta info,
IStoreSerializer<TKey, TInput, TValue> serializer) IStoreSerializer<TKey, TInput, TValue> serializer)
: base(options, info, serializer) : base(options, info, serializer)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
if (options.ThreadSafeWriting)
{
_storeMethod = StoreDirectSafe;
}
else
{
_storeMethod = StoreDirect;
}
} }
#region IStorePartitionBuilder #region IStorePartitionBuilder
public void Store(TKey key, TInput value) public void Store(TKey key, TInput value)
{ {
var fileName = _options.GetFileName(key, _info); _storeMethod.Invoke(key, value);
if (TryGetWriteStream(fileName, out var stream))
{
Serializer.KeySerializer.Invoke(stream, key);
Thread.MemoryBarrier();
Serializer.InputSerializer.Invoke(stream, value);
}
} }
public void CompleteAdding() public void CompleteAdding()
{ {
@ -70,6 +77,39 @@ namespace ZeroLevel.Services.PartitionStorage
#endregion #endregion
#region Private methods #region Private methods
private void StoreDirect(TKey key, TInput value)
{
var groupKey = _options.GetFileName(key, _info);
if (TryGetWriteStream(groupKey, out var stream))
{
Serializer.KeySerializer.Invoke(stream, key);
Thread.MemoryBarrier();
Serializer.InputSerializer.Invoke(stream, value);
}
}
private void StoreDirectSafe(TKey key, TInput value)
{
var groupKey = _options.GetFileName(key, _info);
bool lockTaken = false;
if (TryGetWriteStream(groupKey, out var stream))
{
Monitor.Enter(stream, ref lockTaken);
try
{
Serializer.KeySerializer.Invoke(stream, key);
Thread.MemoryBarrier();
Serializer.InputSerializer.Invoke(stream, value);
}
finally
{
if (lockTaken)
{
Monitor.Exit(stream);
}
}
}
}
internal void CompressFile(string file) internal void CompressFile(string file)
{ {
var dict = new Dictionary<TKey, HashSet<TInput>>(); var dict = new Dictionary<TKey, HashSet<TInput>>();

@ -6,16 +6,16 @@
</Description> </Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>3.3.8.2</AssemblyVersion> <AssemblyVersion>3.3.8.3</AssemblyVersion>
<PackageReleaseNotes>PartitionStorage. New way to index</PackageReleaseNotes> <PackageReleaseNotes>PartitionStorage. ThreadSafeWriting option</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2022</Copyright> <Copyright>Copyright Ogoun 2022</Copyright>
<PackageLicenseUrl></PackageLicenseUrl> <PackageLicenseUrl></PackageLicenseUrl>
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<Version>3.3.8.2</Version> <Version>3.3.8.3</Version>
<FileVersion>3.3.8.2</FileVersion> <FileVersion>3.3.8.3</FileVersion>
<Platforms>AnyCPU;x64;x86</Platforms> <Platforms>AnyCPU;x64;x86</Platforms>
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
<DebugType>full</DebugType> <DebugType>full</DebugType>

Loading…
Cancel
Save

Powered by TurnKey Linux.