diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index c621acb..152483a 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -141,10 +141,10 @@ namespace PartitionFileStorageTest var val = pairs[i].Item2; if (testData.ContainsKey(key) == false) testData[key] = new HashSet(); testData[key].Add(val); - merger.Store(key, val); + await merger.Store(key, val); } Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. New records merged: {merger.TotalRecords}"); - merger.Compress(); // auto reindex + await merger.Compress(); // auto reindex sw.Stop(); Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); @@ -319,7 +319,7 @@ namespace PartitionFileStorageTest } Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {PAIRS_COUNT}. Unique keys: {Keys.Count}"); - merger.Compress(); // auto reindex + await merger.Compress(); // auto reindex } sw.Stop(); Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms"); diff --git a/TestPipeLine/Processor/ProcessorService.cs b/TestPipeLine/Processor/ProcessorService.cs index da44735..345bdcd 100644 --- a/TestPipeLine/Processor/ProcessorService.cs +++ b/TestPipeLine/Processor/ProcessorService.cs @@ -1,7 +1,5 @@ -using System; -using System.Collections.Concurrent; +using System.Collections.Concurrent; using System.Net; -using System.Threading; using ZeroLevel; using ZeroLevel.Network; using ZeroLevel.Services.Applications; diff --git a/ZeroLevel/Services/Collections/ConcurrentHashSet.cs b/ZeroLevel/Services/Collections/ConcurrentHashSet.cs index aaa053a..3eadcc4 100644 --- a/ZeroLevel/Services/Collections/ConcurrentHashSet.cs +++ b/ZeroLevel/Services/Collections/ConcurrentHashSet.cs @@ -163,7 +163,7 @@ namespace ZeroLevel.Collections /// /// The /// implementation to use when comparing items. - public ConcurrentHashSet(IEqualityComparer? comparer) + public ConcurrentHashSet(IEqualityComparer comparer) : this(DefaultConcurrencyLevel, DefaultCapacity, true, comparer) { } @@ -184,7 +184,7 @@ namespace ZeroLevel.Collections /// is a null reference /// (Nothing in Visual Basic). /// - public ConcurrentHashSet(IEnumerable collection, IEqualityComparer? comparer) + public ConcurrentHashSet(IEnumerable collection, IEqualityComparer comparer) : this(comparer) { if (collection == null) throw new ArgumentNullException(nameof(collection)); @@ -211,7 +211,7 @@ namespace ZeroLevel.Collections /// /// is less than 1. /// - public ConcurrentHashSet(int concurrencyLevel, IEnumerable collection, IEqualityComparer? comparer) + public ConcurrentHashSet(int concurrencyLevel, IEnumerable collection, IEqualityComparer comparer) : this(concurrencyLevel, DefaultCapacity, false, comparer) { if (collection == null) throw new ArgumentNullException(nameof(collection)); @@ -235,12 +235,12 @@ namespace ZeroLevel.Collections /// is less than 1. -or- /// is less than 0. /// - public ConcurrentHashSet(int concurrencyLevel, int capacity, IEqualityComparer? comparer) + public ConcurrentHashSet(int concurrencyLevel, int capacity, IEqualityComparer comparer) : this(concurrencyLevel, capacity, false, comparer) { } - private ConcurrentHashSet(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer? comparer) + private ConcurrentHashSet(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer comparer) { if (concurrencyLevel < 1) throw new ArgumentOutOfRangeException(nameof(concurrencyLevel)); if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); @@ -375,7 +375,7 @@ namespace ZeroLevel.Collections continue; } - Node? previous = null; + Node previous = null; for (var current = tables.Buckets[bucketNo]; current != null; current = current.Next) { Debug.Assert((previous == null && current == tables.Buckets[bucketNo]) || previous!.Next == current); @@ -439,8 +439,8 @@ namespace ZeroLevel.Collections private readonly ConcurrentHashSet _set; - private Node?[]? _buckets; - private Node? _node; + private Node[] _buckets; + private Node _node; private int _i; private int _state; @@ -468,7 +468,7 @@ namespace ZeroLevel.Collections /// The element in the collection at the current position of the enumerator. public T Current { get; private set; } - object? IEnumerator.Current => Current; + object IEnumerator.Current => Current; /// /// Sets the enumerator to its initial position, which is before the first element in the collection. @@ -501,7 +501,7 @@ namespace ZeroLevel.Collections goto case StateOuterloop; case StateOuterloop: - Node?[]? buckets = _buckets; + Node[] buckets = _buckets; Debug.Assert(buckets != null); int i = ++_i; @@ -516,7 +516,7 @@ namespace ZeroLevel.Collections goto default; case StateInnerLoop: - Node? node = _node; + Node node = _node; if (node != null) { Current = node.Item; @@ -606,7 +606,7 @@ namespace ZeroLevel.Collections } // Try to find this item in the bucket - Node? previous = null; + Node previous = null; for (var current = tables.Buckets[bucketNo]; current != null; current = current.Next) { Debug.Assert(previous == null && current == tables.Buckets[bucketNo] || previous!.Next == current); @@ -878,12 +878,12 @@ namespace ZeroLevel.Collections private class Tables { - public readonly Node?[] Buckets; + public readonly Node[] Buckets; public readonly object[] Locks; public readonly int[] CountPerLock; - public Tables(Node?[] buckets, object[] locks, int[] countPerLock) + public Tables(Node[] buckets, object[] locks, int[] countPerLock) { Buckets = buckets; Locks = locks; @@ -896,9 +896,9 @@ namespace ZeroLevel.Collections public readonly T Item; public readonly int Hashcode; - public volatile Node? Next; + public volatile Node Next; - public Node(T item, int hashcode, Node? next) + public Node(T item, int hashcode, Node next) { Item = item; Hashcode = hashcode; diff --git a/ZeroLevel/Services/Config/Implementation/EnvironmentVariablesConfigReader.cs b/ZeroLevel/Services/Config/Implementation/EnvironmentVariablesConfigReader.cs index 7ad7156..3ab089e 100644 --- a/ZeroLevel/Services/Config/Implementation/EnvironmentVariablesConfigReader.cs +++ b/ZeroLevel/Services/Config/Implementation/EnvironmentVariablesConfigReader.cs @@ -12,7 +12,7 @@ namespace ZeroLevel.Services.Config.Implementation while (enumerator.MoveNext()) { string key = (string)enumerator.Entry.Key; - string value = ((string?)enumerator.Entry.Value) ?? string.Empty; + string value = ((string)enumerator.Entry.Value) ?? string.Empty; result.Append(key, value); } return result; diff --git a/ZeroLevel/Services/MemoryPools/DefaultObjectPool.cs b/ZeroLevel/Services/MemoryPools/DefaultObjectPool.cs index 4c8f37f..7b96f80 100644 --- a/ZeroLevel/Services/MemoryPools/DefaultObjectPool.cs +++ b/ZeroLevel/Services/MemoryPools/DefaultObjectPool.cs @@ -16,10 +16,10 @@ namespace MemoryPools private protected readonly ObjectWrapper[] _items; private protected readonly IPooledObjectPolicy _policy; private protected readonly bool _isDefaultPolicy; - private protected T? _firstItem; + private protected T _firstItem; // This class was introduced in 2.1 to avoid the interface call where possible - private protected readonly PooledObjectPolicy? _fastPolicy; + private protected readonly PooledObjectPolicy _fastPolicy; /// /// Creates an instance of . @@ -95,7 +95,7 @@ namespace MemoryPools private protected struct ObjectWrapper { - public T? Element; + public T Element; } } } diff --git a/ZeroLevel/Services/MemoryPools/DisposableObjectPool.cs b/ZeroLevel/Services/MemoryPools/DisposableObjectPool.cs index afd5567..0ee749b 100644 --- a/ZeroLevel/Services/MemoryPools/DisposableObjectPool.cs +++ b/ZeroLevel/Services/MemoryPools/DisposableObjectPool.cs @@ -5,7 +5,7 @@ using System.Threading; namespace MemoryPools { - internal sealed class DisposableObjectPool + internal sealed class DisposableObjectPool : DefaultObjectPool, IDisposable where T : class { private volatile bool _isDisposed; @@ -81,9 +81,9 @@ namespace MemoryPools } } - private static void DisposeItem(T? item) + private static void DisposeItem(T item) { - if (item is IDisposable disposable) + if (item != null && item is IDisposable disposable) { disposable.Dispose(); } diff --git a/ZeroLevel/Services/MemoryPools/ObjectPool.cs b/ZeroLevel/Services/MemoryPools/ObjectPool.cs index 9d60a97..35c092c 100644 --- a/ZeroLevel/Services/MemoryPools/ObjectPool.cs +++ b/ZeroLevel/Services/MemoryPools/ObjectPool.cs @@ -23,7 +23,7 @@ namespace MemoryPools public static class ObjectPool { /// - public static ObjectPool Create(IPooledObjectPolicy? policy = null) where T : class, new() + public static ObjectPool Create(IPooledObjectPolicy policy = null) where T : class, new() { var provider = new DefaultObjectPoolProvider(); return provider.Create(policy ?? new DefaultPooledObjectPolicy()); diff --git a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs index 2bfbbd8..141dce2 100644 --- a/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Indexes/IndexBuilder.cs @@ -41,10 +41,9 @@ namespace ZeroLevel.Services.PartitionStorage var files = Directory.GetFiles(_dataCatalog); if (files != null && files.Length > 0) { - foreach (var file in files) { - RebuildFileIndex(Path.GetFileName(file)); + await RebuildFileIndex(Path.GetFileName(file)); } } } @@ -121,7 +120,7 @@ namespace ZeroLevel.Services.PartitionStorage for (int i = 0; i < _stepValue; i++) { var pair = d_arr[i * step]; - writer.WriteCompatible(pair.Key); + await Serializer.KeySerializer.Invoke(writer, pair.Key); writer.WriteLong(pair.Value); } } @@ -162,7 +161,7 @@ namespace ZeroLevel.Services.PartitionStorage await Serializer.ValueDeserializer.Invoke(reader); if (counter == 0) { - writer.WriteCompatible(k); + await Serializer.KeySerializer.Invoke(writer, k.Value); writer.WriteLong(pos); counter = _stepValue; } diff --git a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs index 14b71a4..f85370d 100644 --- a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs +++ b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs @@ -15,6 +15,7 @@ namespace ZeroLevel.Services.PartitionStorage /// Meta information for partition search public class StoreOptions { + private const string DEFAULT_FILE_NAME = "defaultGroup"; /// /// Method for key comparison /// @@ -57,7 +58,12 @@ namespace ZeroLevel.Services.PartitionStorage internal string GetFileName(TKey key, TMeta info) { - return FilePartition.FileNameExtractor(key, info); + var name = FilePartition.FileNameExtractor(key, info); + if (string.IsNullOrWhiteSpace(name)) + { + name = DEFAULT_FILE_NAME; + } + return name; } internal string GetCatalogPath(TMeta info) { diff --git a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs index 4449c25..b79869f 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/BasePartition.cs @@ -20,11 +20,14 @@ namespace ZeroLevel.Services.PartitionStorage.Partition protected readonly TMeta _info; protected readonly string _catalog; + + private SemaphoreSlim _writersLock = new SemaphoreSlim(1); + private readonly Dictionary _writeStreams = new Dictionary(); + protected IStoreSerializer Serializer { get; } protected readonly StoreOptions _options; private readonly IndexBuilder _indexBuilder; - private readonly Dictionary _writeStreams = new Dictionary(); private readonly PhisicalFileAccessorCachee _phisicalFileAccessor; protected PhisicalFileAccessorCachee PhisicalFileAccessorCachee => _phisicalFileAccessor; @@ -41,8 +44,8 @@ namespace ZeroLevel.Services.PartitionStorage.Partition Directory.CreateDirectory(_catalog); } _phisicalFileAccessor = fileAccessorCachee; - _indexBuilder = _options.Index.Enabled ? new IndexBuilder(_options.Index.StepType, _options.Index.StepValue, _catalog, fileAccessorCachee, Serializer) : null; Serializer = serializer; + _indexBuilder = _options.Index.Enabled ? new IndexBuilder(_options.Index.StepType, _options.Index.StepValue, _catalog, fileAccessorCachee, Serializer) : null; } #region IStorePartitionBase @@ -52,7 +55,7 @@ namespace ZeroLevel.Services.PartitionStorage.Partition public void Dispose() { CloseWriteStreams(); - Release(); + Release(); } #endregion @@ -99,11 +102,77 @@ namespace ZeroLevel.Services.PartitionStorage.Partition { s.Value.Stream.Flush(); s.Value.Dispose(); + s.Value.DisposeAsync(); } catch { } } _writeStreams.Clear(); } + + protected async Task WriteStreamAction(string fileName, Func writeAction) + { + MemoryStreamWriter writer; + if (_writeStreams.TryGetValue(fileName, out writer) == false) + { + await _writersLock.WaitAsync(); + try + { + if (_writeStreams.TryGetValue(fileName, out writer) == false) + { + var filePath = Path.Combine(_catalog, fileName); + var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); + var new_w = new MemoryStreamWriter(stream); + _writeStreams[fileName] = new_w; + writer = new_w; + } + } + finally + { + _writersLock.Release(); + } + } + await writeAction.Invoke(writer); + } + + protected async Task SafeWriteStreamAction(string fileName, Func writeAction) + { + MemoryStreamWriter writer; + if (_writeStreams.TryGetValue(fileName, out writer) == false) + { + await _writersLock.WaitAsync(); + try + { + if (_writeStreams.TryGetValue(fileName, out writer) == false) + { + var filePath = Path.Combine(_catalog, fileName); + var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024); + var new_w = new MemoryStreamWriter(stream); + _writeStreams[fileName] = new_w; + writer = new_w; + } + } + finally + { + _writersLock.Release(); + } + } + await writeAction.Invoke(writer); + /* + await writer.WaitLockAsync(); + try + { + await writeAction.Invoke(writer); + } + finally + { + writer.Release(); + }*/ + } + + + + + /* /// /// Attempting to open a file for writing /// @@ -145,6 +214,11 @@ namespace ZeroLevel.Services.PartitionStorage.Partition writer = null; return false; } + */ + + + + /// /// Attempting to open a file for reading /// diff --git a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs index ddd8046..19410d3 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/CompactKeyStorePartitionBuilder.cs @@ -88,35 +88,24 @@ namespace ZeroLevel.Services.PartitionStorage.Partition #endregion #region Private methods + private async Task StoreDirect(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); - if (TryGetWriteStream(groupKey, out var stream)) + await WriteStreamAction(groupKey, async stream => { await Serializer.KeySerializer.Invoke(stream, key); await Serializer.InputSerializer.Invoke(stream, value); - } + }); } private async Task StoreDirectSafe(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); - bool lockTaken = false; - if (TryGetWriteStream(groupKey, out var stream)) + await SafeWriteStreamAction(groupKey, async stream => { - Monitor.Enter(stream, ref lockTaken); - try - { - await Serializer.KeySerializer.Invoke(stream, key); - await Serializer.InputSerializer.Invoke(stream, value); - } - finally - { - if (lockTaken) - { - Monitor.Exit(stream); - } - } - } + await Serializer.KeySerializer.Invoke(stream, key); + await Serializer.InputSerializer.Invoke(stream, value); + }); } internal async Task CompressFile(string file) diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index b36be88..32c8f49 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -95,46 +95,38 @@ namespace ZeroLevel.Services.PartitionStorage private async Task StoreDirect(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); - if (TryGetWriteStream(groupKey, out var stream)) + try { - await Serializer.KeySerializer.Invoke(stream, key); - Thread.MemoryBarrier(); - await Serializer.InputSerializer.Invoke(stream, value); + await WriteStreamAction(groupKey, async stream => + { + await Serializer.KeySerializer.Invoke(stream, key); + await Serializer.InputSerializer.Invoke(stream, value); + }); return true; } - else + catch (Exception ex) { - Log.SystemError($"Fault create write stream for key '{groupKey}'"); - } - return false; + Log.SystemError(ex, $"[StoreDirect] Fault use writeStream for key '{groupKey}'"); + return false; + } } private async Task StoreDirectSafe(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); - bool lockTaken = false; - if (TryGetWriteStream(groupKey, out var stream)) + try { - Monitor.Enter(stream, ref lockTaken); - try + await SafeWriteStreamAction(groupKey, async stream => { await Serializer.KeySerializer.Invoke(stream, key); - Thread.MemoryBarrier(); await Serializer.InputSerializer.Invoke(stream, value); - return true; - } - finally - { - if (lockTaken) - { - Monitor.Exit(stream); - } - } + }); + return true; } - else + catch(Exception ex) { - Log.SystemError($"Fault create write stream for key '{groupKey}'"); + Log.SystemError(ex, $"[StoreDirectSafe] Fault use writeStream for key '{groupKey}'"); + return false; } - return false; } internal async Task CompressFile(string file) diff --git a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs index 78bd4bd..4e93967 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs @@ -6,6 +6,7 @@ using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; +using System.Threading; using System.Threading.Tasks; using ZeroLevel.Services.Extensions; @@ -463,6 +464,10 @@ namespace ZeroLevel.Services.Serialization public partial class MemoryStreamWriter : IAsyncBinaryWriter { + private SemaphoreSlim _writeLock = new SemaphoreSlim(1); + public async Task WaitLockAsync() => await _writeLock.WaitAsync(); + public void Release() => _writeLock.Release(); + /// /// Write char (2 bytes) /// @@ -663,10 +668,9 @@ namespace ZeroLevel.Services.Serialization } } - public async Task DisposeAsync() + public void DisposeAsync() { - await _stream.FlushAsync(); - await _stream.DisposeAsync(); + _writeLock.Dispose(); } #region Extension @@ -676,40 +680,51 @@ namespace ZeroLevel.Services.Serialization /// /// Increase writing by batches /// - private async Task OptimizedWriteCollectionByChunksAsync(IEnumerable collection, Action saveAction, int chunk_size) + private async Task OptimizedWriteCollectionByChunksAsync(IEnumerable collection, Action saveAction, Func asyncSaveAction, int chunk_size) { if (collection != null) { - MockCount(); - int count = 0; - if (_stream is MemoryStream) + if (_stream.CanSeek == false) { + WriteInt32(collection.Count()); foreach (var item in collection) { - saveAction.Invoke(this, item); - count++; + await asyncSaveAction.Invoke(this, item); } } else { - using (var ms = new MemoryStream()) + MockCount(); + int count = 0; + if (_stream is MemoryStream) { - using (var writer = new MemoryStreamWriter(ms)) + foreach (var item in collection) + { + saveAction.Invoke(this, item); + count++; + } + } + else + { + using (var ms = new MemoryStream()) { - foreach (var items in collection.Chunkify(chunk_size)) + using (var writer = new MemoryStreamWriter(ms)) { - foreach (var item in items) + foreach (var items in collection.Chunkify(chunk_size)) { - saveAction.Invoke(writer, item); - count++; + foreach (var item in items) + { + saveAction.Invoke(writer, item); + count++; + } + await WriteRawBytesAsyncNoLength(writer.Complete()); + writer.Stream.Position = 0; } - await WriteRawBytesAsyncNoLength(writer.Complete()); - writer.Stream.Position = 0; } } } + UpdateCount(count); } - UpdateCount(count); } else { @@ -759,71 +774,82 @@ namespace ZeroLevel.Services.Serialization } } - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteIP(i), BATCH_MEMORY_SIZE_LIMIT / 5); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteIP(i), (w, i) => w.WriteIPAsync(i), BATCH_MEMORY_SIZE_LIMIT / 5); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteIPEndpoint(i), BATCH_MEMORY_SIZE_LIMIT / 9); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteIPEndpoint(i), (w, i) => w.WriteIPEndpointAsync(i), BATCH_MEMORY_SIZE_LIMIT / 9); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteGuid(i), BATCH_MEMORY_SIZE_LIMIT / 16); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteGuid(i), (w, i) => w.WriteGuidAsync(i), BATCH_MEMORY_SIZE_LIMIT / 16); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDateTime(i), BATCH_MEMORY_SIZE_LIMIT / 9); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDateTime(i), (w, i) => w.WriteDateTimeAsync(i), BATCH_MEMORY_SIZE_LIMIT / 9); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDateTime(i), BATCH_MEMORY_SIZE_LIMIT / 9); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDateTime(i), (w, i) => w.WriteDateTimeAsync(i), BATCH_MEMORY_SIZE_LIMIT / 9); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteULong(i), BATCH_MEMORY_SIZE_LIMIT / 8); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteULong(i), (w, i) => w.WriteULongAsync(i), BATCH_MEMORY_SIZE_LIMIT / 8); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteUInt32(i), BATCH_MEMORY_SIZE_LIMIT / 4); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteUInt32(i), (w, i) => w.WriteUInt32Async(i), BATCH_MEMORY_SIZE_LIMIT / 4); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteChar(i), BATCH_MEMORY_SIZE_LIMIT / 2); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteChar(i), (w, i) => w.WriteCharAsync(i), BATCH_MEMORY_SIZE_LIMIT / 2); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteShort(i), BATCH_MEMORY_SIZE_LIMIT / 2); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteShort(i), (w, i) => w.WriteShortAsync(i), BATCH_MEMORY_SIZE_LIMIT / 2); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteUShort(i), BATCH_MEMORY_SIZE_LIMIT / 2); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteUShort(i), (w, i) => w.WriteUShortAsync(i), BATCH_MEMORY_SIZE_LIMIT / 2); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteLong(i), BATCH_MEMORY_SIZE_LIMIT / 8); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteLong(i), (w, i) => w.WriteLongAsync(i), BATCH_MEMORY_SIZE_LIMIT / 8); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteInt32(i), BATCH_MEMORY_SIZE_LIMIT / 4); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteInt32(i), (w, i) => w.WriteInt32Async(i), BATCH_MEMORY_SIZE_LIMIT / 4); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteFloat(i), BATCH_MEMORY_SIZE_LIMIT / 4); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteFloat(i), (w, i) => w.WriteFloatAsync(i), BATCH_MEMORY_SIZE_LIMIT / 4); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDouble(i), BATCH_MEMORY_SIZE_LIMIT / 8); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDouble(i), (w, i) => w.WriteDoubleAsync(i), BATCH_MEMORY_SIZE_LIMIT / 8); public async Task WriteCollectionAsync(IEnumerable collection) { if (collection != null) { - MockCount(); - - int count = 0; - if (_stream is MemoryStream) + if (_stream.CanSeek == false) { + WriteInt32(collection.Count()); foreach (var item in collection) { WriteBoolean(item); - count++; } } else { - var buffer = new byte[BATCH_MEMORY_SIZE_LIMIT]; - int index = 0; - foreach (var b in collection) + MockCount(); + + int count = 0; + if (_stream is MemoryStream) { - buffer[index] = b ? ONE : ZERO; - index++; - if (index == BATCH_MEMORY_SIZE_LIMIT) + foreach (var item in collection) { - await _stream.WriteAsync(buffer, 0, buffer.Length); - index = 0; + WriteBoolean(item); + count++; } - count++; } - if (index != 0) + else { - _stream.Write(buffer, 0, index); + var buffer = new byte[BATCH_MEMORY_SIZE_LIMIT]; + int index = 0; + foreach (var b in collection) + { + buffer[index] = b ? ONE : ZERO; + index++; + if (index == BATCH_MEMORY_SIZE_LIMIT) + { + await _stream.WriteAsync(buffer, 0, buffer.Length); + index = 0; + } + count++; + } + if (index != 0) + { + _stream.Write(buffer, 0, index); + } } - } - UpdateCount(count); + UpdateCount(count); + } } else { @@ -835,39 +861,49 @@ namespace ZeroLevel.Services.Serialization { if (collection != null) { - MockCount(); - - int count = 0; - if (_stream is MemoryStream) + if (_stream.CanSeek == false) { + WriteInt32(collection.Count()); foreach (var item in collection) { WriteByte(item); - count++; } } else { - var buffer = new byte[BATCH_MEMORY_SIZE_LIMIT]; - int index = 0; - foreach (var b in collection) + MockCount(); + int count = 0; + if (_stream is MemoryStream) { - buffer[index] = b; - index++; - if (index == BATCH_MEMORY_SIZE_LIMIT) + foreach (var item in collection) { - await _stream.WriteAsync(buffer, 0, buffer.Length); - index = 0; + WriteByte(item); + count++; } - count++; } - if (index != 0) + else { - _stream.Write(buffer, 0, index); + var buffer = new byte[BATCH_MEMORY_SIZE_LIMIT]; + int index = 0; + foreach (var b in collection) + { + buffer[index] = b; + index++; + if (index == BATCH_MEMORY_SIZE_LIMIT) + { + await _stream.WriteAsync(buffer, 0, buffer.Length); + index = 0; + } + count++; + } + if (index != 0) + { + _stream.Write(buffer, 0, index); + } } - } - UpdateCount(count); + UpdateCount(count); + } } else { @@ -879,26 +915,37 @@ namespace ZeroLevel.Services.Serialization { if (collection != null) { - MockCount(); - - int count = 0; - if (_stream is MemoryStream) + if (_stream.CanSeek == false) { + WriteInt32(collection.Count()); foreach (var item in collection) { WriteBytes(item); - count++; } } else { - foreach (var b in collection) + MockCount(); + + int count = 0; + if (_stream is MemoryStream) { - await WriteBytesAsync(b); - count++; + foreach (var item in collection) + { + WriteBytes(item); + count++; + } } + else + { + foreach (var b in collection) + { + await WriteBytesAsync(b); + count++; + } + } + UpdateCount(count); } - UpdateCount(count); } else { @@ -906,9 +953,9 @@ namespace ZeroLevel.Services.Serialization } } - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDecimal(i), BATCH_MEMORY_SIZE_LIMIT / 16); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteDecimal(i), (w, i) => w.WriteDecimalAsync(i), BATCH_MEMORY_SIZE_LIMIT / 16); - public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteTimeSpan(i), BATCH_MEMORY_SIZE_LIMIT / 16); + public async Task WriteCollectionAsync(IEnumerable collection) => await OptimizedWriteCollectionByChunksAsync(collection, (w, i) => w.WriteTimeSpan(i), (w, i) => w.WriteTimeSpanAsync(i), BATCH_MEMORY_SIZE_LIMIT / 16); #endregion #region Arrays diff --git a/ZeroLevel/Services/TokenEncryptor.cs b/ZeroLevel/Services/TokenEncryptor.cs index 51f3af1..44dad0a 100644 --- a/ZeroLevel/Services/TokenEncryptor.cs +++ b/ZeroLevel/Services/TokenEncryptor.cs @@ -22,7 +22,7 @@ namespace ZeroLevel.Services public byte[] Encrypt(byte[] data) { - using (Aes aes = new AesManaged()) + using (Aes aes = AesManaged.Create()) { aes.Padding = PaddingMode.PKCS7; aes.KeySize = AesKeySizeInBits; @@ -44,7 +44,7 @@ namespace ZeroLevel.Services public byte[] Decrypt(byte[] data) { - using (Aes aes = new AesManaged()) + using (Aes aes = AesManaged.Create()) { aes.Padding = PaddingMode.PKCS7; aes.KeySize = AesKeySizeInBits; diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index b23f9bd..1a755a6 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -6,16 +6,16 @@ ogoun ogoun - 3.4.0.5 - KVDB + 3.4.0.6 + KVDB fixes https://github.com/ogoun/Zero/wiki Copyright Ogoun 2023 https://github.com/ogoun/Zero git - 3.4.0.5 - 3.4.0.5 + 3.4.0.6 + 3.4.0.6 AnyCPU;x64;x86 zero.png full