diff --git a/PartitionFileStorageTest/PartitionFileStorageTest.csproj b/PartitionFileStorageTest/PartitionFileStorageTest.csproj
index f8b057f..6fb9333 100644
--- a/PartitionFileStorageTest/PartitionFileStorageTest.csproj
+++ b/PartitionFileStorageTest/PartitionFileStorageTest.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs
index 3c502e0..1d953bf 100644
--- a/PartitionFileStorageTest/Program.cs
+++ b/PartitionFileStorageTest/Program.cs
@@ -1,4 +1,5 @@
-using System.Diagnostics;
+using System.Collections.Concurrent;
+using System.Diagnostics;
 using System.Text;
 using ZeroLevel;
 using ZeroLevel.Services.FileSystem;
@@ -8,6 +9,8 @@ namespace PartitionFileStorageTest
 {
     internal class Program
     {
+        const int PAIRS_COUNT = 200_000_000;
+
         private class Metadata
         {
             public DateTime Date { get; set; }
@@ -18,33 +21,13 @@ namespace PartitionFileStorageTest
             var num = new StringBuilder();
             num.Append("79");
             num.Append(r.Next(99).ToString("D2"));
-            num.Append(r.Next(999).ToString("D7"));
+            num.Append(r.Next(9999).ToString("D7"));
             return ulong.Parse(num.ToString());
         }

-        private static void FastTest(string root)
+        private static void FastTest(StoreOptions<ulong, ulong, byte[], Metadata> options)
         {
             var r = new Random(Environment.TickCount);
-            var options = new StoreOptions<ulong, ulong, byte[], Metadata>
-            {
-                Index = new IndexOptions
-                {
-                    Enabled = true,
-                    StepType = IndexStepType.AbsoluteCount,
-                    StepValue = 64 },
-                RootFolder = root,
-                FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
-                MergeFunction = list =>
-                {
-                    ulong s = 0;
-                    return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
-                },
-                Partitions = new List<StoreCatalogPartition<Metadata>>
-                {
-                    new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
-                },
-                KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
-            };
             var store = new Store<ulong, ulong, byte[], Metadata>(options);
             var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
@@ -80,115 +63,44 @@ namespace PartitionFileStorageTest
             }
         }

-        private static void FullStoreTest(string root)
+        private static void FullStoreTest(StoreOptions<ulong, ulong, byte[], Metadata> options,
+            List<(ulong, ulong)> pairs)
         {
+            var meta = new Metadata { Date = new DateTime(2022, 11, 08) };
             var r = new Random(Environment.TickCount);
-            var options = new StoreOptions<ulong, ulong, byte[], Metadata>
-            {
-                Index = new IndexOptions
-                {
-                    Enabled = true,
-                    StepType = IndexStepType.Step,
-                    StepValue = 1
-                },
-                RootFolder = root,
-                FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
-                MergeFunction = list =>
-                {
-                    ulong s = 0;
-                    return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
-                },
-                Partitions = new List<StoreCatalogPartition<Metadata>>
-                {
-                    new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
-                },
-                KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
-            };
-            FSUtils.CleanAndTestFolder(root);
-
             var store = new Store<ulong, ulong, byte[], Metadata>(options);
-            var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
-
-            /*Log.Info("Fill start");
-            for (int i = 0; i < 10000000; i++)
-            {
-                var s = Generate(r);
-                var count = r.Next(200);
-                for (int j = 0; j < count; j++)
-                {
-                    var t = Generate(r);
-                    storePart.Store(s, t);
-                }
-            }
-            storePart.CompleteAdding();
-            Log.Info("Fill complete");
-
-            long cnt = 0;
-            foreach (var p in storePart.Iterate())
-            {
-                if (p.Key % 2 == 0) cnt++;
-            }
-            Log.Info(cnt.ToString());
-
-            Log.Info("Fill test complete");
-
-            storePart.Compress();
-            Log.Info("Compress complete");
-
-            var reader = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
-            cnt = 0;
-            foreach (var p in reader.Iterate())
-            {
-                if (p.Key % 2 == 0) cnt++;
-            }
-            Log.Info(cnt.ToString());
-            Log.Info("Compress test complete");
-
-            storePart.DropData();
-
-            Log.Info("Complete#1");
-            //Console.ReadKey();
-
-            FSUtils.CleanAndTestFolder(root);*/
-
-
+            var storePart = store.CreateBuilder(meta);
             var sw = new Stopwatch();
             sw.Start();
-
             var testKeys1 = new List<ulong>();
             var testKeys2 = new List<ulong>();
-
             var testData = new Dictionary<ulong, HashSet<ulong>>();
-
             var total = 0L;
-            for (int i = 0; i < 2000000; i++)
+            var insertCount = (int)(0.7 * pairs.Count);
+
+            for (int i = 0; i < insertCount; i++)
             {
-                var s = Generate(r);
-                if (testData.ContainsKey(s) == false) testData[s] = new HashSet<ulong>();
-                var count = r.Next(300);
+                var key = pairs[i].Item1;
+                var val = pairs[i].Item2;
+                if (testData.ContainsKey(key) == false) testData[key] = new HashSet<ulong>();
+                testData[key].Add(val);
+                storePart.Store(key, val);
                 total++;
-                for (int j = 0; j < count; j++)
+                if (key % 717 == 0)
                 {
-                    total++;
-                    var t = Generate(r);
-                    storePart.Store(s, t);
+                    testKeys1.Add(key);
-                    testData[s].Add(t);
                 }
-                if (s % 177 == 0)
+                if (key % 117 == 0)
                 {
-                    testKeys1.Add(s);
-                }
-                if (s % 223 == 0)
-                {
-                    testKeys2.Add(s);
+                    testKeys2.Add(key);
                 }
             }
             sw.Stop();
             Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms");
             sw.Restart();
-            storePart.CompleteAdding();
+            storePart.CompleteAdding();
             storePart.Compress();
             sw.Stop();
             Log.Info($"Compress: {sw.ElapsedMilliseconds}ms");
@@ -199,20 +111,15 @@ namespace PartitionFileStorageTest
             Log.Info("Start merge test");
             sw.Restart();
-            var merger = store.CreateMergeAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }, data => Compressor.DecodeBytesContent(data));
-            for (int i = 0; i < 2300000; i++)
+            var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data));
+            for (int i = insertCount; i < pairs.Count; i++)
             {
+                var key = pairs[i].Item1;
+                var val = pairs[i].Item2;
                 total++;
-                var s = Generate(r);
-                if (testData.ContainsKey(s) == false) testData[s] = new HashSet<ulong>();
-                var count = r.Next(300);
-                for (int j = 0; j < count; j++)
-                {
-                    total++;
-                    var t = Generate(r);
-                    merger.Store(s, t);
-                    testData[s].Add(t);
-                }
+                if (testData.ContainsKey(key) == false) testData[key] = new HashSet<ulong>();
+                testData[key].Add(val);
+                merger.Store(key, val);
             }
             Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {total}");
             merger.Compress(); // auto reindex
@@ -220,7 +127,7 @@
             Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms");

             Log.Info("Test #1 reading");
-            var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
+            var readPart = store.CreateAccessor(meta);
             ulong totalData = 0;
             ulong totalKeys = 0;
             foreach (var key in testKeys1)
@@ -307,15 +214,190 @@ namespace PartitionFileStorageTest
             Log.Info("Completed");
         }

+        private static void FullStoreMultithreadTest(StoreOptions<ulong, ulong, byte[], Metadata> options,
+            List<(ulong, ulong)> pairs)
+        {
+            var meta = new Metadata { Date = new DateTime(2022, 11, 08) };
+            var r = new Random(Environment.TickCount);
+            var store = new Store<ulong, ulong, byte[], Metadata>(options);
+            var storePart = store.CreateBuilder(meta);
+            var sw = new Stopwatch();
+            sw.Start();
+            var insertCount = (int)(0.7 * pairs.Count);
+            var testKeys1 = new ConcurrentBag<ulong>();
+            var testKeys2 = new ConcurrentBag<ulong>();
+            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 24 };
+
+            Parallel.ForEach(pairs.Take(insertCount).ToArray(), parallelOptions, pair =>
+            {
+                var key = pair.Item1;
+                var val = pair.Item2;
+                storePart.Store(key, val);
+                if (key % 717 == 0)
+                {
+                    testKeys1.Add(key);
+                }
+                if (key % 117 == 0)
+                {
+                    testKeys2.Add(key);
+                }
+            });
+
+            sw.Stop();
+            Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms");
+            sw.Restart();
+            storePart.CompleteAdding();
+            storePart.Compress();
+            sw.Stop();
+            Log.Info($"Compress: {sw.ElapsedMilliseconds}ms");
+            sw.Restart();
+            storePart.RebuildIndex();
+            sw.Stop();
+            Log.Info($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
+
+            Log.Info("Start merge test");
+            sw.Restart();
+            var merger = store.CreateMergeAccessor(meta, data => Compressor.DecodeBytesContent(data));
+
+            Parallel.ForEach(pairs.Skip(insertCount).ToArray(), parallelOptions, pair =>
+            {
+                var key = pair.Item1;
+                var val = pair.Item2;
+                merger.Store(key, val);
+            });
+
+            Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {pairs.Count}");
+            merger.Compress(); // auto reindex
+            sw.Stop();
+            Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms");
+
+            Log.Info("Test #1 reading");
+            var readPart = store.CreateAccessor(meta);
+            ulong totalData = 0;
+            ulong totalKeys = 0;
+            foreach (var key in testKeys1)
+            {
+                var result = readPart.Find(key);
+                totalData += (ulong)(result.Value?.Length ?? 0);
+                totalKeys++;
+            }
+            Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
+            totalData = 0;
+            totalKeys = 0;
+            Log.Info("Test #1 remove by keys");
+            foreach (var key in testKeys1)
+            {
+                readPart.RemoveKey(key, false);
+            }
+            sw.Restart();
+            readPart.RebuildIndex();
+            sw.Stop();
+            Log.Info($"Rebuild indexes after remove: {sw.ElapsedMilliseconds}ms");
+            Log.Info("Test #1 reading after remove");
+            foreach (var key in testKeys1)
+            {
+                var result = readPart.Find(key);
+                totalData += (ulong)(result.Value?.Length ?? 0);
+                totalKeys++;
+            }
+            Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
+            totalData = 0;
+            totalKeys = 0;
+            Log.Info("Test #2 reading");
+            foreach (var key in testKeys2)
+            {
+                var result = readPart.Find(key);
+                totalData += (ulong)(result.Value?.Length ?? 0);
+                totalKeys++;
+            }
+            Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
+            totalData = 0;
+            totalKeys = 0;
+            Log.Info("Test #2 remove keys batch");
+            readPart.RemoveKeys(testKeys2);
+            Log.Info("Test #2 reading after remove");
+            foreach (var key in testKeys2)
+            {
+                var result = readPart.Find(key);
+                totalData += (ulong)(result.Value?.Length ?? 0);
+                totalKeys++;
+            }
+            Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
+            totalData = 0;
+            totalKeys = 0;
+
+            Log.Info("Iterator test");
+            foreach (var e in readPart.Iterate())
+            {
+                totalData += (ulong)(e.Value?.Length ?? 0);
+                totalKeys++;
+            }
+            Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
+            Log.Info("Completed");
+        }
         static void Main(string[] args)
         {
+            var root = @"H:\temp";
+            var options = new StoreOptions<ulong, ulong, byte[], Metadata>
+            {
+                Index = new IndexOptions
+                {
+                    Enabled = true,
+                    StepType = IndexStepType.Step,
+                    StepValue = 16
+                },
+                RootFolder = root,
+                FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
+                MergeFunction = list =>
+                {
+                    ulong s = 0;
+                    return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
+                },
+                Partitions = new List<StoreCatalogPartition<Metadata>>
+                {
+                    new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
+                },
+                KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
+                ThreadSafeWriting = false
+            };
+            var optionsMultiThread = new StoreOptions<ulong, ulong, byte[], Metadata>
+            {
+                Index = new IndexOptions
+                {
+                    Enabled = true,
+                    StepType = IndexStepType.Step,
+                    StepValue = 16
+                },
+                RootFolder = root,
+                FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
+                MergeFunction = list =>
+                {
+                    ulong s = 0;
+                    return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
+                },
+                Partitions = new List<StoreCatalogPartition<Metadata>>
+                {
+                    new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
+                },
+                KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
+                ThreadSafeWriting = true
+            };
             Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug);
-            var root = @"H:\temp";
-            //FastTest(root);
-            FullStoreTest(root);
-            //TestIterations(root);
-            //TestRangeCompressionAndInversion();
+            Log.Info("Start");
+
+            var pairs = new List<(ulong, ulong)>(PAIRS_COUNT);
+            var r = new Random(Environment.TickCount);
+            for (int i = 0; i < PAIRS_COUNT; i++)
+            {
+                pairs.Add((Generate(r), Generate(r)));
+            }
+
+            // FastTest(options);
+            FSUtils.CleanAndTestFolder(root);
+            FullStoreTest(options, pairs);
+            FSUtils.CleanAndTestFolder(root);
+            FullStoreMultithreadTest(optionsMultiThread, pairs);
             Console.ReadKey();
         }
     }
diff --git a/TestApp/TestApp.csproj b/TestApp/TestApp.csproj
index e6e53e0..45fe2fd 100644
--- a/TestApp/TestApp.csproj
+++ b/TestApp/TestApp.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/TestHNSW/HNSWDemo/HNSWDemo.csproj b/TestHNSW/HNSWDemo/HNSWDemo.csproj
index cb7a844..29cd2e4 100644
--- a/TestHNSW/HNSWDemo/HNSWDemo.csproj
+++ b/TestHNSW/HNSWDemo/HNSWDemo.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/ZeroLevel.Discovery/ZeroLevel.Discovery.csproj b/ZeroLevel.Discovery/ZeroLevel.Discovery.csproj
index 22f73b8..d23cfc5 100644
--- a/ZeroLevel.Discovery/ZeroLevel.Discovery.csproj
+++ b/ZeroLevel.Discovery/ZeroLevel.Discovery.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/ZeroLevel.HNSW/Utils/CosineDistance.cs b/ZeroLevel.HNSW/Utils/CosineDistance.cs
index a94d44e..9494676 100644
--- a/ZeroLevel.HNSW/Utils/CosineDistance.cs
+++ b/ZeroLevel.HNSW/Utils/CosineDistance.cs
@@ -21,14 +21,6 @@ namespace ZeroLevel.HNSW
     /// </summary>
     public static class CosineDistance
     {
-        /// <summary>
-        /// Calculates cosine distance without making any optimizations.
-        /// </summary>
-        /// <param name="u">Left vector.</param>
-        /// <param name="v">Right vector.</param>
-        /// <returns>Cosine distance between u and v.</returns>
-
-
        /// <summary>
        /// Calculates cosine distance with assumption that u and v are unit vectors.
        /// </summary>
diff --git a/ZeroLevel.HNSW/ZeroLevel.HNSW.csproj b/ZeroLevel.HNSW/ZeroLevel.HNSW.csproj
index d47a175..bbaca30 100644
--- a/ZeroLevel.HNSW/ZeroLevel.HNSW.csproj
+++ b/ZeroLevel.HNSW/ZeroLevel.HNSW.csproj
@@ -5,7 +5,7 @@
    AnyCPU;x64
    x64
    full
-    1.0.0.3
+    1.0.0.4
    ogoun
    Ogoun
    Copyright Ogoun 2022
@@ -13,7 +13,8 @@
    zero.png
    https://github.com/ogoun/Zero
    git
-    Fix search output.
+
+    MIT
diff --git a/ZeroLevel.NN/ZeroLevel.NN.csproj b/ZeroLevel.NN/ZeroLevel.NN.csproj
index 397a3a7..4cb3854 100644
--- a/ZeroLevel.NN/ZeroLevel.NN.csproj
+++ b/ZeroLevel.NN/ZeroLevel.NN.csproj
@@ -35,8 +35,8 @@
-
-
+
+
diff --git a/ZeroLevel.Qdrant/ZeroLevel.Qdrant.csproj b/ZeroLevel.Qdrant/ZeroLevel.Qdrant.csproj
index a143ddb..ec569e0 100644
--- a/ZeroLevel.Qdrant/ZeroLevel.Qdrant.csproj
+++ b/ZeroLevel.Qdrant/ZeroLevel.Qdrant.csproj
@@ -18,7 +18,7 @@
-
+
diff --git a/ZeroLevel.SQL/ZeroLevel.SQL.csproj b/ZeroLevel.SQL/ZeroLevel.SQL.csproj
index 82e1d8d..27f8cf2 100644
--- a/ZeroLevel.SQL/ZeroLevel.SQL.csproj
+++ b/ZeroLevel.SQL/ZeroLevel.SQL.csproj
@@ -26,8 +26,8 @@
-
-
+
+
diff --git a/ZeroLevel.UnitTests/ZeroLevel.UnitTests.csproj b/ZeroLevel.UnitTests/ZeroLevel.UnitTests.csproj
index bbd442c..ca24844 100644
--- a/ZeroLevel.UnitTests/ZeroLevel.UnitTests.csproj
+++ b/ZeroLevel.UnitTests/ZeroLevel.UnitTests.csproj
@@ -9,7 +9,7 @@
-
+
    all
diff --git a/ZeroLevel/DataStructures/SafeBit32Vector.cs b/ZeroLevel/DataStructures/SafeBit32Vector.cs
new file mode 100644
index 0000000..f6325fc
--- /dev/null
+++ b/ZeroLevel/DataStructures/SafeBit32Vector.cs
@@ -0,0 +1,83 @@
+using System.Threading;
+
+namespace ZeroLevel.DataStructures
+{
+    /// <summary>
+    /// https://referencesource.microsoft.com/#System.Web/Util/SafeBitVector32.cs,b90a9ea209d602a4
+    /// </summary>
+    public struct SafeBit32Vector
+    {
+        private volatile int _data;
+
+        internal SafeBit32Vector(int data)
+        {
+            this._data = data;
+        }
+
+        internal bool this[int bit]
+        {
+            get
+            {
+                int data = _data;
+                return (data & bit) == bit;
+            }
+            set
+            {
+                for (; ; )
+                {
+                    int oldData = _data;
+                    int newData;
+                    if (value)
+                    {
+                        newData = oldData | bit;
+                    }
+                    else
+                    {
+                        newData = oldData & ~bit;
+                    }
+
+#pragma warning disable 0420
+                    int result = Interlocked.CompareExchange(ref _data, newData, oldData);
+#pragma warning restore 0420
+
+                    if (result == oldData)
+                    {
+                        break;
+                    }
+                }
+            }
+        }
+
+
+        internal bool ChangeValue(int bit, bool value)
+        {
+            for (; ; )
+            {
+                int oldData = _data;
+                int newData;
+                if (value)
+                {
+                    newData = oldData | bit;
+                }
+                else
+                {
+                    newData = oldData & ~bit;
+                }
+
+                if (oldData == newData)
+                {
+                    return false;
+                }
+
+#pragma warning disable 0420
+                int result = Interlocked.CompareExchange(ref _data, newData, oldData);
+#pragma warning restore 0420
+
+                if (result == oldData)
+                {
+                    return true;
+                }
+            }
+        }
+    }
+}
diff --git a/ZeroLevel/Services/Collections/Condensator.cs b/ZeroLevel/Services/Collections/Capacitor.cs
similarity index 89%
rename from ZeroLevel/Services/Collections/Condensator.cs
rename to ZeroLevel/Services/Collections/Capacitor.cs
index 3a3b285..4c11fb5 100644
--- a/ZeroLevel/Services/Collections/Condensator.cs
+++ b/ZeroLevel/Services/Collections/Capacitor.cs
@@ -15,11 +15,11 @@ namespace ZeroLevel.Services.Collections
        private readonly Action _dischargeAction;
        public int Count => _count;
-        public Capacitor(int volume, Action dischargeAction)
+        public Capacitor(int dischargeValue, Action dischargeAction)
        {
-            if (volume < 1) volume = 16;
+            if (dischargeValue < 1) dischargeValue = 16;
            if (dischargeAction == null) throw new ArgumentNullException(nameof(dischargeAction));
-            _buffer = new T[volume];
+            _buffer = new T[dischargeValue];
            _dischargeAction = dischargeAction;
        }
        public void Add(T val)
diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs
index 6930764..78d5207 100644
--- a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs
+++ b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs
@@ -93,7 +93,7 @@ namespace ZeroLevel.Services.PartitionStorage
            }
            if (dict.Count > _stepValue)
            {
-                var step = (int)Math.Round(((float)dict.Count / (float)_stepValue), MidpointRounding.ToZero);
+                var step = (int)Math.Round(dict.Count / (float)_stepValue, MidpointRounding.ToZero);
                var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
                var d_arr = dict.OrderBy(p => p.Key).ToArray();
                using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
@@ -123,7 +123,7 @@ namespace ZeroLevel.Services.PartitionStorage
                var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
                using (var writer = new MemoryStreamWriter(new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
                {
-                    var counter = _stepValue;
+                    var counter = 1;
                    while (reader.EOS == false)
                    {
                        counter--;
diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/KeyIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/KeyIndex.cs
similarity index 100%
rename from ZeroLevel/Services/PartitionStorage/Indexes/KeyIndex.cs
rename to ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/KeyIndex.cs
diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs
similarity index 100%
rename from ZeroLevel/Services/PartitionStorage/Indexes/StorePartitionSparseIndex.cs
rename to ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs
diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs
new file mode 100644
index 0000000..81716b3
--- /dev/null
+++ b/ZeroLevel/Services/PartitionStorage/Indexes/ValuesIndex/ValueIndex.cs
@@ -0,0 +1,8 @@
+namespace ZeroLevel.Services.PartitionStorage
+{
+    internal struct ValueIndex<TValue>
+    {
+        public TValue Value { get; set; }
+        public long Offset { get; set; }
+    }
+}
diff --git a/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs
index 300ba51..84a03df 100644
--- a/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs
+++ b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs
@@ -9,7 +9,6 @@
    public class IndexOptions
    {
        public bool Enabled { get; set; }
-
        public IndexStepType StepType { get; set; } = IndexStepType.AbsoluteCount;
        public int StepValue { get; set; } = 64;
    }
diff --git a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs
index 88afd78..7a2e0f6 100644
--- a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs
+++ b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs
@@ -19,7 +19,6 @@ namespace ZeroLevel.Services.PartitionStorage
        /// Method for key comparison
        /// </summary>
        public Func<TKey, TKey, int> KeyComparer { get; set; }
-
        /// <summary>
        /// Storage root directory
        /// </summary>
        public string RootFolder { get; set; }
@@ -40,6 +39,10 @@ namespace ZeroLevel.Services.PartitionStorage
        /// File Partition
        /// </summary>
        public StoreFilePartition<TKey, TMeta> FilePartition { get; set; }
+        /// <summary>
+        /// Uses a thread-safe mechanism for writing to files during multi-threaded writes
+        /// </summary>
+        public bool ThreadSafeWriting { get; set; } = false;
        public IndexOptions Index { get; set; } = new IndexOptions
        {
@@ -85,7 +88,8 @@ namespace ZeroLevel.Services.PartitionStorage
                Partitions = this.Partitions
                    .Select(p => new StoreCatalogPartition<TMeta>(p.Name, p.PathExtractor))
                    .ToList(),
-                RootFolder = this.RootFolder
+                RootFolder = this.RootFolder,
+                ThreadSafeWriting = this.ThreadSafeWriting
            };
            return options;
        }
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs
index ae91785..37ddf49 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs
@@ -1,9 +1,9 @@
-using System.IO;
-using System;
-using ZeroLevel.Services.Serialization;
+using System;
 using System.Collections.Concurrent;
+using System.IO;
 using ZeroLevel.Services.FileSystem;
 using ZeroLevel.Services.PartitionStorage.Interfaces;
+using ZeroLevel.Services.Serialization;

 namespace ZeroLevel.Services.PartitionStorage.Partition
 {
diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs
index 2b477b8..5674c1f 100644
--- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs
+++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs
@@ -2,6 +2,7 @@
 using System.Collections.Generic;
 using System.IO;
 using System.Linq;
+using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
 using ZeroLevel.Services.PartitionStorage.Interfaces;
@@ -13,24 +14,30 @@ namespace ZeroLevel.Services.PartitionStorage
    internal sealed class StorePartitionBuilder<TKey, TInput, TValue, TMeta> : BasePartition<TKey, TInput, TValue, TMeta>, IStorePartitionBuilder<TKey, TInput>
    {
-        public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options,
+        private readonly Action<TKey, TInput> _storeMethod;
+
+        public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options,
            TMeta info,
            IStoreSerializer<TKey, TInput, TValue> serializer)
            : base(options, info, serializer)
        {
            if (options == null) throw new ArgumentNullException(nameof(options));
+            if (options.ThreadSafeWriting)
+            {
+                _storeMethod = StoreDirectSafe;
+            }
+            else
+            {
+                _storeMethod = StoreDirect;
+            }
        }

        #region IStorePartitionBuilder
+
+
        public void Store(TKey key, TInput value)
        {
-            var fileName = _options.GetFileName(key, _info);
-            if (TryGetWriteStream(fileName, out var stream))
-            {
-                Serializer.KeySerializer.Invoke(stream, key);
-                Thread.MemoryBarrier();
-                Serializer.InputSerializer.Invoke(stream, value);
-            }
+            _storeMethod.Invoke(key, value);
        }
        public void CompleteAdding()
        {
@@ -70,6 +77,39 @@ namespace ZeroLevel.Services.PartitionStorage
        #endregion

        #region Private methods
+        private void StoreDirect(TKey key, TInput value)
+        {
+            var groupKey = _options.GetFileName(key, _info);
+            if (TryGetWriteStream(groupKey, out var stream))
+            {
+                Serializer.KeySerializer.Invoke(stream, key);
+                Thread.MemoryBarrier();
+                Serializer.InputSerializer.Invoke(stream, value);
+            }
+        }
+        private void StoreDirectSafe(TKey key, TInput value)
+        {
+            var groupKey = _options.GetFileName(key, _info);
+            bool lockTaken = false;
+            if (TryGetWriteStream(groupKey, out var stream))
+            {
+                Monitor.Enter(stream, ref lockTaken);
+                try
+                {
+                    Serializer.KeySerializer.Invoke(stream, key);
+                    Thread.MemoryBarrier();
+                    Serializer.InputSerializer.Invoke(stream, value);
+                }
+                finally
+                {
+                    if (lockTaken)
+                    {
+                        Monitor.Exit(stream);
+                    }
+                }
+            }
+        }
+
        internal void CompressFile(string file)
        {
            var dict = new Dictionary<TKey, HashSet<TInput>>();
diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj
index 73c33bc..1a91375 100644
--- a/ZeroLevel/ZeroLevel.csproj
+++ b/ZeroLevel/ZeroLevel.csproj
@@ -6,16 +6,16 @@
    ogoun
    ogoun
-    3.3.8.2
-    PartitionStorage. New way to index
+    3.3.8.3
+    PartitionStorage. ThreadSafeWriting option
    https://github.com/ogoun/Zero/wiki
    Copyright Ogoun 2022
    https://github.com/ogoun/Zero
    git
-    3.3.8.2
-    3.3.8.2
+    3.3.8.3
+    3.3.8.3
    AnyCPU;x64;x86
    zero.png
    full
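
Note on the ThreadSafeWriting option introduced in this patch: when it is set, StorePartitionBuilder routes Store() calls through StoreDirectSafe, which serializes writes to each partition stream with a Monitor lock, so a single builder can be filled from several threads (as FullStoreMultithreadTest does with Parallel.ForEach). The sketch below is illustrative only; it assumes the generic signatures StoreOptions<TKey, TInput, TValue, TMeta> / Store<TKey, TInput, TValue, TMeta> reconstructed above, and the Meta type and sample pairs are invented for the example rather than taken from the library.

    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using ZeroLevel.Services.PartitionStorage;

    internal static class ThreadSafeWritingSketch
    {
        private class Meta { public DateTime Date { get; set; } }   // illustrative metadata type

        internal static void Run(StoreOptions<ulong, ulong, byte[], Meta> options)
        {
            options.ThreadSafeWriting = true;                        // allow concurrent Store() on one builder
            var store = new Store<ulong, ulong, byte[], Meta>(options);
            var builder = store.CreateBuilder(new Meta { Date = DateTime.Today });

            var pairs = Enumerable.Range(0, 100_000)                 // invented sample data
                .Select(i => ((ulong)i, (ulong)(i * 7)))
                .ToArray();

            // Without ThreadSafeWriting the builder writes to its streams unguarded,
            // so parallel filling like this is only valid when the option is enabled.
            Parallel.ForEach(pairs, p => builder.Store(p.Item1, p.Item2));

            builder.CompleteAdding();   // flush and close the write streams
            builder.Compress();         // merge values per key with the configured MergeFunction
            builder.RebuildIndex();     // build the sparse key index if indexing is enabled
        }
    }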
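
Note on the new SafeBit32Vector struct: it packs up to 32 flags into a single volatile int and updates them with an Interlocked.CompareExchange retry loop, so individual flags can be flipped atomically without a lock. Both the indexer and ChangeValue take a bit mask (a power of two), not a bit position, and ChangeValue returns true only if the stored bit actually changed. A small hypothetical sketch (callable only from inside the ZeroLevel assembly, since the members are internal; the flag masks are invented for the example):

    using ZeroLevel.DataStructures;

    internal static class SafeBit32VectorSketch
    {
        // Hypothetical flag masks for the example.
        private const int InitializedFlag = 1 << 0;
        private const int CompletedFlag = 1 << 1;

        internal static void Run()
        {
            var flags = new SafeBit32Vector(0);

            // ChangeValue reports whether the bit really flipped, so it works
            // as a "first caller wins" guard when several threads race here.
            if (flags.ChangeValue(InitializedFlag, true))
            {
                // ... one-time initialization ...
            }

            flags[CompletedFlag] = true;          // atomic set via the CAS loop in the indexer
            bool done = flags[CompletedFlag];     // reads the current value of the flag
        }
    }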