diff --git a/FileTransferClient/MainWindow.xaml.cs b/FileTransferClient/MainWindow.xaml.cs index e8bcee3..acb195c 100644 --- a/FileTransferClient/MainWindow.xaml.cs +++ b/FileTransferClient/MainWindow.xaml.cs @@ -1,5 +1,8 @@ using Microsoft.Win32; +using System; using System.Windows; +using ZeroLevel; +using ZeroLevel.Network; using ZeroLevel.Network.FileTransfer; namespace FileTransferClient @@ -9,16 +12,22 @@ namespace FileTransferClient /// public partial class MainWindow : Window { - private IFileClient client; + private FileSender _client; + private IExchange _exchange; public MainWindow() { InitializeComponent(); + _exchange = Bootstrap.CreateExchange(); + _client = new FileSender(); } private void Button_Click(object sender, RoutedEventArgs e) { - client = FileClientFactory.Create(tbEndpoint.Text, System.IO.Path.Combine(ZeroLevel.Configuration.BaseDirectory, "INCOMING")); + if (false == _client.Connected(_exchange.GetConnection(tbEndpoint.Text), TimeSpan.FromMilliseconds(300))) + { + MessageBox.Show("No connection"); + } } private void Button_Click_1(object sender, RoutedEventArgs e) @@ -26,7 +35,7 @@ namespace FileTransferClient var ofd = new OpenFileDialog(); if (ofd.ShowDialog() == true) { - client.Send(ofd.FileName); + _client.Send(_exchange.GetConnection(tbEndpoint.Text), ofd.FileName); } } diff --git a/FileTransferServer/MainWindow.xaml.cs b/FileTransferServer/MainWindow.xaml.cs index 02d1b86..9f67960 100644 --- a/FileTransferServer/MainWindow.xaml.cs +++ b/FileTransferServer/MainWindow.xaml.cs @@ -1,5 +1,7 @@ using System.IO; using System.Windows; +using ZeroLevel; +using ZeroLevel.Network; using ZeroLevel.Network.FileTransfer; namespace FileTransferServer @@ -12,9 +14,11 @@ namespace FileTransferServer public MainWindow() { InitializeComponent(); + _exchange = Bootstrap.CreateExchange(); } - private IFileServer _server; + private FileReceiver _server; + private IExchange _exchange; private void Button_Click(object sender, RoutedEventArgs e) { @@ -33,13 +37,12 @@ namespace FileTransferServer MessageBox.Show("Incorrect parameters"); return; } - - _server = FileServerFactory.Create(port, tbFolder.Text); + var router = _exchange.UseHost(port); + _server = new FileReceiver(router, tbFolder.Text, c => $"{c.Endpoint.Address}{c.Endpoint.Port}"); } private void Button_Click_1(object sender, RoutedEventArgs e) { - _server = null; } } } diff --git a/ZeroLevel/Services/Bootstrap.cs b/ZeroLevel/Services/Bootstrap.cs index 991b6ad..9ba66df 100644 --- a/ZeroLevel/Services/Bootstrap.cs +++ b/ZeroLevel/Services/Bootstrap.cs @@ -3,6 +3,7 @@ using System.IO; using System.Linq; using System.Net; using System.Reflection; +using ZeroLevel.Network; using ZeroLevel.Services.Logging; namespace ZeroLevel @@ -184,6 +185,8 @@ namespace ZeroLevel return service; } + public static IExchange CreateExchange() => new Exchange(null); + public static void Shutdown() { try { Sheduller.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose default sheduller error"); } diff --git a/ZeroLevel/Services/Network/BaseSocket.cs b/ZeroLevel/Services/Network/BaseSocket.cs index 6c5f3fb..70e765a 100644 --- a/ZeroLevel/Services/Network/BaseSocket.cs +++ b/ZeroLevel/Services/Network/BaseSocket.cs @@ -26,12 +26,12 @@ namespace ZeroLevel.Network /// /// If during the specified period there was no network activity, send a ping-request /// - protected const long HEARTBEAT_PING_PERIOD_TICKS = 1500 * TimeSpan.TicksPerMillisecond; + internal const long HEARTBEAT_PING_PERIOD_TICKS = 1500 * TimeSpan.TicksPerMillisecond; /// /// Connection check period /// - protected const int MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS = 7500; + internal const int MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS = 7500; /// /// The period of the request, after which it is considered unsuccessful diff --git a/ZeroLevel/Services/Network/Contracts/IServer.cs b/ZeroLevel/Services/Network/Contracts/IServer.cs index aaa5eba..5f43b2d 100644 --- a/ZeroLevel/Services/Network/Contracts/IServer.cs +++ b/ZeroLevel/Services/Network/Contracts/IServer.cs @@ -19,5 +19,9 @@ IServer RegisterInbox(RequestHandler handler); IServer RegisterInbox(RequestHandler handler); #endregion + + bool ContainsInbox(string inbox); + bool ContainsHandlerInbox(string inbox); + bool ContainsRequestorInbox(string inbox); } } diff --git a/ZeroLevel/Services/Network/ExClient.cs b/ZeroLevel/Services/Network/ExClient.cs index 2599bf9..3b554ea 100644 --- a/ZeroLevel/Services/Network/ExClient.cs +++ b/ZeroLevel/Services/Network/ExClient.cs @@ -15,6 +15,7 @@ namespace ZeroLevel.Network public IPEndPoint EndPoint => _client?.Endpoint; public SocketClientStatus Status => _client.Status; public IRouter Router => _client.Router; + public ISocketClient Socket => _client; public ExClient(ISocketClient client) { diff --git a/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs b/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs deleted file mode 100644 index 8a078ce..0000000 --- a/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.IO; -using System.Threading; -using ZeroLevel.Services.Pools; - -namespace ZeroLevel.Network.FileTransfer -{ - public abstract class BaseFileTransfer - { - private readonly FileReceiver _receiver; - internal FileReceiver Receiver => _receiver; - - private ObjectPool _taskPool = new ObjectPool(() => new FileTransferTask(), 100); - private BlockingCollection _tasks = new BlockingCollection(); - private readonly Thread _uploadFileThread; - - internal BaseFileTransfer(string baseFolder) - { - _receiver = new FileReceiver(baseFolder); - _uploadFileThread = new Thread(UploadFileProcessing); - _uploadFileThread.IsBackground = true; - _uploadFileThread.Start(); - } - - protected void PushTransferTask(string filePath, Action completeHandler = null, Action errorHandler = null, ExClient client = null) - { - if (string.IsNullOrWhiteSpace(filePath)) - { - throw new ArgumentNullException(nameof(filePath)); - } - if (false == File.Exists(filePath)) - { - throw new FileNotFoundException(filePath); - } - var task = _taskPool.Allocate(); - task.CompletedHandler = completeHandler; - task.ErrorHandler = errorHandler; - task.FilePath = filePath; - task.Client = client; - _tasks.Add(task); - } - - private void UploadFileProcessing() - { - while (true) - { - var task = _tasks.Take(); - try - { - ExecuteSendFile(GetReaderFor(task.FilePath), task); - task.CompletedHandler?.Invoke(task.FilePath); - } - catch (Exception ex) - { - task.ErrorHandler?.Invoke(task.FilePath, ex.ToString()); - } - finally - { - _taskPool.Free(task); - } - } - } - - internal abstract void ExecuteSendFile(FileReader reader, FileTransferTask task); - - private FileReader GetReaderFor(string filePath) - { - return new FileReader(filePath); - } - } -} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileClient.cs b/ZeroLevel/Services/Network/FileTransfer/FileClient.cs deleted file mode 100644 index ade114e..0000000 --- a/ZeroLevel/Services/Network/FileTransfer/FileClient.cs +++ /dev/null @@ -1,95 +0,0 @@ -using System; -using System.Threading; -using ZeroLevel.Models; - -namespace ZeroLevel.Network.FileTransfer -{ - public sealed class FileClient - : BaseFileTransfer, IFileClient - { - private readonly ExClient _client; - private readonly string _baseFolder; - private readonly ClientFolderNameMapper _nameMapper; - private readonly bool _disposeClient; - - internal FileClient(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) - : base(baseFolder) - { - _client = client ?? throw new Exception(nameof(client)); - _baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder)); - _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); - _disposeClient = disposeClient; - - _client.Router.RegisterInbox("__upload_file_start", (c, f) => Receiver.Incoming(f, nameMapper(c))); - _client.Router.RegisterInbox("__upload_file_frame", (c, f) => Receiver.Incoming(f)); - _client.Router.RegisterInbox("__upload_file_complete", (c, f) => Receiver.Incoming(f)); - } - - public void Dispose() - { - if (_disposeClient) - { - _client?.Dispose(); - } - } - - public void Send(string fileName, Action completeHandler = null, Action errorHandler = null) - { - PushTransferTask(fileName, completeHandler, errorHandler); - } - - internal override void ExecuteSendFile(FileReader reader, FileTransferTask task) - { - Log.Info($"Start upload file {reader.Path}"); - var startinfo = reader.GetStartInfo(); - - using (var signal = new ManualResetEvent(false)) - { - bool next = false; - if (false == _client.Request("__upload_file_start", startinfo, - r => - { - next = r.Success; - signal.Set(); - }).Success) - { - next = false; - signal.Set(); - } - signal.WaitOne(5000); - if (next) - { - foreach (var chunk in reader.Read()) - { - signal.Reset(); - if (_client.Request("__upload_file_frame", chunk, r => - { - next = r.Success; - signal.Set(); - }).Success == false) - { - next = false; - signal.Set(); - } - signal.WaitOne(); - if (!next) - { - break; - } - } - } - if (next) - { - _client.Request("__upload_file_complete", reader.GetCompleteInfo(), r => - { - if (r.Success == false) - { - Log.Warning($"Unsuccess send file. {r.Comment}"); - } - }); - } - } - Log.Debug($"Stop upload file {reader.Path}"); - } - } -} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs b/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs deleted file mode 100644 index a8f5cd0..0000000 --- a/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs +++ /dev/null @@ -1,24 +0,0 @@ -using ZeroLevel.Services.FileSystem; - -namespace ZeroLevel.Network.FileTransfer -{ - public static class FileClientFactory - { - public static IFileClient Create(string serverEndpoint, string baseFolder, ClientFolderNameMapper nameMapper = null) - { - var client = new ExClient(new SocketClient(NetUtils.CreateIPEndPoint(serverEndpoint), new Router())); - return CreateFileServerClient(client, baseFolder, - nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true); - } - - public static IFileClient Create(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper = null) - { - return CreateFileServerClient(client, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false); - } - - private static IFileClient CreateFileServerClient(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) - { - return new FileClient(client, baseFolder, nameMapper, disposeClient); - } - } -} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs index e295220..e09cd36 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs @@ -1,175 +1,34 @@ using System; -using System.Collections.Generic; -using System.IO; -using ZeroLevel.Models; namespace ZeroLevel.Network.FileTransfer { - public class FileReceiver + public sealed class FileReceiver { - private class FileWriter - : IDisposable - { - private readonly FileStream _stream; - internal DateTime _writeTime { get; private set; } = DateTime.UtcNow; - private bool _gotCompleteMessage = false; - - public bool GotCompleteMessage() => _gotCompleteMessage = true; + private readonly string _baseFolder; + private readonly ClientFolderNameMapper _nameMapper; + private readonly FileWriter _receiver; - public bool ReadyToRemove() - { - if (_gotCompleteMessage) - { - return (DateTime.UtcNow - _writeTime).TotalSeconds > 15; - } - return false; - } - - public FileWriter(string path) - { - _stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); - } - - public void Write(long offset, byte[] data) - { - _stream.Position = offset; - _stream.Write(data, 0, data.Length); - _writeTime = DateTime.Now; - } - - public bool IsTimeoutBy(TimeSpan period) - { - return (DateTime.Now - _writeTime) > period; - } - - public void Dispose() - { - _stream.Flush(); - _stream.Close(); - _stream.Dispose(); - } - } - private string _basePath; - private string _disk_prefix; - - private readonly Dictionary _incoming = new Dictionary(); - private readonly object _locker = new object(); - private long _cleanErrorsTaskId; - - public FileReceiver(string path, string disk_prefix = "DRIVE_") + public FileReceiver(IRouter router, string baseFolder, ClientFolderNameMapper nameMapper) { - _disk_prefix = disk_prefix; - _basePath = path; - _cleanErrorsTaskId = Sheduller.RemindEvery(TimeSpan.FromMinutes(1), CleanBadFiles); - } - + _baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder)); + _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); + _receiver = new FileWriter(baseFolder); - private void CleanBadFiles() - { - lock (_locker) + if (false == router.ContainsRequestorInbox("__file_transfer_start_transfer__")) { - foreach (var pair in _incoming) - { - if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3)) || pair.Value.ReadyToRemove()) - { - Remove(pair.Key); - } - } + router.RegisterInbox("__file_transfer_start_transfer__", (c, f) => _receiver.Incoming(f, nameMapper(c))); } - } - - public InvokeResult Incoming(FileStartFrame info, string clientFolderName) - { - try + if (false == router.ContainsRequestorInbox("__file_transfer_frame__")) { - if (false == _incoming.ContainsKey(info.UploadFileTaskId)) - { - lock (_locker) - { - if (false == _incoming.ContainsKey(info.UploadFileTaskId)) - { - string path = BuildFilePath(clientFolderName, info.FilePath); - _incoming.Add(info.UploadFileTaskId, new FileWriter(path)); - } - } - } + router.RegisterInbox("__file_transfer_frame__", (_, f) => _receiver.Incoming(f)); } - catch (Exception ex) - { - Log.Error("[FileReceiver]", ex); - return InvokeResult.Fault(ex.Message); - } - return InvokeResult.Succeeding(); - } - - public InvokeResult Incoming(FileFrame chunk) - { - try - { - FileWriter stream; - if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream)) - { - stream.Write(chunk.Offset, chunk.Payload); - return InvokeResult.Succeeding(); - } - return InvokeResult.Fault("File not expected."); - } - catch (Exception ex) - { - Log.Error("[FileReceiver]", ex); - return InvokeResult.Fault(ex.Message); - } - } - - public InvokeResult Incoming(FileEndFrame info) - { - try - { - lock (_locker) - { - FileWriter stream; - if (_incoming.TryGetValue(info.UploadFileTaskId, out stream)) - { - stream.GotCompleteMessage(); - } - } - } - catch (Exception ex) - { - Log.Error("[FileReceiver]", ex); - return InvokeResult.Fault(ex.Message); - } - return InvokeResult.Succeeding(); - } - - private void Remove(long uploadTaskId) - { - FileWriter stream; - if (_incoming.TryGetValue(uploadTaskId, out stream)) - { - _incoming.Remove(uploadTaskId); - stream?.Dispose(); - } - } - - private string BuildFilePath(string client_folder_name, string clientPath) - { - if (string.IsNullOrEmpty(client_folder_name)) + if (false == router.ContainsRequestorInbox("__file_transfer_complete_transfer__")) { - if (false == Directory.Exists(_basePath)) - { - Directory.CreateDirectory(_basePath); - } - return Path.Combine(_basePath, Path.GetFileName(clientPath)); + router.RegisterInbox("__file_transfer_complete_transfer__", (_, f) => _receiver.Incoming(f)); } - else + if (false == router.ContainsRequestorInbox("__file_transfer_ping__")) { - string folder = Path.Combine(Path.Combine(_basePath, client_folder_name), Path.GetDirectoryName(clientPath).Replace(":", "_DRIVE")); - if (false == System.IO.Directory.Exists(folder)) - { - System.IO.Directory.CreateDirectory(folder); - } - return Path.Combine(folder, Path.GetFileName(clientPath)); + router.RegisterInbox("__file_transfer_ping__", (_) => true); } } } diff --git a/ZeroLevel/Services/Network/FileTransfer/FileSender.cs b/ZeroLevel/Services/Network/FileTransfer/FileSender.cs new file mode 100644 index 0000000..91cf35f --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/FileSender.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Threading; +using ZeroLevel.Services.Pools; + +namespace ZeroLevel.Network.FileTransfer +{ + public sealed class FileSender + { + private BlockingCollection _tasks = new BlockingCollection(); + private ObjectPool _taskPool = new ObjectPool(() => new FileTransferTask(), 100); + private readonly Thread _uploadFileThread; + + public FileSender() + { + _uploadFileThread = new Thread(UploadFileProcessing); + _uploadFileThread.IsBackground = true; + _uploadFileThread.Start(); + } + + public void Send(ExClient client, string fileName, Action completeHandler = null, Action errorHandler = null) + { + PushTransferTask(client, fileName, completeHandler, errorHandler); + } + + private void PushTransferTask(ExClient client, string filePath, Action completeHandler = null, Action errorHandler = null) + { + if (client == null) + { + throw new ArgumentNullException(nameof(client)); + } + if (string.IsNullOrWhiteSpace(filePath)) + { + throw new ArgumentNullException(nameof(filePath)); + } + if (false == File.Exists(filePath)) + { + throw new FileNotFoundException(filePath); + } + var task = _taskPool.Allocate(); + task.CompletedHandler = completeHandler; + task.ErrorHandler = errorHandler; + task.FilePath = filePath; + task.Client = client; + _tasks.Add(task); + } + + private void UploadFileProcessing() + { + while (true) + { + var task = _tasks.Take(); + try + { + ExecuteSendFile(new FileReader(task.FilePath), task); + task.CompletedHandler?.Invoke(task.FilePath); + } + catch (Exception ex) + { + task.ErrorHandler?.Invoke(task.FilePath, ex.ToString()); + } + finally + { + _taskPool.Free(task); + } + } + } + + public bool Connected(ExClient client, TimeSpan timeout) + { + bool connected = false; + using (var waiter = new ManualResetEvent(false)) + { + client.Request("__file_transfer_ping__", (response) => { connected = response; waiter.Set(); }); + waiter.WaitOne(timeout); + } + return connected; + } + + internal void ExecuteSendFile(FileReader reader, FileTransferTask task) + { + Log.Info($"Start upload file {reader.Path}"); + var startinfo = reader.GetStartInfo(); + if (false == task.Client.Send("__file_transfer_start_transfer__", startinfo).Success) + { + return; + } + foreach (var chunk in reader.Read()) + { + if (task.Client.Send("__file_transfer_frame__", chunk).Success == false) + { + return; + } + } + task.Client.Send("__file_transfer_complete_transfer__", reader.GetCompleteInfo()); + Log.Debug($"Stop upload file {reader.Path}"); + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileServer.cs b/ZeroLevel/Services/Network/FileTransfer/FileServer.cs deleted file mode 100644 index a371afc..0000000 --- a/ZeroLevel/Services/Network/FileTransfer/FileServer.cs +++ /dev/null @@ -1,51 +0,0 @@ -using System; -using ZeroLevel.Models; - -namespace ZeroLevel.Network.FileTransfer -{ - public sealed class FileServer - : BaseFileTransfer, IFileServer - { - private readonly IRouter _service; - private readonly string _baseFolder; - private readonly ServerFolderNameMapperDelegate _nameMapper; - private readonly bool _disposeService; - - internal FileServer(IRouter service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService) - : base(baseFolder) - { - _service = service ?? throw new Exception(nameof(service)); - _baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder)); - _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); - _disposeService = disposeService; - - _service.RegisterInbox("__upload_file_start", (client, f) => Receiver.Incoming(f, nameMapper(client))); - _service.RegisterInbox("__upload_file_frame", (client, f) => Receiver.Incoming(f)); - _service.RegisterInbox("__upload_file_complete", (client, f) => Receiver.Incoming(f)); - } - - public void Send(ExClient client, string fileName, Action completeHandler = null, Action errorHandler = null) - { - PushTransferTask(fileName, completeHandler, errorHandler, client); - } - - internal override void ExecuteSendFile(FileReader reader, FileTransferTask task) - { - Log.Info($"Start upload file {reader.Path}"); - var startinfo = reader.GetStartInfo(); - if (false == task.Client.Send("__upload_file_start", startinfo).Success) - { - return; - } - foreach (var chunk in reader.Read()) - { - if (task.Client.Send("__upload_file_frame", chunk).Success == false) - { - return; - } - } - task.Client.Send("__upload_file_complete", reader.GetCompleteInfo()); - Log.Debug($"Stop upload file {reader.Path}"); - } - } -} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs b/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs deleted file mode 100644 index 9db6e62..0000000 --- a/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs +++ /dev/null @@ -1,23 +0,0 @@ -using ZeroLevel.Network; -using ZeroLevel.Services.FileSystem; - -namespace ZeroLevel.Network.FileTransfer -{ - public static class FileServerFactory - { - public static IFileServer Create(int port, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null) - { - return null;// CreateFileServer(ExchangeTransportFactory.GetServer(port), baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true); - } - - public static IFileServer Create(IZeroService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null) - { - return null;// CreateFileServer(service, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false); - } - - private static IFileServer CreateFileServer(IZeroService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService) - { - return null;// new FileServer(service, baseFolder, nameMapper, disposeService); - } - } -} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs b/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs new file mode 100644 index 0000000..cc21479 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs @@ -0,0 +1,179 @@ +using System; +using System.Collections.Generic; +using System.IO; +using ZeroLevel.Models; + +namespace ZeroLevel.Network.FileTransfer +{ + internal sealed class FileWriter + { + private class _FileWriter + : IDisposable + { + private readonly FileStream _stream; + internal DateTime _writeTime { get; private set; } = DateTime.UtcNow; + private bool _gotCompleteMessage = false; + + public bool GotCompleteMessage() => _gotCompleteMessage = true; + + public bool ReadyToRemove() + { + if (_gotCompleteMessage) + { + return (DateTime.UtcNow - _writeTime).TotalSeconds > 15; + } + return false; + } + + public _FileWriter(string path) + { + _stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); + } + + public void Write(long offset, byte[] data) + { + _stream.Position = offset; + _stream.Write(data, 0, data.Length); + _writeTime = DateTime.Now; + } + + public bool IsTimeoutBy(TimeSpan period) + { + return (DateTime.Now - _writeTime) > period; + } + + public void Dispose() + { + _stream.Flush(); + _stream.Close(); + _stream.Dispose(); + } + } + private string _basePath; + private string _disk_prefix; + + private readonly Dictionary _incoming = new Dictionary(); + private readonly object _locker = new object(); + private long _cleanErrorsTaskId; + + public FileWriter(string path, string disk_prefix = "DRIVE_") + { + _disk_prefix = disk_prefix; + _basePath = path; + _cleanErrorsTaskId = Sheduller.RemindEvery(TimeSpan.FromMinutes(1), CleanBadFiles); + } + + private void CleanBadFiles() + { + lock (_locker) + { + foreach (var pair in _incoming) + { + if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3)) || pair.Value.ReadyToRemove()) + { + Remove(pair.Key); + } + } + } + } + + public InvokeResult Incoming(FileStartFrame info, string clientFolderName) + { + try + { + if (false == _incoming.ContainsKey(info.UploadFileTaskId)) + { + lock (_locker) + { + if (false == _incoming.ContainsKey(info.UploadFileTaskId)) + { + string path = BuildFilePath(clientFolderName, info.FilePath); + _incoming.Add(info.UploadFileTaskId, new _FileWriter(path)); + } + } + } + } + catch (Exception ex) + { + Log.Error("[FileWriter.Incoming(FileStartFrame)]", ex); + return InvokeResult.Fault(ex.Message); + } + return InvokeResult.Succeeding(); + } + + public InvokeResult Incoming(FileFrame chunk) + { + try + { + _FileWriter stream; + if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream)) + { + stream.Write(chunk.Offset, chunk.Payload); + return InvokeResult.Succeeding(); + } + return InvokeResult.Fault("File not expected."); + } + catch (Exception ex) + { + Log.Error("[FileWriter.Incoming(FileFrame)]", ex); + return InvokeResult.Fault(ex.Message); + } + } + + public InvokeResult Incoming(FileEndFrame info) + { + try + { + lock (_locker) + { + _FileWriter stream; + if (_incoming.TryGetValue(info.UploadFileTaskId, out stream) && stream != null) + { + using (stream) + { + stream.GotCompleteMessage(); + } + _incoming.Remove(info.UploadFileTaskId); + } + } + } + catch (Exception ex) + { + Log.Error("[FileWriter.Incoming(FileEndFrame)]", ex); + return InvokeResult.Fault(ex.Message); + } + return InvokeResult.Succeeding(); + } + + private void Remove(long uploadTaskId) + { + _FileWriter stream; + if (_incoming.TryGetValue(uploadTaskId, out stream)) + { + _incoming.Remove(uploadTaskId); + stream?.Dispose(); + } + } + + private string BuildFilePath(string client_folder_name, string clientPath) + { + if (string.IsNullOrEmpty(client_folder_name)) + { + if (false == Directory.Exists(_basePath)) + { + Directory.CreateDirectory(_basePath); + } + return Path.Combine(_basePath, Path.GetFileName(clientPath)); + } + else + { + string folder = Path.Combine(Path.Combine(_basePath, client_folder_name), Path.GetDirectoryName(clientPath).Replace(":", "_DRIVE")); + if (false == System.IO.Directory.Exists(folder)) + { + System.IO.Directory.CreateDirectory(folder); + } + return Path.Combine(folder, Path.GetFileName(clientPath)); + } + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs b/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs deleted file mode 100644 index 579a330..0000000 --- a/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; - -namespace ZeroLevel.Network.FileTransfer -{ - public interface IFileClient - : IDisposable - { - void Send(string fileName, Action completeHandler = null, Action errorHandler = null); - } -} diff --git a/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs b/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs deleted file mode 100644 index d235a53..0000000 --- a/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using ZeroLevel.Network; - -namespace ZeroLevel.Network.FileTransfer -{ - public interface IFileServer - { - void Send(ExClient client, string fileName, Action completeHandler = null, Action errorHandler = null); - } -} diff --git a/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs b/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs deleted file mode 100644 index f83c0fc..0000000 --- a/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs +++ /dev/null @@ -1,6 +0,0 @@ -using ZeroLevel.Network; - -namespace ZeroLevel.Network.FileTransfer -{ - public delegate string ServerFolderNameMapperDelegate(ISocketClient connection); -} diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index 404730a..014952e 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -66,25 +66,24 @@ namespace ZeroLevel.Network } #region API - public event Action OnConnect = (s) => { }; - public event Action OnDisconnect = (s) => { }; + public event Action OnConnect = (_) => { }; + public event Action OnDisconnect = (_) => { }; public event Action OnIncomingData = (_, __, ___) => { }; public IPEndPoint Endpoint { get; } public void Request(Frame frame, Action callback, Action fail = null) { if (frame == null) throw new ArgumentNullException(nameof(frame)); - if (frame != null && false == _send_queue.IsAddingCompleted) + if (frame != null && !_send_queue.IsAddingCompleted) { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { Thread.Sleep(50); } - int id; var sendInfo = new SendInfo { isRequest = true, - data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out id) + data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id) }; sendInfo.identity = id; _requests.RegisterForFrame(id, callback, fail); @@ -101,7 +100,7 @@ namespace ZeroLevel.Network public void Send(Frame frame) { if (frame == null) throw new ArgumentNullException(nameof(frame)); - if (frame != null && false == _send_queue.IsAddingCompleted) + if (frame != null && !_send_queue.IsAddingCompleted) { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { @@ -120,7 +119,7 @@ namespace ZeroLevel.Network public void Response(byte[] data, int identity) { if (data == null) throw new ArgumentNullException(nameof(data)); - if (false == _send_queue.IsAddingCompleted) + if (!_send_queue.IsAddingCompleted) { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { @@ -219,7 +218,7 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[SocketClient.TryConnect] Connection fault"); + Log.SystemError(ex, "[SocketClient.TryConnect] Connection fault"); Broken(); return false; } @@ -260,6 +259,7 @@ namespace ZeroLevel.Network { Log.SystemError(ex, "[SocketClient.Heartbeat.EnsureConnection]"); Broken(); + OnDisconnect(this); return; } _requests.TestForTimeouts(); diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs index da7c093..d539c51 100644 --- a/ZeroLevel/Services/Network/SocketServer.cs +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -18,6 +18,7 @@ namespace ZeroLevel.Network public IPEndPoint LocalEndpoint { get; } public event Action OnDisconnect = _ => { }; public event Action OnConnect = _ => { }; + public IEnumerable ConnectionList { get @@ -33,6 +34,7 @@ namespace ZeroLevel.Network } } } + private void DisconnectEventRise(ISocketClient client) { try @@ -42,6 +44,7 @@ namespace ZeroLevel.Network catch { } } + private void ConnectEventRise(ExClient client) { try @@ -112,7 +115,18 @@ namespace ZeroLevel.Network public override void Dispose() { - + try + { + foreach (var c in _connections) + { + c.Value.Dispose(); + } + _connections.Clear(); + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketServer.Dispose]"); + } } } } diff --git a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs index cc24412..5f6715b 100644 --- a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs +++ b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs @@ -11,7 +11,6 @@ namespace ZeroLevel.Network private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); private readonly ConcurrentDictionary _serverInstances = new ConcurrentDictionary(); - private HashSet _clients = new HashSet(); internal IEnumerable ServerList => _serverInstances.Values; @@ -35,9 +34,11 @@ namespace ZeroLevel.Network instance = new ExClient(new SocketClient(endpoint, router ?? new Router())); instance.ForceConnect(); if (instance.Status == SocketClientStatus.Initialized - ||instance.Status == SocketClientStatus.Working) + || instance.Status == SocketClientStatus.Working) { _clientInstances[key] = instance; + instance.Socket.OnDisconnect += Socket_OnDisconnect; + instance.Socket.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)); return instance; } } @@ -53,6 +54,18 @@ namespace ZeroLevel.Network return null; } + private void Socket_OnDisconnect(ISocketClient socket) + { + socket.OnDisconnect -= Socket_OnDisconnect; + + ExClient removed; + string key = $"{socket.Endpoint.Address}:{socket.Endpoint.Port}"; + if (_clientInstances.TryRemove(key, out removed)) + { + removed.Dispose(); + } + } + public SocketServer GetServer(IPEndPoint endpoint, IRouter router) { string key = $"{endpoint.Address}:{endpoint.Port}"; @@ -68,7 +81,8 @@ namespace ZeroLevel.Network public void Dispose() { ExClient removed; - foreach (var client in _clients) + var clients = new HashSet(_clientInstances.Keys); + foreach (var client in clients) { try { @@ -82,7 +96,6 @@ namespace ZeroLevel.Network Log.Error(ex, $"[ExClientServerCachee.Dispose()] Dispose SocketClient to endpoint {client}"); } } - _clients.Clear(); foreach (var server in _serverInstances) { try diff --git a/ZeroLevel/Services/Network/Utils/Router.cs b/ZeroLevel/Services/Network/Utils/Router.cs index 58610ba..50205d2 100644 --- a/ZeroLevel/Services/Network/Utils/Router.cs +++ b/ZeroLevel/Services/Network/Utils/Router.cs @@ -178,6 +178,21 @@ namespace ZeroLevel.Network #endregion Invokation + public bool ContainsInbox(string inbox) + { + return _handlers.ContainsKey(inbox) || _requestors.ContainsKey(inbox); + } + + public bool ContainsHandlerInbox(string inbox) + { + return _handlers.ContainsKey(inbox); + } + + public bool ContainsRequestorInbox(string inbox) + { + return _requestors.ContainsKey(inbox); + } + #region Message handlers registration public IServer RegisterInbox(string inbox, MessageHandler handler) { @@ -288,5 +303,8 @@ namespace ZeroLevel.Network public IServer RegisterInbox(string inbox, RequestHandler handler) { return this; } public IServer RegisterInbox(RequestHandler handler) { return this; } public IServer RegisterInbox(RequestHandler handler) { return this; } + public bool ContainsInbox(string inbox) => false; + public bool ContainsHandlerInbox(string inbox) => false; + public bool ContainsRequestorInbox(string inbox) => false; } } diff --git a/ZeroLevel/Services/Network/ServiceRouteStorage.cs b/ZeroLevel/Services/Network/Utils/ServiceRouteStorage.cs similarity index 100% rename from ZeroLevel/Services/Network/ServiceRouteStorage.cs rename to ZeroLevel/Services/Network/Utils/ServiceRouteStorage.cs