diff --git a/ConnectionTest/Client/Program.cs b/ConnectionTest/Client/Program.cs index 274d434..7a2aedc 100644 --- a/ConnectionTest/Client/Program.cs +++ b/ConnectionTest/Client/Program.cs @@ -2,21 +2,72 @@ using System.Net; using System.Threading; using ZeroLevel; +using ZeroLevel.Network; +using ZeroLevel.Services.HashFunctions; +using ZeroLevel.Services.Serialization; namespace Client { + public class Info + : IBinarySerializable + { + public uint Id; + public uint Length; + public uint Checksum; + + public void Deserialize(IBinaryReader reader) + { + this.Id = reader.ReadUInt32(); + this.Length = reader.ReadUInt32(); + this.Checksum = reader.ReadUInt32(); + } + + public void Serialize(IBinaryWriter writer) + { + writer.WriteUInt32(this.Id); + writer.WriteUInt32(this.Length); + writer.WriteUInt32(this.Checksum); + } + } + + public class Fragment + : IBinarySerializable + { + public uint Id; + public uint Offset; + public uint Checksum; + public byte[] Payload; + + public void Deserialize(IBinaryReader reader) + { + this.Id = reader.ReadUInt32(); + this.Offset = reader.ReadUInt32(); + this.Checksum = reader.ReadUInt32(); + this.Payload = reader.ReadBytes(); + } + + public void Serialize(IBinaryWriter writer) + { + writer.WriteUInt32(this.Id); + writer.WriteUInt32(this.Offset); + writer.WriteUInt32(this.Checksum); + writer.WriteBytes(this.Payload); + } + } + class Program { + private readonly static XXHashUnsafe _hash = new XXHashUnsafe(667); + static void Main(string[] args) { Log.AddConsoleLogger(); var ex = Bootstrap.CreateExchange(); - var address = ReadIP(); var port = ReadPort(); var client = ex.GetConnection(new IPEndPoint(address, port)); - Console.WriteLine("Esc - exit\r\nEnter - recreate connection\r\nSpace - send request"); - long index = 0; + + uint index = 0; while (true) { if (Console.KeyAvailable) @@ -26,60 +77,124 @@ namespace Client case ConsoleKey.Escape: client?.Dispose(); return; - case ConsoleKey.Enter: - address = ReadIP(); - port = ReadPort(); - try - { - if (client != null) - { - client.Dispose(); - } - - client = ex.GetConnection(new IPEndPoint(address, port)); - } - catch (Exception exc) - { - Log.Error(exc, "Fault recreate connection"); - } - break; - case ConsoleKey.Spacebar: - Log.Info("Send request"); - if (client == null) - { - client = ex.GetConnection(new IPEndPoint(address, port)); - if (client == null) - { - Log.Info("No connection"); - } - continue; - } - if (false == client.Request("time", "Time reqeust", s => { Log.Info($"Got time response '{s}'"); })) - { - Log.Warning("Send time request fault"); - } - break; } } - Thread.Sleep(100); + if (index % 2 == 0) + { + SendDataEqParts(client, index, 1024 * 1024 + index * 3 + 1); + } + else + { + SendDataDiffParts(client, index, 1024 * 1024 + index * 3 + 1); + } index++; - if (index % 50 == 0) + } + } + + static void SendDataDiffParts(IClient client, uint id, uint length) + { + var payload = GetByteArray(length); + var full_checksum = _hash.Hash(payload); + var info = new Info { Checksum = full_checksum, Id = id, Length = length }; + if (client.Request("start", info, res => + { + Log.Info($"Success start sending packet '{id}'"); + })) + { + uint size = 1; + uint offset = 0; + while (offset < payload.Length) { - if (client == null) + var fragment = GetFragment(id, payload, offset, size); + if (!client.Request("part", fragment, res => { - client = ex.GetConnection(new IPEndPoint(address, port)); - if (client == null) + if (!res) { - Log.Info("No connection"); + Log.Info($"Fault server incoming packet '{id}' fragment. Offset: '{offset}'. Size: '{size}' bytes."); } - continue; + })) + { + Log.Warning($"Can't start send packet '{id}' fragment. Offset: '{offset}'. Size: '{size}' bytes. No connection"); } - if (false == client.Request("whois", "Whois reqeust", s => { Log.Info($"Got whois response '{s}'"); })) + offset += size; + size += 1; + } + client.Send("complete", id); + } + else + { + Log.Warning($"Can't start send packet '{id}'. No connection"); + } + } + + static void SendDataEqParts(IClient client, uint id, uint length) + { + var payload = GetByteArray(length); + var full_checksum = _hash.Hash(payload); + var info = new Info { Checksum = full_checksum, Id = id, Length = length }; + if (client.Request("start", info, res => + { + if (res) + { + Log.Info($"Success start sending packet '{id}'"); + } + else + { + Log.Info($"Fault server start incoming packet '{id}'"); + } + })) + { + uint size = 4096; + uint offset = 0; + + while (offset < payload.Length) + { + var fragment = GetFragment(id, payload, offset, size); + if (!client.Request("part", fragment, res => + { + if (!res) + { + Log.Info($"Fault server incoming packet '{id}' fragment. Offset: '{offset}'. Size: '{size}' bytes."); + } + })) { - Log.Warning("Send whois request fault"); + Log.Warning($"Can't start send packet '{id}' fragment. Offset: '{offset}'. Size: '{size}' bytes. No connection"); } + offset += size; } + client.Send("complete", id); } + else + { + Log.Warning($"Can't start send packet '{id}'. No connection"); + } + } + + private static Fragment GetFragment(uint id, byte[] data, uint offset, uint size) + { + int diff = (int)(-(data.Length - (offset + size))); + if (diff > 0) + { + size -= (uint)diff; + } + var payload = new byte[size]; + Array.Copy(data, offset, payload, 0, size); + var ch = _hash.Hash(payload); + return new Fragment + { + Id = id, + Checksum = ch, + Offset = offset, + Payload = payload + }; + } + + private static byte[] GetByteArray(uint size) + { + Random rnd = new Random(); + Byte[] b = new Byte[size]; + rnd.NextBytes(b); + return b; } static IPAddress ReadIP() diff --git a/ConnectionTest/Server/Program.cs b/ConnectionTest/Server/Program.cs index 571db6d..fe98e89 100644 --- a/ConnectionTest/Server/Program.cs +++ b/ConnectionTest/Server/Program.cs @@ -1,26 +1,152 @@ using System; +using System.Collections.Generic; using ZeroLevel; +using ZeroLevel.Services.HashFunctions; +using ZeroLevel.Services.Serialization; namespace Server { + public class Info + : IBinarySerializable + { + public uint Id; + public uint Length; + public uint Checksum; + + public void Deserialize(IBinaryReader reader) + { + this.Id = reader.ReadUInt32(); + this.Length = reader.ReadUInt32(); + this.Checksum = reader.ReadUInt32(); + } + + public void Serialize(IBinaryWriter writer) + { + writer.WriteUInt32(this.Id); + writer.WriteUInt32(this.Length); + writer.WriteUInt32(this.Checksum); + } + } + + public class Fragment + : IBinarySerializable + { + public uint Id; + public uint Offset; + public uint Checksum; + public byte[] Payload; + + public void Deserialize(IBinaryReader reader) + { + this.Id = reader.ReadUInt32(); + this.Offset = reader.ReadUInt32(); + this.Checksum = reader.ReadUInt32(); + this.Payload = reader.ReadBytes(); + } + + public void Serialize(IBinaryWriter writer) + { + writer.WriteUInt32(this.Id); + writer.WriteUInt32(this.Offset); + writer.WriteUInt32(this.Checksum); + writer.WriteBytes(this.Payload); + } + } + + public class Data + { + public uint Checksum; + public uint Length; + public uint ActualLength; + public byte[] Payload; + } + class Program { + private readonly static Dictionary _incoming = new Dictionary(); + private readonly static XXHashUnsafe _hash = new XXHashUnsafe(667); + static void Main(string[] args) { - Log.AddConsoleLogger(); + Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.Warning); var ex = Bootstrap.CreateExchange(); - var port = ReadPort(); - var server = ex.UseHost(port); + server.RegisterInbox("time", (c, s) => { Log.Info($"Request time: [{s}]"); return DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"); }); server.RegisterInbox("whois", (c, s) => { Log.Info($"Request whois: [{s}]"); return $"[{Environment.MachineName}] {Environment.UserDomainName}\\{Environment.UserName}"; }); + server.RegisterInbox("start", (c, i) => + { + Start(i); + return true; + }); + + server.RegisterInbox("part", (c, p) => + { + return WriteFragment(p); + }); + + server.RegisterInbox("complete", (c, id) => Complete(id)); + + Log.Warning("Started"); + server.OnConnect += Server_OnConnect; server.OnDisconnect += Server_OnDisconnect; Console.ReadKey(); } + private static void Start(Info info) + { + var data = new Data + { + Checksum = info.Checksum, + ActualLength = 0, + Length = info.Length, + Payload = new byte[info.Length] + }; + _incoming.Add(info.Id, data); + Log.Info($"Start incoming data id '{info.Id}'. {info.Length} bytes. Checksum: {info.Checksum}"); + } + + private static bool WriteFragment(Fragment fragment) + { + var checksum = _hash.Hash(fragment.Payload); + if (checksum != fragment.Checksum) + { + Log.Warning($"[WriteFragment] Wrong checksum (checksum: {checksum} expected: {fragment.Checksum})! ID: '{fragment.Id}'. Offset: '{fragment.Offset}'. Length: '{fragment.Payload.Length}' bytes."); + return false; + } + if (!_incoming.ContainsKey(fragment.Id)) + { + Log.Warning($"[WriteFragment] Data ID: '{fragment.Id}' not found. Offset: '{fragment.Offset}'. Length: '{fragment.Payload.Length}' bytes."); + return false; + } + Array.Copy(fragment.Payload, 0, _incoming[fragment.Id].Payload, fragment.Offset, fragment.Payload.Length); + return true; + } + + private static void Complete(uint id) + { + if (_incoming.ContainsKey(id)) + { + var checksum = _hash.Hash(_incoming[id].Payload); + if (checksum != _incoming[id].Checksum) + { + Log.Warning($"[Complete] Wrong checksum (checksum: {checksum} expected: {_incoming[id].Checksum})! ID: '{id}'"); + } + else + { + Log.Info($"Data '{id}' successfully received"); + } + _incoming.Remove(id); + } + else + { + Log.Warning($"[Complete] Data ID '{id}' not found"); + } + } + private static void Server_OnDisconnect(ZeroLevel.Network.ISocketClient obj) { Log.Info($"Client disconnected: {obj.Endpoint.Address}:{obj.Endpoint.Port}"); diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs index bcc6742..968fc88 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs @@ -749,16 +749,7 @@ namespace ZeroLevel.Services.Serialization public byte[] ReadByteArray() { - int count = ReadInt32(); - var array = new byte[count]; - if (count > 0) - { - for (int i = 0; i < count; i++) - { - array[i] = ReadByte(); - } - } - return array; + return ReadBytes(); } public byte[][] ReadByteArrayArray()