pull/4/head
Ogoun 2 years ago
parent 91784e256f
commit d81cf3109a

@ -37,6 +37,7 @@ namespace PartitionFileStorageTest
var store = new Store<ulong, ulong, byte[], Metadata>(options, new StoreSerializers<ulong, ulong, byte[]>(
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteBytesAsync(n),
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<byte[]>(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult<byte[]>(false, new byte[0]); } }));
@ -89,6 +90,7 @@ namespace PartitionFileStorageTest
var store = new Store<ulong, ulong, byte[], Metadata>(options, new StoreSerializers<ulong, ulong, byte[]>(
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteBytesAsync(n),
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<byte[]>(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult<byte[]>(false, new byte[0]); } }));
@ -252,6 +254,7 @@ namespace PartitionFileStorageTest
var store = new Store<ulong, ulong, byte[], Metadata>(options, new StoreSerializers<ulong, ulong, byte[]>(
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteBytesAsync(n),
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<byte[]>(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult<byte[]>(false, new byte[0]); } }));
@ -408,6 +411,7 @@ namespace PartitionFileStorageTest
var serializer = new StoreSerializers<ulong, ulong, byte[]>(
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteBytesAsync(n),
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<byte[]>(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult<byte[]>(false, new byte[0]); } });
@ -471,6 +475,7 @@ namespace PartitionFileStorageTest
var serializer = new StoreSerializers<ulong, ulong, byte[]>(
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteBytesAsync(n),
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<byte[]>(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult<byte[]>(false, new byte[0]); } });
@ -530,6 +535,7 @@ namespace PartitionFileStorageTest
var store = new Store<string, ulong, byte[], StoreMetadata>(options, new StoreSerializers<string, ulong, byte[]>(
async (w, n) => await w.WriteStringAsync(n),
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteBytesAsync(n),
async (r) => { try { return new DeserializeResult<string>(true, await r.ReadStringAsync()); } catch { return new DeserializeResult<string>(false, string.Empty); } },
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<byte[]>(true, await r.ReadBytesAsync()); } catch { return new DeserializeResult<byte[]>(false, new byte[0]); } }));

@ -1,44 +0,0 @@
using SixLabors.ImageSharp;
using SixLabors.ImageSharp.PixelFormats;
using System.Collections.Generic;
using System.Linq;
using ZeroLevel.NN.Architectures.YoloV5;
using ZeroLevel.NN.Models;
using ZeroLevel.NN.Services;
namespace TestApp
{
public class PersonDetector
{
private const string MODEL_PATH = @"nnmodels/Yolo5S/yolov5s327e.onnx";
private readonly Yolov5Detector _detector;
private float _threshold = 0.17f;
public PersonDetector()
{
_detector = new Yolov5Detector(MODEL_PATH, gpu: false);
}
public IEnumerable<YoloPrediction> Detect(string imagePath)
{
using (Image<Rgb24> image = Image.Load<Rgb24>(imagePath))
{
var t_predictions = _detector.PredictMultiply(image, true, _threshold);
t_predictions.Apply(p =>
{
p.Cx /= image.Width;
p.Cy /= image.Height;
});
if (t_predictions != null)
{
t_predictions.RemoveAll(p => (p.W * image.Width) < 10.0f || (p.H * image.Height) < 10.0f);
}
if (t_predictions.Count > 0)
{
NMS.Apply(t_predictions);
return t_predictions;
}
}
return Enumerable.Empty<YoloPrediction>();
}
}
}

@ -1,38 +1,82 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reflection;
using ZeroLevel.Logging;
using ZeroLevel.Services.Invokation;
using ZeroLevel.Services.ObjectMapping;
using ZeroLevel.Services.Serialization;
namespace TestApp
{
internal static class Program
public record LogMessage<T>(LogLevel Level, T Message);
internal interface ILogMessageBuffer<T>
: IDisposable
{
long Count();
void Push(LogLevel level, T message);
LogMessage<T> Take();
}
internal sealed class NoLimitedLogMessageBuffer<T>
: ILogMessageBuffer<T>
{
private class Wrapper
private readonly BlockingCollection<LogMessage<T>> _messageQueue =
new BlockingCollection<LogMessage<T>>();
private bool _isDisposed = false;
public long Count()
{
public string ReadId;
public string WriteId;
public IInvokeWrapper Invoker;
if (_messageQueue.IsCompleted)
return 0;
return _messageQueue.Count;
}
public T Read<T>(IBinaryReader reader)
public void Dispose()
{
if (!_isDisposed)
{
return (T)Invoker.Invoke(reader, ReadId);
_isDisposed = true;
_messageQueue.Dispose();
}
}
public void Push(LogLevel level, T message)
{
if (_isDisposed) return;
_messageQueue.Add(new LogMessage<T>(level, message));
}
public LogMessage<T> Take()
{
return _messageQueue.Take();
}
}
internal static class Program
{
private class LogQueueWrapper
{
private string TakeMethod;
private string PushMethod;
private object Target;
public IInvokeWrapper Invoker;
public object ReadObject(IBinaryReader reader)
public LogMessage<T> Take<T>()
{
return Invoker.Invoke(reader, ReadId);
return (LogMessage<T>)Invoker.Invoke(Target, TakeMethod);
}
public void Write<T>(IBinaryWriter writer, T value)
public void Push<T>(LogLevel level, LogMessage<T> value)
{
Invoker.Invoke(writer, WriteId, new object[] { value });
Invoker.Invoke(Target, PushMethod, new object[] { level, value });
}
public void WriteObject(IBinaryWriter writer, object value)
public static LogQueueWrapper Create<T>(object target)
{
Invoker.Invoke(writer, WriteId, new object[] { value });
var wrapper = new LogQueueWrapper { Invoker = InvokeWrapper.Create(), Target = target };
wrapper.PushMethod = wrapper.Invoker.ConfigureGeneric<NoLimitedLogMessageBuffer>(typeof(T), "Push").First();
wrapper.TakeMethod = wrapper.Invoker.ConfigureGeneric<NoLimitedLogMessageBuffer>(typeof(T), "Take").First();
return wrapper;
}
}

@ -8,20 +8,13 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\ZeroLevel.NN\ZeroLevel.NN.csproj" />
<ProjectReference Include="..\ZeroLevel\ZeroLevel.csproj" />
<PackageReference Include="ZeroLevel" Version="3.3.9.9" />
</ItemGroup>
<ItemGroup>
<None Update="config.ini">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="nnmodels\Yolo5S\yolov5s327e.onnx">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

@ -0,0 +1,188 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.PartitionStorage;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.UnitTests
{
public sealed class Metadata
{
public string Date { get; set; }
public string Time { get; set; }
}
public sealed class TextData
: IBinarySerializable, IAsyncBinarySerializable
{
public string Title { get; set; }
public string Text { get; set; }
public void Deserialize(IBinaryReader reader)
{
this.Title = reader.ReadString();
this.Text = reader.ReadString();
}
public async Task DeserializeAsync(IAsyncBinaryReader reader)
{
this.Title = await reader.ReadStringAsync();
this.Text = await reader.ReadStringAsync();
}
public void Serialize(IBinaryWriter writer)
{
writer.WriteString(this.Title);
writer.WriteString(this.Text);
}
public async Task SerializeAsync(IAsyncBinaryWriter writer)
{
await writer.WriteStringAsync(this.Title);
await writer.WriteStringAsync(this.Text);
}
}
public class FSDBOptions
: IDisposable
{
public StoreOptions<ulong, TextData, TextData[], Metadata> Options { get; private set; }
public StoreSerializers<ulong, TextData, TextData[]> Serializers { get; private set; }
public FSDBOptions()
{
var root = @"H:\temp";
FSUtils.CleanAndTestFolder(root);
// user id, post
Options = new StoreOptions<ulong, TextData, TextData[], Metadata>
{
Index = new IndexOptions
{
Enabled = true,
StepType = IndexStepType.Step,
StepValue = 32,
EnableIndexInMemoryCachee = true
},
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
{
if (list == null || list.Any() == false)
{
return new TextData[0];
}
return list.GroupBy(i => i.Title).Select(pair => new TextData { Title = pair.Key, Text = string.Join(null, pair.Select(p => p.Text)) }).ToArray();
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{
new StoreCatalogPartition<Metadata>("Date", m => m.Date),
new StoreCatalogPartition<Metadata>("Time", m => m.Time),
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
ThreadSafeWriting = true,
MaxDegreeOfParallelism = 16
};
Serializers = new StoreSerializers<ulong, TextData, TextData[]>(
async (w, n) => await w.WriteULongAsync(n),
async (w, n) => await w.WriteAsync(n),
async (w, n) => await w.WriteArrayAsync(n),
async (r) => { try { return new DeserializeResult<ulong>(true, await r.ReadULongAsync()); } catch { return new DeserializeResult<ulong>(false, 0); } },
async (r) => { try { return new DeserializeResult<TextData>(true, await r.ReadAsync<TextData>()); } catch { return new DeserializeResult<TextData>(false, null); } },
async (r) => { try { return new DeserializeResult<TextData[]>(true, await r.ReadArrayAsync<TextData>()); } catch { return new DeserializeResult<TextData[]>(false, new TextData[0]); } });
}
public void Dispose()
{
}
}
public class PartitionStorageTests
: IClassFixture<FSDBOptions>
{
private readonly FSDBOptions _options;
public PartitionStorageTests(FSDBOptions options)
{
_options = options;
}
[Fact]
public async Task FastFSDBTest()
{
var r = new Random(Environment.TickCount);
var store = new Store<ulong, TextData, TextData[], Metadata>(_options.Options, _options.Serializers);
// Arrange
var numbers = new ulong[] { 86438 * 128, 83439 * 128, 131238 * 128 };
var texts = new TextData[9]
{
new TextData { Title = "Title1", Text = "00" }, new TextData { Title = "Title2", Text = "01" }, new TextData { Title = "Title3", Text = "02" },
new TextData { Title = "Title1", Text = "10" }, new TextData { Title = "Title2", Text = "11" }, new TextData { Title = "Title3", Text = "12" },
new TextData { Title = "Title1", Text = "20" }, new TextData { Title = "Title2", Text = "21" }, new TextData { Title = "Title3", Text = "22" }
};
var testValues = new Dictionary<ulong, HashSet<string>>
{
{ numbers[0], new HashSet<string> { "0010", "01" } },
{ numbers[1], new HashSet<string> { "021222" } },
{ numbers[2], new HashSet<string> { "1121", "20" } }
};
Console.WriteLine("Small test start");
// Act
using (var storePart = store.CreateBuilder(new Metadata { Date = "20230720", Time = "15:00:00" }))
{
await storePart.Store(numbers[0], texts[0]); // 1 - 00
await storePart.Store(numbers[0], texts[3]); // 1 - 10
await storePart.Store(numbers[0], texts[1]); // 2 - 01
await storePart.Store(numbers[1], texts[2]); // 3 - 02
await storePart.Store(numbers[1], texts[5]); // 3 - 12
await storePart.Store(numbers[1], texts[8]); // 3 - 22
await storePart.Store(numbers[2], texts[4]); // 2 - 11
await storePart.Store(numbers[2], texts[6]); // 1 - 20
await storePart.Store(numbers[2], texts[7]); // 2 - 21
storePart.CompleteAdding();
await storePart.Compress();
}
// Assert
using (var readPart = store.CreateAccessor(new Metadata { Date = "20230720", Time = "15:00:00" }))
{
foreach (var number in numbers)
{
var result = await readPart.Find(number);
if (result.Success)
{
foreach (var td in result.Value)
{
Assert.Contains(td.Text, testValues[number]);
}
}
}
}
}
/*
[Fact]
public void IndexNoIndexFSDBTest()
{
}
[Fact]
public void StressFSDBTest()
{
}*/
}
}

@ -10,6 +10,8 @@ namespace ZeroLevel.Services.PartitionStorage.Interfaces
Func<MemoryStreamWriter, TInput, Task> InputSerializer { get; }
Func<MemoryStreamWriter, TValue, Task> ValueSerializer { get; }
Func<MemoryStreamReader, Task<DeserializeResult<TKey>>> KeyDeserializer { get; }
Func<MemoryStreamReader, Task<DeserializeResult<TInput>>> InputDeserializer { get; }

@ -58,7 +58,12 @@ namespace ZeroLevel.Services.PartitionStorage
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 0)
{
await Parallel.ForEachAsync(files, async(file, _) => await CompressFile(file));
foreach (var file in files)
{
await CompressFile(file);
}
//await Parallel.ForEachAsync(files, async(file, _) => await CompressFile(file));
}
}
public async IAsyncEnumerable<SearchResult<TKey, TInput>> Iterate()
@ -187,9 +192,9 @@ namespace ZeroLevel.Services.PartitionStorage
foreach (var pair in dict.OrderBy(p => p.Key))
{
var v = _options.MergeFunction(pair.Value);
writer.SerializeCompatible(pair.Key);
await Serializer.KeySerializer.Invoke(writer, pair.Key);
Thread.MemoryBarrier();
writer.SerializeCompatible(v);
await Serializer.ValueSerializer.Invoke(writer, v);
}
}
File.Delete(file);

@ -14,18 +14,21 @@ namespace ZeroLevel.Services.PartitionStorage
{
private readonly Func<MemoryStreamWriter, TKey, Task> _keySerializer;
private readonly Func<MemoryStreamWriter, TInput, Task> _inputSerializer;
private readonly Func<MemoryStreamWriter, TValue, Task> _valueSerializer;
private readonly Func<MemoryStreamReader, Task<DeserializeResult<TKey>>> _keyDeserializer;
private readonly Func<MemoryStreamReader, Task<DeserializeResult<TInput>>> _inputDeserializer;
private readonly Func<MemoryStreamReader, Task<DeserializeResult<TValue>>> _valueDeserializer;
public StoreSerializers(Func<MemoryStreamWriter, TKey, Task> keySerializer,
Func<MemoryStreamWriter, TInput, Task> inputSerializer,
Func<MemoryStreamWriter, TValue, Task> valueSerializer,
Func<MemoryStreamReader, Task<DeserializeResult<TKey>>> keyDeserializer,
Func<MemoryStreamReader, Task<DeserializeResult<TInput>>> inputDeserializer,
Func<MemoryStreamReader, Task<DeserializeResult<TValue>>> valueDeserializer)
{
_keySerializer = keySerializer;
_inputSerializer = inputSerializer;
_valueSerializer = valueSerializer;
_keyDeserializer = keyDeserializer;
_inputDeserializer = inputDeserializer;
_valueDeserializer = valueDeserializer;
@ -35,6 +38,8 @@ namespace ZeroLevel.Services.PartitionStorage
public Func<MemoryStreamWriter, TInput, Task> InputSerializer => _inputSerializer;
public Func<MemoryStreamWriter, TValue, Task> ValueSerializer => _valueSerializer;
public Func<MemoryStreamReader, Task<DeserializeResult<TKey>>> KeyDeserializer => _keyDeserializer;
public Func<MemoryStreamReader, Task<DeserializeResult<TInput>>> InputDeserializer => _inputDeserializer;

@ -136,7 +136,7 @@ namespace ZeroLevel.Services.Serialization
void SetPosition(long position);
}
public interface IBinaryReaderAsync
public interface IAsyncBinaryReader
: IDisposable
{
Task<bool> ReadBooleanAsync();

@ -11,8 +11,8 @@ namespace ZeroLevel.Services.Serialization
public interface IAsyncBinarySerializable
{
Task SerializeAsync(IBinaryWriter writer);
Task SerializeAsync(IAsyncBinaryWriter writer);
Task DeserializeAsync(IBinaryReader reader);
Task DeserializeAsync(IAsyncBinaryReader reader);
}
}

@ -589,7 +589,7 @@ namespace ZeroLevel.Services.Serialization
}
public partial class MemoryStreamReader
: IBinaryReaderAsync
: IAsyncBinaryReader
{
/// <summary>
/// Reading byte-package (read the size of the specified number of bytes, and then the packet itself read size)

@ -337,14 +337,18 @@ namespace ZeroLevel.Services.Serialization
public void WriteArray<T>(T[] array)
where T : IBinarySerializable
{
WriteInt32(array?.Length ?? 0);
if (array != null)
{
WriteInt32(array.Length);
for (int i = 0; i < array.Length; i++)
{
array[i].Serialize(this);
}
}
else
{
WriteInt32(0);
}
}
public void WriteArray<T>(T[] array, Action<T> writeAction)

@ -239,7 +239,7 @@ namespace ZeroLevel.Services.Serialization
}
return PrimitiveTypeSerializer.Deserialize<T>(reader);
}
public static async Task<T> DeserializeCompatibleAsync<T>(IBinaryReader reader)
public static async Task<T> DeserializeCompatibleAsync<T>(IAsyncBinaryReader reader)
{
if (typeof(IAsyncBinarySerializable).IsAssignableFrom(typeof(T)))
{
@ -247,7 +247,7 @@ namespace ZeroLevel.Services.Serialization
await direct.DeserializeAsync(reader);
return (T)direct;
}
return PrimitiveTypeSerializer.Deserialize<T>(reader);
return PrimitiveTypeSerializer.Deserialize<T>(reader as IBinaryReader);
}
public static object DeserializeCompatible(Type type, byte[] data)

Loading…
Cancel
Save

Powered by TurnKey Linux.