network fix

pull/1/head
a.bozhenov 5 years ago
parent c9e3d408f3
commit 47660b3d88

@ -2,18 +2,15 @@
namespace ZeroLevel.Network
{
public abstract class ZBaseNetwork
: IDisposable
public abstract class BaseSocket
{
static ZBaseNetwork()
static BaseSocket()
{
MAX_FRAME_PAYLOAD_SIZE = Configuration.Default.FirstOrDefault<int>("MAX_FRAME_PAYLOAD_SIZE", DEFAULT_MAX_FRAME_PAYLOAD_SIZE);
}
public const string DEFAULT_MESSAGE_INBOX = "__message_inbox__";
public const string DEFAULT_REQUEST_INBOX = "__request_inbox__";
protected const string DEFAULT_PING_INBOX = "__ping__";
protected const string DEFAULT_REQUEST_ERROR_INBOX = "__request_error__";
/// <summary>
@ -29,7 +26,7 @@ namespace ZeroLevel.Network
/// <summary>
/// Connection check period
/// </summary>
protected const int HEARTBEAT_UPDATE_PERIOD_MS = 7500;
protected const int MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS = 7500;
/// <summary>
/// The period of the request, after which it is considered unsuccessful
@ -49,10 +46,10 @@ namespace ZeroLevel.Network
/// </summary>
public const int MAX_SEND_QUEUE_SIZE = 1024;
protected void Broken() => Status = Status == ZTransportStatus.Disposed ? Status : ZTransportStatus.Broken;
protected void Disposed() => Status = ZTransportStatus.Disposed;
protected void Working() => Status = Status == ZTransportStatus.Disposed ? Status : ZTransportStatus.Working;
public ZTransportStatus Status { get; private set; } = ZTransportStatus.Initialized;
protected void Broken() => Status = Status == SocketClientStatus.Disposed ? Status : SocketClientStatus.Broken;
protected void Disposed() => Status = SocketClientStatus.Disposed;
protected void Working() => Status = Status == SocketClientStatus.Disposed ? Status : SocketClientStatus.Working;
public SocketClientStatus Status { get; private set; } = SocketClientStatus.Initialized;
public abstract void Dispose();
}

@ -1,38 +0,0 @@
using System;
using System.Net;
using ZeroLevel.Models;
namespace ZeroLevel.Network
{
public interface IExClient
: IDisposable
{
event Action Connected;
void ForceConnect();
ZTransportStatus Status { get; }
IPEndPoint Endpoint { get; }
InvokeResult Send();
InvokeResult Send(string inbox);
InvokeResult Send<T>(T obj);
InvokeResult Send<T>(string inbox, T obj);
InvokeResult Request<Treq, Tresp>(Treq obj, Action<Tresp> callback);
InvokeResult Request<Treq, Tresp>(string inbox, Treq obj, Action<Tresp> callback);
InvokeResult Request<Tresp>(Action<Tresp> callback);
InvokeResult Request<Tresp>(string inbox, Action<Tresp> callback);
void RegisterInbox<T>(string inbox, Action<T, long, IZBackward> handler);
void RegisterInbox<T>(Action<T, long, IZBackward> handler);
}
}

@ -1,32 +0,0 @@
using System;
using System.Net;
namespace ZeroLevel.Network
{
public interface IExService
: IDisposable
{
IPEndPoint Endpoint { get; }
event Action<IZBackward> OnConnect;
event Action<IZBackward> OnDisconnect;
void RegisterInbox<T>(string inbox, Action<T, long, IZBackward> handler);
void RegisterInbox(string inbox, Action<long, IZBackward> handler);
void RegisterInbox<Treq, Tresp>(string inbox, Func<Treq, long, IZBackward, Tresp> handler);
/// <summary>
/// Replier without request
/// </summary>
void RegisterInbox<Tresp>(string inbox, Func<long, IZBackward, Tresp> handler);
/*
DEFAULT INBOXES
*/
void RegisterInbox(Action<long, IZBackward> handler);
void RegisterInbox<T>(Action<T, long, IZBackward> handler);
void RegisterInbox<Treq, Tresp>(Func<Treq, long, IZBackward, Tresp> handler);
void RegisterInbox<Tresp>(Func<long, IZBackward, Tresp> handler);
}
}

@ -1,13 +0,0 @@
using System.Net;
using ZeroLevel.Models;
namespace ZeroLevel.Network
{
public interface IZBackward
{
IPEndPoint Endpoint { get; }
InvokeResult SendBackward(Frame frame);
InvokeResult SendBackward<T>(string inbox, T message);
}
}

@ -1,21 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net;
namespace ZeroLevel.Network
{
public interface IZObservableServer
: IDisposable
{
IPEndPoint Endpoint { get; }
IEnumerable<IPEndPoint> ConnectionList { get; }
event Action<IZBackward> OnConnect;
event Action<IZBackward> OnDisconnect;
event Action<Frame, IZBackward> OnMessage;
event Func<Frame, IZBackward, Frame> OnRequest;
}
}

@ -1,24 +0,0 @@
using System;
using System.Net;
namespace ZeroLevel.Network
{
public interface IZTransport
: IDisposable
{
event Action OnConnect;
event Action OnDisconnect;
event EventHandler<Frame> OnServerMessage;
IPEndPoint Endpoint { get; }
ZTransportStatus Status { get; }
void EnsureConnection();
void Send(Frame frame);
void Request(Frame frame, Action<Frame> callback, Action<string> fail = null);
}
}

@ -1,4 +1,7 @@
namespace ZeroLevel.Services._Network
using System;
using ZeroLevel.Models;
namespace ZeroLevel.Network
{
public interface IRouter
{
@ -23,13 +26,13 @@
public interface IClient
{
void Send(string inbox);
void Send(string inbox, byte[] data);
void Send<T>(string inbox, T message);
InvokeResult Send(string inbox);
InvokeResult Send(string inbox, byte[] data);
InvokeResult Send<T>(string inbox, T message);
byte[] Request(string inbox);
byte[] Request(string inbox, byte[] data);
Tresponse Request<Tresponse>(string inbox);
Tresponse Request<Tresponse, Trequest>(string inbox, Trequest request);
InvokeResult Request(string inbox, Action<byte[]> callback);
InvokeResult Request(string inbox, byte[] data, Action<byte[]> callback);
InvokeResult Request<Tresponse>(string inbox, Action<Tresponse> callback);
InvokeResult Request<Trequest, Tresponse>(string inbox, Trequest request, Action<Tresponse> callback);
}
}

@ -0,0 +1,21 @@
using System;
using System.Net;
namespace ZeroLevel.Network
{
public interface ISocketClient:
IDisposable
{
event Action<ISocketClient, byte[], int> OnIncomingData;
event Action<ISocketClient> OnConnect;
event Action<ISocketClient> OnDisconnect;
IPEndPoint Endpoint { get; }
SocketClientStatus Status { get; }
void ForceConnect();
void UseKeepAlive(TimeSpan period);
void Send(Frame data);
void Request(Frame data, Action<Frame> callback, Action<string> fail = null);
void Response(byte[] data, int identity);
}
}

@ -128,7 +128,7 @@ namespace ZeroLevel.Network
: IDiscoveryClient
{
private readonly DCRouter _router = new DCRouter();
private readonly IExClient _discoveryServerClient;
private readonly NetworkNode _discoveryServerClient;
public DiscoveryClient(string endpoint)
{

@ -9,9 +9,9 @@ namespace ZeroLevel.Network
private readonly ExRouter _router;
private readonly IZObservableServer _server;
public event Action<IZBackward> OnConnect = c => { };
public event Action<ISocketClient> OnConnect = c => { };
public event Action<IZBackward> OnDisconnect = c => { };
public event Action<ISocketClient> OnDisconnect = c => { };
public ExService(IZObservableServer server)
{
@ -23,12 +23,12 @@ namespace ZeroLevel.Network
_server.OnDisconnect += _server_OnDisconnect;
}
private void _server_OnDisconnect(IZBackward client)
private void _server_OnDisconnect(ISocketClient client)
{
this.OnDisconnect(client);
}
private void _server_OnConnect(IZBackward client)
private void _server_OnConnect(ISocketClient client)
{
this.OnConnect(client);
}
@ -101,7 +101,7 @@ namespace ZeroLevel.Network
_router.RegisterInbox(inbox, handler);
}
public void RegisterInbox(Action<long, IZBackward> handler)
public void RegisterInbox(Action<long, ISocketClient> handler)
{
_router.RegisterInbox(DEFAULT_REQUEST_INBOX, handler);
}

@ -3,7 +3,6 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Reflection;
namespace ZeroLevel.Network

@ -60,7 +60,7 @@ namespace ZeroLevel.Network
return false;
}
public bool Send<T>(string serviceKey, T data) => Send(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data);
public bool Send<T>(string serviceKey, T data) => Send(serviceKey, BaseSocket.DEFAULT_MESSAGE_INBOX, data);
#endregion Balanced send
@ -85,7 +85,7 @@ namespace ZeroLevel.Network
{
return false;
}
if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS))
if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS))
{
return false;
}
@ -128,7 +128,7 @@ namespace ZeroLevel.Network
{
return false;
}
if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS))
if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS))
{
return false;
}
@ -153,10 +153,10 @@ namespace ZeroLevel.Network
}
public Tresp Request<Treq, Tresp>(string serviceKey, Treq data) =>
Request<Treq, Tresp>(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
Request<Treq, Tresp>(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX, data);
public Tresp Request<Tresp>(string serviceKey) =>
Request<Tresp>(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
Request<Tresp>(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX);
#endregion Balanced request
@ -181,7 +181,7 @@ namespace ZeroLevel.Network
{
return false;
}
if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS))
if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS))
{
return false;
}
@ -224,7 +224,7 @@ namespace ZeroLevel.Network
{
return false;
}
if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS))
if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS))
{
return false;
}
@ -249,10 +249,10 @@ namespace ZeroLevel.Network
}
public Tresp RequestDirect<Treq, Tresp>(string endpoint, string serviceKey, Treq data) =>
RequestDirect<Treq, Tresp>(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
RequestDirect<Treq, Tresp>(endpoint, serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX, data);
public Tresp RequestDirect<Tresp>(string endpoint, string serviceKey) =>
RequestDirect<Tresp>(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
RequestDirect<Tresp>(endpoint, serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX);
#endregion Direct request
@ -299,7 +299,7 @@ namespace ZeroLevel.Network
/// <param name="serviceKey">Service key</param>
/// <param name="data">Message</param>
/// <returns>true - on successful submission</returns>
public bool SendBroadcast<T>(string serviceKey, T data) => SendBroadcast(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data);
public bool SendBroadcast<T>(string serviceKey, T data) => SendBroadcast(serviceKey, BaseSocket.DEFAULT_MESSAGE_INBOX, data);
/// <summary>
/// Sending a message to all services of a specific type to the specified handler
@ -343,7 +343,7 @@ namespace ZeroLevel.Network
/// <param name="data">Message</param>
/// <returns>true - on successful submission</returns>
public bool SendBroadcastByType<T>(string serviceType, T data) =>
SendBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data);
SendBroadcastByType(serviceType, BaseSocket.DEFAULT_MESSAGE_INBOX, data);
/// <summary>
/// Sending a message to all services of a specific group to the specified handler
@ -387,7 +387,7 @@ namespace ZeroLevel.Network
/// <param name="data">Messsage</param>
/// <returns>true - on successful submission</returns>
public bool SendBroadcastByGroup<T>(string serviceGroup, T data) =>
SendBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data);
SendBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_MESSAGE_INBOX, data);
/// <summary>
/// Broadcast polling services by key
@ -445,7 +445,7 @@ namespace ZeroLevel.Network
/// <param name="responseHandler">Response handler</param>
/// <returns>true - in case of successful mailing</returns>
public IEnumerable<Tresp> RequestBroadcast<Treq, Tresp>(string serviceKey, Treq data) =>
RequestBroadcast<Treq, Tresp>(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
RequestBroadcast<Treq, Tresp>(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX, data);
/// <summary>
/// Broadcast polling of services by key, without message of request, to default handler
@ -455,7 +455,7 @@ namespace ZeroLevel.Network
/// <param name="responseHandler">Response handler</param>
/// <returns>true - in case of successful mailing</returns>
public IEnumerable<Tresp> RequestBroadcast<Tresp>(string serviceKey) =>
RequestBroadcast<Tresp>(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
RequestBroadcast<Tresp>(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX);
/// <summary>
/// Broadcast polling services by type of service
@ -513,7 +513,7 @@ namespace ZeroLevel.Network
/// <param name="responseHandler">Response handler</param>
/// <returns>true - in case of successful mailing</returns>
public IEnumerable<Tresp> RequestBroadcastByType<Treq, Tresp>(string serviceType, Treq data) =>
RequestBroadcastByType<Treq, Tresp>(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
RequestBroadcastByType<Treq, Tresp>(serviceType, BaseSocket.DEFAULT_REQUEST_INBOX, data);
/// <summary>
/// Broadcast polling services by type, without message request, in the default handler
@ -523,7 +523,7 @@ namespace ZeroLevel.Network
/// <param name="responseHandler">Response handler</param>
/// <returns>true - in case of successful mailing</returns>
public IEnumerable<Tresp> RequestBroadcastByType<Tresp>(string serviceType) =>
RequestBroadcastByType<Tresp>(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
RequestBroadcastByType<Tresp>(serviceType, BaseSocket.DEFAULT_REQUEST_INBOX);
/// <summary>
/// Broadcast polling services for a group of services
@ -581,7 +581,7 @@ namespace ZeroLevel.Network
/// <param name="responseHandler">Response handler</param>
/// <returns>true - in case of successful mailing</returns>
public IEnumerable<Tresp> RequestBroadcastByGroup<Treq, Tresp>(string serviceGroup, Treq data) =>
RequestBroadcastByGroup<Treq, Tresp>(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
RequestBroadcastByGroup<Treq, Tresp>(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX, data);
/// <summary>
///Broadcast polling services for a group of services, without sending a request, to the default handler
@ -591,11 +591,11 @@ namespace ZeroLevel.Network
/// <param name="responseHandler">Response handler</param>
/// <returns>true - in case of successful mailing</returns>
public IEnumerable<Tresp> RequestBroadcastByGroup<Tresp>(string serviceGroup) =>
RequestBroadcastByGroup<Tresp>(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
RequestBroadcastByGroup<Tresp>(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX);
#region Private
private IEnumerable<Tresp> _RequestBroadcast<Treq, Tresp>(List<IExClient> clients, string inbox, Treq data)
private IEnumerable<Tresp> _RequestBroadcast<Treq, Tresp>(List<NetworkNode> clients, string inbox, Treq data)
{
var response = new List<Tresp>();
using (var waiter = new CountdownEvent(clients.Count))
@ -613,17 +613,17 @@ namespace ZeroLevel.Network
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'");
Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'");
waiter.Signal();
}
});
}
waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS);
waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS);
}
return response;
}
private IEnumerable<Tresp> _RequestBroadcast<Tresp>(List<IExClient> clients, string inbox)
private IEnumerable<Tresp> _RequestBroadcast<Tresp>(List<NetworkNode> clients, string inbox)
{
var response = new List<Tresp>();
using (var waiter = new CountdownEvent(clients.Count))
@ -641,12 +641,12 @@ namespace ZeroLevel.Network
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'");
Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'");
waiter.Signal();
}
});
}
waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS);
waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS);
}
return response;
}

@ -5,7 +5,7 @@ namespace ZeroLevel.Network
{
public static class ExchangeTransportFactory
{
private static readonly ConcurrentDictionary<string, IExClient> _clientInstances = new ConcurrentDictionary<string, IExClient>();
private static readonly ConcurrentDictionary<string, NetworkNode> _clientInstances = new ConcurrentDictionary<string, NetworkNode>();
/// <summary>
/// Creates a server to receive messages using the specified protocol
@ -22,13 +22,13 @@ namespace ZeroLevel.Network
/// <param name="protocol">Protocol</param>
/// <param name="endpoint">Server endpoint</param>
/// <returns>Client</returns>
public static IExClient GetClientWithCache(string endpoint)
public static NetworkNode GetClientWithCache(string endpoint)
{
IExClient instance = null;
NetworkNode instance = null;
if (_clientInstances.ContainsKey(endpoint))
{
instance = _clientInstances[endpoint];
if (instance.Status == ZTransportStatus.Working)
if (instance.Status == SocketClientStatus.Working)
{
return instance;
}
@ -41,9 +41,9 @@ namespace ZeroLevel.Network
return instance;
}
public static IExClient GetClient(string endpoint)
public static NetworkNode GetClient(string endpoint)
{
return new ExClient(new ZSocketClient(NetUtils.CreateIPEndPoint(endpoint)));
return new NetworkNode(new SocketClient(NetUtils.CreateIPEndPoint(endpoint)));
}
}
}

@ -2,11 +2,9 @@
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using ZeroLevel.Network;
using ZeroLevel.Services.Network.FileTransfer.Model;
using ZeroLevel.Services.Pools;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public abstract class BaseFileTransfer
{

@ -1,6 +1,4 @@
using ZeroLevel.Network;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public delegate string ClientFolderNameMapper(IExClient client);
public delegate string ClientFolderNameMapper(ISocketClient client);
}

@ -1,21 +1,18 @@
using System;
using System.IO;
using System.Threading;
using ZeroLevel.Models;
using ZeroLevel.Network;
using ZeroLevel.Services.Network.FileTransfer.Model;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public sealed class FileClient
: BaseFileTransfer, IFileClient
{
private readonly IExClient _client;
private readonly NetworkNode _client;
private readonly string _baseFolder;
private readonly ClientFolderNameMapper _nameMapper;
private readonly bool _disposeClient;
internal FileClient(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
internal FileClient(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
: base(baseFolder)
{
_client = client ?? throw new Exception(nameof(client));
@ -23,9 +20,9 @@ namespace ZeroLevel.Services.Network.FileTransfer
_nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_disposeClient = disposeClient;
_client.RegisterInbox<FileStartFrame>("__upload_file_start", (f, _, __) => Receiver.Incoming(f, nameMapper(_client)));
_client.RegisterInbox<FileFrame>("__upload_file_frame", (f, _, __) => Receiver.Incoming(f));
_client.RegisterInbox<FileEndFrame>("__upload_file_complete", (f, _, __) => Receiver.Incoming(f));
_client.RegisterInbox<FileStartFrame>("__upload_file_start", (c, f) => Receiver.Incoming(f, nameMapper(c)));
_client.RegisterInbox<FileFrame>("__upload_file_frame", (c, f) => Receiver.Incoming(f));
_client.RegisterInbox<FileEndFrame>("__upload_file_complete", (c, f) => Receiver.Incoming(f));
}
public void Dispose()

@ -1,7 +1,6 @@
using ZeroLevel.Network;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public static class FileClientFactory
{
@ -11,12 +10,12 @@ namespace ZeroLevel.Services.Network.FileTransfer
nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);
}
public static IFileClient Create(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper = null)
public static IFileClient Create(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper = null)
{
return CreateFileServerClient(client, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false);
}
private static IFileClient CreateFileServerClient(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
private static IFileClient CreateFileServerClient(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
{
return new FileClient(client, baseFolder, nameMapper, disposeClient);
}

@ -1,9 +1,8 @@
using System;
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Services.Network.FileTransfer.Model;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
internal sealed class FileReader
{

@ -2,9 +2,8 @@
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Models;
using ZeroLevel.Services.Network.FileTransfer.Model;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public class FileReceiver
{

@ -1,9 +1,7 @@
using System;
using ZeroLevel.Models;
using ZeroLevel.Network;
using ZeroLevel.Services.Network.FileTransfer.Model;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public sealed class FileServer
: BaseFileTransfer, IFileServer
@ -34,7 +32,7 @@ namespace ZeroLevel.Services.Network.FileTransfer
}
}
public void Send(IZBackward client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
public void Send(ISocketClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
{
PushTransferTask(fileName, completeHandler, errorHandler, client);
}

@ -1,7 +1,7 @@
using ZeroLevel.Network;
using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public static class FileServerFactory
{

@ -1,6 +1,6 @@
using System;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public interface IFileClient
: IDisposable

@ -1,11 +1,11 @@
using System;
using ZeroLevel.Network;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public interface IFileServer
: IDisposable
{
void Send(IZBackward client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null);
void Send(ISocketClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null);
}
}

@ -1,6 +1,6 @@
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.Network.FileTransfer.Model
namespace ZeroLevel.Network.FileTransfer
{
public sealed class FileEndFrame
: IBinarySerializable, IFileTransferInfo

@ -1,6 +1,6 @@
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.Network.FileTransfer.Model
namespace ZeroLevel.Network.FileTransfer
{
public sealed class FileFrame :
IBinarySerializable, IFileTransferInfo

@ -1,7 +1,7 @@
using System.Threading;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.Network.FileTransfer.Model
namespace ZeroLevel.Network.FileTransfer
{
public sealed class FileStartFrame
: IBinarySerializable, IFileTransferInfo

@ -1,13 +1,12 @@
using System;
using ZeroLevel.Network;
namespace ZeroLevel.Services.Network.FileTransfer.Model
namespace ZeroLevel.Network.FileTransfer
{
internal class FileTransferTask
{
public string FilePath;
public Action<string> CompletedHandler;
public Action<string, string> ErrorHandler;
public IZBackward Client;
public ISocketClient Client;
}
}

@ -1,4 +1,4 @@
namespace ZeroLevel.Services.Network.FileTransfer.Model
namespace ZeroLevel.Network.FileTransfer
{
public enum FileTransferInfoType
{

@ -1,6 +1,6 @@
using ZeroLevel.Network;
namespace ZeroLevel.Services.Network.FileTransfer
namespace ZeroLevel.Network.FileTransfer
{
public delegate string ServerFolderNameMapperDelegate(IZBackward connection);
public delegate string ServerFolderNameMapperDelegate(ISocketClient connection);
}

@ -24,6 +24,22 @@ namespace ZeroLevel.Network
return frame;
}
public static Frame FromPool(string inbox)
{
var frame = _pool.Allocate();
frame.Inbox = inbox;
frame.Payload = null;
return frame;
}
public static Frame FromPool(string inbox, byte[] payload)
{
var frame = _pool.Allocate();
frame.Inbox = inbox;
frame.Payload = payload;
return frame;
}
public void Release()
{
_pool.Free(this);
@ -35,6 +51,8 @@ namespace ZeroLevel.Network
[DataMember]
public byte[] Payload { get; set; }
public bool IsRequest { get; set; }
public Frame()
{
}

@ -1,4 +1,4 @@
namespace ZeroLevel.Services._Network
namespace ZeroLevel.Network
{
public enum FrameType
{

@ -1,4 +1,4 @@
namespace ZeroLevel.Services._Network
namespace ZeroLevel.Network
{
public delegate void MessageHandler(ISocketClient client);
public delegate void MessageHandler<T>(ISocketClient client, T message);

@ -1,7 +1,6 @@
namespace ZeroLevel.Network
{
public enum ZTransportStatus
: int
public enum SocketClientStatus
{
Initialized = 0,
Working = 1,

@ -0,0 +1,197 @@
using System;
using System.Net;
using ZeroLevel.Models;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
public class NetworkNode
: IClient, IRouter, IDisposable
{
private FrameParser _parser = new FrameParser();
private readonly ISocketClient _client;
private readonly Router _router;
private DateTime _lastConnectionTime;
public IPEndPoint EndPoint => _client?.Endpoint;
public SocketClientStatus Status => _client.Status;
public NetworkNode(ISocketClient client)
{
_lastConnectionTime = DateTime.UtcNow;
_client = client;
_router = new Router();
_parser.OnIncoming += _parser_OnIncoming;
_client.OnIncomingData += _readerWriter_OnIncomingData;
}
private void _readerWriter_OnIncomingData(ISocketClient client, byte[] data, int length)
{
_parser.Push(data, length);
}
private void _parser_OnIncoming(FrameType type, int identity, byte[] data)
{
switch (type)
{
case FrameType.KeepAlive:
_lastConnectionTime = DateTime.UtcNow;
break;
case FrameType.Message:
_router.HandleMessage(MessageSerializer.Deserialize<Frame>(data), _client);
break;
case FrameType.Request:
var response = _router.HandleRequest(MessageSerializer.Deserialize<Frame>(data), _client);
_client.Response(response, identity);
break;
}
}
public void ForceConnect() => _client.ForceConnect();
public InvokeResult Send(string inbox)
{
try
{
_client.Send(Frame.FromPool(inbox));
}
catch (Exception ex)
{
Log.Error(ex, "[NetworkNode.Send]");
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
public InvokeResult Send(string inbox, byte[] data)
{
try
{
_client.Send(Frame.FromPool(inbox, data));
}
catch (Exception ex)
{
Log.Error(ex, "[NetworkNode.Send]");
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
public InvokeResult Send<T>(string inbox, T message)
{
try
{
_client.Send(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible<T>(message)));
}
catch (Exception ex)
{
Log.Error(ex, "[NetworkNode.Send]");
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
public InvokeResult Request(string inbox, Action<byte[]> callback)
{
try
{
_client.Request(Frame.FromPool(inbox), f => callback(f.Payload));
}
catch (Exception ex)
{
Log.Error(ex, "[NetworkNode.Request]");
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
public InvokeResult Request(string inbox, byte[] data, Action<byte[]> callback)
{
try
{
_client.Request(Frame.FromPool(inbox, data), f => callback(f.Payload));
}
catch (Exception ex)
{
Log.Error(ex, "[NetworkNode.Request]");
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
public InvokeResult Request<Tresponse>(string inbox, Action<Tresponse> callback)
{
try
{
_client.Request(Frame.FromPool(inbox), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f.Payload)));
}
catch (Exception ex)
{
Log.Error(ex, "[NetworkNode.Request]");
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
public InvokeResult Request<Trequest, Tresponse>(string inbox, Trequest request, Action<Tresponse> callback)
{
try
{
_client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible<Trequest>(request)),
f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f.Payload)));
}
catch (Exception ex)
{
Log.Error(ex, "[NetworkNode.Request]");
return InvokeResult.Fault(ex.Message);
}
return InvokeResult.Succeeding();
}
#region IRouter
public void RegisterInbox(string inbox, MessageHandler handler)
{
_router.RegisterInbox(inbox, handler);
}
public void RegisterInbox<T>(string inbox, MessageHandler<T> handler)
{
_router.RegisterInbox<T>(inbox, handler);
}
public void RegisterInbox(MessageHandler handler)
{
_router.RegisterInbox(handler);
}
public void RegisterInbox<T>(MessageHandler<T> handler)
{
_router.RegisterInbox<T>(handler);
}
public void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler)
{
_router.RegisterInbox<Tresponse>(inbox, handler);
}
public void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler)
{
_router.RegisterInbox<Trequest, Tresponse>(inbox, handler);
}
public void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler)
{
_router.RegisterInbox<Tresponse>(handler);
}
public void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler)
{
_router.RegisterInbox<Trequest, Tresponse>(handler);
}
#endregion
public void Dispose()
{
_client.Dispose();
}
}
}

@ -1,50 +0,0 @@
using System;
namespace ZeroLevel.Network
{
public static class NetworkStreamFastObfuscator
{
public static byte[] PrepareData(byte[] data)
{
var packet = new byte[data.Length + 6];
packet[0] = 181;
Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4);
packet[5] = (byte)(packet[0] ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]);
HashData(data, packet[5]);
Array.Copy(data, 0, packet, 6, data.Length);
return packet;
}
public static void HashData(byte[] data, byte initialmask)
{
if (data.Length == 0) return;
int i = 1;
data[0] ^= initialmask;
for (; i < (data.Length - 8); i += 8)
{
data[i + 0] ^= data[i - 1];
data[i + 1] ^= data[i + 0];
data[i + 2] ^= data[i + 1];
data[i + 3] ^= data[i + 2];
data[i + 4] ^= data[i + 3];
data[i + 5] ^= data[i + 4];
data[i + 6] ^= data[i + 5];
data[i + 7] ^= data[i + 6];
}
for (; i < data.Length; i++)
{
data[i] ^= data[i - 1];
}
}
public static void DeHashData(byte[] data, byte initialmask)
{
if (data.Length == 0) return;
for (var i = data.Length - 1; i > 0; i--)
{
data[i] ^= data[i - 1];
}
data[0] ^= initialmask;
}
}
}

@ -1,124 +0,0 @@
using System;
using System.Net;
using ZeroLevel.Models;
namespace ZeroLevel.Network
{
internal sealed class ExClient
: ZBaseNetwork, IExClient, IZBackward
{
private readonly IZTransport _transport;
private readonly ExRouter _router;
private readonly FrameExchange _fe;
public event Action Connected = () => { };
public new ZTransportStatus Status => _transport.Status;
public ExClient(IZTransport transport)
{
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
transport.OnConnect += Transport_OnConnect;
_fe = new FrameExchange(transport);
_router = new ExRouter();
transport.OnServerMessage += Transport_OnServerMessage;
}
public void ForceConnect()
{
try
{
_transport.EnsureConnection();
}
catch { }
}
private void Transport_OnConnect()
{
Connected();
}
private void Transport_OnServerMessage(object sender, Frame e)
{
try
{
_router.HandleMessage(e, this);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExClient] Fault handle server message");
}
finally
{
e?.Release();
}
}
public IPEndPoint Endpoint => _fe.Endpoint;
public override void Dispose()
{
_fe.Dispose();
}
public void RegisterInbox<T>(string inbox, Action<T, long, IZBackward> handler)
{
_router.RegisterInbox(inbox, handler);
}
public void RegisterInbox<T>(Action<T, long, IZBackward> handler)
{
_router.RegisterInbox(DEFAULT_MESSAGE_INBOX, handler);
}
public InvokeResult Request<Tresp>(Action<Tresp> callback)
{
return _fe.Request<Tresp>(DEFAULT_REQUEST_INBOX, resp => callback(resp));
}
public InvokeResult Request<Tresp>(string inbox, Action<Tresp> callback)
{
return _fe.Request<Tresp>(inbox, resp => callback(resp));
}
public InvokeResult Request<Treq, Tresp>(Treq obj, Action<Tresp> callback)
{
return _fe.Request<Treq, Tresp>(DEFAULT_REQUEST_INBOX, obj, resp => callback(resp));
}
public InvokeResult Request<Treq, Tresp>(string inbox, Treq obj, Action<Tresp> callback)
{
return _fe.Request<Treq, Tresp>(inbox, obj, resp => callback(resp));
}
public InvokeResult Send<T>(T obj)
{
return _fe.Send<T>(DEFAULT_MESSAGE_INBOX, obj);
}
public InvokeResult Send<T>(string inbox, T obj)
{
return _fe.Send<T>(inbox, obj);
}
public InvokeResult SendBackward(Frame frame)
{
return _fe.Send(frame);
}
public InvokeResult SendBackward<T>(string inbox, T obj)
{
return Send(inbox, obj);
}
public InvokeResult Send()
{
return _fe.Send(DEFAULT_MESSAGE_INBOX);
}
public InvokeResult Send(string inbox)
{
return _fe.Send(inbox);
}
}
}

@ -1,77 +0,0 @@
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
public static class FrameBuilder
{
public static Frame BuildFrame<T>(T obj, string inbox)
{
var frame = Frame.FromPool();
frame.FrameId = Frame.GetMessageId();
frame.IsRequest = false;
frame.Inbox = inbox;
frame.Payload = MessageSerializer.SerializeCompatible(obj);
return frame;
}
public static Frame BuildFrame(string inbox)
{
var frame = Frame.FromPool();
frame.FrameId = Frame.GetMessageId();
frame.IsRequest = false;
frame.Inbox = inbox;
frame.Payload = null;
return frame;
}
public static Frame BuildRequestFrame<T>(T obj, string inbox)
{
var frame = Frame.FromPool();
frame.FrameId = Frame.GetMessageId();
frame.IsRequest = true;
frame.Inbox = inbox;
frame.Payload = MessageSerializer.SerializeCompatible(obj);
return frame;
}
public static Frame BuildRequestFrame(string inbox)
{
var frame = Frame.FromPool();
frame.FrameId = Frame.GetMessageId();
frame.IsRequest = true;
frame.Inbox = inbox;
frame.Payload = null;
return frame;
}
public static Frame BuildResponseFrame(object obj, Frame request)
{
var frame = Frame.FromPool();
frame.IsRequest = true;
frame.FrameId = request.FrameId;
frame.Inbox = request.Inbox;
frame.Payload = MessageSerializer.SerializeCompatible(obj);
return frame;
}
public static Frame BuildResponseFrame<T>(T obj, Frame request)
{
var frame = Frame.FromPool();
frame.IsRequest = true;
frame.FrameId = request.FrameId;
frame.Inbox = request.Inbox;
frame.Payload = MessageSerializer.SerializeCompatible(obj);
return frame;
}
public static Frame BuildResponseFrame<T>(T obj, Frame request, string inbox)
{
var frame = Frame.FromPool();
frame.IsRequest = true;
frame.FrameId = request.FrameId;
frame.Inbox = inbox;
frame.Payload = MessageSerializer.SerializeCompatible(obj);
return frame;
}
}
}

@ -1,104 +0,0 @@
using System;
using System.Net;
using ZeroLevel.Models;
namespace ZeroLevel.Network
{
internal sealed class FrameExchange
: IDisposable
{
private IZTransport _current;
public IPEndPoint Endpoint => _current?.Endpoint;
public bool IsConnected => _current?.Status == ZTransportStatus.Working;
public FrameExchange(IZTransport transport)
{
_current = transport ?? throw new ArgumentNullException(nameof(transport));
}
public InvokeResult Send<T>(string inbox, T obj)
{
try
{
var frame = FrameBuilder.BuildFrame(obj, inbox);
_current.Send(frame);
return InvokeResult.Succeeding();
}
catch (Exception ex)
{
Log.SystemError(ex, "[FrameExchange] Fault send frame");
return InvokeResult.Fault(ex.Message);
}
}
public InvokeResult Send(string inbox)
{
try
{
var frame = FrameBuilder.BuildFrame(inbox);
_current.Send(frame);
return InvokeResult.Succeeding();
}
catch (Exception ex)
{
Log.SystemError(ex, "[FrameExchange] Fault send frame");
return InvokeResult.Fault(ex.Message);
}
}
public InvokeResult Send(Frame frame)
{
try
{
_current.Send(frame);
return InvokeResult.Succeeding();
}
catch (Exception ex)
{
Log.SystemError(ex, "[FrameExchange] Fault send frame");
return InvokeResult.Fault(ex.Message);
}
}
public InvokeResult Request<Treq, Tresp>(string inbox, Treq obj, Action<Tresp> callback, Action<string> fault = null)
{
try
{
var frame = FrameBuilder.BuildRequestFrame(obj, inbox);
_current.Request(frame, response_data =>
{
callback(response_data.Read<Tresp>());
}, fault);
return InvokeResult.Succeeding();
}
catch (Exception ex)
{
Log.SystemError(ex, "[FrameExchange] Fault send frame");
return InvokeResult.Fault(ex.Message);
}
}
public InvokeResult Request<Tresp>(string inbox, Action<Tresp> callback, Action<string> fault = null)
{
try
{
var frame = FrameBuilder.BuildRequestFrame(inbox);
_current.Request(frame, response_data =>
{
callback(response_data.Read<Tresp>());
}, fault);
return InvokeResult.Succeeding();
}
catch (Exception ex)
{
Log.SystemError(ex, "[FrameExchange] Fault send frame");
return InvokeResult.Fault(ex.Message);
}
}
public void Dispose()
{
_current?.Dispose();
}
}
}

@ -1,191 +0,0 @@
using System;
using System.Threading.Tasks;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
public sealed class FrameParser
{
#region private models
private enum ParserState
{
WaitNew,
WaitSize,
Proceeding
}
private class _Accum
{
public byte[] Payload;
public int Size;
public bool SizeFilled;
public bool PayloadFilled;
public bool Corrupted;
public void Reset()
{
Size = 0;
offset = 0;
Payload = null;
SizeFilled = false;
PayloadFilled = false;
Corrupted = false;
}
private byte[] _size_buf = new byte[4];
private int offset;
public int WriteSize(byte[] buf, int start, int length)
{
for (; offset < 4 && start < length; offset++, start++)
{
_size_buf[offset] = buf[start];
}
if (offset == 4)
{
Size = BitConverter.ToInt32(_size_buf, 0);
SizeFilled = true;
offset = 0;
if (Size == 0)
{
// At least 1 byte with checksum must be
Corrupted = true;
}
}
return start;
}
public int WritePayload(byte[] buf, int start, int length)
{
if (Payload == null)
{
Payload = new byte[Size];
var mask = ((byte)(ZBaseNetwork.PACKET_HEADER_START_BYTE ^ _size_buf[0] ^ _size_buf[1] ^ _size_buf[2] ^ _size_buf[3]));
if (buf[start] != mask)
{
Corrupted = true;
return start;
}
start = start + 1;
}
int i = start;
for (; offset < Size && i < length; offset++, i++)
{
Payload[offset] = buf[i];
}
if (offset == Size)
{
var mask = ((byte)(ZBaseNetwork.PACKET_HEADER_START_BYTE ^ _size_buf[0] ^ _size_buf[1] ^ _size_buf[2] ^ _size_buf[3]));
NetworkStreamFastObfuscator.DeHashData(Payload, mask);
PayloadFilled = true;
}
return i;
}
}
#endregion private models
private void FireOnFrame(byte[] payload)
{
Frame frame;
try
{
frame = MessageSerializer.Deserialize<Frame>(payload);
}
catch (Exception ex)
{
//NetworkStats.Corrupted();
Log.SystemError(ex, "[FrameParser] Fault deserialize frame from incomig data");
return;
}
try
{
Task.Run(() => OnIncomingFrame?.Invoke(frame));
//NetworkStats.Receive(payload);
}
catch (Exception ex)
{
Log.SystemError(ex, "[FrameParser] Fault handle frame");
}
}
public event Action<Frame> OnIncomingFrame;
private readonly _Accum _accum = new _Accum();
private ParserState _state = ParserState.WaitNew;
private readonly object _push_lock = new object();
/// <summary>
/// Parse with state machine
/// </summary>
public void Push(byte[] part, int start, int length)
{
lock (_push_lock)
{
__Push(part, start, length);
}
}
private void __Push(byte[] part, int start, int length)
{
if (part == null || length == 0 || start >= length) return;
while (start < length)
{
switch (_state)
{
case ParserState.WaitNew:
{
for (; start < length; start++)
{
// Search for the beginning of the package header
if ((part[start] & ZBaseNetwork.PACKET_HEADER_START_BYTE) == ZBaseNetwork.PACKET_HEADER_START_BYTE)
{
_accum.Reset();
_state = ParserState.WaitSize;
start += 1;
break;
}
}
}
break;
case ParserState.WaitSize:
{
start = _accum.WriteSize(part, start, length);
if (_accum.SizeFilled)
{
if (_accum.Corrupted || _accum.Size < 1 || _accum.Size > ZBaseNetwork.MAX_FRAME_PAYLOAD_SIZE)
{
//NetworkStats.Corrupted();
_state = ParserState.WaitNew;
}
else
{
_state = ParserState.Proceeding;
}
}
}
break;
case ParserState.Proceeding:
{
start = _accum.WritePayload(part, start, length);
if (_accum.Corrupted)
{
// NetworkStats.Corrupted();
_state = ParserState.WaitNew;
}
else if (_accum.PayloadFilled)
{
FireOnFrame(_accum.Payload);
_state = ParserState.WaitNew;
}
}
break;
}
}
}
}
}

@ -1,30 +0,0 @@
using System;
using System.Net;
namespace ZeroLevel.Network
{
public class ZExSocketObservableServer :
ZSocketServer, IZObservableServer
{
public ZExSocketObservableServer(IPEndPoint endpoint)
: base(endpoint)
{
}
public IPEndPoint Endpoint => base.LocalEndpoint;
public event Action<Frame, IZBackward> OnMessage = (_, __) => { };
public event Func<Frame, IZBackward, Frame> OnRequest = (_, __) => null;
protected override void Handle(Frame frame, IZBackward client)
{
OnMessage(frame, client);
}
protected override Frame HandleRequest(Frame frame, IZBackward client)
{
return OnRequest(frame, client);
}
}
}

@ -7,207 +7,174 @@ using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
public class ZSocketClient
: ZBaseNetwork, IZTransport
public class SocketClient
: BaseSocket, ISocketClient
{
#region Private
private Socket _clientSocket;
private NetworkStream _stream;
private FrameParser _parser = new FrameParser();
private Thread _sendThread;
private long _heartbeat_key;
private long _heartbeat_key = -1;
private long _last_rw_time = DateTime.UtcNow.Ticks;
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
private readonly object _reconnection_lock = new object();
private readonly BlockingCollection<Frame> _send_queue = new BlockingCollection<Frame>();
private readonly BlockingCollection<SendInfo> _send_queue = new BlockingCollection<SendInfo>();
private readonly RequestBuffer _requests = new RequestBuffer();
private int _current_heartbeat_period_in_ms = 0;
private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета
private struct SendInfo
{
public bool isRequest;
public int identity;
public byte[] data;
}
#endregion Private
public event EventHandler<Frame> OnServerMessage = (_, __) => { };
public event Action OnConnect = () => { };
public event Action OnDisconnect = () => { };
public IPEndPoint Endpoint { get; }
public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } }
public ZSocketClient(IPEndPoint ep)
public SocketClient(IPEndPoint ep)
{
Endpoint = ep;
_parser.OnIncomingFrame += _parser_OnIncomingFrame;
_heartbeat_key = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat);
_parser.OnIncoming += _parser_OnIncoming;
_sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true;
_sendThread.Start();
}
public SocketClient(Socket socket)
{
_socket_freezed = true;
_clientSocket = socket;
Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint;
_parser.OnIncoming += _parser_OnIncoming;
_sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true;
_sendThread.Start();
}
#region Private methods
#region API
public event Action<ISocketClient> OnConnect = (s) => { };
public event Action<ISocketClient> OnDisconnect = (s) => { };
public event Action<ISocketClient, byte[], int> OnIncomingData = (_, __, ___) => { };
public IPEndPoint Endpoint { get; }
private void Heartbeat()
public void Request(Frame frame, Action<Frame> callback, Action<string> fail = null)
{
try
{
EnsureConnection();
}
catch(Exception ex)
{
Log.SystemError(ex, "ZSocketClient.Heartbeat()->EnsureConnection()");
Broken();
return;
}
_requests.TestForTimeouts();
try
{
Request(FrameBuilder.BuildFrame(DEFAULT_PING_INBOX), r => { });
}
catch (Exception ex)
{
Log.SystemError(ex, "ZSocketClient.Heartbeat()->Request()");
}
var diff_request_ms = ((DateTime.UtcNow.Ticks - _last_rw_time) / TimeSpan.TicksPerMillisecond);
if (diff_request_ms > (HEARTBEAT_UPDATE_PERIOD_MS * 2))
if (frame == null) throw new ArgumentNullException(nameof(frame));
if (frame != null && false == _send_queue.IsAddingCompleted)
{
var port = (_clientSocket.LocalEndPoint as IPEndPoint)?.Port;
Log.Debug($"[ZClient] server disconnected, because last data was more thas {diff_request_ms} ms ago. Client port {port}");
Broken();
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{
Thread.Sleep(50);
}
int id;
var sendInfo = new SendInfo
{
isRequest = true,
data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out id)
};
sendInfo.identity = id;
_requests.RegisterForFrame(id, callback, fail);
_send_queue.Add(sendInfo);
frame.Release();
}
}
private void _parser_OnIncomingFrame(Frame frame)
public void ForceConnect()
{
if (frame == null || frame.Inbox == null) return;
_last_rw_time = DateTime.UtcNow.Ticks;
if (frame.IsRequest)
EnsureConnection();
}
public void Send(Frame frame)
{
if (frame == null) throw new ArgumentNullException(nameof(frame));
if (frame != null && false == _send_queue.IsAddingCompleted)
{
// Got response on request with id = packet_id
try
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{
_requests.Success(frame.FrameId, frame);
Thread.Sleep(50);
}
catch (Exception ex)
_send_queue.Add(new SendInfo
{
Log.SystemError(ex, "ZSocketClient._parser_OnIncomingFrame()->_requests.Success(). Fault handle response");
}
isRequest = false,
identity = 0,
data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame))
});
frame.Release();
}
else
}
public void Response(byte[] data, int identity)
{
if (data == null) throw new ArgumentNullException(nameof(data));
if (false == _send_queue.IsAddingCompleted)
{
// Got server comand
if (frame.Inbox.Equals(DEFAULT_PING_INBOX, StringComparison.Ordinal))
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{
_last_rw_time = DateTime.UtcNow.Ticks;
Thread.Sleep(50);
}
else
_send_queue.Add(new SendInfo
{
try
{
OnServerMessage?.Invoke(this, frame);
}
catch (Exception ex)
{
Log.SystemError(ex, "ZSocketClient._parser_OnIncomingFrame()->OnServerMessage?.Invoke(). Fault handle server message");
}
}
isRequest = false,
identity = 0,
data = NetworkPacketFactory.Response(data, identity)
});
}
frame?.Release();
}
private void ReceiveAsyncCallback(IAsyncResult ar)
public void UseKeepAlive(TimeSpan period)
{
try
if (_heartbeat_key != -1)
{
EnsureConnection();
var count = _stream.EndRead(ar);
if (count > 0)
{
_parser.Push(_buffer, 0, count);
_last_rw_time = DateTime.UtcNow.Ticks;
}
if (Status == ZTransportStatus.Working)
{
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
}
Sheduller.Remove(_heartbeat_key);
}
catch (ObjectDisposedException)
if (period != TimeSpan.Zero && period.TotalMilliseconds > MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)
{
/// Nothing
_current_heartbeat_period_in_ms = (int)period.TotalMilliseconds;
_heartbeat_key = Sheduller.RemindEvery(period, Heartbeat);
}
catch (Exception ex)
else
{
Log.SystemError(ex, $"[ZSocketServerClient] Error read data");
Broken();
OnDisconnect();
_current_heartbeat_period_in_ms = 0;
}
}
#endregion
private void SendFramesJob()
#region Private methods
private void _parser_OnIncoming(FrameType type, int identity, byte[] data)
{
Frame frame = null;
while (Status != ZTransportStatus.Disposed)
try
{
if (_send_queue.IsCompleted)
{
return;
}
if (Status != ZTransportStatus.Working)
{
Thread.Sleep(100);
try
{
EnsureConnection();
}
catch (Exception ex)
{
Log.SystemError(ex, "[ZSocketClient] Send next frame fault");
}
if (Status == ZTransportStatus.Disposed) return;
continue;
}
try
switch (type)
{
frame = _send_queue.Take();
var data = NetworkStreamFastObfuscator.PrepareData(MessageSerializer.Serialize(frame));
if (data != null && data.Length > 0)
{
if (frame.IsRequest)
{
_requests.StartSend(frame.FrameId);
}
_stream.Write(data, 0, data.Length);
case FrameType.KeepAlive:
_last_rw_time = DateTime.UtcNow.Ticks;
//NetworkStats.Send(data);
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketServerClient] Backward send error.");
Broken();
OnDisconnect();
}
finally
{
frame?.Release();
break;
case FrameType.Message:
case FrameType.Request:
OnIncomingData(this, data, identity);
break;
case FrameType.Response:
_requests.Success(identity, MessageSerializer.Deserialize<Frame>(data));
break;
}
}
catch (Exception ex)
{
Log.Error(ex, $"[SocketClient._parser_OnIncoming]");
}
}
#endregion Private methods
#region API
private bool TryConnect()
{
if (Status == ZTransportStatus.Working)
if (Status == SocketClientStatus.Working)
{
return true;
}
if (Status == ZTransportStatus.Disposed)
if (Status == SocketClientStatus.Disposed)
{
return false;
}
@ -235,24 +202,28 @@ namespace ZeroLevel.Network
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketClient] Connection fault");
Log.SystemError(ex, $"[SocketClient.TryConnect] Connection fault");
Broken();
return false;
}
Working();
OnConnect();
OnConnect(this);
return true;
}
public void EnsureConnection()
{
if (_socket_freezed)
{
return;
}
lock (_reconnection_lock)
{
if (Status == ZTransportStatus.Disposed)
if (Status == SocketClientStatus.Disposed)
{
throw new ObjectDisposedException("connection");
}
if (Status != ZTransportStatus.Working)
if (Status != SocketClientStatus.Working)
{
if (false == TryConnect())
{
@ -262,47 +233,112 @@ namespace ZeroLevel.Network
}
}
public void Send(Frame frame)
private void Heartbeat()
{
if (frame == null) throw new ArgumentNullException(nameof(frame));
EnsureConnection();
if (frame != null && false == _send_queue.IsAddingCompleted)
try
{
EnsureConnection();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.Heartbeat.EnsureConnection]");
Broken();
return;
}
_requests.TestForTimeouts();
try
{
while (_send_queue.Count >= ZBaseNetwork.MAX_SEND_QUEUE_SIZE)
_send_queue.Add(new SendInfo
{
Thread.Sleep(50);
}
_send_queue.Add(frame);
identity = 0,
isRequest = false,
data = NetworkPacketFactory.KeepAliveMessage()
});
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.Heartbeat.Request]");
}
var diff_request_ms = ((DateTime.UtcNow.Ticks - _last_rw_time) / TimeSpan.TicksPerMillisecond);
if (diff_request_ms > (_current_heartbeat_period_in_ms * 2))
{
var port = (_clientSocket.LocalEndPoint as IPEndPoint)?.Port;
Log.Debug($"[SocketClient.Heartbeat] server disconnected, because last data was more thas {diff_request_ms} ms ago. Client port {port}");
Broken();
}
}
public void Request(Frame frame, Action<Frame> callback, Action<string> fail = null)
private void ReceiveAsyncCallback(IAsyncResult ar)
{
if (frame == null) throw new ArgumentNullException(nameof(frame));
try
{
EnsureConnection();
var count = _stream.EndRead(ar);
if (count > 0)
{
_parser.Push(_buffer, count);
_last_rw_time = DateTime.UtcNow.Ticks;
}
if (Status == SocketClientStatus.Working)
{
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
}
}
catch (Exception ex)
{
fail?.Invoke(ex.Message);
return;
}
_requests.RegisterForFrame(frame, callback, fail);
try
catch (ObjectDisposedException)
{
Send(frame);
/// Nothing
}
catch (Exception ex)
{
fail?.Invoke(ex.Message);
Log.SystemError(ex, $"[SocketClient.ReceiveAsyncCallback] Error read data");
Broken();
OnDisconnect();
Log.SystemError(ex, $"[ZSocketClient] Request error. Frame '{frame.FrameId}'. Inbox '{frame.Inbox}'");
OnDisconnect(this);
}
}
private void SendFramesJob()
{
SendInfo frame;
while (Status != SocketClientStatus.Disposed)
{
if (_send_queue.IsCompleted)
{
return;
}
if (Status != SocketClientStatus.Working)
{
Thread.Sleep(100);
try
{
EnsureConnection();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault");
}
if (Status == SocketClientStatus.Disposed) return;
continue;
}
try
{
frame = _send_queue.Take();
if (frame.isRequest)
{
_requests.StartSend(frame.identity);
}
_stream.Write(frame.data, 0, frame.data.Length);
_last_rw_time = DateTime.UtcNow.Ticks;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[SocketClient.SendFramesJob] Backward send error.");
Broken();
OnDisconnect(this);
}
}
}
#endregion API
#endregion
#region Helper
@ -318,9 +354,9 @@ namespace ZeroLevel.Network
public override void Dispose()
{
if (Status == ZTransportStatus.Working)
if (Status == SocketClientStatus.Working)
{
OnDisconnect();
OnDisconnect(this);
}
Disposed();
Sheduller.Remove(_heartbeat_key);

@ -0,0 +1,119 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace ZeroLevel.Network
{
public class SocketServer
: BaseSocket
{
private Socket _serverSocket;
private ReaderWriterLockSlim _connection_set_lock = new ReaderWriterLockSlim();
private Dictionary<IPEndPoint, NetworkNode> _connections = new Dictionary<IPEndPoint, NetworkNode>();
public IPEndPoint LocalEndpoint { get; }
public event Action<ISocketClient> OnDisconnect = _ => { };
public event Action<ISocketClient> OnConnect = _ => { };
public IEnumerable<IPEndPoint> ConnectionList
{
get
{
try
{
_connection_set_lock.EnterReadLock();
return _connections.Select(c => c.Value.EndPoint).ToList();
}
finally
{
_connection_set_lock.ExitReadLock();
}
}
}
private void DisconnectEventRise(ISocketClient client)
{
try
{
OnDisconnect?.Invoke(client);
}
catch
{ }
}
private void ConnectEventRise(ISocketClient client)
{
try
{
OnConnect?.Invoke(client);
}
catch
{ }
}
public SocketServer(IPEndPoint endpoint)
{
LocalEndpoint = endpoint;
_serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
_serverSocket.Bind(endpoint);
_serverSocket.Listen(100);
Working();
_serverSocket.BeginAccept(BeginAcceptCallback, null);
}
private void BeginAcceptCallback(IAsyncResult ar)
{
if (Status == SocketClientStatus.Working)
{
try
{
var client_socket = _serverSocket.EndAccept(ar);
_serverSocket.BeginAccept(BeginAcceptCallback, null);
_connection_set_lock.EnterWriteLock();
var connection = new SocketClient(client_socket);
connection.OnDisconnect += Connection_OnDisconnect;
_connections[connection.Endpoint] = new NetworkNode(connection);
ConnectEventRise(connection);
}
catch (Exception ex)
{
Broken();
Log.SystemError(ex, "[ZSocketServer] Error with connect accepting");
}
finally
{
_connection_set_lock.ExitWriteLock();
}
}
}
private void Connection_OnIncomingData(ISocketClient client, byte[] data, int identity)
{
throw new NotImplementedException();
}
private void Connection_OnDisconnect(ISocketClient client)
{
client.OnDisconnect -= Connection_OnDisconnect;
try
{
_connection_set_lock.EnterWriteLock();
_connections[client.Endpoint].Dispose();
_connections.Remove(client.Endpoint);
}
finally
{
_connection_set_lock.ExitWriteLock();
}
DisconnectEventRise(client);
}
public override void Dispose()
{
}
}
}

@ -1,9 +1,7 @@
using System;
using System.Threading.Tasks;
using ZeroLevel.Network;
using ZeroLevel.Services._Network;
namespace ZeroLevel._Network
namespace ZeroLevel.Network
{
public sealed class FrameParser
{
@ -130,6 +128,7 @@ namespace ZeroLevel._Network
}
#endregion private models
public event Action<FrameType, int, byte[]> OnIncoming;
private readonly _Accum _accum = new _Accum();
@ -187,7 +186,7 @@ namespace ZeroLevel._Network
position = _accum.WriteSize(part, position, length);
if (_accum.SizeFilled)
{
if (_accum.Corrupted || _accum.Size < 1 || _accum.Size > ZBaseNetwork.MAX_FRAME_PAYLOAD_SIZE)
if (_accum.Corrupted || _accum.Size < 1 || _accum.Size > BaseSocket.MAX_FRAME_PAYLOAD_SIZE)
{
_state = ParserState.WaitNew;
}

@ -2,7 +2,7 @@
using System.Runtime.CompilerServices;
using System.Threading;
namespace ZeroLevel.Services._Network
namespace ZeroLevel.Network
{
public static class NetworkPacketFactory
{

@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using ZeroLevel.Network;
using ZeroLevel.Services.Pools;
namespace ZeroLevel.Network
@ -11,13 +10,13 @@ namespace ZeroLevel.Network
private Dictionary<long, RequestInfo> _requests = new Dictionary<long, RequestInfo>();
private static ObjectPool<RequestInfo> _ri_pool = new ObjectPool<RequestInfo>(() => new RequestInfo());
public void RegisterForFrame(Frame frame, Action<Frame> callback, Action<string> fail = null)
public void RegisterForFrame(int identity, Action<Frame> callback, Action<string> fail = null)
{
var ri = _ri_pool.Allocate();
lock (_reqeust_lock)
{
ri.Reset(callback, fail);
_requests.Add(frame.FrameId, ri);
_requests.Add(identity, ri);
}
}
@ -83,7 +82,7 @@ namespace ZeroLevel.Network
{
if (pair.Value.Sended == false) continue;
var diff = now_ticks - pair.Value.Timestamp;
if (diff > ZBaseNetwork.MAX_REQUEST_TIME_TICKS)
if (diff > BaseSocket.MAX_REQUEST_TIME_TICKS)
{
to_remove.Add(pair.Key);
}

@ -8,7 +8,8 @@ using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
internal sealed class ExRouter
public class Router
: IRouter
{
#region Routing
@ -47,7 +48,7 @@ namespace ZeroLevel.Network
private Type _typeResp;
private bool _noArguments = false;
public static MRInvoker Create(Action<long, IZBackward> handler)
public static MRInvoker Create(MessageHandler handler)
{
return new MRInvoker
{
@ -59,7 +60,7 @@ namespace ZeroLevel.Network
};
}
public static MRInvoker Create<T>(Action<T, long, IZBackward> handler)
public static MRInvoker Create<T>(MessageHandler<T> handler)
{
return new MRInvoker
{
@ -70,50 +71,51 @@ namespace ZeroLevel.Network
};
}
public static MRInvoker Create<Treq, Tresp>(Func<Treq, long, IZBackward, Tresp> handler)
public static MRInvoker Create<Tresponse>(RequestHandler<Tresponse> handler)
{
return new MRInvoker
{
_typeReq = typeof(Treq),
_typeResp = typeof(Tresp),
_noArguments = true,
_typeReq = null,
_typeResp = typeof(Tresponse),
_instance = handler.Target,
_invoker = CreateCompiledExpression(handler)
};
}
public static MRInvoker Create<Tresp>(Func<long, IZBackward, Tresp> handler)
public static MRInvoker Create<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler)
{
return new MRInvoker
{
_typeReq = null,
_typeResp = typeof(Tresp),
_typeReq = typeof(Trequest),
_typeResp = typeof(Tresponse),
_instance = handler.Target,
_invoker = CreateCompiledExpression(handler)
};
}
public object Invoke(Frame frame, IZBackward client)
public object Invoke(byte[] data, ISocketClient client)
{
if (_typeResp == null)
{
var incoming = MessageSerializer.DeserializeCompatible(_typeReq, frame.Payload);
var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data);
if (_noArguments)
{
this._invoker.Invoke(this._instance, new object[] { frame.FrameId, client });
this._invoker.Invoke(this._instance, new object[] { client });
}
else
{
this._invoker.Invoke(this._instance, new object[] { incoming, frame.FrameId, client });
this._invoker.Invoke(this._instance, new object[] { incoming, client });
}
}
else if (_typeReq == null)
{
return this._invoker.Invoke(this._instance, new object[] { frame.FrameId, client });
return this._invoker.Invoke(this._instance, new object[] { client });
}
else
{
var incoming = MessageSerializer.DeserializeCompatible(_typeReq, frame.Payload);
return this._invoker.Invoke(this._instance, new object[] { incoming, frame.FrameId, client });
var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data);
return this._invoker.Invoke(this._instance, new object[] { incoming, client });
}
return null;
}
@ -127,9 +129,57 @@ namespace ZeroLevel.Network
#endregion Routing
#region Registration
#region Invokation
public void HandleMessage(Frame frame, ISocketClient client)
{
try
{
if (_handlers.ContainsKey(frame.Inbox))
{
foreach (var handler in _handlers[frame.Inbox])
{
try
{
handler.Invoke(frame.Payload, client);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExRouter] Fault handle incomind message");
}
}
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExRouter] Fault handle incomind message");
}
}
public void RegisterInbox(string inbox, Action<long, IZBackward> handler)
public byte[] HandleRequest(Frame frame, ISocketClient client)
{
try
{
if (_requestors.ContainsKey(frame.Inbox))
{
return MessageSerializer.SerializeCompatible(_requestors[frame.Inbox].Invoke(frame.Payload, client));
}
else
{
Log.SystemWarning($"[ExRouter] Not found inbox '{frame.Inbox}' for incoming request");
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExRouter] Fault handle incomind request");
}
return null;
}
#endregion Invokation
#region Message handlers registration
public void RegisterInbox(string inbox, MessageHandler handler)
{
if (false == _handlers.ContainsKey(inbox))
{
@ -138,7 +188,7 @@ namespace ZeroLevel.Network
_handlers[inbox].Add(MRInvoker.Create(handler));
}
public void RegisterInbox<T>(string inbox, Action<T, long, IZBackward> handler)
public void RegisterInbox<T>(string inbox, MessageHandler<T> handler)
{
if (false == _handlers.ContainsKey(inbox))
{
@ -147,11 +197,31 @@ namespace ZeroLevel.Network
_handlers[inbox].Add(MRInvoker.Create<T>(handler));
}
public void RegisterInbox<Treq, Tresp>(string inbox, Func<Treq, long, IZBackward, Tresp> hanlder)
public void RegisterInbox(MessageHandler handler)
{
if (false == _handlers.ContainsKey(BaseSocket.DEFAULT_MESSAGE_INBOX))
{
_handlers.Add(BaseSocket.DEFAULT_MESSAGE_INBOX, new List<MRInvoker>());
}
_handlers[BaseSocket.DEFAULT_MESSAGE_INBOX].Add(MRInvoker.Create(handler));
}
public void RegisterInbox<T>(MessageHandler<T> handler)
{
if (false == _handlers.ContainsKey(BaseSocket.DEFAULT_MESSAGE_INBOX))
{
_handlers.Add(BaseSocket.DEFAULT_MESSAGE_INBOX, new List<MRInvoker>());
}
_handlers[BaseSocket.DEFAULT_MESSAGE_INBOX].Add(MRInvoker.Create<T>(handler));
}
#endregion
#region Request handlers registration
public void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler)
{
if (false == _requestors.ContainsKey(inbox))
{
_requestors.Add(inbox, MRInvoker.Create<Treq, Tresp>(hanlder));
_requestors.Add(inbox, MRInvoker.Create<Tresponse>(handler));
}
else
{
@ -159,11 +229,11 @@ namespace ZeroLevel.Network
}
}
public void RegisterInbox<Tresp>(string inbox, Func<long, IZBackward, Tresp> hanlder)
public void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler)
{
if (false == _requestors.ContainsKey(inbox))
{
_requestors.Add(inbox, MRInvoker.Create<Tresp>(hanlder));
_requestors.Add(inbox, MRInvoker.Create<Trequest, Tresponse>(handler));
}
else
{
@ -171,55 +241,29 @@ namespace ZeroLevel.Network
}
}
#endregion Registration
#region Invokation
public void HandleMessage(Frame frame, IZBackward client)
public void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler)
{
try
if (false == _requestors.ContainsKey(BaseSocket.DEFAULT_REQUEST_INBOX))
{
if (_handlers.ContainsKey(frame.Inbox))
{
foreach (var handler in _handlers[frame.Inbox])
{
try
{
handler.Invoke(frame, client);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExRouter] Fault handle incomind message");
}
}
}
_requestors.Add(BaseSocket.DEFAULT_REQUEST_INBOX, MRInvoker.Create<Tresponse>(handler));
}
catch (Exception ex)
else
{
Log.SystemError(ex, $"[ExRouter] Fault handle incomind message");
throw new Exception($"[SocketExchangeServer] Inbox {BaseSocket.DEFAULT_REQUEST_INBOX} already exists");
}
}
public Frame HandleRequest(Frame frame, IZBackward client)
public void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler)
{
try
if (false == _requestors.ContainsKey(BaseSocket.DEFAULT_REQUEST_INBOX))
{
if (_requestors.ContainsKey(frame.Inbox))
{
return FrameBuilder.BuildResponseFrame(_requestors[frame.Inbox].Invoke(frame, client), frame);
}
else
{
Log.SystemWarning($"[ExRouter] Not found inbox '{frame.Inbox}' for incoming request");
}
_requestors.Add(BaseSocket.DEFAULT_REQUEST_INBOX, MRInvoker.Create<Trequest, Tresponse>(handler));
}
catch (Exception ex)
else
{
Log.SystemError(ex, $"[ExRouter] Fault handle incomind request");
throw new Exception($"[SocketExchangeServer] Inbox {BaseSocket.DEFAULT_REQUEST_INBOX} already exists");
}
return null;
}
#endregion Invokation
#endregion
}
}

@ -1,167 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace ZeroLevel.Network
{
public abstract class ZSocketServer
: ZBaseNetwork
{
public IPEndPoint LocalEndpoint { get; }
public event Action<IZBackward> OnDisconnect = _ => { };
public event Action<IZBackward> OnConnect = _ => { };
public IEnumerable<IPEndPoint> ConnectionList
{
get
{
try
{
_connection_set_lock.EnterReadLock();
return _connections.Select(c => c.Endpoint).ToList();
}
finally
{
_connection_set_lock.ExitReadLock();
}
}
}
#region Private members
private Socket _serverSocket;
private long _heartbeat_task = -1;
private readonly Frame _pingFrame = FrameBuilder.BuildFrame(DEFAULT_PING_INBOX);
private ReaderWriterLockSlim _connection_set_lock = new ReaderWriterLockSlim();
private HashSet<ZSocketServerClient> _connections = new HashSet<ZSocketServerClient>();
private void DisconnectEventRise(IZBackward client)
{
try
{
OnDisconnect?.Invoke(client);
}
catch
{ }
}
private void ConnectEventRise(IZBackward client)
{
try
{
OnConnect?.Invoke(client);
}
catch
{ }
}
private void Heartbeat()
{
var enumerator = _connections.GetEnumerator();
try
{
while (enumerator.MoveNext())
{
var connection = enumerator.Current;
if ((DateTime.UtcNow.Ticks - connection.LastNetworkActionTimestamp) >= HEARTBEAT_PING_PERIOD_TICKS)
{
connection.SendBackward(_pingFrame);
}
}
}
catch { }
GC.Collect(1, GCCollectionMode.Forced, false);
}
private void BeginAcceptCallback(IAsyncResult ar)
{
if (Status == ZTransportStatus.Working)
{
try
{
var client_socket = _serverSocket.EndAccept(ar);
_serverSocket.BeginAccept(BeginAcceptCallback, null);
_connection_set_lock.EnterWriteLock();
var connection = new ZSocketServerClient(client_socket, Handle, HandleRequest);
connection.OnConnectionBroken += Connection_OnConnectionBroken;
_connections.Add(connection);
ConnectEventRise(connection);
}
catch (Exception ex)
{
Broken();
Log.SystemError(ex, "[ZSocketServer] Error with connect accepting");
}
finally
{
_connection_set_lock.ExitWriteLock();
}
}
}
private void Connection_OnConnectionBroken(ZSocketServerClient connection)
{
connection.OnConnectionBroken -= Connection_OnConnectionBroken;
try
{
_connection_set_lock.EnterWriteLock();
_connections.Remove(connection);
}
finally
{
_connection_set_lock.ExitWriteLock();
}
connection.Dispose();
DisconnectEventRise(connection);
}
#endregion Private members
public ZSocketServer(IPEndPoint endpoint)
{
LocalEndpoint = endpoint;
_serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
_serverSocket.Bind(endpoint);
_serverSocket.Listen(100);
_heartbeat_task = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat);
Working();
_serverSocket.BeginAccept(BeginAcceptCallback, null);
}
protected abstract void Handle(Frame frame, IZBackward client);
protected abstract Frame HandleRequest(Frame frame, IZBackward client);
public override void Dispose()
{
if (Status == ZTransportStatus.Disposed)
{
return;
}
Sheduller.Remove(_heartbeat_task);
Disposed();
_serverSocket.Close();
_serverSocket.Dispose();
try
{
_connection_set_lock.EnterReadLock();
foreach (var c in _connections)
{
c.Dispose();
DisconnectEventRise(c);
}
}
finally
{
_connection_set_lock.ExitReadLock();
}
_connection_set_lock.Dispose();
}
}
}

@ -1,235 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using ZeroLevel.Models;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
internal sealed class ZSocketServerClient
: ZBaseNetwork, IZBackward, IEquatable<ZSocketServerClient>
{
public IPEndPoint Endpoint { get; }
internal long LastNetworkActionTimestamp { get; private set; } = DateTime.UtcNow.Ticks;
private Thread _sendThread;
private NetworkStream _stream;
private readonly Socket _socket;
private readonly FrameParser _parser;
private readonly Action<Frame, IZBackward> _handler;
private readonly Func<Frame, IZBackward, Frame> _requestor;
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
private readonly BlockingCollection<byte[]> _send_queue = new BlockingCollection<byte[]>();
public event Action<ZSocketServerClient> OnConnectionBroken = (_) => { };
private void RizeConnectionBrokenEvent()
{
try { OnConnectionBroken?.Invoke(this); } catch { }
}
public ZSocketServerClient(Socket socket,
Action<Frame, IZBackward> handler,
Func<Frame, IZBackward, Frame> requestor)
{
_socket = socket ?? throw new ArgumentNullException(nameof(socket));
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
_requestor = requestor ?? throw new ArgumentNullException(nameof(requestor));
Endpoint = _socket.RemoteEndPoint as IPEndPoint;
_stream = new NetworkStream(_socket, true);
_parser = new FrameParser();
_parser.OnIncomingFrame += _parser_OnIncomingFrame;
Working();
_sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true;
_sendThread.Start();
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
}
public InvokeResult SendBackward(Frame frame)
{
if (frame != null && Status == ZTransportStatus.Working && false == _send_queue.IsCompleted && false == _send_queue.IsAddingCompleted)
{
var data = MessageSerializer.Serialize(frame);
try
{
_send_queue.Add(NetworkStreamFastObfuscator.PrepareData(data));
return InvokeResult.Succeeding();
}
catch (ObjectDisposedException)
{
// Ignore
}
finally
{
frame?.Release();
}
}
return InvokeResult.Fault();
}
public InvokeResult SendBackward<T>(string inbox, T message)
{
var frame = FrameBuilder.BuildFrame<T>(message, inbox);
if (Status == ZTransportStatus.Working && false == _send_queue.IsCompleted && false == _send_queue.IsAddingCompleted)
{
var data = MessageSerializer.Serialize(frame);
try
{
_send_queue.Add(NetworkStreamFastObfuscator.PrepareData(data));
return InvokeResult.Succeeding();
}
catch (ObjectDisposedException)
{
// Ignore
}
finally
{
frame?.Release();
}
}
return InvokeResult.Fault();
}
private void SendFramesJob()
{
byte[] data;
while (Status == ZTransportStatus.Working)
{
if (_send_queue.IsCompleted)
{
return;
}
try
{
data = _send_queue.Take();
if (data != null && data.Length > 0)
{
_stream.Write(data, 0, data.Length);
_stream.Flush();
//Thread.Sleep(1);
LastNetworkActionTimestamp = DateTime.UtcNow.Ticks;
//NetworkStats.Send(data);
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketServerClient] Backward send error.");
Broken();
RizeConnectionBrokenEvent();
}
}
}
private void ReceiveAsyncCallback(IAsyncResult ar)
{
try
{
var count = _stream.EndRead(ar);
if (count > 0)
{
_parser.Push(_buffer, 0, count);
LastNetworkActionTimestamp = DateTime.UtcNow.Ticks;
}
if (Status == ZTransportStatus.Working)
{
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
}
}
catch (ObjectDisposedException)
{
/// Nothing
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketServerClient] Error read data");
Broken();
RizeConnectionBrokenEvent();
}
}
private void _parser_OnIncomingFrame(Frame frame)
{
if (frame == null || frame.Inbox == null) return;
if (frame.Inbox.Equals(DEFAULT_PING_INBOX, StringComparison.Ordinal))
{
SendBackward(frame);
}
else if (frame.IsRequest)
{
Frame response;
try
{
response = _requestor?.Invoke(frame, this);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketServerClient] Fault make response for request '{frame.FrameId}' to inbox '{frame.Inbox}'");
response = FrameBuilder.BuildResponseFrame<string>(ex.Message, frame, DEFAULT_REQUEST_ERROR_INBOX);
}
finally
{
frame?.Release();
}
if (response != null)
{
SendBackward(response);
}
}
else
{
try
{
_handler?.Invoke(frame, this);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketServerClient] Fault handle message '{frame.FrameId}' in inbox '{frame.Inbox}'");
}
finally
{
frame?.Release();
}
}
}
public override void Dispose()
{
if (Status == ZTransportStatus.Disposed)
{
return;
}
Disposed();
_send_queue.CompleteAdding();
_send_queue.Dispose();
this._stream.Flush();
this._stream.Close();
this._stream.Dispose();
}
public override int GetHashCode()
{
return Endpoint.GetHashCode();
}
public override bool Equals(object obj)
{
return this.Equals(obj as ZSocketServerClient);
}
public bool Equals(ZSocketServerClient other)
{
if (other == null) return false;
return this.Endpoint.Compare(other.Endpoint) == 0;
}
}
}

@ -44,7 +44,10 @@ namespace ZeroLevel.Services
foreach (KeyValuePair<string, string> key in tdict)
{
output = output.Replace(key.Value, key.Key);
if (key.Value.Length > 0)
{
output = output.Replace(key.Value, key.Key);
}
}
return output;
}

@ -1,12 +0,0 @@
using System;
namespace ZeroLevel.Services._Network
{
public interface ISocketClient
{
event Action<byte[], int> OnIncomingData;
void UseKeepAlive(TimeSpan period);
void Send(byte[] data);
byte[] Request(byte[] data);
}
}

@ -1,183 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using ZeroLevel.Services.Invokation;
namespace ZeroLevel.Services._Network
{
public class Router
: IRouter
{
#region Routing
private sealed class MRInvoker
{
/// <summary>
/// Creates a compiled expression for a quick method call, returns the identifier of the expression and a delegate for the call.
/// </summary>
private static Invoker CreateCompiledExpression(MethodInfo method)
{
var targetArg = Expression.Parameter(typeof(object)); // Target
var argsArg = Expression.Parameter(typeof(object[])); // Method's args
var parameters = method.GetParameters();
Expression body = Expression.Call(
method.IsStatic
? null
: Expression.Convert(targetArg, method.DeclaringType), // Method's type
method,
parameters.Select((p, i) =>
Expression.Convert(Expression.ArrayIndex(argsArg, Expression.Constant(i)), p.ParameterType)));
if (body.Type == typeof(void))
body = Expression.Block(body, Expression.Constant(null));
else if (body.Type.IsValueType)
body = Expression.Convert(body, typeof(object));
return Expression.Lambda<Invoker>(body, targetArg, argsArg).Compile();
}
private static Invoker CreateCompiledExpression(Delegate handler)
{
return CreateCompiledExpression(handler.GetMethodInfo());
}
private object _instance;
private Invoker _invoker;
private Type _typeReq;
private Type _typeResp;
private bool _noArguments = false;
public static MRInvoker Create(Action<long, IZBackward> handler)
{
return new MRInvoker
{
_noArguments = true,
_typeReq = null,
_typeResp = null,
_instance = handler.Target,
_invoker = CreateCompiledExpression(handler)
};
}
public static MRInvoker Create<T>(Action<T, long, IZBackward> handler)
{
return new MRInvoker
{
_typeReq = typeof(T),
_typeResp = null,
_instance = handler.Target,
_invoker = CreateCompiledExpression(handler)
};
}
public static MRInvoker Create<Treq, Tresp>(Func<Treq, long, IZBackward, Tresp> handler)
{
return new MRInvoker
{
_typeReq = typeof(Treq),
_typeResp = typeof(Tresp),
_instance = handler.Target,
_invoker = CreateCompiledExpression(handler)
};
}
public static MRInvoker Create<Tresp>(Func<long, IZBackward, Tresp> handler)
{
return new MRInvoker
{
_typeReq = null,
_typeResp = typeof(Tresp),
_instance = handler.Target,
_invoker = CreateCompiledExpression(handler)
};
}
public object Invoke(Frame frame, IZBackward client)
{
if (_typeResp == null)
{
var incoming = MessageSerializer.DeserializeCompatible(_typeReq, frame.Payload);
if (_noArguments)
{
this._invoker.Invoke(this._instance, new object[] { frame.FrameId, client });
}
else
{
this._invoker.Invoke(this._instance, new object[] { incoming, frame.FrameId, client });
}
}
else if (_typeReq == null)
{
return this._invoker.Invoke(this._instance, new object[] { frame.FrameId, client });
}
else
{
var incoming = MessageSerializer.DeserializeCompatible(_typeReq, frame.Payload);
return this._invoker.Invoke(this._instance, new object[] { incoming, frame.FrameId, client });
}
return null;
}
}
private readonly Dictionary<string, List<MRInvoker>> _handlers =
new Dictionary<string, List<MRInvoker>>();
private readonly Dictionary<string, MRInvoker> _requestors =
new Dictionary<string, MRInvoker>();
#endregion Routing
public void Incoming(FrameType type, byte[] data)
{
switch (type)
{
case FrameType.Message:
break;
case FrameType.Request:
break;
case FrameType.Response:
break;
}
}
public void RegisterInbox(string inbox, MessageHandler handler)
{
throw new System.NotImplementedException();
}
public void RegisterInbox<T>(string inbox, MessageHandler<T> handler)
{
throw new System.NotImplementedException();
}
public void RegisterInbox(MessageHandler handler)
{
throw new System.NotImplementedException();
}
public void RegisterInbox<T>(MessageHandler<T> handler)
{
throw new System.NotImplementedException();
}
public void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler)
{
throw new System.NotImplementedException();
}
public void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler)
{
throw new System.NotImplementedException();
}
public void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler)
{
throw new System.NotImplementedException();
}
public void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler)
{
throw new System.NotImplementedException();
}
}
}

@ -1,121 +0,0 @@
using System;
using ZeroLevel._Network;
namespace ZeroLevel.Services._Network
{
public class NetworkNode
: IClient, IRouter
{
private FrameParser _parser = new FrameParser();
private readonly ISocketClient _client;
private readonly IRouter _router;
private DateTime _lastConnectionTime;
public NetworkNode(ISocketClient client, IRouter router)
{
_lastConnectionTime = DateTime.UtcNow;
_client = client;
_router = router;
_parser.OnIncoming += _parser_OnIncoming;
_client.OnIncomingData += _readerWriter_OnIncomingData;
}
private void _readerWriter_OnIncomingData(byte[] data, int length)
{
_parser.Push(data, length);
}
private void _parser_OnIncoming(FrameType type, int identity, byte[] data)
{
switch (type)
{
case FrameType.KeepAlive:
_lastConnectionTime = DateTime.UtcNow;
break;
case FrameType.Message:
break;
case FrameType.Request:
break;
case FrameType.Response:
break;
}
}
public void Send(string inbox)
{
throw new System.NotImplementedException();
}
public void Send(string inbox, byte[] data)
{
throw new System.NotImplementedException();
}
public void Send<T>(string inbox, T message)
{
throw new System.NotImplementedException();
}
public byte[] Request(string inbox)
{
throw new System.NotImplementedException();
}
public byte[] Request(string inbox, byte[] data)
{
throw new System.NotImplementedException();
}
public Tresponse Request<Tresponse>(string inbox)
{
throw new System.NotImplementedException();
}
public Tresponse Request<Tresponse, Trequest>(string inbox, Trequest request)
{
throw new System.NotImplementedException();
}
#region IRouter
public void RegisterInbox(string inbox, MessageHandler handler)
{
_router.RegisterInbox(inbox, handler);
}
public void RegisterInbox<T>(string inbox, MessageHandler<T> handler)
{
_router.RegisterInbox<T>(inbox, handler);
}
public void RegisterInbox(MessageHandler handler)
{
_router.RegisterInbox(handler);
}
public void RegisterInbox<T>(MessageHandler<T> handler)
{
_router.RegisterInbox<T>(handler);
}
public void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler)
{
_router.RegisterInbox<Tresponse>(inbox, handler);
}
public void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler)
{
_router.RegisterInbox<Trequest, Tresponse>(inbox, handler);
}
public void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler)
{
_router.RegisterInbox<Tresponse>(handler);
}
public void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler)
{
_router.RegisterInbox<Trequest, Tresponse>(handler);
}
#endregion
}
}
Loading…
Cancel
Save

Powered by TurnKey Linux.