diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index 6b8f88f..9f78672 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -4,13 +4,14 @@ using System.Text; using ZeroLevel; using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.PartitionStorage; +using ZeroLevel.Services.Serialization; namespace PartitionFileStorageTest { internal class Program { -// const int PAIRS_COUNT = 200_000_000; - const int PAIRS_COUNT = 200_000; + // const int PAIRS_COUNT = 200_000_000; + const int PAIRS_COUNT = 2000_000; private class Metadata { @@ -241,6 +242,11 @@ namespace PartitionFileStorageTest } }); + if (storePart.TotalRecords != insertCount) + { + Log.Error($"Count of stored record no equals expected. Recorded: {storePart.TotalRecords}. Expected: {insertCount}"); + } + sw.Stop(); Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms"); sw.Restart(); @@ -264,6 +270,11 @@ namespace PartitionFileStorageTest merger.Store(key, val); }); + if (merger.TotalRecords != (pairs.Count - insertCount)) + { + Log.Error($"Count of stored record no equals expected. Recorded: {merger.TotalRecords}. Expected: {(pairs.Count - insertCount)}"); + } + Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {pairs.Count}"); merger.Compress(); // auto reindex sw.Stop(); @@ -334,16 +345,58 @@ namespace PartitionFileStorageTest Log.Info("Completed"); } + private static void FaultIndexTest(string filePath) + { + var serializer = new StoreStandartSerializer(); + // 1 build index + var index = new Dictionary(); + using (var reader = new MemoryStreamReader(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024))) + { + var counter = 1; + while (reader.EOS == false) + { + counter--; + var pos = reader.Position; + var k = serializer.KeyDeserializer.Invoke(reader); + serializer.ValueDeserializer.Invoke(reader); + if (counter == 0) + { + index[k] = pos; + counter = 16; + } + } + } + + // 2 Test index + using (var reader = new MemoryStreamReader(new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024))) + { + foreach (var pair in index) + { + reader.Stream.Seek(pair.Value, SeekOrigin.Begin); + var k = serializer.KeyDeserializer.Invoke(reader); + if (k != pair.Key) + { + Log.Warning("Broken index"); + } + var v = serializer.ValueDeserializer.Invoke(reader); + } + } + } + static void Main(string[] args) { - var root = @"H:\temp"; + /*FaultIndexTest(@"H:\temp\85"); + return;*/ + + var root = @"H:\temp"; var options = new StoreOptions { Index = new IndexOptions { Enabled = true, StepType = IndexStepType.Step, - StepValue = 16 + StepValue = 16, + EnableIndexInMemoryCachee= true }, RootFolder = root, FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), @@ -365,7 +418,8 @@ namespace PartitionFileStorageTest { Enabled = true, StepType = IndexStepType.Step, - StepValue = 16 + StepValue = 16, + EnableIndexInMemoryCachee = true }, RootFolder = root, FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), @@ -393,10 +447,14 @@ namespace PartitionFileStorageTest // FastTest(options); FSUtils.CleanAndTestFolder(root); + + FullStoreMultithreadTest(optionsMultiThread, pairs); FullStoreTest(options, pairs); + /* + FSUtils.CleanAndTestFolder(root); - FullStoreMultithreadTest(optionsMultiThread, pairs); + */ Console.ReadKey(); } diff --git a/ZeroLevel.NN/Architectures/YoloV5/Yolov5Detector.cs b/ZeroLevel.NN/Architectures/YoloV5/Yolov5Detector.cs index 855dc43..50a2733 100644 --- a/ZeroLevel.NN/Architectures/YoloV5/Yolov5Detector.cs +++ b/ZeroLevel.NN/Architectures/YoloV5/Yolov5Detector.cs @@ -9,8 +9,8 @@ namespace ZeroLevel.NN.Architectures.YoloV5 { private int INPUT_WIDTH = 640; private int INPUT_HEIGHT = 640; - private int CROP_WIDTH = 1440; - private int CROP_HEIGHT = 1440; + private int CROP_WIDTH = 1280; + private int CROP_HEIGHT = 1280; public Yolov5Detector(string modelPath, int inputWidth = 640, int inputHeight = 640, bool gpu = false) : base(modelPath, gpu) @@ -43,7 +43,15 @@ namespace ZeroLevel.NN.Architectures.YoloV5 var result = new List(); Extract(new Dictionary> { { "images", input } }, d => { - var output = d["output"]; + Tensor output; + if (d.ContainsKey("output")) + { + output = d["output"]; + } + else + { + output = d.First().Value; + } /* var output350 = d["350"]; var output498 = d["498"]; @@ -93,7 +101,16 @@ namespace ZeroLevel.NN.Architectures.YoloV5 { Extract(new Dictionary> { { "images", input.Tensor } }, d => { - var output = d["output"]; + Tensor output; + if (d.ContainsKey("output")) + { + output = d["output"]; + } + else + { + output = d.First().Value; + } + /* var output350 = d["350"]; var output498 = d["498"]; diff --git a/ZeroLevel.NN/Services/ImagePreprocessor.cs b/ZeroLevel.NN/Services/ImagePreprocessor.cs index 4a07855..1559dca 100644 --- a/ZeroLevel.NN/Services/ImagePreprocessor.cs +++ b/ZeroLevel.NN/Services/ImagePreprocessor.cs @@ -143,6 +143,13 @@ namespace ZeroLevel.NN { var append = options.ChannelType == PredictorChannelType.ChannelFirst ? _precompiledChannelFirstAction : _precompiledChannelLastAction; + if (image.PixelType.BitsPerPixel != 24) + { + var i = image; + image = i.CloneAs(); + i.Dispose(); + } + ((Image)image).ProcessPixelRows(pixels => { if (options.InvertXY) diff --git a/ZeroLevel.NN/ZeroLevel.NN.csproj b/ZeroLevel.NN/ZeroLevel.NN.csproj index 4cb3854..a0b8bf9 100644 --- a/ZeroLevel.NN/ZeroLevel.NN.csproj +++ b/ZeroLevel.NN/ZeroLevel.NN.csproj @@ -35,7 +35,7 @@ - + diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs b/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs index a52091a..14abcf1 100644 --- a/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/KeysIndex/StorePartitionSparseIndex.cs @@ -8,18 +8,20 @@ namespace ZeroLevel.Services.PartitionStorage internal sealed class StorePartitionSparseIndex : IStorePartitionIndex { - private readonly Dictionary[]> _indexCachee - = new Dictionary[]>(1024); - private readonly StoreFilePartition _filePartition; private readonly Func _keyComparer; private readonly string _indexFolder; private readonly bool _indexExists = false; + private readonly bool _enableIndexInMemoryCachee; private readonly Func _keyDeserializer; private readonly TMeta _meta; + + private readonly Dictionary[]> _indexCachee = null; + public StorePartitionSparseIndex(string partitionFolder, TMeta meta, StoreFilePartition filePartition, - Func keyComparer) + Func keyComparer, + bool enableIndexInMemoryCachee) { _indexFolder = Path.Combine(partitionFolder, "__indexes__"); _indexExists = Directory.Exists(_indexFolder); @@ -27,6 +29,11 @@ namespace ZeroLevel.Services.PartitionStorage _keyComparer = keyComparer; _filePartition = filePartition; _keyDeserializer = MessageSerializer.GetDeserializer(); + _enableIndexInMemoryCachee = enableIndexInMemoryCachee; + if (_enableIndexInMemoryCachee) + { + _indexCachee = new Dictionary[]>(1024); + } } public KeyIndex GetOffset(TKey key) @@ -99,11 +106,60 @@ namespace ZeroLevel.Services.PartitionStorage return index[position]; } + public void ResetCachee() + { + if (_enableIndexInMemoryCachee) + { + lock (_casheupdatelock) + { + _indexCachee.Clear(); + } + } + } + + public void RemoveCacheeItem(string name) + { + if (_enableIndexInMemoryCachee) + { + lock (_casheupdatelock) + { + _indexCachee.Remove(name); + } + } + } + + private readonly object _casheupdatelock = new object(); private KeyIndex[] GetFileIndex(TKey key) { var indexName = _filePartition.FileNameExtractor.Invoke(key, _meta); - if (_indexCachee.TryGetValue(indexName, out var index)) return index; + try + { + if (_enableIndexInMemoryCachee) + { + if (_indexCachee.TryGetValue(indexName, out var index)) + { + return index; + } + lock (_casheupdatelock) + { + _indexCachee[indexName] = ReadIndexesFromIndexFile(indexName); + return _indexCachee[indexName]; + } + } + else + { + return ReadIndexesFromIndexFile(indexName); + } + } + catch (Exception ex) + { + Log.SystemError(ex, "[StorePartitionSparseIndex.GetFileIndex] No cachee"); + } + return null; + } + private KeyIndex[] ReadIndexesFromIndexFile(string indexName) + { var file = Path.Combine(_indexFolder, indexName); if (File.Exists(file)) { @@ -117,8 +173,7 @@ namespace ZeroLevel.Services.PartitionStorage list.Add(new KeyIndex { Key = k, Offset = o }); } } - _indexCachee[indexName] = list.ToArray(); - return _indexCachee[indexName]; + return list.ToArray(); } return null; } diff --git a/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs index 84a03df..f8419b6 100644 --- a/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs +++ b/ZeroLevel/Services/PartitionStorage/Options/IndexOptions.cs @@ -11,5 +11,6 @@ public bool Enabled { get; set; } public IndexStepType StepType { get; set; } = IndexStepType.AbsoluteCount; public int StepValue { get; set; } = 64; + public bool EnableIndexInMemoryCachee { get; set; } = false; } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs index 37ddf49..ee128ed 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs @@ -1,6 +1,7 @@ using System; -using System.Collections.Concurrent; +using System.Collections.Generic; using System.IO; +using System.Threading; using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.Serialization; @@ -14,16 +15,16 @@ namespace ZeroLevel.Services.PartitionStorage.Partition : IStorePartitionBase { public string Catalog { get { return _catalog; } } - + protected readonly TMeta _info; protected readonly string _catalog; protected IStoreSerializer Serializer { get; } protected readonly StoreOptions _options; private readonly IndexBuilder _indexBuilder; - private readonly ConcurrentDictionary _writeStreams = new ConcurrentDictionary(); + private readonly Dictionary _writeStreams = new Dictionary(); - internal BasePartition(StoreOptions options, + internal BasePartition(StoreOptions options, TMeta info, IStoreSerializer serializer) { @@ -101,13 +102,32 @@ namespace ZeroLevel.Services.PartitionStorage.Partition { try { - writer = _writeStreams.GetOrAdd(fileName, k => + bool taken = false; + Monitor.Enter(_writeStreams, ref taken); + try { - var filePath = Path.Combine(_catalog, k); - var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); - return new MemoryStreamWriter(stream); - }); - return true; + if (_writeStreams.TryGetValue(fileName, out var w)) + { + writer = w; + return true; + } + else + { + var filePath = Path.Combine(_catalog, fileName); + var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); + var new_w = new MemoryStreamWriter(stream); + _writeStreams[fileName] = new_w; + writer = new_w; + return true; + } + } + finally + { + if (taken) + { + Monitor.Exit(_writeStreams); + } + } } catch (Exception ex) { diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs index 4fa445b..e98fe62 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs @@ -144,6 +144,10 @@ namespace ZeroLevel.Services.PartitionStorage } } } + else + { + Log.SystemError($"File not found '{filePath}'"); + } } #endregion diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs index 7da8789..0565b19 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs @@ -11,12 +11,18 @@ namespace ZeroLevel.Services.PartitionStorage internal sealed class StorePartitionAccessor : BasePartition, IStorePartitionAccessor { + private readonly StorePartitionSparseIndex Indexes; + public StorePartitionAccessor(StoreOptions options, TMeta info, IStoreSerializer serializer) : base(options, info, serializer) { if (options == null) throw new ArgumentNullException(nameof(options)); + if (options.Index.Enabled) + { + Indexes = new StorePartitionSparseIndex(_catalog, _info, options.FilePartition, options.KeyComparer, options.Index.EnableIndexInMemoryCachee); + } } #region IStorePartitionAccessor @@ -28,8 +34,7 @@ namespace ZeroLevel.Services.PartitionStorage long startOffset = 0; if (_options.Index.Enabled) { - var index = new StorePartitionSparseIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); - var offset = index.GetOffset(key); + var offset = Indexes.GetOffset(key); startOffset = offset.Offset; } if (TryGetReadStream(fileName, out var reader)) @@ -130,7 +135,14 @@ namespace ZeroLevel.Services.PartitionStorage } } } - public void RebuildIndex() => RebuildIndexes(); + public void RebuildIndex() + { + RebuildIndexes(); + if (_options.Index.Enabled) + { + Indexes.ResetCachee(); + } + } public void RemoveAllExceptKey(TKey key, bool autoReindex = true) { RemoveAllExceptKeys(new[] { key }, autoReindex); @@ -144,6 +156,10 @@ namespace ZeroLevel.Services.PartitionStorage foreach (var group in results) { RemoveKeyGroup(group.FileName, group.Keys, false, autoReindex); + if (_options.Index.Enabled) + { + Indexes.RemoveCacheeItem(group.FileName); + } } } public void RemoveKey(TKey key, bool autoReindex = false) @@ -159,6 +175,10 @@ namespace ZeroLevel.Services.PartitionStorage foreach (var group in results) { RemoveKeyGroup(group.FileName, group.Keys, true, autoReindex); + if (_options.Index.Enabled) + { + Indexes.RemoveCacheeItem(group.FileName); + } } } #endregion @@ -172,8 +192,7 @@ namespace ZeroLevel.Services.PartitionStorage { if (_options.Index.Enabled) { - var index = new StorePartitionSparseIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); - var offsets = index.GetOffset(keys, true); + var offsets = Indexes.GetOffset(keys, true); if (TryGetReadStream(fileName, out var reader)) { using (reader) @@ -257,8 +276,7 @@ namespace ZeroLevel.Services.PartitionStorage var ranges = new List(); if (_options.Index.Enabled && autoReindex) { - var index = new StorePartitionSparseIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); - var offsets = index.GetOffset(keys, true); + var offsets = Indexes.GetOffset(keys, true); if (TryGetReadStream(fileName, out var reader)) { using (reader) diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index e3c655d..ce7db6b 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -14,7 +14,7 @@ namespace ZeroLevel.Services.PartitionStorage internal sealed class StorePartitionBuilder : BasePartition, IStorePartitionBuilder { - private readonly Action _storeMethod; + private readonly Func _storeMethod; private long _totalRecords = 0; @@ -41,8 +41,10 @@ namespace ZeroLevel.Services.PartitionStorage public void Store(TKey key, TInput value) { - _storeMethod.Invoke(key, value); - Interlocked.Increment(ref _totalRecords); + if (_storeMethod.Invoke(key, value)) + { + Interlocked.Increment(ref _totalRecords); + } } public void CompleteAdding() @@ -84,7 +86,7 @@ namespace ZeroLevel.Services.PartitionStorage #endregion #region Private methods - private void StoreDirect(TKey key, TInput value) + private bool StoreDirect(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); if (TryGetWriteStream(groupKey, out var stream)) @@ -92,9 +94,15 @@ namespace ZeroLevel.Services.PartitionStorage Serializer.KeySerializer.Invoke(stream, key); Thread.MemoryBarrier(); Serializer.InputSerializer.Invoke(stream, value); + return true; + } + else + { + Log.SystemError($"Fault create write stream for key '{groupKey}'"); } + return false; } - private void StoreDirectSafe(TKey key, TInput value) + private bool StoreDirectSafe(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); bool lockTaken = false; @@ -106,6 +114,7 @@ namespace ZeroLevel.Services.PartitionStorage Serializer.KeySerializer.Invoke(stream, key); Thread.MemoryBarrier(); Serializer.InputSerializer.Invoke(stream, value); + return true; } finally { @@ -115,6 +124,11 @@ namespace ZeroLevel.Services.PartitionStorage } } } + else + { + Log.SystemError($"Fault create write stream for key '{groupKey}'"); + } + return false; } internal void CompressFile(string file) @@ -145,6 +159,7 @@ namespace ZeroLevel.Services.PartitionStorage { var v = _options.MergeFunction(pair.Value); writer.SerializeCompatible(pair.Key); + Thread.MemoryBarrier(); writer.SerializeCompatible(v); } } diff --git a/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs b/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs index 7f72869..973231f 100644 --- a/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs +++ b/ZeroLevel/Services/PartitionStorage/StoreStandartSerializer.cs @@ -4,7 +4,8 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage { - internal sealed class StoreStandartSerializer + // TODO INTERNAL + public sealed class StoreStandartSerializer : IStoreSerializer { private readonly Action _keySerializer; diff --git a/ZeroLevel/Services/Reflection/TypeHelpers.cs b/ZeroLevel/Services/Reflection/TypeHelpers.cs index ebc5a3d..1fc203c 100644 --- a/ZeroLevel/Services/Reflection/TypeHelpers.cs +++ b/ZeroLevel/Services/Reflection/TypeHelpers.cs @@ -47,10 +47,7 @@ namespace ZeroLevel.Services.Reflection return false; } } -<<<<<<< Updated upstream -======= ->>>>>>> Stashed changes public static bool IsNumericTypeWithFloating(Type type) { switch (Type.GetTypeCode(type)) diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs index 2d349fd..6cb2863 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs @@ -262,7 +262,7 @@ namespace ZeroLevel.Services.Serialization /// public bool CheckOutOfRange(int offset) { - return (_stream.Position + offset) > _stream.Length; + return offset < 0 || (_stream.Position + offset) > _stream.Length; } #region Extensions