FileTransfer upd

Fix client to server
pull/1/head
a.bozhenov 6 years ago
parent 8f6e333540
commit 9c914a2387

@ -30,5 +30,7 @@ namespace ZeroLevel.Network
void RegisterInbox<T>(string inbox, Action<T, long, IZBackward> handler); void RegisterInbox<T>(string inbox, Action<T, long, IZBackward> handler);
void RegisterInbox<T>(Action<T, long, IZBackward> handler); void RegisterInbox<T>(Action<T, long, IZBackward> handler);
void RegisterInbox<Treq, Tresp>(string inbox, Func<Treq, long, IZBackward, Tresp> handler);
} }
} }

@ -1,4 +1,6 @@
using System.Net; using System;
using System.Net;
using ZeroLevel.Models;
namespace ZeroLevel.Network namespace ZeroLevel.Network
{ {

@ -1,5 +1,7 @@
using System; using System;
using System.IO; using System.IO;
using System.Threading;
using ZeroLevel.Models;
using ZeroLevel.Network; using ZeroLevel.Network;
using ZeroLevel.Services.Network.FileTransfer.Model; using ZeroLevel.Services.Network.FileTransfer.Model;
@ -21,9 +23,9 @@ namespace ZeroLevel.Services.Network.FileTransfer
_nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_disposeClient = disposeClient; _disposeClient = disposeClient;
_client.RegisterInbox<FileStartFrame>("__upload_file_start", (f, _, __) => Receiver.Incoming(f, nameMapper(_client))); _client.RegisterInbox<FileStartFrame, InvokeResult>("__upload_file_start", (f, _, __) => Receiver.Incoming(f, nameMapper(_client)));
_client.RegisterInbox<FileFrame>("__upload_file_frame", (f, _, __) => Receiver.Incoming(f)); _client.RegisterInbox<FileFrame, InvokeResult>("__upload_file_frame", (f, _, __) => Receiver.Incoming(f));
_client.RegisterInbox<FileEndFrame>("__upload_file_complete", (f, _, __) => Receiver.Incoming(f)); _client.RegisterInbox<FileEndFrame, InvokeResult>("__upload_file_complete", (f, _, __) => Receiver.Incoming(f));
} }
public void Dispose() public void Dispose()
@ -43,13 +45,54 @@ namespace ZeroLevel.Services.Network.FileTransfer
{ {
Log.Info($"Start upload file {reader.Path}"); Log.Info($"Start upload file {reader.Path}");
var startinfo = reader.GetStartInfo(); var startinfo = reader.GetStartInfo();
_client.Send("__upload_file_start", startinfo);
foreach (var chunk in reader.Read()) using (var signal = new ManualResetEvent(false))
{ {
_client.Send("__upload_file_frame", chunk); bool next = false;
if (false == _client.Request<FileStartFrame, InvokeResult>("__upload_file_start", startinfo,
r =>
{
next = r.Success;
signal.Set();
}).Success)
{
next = false;
signal.Set();
}
signal.WaitOne(5000);
if (next)
{
foreach (var chunk in reader.Read())
{
signal.Reset();
if (_client.Request<FileFrame, InvokeResult>("__upload_file_frame", chunk, r =>
{
next = r.Success;
signal.Set();
}).Success == false)
{
next = false;
signal.Set();
}
signal.WaitOne();
if (!next)
{
break;
}
}
}
if (next)
{
_client.Request<FileEndFrame, InvokeResult>("__upload_file_complete", reader.GetCompleteInfo(), r =>
{
if (r.Success == false)
{
Log.Warning($"Unsuccess send file. {r.Comment}");
}
});
}
} }
_client.Send("__upload_file_complete", reader.GetCompleteInfo()); Log.Debug($"Stop upload file {reader.Path}");
Log.Info($"Stop upload file {reader.Path}");
} }
} }
} }

@ -45,8 +45,6 @@ namespace ZeroLevel.Services.Network.FileTransfer
private readonly object _locker = new object(); private readonly object _locker = new object();
private long _cleanErrorsTaskId; private long _cleanErrorsTaskId;
private readonly Dictionary<long, object> _incomingLocks = new Dictionary<long, object>();
public FileReceiver(string path, string disk_prefix = "DRIVE_") public FileReceiver(string path, string disk_prefix = "DRIVE_")
{ {
_disk_prefix = disk_prefix; _disk_prefix = disk_prefix;
@ -69,7 +67,7 @@ namespace ZeroLevel.Services.Network.FileTransfer
} }
} }
public void Incoming(FileStartFrame info, string clientFolderName) public InvokeResult Incoming(FileStartFrame info, string clientFolderName)
{ {
try try
{ {
@ -79,12 +77,8 @@ namespace ZeroLevel.Services.Network.FileTransfer
{ {
if (false == _incoming.ContainsKey(info.UploadFileTaskId)) if (false == _incoming.ContainsKey(info.UploadFileTaskId))
{ {
_incomingLocks.Add(info.UploadFileTaskId, new object()); string path = BuildFilePath(clientFolderName, info.FilePath);
lock (_incomingLocks[info.UploadFileTaskId]) _incoming.Add(info.UploadFileTaskId, new FileWriter(path));
{
string path = BuildFilePath(clientFolderName, info.FilePath);
_incoming.Add(info.UploadFileTaskId, new FileWriter(path));
}
} }
} }
} }
@ -92,29 +86,31 @@ namespace ZeroLevel.Services.Network.FileTransfer
catch (Exception ex) catch (Exception ex)
{ {
Log.Error("[FileReceiver]", ex); Log.Error("[FileReceiver]", ex);
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding();
} }
public void Incoming(FileFrame chunk) public InvokeResult Incoming(FileFrame chunk)
{ {
try try
{ {
FileWriter stream; FileWriter stream;
if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream)) if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream))
{ {
lock (_incomingLocks[chunk.UploadFileTaskId]) stream.Write(chunk.Offset, chunk.Payload);
{ return InvokeResult.Succeeding();
stream.Write(chunk.Offset, chunk.Payload);
}
} }
return InvokeResult.Fault("File not expected.");
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error("[FileReceiver]", ex); Log.Error("[FileReceiver]", ex);
return InvokeResult.Fault(ex.Message);
} }
} }
public void Incoming(FileEndFrame info) public InvokeResult Incoming(FileEndFrame info)
{ {
try try
{ {
@ -126,7 +122,9 @@ namespace ZeroLevel.Services.Network.FileTransfer
catch (Exception ex) catch (Exception ex)
{ {
Log.Error("[FileReceiver]", ex); Log.Error("[FileReceiver]", ex);
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding();
} }
private void Remove(long uploadTaskId) private void Remove(long uploadTaskId)

@ -1,5 +1,6 @@
using System; using System;
using System.IO; using System.IO;
using System.Threading;
using ZeroLevel.Models; using ZeroLevel.Models;
using ZeroLevel.Network; using ZeroLevel.Network;
using ZeroLevel.Services.Network.FileTransfer.Model; using ZeroLevel.Services.Network.FileTransfer.Model;
@ -22,9 +23,9 @@ namespace ZeroLevel.Services.Network.FileTransfer
_nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_disposeService = disposeService; _disposeService = disposeService;
_service.RegisterInbox<FileStartFrame>("__upload_file_start", (f, _, client) => Receiver.Incoming(f, nameMapper(client))); _service.RegisterInbox<FileStartFrame, InvokeResult>("__upload_file_start", (f, _, client) => Receiver.Incoming(f, nameMapper(client)));
_service.RegisterInbox<FileFrame>("__upload_file_frame", (f, _, __) => Receiver.Incoming(f)); _service.RegisterInbox<FileFrame, InvokeResult>("__upload_file_frame", (f, _, __) => Receiver.Incoming(f));
_service.RegisterInbox<FileEndFrame>("__upload_file_complete", (f, _, __) => Receiver.Incoming(f)); _service.RegisterInbox<FileEndFrame, InvokeResult>("__upload_file_complete", (f, _, __) => Receiver.Incoming(f));
} }
public void Dispose() public void Dispose()
@ -42,16 +43,54 @@ namespace ZeroLevel.Services.Network.FileTransfer
internal override void ExecuteSendFile(FileReader reader, FileTransferTask task) internal override void ExecuteSendFile(FileReader reader, FileTransferTask task)
{ {
/*
Log.Info($"Start upload file {reader.Path}"); Log.Info($"Start upload file {reader.Path}");
var startinfo = reader.GetStartInfo(); var startinfo = reader.GetStartInfo();
startinfo.FilePath = Path.GetFileName(startinfo.FilePath); using (var signal = new ManualResetEvent(false))
task.Client.SendBackward("__upload_file_start", startinfo);
foreach (var chunk in reader.Read())
{ {
task.Client.SendBackward("__upload_file_frame", chunk); bool next = false;
if (false == task.Client.RequestBackward<FileStartFrame, InvokeResult>("__upload_file_start", startinfo,
r =>
{
next = r.Success;
signal.Set();
}).Success)
{
next = false;
signal.Set();
}
signal.WaitOne(5000);
if (next)
{
foreach (var chunk in reader.Read())
{
signal.Reset();
if (task.Client.RequestBackward<FileFrame, InvokeResult>("__upload_file_frame", chunk, r => next = r.Success).Success == false)
{
next = false;
signal.Set();
}
signal.WaitOne();
if (!next)
{
break;
}
}
}
if (next)
{
task.Client.RequestBackward<FileEndFrame, InvokeResult>("__upload_file_complete", reader.GetCompleteInfo(), r =>
{
if (r.Success == false)
{
Log.Warning($"Unsuccess send file. {r.Comment}");
}
});
}
} }
task.Client.SendBackward("__upload_file_complete", reader.GetCompleteInfo()); Log.Debug($"Stop upload file {reader.Path}");*/
Log.Info($"Stop upload file {reader.Path}");
} }
} }
} }

@ -71,6 +71,11 @@ namespace ZeroLevel.Network
_router.RegisterInbox(DEFAULT_MESSAGE_INBOX, handler); _router.RegisterInbox(DEFAULT_MESSAGE_INBOX, handler);
} }
public void RegisterInbox<Treq, Tresp>(string inbox, Func<Treq, long, IZBackward, Tresp> handler)
{
_router.RegisterInbox<Treq, Tresp>(inbox, handler);
}
public InvokeResult Request<Tresp>(Action<Tresp> callback) public InvokeResult Request<Tresp>(Action<Tresp> callback)
{ {
return _fe.Request<Tresp>(DEFAULT_REQUEST_INBOX, resp => callback(resp)); return _fe.Request<Tresp>(DEFAULT_REQUEST_INBOX, resp => callback(resp));

Loading…
Cancel
Save

Powered by TurnKey Linux.