The moving to the next net version continues.
pull/1/head
unknown 5 years ago
parent 47660b3d88
commit 4e568df6d6

@ -7,6 +7,11 @@ namespace TestApp
public class MyService
: BaseZeroService
{
public MyService()
:base()
{
}
protected override void StartAction()
{
Log.Info("Started");

@ -1,4 +1,6 @@
using ZeroLevel;
using System;
using System.Net;
using ZeroLevel;
namespace TestApp
{
@ -6,7 +8,20 @@ namespace TestApp
{
private static void Main(string[] args)
{
Bootstrap.Startup<MyService>(args, () => Configuration.ReadSetFromIniFile("config.ini"));
var se = Bootstrap.Startup<MyService>(args,
() => Configuration.ReadSetFromIniFile("config.ini"))
.ReadServiceInfo()
//.UseDiscovery()
.Run();
var router = se.Service.UseHost(8800);
router.RegisterInbox<string, string>("upper", (c, s) => s.ToUpperInvariant());
var client = se.Service.ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800));
client.Request<string, string>("upper", "hello", s => Console.WriteLine(s));
se.WaitWhileStatus(ZeroServiceStatus.Running)
.Stop();
}
}
}

@ -90,6 +90,7 @@ namespace ZeroLevel.Discovery
}
#endregion Snapshot
private void Heartbeat(long taskid)
{
try
@ -148,7 +149,7 @@ namespace ZeroLevel.Discovery
Save();
}
public InvokeResult Append(ExServiceInfo serviceInfo, IZBackward client)
public InvokeResult Append(ExServiceInfo serviceInfo, ISocketClient client)
{
InvokeResult result = null;
var endpoint = $"{client.Endpoint.Address}:{serviceInfo.Port}";

@ -1,125 +0,0 @@
using ZeroLevel.Network;
namespace ZeroLevel.Services.Applications
{
public abstract class BaseZeroExchangeService
: BaseZeroService, IExchangeService
{
public string Key { get; private set; }
public string Version { get; private set; }
public string Group { get; private set; }
public string Type { get; private set; }
protected readonly IConfigurationSet _configSet;
protected IConfiguration _config => _configSet?.Default;
protected Exchange Exchange { get; }
private BaseZeroExchangeService()
{
}
protected BaseZeroExchangeService(IConfigurationSet configuration = null)
: base()
{
_configSet = configuration ?? Configuration.DefaultSet;
base.Name = ReadName();
this.Key = ReadKey();
this.Version = ReadVersion();
this.Group = ReadServiceGroup();
this.Type = ReadServiceType();
var discovery = _config.First("discovery");
Exchange = new Exchange(new DiscoveryClient(discovery));
}
private IExService _self_service = null;
private readonly object _self_create_lock = new object();
protected IExService Self
{
get
{
if (_self_service == null)
{
lock (_self_create_lock)
{
if (_self_service == null)
{
_self_service = Exchange.RegisterService(this);
}
}
}
return _self_service;
}
}
#region Config
private string ReadName()
{
return FindInConfig<string>(new[] { "ServiceName", "AppName" }, string.Empty, "service")
?? this.GetType().Name;
}
private string ReadKey()
{
return FindInConfig<string>(new[] { "ServiceKey", "AppKey" }, string.Empty, "service");
}
private string ReadVersion()
{
return FindInConfig<string>(new[] { "Version", "AppVersion" }, string.Empty, "service")
?? "1.0";
}
private string ReadServiceGroup()
{
return FindInConfig<string>(new[] { "DiscoveryGroup", "ServiceGroup" }, string.Empty, "service")
?? ExServiceInfo.DEFAULT_GROUP_NAME;
}
private string ReadServiceType()
{
return FindInConfig<string>(new[] { "DiscoveryType", "ServiceType" }, string.Empty, "service")
?? ExServiceInfo.DEFAULT_TYPE_NAME;
}
protected T FindInConfig<T>(string[] keys, params string[] sections)
{
foreach (var section in sections)
{
if (string.IsNullOrWhiteSpace(section))
{
foreach (var key in keys)
{
if (_configSet.Default.Contains(key))
{
return _configSet.Default.First<T>(key);
}
}
}
else if (_configSet.ContainsSection(section))
{
foreach (var key in keys)
{
if (_configSet[section].Contains(key))
{
return _configSet[section].First<T>(key);
}
}
}
}
return default(T);
}
#endregion Config
public string Endpoint { get; private set; }
protected override void StopAction()
{
this.Exchange.Dispose();
}
}
}

@ -1,5 +1,8 @@
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Threading;
using ZeroLevel.Network;
namespace ZeroLevel.Services.Applications
{
@ -7,8 +10,13 @@ namespace ZeroLevel.Services.Applications
: IZeroService
{
public string Name { get; protected set; }
public ZeroServiceState State => _state;
private ZeroServiceState _state;
public string Key { get; private set; }
public string Version { get; private set; }
public string Group { get; private set; }
public string Type { get; private set; }
public ZeroServiceStatus Status => _state;
private ZeroServiceStatus _state;
protected BaseZeroService()
{
@ -20,36 +28,179 @@ namespace ZeroLevel.Services.Applications
Name = name;
}
private ManualResetEvent InteraciveModeWorkingFlag = new ManualResetEvent(false);
protected abstract void StartAction();
protected abstract void StopAction();
#region Config
private const string DEFAULT_GROUP_NAME = "__default_group__";
private const string DEFAULT_TYPE_NAME = "__default_type__";
public void ReadServiceInfo()
{
this.Name = ReadName();
this.Key = ReadKey();
this.Version = ReadVersion();
this.Group = ReadServiceGroup();
this.Type = ReadServiceType();
}
public void ReadServiceInfo(IConfigurationSet set)
{
this.Name = ReadName(set);
this.Key = ReadKey(set);
this.Version = ReadVersion(set);
this.Group = ReadServiceGroup(set);
this.Type = ReadServiceType(set);
}
private string ReadName(IConfigurationSet set = null)
{
return FindInConfig<string>(set, new[] { "ServiceName", "AppName" }, string.Empty, "service")
?? this.GetType().Name;
}
private string ReadKey(IConfigurationSet set = null)
{
return FindInConfig<string>(set, new[] { "ServiceKey", "AppKey" }, string.Empty, "service");
}
private string ReadVersion(IConfigurationSet set = null)
{
return FindInConfig<string>(set, new[] { "Version", "AppVersion" }, string.Empty, "service")
?? "1.0";
}
private string ReadServiceGroup(IConfigurationSet set = null)
{
return FindInConfig<string>(set, new[] { "DiscoveryGroup", "ServiceGroup" }, string.Empty, "service")
?? DEFAULT_GROUP_NAME;
}
private string ReadServiceType(IConfigurationSet set = null)
{
return FindInConfig<string>(set, new[] { "DiscoveryType", "ServiceType" }, string.Empty, "service")
?? DEFAULT_TYPE_NAME;
}
protected T FindInConfig<T>(IConfigurationSet set, string[] keys, params string[] sections)
{
var configSet = set ?? Configuration.DefaultSet;
foreach (var section in sections)
{
if (string.IsNullOrWhiteSpace(section))
{
foreach (var key in keys)
{
if (configSet.Default.Contains(key))
{
return configSet.Default.First<T>(key);
}
}
}
else if (configSet.ContainsSection(section))
{
foreach (var key in keys)
{
if (configSet[section].Contains(key))
{
return configSet[section].First<T>(key);
}
}
}
}
return default(T);
}
#endregion Config
#region Network
private IRouter _router;
private static IRouter _null_router = new NullRouter();
private IDiscoveryClient _discoveryClient;
public void UseDiscovery()
{
var discovery = Configuration.Default.First("discovery");
_discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(discovery), _null_router, false));
}
public void UseDiscovery(string endpoint)
{
_discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), _null_router, false));
}
public void UseDiscovery(IPEndPoint endpoint)
{
_discoveryClient = new DiscoveryClient(GetClient(endpoint, _null_router, false));
}
public IRouter UseHost()
{
if (_state == ZeroServiceStatus.Running)
{
return GetServer(new IPEndPoint(NetUtils.GetNonLoopbackAddress(), NetUtils.GetFreeTcpPort()), new Router()).Router;
}
return _null_router;
}
public IRouter UseHost(int port)
{
if (_state == ZeroServiceStatus.Running)
{
return GetServer(new IPEndPoint(NetUtils.GetNonLoopbackAddress(), port), new Router()).Router;
}
return _null_router;
}
public IRouter UseHost(IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running)
{
return GetServer(endpoint, new Router()).Router;
}
return _null_router;
}
public ExClient ConnectToService(string endpoint)
{
if (_state == ZeroServiceStatus.Running)
{
return new ExClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true));
}
return null;
}
public ExClient ConnectToService(IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running)
{
return new ExClient(GetClient(endpoint, new Router(), true));
}
return null;
}
#endregion
#region Service control
public void Start()
{
InteraciveModeWorkingFlag.Reset();
if (_state == ZeroServiceState.Initialized)
if (_state == ZeroServiceStatus.Initialized)
{
try
{
StartAction();
_state = ZeroServiceState.Started;
_state = ZeroServiceStatus.Running;
Log.Debug($"[{Name}] Service started");
}
catch (Exception ex)
{
Log.Fatal(ex, $"[{Name}] Failed to start service");
InteraciveModeWorkingFlag.Set();
}
}
try
}
public void Stop()
{
while (false == InteraciveModeWorkingFlag.WaitOne(2000))
if (_state == ZeroServiceStatus.Running)
{
}
}
catch { }
_state = ZeroServiceState.Stopped;
try
{
StopAction();
@ -60,10 +211,115 @@ namespace ZeroLevel.Services.Applications
Log.Fatal(ex, $"[{Name}] Failed to stop service");
}
}
_state = ZeroServiceStatus.Stopped;
}
public void Stop()
public void WaitForStatus(ZeroServiceStatus status)
{
var start = DateTime.UtcNow;
while (this.Status != status)
{
InteraciveModeWorkingFlag.Set();
Thread.Sleep(150);
}
}
public void WaitForStatus(ZeroServiceStatus status, TimeSpan period)
{
var start = DateTime.UtcNow;
while (this.Status != status && (DateTime.UtcNow - start) < period)
{
Thread.Sleep(150);
}
}
public void WaitWhileStatus(ZeroServiceStatus status)
{
var start = DateTime.UtcNow;
while (this.Status == status)
{
Thread.Sleep(150);
}
}
public void WaitWhileStatus(ZeroServiceStatus status, TimeSpan period)
{
var start = DateTime.UtcNow;
while (this.Status == status && (DateTime.UtcNow - start) < period)
{
Thread.Sleep(150);
}
}
#endregion
#region Utils
private static readonly ConcurrentDictionary<string, ISocketClient> _clientInstances = new ConcurrentDictionary<string, ISocketClient>();
private readonly ConcurrentDictionary<string, SocketServer> _serverInstances = new ConcurrentDictionary<string, SocketServer>();
private ISocketClient GetClient(IPEndPoint endpoint, IRouter router, bool use_cachee)
{
if (use_cachee)
{
string key = $"{endpoint.Address}:{endpoint.Port}";
ISocketClient instance = null;
if (_clientInstances.ContainsKey(key))
{
instance = _clientInstances[key];
if (instance.Status == SocketClientStatus.Working)
{
return instance;
}
_clientInstances.TryRemove(key, out instance);
instance.Dispose();
instance = null;
}
instance = new SocketClient(endpoint, router);
_clientInstances[key] = instance;
return instance;
}
return new SocketClient(endpoint, router);
}
private SocketServer GetServer(IPEndPoint endpoint, IRouter router)
{
string key = $"{endpoint.Address}:{endpoint.Port}";
if (_serverInstances.ContainsKey(key))
{
return _serverInstances[key];
}
var instance = new SocketServer(endpoint, router);
_serverInstances[key] = instance;
return instance;
}
#endregion
public void Dispose()
{
_state = ZeroServiceStatus.Disposed;
foreach (var client in _clientInstances)
{
try
{
client.Value.Dispose();
}
catch (Exception ex)
{
Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketClient to endpoint {client.Key}");
}
}
foreach (var server in _serverInstances)
{
try
{
server.Value.Dispose();
}
catch (Exception ex)
{
Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketServer with endpoint {server.Key}");
}
}
}
}
}

@ -1,12 +1,87 @@
using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Reflection;
namespace ZeroLevel
{
public static class Bootstrap
{
public interface IServiceExecution
{
IServiceExecution Run();
IServiceExecution WaitForStatus(ZeroServiceStatus status);
IServiceExecution WaitForStatus(ZeroServiceStatus status, TimeSpan period);
IServiceExecution WaitWhileStatus(ZeroServiceStatus status);
IServiceExecution WaitWhileStatus(ZeroServiceStatus status, TimeSpan period);
IServiceExecution Stop();
IZeroService Service { get; }
ZeroServiceStatus Status { get; }
}
public class BootstrapFluent
: IServiceExecution
{
private readonly IZeroService _service;
public IZeroService Service { get { return _service; } }
public BootstrapFluent(IZeroService service)
{
_service = service;
}
public BootstrapFluent UseDiscovery() { _service?.UseDiscovery(); return this; }
public BootstrapFluent UseDiscovery(string url) { _service?.UseDiscovery(url); return this; }
public BootstrapFluent UseDiscovery(IPEndPoint endpoint) { _service?.UseDiscovery(endpoint); return this; }
/* public BootstrapFluent UseHost() { _service?.UseHost(); return this; }
public BootstrapFluent UseHost(int port) { _service?.UseHost(port); return this; }
public BootstrapFluent UseHost(IPEndPoint endpoint) { _service?.UseHost(endpoint); return this; }
public BootstrapFluent ConnectToService(string url) { _service.ConnectToService(url); return this; }
public BootstrapFluent ConnectToService(IPEndPoint endpoint) { _service.ConnectToService(endpoint); return this; }
*/
public BootstrapFluent ReadServiceInfo() { _service?.ReadServiceInfo(); return this; }
public BootstrapFluent ReadServiceInfo(IConfigurationSet config) { _service?.ReadServiceInfo(config); return this; }
public ZeroServiceStatus Status { get { return _service.Status; } }
public IServiceExecution Run() { _service.Start(); return this; }
public IServiceExecution Stop()
{
try
{
_service?.Stop();
}
catch (Exception ex)
{
Log.Error(ex, $"[Bootstrap] Service {_service?.Name} dispose error");
}
return this;
}
public IServiceExecution WaitForStatus(ZeroServiceStatus status)
{
_service.WaitForStatus(status);
return this;
}
public IServiceExecution WaitForStatus(ZeroServiceStatus status, TimeSpan period)
{
_service.WaitForStatus(status, period);
return this;
}
public IServiceExecution WaitWhileStatus(ZeroServiceStatus status)
{
_service.WaitWhileStatus(status);
return this;
}
public IServiceExecution WaitWhileStatus(ZeroServiceStatus status, TimeSpan period)
{
_service.WaitWhileStatus(status, period);
return this;
}
}
static Bootstrap()
{
// Tricks for minimize config changes for dependency resolve
@ -22,10 +97,6 @@ namespace ZeroLevel
{
return Assembly.LoadFile(Path.Combine(Configuration.BaseDirectory, "Newtonsoft.Json.dll"));
}
else if (args.Name.Equals("Microsoft.Owin", StringComparison.Ordinal))
{
return Assembly.LoadFile(Path.Combine(Configuration.BaseDirectory, "Microsoft.Owin.dll"));
}
var candidates = Directory.GetFiles(Path.Combine(Configuration.BaseDirectory), args.Name, SearchOption.TopDirectoryOnly);
if (candidates != null && candidates.Any())
{
@ -39,32 +110,24 @@ namespace ZeroLevel
return null;
}
public static void Startup<T>(string[] args,
public static BootstrapFluent Startup<T>(string[] args,
Func<bool> preStartConfiguration = null,
Func<bool> postStartConfiguration = null)
where T : IZeroService
{
var service = Initialize<T>(args, Configuration.ReadSetFromApplicationConfig(),
preStartConfiguration, postStartConfiguration);
if (service != null)
{
service.Start();
Shutdown(service);
}
return new BootstrapFluent(service);
}
public static void Startup<T>(string[] args,
public static BootstrapFluent Startup<T>(string[] args,
Func<IConfigurationSet> configuration,
Func<bool> preStartConfiguration = null,
Func<bool> postStartConfiguration = null)
where T : IZeroService
{
var service = Initialize<T>(args, configuration(), preStartConfiguration, postStartConfiguration);
if (service != null)
{
service.Start();
Shutdown(service);
}
return new BootstrapFluent(service);
}
private static IZeroService Initialize<T>(string[] args,
@ -121,12 +184,11 @@ namespace ZeroLevel
return service;
}
private static void Shutdown(IZeroService service)
private static void Shutdown()
{
try { Sheduller.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose default sheduller error"); }
try { Log.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose log error"); }
try { Injector.Default.Dispose(); Injector.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose DI containers error"); }
try { (service as IDisposable)?.Dispose(); } catch (Exception ex) { Log.Error(ex, $"[Bootstrap] Service {service?.Name} dispose error"); }
}
}
}

@ -1,10 +1,38 @@
namespace ZeroLevel
using System;
using System.Net;
using ZeroLevel.Network;
namespace ZeroLevel
{
public interface IZeroService
: IDisposable
{
ZeroServiceState State { get; }
string Name { get; }
string Key { get; }
string Version { get; }
string Group { get; }
string Type { get; }
void UseDiscovery();
void UseDiscovery(string url);
void UseDiscovery(IPEndPoint endpoint);
IRouter UseHost();
IRouter UseHost(int port);
IRouter UseHost(IPEndPoint endpoint);
ExClient ConnectToService(string url);
ExClient ConnectToService(IPEndPoint endpoint);
void ReadServiceInfo();
void ReadServiceInfo(IConfigurationSet config);
void WaitForStatus(ZeroServiceStatus status);
void WaitForStatus(ZeroServiceStatus status, TimeSpan period);
void WaitWhileStatus(ZeroServiceStatus status);
void WaitWhileStatus(ZeroServiceStatus status, TimeSpan period);
ZeroServiceStatus Status { get; }
void Start();

@ -0,0 +1,17 @@
using System;
using ZeroLevel.Models;
namespace ZeroLevel.Network
{
public interface IClient
{
InvokeResult Send(string inbox);
InvokeResult Send(string inbox, byte[] data);
InvokeResult Send<T>(string inbox, T message);
InvokeResult Request(string inbox, Action<byte[]> callback);
InvokeResult Request(string inbox, byte[] data, Action<byte[]> callback);
InvokeResult Request<Tresponse>(string inbox, Action<Tresponse> callback);
InvokeResult Request<Trequest, Tresponse>(string inbox, Trequest request, Action<Tresponse> callback);
}
}

@ -1,12 +0,0 @@
namespace ZeroLevel.Network
{
public interface IExchangeService
{
string Name { get; }
string Key { get; }
string Endpoint { get; }
string Version { get; }
string Group { get; }
string Type { get; }
}
}

@ -1,38 +1,9 @@
using System;
using ZeroLevel.Models;
namespace ZeroLevel.Network
namespace ZeroLevel.Network
{
public interface IRouter
: IServer
{
#region Messages
void RegisterInbox(string inbox, MessageHandler handler);
void RegisterInbox<T>(string inbox, MessageHandler<T> handler);
// Default inboxe
void RegisterInbox(MessageHandler handler);
void RegisterInbox<T>(MessageHandler<T> handler);
#endregion
#region Requests
void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler);
void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler);
// Default inboxe
void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler);
void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler);
#endregion
}
public interface IClient
{
InvokeResult Send(string inbox);
InvokeResult Send(string inbox, byte[] data);
InvokeResult Send<T>(string inbox, T message);
InvokeResult Request(string inbox, Action<byte[]> callback);
InvokeResult Request(string inbox, byte[] data, Action<byte[]> callback);
InvokeResult Request<Tresponse>(string inbox, Action<Tresponse> callback);
InvokeResult Request<Trequest, Tresponse>(string inbox, Trequest request, Action<Tresponse> callback);
void HandleMessage(Frame frame, ISocketClient client);
byte[] HandleRequest(Frame frame, ISocketClient client);
}
}

@ -0,0 +1,23 @@
namespace ZeroLevel.Network
{
public interface IServer
{
#region Messages
void RegisterInbox(string inbox, MessageHandler handler);
void RegisterInbox<T>(string inbox, MessageHandler<T> handler);
// Default inboxe
void RegisterInbox(MessageHandler handler);
void RegisterInbox<T>(MessageHandler<T> handler);
#endregion
#region Requests
void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler);
void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler);
// Default inboxe
void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler);
void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler);
#endregion
}
}

@ -12,6 +12,8 @@ namespace ZeroLevel.Network
IPEndPoint Endpoint { get; }
SocketClientStatus Status { get; }
IRouter Router { get; }
void ForceConnect();
void UseKeepAlive(TimeSpan period);
void Send(Frame data);

@ -7,7 +7,10 @@ using ZeroLevel.Services.Collections;
namespace ZeroLevel.Network
{
internal sealed class DCRouter
public class DiscoveryClient
: IDiscoveryClient
{
private sealed class DCRouter
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private IEnumerable<ServiceEndpointInfo> _empty = Enumerable.Empty<ServiceEndpointInfo>();
@ -123,16 +126,12 @@ namespace ZeroLevel.Network
}
}
public class DiscoveryClient
: IDiscoveryClient
{
private readonly DCRouter _router = new DCRouter();
private readonly NetworkNode _discoveryServerClient;
private readonly ExClient _discoveryServerClient;
public DiscoveryClient(string endpoint)
public DiscoveryClient(ISocketClient client)
{
_discoveryServerClient = ExchangeTransportFactory.GetClient(endpoint);
_discoveryServerClient = new ExClient(client);
UpdateServiceListInfo();
Sheduller.RemindEvery(TimeSpan.FromSeconds(30), UpdateServiceListInfo);
}
@ -140,7 +139,7 @@ namespace ZeroLevel.Network
private void UpdateServiceListInfo()
{
_discoveryServerClient.ForceConnect();
if (_discoveryServerClient.Status == ZTransportStatus.Working)
if (_discoveryServerClient.Status == SocketClientStatus.Working)
{
try
{
@ -164,7 +163,7 @@ namespace ZeroLevel.Network
public bool Register(ExServiceInfo info)
{
_discoveryServerClient.ForceConnect();
if (_discoveryServerClient.Status == ZTransportStatus.Working)
if (_discoveryServerClient.Status == SocketClientStatus.Working)
{
bool result = false;
try

@ -5,45 +5,17 @@ using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
public class NetworkNode
: IClient, IRouter, IDisposable
public sealed class ExClient
: IClient, IDisposable
{
private FrameParser _parser = new FrameParser();
private readonly ISocketClient _client;
private readonly Router _router;
private DateTime _lastConnectionTime;
public IPEndPoint EndPoint => _client?.Endpoint;
public SocketClientStatus Status => _client.Status;
public IRouter Router => _client.Router;
public NetworkNode(ISocketClient client)
public ExClient(ISocketClient client)
{
_lastConnectionTime = DateTime.UtcNow;
_client = client;
_router = new Router();
_parser.OnIncoming += _parser_OnIncoming;
_client.OnIncomingData += _readerWriter_OnIncomingData;
}
private void _readerWriter_OnIncomingData(ISocketClient client, byte[] data, int length)
{
_parser.Push(data, length);
}
private void _parser_OnIncoming(FrameType type, int identity, byte[] data)
{
switch (type)
{
case FrameType.KeepAlive:
_lastConnectionTime = DateTime.UtcNow;
break;
case FrameType.Message:
_router.HandleMessage(MessageSerializer.Deserialize<Frame>(data), _client);
break;
case FrameType.Request:
var response = _router.HandleRequest(MessageSerializer.Deserialize<Frame>(data), _client);
_client.Response(response, identity);
break;
}
}
public void ForceConnect() => _client.ForceConnect();
@ -147,48 +119,6 @@ namespace ZeroLevel.Network
return InvokeResult.Succeeding();
}
#region IRouter
public void RegisterInbox(string inbox, MessageHandler handler)
{
_router.RegisterInbox(inbox, handler);
}
public void RegisterInbox<T>(string inbox, MessageHandler<T> handler)
{
_router.RegisterInbox<T>(inbox, handler);
}
public void RegisterInbox(MessageHandler handler)
{
_router.RegisterInbox(handler);
}
public void RegisterInbox<T>(MessageHandler<T> handler)
{
_router.RegisterInbox<T>(handler);
}
public void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler)
{
_router.RegisterInbox<Tresponse>(inbox, handler);
}
public void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler)
{
_router.RegisterInbox<Trequest, Tresponse>(inbox, handler);
}
public void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler)
{
_router.RegisterInbox<Tresponse>(handler);
}
public void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler)
{
_router.RegisterInbox<Trequest, Tresponse>(handler);
}
#endregion
public void Dispose()
{
_client.Dispose();

@ -1,114 +0,0 @@
using System;
using System.Net;
namespace ZeroLevel.Network
{
internal sealed class ExService
: ZBaseNetwork, IExService
{
private readonly ExRouter _router;
private readonly IZObservableServer _server;
public event Action<ISocketClient> OnConnect = c => { };
public event Action<ISocketClient> OnDisconnect = c => { };
public ExService(IZObservableServer server)
{
_server = server ?? throw new ArgumentNullException(nameof(server));
_router = new ExRouter();
_server.OnMessage += _server_OnMessage;
_server.OnRequest += _server_OnRequest;
_server.OnConnect += _server_OnConnect;
_server.OnDisconnect += _server_OnDisconnect;
}
private void _server_OnDisconnect(ISocketClient client)
{
this.OnDisconnect(client);
}
private void _server_OnConnect(ISocketClient client)
{
this.OnConnect(client);
}
private Frame _server_OnRequest(Frame frame, IZBackward client)
{
return _router.HandleRequest(frame, client);
}
private void _server_OnMessage(Frame frame, IZBackward client)
{
_router.HandleMessage(frame, client);
}
public IPEndPoint Endpoint => _server.Endpoint;
/// <summary>
/// Registering an Inbox Handler
/// </summary>
/// <typeparam name="T">Message type</typeparam>
/// <param name="inbox">Inbox name</param>
/// <param name="handler">Handler</param>
public void RegisterInbox<T>(string inbox, Action<T, long, IZBackward> handler)
{
_router.RegisterInbox(inbox, handler);
}
public void RegisterInbox<T>(Action<T, long, IZBackward> handler)
{
_router.RegisterInbox(DEFAULT_MESSAGE_INBOX, handler);
}
/// <summary>
/// Registration method responding to an incoming request
/// </summary>
/// <typeparam name="Treq">Type of input message</typeparam>
/// <typeparam name="Tresp">Type of response</typeparam>
/// <param name="protocol">Protocol</param>
/// <param name="inbox">Inbox name</param>
/// <param name="replier">Handler</param>
public void RegisterInbox<Treq, Tresp>(string inbox, Func<Treq, long, IZBackward, Tresp> handler)
{
_router.RegisterInbox<Treq, Tresp>(inbox, handler);
}
public void RegisterInbox<Treq, Tresp>(Func<Treq, long, IZBackward, Tresp> handler)
{
_router.RegisterInbox<Treq, Tresp>(DEFAULT_REQUEST_INBOX, handler);
}
/// <summary>
/// Registration of the method of responding to the incoming request, not receiving incoming data
/// </summary>
/// <typeparam name="Tresp">Type of response</typeparam>
/// <param name="protocol">Protocol</param>
/// <param name="inbox">Inbox name</param>
/// <param name="replier">Handler</param>
public void RegisterInbox<Tresp>(string inbox, Func<long, IZBackward, Tresp> handler)
{
_router.RegisterInbox<Tresp>(inbox, handler);
}
public void RegisterInbox<Tresp>(Func<long, IZBackward, Tresp> handler)
{
_router.RegisterInbox<Tresp>(DEFAULT_REQUEST_INBOX, handler);
}
public void RegisterInbox(string inbox, Action<long, IZBackward> handler)
{
_router.RegisterInbox(inbox, handler);
}
public void RegisterInbox(Action<long, ISocketClient> handler)
{
_router.RegisterInbox(DEFAULT_REQUEST_INBOX, handler);
}
public override void Dispose()
{
_server.Dispose();
}
}
}

@ -7,6 +7,7 @@ using System.Reflection;
namespace ZeroLevel.Network
{
/*
public sealed class ExServiceHost
: IDisposable
{
@ -512,4 +513,5 @@ namespace ZeroLevel.Network
}
}
}
*/
}

@ -6,6 +6,7 @@ using System.Threading.Tasks;
namespace ZeroLevel.Network
{
/*
/// <summary>
/// Provides data exchange between services
/// </summary>
@ -28,12 +29,12 @@ namespace ZeroLevel.Network
/// <summary>
/// Registration service
/// </summary>
public IExService RegisterService(IExchangeService service)
public IExchangeService RegisterService(IExchangeService service)
{
return _host.RegisterService(service);
}
public IExService RegisterService(ExServiceInfo service)
public IExchangeService RegisterService(ExServiceInfo service)
{
return _host.RegisterService(service);
}
@ -595,7 +596,7 @@ namespace ZeroLevel.Network
#region Private
private IEnumerable<Tresp> _RequestBroadcast<Treq, Tresp>(List<NetworkNode> clients, string inbox, Treq data)
private IEnumerable<Tresp> _RequestBroadcast<Treq, Tresp>(List<ExClient> clients, string inbox, Treq data)
{
var response = new List<Tresp>();
using (var waiter = new CountdownEvent(clients.Count))
@ -623,7 +624,7 @@ namespace ZeroLevel.Network
return response;
}
private IEnumerable<Tresp> _RequestBroadcast<Tresp>(List<NetworkNode> clients, string inbox)
private IEnumerable<Tresp> _RequestBroadcast<Tresp>(List<ExClient> clients, string inbox)
{
var response = new List<Tresp>();
using (var waiter = new CountdownEvent(clients.Count))
@ -660,4 +661,5 @@ namespace ZeroLevel.Network
this._host.Dispose();
}
}
*/
}

@ -1,49 +0,0 @@
using System.Collections.Concurrent;
using System.Net;
namespace ZeroLevel.Network
{
public static class ExchangeTransportFactory
{
private static readonly ConcurrentDictionary<string, NetworkNode> _clientInstances = new ConcurrentDictionary<string, NetworkNode>();
/// <summary>
/// Creates a server to receive messages using the specified protocol
/// </summary>
/// <param name="protocol">Protocol</param>
/// <returns>Server</returns>
public static IExService GetServer(int port = -1)
{
return new ExService(new ZExSocketObservableServer(new System.Net.IPEndPoint(IPAddress.Any, port == -1 ? NetUtils.GetFreeTcpPort() : port)));
}
/// <summary>
/// Creates a client to access the server using the specified protocol
/// </summary>
/// <param name="protocol">Protocol</param>
/// <param name="endpoint">Server endpoint</param>
/// <returns>Client</returns>
public static NetworkNode GetClientWithCache(string endpoint)
{
NetworkNode instance = null;
if (_clientInstances.ContainsKey(endpoint))
{
instance = _clientInstances[endpoint];
if (instance.Status == SocketClientStatus.Working)
{
return instance;
}
_clientInstances.TryRemove(endpoint, out instance);
instance.Dispose();
instance = null;
}
instance = GetClient(endpoint);
_clientInstances[endpoint] = instance;
return instance;
}
public static NetworkNode GetClient(string endpoint)
{
return new NetworkNode(new SocketClient(NetUtils.CreateIPEndPoint(endpoint)));
}
}
}

@ -27,7 +27,7 @@ namespace ZeroLevel.Network.FileTransfer
_currentFileTransfers = 0;*/
}
protected void PushTransferTask(string filePath, Action<string> completeHandler = null, Action<string, string> errorHandler = null, IZBackward client = null)
protected void PushTransferTask(string filePath, Action<string> completeHandler = null, Action<string, string> errorHandler = null, ExClient client = null)
{
if (string.IsNullOrWhiteSpace(filePath))
{

@ -7,12 +7,12 @@ namespace ZeroLevel.Network.FileTransfer
public sealed class FileClient
: BaseFileTransfer, IFileClient
{
private readonly NetworkNode _client;
private readonly ExClient _client;
private readonly string _baseFolder;
private readonly ClientFolderNameMapper _nameMapper;
private readonly bool _disposeClient;
internal FileClient(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
internal FileClient(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
: base(baseFolder)
{
_client = client ?? throw new Exception(nameof(client));
@ -20,9 +20,9 @@ namespace ZeroLevel.Network.FileTransfer
_nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_disposeClient = disposeClient;
_client.RegisterInbox<FileStartFrame>("__upload_file_start", (c, f) => Receiver.Incoming(f, nameMapper(c)));
_client.RegisterInbox<FileFrame>("__upload_file_frame", (c, f) => Receiver.Incoming(f));
_client.RegisterInbox<FileEndFrame>("__upload_file_complete", (c, f) => Receiver.Incoming(f));
_client.Router.RegisterInbox<FileStartFrame>("__upload_file_start", (c, f) => Receiver.Incoming(f, nameMapper(c)));
_client.Router.RegisterInbox<FileFrame>("__upload_file_frame", (c, f) => Receiver.Incoming(f));
_client.Router.RegisterInbox<FileEndFrame>("__upload_file_complete", (c, f) => Receiver.Incoming(f));
}
public void Dispose()

@ -6,16 +6,16 @@ namespace ZeroLevel.Network.FileTransfer
{
public static IFileClient Create(string serverEndpoint, string baseFolder, ClientFolderNameMapper nameMapper = null)
{
return CreateFileServerClient(ExchangeTransportFactory.GetClient(serverEndpoint), baseFolder,
nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);
return null;/* CreateFileServerClient(ExchangeTransportFactory.GetClient(serverEndpoint), baseFolder,
nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);*/
}
public static IFileClient Create(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper = null)
public static IFileClient Create(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper = null)
{
return CreateFileServerClient(client, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false);
}
private static IFileClient CreateFileServerClient(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
private static IFileClient CreateFileServerClient(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
{
return new FileClient(client, baseFolder, nameMapper, disposeClient);
}

@ -6,12 +6,12 @@ namespace ZeroLevel.Network.FileTransfer
public sealed class FileServer
: BaseFileTransfer, IFileServer
{
private readonly IExService _service;
private readonly IRouter _service;
private readonly string _baseFolder;
private readonly ServerFolderNameMapperDelegate _nameMapper;
private readonly bool _disposeService;
internal FileServer(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService)
internal FileServer(IRouter service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService)
: base(baseFolder)
{
_service = service ?? throw new Exception(nameof(service));
@ -19,20 +19,12 @@ namespace ZeroLevel.Network.FileTransfer
_nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_disposeService = disposeService;
_service.RegisterInbox<FileStartFrame, InvokeResult>("__upload_file_start", (f, _, client) => Receiver.Incoming(f, nameMapper(client)));
_service.RegisterInbox<FileFrame, InvokeResult>("__upload_file_frame", (f, _, __) => Receiver.Incoming(f));
_service.RegisterInbox<FileEndFrame, InvokeResult>("__upload_file_complete", (f, _, __) => Receiver.Incoming(f));
_service.RegisterInbox<FileStartFrame, InvokeResult>("__upload_file_start", (client, f) => Receiver.Incoming(f, nameMapper(client)));
_service.RegisterInbox<FileFrame, InvokeResult>("__upload_file_frame", (client, f) => Receiver.Incoming(f));
_service.RegisterInbox<FileEndFrame, InvokeResult>("__upload_file_complete", (client, f) => Receiver.Incoming(f));
}
public void Dispose()
{
if (_disposeService)
{
_service?.Dispose();
}
}
public void Send(ISocketClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
public void Send(ExClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
{
PushTransferTask(fileName, completeHandler, errorHandler, client);
}
@ -41,18 +33,18 @@ namespace ZeroLevel.Network.FileTransfer
{
Log.Info($"Start upload file {reader.Path}");
var startinfo = reader.GetStartInfo();
if (false == task.Client.SendBackward<FileStartFrame>("__upload_file_start", startinfo).Success)
if (false == task.Client.Send<FileStartFrame>("__upload_file_start", startinfo).Success)
{
return;
}
foreach (var chunk in reader.Read())
{
if (task.Client.SendBackward<FileFrame>("__upload_file_frame", chunk).Success == false)
if (task.Client.Send<FileFrame>("__upload_file_frame", chunk).Success == false)
{
return;
}
}
task.Client.SendBackward<FileEndFrame>("__upload_file_complete", reader.GetCompleteInfo());
task.Client.Send<FileEndFrame>("__upload_file_complete", reader.GetCompleteInfo());
Log.Debug($"Stop upload file {reader.Path}");
}
}

@ -7,17 +7,17 @@ namespace ZeroLevel.Network.FileTransfer
{
public static IFileServer Create(int port, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null)
{
return CreateFileServer(ExchangeTransportFactory.GetServer(port), baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);
return null;// CreateFileServer(ExchangeTransportFactory.GetServer(port), baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);
}
public static IFileServer Create(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null)
public static IFileServer Create(IZeroService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null)
{
return CreateFileServer(service, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false);
return null;// CreateFileServer(service, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false);
}
private static IFileServer CreateFileServer(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService)
private static IFileServer CreateFileServer(IZeroService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService)
{
return new FileServer(service, baseFolder, nameMapper, disposeService);
return null;// new FileServer(service, baseFolder, nameMapper, disposeService);
}
}
}

@ -4,8 +4,7 @@ using ZeroLevel.Network;
namespace ZeroLevel.Network.FileTransfer
{
public interface IFileServer
: IDisposable
{
void Send(ISocketClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null);
void Send(ExClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null);
}
}

@ -7,6 +7,6 @@ namespace ZeroLevel.Network.FileTransfer
public string FilePath;
public Action<string> CompletedHandler;
public Action<string, string> ErrorHandler;
public ISocketClient Client;
public ExClient Client;
}
}

@ -12,6 +12,7 @@ namespace ZeroLevel.Network
{
#region Private
private readonly IRouter _router;
private Socket _clientSocket;
private NetworkStream _stream;
private FrameParser _parser = new FrameParser();
@ -33,10 +34,13 @@ namespace ZeroLevel.Network
}
#endregion Private
public IRouter Router { get { return _router; } }
public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } }
public SocketClient(IPEndPoint ep)
public SocketClient(IPEndPoint ep, IRouter router)
{
_router = router;
Endpoint = ep;
_parser.OnIncoming += _parser_OnIncoming;
_sendThread = new Thread(SendFramesJob);
@ -44,8 +48,9 @@ namespace ZeroLevel.Network
_sendThread.Start();
}
public SocketClient(Socket socket)
public SocketClient(Socket socket, IRouter router)
{
_router = router;
_socket_freezed = true;
_clientSocket = socket;
Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint;
@ -151,16 +156,23 @@ namespace ZeroLevel.Network
switch (type)
{
case FrameType.KeepAlive:
_last_rw_time = DateTime.UtcNow.Ticks;
break;
// Nothing
return;
case FrameType.Message:
_router?.HandleMessage(MessageSerializer.Deserialize<Frame>(data), this);
break;
case FrameType.Request:
OnIncomingData(this, data, identity);
var response = _router?.HandleRequest(MessageSerializer.Deserialize<Frame>(data), this);
if (response != null)
{
this.Response(response, identity);
}
break;
case FrameType.Response:
_requests.Success(identity, MessageSerializer.Deserialize<Frame>(data));
break;
}
OnIncomingData(this, data, identity);
}
catch (Exception ex)
{
@ -227,7 +239,7 @@ namespace ZeroLevel.Network
{
if (false == TryConnect())
{
throw new ObjectDisposedException("No connection");
throw new Exception("No connection");
}
}
}

@ -12,11 +12,12 @@ namespace ZeroLevel.Network
{
private Socket _serverSocket;
private ReaderWriterLockSlim _connection_set_lock = new ReaderWriterLockSlim();
private Dictionary<IPEndPoint, NetworkNode> _connections = new Dictionary<IPEndPoint, NetworkNode>();
private Dictionary<IPEndPoint, ExClient> _connections = new Dictionary<IPEndPoint, ExClient>();
private readonly IRouter _router;
public IPEndPoint LocalEndpoint { get; }
public event Action<ISocketClient> OnDisconnect = _ => { };
public event Action<ISocketClient> OnConnect = _ => { };
public event Action<ExClient> OnConnect = _ => { };
public IEnumerable<IPEndPoint> ConnectionList
{
get
@ -41,7 +42,7 @@ namespace ZeroLevel.Network
catch
{ }
}
private void ConnectEventRise(ISocketClient client)
private void ConnectEventRise(ExClient client)
{
try
{
@ -51,8 +52,11 @@ namespace ZeroLevel.Network
{ }
}
public SocketServer(IPEndPoint endpoint)
public IRouter Router { get { return _router; } }
public SocketServer(IPEndPoint endpoint, IRouter router)
{
_router = router;
LocalEndpoint = endpoint;
_serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
@ -72,11 +76,11 @@ namespace ZeroLevel.Network
_serverSocket.BeginAccept(BeginAcceptCallback, null);
_connection_set_lock.EnterWriteLock();
var connection = new SocketClient(client_socket);
var connection = new SocketClient(client_socket, _router);
connection.OnDisconnect += Connection_OnDisconnect;
_connections[connection.Endpoint] = new NetworkNode(connection);
_connections[connection.Endpoint] = new ExClient(connection);
ConnectEventRise(connection);
ConnectEventRise(_connections[connection.Endpoint]);
}
catch (Exception ex)
{

@ -266,4 +266,19 @@ namespace ZeroLevel.Network
}
#endregion
}
internal sealed class NullRouter
: IRouter
{
public void HandleMessage(Frame frame, ISocketClient client) { }
public byte[] HandleRequest(Frame frame, ISocketClient client) { return null; }
public void RegisterInbox(string inbox, MessageHandler handler) { }
public void RegisterInbox<T>(string inbox, MessageHandler<T> handler) { }
public void RegisterInbox(MessageHandler handler) { }
public void RegisterInbox<T>(MessageHandler<T> handler) { }
public void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler) { }
public void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler) { }
public void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler) { }
public void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler) { }
}
}

@ -3,10 +3,11 @@
namespace ZeroLevel
{
[Flags]
public enum ZeroServiceState : int
public enum ZeroServiceStatus : int
{
Initialized = 0,
Started = 1,
Stopped = 2
Running = 1,
Stopped = 2,
Disposed = 6
}
}
Loading…
Cancel
Save

Powered by TurnKey Linux.