diff --git a/ZeroLevel/Services/Network/ZBaseNetwork.cs b/ZeroLevel/Services/Network/BaseSocket.cs similarity index 72% rename from ZeroLevel/Services/Network/ZBaseNetwork.cs rename to ZeroLevel/Services/Network/BaseSocket.cs index acd7055..3466e51 100644 --- a/ZeroLevel/Services/Network/ZBaseNetwork.cs +++ b/ZeroLevel/Services/Network/BaseSocket.cs @@ -2,18 +2,15 @@ namespace ZeroLevel.Network { - public abstract class ZBaseNetwork - : IDisposable + public abstract class BaseSocket { - static ZBaseNetwork() + static BaseSocket() { MAX_FRAME_PAYLOAD_SIZE = Configuration.Default.FirstOrDefault("MAX_FRAME_PAYLOAD_SIZE", DEFAULT_MAX_FRAME_PAYLOAD_SIZE); } public const string DEFAULT_MESSAGE_INBOX = "__message_inbox__"; public const string DEFAULT_REQUEST_INBOX = "__request_inbox__"; - - protected const string DEFAULT_PING_INBOX = "__ping__"; protected const string DEFAULT_REQUEST_ERROR_INBOX = "__request_error__"; /// @@ -29,7 +26,7 @@ namespace ZeroLevel.Network /// /// Connection check period /// - protected const int HEARTBEAT_UPDATE_PERIOD_MS = 7500; + protected const int MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS = 7500; /// /// The period of the request, after which it is considered unsuccessful @@ -49,11 +46,11 @@ namespace ZeroLevel.Network /// public const int MAX_SEND_QUEUE_SIZE = 1024; - protected void Broken() => Status = Status == ZTransportStatus.Disposed ? Status : ZTransportStatus.Broken; - protected void Disposed() => Status = ZTransportStatus.Disposed; - protected void Working() => Status = Status == ZTransportStatus.Disposed ? Status : ZTransportStatus.Working; - public ZTransportStatus Status { get; private set; } = ZTransportStatus.Initialized; + protected void Broken() => Status = Status == SocketClientStatus.Disposed ? Status : SocketClientStatus.Broken; + protected void Disposed() => Status = SocketClientStatus.Disposed; + protected void Working() => Status = Status == SocketClientStatus.Disposed ? Status : SocketClientStatus.Working; + public SocketClientStatus Status { get; private set; } = SocketClientStatus.Initialized; public abstract void Dispose(); } -} \ No newline at end of file +} diff --git a/ZeroLevel/Services/Network/Contract/IExClient.cs b/ZeroLevel/Services/Network/Contract/IExClient.cs deleted file mode 100644 index 6593b83..0000000 --- a/ZeroLevel/Services/Network/Contract/IExClient.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System; -using System.Net; -using ZeroLevel.Models; - -namespace ZeroLevel.Network -{ - public interface IExClient - : IDisposable - { - event Action Connected; - - void ForceConnect(); - - ZTransportStatus Status { get; } - - IPEndPoint Endpoint { get; } - - InvokeResult Send(); - - InvokeResult Send(string inbox); - - InvokeResult Send(T obj); - - InvokeResult Send(string inbox, T obj); - - InvokeResult Request(Treq obj, Action callback); - - InvokeResult Request(string inbox, Treq obj, Action callback); - - InvokeResult Request(Action callback); - - InvokeResult Request(string inbox, Action callback); - - void RegisterInbox(string inbox, Action handler); - - void RegisterInbox(Action handler); - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Contract/IExService.cs b/ZeroLevel/Services/Network/Contract/IExService.cs deleted file mode 100644 index 8c8a677..0000000 --- a/ZeroLevel/Services/Network/Contract/IExService.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System; -using System.Net; - -namespace ZeroLevel.Network -{ - public interface IExService - : IDisposable - { - IPEndPoint Endpoint { get; } - event Action OnConnect; - event Action OnDisconnect; - - void RegisterInbox(string inbox, Action handler); - - void RegisterInbox(string inbox, Action handler); - - void RegisterInbox(string inbox, Func handler); - - /// - /// Replier without request - /// - void RegisterInbox(string inbox, Func handler); - - /* - DEFAULT INBOXES - */ - void RegisterInbox(Action handler); - void RegisterInbox(Action handler); - void RegisterInbox(Func handler); - void RegisterInbox(Func handler); - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Contract/IZBackward.cs b/ZeroLevel/Services/Network/Contract/IZBackward.cs deleted file mode 100644 index 6271016..0000000 --- a/ZeroLevel/Services/Network/Contract/IZBackward.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System.Net; -using ZeroLevel.Models; - -namespace ZeroLevel.Network -{ - public interface IZBackward - { - IPEndPoint Endpoint { get; } - - InvokeResult SendBackward(Frame frame); - InvokeResult SendBackward(string inbox, T message); - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Contract/IZObservableServer.cs b/ZeroLevel/Services/Network/Contract/IZObservableServer.cs deleted file mode 100644 index bf36edb..0000000 --- a/ZeroLevel/Services/Network/Contract/IZObservableServer.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Net; - -namespace ZeroLevel.Network -{ - public interface IZObservableServer - : IDisposable - { - IPEndPoint Endpoint { get; } - IEnumerable ConnectionList { get; } - - event Action OnConnect; - - event Action OnDisconnect; - - event Action OnMessage; - - event Func OnRequest; - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Contract/IZTransport.cs b/ZeroLevel/Services/Network/Contract/IZTransport.cs deleted file mode 100644 index 01ec4d6..0000000 --- a/ZeroLevel/Services/Network/Contract/IZTransport.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System; -using System.Net; - -namespace ZeroLevel.Network -{ - public interface IZTransport - : IDisposable - { - event Action OnConnect; - - event Action OnDisconnect; - - event EventHandler OnServerMessage; - - IPEndPoint Endpoint { get; } - ZTransportStatus Status { get; } - - void EnsureConnection(); - - void Send(Frame frame); - - void Request(Frame frame, Action callback, Action fail = null); - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Contract/IDiscoveryClient.cs b/ZeroLevel/Services/Network/Contracts/IDiscoveryClient.cs similarity index 100% rename from ZeroLevel/Services/Network/Contract/IDiscoveryClient.cs rename to ZeroLevel/Services/Network/Contracts/IDiscoveryClient.cs diff --git a/ZeroLevel/Services/Network/Contract/IExchangeService.cs b/ZeroLevel/Services/Network/Contracts/IExchangeService.cs similarity index 100% rename from ZeroLevel/Services/Network/Contract/IExchangeService.cs rename to ZeroLevel/Services/Network/Contracts/IExchangeService.cs diff --git a/ZeroLevel/Services/_Network/Contracts/IRouter.cs b/ZeroLevel/Services/Network/Contracts/IRouter.cs similarity index 59% rename from ZeroLevel/Services/_Network/Contracts/IRouter.cs rename to ZeroLevel/Services/Network/Contracts/IRouter.cs index d0d22f5..e334075 100644 --- a/ZeroLevel/Services/_Network/Contracts/IRouter.cs +++ b/ZeroLevel/Services/Network/Contracts/IRouter.cs @@ -1,4 +1,7 @@ -namespace ZeroLevel.Services._Network +using System; +using ZeroLevel.Models; + +namespace ZeroLevel.Network { public interface IRouter { @@ -23,13 +26,13 @@ public interface IClient { - void Send(string inbox); - void Send(string inbox, byte[] data); - void Send(string inbox, T message); + InvokeResult Send(string inbox); + InvokeResult Send(string inbox, byte[] data); + InvokeResult Send(string inbox, T message); - byte[] Request(string inbox); - byte[] Request(string inbox, byte[] data); - Tresponse Request(string inbox); - Tresponse Request(string inbox, Trequest request); + InvokeResult Request(string inbox, Action callback); + InvokeResult Request(string inbox, byte[] data, Action callback); + InvokeResult Request(string inbox, Action callback); + InvokeResult Request(string inbox, Trequest request, Action callback); } } diff --git a/ZeroLevel/Services/Network/Contracts/ISocketClient.cs b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs new file mode 100644 index 0000000..772020e --- /dev/null +++ b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs @@ -0,0 +1,21 @@ +using System; +using System.Net; + +namespace ZeroLevel.Network +{ + public interface ISocketClient: + IDisposable + { + event Action OnIncomingData; + event Action OnConnect; + event Action OnDisconnect; + IPEndPoint Endpoint { get; } + SocketClientStatus Status { get; } + + void ForceConnect(); + void UseKeepAlive(TimeSpan period); + void Send(Frame data); + void Request(Frame data, Action callback, Action fail = null); + void Response(byte[] data, int identity); + } +} diff --git a/ZeroLevel/Services/Network/Services/DiscoveryClient.cs b/ZeroLevel/Services/Network/DiscoveryClient.cs similarity index 99% rename from ZeroLevel/Services/Network/Services/DiscoveryClient.cs rename to ZeroLevel/Services/Network/DiscoveryClient.cs index dc6833a..125199a 100644 --- a/ZeroLevel/Services/Network/Services/DiscoveryClient.cs +++ b/ZeroLevel/Services/Network/DiscoveryClient.cs @@ -128,7 +128,7 @@ namespace ZeroLevel.Network : IDiscoveryClient { private readonly DCRouter _router = new DCRouter(); - private readonly IExClient _discoveryServerClient; + private readonly NetworkNode _discoveryServerClient; public DiscoveryClient(string endpoint) { diff --git a/ZeroLevel/Services/Network/Services/ExService.cs b/ZeroLevel/Services/Network/ExService.cs similarity index 91% rename from ZeroLevel/Services/Network/Services/ExService.cs rename to ZeroLevel/Services/Network/ExService.cs index ab6f168..0a8df49 100644 --- a/ZeroLevel/Services/Network/Services/ExService.cs +++ b/ZeroLevel/Services/Network/ExService.cs @@ -9,9 +9,9 @@ namespace ZeroLevel.Network private readonly ExRouter _router; private readonly IZObservableServer _server; - public event Action OnConnect = c => { }; + public event Action OnConnect = c => { }; - public event Action OnDisconnect = c => { }; + public event Action OnDisconnect = c => { }; public ExService(IZObservableServer server) { @@ -23,12 +23,12 @@ namespace ZeroLevel.Network _server.OnDisconnect += _server_OnDisconnect; } - private void _server_OnDisconnect(IZBackward client) + private void _server_OnDisconnect(ISocketClient client) { this.OnDisconnect(client); } - private void _server_OnConnect(IZBackward client) + private void _server_OnConnect(ISocketClient client) { this.OnConnect(client); } @@ -101,7 +101,7 @@ namespace ZeroLevel.Network _router.RegisterInbox(inbox, handler); } - public void RegisterInbox(Action handler) + public void RegisterInbox(Action handler) { _router.RegisterInbox(DEFAULT_REQUEST_INBOX, handler); } diff --git a/ZeroLevel/Services/Network/Services/ExServiceHost.cs b/ZeroLevel/Services/Network/ExServiceHost.cs similarity index 99% rename from ZeroLevel/Services/Network/Services/ExServiceHost.cs rename to ZeroLevel/Services/Network/ExServiceHost.cs index b593656..3efc3f8 100644 --- a/ZeroLevel/Services/Network/Services/ExServiceHost.cs +++ b/ZeroLevel/Services/Network/ExServiceHost.cs @@ -3,7 +3,6 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; -using System.Net; using System.Reflection; namespace ZeroLevel.Network diff --git a/ZeroLevel/Services/Network/Services/Exchange.cs b/ZeroLevel/Services/Network/Exchange.cs similarity index 93% rename from ZeroLevel/Services/Network/Services/Exchange.cs rename to ZeroLevel/Services/Network/Exchange.cs index 63ce946..5fc5970 100644 --- a/ZeroLevel/Services/Network/Services/Exchange.cs +++ b/ZeroLevel/Services/Network/Exchange.cs @@ -60,7 +60,7 @@ namespace ZeroLevel.Network return false; } - public bool Send(string serviceKey, T data) => Send(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); + public bool Send(string serviceKey, T data) => Send(serviceKey, BaseSocket.DEFAULT_MESSAGE_INBOX, data); #endregion Balanced send @@ -85,7 +85,7 @@ namespace ZeroLevel.Network { return false; } - if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) + if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS)) { return false; } @@ -128,7 +128,7 @@ namespace ZeroLevel.Network { return false; } - if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) + if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS)) { return false; } @@ -153,10 +153,10 @@ namespace ZeroLevel.Network } public Tresp Request(string serviceKey, Treq data) => - Request(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); + Request(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX, data); public Tresp Request(string serviceKey) => - Request(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX); + Request(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX); #endregion Balanced request @@ -181,7 +181,7 @@ namespace ZeroLevel.Network { return false; } - if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) + if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS)) { return false; } @@ -224,7 +224,7 @@ namespace ZeroLevel.Network { return false; } - if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) + if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS)) { return false; } @@ -249,10 +249,10 @@ namespace ZeroLevel.Network } public Tresp RequestDirect(string endpoint, string serviceKey, Treq data) => - RequestDirect(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); + RequestDirect(endpoint, serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX, data); public Tresp RequestDirect(string endpoint, string serviceKey) => - RequestDirect(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX); + RequestDirect(endpoint, serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX); #endregion Direct request @@ -299,7 +299,7 @@ namespace ZeroLevel.Network /// Service key /// Message /// true - on successful submission - public bool SendBroadcast(string serviceKey, T data) => SendBroadcast(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); + public bool SendBroadcast(string serviceKey, T data) => SendBroadcast(serviceKey, BaseSocket.DEFAULT_MESSAGE_INBOX, data); /// /// Sending a message to all services of a specific type to the specified handler @@ -343,7 +343,7 @@ namespace ZeroLevel.Network /// Message /// true - on successful submission public bool SendBroadcastByType(string serviceType, T data) => - SendBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); + SendBroadcastByType(serviceType, BaseSocket.DEFAULT_MESSAGE_INBOX, data); /// /// Sending a message to all services of a specific group to the specified handler @@ -387,7 +387,7 @@ namespace ZeroLevel.Network /// Messsage /// true - on successful submission public bool SendBroadcastByGroup(string serviceGroup, T data) => - SendBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); + SendBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_MESSAGE_INBOX, data); /// /// Broadcast polling services by key @@ -445,7 +445,7 @@ namespace ZeroLevel.Network /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcast(string serviceKey, Treq data) => - RequestBroadcast(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); + RequestBroadcast(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX, data); /// /// Broadcast polling of services by key, without message of request, to default handler @@ -455,7 +455,7 @@ namespace ZeroLevel.Network /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcast(string serviceKey) => - RequestBroadcast(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX); + RequestBroadcast(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX); /// /// Broadcast polling services by type of service @@ -513,7 +513,7 @@ namespace ZeroLevel.Network /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByType(string serviceType, Treq data) => - RequestBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); + RequestBroadcastByType(serviceType, BaseSocket.DEFAULT_REQUEST_INBOX, data); /// /// Broadcast polling services by type, without message request, in the default handler @@ -523,7 +523,7 @@ namespace ZeroLevel.Network /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByType(string serviceType) => - RequestBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX); + RequestBroadcastByType(serviceType, BaseSocket.DEFAULT_REQUEST_INBOX); /// /// Broadcast polling services for a group of services @@ -581,7 +581,7 @@ namespace ZeroLevel.Network /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByGroup(string serviceGroup, Treq data) => - RequestBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); + RequestBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX, data); /// ///Broadcast polling services for a group of services, without sending a request, to the default handler @@ -591,11 +591,11 @@ namespace ZeroLevel.Network /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByGroup(string serviceGroup) => - RequestBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX); + RequestBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX); #region Private - private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) + private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) @@ -613,17 +613,17 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); } - waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS); + waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS); } return response; } - private IEnumerable _RequestBroadcast(List clients, string inbox) + private IEnumerable _RequestBroadcast(List clients, string inbox) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) @@ -641,12 +641,12 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); } - waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS); + waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS); } return response; } diff --git a/ZeroLevel/Services/Network/ExchangeTransportFactory.cs b/ZeroLevel/Services/Network/ExchangeTransportFactory.cs index 3643d7d..9008688 100644 --- a/ZeroLevel/Services/Network/ExchangeTransportFactory.cs +++ b/ZeroLevel/Services/Network/ExchangeTransportFactory.cs @@ -5,7 +5,7 @@ namespace ZeroLevel.Network { public static class ExchangeTransportFactory { - private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); + private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); /// /// Creates a server to receive messages using the specified protocol @@ -22,13 +22,13 @@ namespace ZeroLevel.Network /// Protocol /// Server endpoint /// Client - public static IExClient GetClientWithCache(string endpoint) + public static NetworkNode GetClientWithCache(string endpoint) { - IExClient instance = null; + NetworkNode instance = null; if (_clientInstances.ContainsKey(endpoint)) { instance = _clientInstances[endpoint]; - if (instance.Status == ZTransportStatus.Working) + if (instance.Status == SocketClientStatus.Working) { return instance; } @@ -41,9 +41,9 @@ namespace ZeroLevel.Network return instance; } - public static IExClient GetClient(string endpoint) + public static NetworkNode GetClient(string endpoint) { - return new ExClient(new ZSocketClient(NetUtils.CreateIPEndPoint(endpoint))); + return new NetworkNode(new SocketClient(NetUtils.CreateIPEndPoint(endpoint))); } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs b/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs index f583826..916d5dd 100644 --- a/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs +++ b/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs @@ -2,11 +2,9 @@ using System.Collections.Concurrent; using System.IO; using System.Threading; -using ZeroLevel.Network; -using ZeroLevel.Services.Network.FileTransfer.Model; using ZeroLevel.Services.Pools; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { public abstract class BaseFileTransfer { diff --git a/ZeroLevel/Services/Network/FileTransfer/ClientFolderNameMapperDelegate.cs b/ZeroLevel/Services/Network/FileTransfer/ClientFolderNameMapperDelegate.cs index 81b5e26..4f06e4d 100644 --- a/ZeroLevel/Services/Network/FileTransfer/ClientFolderNameMapperDelegate.cs +++ b/ZeroLevel/Services/Network/FileTransfer/ClientFolderNameMapperDelegate.cs @@ -1,6 +1,4 @@ -using ZeroLevel.Network; - -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { - public delegate string ClientFolderNameMapper(IExClient client); + public delegate string ClientFolderNameMapper(ISocketClient client); } diff --git a/ZeroLevel/Services/Network/FileTransfer/FileClient.cs b/ZeroLevel/Services/Network/FileTransfer/FileClient.cs index 81be498..5271ac3 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileClient.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileClient.cs @@ -1,21 +1,18 @@ using System; -using System.IO; using System.Threading; using ZeroLevel.Models; -using ZeroLevel.Network; -using ZeroLevel.Services.Network.FileTransfer.Model; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { public sealed class FileClient : BaseFileTransfer, IFileClient { - private readonly IExClient _client; + private readonly NetworkNode _client; private readonly string _baseFolder; private readonly ClientFolderNameMapper _nameMapper; private readonly bool _disposeClient; - internal FileClient(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) + internal FileClient(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) : base(baseFolder) { _client = client ?? throw new Exception(nameof(client)); @@ -23,9 +20,9 @@ namespace ZeroLevel.Services.Network.FileTransfer _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); _disposeClient = disposeClient; - _client.RegisterInbox("__upload_file_start", (f, _, __) => Receiver.Incoming(f, nameMapper(_client))); - _client.RegisterInbox("__upload_file_frame", (f, _, __) => Receiver.Incoming(f)); - _client.RegisterInbox("__upload_file_complete", (f, _, __) => Receiver.Incoming(f)); + _client.RegisterInbox("__upload_file_start", (c, f) => Receiver.Incoming(f, nameMapper(c))); + _client.RegisterInbox("__upload_file_frame", (c, f) => Receiver.Incoming(f)); + _client.RegisterInbox("__upload_file_complete", (c, f) => Receiver.Incoming(f)); } public void Dispose() diff --git a/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs b/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs index 522e720..533e3ab 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs @@ -1,7 +1,6 @@ -using ZeroLevel.Network; -using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.FileSystem; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { public static class FileClientFactory { @@ -11,12 +10,12 @@ namespace ZeroLevel.Services.Network.FileTransfer nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true); } - public static IFileClient Create(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper = null) + public static IFileClient Create(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper = null) { return CreateFileServerClient(client, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false); } - private static IFileClient CreateFileServerClient(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) + private static IFileClient CreateFileServerClient(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) { return new FileClient(client, baseFolder, nameMapper, disposeClient); } diff --git a/ZeroLevel/Services/Network/FileTransfer/FileReader.cs b/ZeroLevel/Services/Network/FileTransfer/FileReader.cs index 128dbb2..aee7a80 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileReader.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileReader.cs @@ -1,9 +1,8 @@ using System; using System.Collections.Generic; using System.IO; -using ZeroLevel.Services.Network.FileTransfer.Model; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { internal sealed class FileReader { diff --git a/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs index da3653c..e295220 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileReceiver.cs @@ -2,9 +2,8 @@ using System.Collections.Generic; using System.IO; using ZeroLevel.Models; -using ZeroLevel.Services.Network.FileTransfer.Model; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { public class FileReceiver { diff --git a/ZeroLevel/Services/Network/FileTransfer/FileServer.cs b/ZeroLevel/Services/Network/FileTransfer/FileServer.cs index 3a39a0c..eaacfde 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileServer.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileServer.cs @@ -1,9 +1,7 @@ using System; using ZeroLevel.Models; -using ZeroLevel.Network; -using ZeroLevel.Services.Network.FileTransfer.Model; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { public sealed class FileServer : BaseFileTransfer, IFileServer @@ -34,7 +32,7 @@ namespace ZeroLevel.Services.Network.FileTransfer } } - public void Send(IZBackward client, string fileName, Action completeHandler = null, Action errorHandler = null) + public void Send(ISocketClient client, string fileName, Action completeHandler = null, Action errorHandler = null) { PushTransferTask(fileName, completeHandler, errorHandler, client); } diff --git a/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs b/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs index 8176301..e8ede19 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs @@ -1,7 +1,7 @@ using ZeroLevel.Network; using ZeroLevel.Services.FileSystem; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { public static class FileServerFactory { diff --git a/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs b/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs index 507e16f..579a330 100644 --- a/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs +++ b/ZeroLevel/Services/Network/FileTransfer/IFileClient.cs @@ -1,6 +1,6 @@ using System; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { public interface IFileClient : IDisposable diff --git a/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs b/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs index 4355dde..c082d8b 100644 --- a/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs +++ b/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs @@ -1,11 +1,11 @@ using System; using ZeroLevel.Network; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { public interface IFileServer : IDisposable { - void Send(IZBackward client, string fileName, Action completeHandler = null, Action errorHandler = null); + void Send(ISocketClient client, string fileName, Action completeHandler = null, Action errorHandler = null); } } diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileEndFrame.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileEndFrame.cs index a6a7cda..bb58ca4 100644 --- a/ZeroLevel/Services/Network/FileTransfer/Model/FileEndFrame.cs +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileEndFrame.cs @@ -1,6 +1,6 @@ using ZeroLevel.Services.Serialization; -namespace ZeroLevel.Services.Network.FileTransfer.Model +namespace ZeroLevel.Network.FileTransfer { public sealed class FileEndFrame : IBinarySerializable, IFileTransferInfo diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs index 71be19d..3233d43 100644 --- a/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileFrame.cs @@ -1,6 +1,6 @@ using ZeroLevel.Services.Serialization; -namespace ZeroLevel.Services.Network.FileTransfer.Model +namespace ZeroLevel.Network.FileTransfer { public sealed class FileFrame : IBinarySerializable, IFileTransferInfo diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileStartFrame.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileStartFrame.cs index ba16844..b49d8d4 100644 --- a/ZeroLevel/Services/Network/FileTransfer/Model/FileStartFrame.cs +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileStartFrame.cs @@ -1,7 +1,7 @@ using System.Threading; using ZeroLevel.Services.Serialization; -namespace ZeroLevel.Services.Network.FileTransfer.Model +namespace ZeroLevel.Network.FileTransfer { public sealed class FileStartFrame : IBinarySerializable, IFileTransferInfo diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs index a61084c..f94e4d2 100644 --- a/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs @@ -1,13 +1,12 @@ using System; -using ZeroLevel.Network; -namespace ZeroLevel.Services.Network.FileTransfer.Model +namespace ZeroLevel.Network.FileTransfer { internal class FileTransferTask { public string FilePath; public Action CompletedHandler; public Action ErrorHandler; - public IZBackward Client; + public ISocketClient Client; } } diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/IFileTransferInfo.cs b/ZeroLevel/Services/Network/FileTransfer/Model/IFileTransferInfo.cs index c80da97..1337d9d 100644 --- a/ZeroLevel/Services/Network/FileTransfer/Model/IFileTransferInfo.cs +++ b/ZeroLevel/Services/Network/FileTransfer/Model/IFileTransferInfo.cs @@ -1,4 +1,4 @@ -namespace ZeroLevel.Services.Network.FileTransfer.Model +namespace ZeroLevel.Network.FileTransfer { public enum FileTransferInfoType { diff --git a/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs b/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs index 27e8417..f83c0fc 100644 --- a/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs +++ b/ZeroLevel/Services/Network/FileTransfer/ServerFolderNameMapperDelegate.cs @@ -1,6 +1,6 @@ using ZeroLevel.Network; -namespace ZeroLevel.Services.Network.FileTransfer +namespace ZeroLevel.Network.FileTransfer { - public delegate string ServerFolderNameMapperDelegate(IZBackward connection); + public delegate string ServerFolderNameMapperDelegate(ISocketClient connection); } diff --git a/ZeroLevel/Services/Network/Models/ExServiceInfo.cs b/ZeroLevel/Services/Network/Model/ExServiceInfo.cs similarity index 100% rename from ZeroLevel/Services/Network/Models/ExServiceInfo.cs rename to ZeroLevel/Services/Network/Model/ExServiceInfo.cs diff --git a/ZeroLevel/Services/Network/Models/ExchangeAttributes.cs b/ZeroLevel/Services/Network/Model/ExchangeAttributes.cs similarity index 100% rename from ZeroLevel/Services/Network/Models/ExchangeAttributes.cs rename to ZeroLevel/Services/Network/Model/ExchangeAttributes.cs diff --git a/ZeroLevel/Services/Network/Models/Frame.cs b/ZeroLevel/Services/Network/Model/Frame.cs similarity index 86% rename from ZeroLevel/Services/Network/Models/Frame.cs rename to ZeroLevel/Services/Network/Model/Frame.cs index 7510301..613b0da 100644 --- a/ZeroLevel/Services/Network/Models/Frame.cs +++ b/ZeroLevel/Services/Network/Model/Frame.cs @@ -24,6 +24,22 @@ namespace ZeroLevel.Network return frame; } + public static Frame FromPool(string inbox) + { + var frame = _pool.Allocate(); + frame.Inbox = inbox; + frame.Payload = null; + return frame; + } + + public static Frame FromPool(string inbox, byte[] payload) + { + var frame = _pool.Allocate(); + frame.Inbox = inbox; + frame.Payload = payload; + return frame; + } + public void Release() { _pool.Free(this); @@ -35,6 +51,8 @@ namespace ZeroLevel.Network [DataMember] public byte[] Payload { get; set; } + public bool IsRequest { get; set; } + public Frame() { } @@ -114,4 +132,4 @@ namespace ZeroLevel.Network return new Frame(this); } } -} \ No newline at end of file +} diff --git a/ZeroLevel/Services/_Network/FrameType.cs b/ZeroLevel/Services/Network/Model/FrameType.cs similarity index 75% rename from ZeroLevel/Services/_Network/FrameType.cs rename to ZeroLevel/Services/Network/Model/FrameType.cs index 6a92336..5b641af 100644 --- a/ZeroLevel/Services/_Network/FrameType.cs +++ b/ZeroLevel/Services/Network/Model/FrameType.cs @@ -1,4 +1,4 @@ -namespace ZeroLevel.Services._Network +namespace ZeroLevel.Network { public enum FrameType { diff --git a/ZeroLevel/Services/_Network/NetworkDelegates.cs b/ZeroLevel/Services/Network/Model/NetworkDelegates.cs similarity index 88% rename from ZeroLevel/Services/_Network/NetworkDelegates.cs rename to ZeroLevel/Services/Network/Model/NetworkDelegates.cs index 0a1db6b..8ea0177 100644 --- a/ZeroLevel/Services/_Network/NetworkDelegates.cs +++ b/ZeroLevel/Services/Network/Model/NetworkDelegates.cs @@ -1,4 +1,4 @@ -namespace ZeroLevel.Services._Network +namespace ZeroLevel.Network { public delegate void MessageHandler(ISocketClient client); public delegate void MessageHandler(ISocketClient client, T message); diff --git a/ZeroLevel/Services/Network/Models/RequestInfo.cs b/ZeroLevel/Services/Network/Model/RequestInfo.cs similarity index 99% rename from ZeroLevel/Services/Network/Models/RequestInfo.cs rename to ZeroLevel/Services/Network/Model/RequestInfo.cs index 9443ba0..5d427eb 100644 --- a/ZeroLevel/Services/Network/Models/RequestInfo.cs +++ b/ZeroLevel/Services/Network/Model/RequestInfo.cs @@ -35,4 +35,4 @@ namespace ZeroLevel.Network _failHandler(reasonPhrase); } } -} \ No newline at end of file +} diff --git a/ZeroLevel/Services/Network/Models/ServiceEndpointInfo.cs b/ZeroLevel/Services/Network/Model/ServiceEndpointInfo.cs similarity index 100% rename from ZeroLevel/Services/Network/Models/ServiceEndpointInfo.cs rename to ZeroLevel/Services/Network/Model/ServiceEndpointInfo.cs diff --git a/ZeroLevel/Services/Network/Models/ServiceEndpointsInfo.cs b/ZeroLevel/Services/Network/Model/ServiceEndpointsInfo.cs similarity index 100% rename from ZeroLevel/Services/Network/Models/ServiceEndpointsInfo.cs rename to ZeroLevel/Services/Network/Model/ServiceEndpointsInfo.cs diff --git a/ZeroLevel/Services/Network/Models/ZTransportStatus.cs b/ZeroLevel/Services/Network/Model/SocketClientStatus.cs similarity index 73% rename from ZeroLevel/Services/Network/Models/ZTransportStatus.cs rename to ZeroLevel/Services/Network/Model/SocketClientStatus.cs index afd80e2..6c6b273 100644 --- a/ZeroLevel/Services/Network/Models/ZTransportStatus.cs +++ b/ZeroLevel/Services/Network/Model/SocketClientStatus.cs @@ -1,11 +1,10 @@ namespace ZeroLevel.Network { - public enum ZTransportStatus - : int + public enum SocketClientStatus { Initialized = 0, Working = 1, Broken = 2, Disposed = 4 } -} \ No newline at end of file +} diff --git a/ZeroLevel/Services/Network/NetworkNode.cs b/ZeroLevel/Services/Network/NetworkNode.cs new file mode 100644 index 0000000..eaf8742 --- /dev/null +++ b/ZeroLevel/Services/Network/NetworkNode.cs @@ -0,0 +1,197 @@ +using System; +using System.Net; +using ZeroLevel.Models; +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Network +{ + public class NetworkNode + : IClient, IRouter, IDisposable + { + private FrameParser _parser = new FrameParser(); + private readonly ISocketClient _client; + private readonly Router _router; + private DateTime _lastConnectionTime; + public IPEndPoint EndPoint => _client?.Endpoint; + public SocketClientStatus Status => _client.Status; + + public NetworkNode(ISocketClient client) + { + _lastConnectionTime = DateTime.UtcNow; + _client = client; + _router = new Router(); + _parser.OnIncoming += _parser_OnIncoming; + _client.OnIncomingData += _readerWriter_OnIncomingData; + } + + private void _readerWriter_OnIncomingData(ISocketClient client, byte[] data, int length) + { + _parser.Push(data, length); + } + + private void _parser_OnIncoming(FrameType type, int identity, byte[] data) + { + switch (type) + { + case FrameType.KeepAlive: + _lastConnectionTime = DateTime.UtcNow; + break; + case FrameType.Message: + _router.HandleMessage(MessageSerializer.Deserialize(data), _client); + break; + case FrameType.Request: + var response = _router.HandleRequest(MessageSerializer.Deserialize(data), _client); + _client.Response(response, identity); + break; + } + } + + public void ForceConnect() => _client.ForceConnect(); + + public InvokeResult Send(string inbox) + { + try + { + _client.Send(Frame.FromPool(inbox)); + } + catch (Exception ex) + { + Log.Error(ex, "[NetworkNode.Send]"); + return InvokeResult.Fault(ex.Message); + } + return InvokeResult.Succeeding(); + } + + public InvokeResult Send(string inbox, byte[] data) + { + try + { + _client.Send(Frame.FromPool(inbox, data)); + } + catch (Exception ex) + { + Log.Error(ex, "[NetworkNode.Send]"); + return InvokeResult.Fault(ex.Message); + } + return InvokeResult.Succeeding(); + } + + public InvokeResult Send(string inbox, T message) + { + try + { + _client.Send(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible(message))); + } + catch (Exception ex) + { + Log.Error(ex, "[NetworkNode.Send]"); + return InvokeResult.Fault(ex.Message); + } + return InvokeResult.Succeeding(); + } + + public InvokeResult Request(string inbox, Action callback) + { + try + { + _client.Request(Frame.FromPool(inbox), f => callback(f.Payload)); + } + catch (Exception ex) + { + Log.Error(ex, "[NetworkNode.Request]"); + return InvokeResult.Fault(ex.Message); + } + return InvokeResult.Succeeding(); + } + + public InvokeResult Request(string inbox, byte[] data, Action callback) + { + try + { + _client.Request(Frame.FromPool(inbox, data), f => callback(f.Payload)); + } + catch (Exception ex) + { + Log.Error(ex, "[NetworkNode.Request]"); + return InvokeResult.Fault(ex.Message); + } + return InvokeResult.Succeeding(); + } + + public InvokeResult Request(string inbox, Action callback) + { + try + { + _client.Request(Frame.FromPool(inbox), f => callback(MessageSerializer.DeserializeCompatible(f.Payload))); + } + catch (Exception ex) + { + Log.Error(ex, "[NetworkNode.Request]"); + return InvokeResult.Fault(ex.Message); + } + return InvokeResult.Succeeding(); + } + + public InvokeResult Request(string inbox, Trequest request, Action callback) + { + try + { + _client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible(request)), + f => callback(MessageSerializer.DeserializeCompatible(f.Payload))); + } + catch (Exception ex) + { + Log.Error(ex, "[NetworkNode.Request]"); + return InvokeResult.Fault(ex.Message); + } + return InvokeResult.Succeeding(); + } + + #region IRouter + public void RegisterInbox(string inbox, MessageHandler handler) + { + _router.RegisterInbox(inbox, handler); + } + + public void RegisterInbox(string inbox, MessageHandler handler) + { + _router.RegisterInbox(inbox, handler); + } + + public void RegisterInbox(MessageHandler handler) + { + _router.RegisterInbox(handler); + } + + public void RegisterInbox(MessageHandler handler) + { + _router.RegisterInbox(handler); + } + + public void RegisterInbox(string inbox, RequestHandler handler) + { + _router.RegisterInbox(inbox, handler); + } + + public void RegisterInbox(string inbox, RequestHandler handler) + { + _router.RegisterInbox(inbox, handler); + } + + public void RegisterInbox(RequestHandler handler) + { + _router.RegisterInbox(handler); + } + + public void RegisterInbox(RequestHandler handler) + { + _router.RegisterInbox(handler); + } + #endregion + + public void Dispose() + { + _client.Dispose(); + } + } +} diff --git a/ZeroLevel/Services/Network/NetworkStreamDataObfuscator.cs b/ZeroLevel/Services/Network/NetworkStreamDataObfuscator.cs deleted file mode 100644 index 9ad795b..0000000 --- a/ZeroLevel/Services/Network/NetworkStreamDataObfuscator.cs +++ /dev/null @@ -1,50 +0,0 @@ -using System; - -namespace ZeroLevel.Network -{ - public static class NetworkStreamFastObfuscator - { - public static byte[] PrepareData(byte[] data) - { - var packet = new byte[data.Length + 6]; - packet[0] = 181; - Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4); - packet[5] = (byte)(packet[0] ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]); - HashData(data, packet[5]); - Array.Copy(data, 0, packet, 6, data.Length); - return packet; - } - - public static void HashData(byte[] data, byte initialmask) - { - if (data.Length == 0) return; - int i = 1; - data[0] ^= initialmask; - for (; i < (data.Length - 8); i += 8) - { - data[i + 0] ^= data[i - 1]; - data[i + 1] ^= data[i + 0]; - data[i + 2] ^= data[i + 1]; - data[i + 3] ^= data[i + 2]; - data[i + 4] ^= data[i + 3]; - data[i + 5] ^= data[i + 4]; - data[i + 6] ^= data[i + 5]; - data[i + 7] ^= data[i + 6]; - } - for (; i < data.Length; i++) - { - data[i] ^= data[i - 1]; - } - } - - public static void DeHashData(byte[] data, byte initialmask) - { - if (data.Length == 0) return; - for (var i = data.Length - 1; i > 0; i--) - { - data[i] ^= data[i - 1]; - } - data[0] ^= initialmask; - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Services/ExClient.cs b/ZeroLevel/Services/Network/Services/ExClient.cs deleted file mode 100644 index 8beba54..0000000 --- a/ZeroLevel/Services/Network/Services/ExClient.cs +++ /dev/null @@ -1,124 +0,0 @@ -using System; -using System.Net; -using ZeroLevel.Models; - -namespace ZeroLevel.Network -{ - internal sealed class ExClient - : ZBaseNetwork, IExClient, IZBackward - { - private readonly IZTransport _transport; - private readonly ExRouter _router; - private readonly FrameExchange _fe; - - public event Action Connected = () => { }; - - public new ZTransportStatus Status => _transport.Status; - - public ExClient(IZTransport transport) - { - _transport = transport ?? throw new ArgumentNullException(nameof(transport)); - transport.OnConnect += Transport_OnConnect; - _fe = new FrameExchange(transport); - _router = new ExRouter(); - transport.OnServerMessage += Transport_OnServerMessage; - } - - public void ForceConnect() - { - try - { - _transport.EnsureConnection(); - } - catch { } - } - - private void Transport_OnConnect() - { - Connected(); - } - - private void Transport_OnServerMessage(object sender, Frame e) - { - try - { - _router.HandleMessage(e, this); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ExClient] Fault handle server message"); - } - finally - { - e?.Release(); - } - } - - public IPEndPoint Endpoint => _fe.Endpoint; - - public override void Dispose() - { - _fe.Dispose(); - } - - public void RegisterInbox(string inbox, Action handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(Action handler) - { - _router.RegisterInbox(DEFAULT_MESSAGE_INBOX, handler); - } - - public InvokeResult Request(Action callback) - { - return _fe.Request(DEFAULT_REQUEST_INBOX, resp => callback(resp)); - } - - public InvokeResult Request(string inbox, Action callback) - { - return _fe.Request(inbox, resp => callback(resp)); - } - - public InvokeResult Request(Treq obj, Action callback) - { - return _fe.Request(DEFAULT_REQUEST_INBOX, obj, resp => callback(resp)); - } - - public InvokeResult Request(string inbox, Treq obj, Action callback) - { - return _fe.Request(inbox, obj, resp => callback(resp)); - } - - public InvokeResult Send(T obj) - { - return _fe.Send(DEFAULT_MESSAGE_INBOX, obj); - } - - public InvokeResult Send(string inbox, T obj) - { - return _fe.Send(inbox, obj); - } - - public InvokeResult SendBackward(Frame frame) - { - return _fe.Send(frame); - } - - public InvokeResult SendBackward(string inbox, T obj) - { - return Send(inbox, obj); - } - - public InvokeResult Send() - { - return _fe.Send(DEFAULT_MESSAGE_INBOX); - } - - public InvokeResult Send(string inbox) - { - return _fe.Send(inbox); - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Services/FrameBuilder.cs b/ZeroLevel/Services/Network/Services/FrameBuilder.cs deleted file mode 100644 index 0fe4754..0000000 --- a/ZeroLevel/Services/Network/Services/FrameBuilder.cs +++ /dev/null @@ -1,77 +0,0 @@ -using ZeroLevel.Services.Serialization; - -namespace ZeroLevel.Network -{ - public static class FrameBuilder - { - public static Frame BuildFrame(T obj, string inbox) - { - var frame = Frame.FromPool(); - frame.FrameId = Frame.GetMessageId(); - frame.IsRequest = false; - frame.Inbox = inbox; - frame.Payload = MessageSerializer.SerializeCompatible(obj); - return frame; - } - - public static Frame BuildFrame(string inbox) - { - var frame = Frame.FromPool(); - frame.FrameId = Frame.GetMessageId(); - frame.IsRequest = false; - frame.Inbox = inbox; - frame.Payload = null; - return frame; - } - - public static Frame BuildRequestFrame(T obj, string inbox) - { - var frame = Frame.FromPool(); - frame.FrameId = Frame.GetMessageId(); - frame.IsRequest = true; - frame.Inbox = inbox; - frame.Payload = MessageSerializer.SerializeCompatible(obj); - return frame; - } - - public static Frame BuildRequestFrame(string inbox) - { - var frame = Frame.FromPool(); - frame.FrameId = Frame.GetMessageId(); - frame.IsRequest = true; - frame.Inbox = inbox; - frame.Payload = null; - return frame; - } - - public static Frame BuildResponseFrame(object obj, Frame request) - { - var frame = Frame.FromPool(); - frame.IsRequest = true; - frame.FrameId = request.FrameId; - frame.Inbox = request.Inbox; - frame.Payload = MessageSerializer.SerializeCompatible(obj); - return frame; - } - - public static Frame BuildResponseFrame(T obj, Frame request) - { - var frame = Frame.FromPool(); - frame.IsRequest = true; - frame.FrameId = request.FrameId; - frame.Inbox = request.Inbox; - frame.Payload = MessageSerializer.SerializeCompatible(obj); - return frame; - } - - public static Frame BuildResponseFrame(T obj, Frame request, string inbox) - { - var frame = Frame.FromPool(); - frame.IsRequest = true; - frame.FrameId = request.FrameId; - frame.Inbox = inbox; - frame.Payload = MessageSerializer.SerializeCompatible(obj); - return frame; - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Services/FrameExchange.cs b/ZeroLevel/Services/Network/Services/FrameExchange.cs deleted file mode 100644 index 1a8f1c8..0000000 --- a/ZeroLevel/Services/Network/Services/FrameExchange.cs +++ /dev/null @@ -1,104 +0,0 @@ -using System; -using System.Net; -using ZeroLevel.Models; - -namespace ZeroLevel.Network -{ - internal sealed class FrameExchange - : IDisposable - { - private IZTransport _current; - public IPEndPoint Endpoint => _current?.Endpoint; - public bool IsConnected => _current?.Status == ZTransportStatus.Working; - - public FrameExchange(IZTransport transport) - { - _current = transport ?? throw new ArgumentNullException(nameof(transport)); - } - - public InvokeResult Send(string inbox, T obj) - { - try - { - var frame = FrameBuilder.BuildFrame(obj, inbox); - _current.Send(frame); - return InvokeResult.Succeeding(); - } - catch (Exception ex) - { - Log.SystemError(ex, "[FrameExchange] Fault send frame"); - return InvokeResult.Fault(ex.Message); - } - } - - public InvokeResult Send(string inbox) - { - try - { - var frame = FrameBuilder.BuildFrame(inbox); - _current.Send(frame); - return InvokeResult.Succeeding(); - } - catch (Exception ex) - { - Log.SystemError(ex, "[FrameExchange] Fault send frame"); - return InvokeResult.Fault(ex.Message); - } - } - - public InvokeResult Send(Frame frame) - { - try - { - _current.Send(frame); - return InvokeResult.Succeeding(); - } - catch (Exception ex) - { - Log.SystemError(ex, "[FrameExchange] Fault send frame"); - return InvokeResult.Fault(ex.Message); - } - } - - public InvokeResult Request(string inbox, Treq obj, Action callback, Action fault = null) - { - try - { - var frame = FrameBuilder.BuildRequestFrame(obj, inbox); - _current.Request(frame, response_data => - { - callback(response_data.Read()); - }, fault); - return InvokeResult.Succeeding(); - } - catch (Exception ex) - { - Log.SystemError(ex, "[FrameExchange] Fault send frame"); - return InvokeResult.Fault(ex.Message); - } - } - - public InvokeResult Request(string inbox, Action callback, Action fault = null) - { - try - { - var frame = FrameBuilder.BuildRequestFrame(inbox); - _current.Request(frame, response_data => - { - callback(response_data.Read()); - }, fault); - return InvokeResult.Succeeding(); - } - catch (Exception ex) - { - Log.SystemError(ex, "[FrameExchange] Fault send frame"); - return InvokeResult.Fault(ex.Message); - } - } - - public void Dispose() - { - _current?.Dispose(); - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Services/FrameParser.cs b/ZeroLevel/Services/Network/Services/FrameParser.cs deleted file mode 100644 index 406a985..0000000 --- a/ZeroLevel/Services/Network/Services/FrameParser.cs +++ /dev/null @@ -1,191 +0,0 @@ -using System; -using System.Threading.Tasks; -using ZeroLevel.Services.Serialization; - -namespace ZeroLevel.Network -{ - public sealed class FrameParser - { - #region private models - - private enum ParserState - { - WaitNew, - WaitSize, - Proceeding - } - - private class _Accum - { - public byte[] Payload; - public int Size; - public bool SizeFilled; - public bool PayloadFilled; - public bool Corrupted; - - public void Reset() - { - Size = 0; - offset = 0; - Payload = null; - SizeFilled = false; - PayloadFilled = false; - Corrupted = false; - } - - private byte[] _size_buf = new byte[4]; - private int offset; - - public int WriteSize(byte[] buf, int start, int length) - { - for (; offset < 4 && start < length; offset++, start++) - { - _size_buf[offset] = buf[start]; - } - if (offset == 4) - { - Size = BitConverter.ToInt32(_size_buf, 0); - SizeFilled = true; - offset = 0; - if (Size == 0) - { - // At least 1 byte with checksum must be - Corrupted = true; - } - } - return start; - } - - public int WritePayload(byte[] buf, int start, int length) - { - if (Payload == null) - { - Payload = new byte[Size]; - var mask = ((byte)(ZBaseNetwork.PACKET_HEADER_START_BYTE ^ _size_buf[0] ^ _size_buf[1] ^ _size_buf[2] ^ _size_buf[3])); - if (buf[start] != mask) - { - Corrupted = true; - return start; - } - start = start + 1; - } - int i = start; - for (; offset < Size && i < length; offset++, i++) - { - Payload[offset] = buf[i]; - } - if (offset == Size) - { - var mask = ((byte)(ZBaseNetwork.PACKET_HEADER_START_BYTE ^ _size_buf[0] ^ _size_buf[1] ^ _size_buf[2] ^ _size_buf[3])); - NetworkStreamFastObfuscator.DeHashData(Payload, mask); - PayloadFilled = true; - } - return i; - } - } - - #endregion private models - - private void FireOnFrame(byte[] payload) - { - Frame frame; - try - { - frame = MessageSerializer.Deserialize(payload); - } - catch (Exception ex) - { - //NetworkStats.Corrupted(); - Log.SystemError(ex, "[FrameParser] Fault deserialize frame from incomig data"); - return; - } - try - { - Task.Run(() => OnIncomingFrame?.Invoke(frame)); - //NetworkStats.Receive(payload); - } - catch (Exception ex) - { - Log.SystemError(ex, "[FrameParser] Fault handle frame"); - } - } - - public event Action OnIncomingFrame; - - private readonly _Accum _accum = new _Accum(); - - private ParserState _state = ParserState.WaitNew; - private readonly object _push_lock = new object(); - - /// - /// Parse with state machine - /// - public void Push(byte[] part, int start, int length) - { - lock (_push_lock) - { - __Push(part, start, length); - } - } - - private void __Push(byte[] part, int start, int length) - { - if (part == null || length == 0 || start >= length) return; - while (start < length) - { - switch (_state) - { - case ParserState.WaitNew: - { - for (; start < length; start++) - { - // Search for the beginning of the package header - if ((part[start] & ZBaseNetwork.PACKET_HEADER_START_BYTE) == ZBaseNetwork.PACKET_HEADER_START_BYTE) - { - _accum.Reset(); - _state = ParserState.WaitSize; - start += 1; - break; - } - } - } - break; - - case ParserState.WaitSize: - { - start = _accum.WriteSize(part, start, length); - if (_accum.SizeFilled) - { - if (_accum.Corrupted || _accum.Size < 1 || _accum.Size > ZBaseNetwork.MAX_FRAME_PAYLOAD_SIZE) - { - //NetworkStats.Corrupted(); - _state = ParserState.WaitNew; - } - else - { - _state = ParserState.Proceeding; - } - } - } - break; - - case ParserState.Proceeding: - { - start = _accum.WritePayload(part, start, length); - if (_accum.Corrupted) - { - // NetworkStats.Corrupted(); - _state = ParserState.WaitNew; - } - else if (_accum.PayloadFilled) - { - FireOnFrame(_accum.Payload); - _state = ParserState.WaitNew; - } - } - break; - } - } - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Services/ZExSocketObservableServer.cs b/ZeroLevel/Services/Network/Services/ZExSocketObservableServer.cs deleted file mode 100644 index e17d6f4..0000000 --- a/ZeroLevel/Services/Network/Services/ZExSocketObservableServer.cs +++ /dev/null @@ -1,30 +0,0 @@ -using System; -using System.Net; - -namespace ZeroLevel.Network -{ - public class ZExSocketObservableServer : - ZSocketServer, IZObservableServer - { - public ZExSocketObservableServer(IPEndPoint endpoint) - : base(endpoint) - { - } - - public IPEndPoint Endpoint => base.LocalEndpoint; - - public event Action OnMessage = (_, __) => { }; - - public event Func OnRequest = (_, __) => null; - - protected override void Handle(Frame frame, IZBackward client) - { - OnMessage(frame, client); - } - - protected override Frame HandleRequest(Frame frame, IZBackward client) - { - return OnRequest(frame, client); - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/ZSocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs similarity index 51% rename from ZeroLevel/Services/Network/ZSocketClient.cs rename to ZeroLevel/Services/Network/SocketClient.cs index c023381..a2b9994 100644 --- a/ZeroLevel/Services/Network/ZSocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -7,207 +7,174 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { - public class ZSocketClient - : ZBaseNetwork, IZTransport + public class SocketClient + : BaseSocket, ISocketClient { - - #region Private private Socket _clientSocket; private NetworkStream _stream; private FrameParser _parser = new FrameParser(); private Thread _sendThread; - private long _heartbeat_key; + private long _heartbeat_key = -1; private long _last_rw_time = DateTime.UtcNow.Ticks; private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE]; private readonly object _reconnection_lock = new object(); - - private readonly BlockingCollection _send_queue = new BlockingCollection(); - + private readonly BlockingCollection _send_queue = new BlockingCollection(); private readonly RequestBuffer _requests = new RequestBuffer(); + private int _current_heartbeat_period_in_ms = 0; + private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета + private struct SendInfo + { + public bool isRequest; + public int identity; + public byte[] data; + } #endregion Private - public event EventHandler OnServerMessage = (_, __) => { }; - public event Action OnConnect = () => { }; - public event Action OnDisconnect = () => { }; - - public IPEndPoint Endpoint { get; } public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } } - public ZSocketClient(IPEndPoint ep) + public SocketClient(IPEndPoint ep) { Endpoint = ep; - _parser.OnIncomingFrame += _parser_OnIncomingFrame; - - _heartbeat_key = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat); + _parser.OnIncoming += _parser_OnIncoming; + _sendThread = new Thread(SendFramesJob); + _sendThread.IsBackground = true; + _sendThread.Start(); + } + public SocketClient(Socket socket) + { + _socket_freezed = true; + _clientSocket = socket; + Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint; + _parser.OnIncoming += _parser_OnIncoming; _sendThread = new Thread(SendFramesJob); _sendThread.IsBackground = true; _sendThread.Start(); } - #region Private methods + #region API + public event Action OnConnect = (s) => { }; + public event Action OnDisconnect = (s) => { }; + public event Action OnIncomingData = (_, __, ___) => { }; + public IPEndPoint Endpoint { get; } - private void Heartbeat() + public void Request(Frame frame, Action callback, Action fail = null) { - try - { - EnsureConnection(); - } - catch(Exception ex) - { - Log.SystemError(ex, "ZSocketClient.Heartbeat()->EnsureConnection()"); - Broken(); - return; - } - _requests.TestForTimeouts(); - try - { - Request(FrameBuilder.BuildFrame(DEFAULT_PING_INBOX), r => { }); - } - catch (Exception ex) - { - Log.SystemError(ex, "ZSocketClient.Heartbeat()->Request()"); - } - var diff_request_ms = ((DateTime.UtcNow.Ticks - _last_rw_time) / TimeSpan.TicksPerMillisecond); - if (diff_request_ms > (HEARTBEAT_UPDATE_PERIOD_MS * 2)) + if (frame == null) throw new ArgumentNullException(nameof(frame)); + if (frame != null && false == _send_queue.IsAddingCompleted) { - var port = (_clientSocket.LocalEndPoint as IPEndPoint)?.Port; - Log.Debug($"[ZClient] server disconnected, because last data was more thas {diff_request_ms} ms ago. Client port {port}"); - Broken(); + while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) + { + Thread.Sleep(50); + } + int id; + var sendInfo = new SendInfo + { + isRequest = true, + data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out id) + }; + sendInfo.identity = id; + _requests.RegisterForFrame(id, callback, fail); + _send_queue.Add(sendInfo); + frame.Release(); } } - private void _parser_OnIncomingFrame(Frame frame) + public void ForceConnect() + { + EnsureConnection(); + } + + public void Send(Frame frame) { - if (frame == null || frame.Inbox == null) return; - _last_rw_time = DateTime.UtcNow.Ticks; - if (frame.IsRequest) + if (frame == null) throw new ArgumentNullException(nameof(frame)); + if (frame != null && false == _send_queue.IsAddingCompleted) { - // Got response on request with id = packet_id - try + while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { - _requests.Success(frame.FrameId, frame); + Thread.Sleep(50); } - catch (Exception ex) + _send_queue.Add(new SendInfo { - Log.SystemError(ex, "ZSocketClient._parser_OnIncomingFrame()->_requests.Success(). Fault handle response"); - } + isRequest = false, + identity = 0, + data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame)) + }); + frame.Release(); } - else + } + + public void Response(byte[] data, int identity) + { + if (data == null) throw new ArgumentNullException(nameof(data)); + if (false == _send_queue.IsAddingCompleted) { - // Got server comand - if (frame.Inbox.Equals(DEFAULT_PING_INBOX, StringComparison.Ordinal)) + while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { - _last_rw_time = DateTime.UtcNow.Ticks; + Thread.Sleep(50); } - else + _send_queue.Add(new SendInfo { - try - { - OnServerMessage?.Invoke(this, frame); - } - catch (Exception ex) - { - Log.SystemError(ex, "ZSocketClient._parser_OnIncomingFrame()->OnServerMessage?.Invoke(). Fault handle server message"); - } - } + isRequest = false, + identity = 0, + data = NetworkPacketFactory.Response(data, identity) + }); } - frame?.Release(); } - private void ReceiveAsyncCallback(IAsyncResult ar) + public void UseKeepAlive(TimeSpan period) { - try + if (_heartbeat_key != -1) { - EnsureConnection(); - var count = _stream.EndRead(ar); - if (count > 0) - { - _parser.Push(_buffer, 0, count); - _last_rw_time = DateTime.UtcNow.Ticks; - } - if (Status == ZTransportStatus.Working) - { - _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); - } + Sheduller.Remove(_heartbeat_key); } - catch (ObjectDisposedException) + if (period != TimeSpan.Zero && period.TotalMilliseconds > MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS) { - /// Nothing + _current_heartbeat_period_in_ms = (int)period.TotalMilliseconds; + _heartbeat_key = Sheduller.RemindEvery(period, Heartbeat); } - catch (Exception ex) + else { - Log.SystemError(ex, $"[ZSocketServerClient] Error read data"); - Broken(); - OnDisconnect(); + _current_heartbeat_period_in_ms = 0; } } + #endregion - private void SendFramesJob() + #region Private methods + private void _parser_OnIncoming(FrameType type, int identity, byte[] data) { - Frame frame = null; - while (Status != ZTransportStatus.Disposed) + try { - if (_send_queue.IsCompleted) - { - return; - } - if (Status != ZTransportStatus.Working) + switch (type) { - Thread.Sleep(100); - try - { - EnsureConnection(); - } - catch (Exception ex) - { - Log.SystemError(ex, "[ZSocketClient] Send next frame fault"); - } - if (Status == ZTransportStatus.Disposed) return; - continue; - } - try - { - frame = _send_queue.Take(); - var data = NetworkStreamFastObfuscator.PrepareData(MessageSerializer.Serialize(frame)); - if (data != null && data.Length > 0) - { - if (frame.IsRequest) - { - _requests.StartSend(frame.FrameId); - } - _stream.Write(data, 0, data.Length); + case FrameType.KeepAlive: _last_rw_time = DateTime.UtcNow.Ticks; - //NetworkStats.Send(data); - } - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ZSocketServerClient] Backward send error."); - Broken(); - OnDisconnect(); - } - finally - { - frame?.Release(); + break; + case FrameType.Message: + case FrameType.Request: + OnIncomingData(this, data, identity); + break; + case FrameType.Response: + _requests.Success(identity, MessageSerializer.Deserialize(data)); + break; } } + catch (Exception ex) + { + Log.Error(ex, $"[SocketClient._parser_OnIncoming]"); + } } - #endregion Private methods - - #region API - private bool TryConnect() { - if (Status == ZTransportStatus.Working) + if (Status == SocketClientStatus.Working) { return true; } - if (Status == ZTransportStatus.Disposed) + if (Status == SocketClientStatus.Disposed) { return false; } @@ -235,24 +202,28 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[ZSocketClient] Connection fault"); + Log.SystemError(ex, $"[SocketClient.TryConnect] Connection fault"); Broken(); return false; } Working(); - OnConnect(); + OnConnect(this); return true; } public void EnsureConnection() { + if (_socket_freezed) + { + return; + } lock (_reconnection_lock) { - if (Status == ZTransportStatus.Disposed) + if (Status == SocketClientStatus.Disposed) { throw new ObjectDisposedException("connection"); } - if (Status != ZTransportStatus.Working) + if (Status != SocketClientStatus.Working) { if (false == TryConnect()) { @@ -262,47 +233,112 @@ namespace ZeroLevel.Network } } - public void Send(Frame frame) + private void Heartbeat() { - if (frame == null) throw new ArgumentNullException(nameof(frame)); - EnsureConnection(); - if (frame != null && false == _send_queue.IsAddingCompleted) + try + { + EnsureConnection(); + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketClient.Heartbeat.EnsureConnection]"); + Broken(); + return; + } + _requests.TestForTimeouts(); + try { - while (_send_queue.Count >= ZBaseNetwork.MAX_SEND_QUEUE_SIZE) + _send_queue.Add(new SendInfo { - Thread.Sleep(50); - } - _send_queue.Add(frame); + identity = 0, + isRequest = false, + data = NetworkPacketFactory.KeepAliveMessage() + }); + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketClient.Heartbeat.Request]"); + } + var diff_request_ms = ((DateTime.UtcNow.Ticks - _last_rw_time) / TimeSpan.TicksPerMillisecond); + if (diff_request_ms > (_current_heartbeat_period_in_ms * 2)) + { + var port = (_clientSocket.LocalEndPoint as IPEndPoint)?.Port; + Log.Debug($"[SocketClient.Heartbeat] server disconnected, because last data was more thas {diff_request_ms} ms ago. Client port {port}"); + Broken(); } } - public void Request(Frame frame, Action callback, Action fail = null) + private void ReceiveAsyncCallback(IAsyncResult ar) { - if (frame == null) throw new ArgumentNullException(nameof(frame)); try { EnsureConnection(); + var count = _stream.EndRead(ar); + if (count > 0) + { + _parser.Push(_buffer, count); + _last_rw_time = DateTime.UtcNow.Ticks; + } + if (Status == SocketClientStatus.Working) + { + _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); + } } - catch (Exception ex) - { - fail?.Invoke(ex.Message); - return; - } - _requests.RegisterForFrame(frame, callback, fail); - try + catch (ObjectDisposedException) { - Send(frame); + /// Nothing } catch (Exception ex) { - fail?.Invoke(ex.Message); + Log.SystemError(ex, $"[SocketClient.ReceiveAsyncCallback] Error read data"); Broken(); - OnDisconnect(); - Log.SystemError(ex, $"[ZSocketClient] Request error. Frame '{frame.FrameId}'. Inbox '{frame.Inbox}'"); + OnDisconnect(this); + } + } + + private void SendFramesJob() + { + SendInfo frame; + while (Status != SocketClientStatus.Disposed) + { + if (_send_queue.IsCompleted) + { + return; + } + if (Status != SocketClientStatus.Working) + { + Thread.Sleep(100); + try + { + EnsureConnection(); + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault"); + } + if (Status == SocketClientStatus.Disposed) return; + continue; + } + try + { + frame = _send_queue.Take(); + if (frame.isRequest) + { + _requests.StartSend(frame.identity); + } + _stream.Write(frame.data, 0, frame.data.Length); + _last_rw_time = DateTime.UtcNow.Ticks; + } + catch (Exception ex) + { + Log.SystemError(ex, $"[SocketClient.SendFramesJob] Backward send error."); + Broken(); + OnDisconnect(this); + } } } - #endregion API + #endregion #region Helper @@ -318,9 +354,9 @@ namespace ZeroLevel.Network public override void Dispose() { - if (Status == ZTransportStatus.Working) + if (Status == SocketClientStatus.Working) { - OnDisconnect(); + OnDisconnect(this); } Disposed(); Sheduller.Remove(_heartbeat_key); @@ -328,4 +364,4 @@ namespace ZeroLevel.Network _stream?.Dispose(); } } -} \ No newline at end of file +} diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs new file mode 100644 index 0000000..23981ae --- /dev/null +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -0,0 +1,119 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading; + +namespace ZeroLevel.Network +{ + public class SocketServer + : BaseSocket + { + private Socket _serverSocket; + private ReaderWriterLockSlim _connection_set_lock = new ReaderWriterLockSlim(); + private Dictionary _connections = new Dictionary(); + + public IPEndPoint LocalEndpoint { get; } + public event Action OnDisconnect = _ => { }; + public event Action OnConnect = _ => { }; + public IEnumerable ConnectionList + { + get + { + try + { + _connection_set_lock.EnterReadLock(); + return _connections.Select(c => c.Value.EndPoint).ToList(); + } + finally + { + _connection_set_lock.ExitReadLock(); + } + } + } + private void DisconnectEventRise(ISocketClient client) + { + try + { + OnDisconnect?.Invoke(client); + } + catch + { } + } + private void ConnectEventRise(ISocketClient client) + { + try + { + OnConnect?.Invoke(client); + } + catch + { } + } + + public SocketServer(IPEndPoint endpoint) + { + LocalEndpoint = endpoint; + _serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + _serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + _serverSocket.Bind(endpoint); + _serverSocket.Listen(100); + Working(); + _serverSocket.BeginAccept(BeginAcceptCallback, null); + } + + private void BeginAcceptCallback(IAsyncResult ar) + { + if (Status == SocketClientStatus.Working) + { + try + { + var client_socket = _serverSocket.EndAccept(ar); + _serverSocket.BeginAccept(BeginAcceptCallback, null); + _connection_set_lock.EnterWriteLock(); + + var connection = new SocketClient(client_socket); + connection.OnDisconnect += Connection_OnDisconnect; + _connections[connection.Endpoint] = new NetworkNode(connection); + + ConnectEventRise(connection); + } + catch (Exception ex) + { + Broken(); + Log.SystemError(ex, "[ZSocketServer] Error with connect accepting"); + } + finally + { + _connection_set_lock.ExitWriteLock(); + } + } + } + + private void Connection_OnIncomingData(ISocketClient client, byte[] data, int identity) + { + throw new NotImplementedException(); + } + + private void Connection_OnDisconnect(ISocketClient client) + { + client.OnDisconnect -= Connection_OnDisconnect; + try + { + _connection_set_lock.EnterWriteLock(); + _connections[client.Endpoint].Dispose(); + _connections.Remove(client.Endpoint); + } + finally + { + _connection_set_lock.ExitWriteLock(); + } + DisconnectEventRise(client); + } + + public override void Dispose() + { + + } + } +} diff --git a/ZeroLevel/Services/_Network/FrameParser.cs b/ZeroLevel/Services/Network/Utils/FrameParser.cs similarity index 98% rename from ZeroLevel/Services/_Network/FrameParser.cs rename to ZeroLevel/Services/Network/Utils/FrameParser.cs index 8aa34c4..d02b469 100644 --- a/ZeroLevel/Services/_Network/FrameParser.cs +++ b/ZeroLevel/Services/Network/Utils/FrameParser.cs @@ -1,9 +1,7 @@ using System; using System.Threading.Tasks; -using ZeroLevel.Network; -using ZeroLevel.Services._Network; -namespace ZeroLevel._Network +namespace ZeroLevel.Network { public sealed class FrameParser { @@ -130,6 +128,7 @@ namespace ZeroLevel._Network } #endregion private models + public event Action OnIncoming; private readonly _Accum _accum = new _Accum(); @@ -187,7 +186,7 @@ namespace ZeroLevel._Network position = _accum.WriteSize(part, position, length); if (_accum.SizeFilled) { - if (_accum.Corrupted || _accum.Size < 1 || _accum.Size > ZBaseNetwork.MAX_FRAME_PAYLOAD_SIZE) + if (_accum.Corrupted || _accum.Size < 1 || _accum.Size > BaseSocket.MAX_FRAME_PAYLOAD_SIZE) { _state = ParserState.WaitNew; } diff --git a/ZeroLevel/Services/Network/NetUtils.cs b/ZeroLevel/Services/Network/Utils/NetUtils.cs similarity index 100% rename from ZeroLevel/Services/Network/NetUtils.cs rename to ZeroLevel/Services/Network/Utils/NetUtils.cs diff --git a/ZeroLevel/Services/_Network/NetworkPacketFactory.cs b/ZeroLevel/Services/Network/Utils/NetworkPacketFactory.cs similarity index 98% rename from ZeroLevel/Services/_Network/NetworkPacketFactory.cs rename to ZeroLevel/Services/Network/Utils/NetworkPacketFactory.cs index b3a0471..02072d3 100644 --- a/ZeroLevel/Services/_Network/NetworkPacketFactory.cs +++ b/ZeroLevel/Services/Network/Utils/NetworkPacketFactory.cs @@ -2,7 +2,7 @@ using System.Runtime.CompilerServices; using System.Threading; -namespace ZeroLevel.Services._Network +namespace ZeroLevel.Network { public static class NetworkPacketFactory { diff --git a/ZeroLevel/Services/Network/Services/RequestBuffer.cs b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs similarity index 91% rename from ZeroLevel/Services/Network/Services/RequestBuffer.cs rename to ZeroLevel/Services/Network/Utils/RequestBuffer.cs index 34aa109..00b7739 100644 --- a/ZeroLevel/Services/Network/Services/RequestBuffer.cs +++ b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using ZeroLevel.Network; using ZeroLevel.Services.Pools; namespace ZeroLevel.Network @@ -11,13 +10,13 @@ namespace ZeroLevel.Network private Dictionary _requests = new Dictionary(); private static ObjectPool _ri_pool = new ObjectPool(() => new RequestInfo()); - public void RegisterForFrame(Frame frame, Action callback, Action fail = null) + public void RegisterForFrame(int identity, Action callback, Action fail = null) { var ri = _ri_pool.Allocate(); lock (_reqeust_lock) { ri.Reset(callback, fail); - _requests.Add(frame.FrameId, ri); + _requests.Add(identity, ri); } } @@ -83,7 +82,7 @@ namespace ZeroLevel.Network { if (pair.Value.Sended == false) continue; var diff = now_ticks - pair.Value.Timestamp; - if (diff > ZBaseNetwork.MAX_REQUEST_TIME_TICKS) + if (diff > BaseSocket.MAX_REQUEST_TIME_TICKS) { to_remove.Add(pair.Key); } diff --git a/ZeroLevel/Services/Network/Services/ExRouter.cs b/ZeroLevel/Services/Network/Utils/Router.cs similarity index 62% rename from ZeroLevel/Services/Network/Services/ExRouter.cs rename to ZeroLevel/Services/Network/Utils/Router.cs index d02f0a2..19a0a4e 100644 --- a/ZeroLevel/Services/Network/Services/ExRouter.cs +++ b/ZeroLevel/Services/Network/Utils/Router.cs @@ -8,7 +8,8 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { - internal sealed class ExRouter + public class Router + : IRouter { #region Routing @@ -47,7 +48,7 @@ namespace ZeroLevel.Network private Type _typeResp; private bool _noArguments = false; - public static MRInvoker Create(Action handler) + public static MRInvoker Create(MessageHandler handler) { return new MRInvoker { @@ -59,7 +60,7 @@ namespace ZeroLevel.Network }; } - public static MRInvoker Create(Action handler) + public static MRInvoker Create(MessageHandler handler) { return new MRInvoker { @@ -70,50 +71,51 @@ namespace ZeroLevel.Network }; } - public static MRInvoker Create(Func handler) + public static MRInvoker Create(RequestHandler handler) { return new MRInvoker { - _typeReq = typeof(Treq), - _typeResp = typeof(Tresp), + _noArguments = true, + _typeReq = null, + _typeResp = typeof(Tresponse), _instance = handler.Target, _invoker = CreateCompiledExpression(handler) }; } - public static MRInvoker Create(Func handler) + public static MRInvoker Create(RequestHandler handler) { return new MRInvoker { - _typeReq = null, - _typeResp = typeof(Tresp), + _typeReq = typeof(Trequest), + _typeResp = typeof(Tresponse), _instance = handler.Target, _invoker = CreateCompiledExpression(handler) }; } - public object Invoke(Frame frame, IZBackward client) + public object Invoke(byte[] data, ISocketClient client) { if (_typeResp == null) { - var incoming = MessageSerializer.DeserializeCompatible(_typeReq, frame.Payload); + var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data); if (_noArguments) { - this._invoker.Invoke(this._instance, new object[] { frame.FrameId, client }); + this._invoker.Invoke(this._instance, new object[] { client }); } else { - this._invoker.Invoke(this._instance, new object[] { incoming, frame.FrameId, client }); + this._invoker.Invoke(this._instance, new object[] { incoming, client }); } } else if (_typeReq == null) { - return this._invoker.Invoke(this._instance, new object[] { frame.FrameId, client }); + return this._invoker.Invoke(this._instance, new object[] { client }); } else { - var incoming = MessageSerializer.DeserializeCompatible(_typeReq, frame.Payload); - return this._invoker.Invoke(this._instance, new object[] { incoming, frame.FrameId, client }); + var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data); + return this._invoker.Invoke(this._instance, new object[] { incoming, client }); } return null; } @@ -127,9 +129,57 @@ namespace ZeroLevel.Network #endregion Routing - #region Registration + #region Invokation + + public void HandleMessage(Frame frame, ISocketClient client) + { + try + { + if (_handlers.ContainsKey(frame.Inbox)) + { + foreach (var handler in _handlers[frame.Inbox]) + { + try + { + handler.Invoke(frame.Payload, client); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[ExRouter] Fault handle incomind message"); + } + } + } + } + catch (Exception ex) + { + Log.SystemError(ex, $"[ExRouter] Fault handle incomind message"); + } + } - public void RegisterInbox(string inbox, Action handler) + public byte[] HandleRequest(Frame frame, ISocketClient client) + { + try + { + if (_requestors.ContainsKey(frame.Inbox)) + { + return MessageSerializer.SerializeCompatible(_requestors[frame.Inbox].Invoke(frame.Payload, client)); + } + else + { + Log.SystemWarning($"[ExRouter] Not found inbox '{frame.Inbox}' for incoming request"); + } + } + catch (Exception ex) + { + Log.SystemError(ex, $"[ExRouter] Fault handle incomind request"); + } + return null; + } + + #endregion Invokation + + #region Message handlers registration + public void RegisterInbox(string inbox, MessageHandler handler) { if (false == _handlers.ContainsKey(inbox)) { @@ -138,7 +188,7 @@ namespace ZeroLevel.Network _handlers[inbox].Add(MRInvoker.Create(handler)); } - public void RegisterInbox(string inbox, Action handler) + public void RegisterInbox(string inbox, MessageHandler handler) { if (false == _handlers.ContainsKey(inbox)) { @@ -147,11 +197,31 @@ namespace ZeroLevel.Network _handlers[inbox].Add(MRInvoker.Create(handler)); } - public void RegisterInbox(string inbox, Func hanlder) + public void RegisterInbox(MessageHandler handler) + { + if (false == _handlers.ContainsKey(BaseSocket.DEFAULT_MESSAGE_INBOX)) + { + _handlers.Add(BaseSocket.DEFAULT_MESSAGE_INBOX, new List()); + } + _handlers[BaseSocket.DEFAULT_MESSAGE_INBOX].Add(MRInvoker.Create(handler)); + } + + public void RegisterInbox(MessageHandler handler) + { + if (false == _handlers.ContainsKey(BaseSocket.DEFAULT_MESSAGE_INBOX)) + { + _handlers.Add(BaseSocket.DEFAULT_MESSAGE_INBOX, new List()); + } + _handlers[BaseSocket.DEFAULT_MESSAGE_INBOX].Add(MRInvoker.Create(handler)); + } + #endregion + + #region Request handlers registration + public void RegisterInbox(string inbox, RequestHandler handler) { if (false == _requestors.ContainsKey(inbox)) { - _requestors.Add(inbox, MRInvoker.Create(hanlder)); + _requestors.Add(inbox, MRInvoker.Create(handler)); } else { @@ -159,11 +229,11 @@ namespace ZeroLevel.Network } } - public void RegisterInbox(string inbox, Func hanlder) + public void RegisterInbox(string inbox, RequestHandler handler) { if (false == _requestors.ContainsKey(inbox)) { - _requestors.Add(inbox, MRInvoker.Create(hanlder)); + _requestors.Add(inbox, MRInvoker.Create(handler)); } else { @@ -171,55 +241,29 @@ namespace ZeroLevel.Network } } - #endregion Registration - - #region Invokation - - public void HandleMessage(Frame frame, IZBackward client) + public void RegisterInbox(RequestHandler handler) { - try + if (false == _requestors.ContainsKey(BaseSocket.DEFAULT_REQUEST_INBOX)) { - if (_handlers.ContainsKey(frame.Inbox)) - { - foreach (var handler in _handlers[frame.Inbox]) - { - try - { - handler.Invoke(frame, client); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ExRouter] Fault handle incomind message"); - } - } - } + _requestors.Add(BaseSocket.DEFAULT_REQUEST_INBOX, MRInvoker.Create(handler)); } - catch (Exception ex) + else { - Log.SystemError(ex, $"[ExRouter] Fault handle incomind message"); + throw new Exception($"[SocketExchangeServer] Inbox {BaseSocket.DEFAULT_REQUEST_INBOX} already exists"); } } - public Frame HandleRequest(Frame frame, IZBackward client) + public void RegisterInbox(RequestHandler handler) { - try + if (false == _requestors.ContainsKey(BaseSocket.DEFAULT_REQUEST_INBOX)) { - if (_requestors.ContainsKey(frame.Inbox)) - { - return FrameBuilder.BuildResponseFrame(_requestors[frame.Inbox].Invoke(frame, client), frame); - } - else - { - Log.SystemWarning($"[ExRouter] Not found inbox '{frame.Inbox}' for incoming request"); - } + _requestors.Add(BaseSocket.DEFAULT_REQUEST_INBOX, MRInvoker.Create(handler)); } - catch (Exception ex) + else { - Log.SystemError(ex, $"[ExRouter] Fault handle incomind request"); + throw new Exception($"[SocketExchangeServer] Inbox {BaseSocket.DEFAULT_REQUEST_INBOX} already exists"); } - return null; } - - #endregion Invokation + #endregion } -} \ No newline at end of file +} diff --git a/ZeroLevel/Services/Network/ZSocketServer.cs b/ZeroLevel/Services/Network/ZSocketServer.cs deleted file mode 100644 index 99e712f..0000000 --- a/ZeroLevel/Services/Network/ZSocketServer.cs +++ /dev/null @@ -1,167 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net; -using System.Net.Sockets; -using System.Threading; - -namespace ZeroLevel.Network -{ - public abstract class ZSocketServer - : ZBaseNetwork - { - public IPEndPoint LocalEndpoint { get; } - - public event Action OnDisconnect = _ => { }; - - public event Action OnConnect = _ => { }; - - public IEnumerable ConnectionList - { - get - { - try - { - _connection_set_lock.EnterReadLock(); - return _connections.Select(c => c.Endpoint).ToList(); - } - finally - { - _connection_set_lock.ExitReadLock(); - } - } - } - - #region Private members - - private Socket _serverSocket; - private long _heartbeat_task = -1; - private readonly Frame _pingFrame = FrameBuilder.BuildFrame(DEFAULT_PING_INBOX); - private ReaderWriterLockSlim _connection_set_lock = new ReaderWriterLockSlim(); - private HashSet _connections = new HashSet(); - - private void DisconnectEventRise(IZBackward client) - { - try - { - OnDisconnect?.Invoke(client); - } - catch - { } - } - - private void ConnectEventRise(IZBackward client) - { - try - { - OnConnect?.Invoke(client); - } - catch - { } - } - - private void Heartbeat() - { - var enumerator = _connections.GetEnumerator(); - try - { - while (enumerator.MoveNext()) - { - var connection = enumerator.Current; - if ((DateTime.UtcNow.Ticks - connection.LastNetworkActionTimestamp) >= HEARTBEAT_PING_PERIOD_TICKS) - { - connection.SendBackward(_pingFrame); - } - } - } - catch { } - GC.Collect(1, GCCollectionMode.Forced, false); - } - - private void BeginAcceptCallback(IAsyncResult ar) - { - if (Status == ZTransportStatus.Working) - { - try - { - var client_socket = _serverSocket.EndAccept(ar); - _serverSocket.BeginAccept(BeginAcceptCallback, null); - _connection_set_lock.EnterWriteLock(); - var connection = new ZSocketServerClient(client_socket, Handle, HandleRequest); - connection.OnConnectionBroken += Connection_OnConnectionBroken; - _connections.Add(connection); - ConnectEventRise(connection); - } - catch (Exception ex) - { - Broken(); - Log.SystemError(ex, "[ZSocketServer] Error with connect accepting"); - } - finally - { - _connection_set_lock.ExitWriteLock(); - } - } - } - - private void Connection_OnConnectionBroken(ZSocketServerClient connection) - { - connection.OnConnectionBroken -= Connection_OnConnectionBroken; - try - { - _connection_set_lock.EnterWriteLock(); - _connections.Remove(connection); - } - finally - { - _connection_set_lock.ExitWriteLock(); - } - connection.Dispose(); - DisconnectEventRise(connection); - } - - #endregion Private members - - public ZSocketServer(IPEndPoint endpoint) - { - LocalEndpoint = endpoint; - _serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - _serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); - _serverSocket.Bind(endpoint); - _serverSocket.Listen(100); - _heartbeat_task = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat); - Working(); - _serverSocket.BeginAccept(BeginAcceptCallback, null); - } - - protected abstract void Handle(Frame frame, IZBackward client); - - protected abstract Frame HandleRequest(Frame frame, IZBackward client); - - public override void Dispose() - { - if (Status == ZTransportStatus.Disposed) - { - return; - } - Sheduller.Remove(_heartbeat_task); - Disposed(); - _serverSocket.Close(); - _serverSocket.Dispose(); - try - { - _connection_set_lock.EnterReadLock(); - foreach (var c in _connections) - { - c.Dispose(); - DisconnectEventRise(c); - } - } - finally - { - _connection_set_lock.ExitReadLock(); - } - _connection_set_lock.Dispose(); - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/ZSocketServerClient.cs b/ZeroLevel/Services/Network/ZSocketServerClient.cs deleted file mode 100644 index 01a51fd..0000000 --- a/ZeroLevel/Services/Network/ZSocketServerClient.cs +++ /dev/null @@ -1,235 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Net; -using System.Net.Sockets; -using System.Threading; -using ZeroLevel.Models; -using ZeroLevel.Services.Serialization; - -namespace ZeroLevel.Network -{ - internal sealed class ZSocketServerClient - : ZBaseNetwork, IZBackward, IEquatable - { - public IPEndPoint Endpoint { get; } - - internal long LastNetworkActionTimestamp { get; private set; } = DateTime.UtcNow.Ticks; - - private Thread _sendThread; - private NetworkStream _stream; - private readonly Socket _socket; - private readonly FrameParser _parser; - private readonly Action _handler; - private readonly Func _requestor; - private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE]; - private readonly BlockingCollection _send_queue = new BlockingCollection(); - - public event Action OnConnectionBroken = (_) => { }; - - private void RizeConnectionBrokenEvent() - { - try { OnConnectionBroken?.Invoke(this); } catch { } - } - - public ZSocketServerClient(Socket socket, - Action handler, - Func requestor) - { - _socket = socket ?? throw new ArgumentNullException(nameof(socket)); - _handler = handler ?? throw new ArgumentNullException(nameof(handler)); - _requestor = requestor ?? throw new ArgumentNullException(nameof(requestor)); - - Endpoint = _socket.RemoteEndPoint as IPEndPoint; - _stream = new NetworkStream(_socket, true); - _parser = new FrameParser(); - _parser.OnIncomingFrame += _parser_OnIncomingFrame; - - Working(); - - _sendThread = new Thread(SendFramesJob); - _sendThread.IsBackground = true; - _sendThread.Start(); - - _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); - } - - public InvokeResult SendBackward(Frame frame) - { - if (frame != null && Status == ZTransportStatus.Working && false == _send_queue.IsCompleted && false == _send_queue.IsAddingCompleted) - { - var data = MessageSerializer.Serialize(frame); - try - { - _send_queue.Add(NetworkStreamFastObfuscator.PrepareData(data)); - return InvokeResult.Succeeding(); - } - catch (ObjectDisposedException) - { - // Ignore - } - finally - { - frame?.Release(); - } - } - return InvokeResult.Fault(); - } - - public InvokeResult SendBackward(string inbox, T message) - { - var frame = FrameBuilder.BuildFrame(message, inbox); - if (Status == ZTransportStatus.Working && false == _send_queue.IsCompleted && false == _send_queue.IsAddingCompleted) - { - var data = MessageSerializer.Serialize(frame); - try - { - _send_queue.Add(NetworkStreamFastObfuscator.PrepareData(data)); - return InvokeResult.Succeeding(); - } - catch (ObjectDisposedException) - { - // Ignore - } - finally - { - frame?.Release(); - } - } - return InvokeResult.Fault(); - } - - private void SendFramesJob() - { - byte[] data; - while (Status == ZTransportStatus.Working) - { - if (_send_queue.IsCompleted) - { - return; - } - try - { - data = _send_queue.Take(); - if (data != null && data.Length > 0) - { - _stream.Write(data, 0, data.Length); - _stream.Flush(); - //Thread.Sleep(1); - LastNetworkActionTimestamp = DateTime.UtcNow.Ticks; - //NetworkStats.Send(data); - } - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ZSocketServerClient] Backward send error."); - Broken(); - RizeConnectionBrokenEvent(); - } - } - } - - private void ReceiveAsyncCallback(IAsyncResult ar) - { - try - { - var count = _stream.EndRead(ar); - if (count > 0) - { - _parser.Push(_buffer, 0, count); - LastNetworkActionTimestamp = DateTime.UtcNow.Ticks; - } - if (Status == ZTransportStatus.Working) - { - _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); - } - } - catch (ObjectDisposedException) - { - /// Nothing - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ZSocketServerClient] Error read data"); - Broken(); - RizeConnectionBrokenEvent(); - } - } - - private void _parser_OnIncomingFrame(Frame frame) - { - if (frame == null || frame.Inbox == null) return; - if (frame.Inbox.Equals(DEFAULT_PING_INBOX, StringComparison.Ordinal)) - { - SendBackward(frame); - } - else if (frame.IsRequest) - { - Frame response; - try - { - response = _requestor?.Invoke(frame, this); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ZSocketServerClient] Fault make response for request '{frame.FrameId}' to inbox '{frame.Inbox}'"); - response = FrameBuilder.BuildResponseFrame(ex.Message, frame, DEFAULT_REQUEST_ERROR_INBOX); - } - finally - { - frame?.Release(); - } - if (response != null) - { - SendBackward(response); - } - } - else - { - try - { - _handler?.Invoke(frame, this); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ZSocketServerClient] Fault handle message '{frame.FrameId}' in inbox '{frame.Inbox}'"); - } - finally - { - frame?.Release(); - } - } - } - - public override void Dispose() - { - if (Status == ZTransportStatus.Disposed) - { - return; - } - Disposed(); - - _send_queue.CompleteAdding(); - _send_queue.Dispose(); - - this._stream.Flush(); - this._stream.Close(); - this._stream.Dispose(); - } - - public override int GetHashCode() - { - return Endpoint.GetHashCode(); - } - - public override bool Equals(object obj) - { - return this.Equals(obj as ZSocketServerClient); - } - - public bool Equals(ZSocketServerClient other) - { - if (other == null) return false; - return this.Endpoint.Compare(other.Endpoint) == 0; - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Transliteration.cs b/ZeroLevel/Services/Transliteration.cs index 587604e..9512485 100644 --- a/ZeroLevel/Services/Transliteration.cs +++ b/ZeroLevel/Services/Transliteration.cs @@ -44,7 +44,10 @@ namespace ZeroLevel.Services foreach (KeyValuePair key in tdict) { - output = output.Replace(key.Value, key.Key); + if (key.Value.Length > 0) + { + output = output.Replace(key.Value, key.Key); + } } return output; } diff --git a/ZeroLevel/Services/_Network/Contracts/ISocketClient.cs b/ZeroLevel/Services/_Network/Contracts/ISocketClient.cs deleted file mode 100644 index 9a12485..0000000 --- a/ZeroLevel/Services/_Network/Contracts/ISocketClient.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace ZeroLevel.Services._Network -{ - public interface ISocketClient - { - event Action OnIncomingData; - void UseKeepAlive(TimeSpan period); - void Send(byte[] data); - byte[] Request(byte[] data); - } -} diff --git a/ZeroLevel/Services/_Network/IncomingRouter.cs b/ZeroLevel/Services/_Network/IncomingRouter.cs deleted file mode 100644 index c06bd98..0000000 --- a/ZeroLevel/Services/_Network/IncomingRouter.cs +++ /dev/null @@ -1,183 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Linq.Expressions; -using System.Reflection; -using ZeroLevel.Services.Invokation; - -namespace ZeroLevel.Services._Network -{ - public class Router - : IRouter - { - #region Routing - - private sealed class MRInvoker - { - /// - /// Creates a compiled expression for a quick method call, returns the identifier of the expression and a delegate for the call. - /// - private static Invoker CreateCompiledExpression(MethodInfo method) - { - var targetArg = Expression.Parameter(typeof(object)); // Target - var argsArg = Expression.Parameter(typeof(object[])); // Method's args - var parameters = method.GetParameters(); - Expression body = Expression.Call( - method.IsStatic - ? null - : Expression.Convert(targetArg, method.DeclaringType), // Method's type - method, - parameters.Select((p, i) => - Expression.Convert(Expression.ArrayIndex(argsArg, Expression.Constant(i)), p.ParameterType))); - if (body.Type == typeof(void)) - body = Expression.Block(body, Expression.Constant(null)); - else if (body.Type.IsValueType) - body = Expression.Convert(body, typeof(object)); - return Expression.Lambda(body, targetArg, argsArg).Compile(); - } - - private static Invoker CreateCompiledExpression(Delegate handler) - { - return CreateCompiledExpression(handler.GetMethodInfo()); - } - - private object _instance; - private Invoker _invoker; - private Type _typeReq; - private Type _typeResp; - private bool _noArguments = false; - - public static MRInvoker Create(Action handler) - { - return new MRInvoker - { - _noArguments = true, - _typeReq = null, - _typeResp = null, - _instance = handler.Target, - _invoker = CreateCompiledExpression(handler) - }; - } - - public static MRInvoker Create(Action handler) - { - return new MRInvoker - { - _typeReq = typeof(T), - _typeResp = null, - _instance = handler.Target, - _invoker = CreateCompiledExpression(handler) - }; - } - - public static MRInvoker Create(Func handler) - { - return new MRInvoker - { - _typeReq = typeof(Treq), - _typeResp = typeof(Tresp), - _instance = handler.Target, - _invoker = CreateCompiledExpression(handler) - }; - } - - public static MRInvoker Create(Func handler) - { - return new MRInvoker - { - _typeReq = null, - _typeResp = typeof(Tresp), - _instance = handler.Target, - _invoker = CreateCompiledExpression(handler) - }; - } - - public object Invoke(Frame frame, IZBackward client) - { - if (_typeResp == null) - { - var incoming = MessageSerializer.DeserializeCompatible(_typeReq, frame.Payload); - if (_noArguments) - { - this._invoker.Invoke(this._instance, new object[] { frame.FrameId, client }); - } - else - { - this._invoker.Invoke(this._instance, new object[] { incoming, frame.FrameId, client }); - } - } - else if (_typeReq == null) - { - return this._invoker.Invoke(this._instance, new object[] { frame.FrameId, client }); - } - else - { - var incoming = MessageSerializer.DeserializeCompatible(_typeReq, frame.Payload); - return this._invoker.Invoke(this._instance, new object[] { incoming, frame.FrameId, client }); - } - return null; - } - } - - private readonly Dictionary> _handlers = - new Dictionary>(); - - private readonly Dictionary _requestors = - new Dictionary(); - - #endregion Routing - - public void Incoming(FrameType type, byte[] data) - { - switch (type) - { - case FrameType.Message: - break; - case FrameType.Request: - break; - case FrameType.Response: - break; - } - } - - public void RegisterInbox(string inbox, MessageHandler handler) - { - throw new System.NotImplementedException(); - } - - public void RegisterInbox(string inbox, MessageHandler handler) - { - throw new System.NotImplementedException(); - } - - public void RegisterInbox(MessageHandler handler) - { - throw new System.NotImplementedException(); - } - - public void RegisterInbox(MessageHandler handler) - { - throw new System.NotImplementedException(); - } - - public void RegisterInbox(string inbox, RequestHandler handler) - { - throw new System.NotImplementedException(); - } - - public void RegisterInbox(string inbox, RequestHandler handler) - { - throw new System.NotImplementedException(); - } - - public void RegisterInbox(RequestHandler handler) - { - throw new System.NotImplementedException(); - } - - public void RegisterInbox(RequestHandler handler) - { - throw new System.NotImplementedException(); - } - } -} diff --git a/ZeroLevel/Services/_Network/NetworkNode.cs b/ZeroLevel/Services/_Network/NetworkNode.cs deleted file mode 100644 index b09ccc0..0000000 --- a/ZeroLevel/Services/_Network/NetworkNode.cs +++ /dev/null @@ -1,121 +0,0 @@ -using System; -using ZeroLevel._Network; - -namespace ZeroLevel.Services._Network -{ - public class NetworkNode - : IClient, IRouter - { - private FrameParser _parser = new FrameParser(); - private readonly ISocketClient _client; - private readonly IRouter _router; - private DateTime _lastConnectionTime; - - public NetworkNode(ISocketClient client, IRouter router) - { - _lastConnectionTime = DateTime.UtcNow; - _client = client; - _router = router; - _parser.OnIncoming += _parser_OnIncoming; - _client.OnIncomingData += _readerWriter_OnIncomingData; - } - - private void _readerWriter_OnIncomingData(byte[] data, int length) - { - _parser.Push(data, length); - } - - private void _parser_OnIncoming(FrameType type, int identity, byte[] data) - { - switch (type) - { - case FrameType.KeepAlive: - _lastConnectionTime = DateTime.UtcNow; - break; - case FrameType.Message: - break; - case FrameType.Request: - break; - case FrameType.Response: - break; - } - } - - public void Send(string inbox) - { - throw new System.NotImplementedException(); - } - - public void Send(string inbox, byte[] data) - { - throw new System.NotImplementedException(); - } - - public void Send(string inbox, T message) - { - throw new System.NotImplementedException(); - } - - public byte[] Request(string inbox) - { - throw new System.NotImplementedException(); - } - - public byte[] Request(string inbox, byte[] data) - { - throw new System.NotImplementedException(); - } - - public Tresponse Request(string inbox) - { - throw new System.NotImplementedException(); - } - - public Tresponse Request(string inbox, Trequest request) - { - throw new System.NotImplementedException(); - } - - #region IRouter - public void RegisterInbox(string inbox, MessageHandler handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(string inbox, MessageHandler handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(MessageHandler handler) - { - _router.RegisterInbox(handler); - } - - public void RegisterInbox(MessageHandler handler) - { - _router.RegisterInbox(handler); - } - - public void RegisterInbox(string inbox, RequestHandler handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(string inbox, RequestHandler handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(RequestHandler handler) - { - _router.RegisterInbox(handler); - } - - public void RegisterInbox(RequestHandler handler) - { - _router.RegisterInbox(handler); - } - #endregion - } -}