File transfer fixes

pull/1/head
a.bozhenov 5 years ago
parent 9963ef76b6
commit 09cec53edd

@ -1,5 +1,8 @@
using Microsoft.Win32; using Microsoft.Win32;
using System;
using System.Windows; using System.Windows;
using ZeroLevel;
using ZeroLevel.Network;
using ZeroLevel.Network.FileTransfer; using ZeroLevel.Network.FileTransfer;
namespace FileTransferClient namespace FileTransferClient
@ -9,16 +12,22 @@ namespace FileTransferClient
/// </summary> /// </summary>
public partial class MainWindow : Window public partial class MainWindow : Window
{ {
private IFileClient client; private FileSender _client;
private IExchange _exchange;
public MainWindow() public MainWindow()
{ {
InitializeComponent(); InitializeComponent();
_exchange = Bootstrap.CreateExchange();
_client = new FileSender();
} }
private void Button_Click(object sender, RoutedEventArgs e) private void Button_Click(object sender, RoutedEventArgs e)
{ {
client = FileClientFactory.Create(tbEndpoint.Text, System.IO.Path.Combine(ZeroLevel.Configuration.BaseDirectory, "INCOMING")); if (false == _client.Connected(_exchange.GetConnection(tbEndpoint.Text), TimeSpan.FromMilliseconds(300)))
{
MessageBox.Show("No connection");
}
} }
private void Button_Click_1(object sender, RoutedEventArgs e) private void Button_Click_1(object sender, RoutedEventArgs e)
@ -26,7 +35,7 @@ namespace FileTransferClient
var ofd = new OpenFileDialog(); var ofd = new OpenFileDialog();
if (ofd.ShowDialog() == true) if (ofd.ShowDialog() == true)
{ {
client.Send(ofd.FileName); _client.Send(_exchange.GetConnection(tbEndpoint.Text), ofd.FileName);
} }
} }

@ -1,5 +1,7 @@
using System.IO; using System.IO;
using System.Windows; using System.Windows;
using ZeroLevel;
using ZeroLevel.Network;
using ZeroLevel.Network.FileTransfer; using ZeroLevel.Network.FileTransfer;
namespace FileTransferServer namespace FileTransferServer
@ -12,9 +14,11 @@ namespace FileTransferServer
public MainWindow() public MainWindow()
{ {
InitializeComponent(); InitializeComponent();
_exchange = Bootstrap.CreateExchange();
} }
private IFileServer _server; private FileReceiver _server;
private IExchange _exchange;
private void Button_Click(object sender, RoutedEventArgs e) private void Button_Click(object sender, RoutedEventArgs e)
{ {
@ -33,13 +37,12 @@ namespace FileTransferServer
MessageBox.Show("Incorrect parameters"); MessageBox.Show("Incorrect parameters");
return; return;
} }
var router = _exchange.UseHost(port);
_server = FileServerFactory.Create(port, tbFolder.Text); _server = new FileReceiver(router, tbFolder.Text, c => $"{c.Endpoint.Address}{c.Endpoint.Port}");
} }
private void Button_Click_1(object sender, RoutedEventArgs e) private void Button_Click_1(object sender, RoutedEventArgs e)
{ {
_server = null;
} }
} }
} }

@ -3,6 +3,7 @@ using System.IO;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
using System.Reflection; using System.Reflection;
using ZeroLevel.Network;
using ZeroLevel.Services.Logging; using ZeroLevel.Services.Logging;
namespace ZeroLevel namespace ZeroLevel
@ -184,6 +185,8 @@ namespace ZeroLevel
return service; return service;
} }
public static IExchange CreateExchange() => new Exchange(null);
public static void Shutdown() public static void Shutdown()
{ {
try { Sheduller.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose default sheduller error"); } try { Sheduller.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose default sheduller error"); }

@ -26,12 +26,12 @@ namespace ZeroLevel.Network
/// <summary> /// <summary>
/// If during the specified period there was no network activity, send a ping-request /// If during the specified period there was no network activity, send a ping-request
/// </summary> /// </summary>
protected const long HEARTBEAT_PING_PERIOD_TICKS = 1500 * TimeSpan.TicksPerMillisecond; internal const long HEARTBEAT_PING_PERIOD_TICKS = 1500 * TimeSpan.TicksPerMillisecond;
/// <summary> /// <summary>
/// Connection check period /// Connection check period
/// </summary> /// </summary>
protected const int MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS = 7500; internal const int MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS = 7500;
/// <summary> /// <summary>
/// The period of the request, after which it is considered unsuccessful /// The period of the request, after which it is considered unsuccessful

@ -19,5 +19,9 @@
IServer RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler); IServer RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler);
IServer RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler); IServer RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler);
#endregion #endregion
bool ContainsInbox(string inbox);
bool ContainsHandlerInbox(string inbox);
bool ContainsRequestorInbox(string inbox);
} }
} }

@ -15,6 +15,7 @@ namespace ZeroLevel.Network
public IPEndPoint EndPoint => _client?.Endpoint; public IPEndPoint EndPoint => _client?.Endpoint;
public SocketClientStatus Status => _client.Status; public SocketClientStatus Status => _client.Status;
public IRouter Router => _client.Router; public IRouter Router => _client.Router;
public ISocketClient Socket => _client;
public ExClient(ISocketClient client) public ExClient(ISocketClient client)
{ {

@ -1,72 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using ZeroLevel.Services.Pools;
namespace ZeroLevel.Network.FileTransfer
{
public abstract class BaseFileTransfer
{
private readonly FileReceiver _receiver;
internal FileReceiver Receiver => _receiver;
private ObjectPool<FileTransferTask> _taskPool = new ObjectPool<FileTransferTask>(() => new FileTransferTask(), 100);
private BlockingCollection<FileTransferTask> _tasks = new BlockingCollection<FileTransferTask>();
private readonly Thread _uploadFileThread;
internal BaseFileTransfer(string baseFolder)
{
_receiver = new FileReceiver(baseFolder);
_uploadFileThread = new Thread(UploadFileProcessing);
_uploadFileThread.IsBackground = true;
_uploadFileThread.Start();
}
protected void PushTransferTask(string filePath, Action<string> completeHandler = null, Action<string, string> errorHandler = null, ExClient client = null)
{
if (string.IsNullOrWhiteSpace(filePath))
{
throw new ArgumentNullException(nameof(filePath));
}
if (false == File.Exists(filePath))
{
throw new FileNotFoundException(filePath);
}
var task = _taskPool.Allocate();
task.CompletedHandler = completeHandler;
task.ErrorHandler = errorHandler;
task.FilePath = filePath;
task.Client = client;
_tasks.Add(task);
}
private void UploadFileProcessing()
{
while (true)
{
var task = _tasks.Take();
try
{
ExecuteSendFile(GetReaderFor(task.FilePath), task);
task.CompletedHandler?.Invoke(task.FilePath);
}
catch (Exception ex)
{
task.ErrorHandler?.Invoke(task.FilePath, ex.ToString());
}
finally
{
_taskPool.Free(task);
}
}
}
internal abstract void ExecuteSendFile(FileReader reader, FileTransferTask task);
private FileReader GetReaderFor(string filePath)
{
return new FileReader(filePath);
}
}
}

@ -1,95 +0,0 @@
using System;
using System.Threading;
using ZeroLevel.Models;
namespace ZeroLevel.Network.FileTransfer
{
public sealed class FileClient
: BaseFileTransfer, IFileClient
{
private readonly ExClient _client;
private readonly string _baseFolder;
private readonly ClientFolderNameMapper _nameMapper;
private readonly bool _disposeClient;
internal FileClient(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
: base(baseFolder)
{
_client = client ?? throw new Exception(nameof(client));
_baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder));
_nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_disposeClient = disposeClient;
_client.Router.RegisterInbox<FileStartFrame>("__upload_file_start", (c, f) => Receiver.Incoming(f, nameMapper(c)));
_client.Router.RegisterInbox<FileFrame>("__upload_file_frame", (c, f) => Receiver.Incoming(f));
_client.Router.RegisterInbox<FileEndFrame>("__upload_file_complete", (c, f) => Receiver.Incoming(f));
}
public void Dispose()
{
if (_disposeClient)
{
_client?.Dispose();
}
}
public void Send(string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
{
PushTransferTask(fileName, completeHandler, errorHandler);
}
internal override void ExecuteSendFile(FileReader reader, FileTransferTask task)
{
Log.Info($"Start upload file {reader.Path}");
var startinfo = reader.GetStartInfo();
using (var signal = new ManualResetEvent(false))
{
bool next = false;
if (false == _client.Request<FileStartFrame, InvokeResult>("__upload_file_start", startinfo,
r =>
{
next = r.Success;
signal.Set();
}).Success)
{
next = false;
signal.Set();
}
signal.WaitOne(5000);
if (next)
{
foreach (var chunk in reader.Read())
{
signal.Reset();
if (_client.Request<FileFrame, InvokeResult>("__upload_file_frame", chunk, r =>
{
next = r.Success;
signal.Set();
}).Success == false)
{
next = false;
signal.Set();
}
signal.WaitOne();
if (!next)
{
break;
}
}
}
if (next)
{
_client.Request<FileEndFrame, InvokeResult>("__upload_file_complete", reader.GetCompleteInfo(), r =>
{
if (r.Success == false)
{
Log.Warning($"Unsuccess send file. {r.Comment}");
}
});
}
}
Log.Debug($"Stop upload file {reader.Path}");
}
}
}

@ -1,24 +0,0 @@
using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Network.FileTransfer
{
public static class FileClientFactory
{
public static IFileClient Create(string serverEndpoint, string baseFolder, ClientFolderNameMapper nameMapper = null)
{
var client = new ExClient(new SocketClient(NetUtils.CreateIPEndPoint(serverEndpoint), new Router()));
return CreateFileServerClient(client, baseFolder,
nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);
}
public static IFileClient Create(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper = null)
{
return CreateFileServerClient(client, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false);
}
private static IFileClient CreateFileServerClient(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
{
return new FileClient(client, baseFolder, nameMapper, disposeClient);
}
}
}

@ -1,175 +1,34 @@
using System; using System;
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Models;
namespace ZeroLevel.Network.FileTransfer namespace ZeroLevel.Network.FileTransfer
{ {
public class FileReceiver public sealed class FileReceiver
{ {
private class FileWriter private readonly string _baseFolder;
: IDisposable private readonly ClientFolderNameMapper _nameMapper;
{ private readonly FileWriter _receiver;
private readonly FileStream _stream;
internal DateTime _writeTime { get; private set; } = DateTime.UtcNow;
private bool _gotCompleteMessage = false;
public bool GotCompleteMessage() => _gotCompleteMessage = true;
public bool ReadyToRemove() public FileReceiver(IRouter router, string baseFolder, ClientFolderNameMapper nameMapper)
{
if (_gotCompleteMessage)
{
return (DateTime.UtcNow - _writeTime).TotalSeconds > 15;
}
return false;
}
public FileWriter(string path)
{
_stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
}
public void Write(long offset, byte[] data)
{
_stream.Position = offset;
_stream.Write(data, 0, data.Length);
_writeTime = DateTime.Now;
}
public bool IsTimeoutBy(TimeSpan period)
{
return (DateTime.Now - _writeTime) > period;
}
public void Dispose()
{
_stream.Flush();
_stream.Close();
_stream.Dispose();
}
}
private string _basePath;
private string _disk_prefix;
private readonly Dictionary<long, FileWriter> _incoming = new Dictionary<long, FileWriter>();
private readonly object _locker = new object();
private long _cleanErrorsTaskId;
public FileReceiver(string path, string disk_prefix = "DRIVE_")
{ {
_disk_prefix = disk_prefix; _baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder));
_basePath = path; _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_cleanErrorsTaskId = Sheduller.RemindEvery(TimeSpan.FromMinutes(1), CleanBadFiles); _receiver = new FileWriter(baseFolder);
}
private void CleanBadFiles() if (false == router.ContainsRequestorInbox("__file_transfer_start_transfer__"))
{
lock (_locker)
{ {
foreach (var pair in _incoming) router.RegisterInbox<FileStartFrame>("__file_transfer_start_transfer__", (c, f) => _receiver.Incoming(f, nameMapper(c)));
{
if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3)) || pair.Value.ReadyToRemove())
{
Remove(pair.Key);
}
}
} }
} if (false == router.ContainsRequestorInbox("__file_transfer_frame__"))
public InvokeResult Incoming(FileStartFrame info, string clientFolderName)
{
try
{ {
if (false == _incoming.ContainsKey(info.UploadFileTaskId)) router.RegisterInbox<FileFrame>("__file_transfer_frame__", (_, f) => _receiver.Incoming(f));
{
lock (_locker)
{
if (false == _incoming.ContainsKey(info.UploadFileTaskId))
{
string path = BuildFilePath(clientFolderName, info.FilePath);
_incoming.Add(info.UploadFileTaskId, new FileWriter(path));
}
}
}
} }
catch (Exception ex) if (false == router.ContainsRequestorInbox("__file_transfer_complete_transfer__"))
{
Log.Error("[FileReceiver]", ex);
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
public InvokeResult Incoming(FileFrame chunk)
{
try
{
FileWriter stream;
if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream))
{
stream.Write(chunk.Offset, chunk.Payload);
return InvokeResult.Succeeding();
}
return InvokeResult.Fault("File not expected.");
}
catch (Exception ex)
{
Log.Error("[FileReceiver]", ex);
return InvokeResult.Fault(ex.Message);
}
}
public InvokeResult Incoming(FileEndFrame info)
{
try
{
lock (_locker)
{
FileWriter stream;
if (_incoming.TryGetValue(info.UploadFileTaskId, out stream))
{
stream.GotCompleteMessage();
}
}
}
catch (Exception ex)
{
Log.Error("[FileReceiver]", ex);
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
private void Remove(long uploadTaskId)
{
FileWriter stream;
if (_incoming.TryGetValue(uploadTaskId, out stream))
{
_incoming.Remove(uploadTaskId);
stream?.Dispose();
}
}
private string BuildFilePath(string client_folder_name, string clientPath)
{
if (string.IsNullOrEmpty(client_folder_name))
{ {
if (false == Directory.Exists(_basePath)) router.RegisterInbox<FileEndFrame>("__file_transfer_complete_transfer__", (_, f) => _receiver.Incoming(f));
{
Directory.CreateDirectory(_basePath);
}
return Path.Combine(_basePath, Path.GetFileName(clientPath));
} }
else if (false == router.ContainsRequestorInbox("__file_transfer_ping__"))
{ {
string folder = Path.Combine(Path.Combine(_basePath, client_folder_name), Path.GetDirectoryName(clientPath).Replace(":", "_DRIVE")); router.RegisterInbox<bool>("__file_transfer_ping__", (_) => true);
if (false == System.IO.Directory.Exists(folder))
{
System.IO.Directory.CreateDirectory(folder);
}
return Path.Combine(folder, Path.GetFileName(clientPath));
} }
} }
} }

@ -0,0 +1,100 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using ZeroLevel.Services.Pools;
namespace ZeroLevel.Network.FileTransfer
{
public sealed class FileSender
{
private BlockingCollection<FileTransferTask> _tasks = new BlockingCollection<FileTransferTask>();
private ObjectPool<FileTransferTask> _taskPool = new ObjectPool<FileTransferTask>(() => new FileTransferTask(), 100);
private readonly Thread _uploadFileThread;
public FileSender()
{
_uploadFileThread = new Thread(UploadFileProcessing);
_uploadFileThread.IsBackground = true;
_uploadFileThread.Start();
}
public void Send(ExClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
{
PushTransferTask(client, fileName, completeHandler, errorHandler);
}
private void PushTransferTask(ExClient client, string filePath, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
{
if (client == null)
{
throw new ArgumentNullException(nameof(client));
}
if (string.IsNullOrWhiteSpace(filePath))
{
throw new ArgumentNullException(nameof(filePath));
}
if (false == File.Exists(filePath))
{
throw new FileNotFoundException(filePath);
}
var task = _taskPool.Allocate();
task.CompletedHandler = completeHandler;
task.ErrorHandler = errorHandler;
task.FilePath = filePath;
task.Client = client;
_tasks.Add(task);
}
private void UploadFileProcessing()
{
while (true)
{
var task = _tasks.Take();
try
{
ExecuteSendFile(new FileReader(task.FilePath), task);
task.CompletedHandler?.Invoke(task.FilePath);
}
catch (Exception ex)
{
task.ErrorHandler?.Invoke(task.FilePath, ex.ToString());
}
finally
{
_taskPool.Free(task);
}
}
}
public bool Connected(ExClient client, TimeSpan timeout)
{
bool connected = false;
using (var waiter = new ManualResetEvent(false))
{
client.Request<bool>("__file_transfer_ping__", (response) => { connected = response; waiter.Set(); });
waiter.WaitOne(timeout);
}
return connected;
}
internal void ExecuteSendFile(FileReader reader, FileTransferTask task)
{
Log.Info($"Start upload file {reader.Path}");
var startinfo = reader.GetStartInfo();
if (false == task.Client.Send<FileStartFrame>("__file_transfer_start_transfer__", startinfo).Success)
{
return;
}
foreach (var chunk in reader.Read())
{
if (task.Client.Send<FileFrame>("__file_transfer_frame__", chunk).Success == false)
{
return;
}
}
task.Client.Send<FileEndFrame>("__file_transfer_complete_transfer__", reader.GetCompleteInfo());
Log.Debug($"Stop upload file {reader.Path}");
}
}
}

@ -1,51 +0,0 @@
using System;
using ZeroLevel.Models;
namespace ZeroLevel.Network.FileTransfer
{
public sealed class FileServer
: BaseFileTransfer, IFileServer
{
private readonly IRouter _service;
private readonly string _baseFolder;
private readonly ServerFolderNameMapperDelegate _nameMapper;
private readonly bool _disposeService;
internal FileServer(IRouter service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService)
: base(baseFolder)
{
_service = service ?? throw new Exception(nameof(service));
_baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder));
_nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_disposeService = disposeService;
_service.RegisterInbox<FileStartFrame, InvokeResult>("__upload_file_start", (client, f) => Receiver.Incoming(f, nameMapper(client)));
_service.RegisterInbox<FileFrame, InvokeResult>("__upload_file_frame", (client, f) => Receiver.Incoming(f));
_service.RegisterInbox<FileEndFrame, InvokeResult>("__upload_file_complete", (client, f) => Receiver.Incoming(f));
}
public void Send(ExClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
{
PushTransferTask(fileName, completeHandler, errorHandler, client);
}
internal override void ExecuteSendFile(FileReader reader, FileTransferTask task)
{
Log.Info($"Start upload file {reader.Path}");
var startinfo = reader.GetStartInfo();
if (false == task.Client.Send<FileStartFrame>("__upload_file_start", startinfo).Success)
{
return;
}
foreach (var chunk in reader.Read())
{
if (task.Client.Send<FileFrame>("__upload_file_frame", chunk).Success == false)
{
return;
}
}
task.Client.Send<FileEndFrame>("__upload_file_complete", reader.GetCompleteInfo());
Log.Debug($"Stop upload file {reader.Path}");
}
}
}

@ -1,23 +0,0 @@
using ZeroLevel.Network;
using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Network.FileTransfer
{
public static class FileServerFactory
{
public static IFileServer Create(int port, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null)
{
return null;// CreateFileServer(ExchangeTransportFactory.GetServer(port), baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);
}
public static IFileServer Create(IZeroService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null)
{
return null;// CreateFileServer(service, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false);
}
private static IFileServer CreateFileServer(IZeroService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService)
{
return null;// new FileServer(service, baseFolder, nameMapper, disposeService);
}
}
}

@ -0,0 +1,179 @@
using System;
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Models;
namespace ZeroLevel.Network.FileTransfer
{
internal sealed class FileWriter
{
private class _FileWriter
: IDisposable
{
private readonly FileStream _stream;
internal DateTime _writeTime { get; private set; } = DateTime.UtcNow;
private bool _gotCompleteMessage = false;
public bool GotCompleteMessage() => _gotCompleteMessage = true;
public bool ReadyToRemove()
{
if (_gotCompleteMessage)
{
return (DateTime.UtcNow - _writeTime).TotalSeconds > 15;
}
return false;
}
public _FileWriter(string path)
{
_stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
}
public void Write(long offset, byte[] data)
{
_stream.Position = offset;
_stream.Write(data, 0, data.Length);
_writeTime = DateTime.Now;
}
public bool IsTimeoutBy(TimeSpan period)
{
return (DateTime.Now - _writeTime) > period;
}
public void Dispose()
{
_stream.Flush();
_stream.Close();
_stream.Dispose();
}
}
private string _basePath;
private string _disk_prefix;
private readonly Dictionary<long, _FileWriter> _incoming = new Dictionary<long, _FileWriter>();
private readonly object _locker = new object();
private long _cleanErrorsTaskId;
public FileWriter(string path, string disk_prefix = "DRIVE_")
{
_disk_prefix = disk_prefix;
_basePath = path;
_cleanErrorsTaskId = Sheduller.RemindEvery(TimeSpan.FromMinutes(1), CleanBadFiles);
}
private void CleanBadFiles()
{
lock (_locker)
{
foreach (var pair in _incoming)
{
if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3)) || pair.Value.ReadyToRemove())
{
Remove(pair.Key);
}
}
}
}
public InvokeResult Incoming(FileStartFrame info, string clientFolderName)
{
try
{
if (false == _incoming.ContainsKey(info.UploadFileTaskId))
{
lock (_locker)
{
if (false == _incoming.ContainsKey(info.UploadFileTaskId))
{
string path = BuildFilePath(clientFolderName, info.FilePath);
_incoming.Add(info.UploadFileTaskId, new _FileWriter(path));
}
}
}
}
catch (Exception ex)
{
Log.Error("[FileWriter.Incoming(FileStartFrame)]", ex);
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
public InvokeResult Incoming(FileFrame chunk)
{
try
{
_FileWriter stream;
if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream))
{
stream.Write(chunk.Offset, chunk.Payload);
return InvokeResult.Succeeding();
}
return InvokeResult.Fault("File not expected.");
}
catch (Exception ex)
{
Log.Error("[FileWriter.Incoming(FileFrame)]", ex);
return InvokeResult.Fault(ex.Message);
}
}
public InvokeResult Incoming(FileEndFrame info)
{
try
{
lock (_locker)
{
_FileWriter stream;
if (_incoming.TryGetValue(info.UploadFileTaskId, out stream) && stream != null)
{
using (stream)
{
stream.GotCompleteMessage();
}
_incoming.Remove(info.UploadFileTaskId);
}
}
}
catch (Exception ex)
{
Log.Error("[FileWriter.Incoming(FileEndFrame)]", ex);
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
private void Remove(long uploadTaskId)
{
_FileWriter stream;
if (_incoming.TryGetValue(uploadTaskId, out stream))
{
_incoming.Remove(uploadTaskId);
stream?.Dispose();
}
}
private string BuildFilePath(string client_folder_name, string clientPath)
{
if (string.IsNullOrEmpty(client_folder_name))
{
if (false == Directory.Exists(_basePath))
{
Directory.CreateDirectory(_basePath);
}
return Path.Combine(_basePath, Path.GetFileName(clientPath));
}
else
{
string folder = Path.Combine(Path.Combine(_basePath, client_folder_name), Path.GetDirectoryName(clientPath).Replace(":", "_DRIVE"));
if (false == System.IO.Directory.Exists(folder))
{
System.IO.Directory.CreateDirectory(folder);
}
return Path.Combine(folder, Path.GetFileName(clientPath));
}
}
}
}

@ -1,10 +0,0 @@
using System;
namespace ZeroLevel.Network.FileTransfer
{
public interface IFileClient
: IDisposable
{
void Send(string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null);
}
}

@ -1,10 +0,0 @@
using System;
using ZeroLevel.Network;
namespace ZeroLevel.Network.FileTransfer
{
public interface IFileServer
{
void Send(ExClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null);
}
}

@ -1,6 +0,0 @@
using ZeroLevel.Network;
namespace ZeroLevel.Network.FileTransfer
{
public delegate string ServerFolderNameMapperDelegate(ISocketClient connection);
}

@ -66,25 +66,24 @@ namespace ZeroLevel.Network
} }
#region API #region API
public event Action<ISocketClient> OnConnect = (s) => { }; public event Action<ISocketClient> OnConnect = (_) => { };
public event Action<ISocketClient> OnDisconnect = (s) => { }; public event Action<ISocketClient> OnDisconnect = (_) => { };
public event Action<ISocketClient, byte[], int> OnIncomingData = (_, __, ___) => { }; public event Action<ISocketClient, byte[], int> OnIncomingData = (_, __, ___) => { };
public IPEndPoint Endpoint { get; } public IPEndPoint Endpoint { get; }
public void Request(Frame frame, Action<byte[]> callback, Action<string> fail = null) public void Request(Frame frame, Action<byte[]> callback, Action<string> fail = null)
{ {
if (frame == null) throw new ArgumentNullException(nameof(frame)); if (frame == null) throw new ArgumentNullException(nameof(frame));
if (frame != null && false == _send_queue.IsAddingCompleted) if (frame != null && !_send_queue.IsAddingCompleted)
{ {
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{ {
Thread.Sleep(50); Thread.Sleep(50);
} }
int id;
var sendInfo = new SendInfo var sendInfo = new SendInfo
{ {
isRequest = true, isRequest = true,
data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out id) data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id)
}; };
sendInfo.identity = id; sendInfo.identity = id;
_requests.RegisterForFrame(id, callback, fail); _requests.RegisterForFrame(id, callback, fail);
@ -101,7 +100,7 @@ namespace ZeroLevel.Network
public void Send(Frame frame) public void Send(Frame frame)
{ {
if (frame == null) throw new ArgumentNullException(nameof(frame)); if (frame == null) throw new ArgumentNullException(nameof(frame));
if (frame != null && false == _send_queue.IsAddingCompleted) if (frame != null && !_send_queue.IsAddingCompleted)
{ {
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{ {
@ -120,7 +119,7 @@ namespace ZeroLevel.Network
public void Response(byte[] data, int identity) public void Response(byte[] data, int identity)
{ {
if (data == null) throw new ArgumentNullException(nameof(data)); if (data == null) throw new ArgumentNullException(nameof(data));
if (false == _send_queue.IsAddingCompleted) if (!_send_queue.IsAddingCompleted)
{ {
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{ {
@ -219,7 +218,7 @@ namespace ZeroLevel.Network
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, $"[SocketClient.TryConnect] Connection fault"); Log.SystemError(ex, "[SocketClient.TryConnect] Connection fault");
Broken(); Broken();
return false; return false;
} }
@ -260,6 +259,7 @@ namespace ZeroLevel.Network
{ {
Log.SystemError(ex, "[SocketClient.Heartbeat.EnsureConnection]"); Log.SystemError(ex, "[SocketClient.Heartbeat.EnsureConnection]");
Broken(); Broken();
OnDisconnect(this);
return; return;
} }
_requests.TestForTimeouts(); _requests.TestForTimeouts();

@ -18,6 +18,7 @@ namespace ZeroLevel.Network
public IPEndPoint LocalEndpoint { get; } public IPEndPoint LocalEndpoint { get; }
public event Action<ISocketClient> OnDisconnect = _ => { }; public event Action<ISocketClient> OnDisconnect = _ => { };
public event Action<ExClient> OnConnect = _ => { }; public event Action<ExClient> OnConnect = _ => { };
public IEnumerable<IPEndPoint> ConnectionList public IEnumerable<IPEndPoint> ConnectionList
{ {
get get
@ -33,6 +34,7 @@ namespace ZeroLevel.Network
} }
} }
} }
private void DisconnectEventRise(ISocketClient client) private void DisconnectEventRise(ISocketClient client)
{ {
try try
@ -42,6 +44,7 @@ namespace ZeroLevel.Network
catch catch
{ } { }
} }
private void ConnectEventRise(ExClient client) private void ConnectEventRise(ExClient client)
{ {
try try
@ -112,7 +115,18 @@ namespace ZeroLevel.Network
public override void Dispose() public override void Dispose()
{ {
try
{
foreach (var c in _connections)
{
c.Value.Dispose();
}
_connections.Clear();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketServer.Dispose]");
}
} }
} }
} }

@ -11,7 +11,6 @@ namespace ZeroLevel.Network
private static readonly ConcurrentDictionary<string, ExClient> _clientInstances = new ConcurrentDictionary<string, ExClient>(); private static readonly ConcurrentDictionary<string, ExClient> _clientInstances = new ConcurrentDictionary<string, ExClient>();
private readonly ConcurrentDictionary<string, SocketServer> _serverInstances = new ConcurrentDictionary<string, SocketServer>(); private readonly ConcurrentDictionary<string, SocketServer> _serverInstances = new ConcurrentDictionary<string, SocketServer>();
private HashSet<string> _clients = new HashSet<string>();
internal IEnumerable<SocketServer> ServerList => _serverInstances.Values; internal IEnumerable<SocketServer> ServerList => _serverInstances.Values;
@ -35,9 +34,11 @@ namespace ZeroLevel.Network
instance = new ExClient(new SocketClient(endpoint, router ?? new Router())); instance = new ExClient(new SocketClient(endpoint, router ?? new Router()));
instance.ForceConnect(); instance.ForceConnect();
if (instance.Status == SocketClientStatus.Initialized if (instance.Status == SocketClientStatus.Initialized
||instance.Status == SocketClientStatus.Working) || instance.Status == SocketClientStatus.Working)
{ {
_clientInstances[key] = instance; _clientInstances[key] = instance;
instance.Socket.OnDisconnect += Socket_OnDisconnect;
instance.Socket.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS));
return instance; return instance;
} }
} }
@ -53,6 +54,18 @@ namespace ZeroLevel.Network
return null; return null;
} }
private void Socket_OnDisconnect(ISocketClient socket)
{
socket.OnDisconnect -= Socket_OnDisconnect;
ExClient removed;
string key = $"{socket.Endpoint.Address}:{socket.Endpoint.Port}";
if (_clientInstances.TryRemove(key, out removed))
{
removed.Dispose();
}
}
public SocketServer GetServer(IPEndPoint endpoint, IRouter router) public SocketServer GetServer(IPEndPoint endpoint, IRouter router)
{ {
string key = $"{endpoint.Address}:{endpoint.Port}"; string key = $"{endpoint.Address}:{endpoint.Port}";
@ -68,7 +81,8 @@ namespace ZeroLevel.Network
public void Dispose() public void Dispose()
{ {
ExClient removed; ExClient removed;
foreach (var client in _clients) var clients = new HashSet<string>(_clientInstances.Keys);
foreach (var client in clients)
{ {
try try
{ {
@ -82,7 +96,6 @@ namespace ZeroLevel.Network
Log.Error(ex, $"[ExClientServerCachee.Dispose()] Dispose SocketClient to endpoint {client}"); Log.Error(ex, $"[ExClientServerCachee.Dispose()] Dispose SocketClient to endpoint {client}");
} }
} }
_clients.Clear();
foreach (var server in _serverInstances) foreach (var server in _serverInstances)
{ {
try try

@ -178,6 +178,21 @@ namespace ZeroLevel.Network
#endregion Invokation #endregion Invokation
public bool ContainsInbox(string inbox)
{
return _handlers.ContainsKey(inbox) || _requestors.ContainsKey(inbox);
}
public bool ContainsHandlerInbox(string inbox)
{
return _handlers.ContainsKey(inbox);
}
public bool ContainsRequestorInbox(string inbox)
{
return _requestors.ContainsKey(inbox);
}
#region Message handlers registration #region Message handlers registration
public IServer RegisterInbox(string inbox, MessageHandler handler) public IServer RegisterInbox(string inbox, MessageHandler handler)
{ {
@ -288,5 +303,8 @@ namespace ZeroLevel.Network
public IServer RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler) { return this; } public IServer RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler) { return this; }
public IServer RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler) { return this; } public IServer RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler) { return this; }
public IServer RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler) { return this; } public IServer RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler) { return this; }
public bool ContainsInbox(string inbox) => false;
public bool ContainsHandlerInbox(string inbox) => false;
public bool ContainsRequestorInbox(string inbox) => false;
} }
} }

Loading…
Cancel
Save

Powered by TurnKey Linux.