Fix & update

Fix file transfer
Added serialization for short, ushort, uint32, uint64
pull/1/head
a.bozhenov 5 years ago
parent 09cec53edd
commit 902ae03885

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Services.HashFunctions;
namespace ZeroLevel.Network.FileTransfer
{
@ -37,6 +38,10 @@ namespace ZeroLevel.Network.FileTransfer
Payload = new byte[bytesRead]
};
Array.Copy(buffer, 0, fragment.Payload, 0, bytesRead);
var hash = Murmur3.ComputeHash(fragment.Payload);
fragment.ChecksumL = BitConverter.ToUInt64(hash, 0);
fragment.ChecksumH = BitConverter.ToUInt64(hash, 8);
offset = offset + 1;
yield return fragment;
}

@ -1,4 +1,5 @@
using System;
using ZeroLevel.Models;
namespace ZeroLevel.Network.FileTransfer
{
@ -16,15 +17,18 @@ namespace ZeroLevel.Network.FileTransfer
if (false == router.ContainsRequestorInbox("__file_transfer_start_transfer__"))
{
router.RegisterInbox<FileStartFrame>("__file_transfer_start_transfer__", (c, f) => _receiver.Incoming(f, nameMapper(c)));
router.RegisterInbox<FileStartFrame, InvokeResult>("__file_transfer_start_transfer__",
(c, f) => _receiver.Incoming(f, nameMapper(c)));
}
if (false == router.ContainsRequestorInbox("__file_transfer_frame__"))
{
router.RegisterInbox<FileFrame>("__file_transfer_frame__", (_, f) => _receiver.Incoming(f));
router.RegisterInbox<FileFrame, InvokeResult>("__file_transfer_frame__",
(_, f) => _receiver.Incoming(f));
}
if (false == router.ContainsRequestorInbox("__file_transfer_complete_transfer__"))
{
router.RegisterInbox<FileEndFrame>("__file_transfer_complete_transfer__", (_, f) => _receiver.Incoming(f));
router.RegisterInbox<FileEndFrame, InvokeResult>("__file_transfer_complete_transfer__",
(_, f) => _receiver.Incoming(f));
}
if (false == router.ContainsRequestorInbox("__file_transfer_ping__"))
{

@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using ZeroLevel.Models;
using ZeroLevel.Services.Pools;
namespace ZeroLevel.Network.FileTransfer
@ -11,6 +12,11 @@ namespace ZeroLevel.Network.FileTransfer
private BlockingCollection<FileTransferTask> _tasks = new BlockingCollection<FileTransferTask>();
private ObjectPool<FileTransferTask> _taskPool = new ObjectPool<FileTransferTask>(() => new FileTransferTask(), 100);
private readonly Thread _uploadFileThread;
private bool _resendWhenServerError = false;
private bool _resendWhenClientError = false;
public void ResendWhenServerError(bool resend = true) => _resendWhenServerError = resend;
public void ResendWhenClientError(bool resend = true) => _resendWhenClientError = resend;
public FileSender()
{
@ -78,23 +84,44 @@ namespace ZeroLevel.Network.FileTransfer
return connected;
}
private static bool Send<T>(ExClient client, string inbox, T frame,
bool resendWhenConnectionError, bool resendWhenServerError)
{
bool sended = false;
var handle = new Action<InvokeResult>(ir =>
{
if (ir.Success == false && resendWhenServerError)
{
Send<T>(client, inbox, frame, resendWhenConnectionError, false);
}
});
sended = client.Request<T, InvokeResult>(inbox, frame, handle).Success;
if (sended == false && resendWhenConnectionError)
{
sended = client.Request<T, InvokeResult>(inbox, frame, handle).Success;
}
return sended;
}
internal void ExecuteSendFile(FileReader reader, FileTransferTask task)
{
Log.Info($"Start upload file {reader.Path}");
var startinfo = reader.GetStartInfo();
if (false == task.Client.Send<FileStartFrame>("__file_transfer_start_transfer__", startinfo).Success)
if (!Send(task.Client, "__file_transfer_start_transfer__", startinfo, _resendWhenClientError, _resendWhenServerError))
{
Log.Debug($"Upload file {reader.Path} interrupted");
return;
}
foreach (var chunk in reader.Read())
{
if (task.Client.Send<FileFrame>("__file_transfer_frame__", chunk).Success == false)
if (!Send(task.Client, "__file_transfer_frame__", chunk, _resendWhenClientError, _resendWhenServerError))
{
Log.Debug($"Upload file {reader.Path} interrupted");
return;
}
}
task.Client.Send<FileEndFrame>("__file_transfer_complete_transfer__", reader.GetCompleteInfo());
Log.Debug($"Stop upload file {reader.Path}");
Send(task.Client, "__file_transfer_complete_transfer__", reader.GetCompleteInfo(), _resendWhenClientError, _resendWhenServerError);
}
}
}

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Models;
using ZeroLevel.Services.HashFunctions;
namespace ZeroLevel.Network.FileTransfer
{
@ -108,6 +109,16 @@ namespace ZeroLevel.Network.FileTransfer
_FileWriter stream;
if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream))
{
var hash = Murmur3.ComputeHash(chunk.Payload);
var checksumL = BitConverter.ToUInt64(hash, 0);
var checksumH = BitConverter.ToUInt64(hash, 8);
if (chunk.ChecksumH != checksumH
|| chunk.ChecksumL != checksumL)
return InvokeResult.Fault("Checksum incorrect");
stream.Write(chunk.Offset, chunk.Payload);
return InvokeResult.Succeeding();
}

@ -10,12 +10,17 @@ namespace ZeroLevel.Network.FileTransfer
public long UploadFileTaskId { get; set; }
public long Offset { get; set; }
public byte[] Payload { get; set; }
public ulong ChecksumL { get; set; }
public ulong ChecksumH { get; set; }
public void Serialize(IBinaryWriter writer)
{
writer.WriteLong(this.UploadFileTaskId);
writer.WriteLong(this.Offset);
writer.WriteBytes(this.Payload);
writer.WriteULong(this.ChecksumL);
writer.WriteULong(this.ChecksumH);
}
public void Deserialize(IBinaryReader reader)
@ -23,6 +28,9 @@ namespace ZeroLevel.Network.FileTransfer
this.UploadFileTaskId = reader.ReadLong();
this.Offset = reader.ReadLong();
this.Payload = reader.ReadBytes();
this.ChecksumL = reader.ReadULong();
this.ChecksumH = reader.ReadULong();
}
}
}

@ -82,7 +82,7 @@ namespace ZeroLevel.Network
var connection = new SocketClient(client_socket, _router);
connection.OnDisconnect += Connection_OnDisconnect;
_connections[connection.Endpoint] = new ExClient(connection);
connection.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS));
ConnectEventRise(_connections[connection.Endpoint]);
}
catch (Exception ex)

@ -36,8 +36,7 @@ namespace ZeroLevel.Network
if (instance.Status == SocketClientStatus.Initialized
|| instance.Status == SocketClientStatus.Working)
{
_clientInstances[key] = instance;
instance.Socket.OnDisconnect += Socket_OnDisconnect;
_clientInstances[key] = instance;
instance.Socket.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS));
return instance;
}
@ -54,18 +53,6 @@ namespace ZeroLevel.Network
return null;
}
private void Socket_OnDisconnect(ISocketClient socket)
{
socket.OnDisconnect -= Socket_OnDisconnect;
ExClient removed;
string key = $"{socket.Endpoint.Address}:{socket.Endpoint.Port}";
if (_clientInstances.TryRemove(key, out removed))
{
removed.Dispose();
}
}
public SocketServer GetServer(IPEndPoint endpoint, IRouter router)
{
string key = $"{endpoint.Address}:{endpoint.Port}";

@ -16,10 +16,18 @@ namespace ZeroLevel.Services.Serialization
float ReadFloat();
short ReadShort();
ushort ReadUShort();
Int32 ReadInt32();
UInt32 ReadUInt32();
Int64 ReadLong();
UInt64 ReadULong();
string ReadString();
Guid ReadGuid();

@ -14,14 +14,22 @@ namespace ZeroLevel.Services.Serialization
void WriteBytes(byte[] val);
void WriteShort(short number);
void WriteUShort(ushort number);
void WriteDouble(double val);
void WriteFloat(float val);
void WriteInt32(Int32 number);
void WriteUInt32(UInt32 number);
void WriteLong(Int64 number);
void WriteULong(UInt64 number);
void WriteString(string line);
void WriteGuid(Guid guid);

@ -66,6 +66,18 @@ namespace ZeroLevel.Services.Serialization
return ReadBuffer(length);
}
public short ReadShort()
{
var buffer = ReadBuffer(2);
return BitConverter.ToInt16(buffer, 0);
}
public ushort ReadUShort()
{
var buffer = ReadBuffer(2);
return BitConverter.ToUInt16(buffer, 0);
}
/// <summary>
/// Read 32-bit integer (4 bytes)
/// </summary>
@ -75,6 +87,12 @@ namespace ZeroLevel.Services.Serialization
return BitConverter.ToInt32(buffer, 0);
}
public UInt32 ReadUInt32()
{
var buffer = ReadBuffer(4);
return BitConverter.ToUInt32(buffer, 0);
}
public decimal ReadDecimal()
{
var p1 = ReadInt32();
@ -93,6 +111,12 @@ namespace ZeroLevel.Services.Serialization
return BitConverter.ToInt64(buffer, 0);
}
public UInt64 ReadULong()
{
var buffer = ReadBuffer(8);
return BitConverter.ToUInt64(buffer, 0);
}
public TimeSpan ReadTimeSpan()
{
return new TimeSpan(ReadLong());

@ -62,6 +62,19 @@ namespace ZeroLevel.Services.Serialization
}
}
/// <summary>
/// Record a 32-bit integer (4 bytes)
/// </summary>
public void WriteShort(short number)
{
_stream.Write(BitConverter.GetBytes(number), 0, 2);
}
public void WriteUShort(ushort number)
{
_stream.Write(BitConverter.GetBytes(number), 0, 2);
}
/// <summary>
/// Record a 32-bit integer (4 bytes)
/// </summary>
@ -70,6 +83,11 @@ namespace ZeroLevel.Services.Serialization
_stream.Write(BitConverter.GetBytes(number), 0, 4);
}
public void WriteUInt32(UInt32 number)
{
_stream.Write(BitConverter.GetBytes(number), 0, 4);
}
/// <summary>
/// Record an integer 64-bit number (8 bytes)
/// </summary>
@ -78,6 +96,11 @@ namespace ZeroLevel.Services.Serialization
_stream.Write(BitConverter.GetBytes(number), 0, 8);
}
public void WriteULong(UInt64 number)
{
_stream.Write(BitConverter.GetBytes(number), 0, 8);
}
public void WriteTimeSpan(TimeSpan period)
{
WriteLong(period.Ticks);

Loading…
Cancel
Save

Powered by TurnKey Linux.