From 902ae03885162fd84f626c1d466f0adfb314084f Mon Sep 17 00:00:00 2001 From: "a.bozhenov" Date: Fri, 12 Jul 2019 16:03:36 +0300 Subject: [PATCH] Fix & update Fix file transfer Added serialization for short, ushort, uint32, uint64 --- .../Network/FileTransfer/FileReader.cs | 5 +++ .../Network/FileTransfer/FileReceiver.cs | 10 ++++-- .../Network/FileTransfer/FileSender.cs | 35 ++++++++++++++++--- .../Network/FileTransfer/FileWriter.cs | 11 ++++++ .../Network/FileTransfer/Model/FileFrame.cs | 8 +++++ ZeroLevel/Services/Network/SocketServer.cs | 2 +- .../Network/Utils/ExClientServerCachee.cs | 15 +------- .../Services/Serialization/IBinaryReader.cs | 8 +++++ .../Services/Serialization/IBinaryWriter.cs | 8 +++++ .../Serialization/MemoryStreamReader.cs | 24 +++++++++++++ .../Serialization/MemoryStreamWriter.cs | 23 ++++++++++++ 11 files changed, 127 insertions(+), 22 deletions(-) diff --git a/ZeroLevel/Services/Network/FileTransfer/FileReader.cs b/ZeroLevel/Services/Network/FileTransfer/FileReader.cs index aee7a80..1e746e2 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileReader.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileReader.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.IO; +using ZeroLevel.Services.HashFunctions; namespace ZeroLevel.Network.FileTransfer { @@ -37,6 +38,10 @@ namespace ZeroLevel.Network.FileTransfer Payload = new byte[bytesRead] }; Array.Copy(buffer, 0, fragment.Payload, 0, bytesRead); + var hash = Murmur3.ComputeHash(fragment.Payload); + fragment.ChecksumL = BitConverter.ToUInt64(hash, 0); + fragment.ChecksumH = BitConverter.ToUInt64(hash, 8); + offset = offset + 1; yield return fragment; } diff --git a/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs index e09cd36..db669be 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs @@ -1,4 +1,5 @@ using System; +using ZeroLevel.Models; namespace ZeroLevel.Network.FileTransfer { @@ -16,15 +17,18 @@ namespace ZeroLevel.Network.FileTransfer if (false == router.ContainsRequestorInbox("__file_transfer_start_transfer__")) { - router.RegisterInbox("__file_transfer_start_transfer__", (c, f) => _receiver.Incoming(f, nameMapper(c))); + router.RegisterInbox("__file_transfer_start_transfer__", + (c, f) => _receiver.Incoming(f, nameMapper(c))); } if (false == router.ContainsRequestorInbox("__file_transfer_frame__")) { - router.RegisterInbox("__file_transfer_frame__", (_, f) => _receiver.Incoming(f)); + router.RegisterInbox("__file_transfer_frame__", + (_, f) => _receiver.Incoming(f)); } if (false == router.ContainsRequestorInbox("__file_transfer_complete_transfer__")) { - router.RegisterInbox("__file_transfer_complete_transfer__", (_, f) => _receiver.Incoming(f)); + router.RegisterInbox("__file_transfer_complete_transfer__", + (_, f) => _receiver.Incoming(f)); } if (false == router.ContainsRequestorInbox("__file_transfer_ping__")) { diff --git a/ZeroLevel/Services/Network/FileTransfer/FileSender.cs b/ZeroLevel/Services/Network/FileTransfer/FileSender.cs index 91cf35f..4b2af64 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileSender.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileSender.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.IO; using System.Threading; +using ZeroLevel.Models; using ZeroLevel.Services.Pools; namespace ZeroLevel.Network.FileTransfer @@ -11,6 +12,11 @@ namespace ZeroLevel.Network.FileTransfer private BlockingCollection _tasks = new BlockingCollection(); private ObjectPool _taskPool = new ObjectPool(() => new FileTransferTask(), 100); private readonly Thread _uploadFileThread; + private bool _resendWhenServerError = false; + private bool _resendWhenClientError = false; + + public void ResendWhenServerError(bool resend = true) => _resendWhenServerError = resend; + public void ResendWhenClientError(bool resend = true) => _resendWhenClientError = resend; public FileSender() { @@ -78,23 +84,44 @@ namespace ZeroLevel.Network.FileTransfer return connected; } + private static bool Send(ExClient client, string inbox, T frame, + bool resendWhenConnectionError, bool resendWhenServerError) + { + bool sended = false; + var handle = new Action(ir => + { + if (ir.Success == false && resendWhenServerError) + { + Send(client, inbox, frame, resendWhenConnectionError, false); + } + }); + sended = client.Request(inbox, frame, handle).Success; + if (sended == false && resendWhenConnectionError) + { + sended = client.Request(inbox, frame, handle).Success; + } + return sended; + } + internal void ExecuteSendFile(FileReader reader, FileTransferTask task) { Log.Info($"Start upload file {reader.Path}"); var startinfo = reader.GetStartInfo(); - if (false == task.Client.Send("__file_transfer_start_transfer__", startinfo).Success) + + if (!Send(task.Client, "__file_transfer_start_transfer__", startinfo, _resendWhenClientError, _resendWhenServerError)) { + Log.Debug($"Upload file {reader.Path} interrupted"); return; } foreach (var chunk in reader.Read()) { - if (task.Client.Send("__file_transfer_frame__", chunk).Success == false) + if (!Send(task.Client, "__file_transfer_frame__", chunk, _resendWhenClientError, _resendWhenServerError)) { + Log.Debug($"Upload file {reader.Path} interrupted"); return; } } - task.Client.Send("__file_transfer_complete_transfer__", reader.GetCompleteInfo()); - Log.Debug($"Stop upload file {reader.Path}"); + Send(task.Client, "__file_transfer_complete_transfer__", reader.GetCompleteInfo(), _resendWhenClientError, _resendWhenServerError); } } } diff --git a/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs b/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs index cc21479..91e0f9b 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.IO; using ZeroLevel.Models; +using ZeroLevel.Services.HashFunctions; namespace ZeroLevel.Network.FileTransfer { @@ -108,6 +109,16 @@ namespace ZeroLevel.Network.FileTransfer _FileWriter stream; if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream)) { + + var hash = Murmur3.ComputeHash(chunk.Payload); + var checksumL = BitConverter.ToUInt64(hash, 0); + var checksumH = BitConverter.ToUInt64(hash, 8); + + if (chunk.ChecksumH != checksumH + || chunk.ChecksumL != checksumL) + return InvokeResult.Fault("Checksum incorrect"); + + stream.Write(chunk.Offset, chunk.Payload); return InvokeResult.Succeeding(); } diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs index 3233d43..e6c477a 100644 --- a/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs @@ -10,12 +10,17 @@ namespace ZeroLevel.Network.FileTransfer public long UploadFileTaskId { get; set; } public long Offset { get; set; } public byte[] Payload { get; set; } + public ulong ChecksumL { get; set; } + public ulong ChecksumH { get; set; } public void Serialize(IBinaryWriter writer) { writer.WriteLong(this.UploadFileTaskId); writer.WriteLong(this.Offset); writer.WriteBytes(this.Payload); + + writer.WriteULong(this.ChecksumL); + writer.WriteULong(this.ChecksumH); } public void Deserialize(IBinaryReader reader) @@ -23,6 +28,9 @@ namespace ZeroLevel.Network.FileTransfer this.UploadFileTaskId = reader.ReadLong(); this.Offset = reader.ReadLong(); this.Payload = reader.ReadBytes(); + + this.ChecksumL = reader.ReadULong(); + this.ChecksumH = reader.ReadULong(); } } } diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs index d539c51..100d423 100644 --- a/ZeroLevel/Services/Network/SocketServer.cs +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -82,7 +82,7 @@ namespace ZeroLevel.Network var connection = new SocketClient(client_socket, _router); connection.OnDisconnect += Connection_OnDisconnect; _connections[connection.Endpoint] = new ExClient(connection); - + connection.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)); ConnectEventRise(_connections[connection.Endpoint]); } catch (Exception ex) diff --git a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs index 5f6715b..e9982af 100644 --- a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs +++ b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs @@ -36,8 +36,7 @@ namespace ZeroLevel.Network if (instance.Status == SocketClientStatus.Initialized || instance.Status == SocketClientStatus.Working) { - _clientInstances[key] = instance; - instance.Socket.OnDisconnect += Socket_OnDisconnect; + _clientInstances[key] = instance; instance.Socket.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)); return instance; } @@ -54,18 +53,6 @@ namespace ZeroLevel.Network return null; } - private void Socket_OnDisconnect(ISocketClient socket) - { - socket.OnDisconnect -= Socket_OnDisconnect; - - ExClient removed; - string key = $"{socket.Endpoint.Address}:{socket.Endpoint.Port}"; - if (_clientInstances.TryRemove(key, out removed)) - { - removed.Dispose(); - } - } - public SocketServer GetServer(IPEndPoint endpoint, IRouter router) { string key = $"{endpoint.Address}:{endpoint.Port}"; diff --git a/ZeroLevel/Services/Serialization/IBinaryReader.cs b/ZeroLevel/Services/Serialization/IBinaryReader.cs index 2b53ae4..ad425bf 100644 --- a/ZeroLevel/Services/Serialization/IBinaryReader.cs +++ b/ZeroLevel/Services/Serialization/IBinaryReader.cs @@ -16,10 +16,18 @@ namespace ZeroLevel.Services.Serialization float ReadFloat(); + short ReadShort(); + + ushort ReadUShort(); + Int32 ReadInt32(); + UInt32 ReadUInt32(); + Int64 ReadLong(); + UInt64 ReadULong(); + string ReadString(); Guid ReadGuid(); diff --git a/ZeroLevel/Services/Serialization/IBinaryWriter.cs b/ZeroLevel/Services/Serialization/IBinaryWriter.cs index 98b915f..215933f 100644 --- a/ZeroLevel/Services/Serialization/IBinaryWriter.cs +++ b/ZeroLevel/Services/Serialization/IBinaryWriter.cs @@ -14,14 +14,22 @@ namespace ZeroLevel.Services.Serialization void WriteBytes(byte[] val); + void WriteShort(short number); + + void WriteUShort(ushort number); + void WriteDouble(double val); void WriteFloat(float val); void WriteInt32(Int32 number); + void WriteUInt32(UInt32 number); + void WriteLong(Int64 number); + void WriteULong(UInt64 number); + void WriteString(string line); void WriteGuid(Guid guid); diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs index c01d261..85a5876 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs @@ -66,6 +66,18 @@ namespace ZeroLevel.Services.Serialization return ReadBuffer(length); } + public short ReadShort() + { + var buffer = ReadBuffer(2); + return BitConverter.ToInt16(buffer, 0); + } + + public ushort ReadUShort() + { + var buffer = ReadBuffer(2); + return BitConverter.ToUInt16(buffer, 0); + } + /// /// Read 32-bit integer (4 bytes) /// @@ -75,6 +87,12 @@ namespace ZeroLevel.Services.Serialization return BitConverter.ToInt32(buffer, 0); } + public UInt32 ReadUInt32() + { + var buffer = ReadBuffer(4); + return BitConverter.ToUInt32(buffer, 0); + } + public decimal ReadDecimal() { var p1 = ReadInt32(); @@ -93,6 +111,12 @@ namespace ZeroLevel.Services.Serialization return BitConverter.ToInt64(buffer, 0); } + public UInt64 ReadULong() + { + var buffer = ReadBuffer(8); + return BitConverter.ToUInt64(buffer, 0); + } + public TimeSpan ReadTimeSpan() { return new TimeSpan(ReadLong()); diff --git a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs index fd13764..b0a0d2b 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs @@ -62,6 +62,19 @@ namespace ZeroLevel.Services.Serialization } } + /// + /// Record a 32-bit integer (4 bytes) + /// + public void WriteShort(short number) + { + _stream.Write(BitConverter.GetBytes(number), 0, 2); + } + + public void WriteUShort(ushort number) + { + _stream.Write(BitConverter.GetBytes(number), 0, 2); + } + /// /// Record a 32-bit integer (4 bytes) /// @@ -70,6 +83,11 @@ namespace ZeroLevel.Services.Serialization _stream.Write(BitConverter.GetBytes(number), 0, 4); } + public void WriteUInt32(UInt32 number) + { + _stream.Write(BitConverter.GetBytes(number), 0, 4); + } + /// /// Record an integer 64-bit number (8 bytes) /// @@ -78,6 +96,11 @@ namespace ZeroLevel.Services.Serialization _stream.Write(BitConverter.GetBytes(number), 0, 8); } + public void WriteULong(UInt64 number) + { + _stream.Write(BitConverter.GetBytes(number), 0, 8); + } + public void WriteTimeSpan(TimeSpan period) { WriteLong(period.Ticks);