From 9c914a2387704697116990339328763038f2aa28 Mon Sep 17 00:00:00 2001 From: "a.bozhenov" Date: Mon, 3 Jun 2019 22:13:44 +0300 Subject: [PATCH] FileTransfer upd Fix client to server --- .../Services/Network/Contract/IExClient.cs | 2 + .../Services/Network/Contract/IZBackward.cs | 4 +- .../Network/FileTransfer/FileClient.cs | 59 ++++++++++++++++--- .../Network/FileTransfer/FileReceiver.cs | 28 ++++----- .../Network/FileTransfer/FileServer.cs | 57 +++++++++++++++--- .../Services/Network/Services/ExClient.cs | 5 ++ 6 files changed, 122 insertions(+), 33 deletions(-) diff --git a/ZeroLevel/Services/Network/Contract/IExClient.cs b/ZeroLevel/Services/Network/Contract/IExClient.cs index 183d711..99525c8 100644 --- a/ZeroLevel/Services/Network/Contract/IExClient.cs +++ b/ZeroLevel/Services/Network/Contract/IExClient.cs @@ -30,5 +30,7 @@ namespace ZeroLevel.Network void RegisterInbox(string inbox, Action handler); void RegisterInbox(Action handler); + + void RegisterInbox(string inbox, Func handler); } } \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Contract/IZBackward.cs b/ZeroLevel/Services/Network/Contract/IZBackward.cs index bfe3ded..cc6e3e3 100644 --- a/ZeroLevel/Services/Network/Contract/IZBackward.cs +++ b/ZeroLevel/Services/Network/Contract/IZBackward.cs @@ -1,4 +1,6 @@ -using System.Net; +using System; +using System.Net; +using ZeroLevel.Models; namespace ZeroLevel.Network { diff --git a/ZeroLevel/Services/Network/FileTransfer/FileClient.cs b/ZeroLevel/Services/Network/FileTransfer/FileClient.cs index c640686..d47b9e9 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileClient.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileClient.cs @@ -1,5 +1,7 @@ using System; using System.IO; +using System.Threading; +using ZeroLevel.Models; using ZeroLevel.Network; using ZeroLevel.Services.Network.FileTransfer.Model; @@ -21,9 +23,9 @@ namespace ZeroLevel.Services.Network.FileTransfer _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); _disposeClient = disposeClient; - _client.RegisterInbox("__upload_file_start", (f, _, __) => Receiver.Incoming(f, nameMapper(_client))); - _client.RegisterInbox("__upload_file_frame", (f, _, __) => Receiver.Incoming(f)); - _client.RegisterInbox("__upload_file_complete", (f, _, __) => Receiver.Incoming(f)); + _client.RegisterInbox("__upload_file_start", (f, _, __) => Receiver.Incoming(f, nameMapper(_client))); + _client.RegisterInbox("__upload_file_frame", (f, _, __) => Receiver.Incoming(f)); + _client.RegisterInbox("__upload_file_complete", (f, _, __) => Receiver.Incoming(f)); } public void Dispose() @@ -43,13 +45,54 @@ namespace ZeroLevel.Services.Network.FileTransfer { Log.Info($"Start upload file {reader.Path}"); var startinfo = reader.GetStartInfo(); - _client.Send("__upload_file_start", startinfo); - foreach (var chunk in reader.Read()) + + using (var signal = new ManualResetEvent(false)) { - _client.Send("__upload_file_frame", chunk); + bool next = false; + if (false == _client.Request("__upload_file_start", startinfo, + r => + { + next = r.Success; + signal.Set(); + }).Success) + { + next = false; + signal.Set(); + } + signal.WaitOne(5000); + if (next) + { + foreach (var chunk in reader.Read()) + { + signal.Reset(); + if (_client.Request("__upload_file_frame", chunk, r => + { + next = r.Success; + signal.Set(); + }).Success == false) + { + next = false; + signal.Set(); + } + signal.WaitOne(); + if (!next) + { + break; + } + } + } + if (next) + { + _client.Request("__upload_file_complete", reader.GetCompleteInfo(), r => + { + if (r.Success == false) + { + Log.Warning($"Unsuccess send file. {r.Comment}"); + } + }); + } } - _client.Send("__upload_file_complete", reader.GetCompleteInfo()); - Log.Info($"Stop upload file {reader.Path}"); + Log.Debug($"Stop upload file {reader.Path}"); } } } diff --git a/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs index f707247..0e13a1f 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs @@ -45,8 +45,6 @@ namespace ZeroLevel.Services.Network.FileTransfer private readonly object _locker = new object(); private long _cleanErrorsTaskId; - private readonly Dictionary _incomingLocks = new Dictionary(); - public FileReceiver(string path, string disk_prefix = "DRIVE_") { _disk_prefix = disk_prefix; @@ -69,7 +67,7 @@ namespace ZeroLevel.Services.Network.FileTransfer } } - public void Incoming(FileStartFrame info, string clientFolderName) + public InvokeResult Incoming(FileStartFrame info, string clientFolderName) { try { @@ -79,12 +77,8 @@ namespace ZeroLevel.Services.Network.FileTransfer { if (false == _incoming.ContainsKey(info.UploadFileTaskId)) { - _incomingLocks.Add(info.UploadFileTaskId, new object()); - lock (_incomingLocks[info.UploadFileTaskId]) - { - string path = BuildFilePath(clientFolderName, info.FilePath); - _incoming.Add(info.UploadFileTaskId, new FileWriter(path)); - } + string path = BuildFilePath(clientFolderName, info.FilePath); + _incoming.Add(info.UploadFileTaskId, new FileWriter(path)); } } } @@ -92,29 +86,31 @@ namespace ZeroLevel.Services.Network.FileTransfer catch (Exception ex) { Log.Error("[FileReceiver]", ex); + return InvokeResult.Fault(ex.Message); } + return InvokeResult.Succeeding(); } - public void Incoming(FileFrame chunk) + public InvokeResult Incoming(FileFrame chunk) { try { FileWriter stream; if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream)) { - lock (_incomingLocks[chunk.UploadFileTaskId]) - { - stream.Write(chunk.Offset, chunk.Payload); - } + stream.Write(chunk.Offset, chunk.Payload); + return InvokeResult.Succeeding(); } + return InvokeResult.Fault("File not expected."); } catch (Exception ex) { Log.Error("[FileReceiver]", ex); + return InvokeResult.Fault(ex.Message); } } - public void Incoming(FileEndFrame info) + public InvokeResult Incoming(FileEndFrame info) { try { @@ -126,7 +122,9 @@ namespace ZeroLevel.Services.Network.FileTransfer catch (Exception ex) { Log.Error("[FileReceiver]", ex); + return InvokeResult.Fault(ex.Message); } + return InvokeResult.Succeeding(); } private void Remove(long uploadTaskId) diff --git a/ZeroLevel/Services/Network/FileTransfer/FileServer.cs b/ZeroLevel/Services/Network/FileTransfer/FileServer.cs index 0494359..897ba5d 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileServer.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileServer.cs @@ -1,5 +1,6 @@ using System; using System.IO; +using System.Threading; using ZeroLevel.Models; using ZeroLevel.Network; using ZeroLevel.Services.Network.FileTransfer.Model; @@ -22,9 +23,9 @@ namespace ZeroLevel.Services.Network.FileTransfer _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); _disposeService = disposeService; - _service.RegisterInbox("__upload_file_start", (f, _, client) => Receiver.Incoming(f, nameMapper(client))); - _service.RegisterInbox("__upload_file_frame", (f, _, __) => Receiver.Incoming(f)); - _service.RegisterInbox("__upload_file_complete", (f, _, __) => Receiver.Incoming(f)); + _service.RegisterInbox("__upload_file_start", (f, _, client) => Receiver.Incoming(f, nameMapper(client))); + _service.RegisterInbox("__upload_file_frame", (f, _, __) => Receiver.Incoming(f)); + _service.RegisterInbox("__upload_file_complete", (f, _, __) => Receiver.Incoming(f)); } public void Dispose() @@ -42,16 +43,54 @@ namespace ZeroLevel.Services.Network.FileTransfer internal override void ExecuteSendFile(FileReader reader, FileTransferTask task) { + + /* Log.Info($"Start upload file {reader.Path}"); var startinfo = reader.GetStartInfo(); - startinfo.FilePath = Path.GetFileName(startinfo.FilePath); - task.Client.SendBackward("__upload_file_start", startinfo); - foreach (var chunk in reader.Read()) + using (var signal = new ManualResetEvent(false)) { - task.Client.SendBackward("__upload_file_frame", chunk); + bool next = false; + + if (false == task.Client.RequestBackward("__upload_file_start", startinfo, + r => + { + next = r.Success; + signal.Set(); + }).Success) + { + next = false; + signal.Set(); + } + signal.WaitOne(5000); + if (next) + { + foreach (var chunk in reader.Read()) + { + signal.Reset(); + if (task.Client.RequestBackward("__upload_file_frame", chunk, r => next = r.Success).Success == false) + { + next = false; + signal.Set(); + } + signal.WaitOne(); + if (!next) + { + break; + } + } + } + if (next) + { + task.Client.RequestBackward("__upload_file_complete", reader.GetCompleteInfo(), r => + { + if (r.Success == false) + { + Log.Warning($"Unsuccess send file. {r.Comment}"); + } + }); + } } - task.Client.SendBackward("__upload_file_complete", reader.GetCompleteInfo()); - Log.Info($"Stop upload file {reader.Path}"); + Log.Debug($"Stop upload file {reader.Path}");*/ } } } diff --git a/ZeroLevel/Services/Network/Services/ExClient.cs b/ZeroLevel/Services/Network/Services/ExClient.cs index 7afc76a..ab40a6b 100644 --- a/ZeroLevel/Services/Network/Services/ExClient.cs +++ b/ZeroLevel/Services/Network/Services/ExClient.cs @@ -71,6 +71,11 @@ namespace ZeroLevel.Network _router.RegisterInbox(DEFAULT_MESSAGE_INBOX, handler); } + public void RegisterInbox(string inbox, Func handler) + { + _router.RegisterInbox(inbox, handler); + } + public InvokeResult Request(Action callback) { return _fe.Request(DEFAULT_REQUEST_INBOX, resp => callback(resp));