diff --git a/ZeroLevel/Services/Network/Contract/IZBackward.cs b/ZeroLevel/Services/Network/Contract/IZBackward.cs index 8b9f7ef..bfe3ded 100644 --- a/ZeroLevel/Services/Network/Contract/IZBackward.cs +++ b/ZeroLevel/Services/Network/Contract/IZBackward.cs @@ -7,5 +7,6 @@ namespace ZeroLevel.Network IPEndPoint Endpoint { get; } void SendBackward(Frame frame); + void SendBackward(string inbox, T message); } } \ No newline at end of file diff --git a/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs b/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs new file mode 100644 index 0000000..f583826 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs @@ -0,0 +1,78 @@ +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Threading; +using ZeroLevel.Network; +using ZeroLevel.Services.Network.FileTransfer.Model; +using ZeroLevel.Services.Pools; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public abstract class BaseFileTransfer + { + private readonly FileReceiver _receiver; + internal FileReceiver Receiver => _receiver; + + private ObjectPool _taskPool = new ObjectPool(() => new FileTransferTask(), 100); + private BlockingCollection _tasks = new BlockingCollection(); + private readonly Thread _uploadFileThread; + /*private int _maxParallelFileTransfer; + private int _currentFileTransfers;*/ + + internal BaseFileTransfer(string baseFolder/*, int maxParallelFileTransfer = 6*/) + { + _receiver = new FileReceiver(baseFolder); + _uploadFileThread = new Thread(UploadFileProcessing); + _uploadFileThread.IsBackground = true; + _uploadFileThread.Start(); + /*_maxParallelFileTransfer = maxParallelFileTransfer; + _currentFileTransfers = 0;*/ + } + + protected void PushTransferTask(string filePath, Action completeHandler = null, Action errorHandler = null, IZBackward client = null) + { + if (string.IsNullOrWhiteSpace(filePath)) + { + throw new ArgumentNullException(nameof(filePath)); + } + if (false == File.Exists(filePath)) + { + throw new FileNotFoundException(filePath); + } + var task = _taskPool.Allocate(); + task.CompletedHandler = completeHandler; + task.ErrorHandler = errorHandler; + task.FilePath = filePath; + task.Client = client; + _tasks.Add(task); + } + + private void UploadFileProcessing() + { + while (true) + { + var task = _tasks.Take(); + try + { + ExecuteSendFile(GetReaderFor(task.FilePath), task); + task.CompletedHandler?.Invoke(task.FilePath); + } + catch (Exception ex) + { + task.ErrorHandler?.Invoke(task.FilePath, ex.ToString()); + } + finally + { + _taskPool.Free(task); + } + } + } + + internal abstract void ExecuteSendFile(FileReader reader, FileTransferTask task); + + private FileReader GetReaderFor(string filePath) + { + return new FileReader(filePath); + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/ClientFolderNameMapperDelegate.cs b/ZeroLevel/Services/Network/FileTransfer/ClientFolderNameMapperDelegate.cs new file mode 100644 index 0000000..81b5e26 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/ClientFolderNameMapperDelegate.cs @@ -0,0 +1,6 @@ +using ZeroLevel.Network; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public delegate string ClientFolderNameMapper(IExClient client); +} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileClient.cs b/ZeroLevel/Services/Network/FileTransfer/FileClient.cs new file mode 100644 index 0000000..cd57903 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/FileClient.cs @@ -0,0 +1,52 @@ +using System; +using System.IO; +using ZeroLevel.Network; +using ZeroLevel.Services.Network.FileTransfer.Model; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public sealed class FileClient + : BaseFileTransfer, IFileClient + { + private readonly IExClient _client; + private readonly string _baseFolder; + private readonly ClientFolderNameMapper _nameMapper; + private readonly bool _disposeClient; + + internal FileClient(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) + : base(baseFolder) + { + _client = client ?? throw new Exception(nameof(client)); + _baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder)); + _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); + _disposeClient = disposeClient; + } + + public void Dispose() + { + if (_disposeClient) + { + _client?.Dispose(); + } + } + + public void Send(string fileName, Action completeHandler = null, Action errorHandler = null) + { + PushTransferTask(fileName, completeHandler, errorHandler); + } + + internal override void ExecuteSendFile(FileReader reader, FileTransferTask task) + { + Log.Info($"Start upload file {reader.Path}"); + var startinfo = reader.GetStartInfo(); + startinfo.FilePath = Path.GetFileName(startinfo.FilePath); + _client.Send("__upload_file_start", startinfo); + foreach (var chunk in reader.Read()) + { + _client.Send("__upload_file_frame", chunk); + } + _client.Send("__upload_file_complete", reader.GetCompleteInfo()); + Log.Info($"Stop upload file {reader.Path}"); + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs b/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs new file mode 100644 index 0000000..522e720 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs @@ -0,0 +1,24 @@ +using ZeroLevel.Network; +using ZeroLevel.Services.FileSystem; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public static class FileClientFactory + { + public static IFileClient Create(string serverEndpoint, string baseFolder, ClientFolderNameMapper nameMapper = null) + { + return CreateFileServerClient(ExchangeTransportFactory.GetClient(serverEndpoint), baseFolder, + nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true); + } + + public static IFileClient Create(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper = null) + { + return CreateFileServerClient(client, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false); + } + + private static IFileClient CreateFileServerClient(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) + { + return new FileClient(client, baseFolder, nameMapper, disposeClient); + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileReader.cs b/ZeroLevel/Services/Network/FileTransfer/FileReader.cs new file mode 100644 index 0000000..d275fbc --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/FileReader.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using System.IO; +using ZeroLevel.Services.Network.FileTransfer.Model; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + internal sealed class FileReader + { + private readonly FileStartFrame _startInfo; + public string Path { get; } + private const int CHUNK_SIZE = 512 * 1024; + + public FileReader(string path) + { + Path = path; + _startInfo = FileStartFrame.GetTransferFileInfo(path); + } + + public FileStartFrame GetStartInfo() + { + return _startInfo; + } + + public IEnumerable Read() + { + long offset = 0; + using (FileStream stream = new FileStream(Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) + { + int bytesRead; + var buffer = new byte[CHUNK_SIZE]; + while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0) + { + var fragment = new FileFrame + { + UploadTaskId = _startInfo.FileUploadTaskId, + Offset = offset * CHUNK_SIZE, + Payload = new byte[bytesRead] + }; + Array.Copy(buffer, 0, fragment.Payload, 0, bytesRead); + offset = offset + 1; + yield return fragment; + } + } + GC.Collect(); + } + + public FileEndFrame GetCompleteInfo() + { + return new FileEndFrame { FileUploadTaskId = _startInfo.FileUploadTaskId }; + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs new file mode 100644 index 0000000..c9a13d3 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs @@ -0,0 +1,132 @@ +using System; +using System.Collections.Generic; +using System.IO; +using ZeroLevel.Services.Network.FileTransfer.Model; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public class FileReceiver + { + private class FileWriter + : IDisposable + { + private readonly FileStream _stream; + internal DateTime _writeTime { get; private set; } = DateTime.UtcNow; + + public FileWriter(string path) + { + _stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); + } + + public void Write(long offset, byte[] data) + { + _stream.Position = offset; + _stream.Write(data, 0, data.Length); + _writeTime = DateTime.Now; + } + + public bool IsTimeoutBy(TimeSpan period) + { + return (DateTime.Now - _writeTime) > period; + } + + public void Dispose() + { + _stream.Flush(); + _stream.Close(); + _stream.Dispose(); + } + } + private string _basePath; + private string _disk_prefix; + + private readonly Dictionary _incoming = new Dictionary(); + private readonly object _locker = new object(); + private long _cleanErrorsTaskId; + + public FileReceiver(string path, string disk_prefix = "DRIVE_") + { + _disk_prefix = disk_prefix; + _basePath = path; + _cleanErrorsTaskId = Sheduller.RemindEvery(TimeSpan.FromMinutes(1), CleanBadFiles); + } + + + private void CleanBadFiles() + { + lock (_locker) + { + foreach (var pair in _incoming) + { + if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3))) + { + Remove(pair.Key); + } + } + } + } + + public void Incoming(FileStartFrame info, string clientFolderName) + { + if (false == _incoming.ContainsKey(info.FileUploadTaskId)) + { + lock (_locker) + { + if (false == _incoming.ContainsKey(info.FileUploadTaskId)) + { + string path = BuildFilePath(clientFolderName, info.FilePath); + _incoming.Add(info.FileUploadTaskId, new FileWriter(path)); + } + } + } + } + + public void Incoming(FileFrame chunk) + { + FileWriter stream; + if (_incoming.TryGetValue(chunk.UploadTaskId, out stream)) + { + stream.Write(chunk.Offset, chunk.Payload); + } + } + + public void Incoming(FileEndFrame info) + { + lock (_locker) + { + Remove(info.FileUploadTaskId); + } + } + + private void Remove(int uploadTaskId) + { + FileWriter stream; + if (_incoming.TryGetValue(uploadTaskId, out stream)) + { + _incoming.Remove(uploadTaskId); + stream?.Dispose(); + } + } + + private string BuildFilePath(string client_folder_name, string clientPath) + { + if (string.IsNullOrEmpty(client_folder_name)) + { + if (false == Directory.Exists(_basePath)) + { + Directory.CreateDirectory(_basePath); + } + return Path.Combine(_basePath, Path.GetFileName(clientPath)); + } + else + { + string folder = Path.Combine(Path.Combine(_basePath, client_folder_name), Path.GetDirectoryName(clientPath).Replace(":", "_DRIVE")); + if (false == System.IO.Directory.Exists(folder)) + { + System.IO.Directory.CreateDirectory(folder); + } + return Path.Combine(folder, Path.GetFileName(clientPath)); + } + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileServer.cs b/ZeroLevel/Services/Network/FileTransfer/FileServer.cs new file mode 100644 index 0000000..d4f0183 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/FileServer.cs @@ -0,0 +1,52 @@ +using System; +using System.IO; +using ZeroLevel.Network; +using ZeroLevel.Services.Network.FileTransfer.Model; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public sealed class FileServer + : BaseFileTransfer, IFileServer + { + private readonly IExService _service; + private readonly string _baseFolder; + private readonly ServerFolderNameMapperDelegate _nameMapper; + private readonly bool _disposeService; + + internal FileServer(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService) + : base(baseFolder) + { + _service = service ?? throw new Exception(nameof(service)); + _baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder)); + _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); + _disposeService = disposeService; + } + + public void Dispose() + { + if (_disposeService) + { + _service?.Dispose(); + } + } + + public void Send(IZBackward client, string fileName, Action completeHandler = null, Action errorHandler = null) + { + PushTransferTask(fileName, completeHandler, errorHandler, client); + } + + internal override void ExecuteSendFile(FileReader reader, FileTransferTask task) + { + Log.Info($"Start upload file {reader.Path}"); + var startinfo = reader.GetStartInfo(); + startinfo.FilePath = Path.GetFileName(startinfo.FilePath); + task.Client.SendBackward("__upload_file_start", startinfo); + foreach (var chunk in reader.Read()) + { + task.Client.SendBackward("__upload_file_frame", chunk); + } + task.Client.SendBackward("__upload_file_complete", reader.GetCompleteInfo()); + Log.Info($"Stop upload file {reader.Path}"); + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs b/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs new file mode 100644 index 0000000..8176301 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs @@ -0,0 +1,23 @@ +using ZeroLevel.Network; +using ZeroLevel.Services.FileSystem; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public static class FileServerFactory + { + public static IFileServer Create(int port, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null) + { + return CreateFileServer(ExchangeTransportFactory.GetServer(port), baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true); + } + + public static IFileServer Create(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null) + { + return CreateFileServer(service, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false); + } + + private static IFileServer CreateFileServer(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService) + { + return new FileServer(service, baseFolder, nameMapper, disposeService); + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs b/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs new file mode 100644 index 0000000..507e16f --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs @@ -0,0 +1,10 @@ +using System; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public interface IFileClient + : IDisposable + { + void Send(string fileName, Action completeHandler = null, Action errorHandler = null); + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs b/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs new file mode 100644 index 0000000..4355dde --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs @@ -0,0 +1,11 @@ +using System; +using ZeroLevel.Network; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public interface IFileServer + : IDisposable + { + void Send(IZBackward client, string fileName, Action completeHandler = null, Action errorHandler = null); + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileEndFrame.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileEndFrame.cs new file mode 100644 index 0000000..e6ef3e9 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileEndFrame.cs @@ -0,0 +1,20 @@ +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Services.Network.FileTransfer.Model +{ + public sealed class FileEndFrame + : IBinarySerializable + { + public int FileUploadTaskId; + + public void Serialize(IBinaryWriter writer) + { + writer.WriteInt32(this.FileUploadTaskId); + } + + public void Deserialize(IBinaryReader reader) + { + this.FileUploadTaskId = reader.ReadInt32(); + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs new file mode 100644 index 0000000..2eb52dd --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs @@ -0,0 +1,26 @@ +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Services.Network.FileTransfer.Model +{ + public sealed class FileFrame : + IBinarySerializable + { + public int UploadTaskId { get; set; } + public long Offset { get; set; } + public byte[] Payload { get; set; } + + public void Serialize(IBinaryWriter writer) + { + writer.WriteInt32(this.UploadTaskId); + writer.WriteLong(this.Offset); + writer.WriteBytes(this.Payload); + } + + public void Deserialize(IBinaryReader reader) + { + this.UploadTaskId = reader.ReadInt32(); + this.Offset = reader.ReadLong(); + this.Payload = reader.ReadBytes(); + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileStartFrame.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileStartFrame.cs new file mode 100644 index 0000000..957d65f --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileStartFrame.cs @@ -0,0 +1,40 @@ +using System.Threading; +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Services.Network.FileTransfer.Model +{ + public sealed class FileStartFrame + : IBinarySerializable + { + private static int _uploadTaskIdCounter = 0; + + public int FileUploadTaskId; + public string FilePath; + public long Size; + + public void Serialize(IBinaryWriter writer) + { + writer.WriteInt32(this.FileUploadTaskId); + writer.WriteString(this.FilePath); + writer.WriteLong(this.Size); + } + + public void Deserialize(IBinaryReader reader) + { + this.FileUploadTaskId = reader.ReadInt32(); + this.FilePath = reader.ReadString(); + this.Size = reader.ReadLong(); + } + + public static FileStartFrame GetTransferFileInfo(string path) + { + var fi = new System.IO.FileInfo(path); + return new FileStartFrame + { + FilePath = fi.FullName, + FileUploadTaskId = Interlocked.Increment(ref _uploadTaskIdCounter), + Size = fi.Length + }; + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs new file mode 100644 index 0000000..a61084c --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs @@ -0,0 +1,13 @@ +using System; +using ZeroLevel.Network; + +namespace ZeroLevel.Services.Network.FileTransfer.Model +{ + internal class FileTransferTask + { + public string FilePath; + public Action CompletedHandler; + public Action ErrorHandler; + public IZBackward Client; + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs b/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs new file mode 100644 index 0000000..27e8417 --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs @@ -0,0 +1,6 @@ +using ZeroLevel.Network; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + public delegate string ServerFolderNameMapperDelegate(IZBackward connection); +} diff --git a/ZeroLevel/Services/Network/Services/ExClient.cs b/ZeroLevel/Services/Network/Services/ExClient.cs index 8798f56..7afc76a 100644 --- a/ZeroLevel/Services/Network/Services/ExClient.cs +++ b/ZeroLevel/Services/Network/Services/ExClient.cs @@ -105,5 +105,10 @@ namespace ZeroLevel.Network { _fe.Send(frame); } + + public void SendBackward(string inbox, T obj) + { + Send(inbox, obj); + } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Network/ZSocketServerClient.cs b/ZeroLevel/Services/Network/ZSocketServerClient.cs index 7805942..3380d4e 100644 --- a/ZeroLevel/Services/Network/ZSocketServerClient.cs +++ b/ZeroLevel/Services/Network/ZSocketServerClient.cs @@ -205,5 +205,26 @@ namespace ZeroLevel.Network if (other == null) return false; return this.Endpoint.Compare(other.Endpoint) == 0; } + + public void SendBackward(string inbox, T message) + { + var frame = FrameBuilder.BuildFrame(message, inbox); + if (Status == ZTransportStatus.Working && false == _send_queue.IsCompleted && false == _send_queue.IsAddingCompleted) + { + var data = MessageSerializer.Serialize(frame); + try + { + _send_queue.Add(NetworkStreamFastObfuscator.PrepareData(data)); + } + catch (ObjectDisposedException) + { + // Ignore + } + finally + { + frame?.Release(); + } + } + } } } \ No newline at end of file diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index 2d8122e..c825a81 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -5,15 +5,15 @@ Infrastructure layer library ogoun ogoun - 2.0.4.0 - IDXReader, SoftMax function + 2.0.6.0 + File client/server https://github.com/ogoun/Zero/wiki Copyright Ogoun 2019 https://opensource.org/licenses/MIT https://raw.githubusercontent.com/ogoun/Zero/master/zero.png https://github.com/ogoun/Zero GitHub - 2.0.5 + 2.0.6