diff --git a/TestApp/MyService.cs b/TestApp/MyService.cs index d19afa6..6d2c2f8 100644 --- a/TestApp/MyService.cs +++ b/TestApp/MyService.cs @@ -7,6 +7,11 @@ namespace TestApp public class MyService : BaseZeroService { + public MyService() + :base() + { + } + protected override void StartAction() { Log.Info("Started"); diff --git a/TestApp/Program.cs b/TestApp/Program.cs index ada05d7..e265260 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -1,4 +1,6 @@ -using ZeroLevel; +using System; +using System.Net; +using ZeroLevel; namespace TestApp { @@ -6,7 +8,20 @@ namespace TestApp { private static void Main(string[] args) { - Bootstrap.Startup(args, () => Configuration.ReadSetFromIniFile("config.ini")); + var se = Bootstrap.Startup(args, + () => Configuration.ReadSetFromIniFile("config.ini")) + .ReadServiceInfo() + //.UseDiscovery() + .Run(); + + var router = se.Service.UseHost(8800); + router.RegisterInbox("upper", (c, s) => s.ToUpperInvariant()); + + var client = se.Service.ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800)); + client.Request("upper", "hello", s => Console.WriteLine(s)); + + se.WaitWhileStatus(ZeroServiceStatus.Running) + .Stop(); } } } diff --git a/ZeroLevel.Discovery/RouteTable.cs b/ZeroLevel.Discovery/RouteTable.cs index 662efeb..6d4a16d 100644 --- a/ZeroLevel.Discovery/RouteTable.cs +++ b/ZeroLevel.Discovery/RouteTable.cs @@ -90,6 +90,7 @@ namespace ZeroLevel.Discovery } #endregion Snapshot + private void Heartbeat(long taskid) { try @@ -148,7 +149,7 @@ namespace ZeroLevel.Discovery Save(); } - public InvokeResult Append(ExServiceInfo serviceInfo, IZBackward client) + public InvokeResult Append(ExServiceInfo serviceInfo, ISocketClient client) { InvokeResult result = null; var endpoint = $"{client.Endpoint.Address}:{serviceInfo.Port}"; diff --git a/ZeroLevel/Services/BaseZeroExchangeService.cs b/ZeroLevel/Services/BaseZeroExchangeService.cs deleted file mode 100644 index a232466..0000000 --- a/ZeroLevel/Services/BaseZeroExchangeService.cs +++ /dev/null @@ -1,125 +0,0 @@ -using ZeroLevel.Network; - -namespace ZeroLevel.Services.Applications -{ - public abstract class BaseZeroExchangeService - : BaseZeroService, IExchangeService - { - public string Key { get; private set; } - public string Version { get; private set; } - public string Group { get; private set; } - public string Type { get; private set; } - - protected readonly IConfigurationSet _configSet; - protected IConfiguration _config => _configSet?.Default; - - protected Exchange Exchange { get; } - - private BaseZeroExchangeService() - { - - } - - protected BaseZeroExchangeService(IConfigurationSet configuration = null) - : base() - { - _configSet = configuration ?? Configuration.DefaultSet; - base.Name = ReadName(); - this.Key = ReadKey(); - this.Version = ReadVersion(); - this.Group = ReadServiceGroup(); - this.Type = ReadServiceType(); - - var discovery = _config.First("discovery"); - - Exchange = new Exchange(new DiscoveryClient(discovery)); - } - - private IExService _self_service = null; - private readonly object _self_create_lock = new object(); - protected IExService Self - { - get - { - if (_self_service == null) - { - lock (_self_create_lock) - { - if (_self_service == null) - { - _self_service = Exchange.RegisterService(this); - } - } - } - return _self_service; - } - } - - #region Config - - private string ReadName() - { - return FindInConfig(new[] { "ServiceName", "AppName" }, string.Empty, "service") - ?? this.GetType().Name; - } - - private string ReadKey() - { - return FindInConfig(new[] { "ServiceKey", "AppKey" }, string.Empty, "service"); - } - - private string ReadVersion() - { - return FindInConfig(new[] { "Version", "AppVersion" }, string.Empty, "service") - ?? "1.0"; - } - - private string ReadServiceGroup() - { - return FindInConfig(new[] { "DiscoveryGroup", "ServiceGroup" }, string.Empty, "service") - ?? ExServiceInfo.DEFAULT_GROUP_NAME; - } - - private string ReadServiceType() - { - return FindInConfig(new[] { "DiscoveryType", "ServiceType" }, string.Empty, "service") - ?? ExServiceInfo.DEFAULT_TYPE_NAME; - } - - protected T FindInConfig(string[] keys, params string[] sections) - { - foreach (var section in sections) - { - if (string.IsNullOrWhiteSpace(section)) - { - foreach (var key in keys) - { - if (_configSet.Default.Contains(key)) - { - return _configSet.Default.First(key); - } - } - } - else if (_configSet.ContainsSection(section)) - { - foreach (var key in keys) - { - if (_configSet[section].Contains(key)) - { - return _configSet[section].First(key); - } - } - } - } - return default(T); - } - #endregion Config - - public string Endpoint { get; private set; } - - protected override void StopAction() - { - this.Exchange.Dispose(); - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/BaseZeroService.cs b/ZeroLevel/Services/BaseZeroService.cs index 0a5c911..1775f35 100644 --- a/ZeroLevel/Services/BaseZeroService.cs +++ b/ZeroLevel/Services/BaseZeroService.cs @@ -1,5 +1,8 @@ using System; +using System.Collections.Concurrent; +using System.Net; using System.Threading; +using ZeroLevel.Network; namespace ZeroLevel.Services.Applications { @@ -7,8 +10,13 @@ namespace ZeroLevel.Services.Applications : IZeroService { public string Name { get; protected set; } - public ZeroServiceState State => _state; - private ZeroServiceState _state; + public string Key { get; private set; } + public string Version { get; private set; } + public string Group { get; private set; } + public string Type { get; private set; } + + public ZeroServiceStatus Status => _state; + private ZeroServiceStatus _state; protected BaseZeroService() { @@ -20,50 +28,298 @@ namespace ZeroLevel.Services.Applications Name = name; } - private ManualResetEvent InteraciveModeWorkingFlag = new ManualResetEvent(false); - protected abstract void StartAction(); protected abstract void StopAction(); + #region Config + private const string DEFAULT_GROUP_NAME = "__default_group__"; + private const string DEFAULT_TYPE_NAME = "__default_type__"; + + public void ReadServiceInfo() + { + this.Name = ReadName(); + this.Key = ReadKey(); + this.Version = ReadVersion(); + this.Group = ReadServiceGroup(); + this.Type = ReadServiceType(); + } + + public void ReadServiceInfo(IConfigurationSet set) + { + this.Name = ReadName(set); + this.Key = ReadKey(set); + this.Version = ReadVersion(set); + this.Group = ReadServiceGroup(set); + this.Type = ReadServiceType(set); + } + + private string ReadName(IConfigurationSet set = null) + { + return FindInConfig(set, new[] { "ServiceName", "AppName" }, string.Empty, "service") + ?? this.GetType().Name; + } + + private string ReadKey(IConfigurationSet set = null) + { + return FindInConfig(set, new[] { "ServiceKey", "AppKey" }, string.Empty, "service"); + } + + private string ReadVersion(IConfigurationSet set = null) + { + return FindInConfig(set, new[] { "Version", "AppVersion" }, string.Empty, "service") + ?? "1.0"; + } + + private string ReadServiceGroup(IConfigurationSet set = null) + { + return FindInConfig(set, new[] { "DiscoveryGroup", "ServiceGroup" }, string.Empty, "service") + ?? DEFAULT_GROUP_NAME; + } + + private string ReadServiceType(IConfigurationSet set = null) + { + return FindInConfig(set, new[] { "DiscoveryType", "ServiceType" }, string.Empty, "service") + ?? DEFAULT_TYPE_NAME; + } + + protected T FindInConfig(IConfigurationSet set, string[] keys, params string[] sections) + { + var configSet = set ?? Configuration.DefaultSet; + foreach (var section in sections) + { + if (string.IsNullOrWhiteSpace(section)) + { + foreach (var key in keys) + { + if (configSet.Default.Contains(key)) + { + return configSet.Default.First(key); + } + } + } + else if (configSet.ContainsSection(section)) + { + foreach (var key in keys) + { + if (configSet[section].Contains(key)) + { + return configSet[section].First(key); + } + } + } + } + return default(T); + } + #endregion Config + + #region Network + private IRouter _router; + private static IRouter _null_router = new NullRouter(); + private IDiscoveryClient _discoveryClient; + + public void UseDiscovery() + { + var discovery = Configuration.Default.First("discovery"); + _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(discovery), _null_router, false)); + } + + public void UseDiscovery(string endpoint) + { + _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), _null_router, false)); + } + + public void UseDiscovery(IPEndPoint endpoint) + { + _discoveryClient = new DiscoveryClient(GetClient(endpoint, _null_router, false)); + } + + public IRouter UseHost() + { + if (_state == ZeroServiceStatus.Running) + { + return GetServer(new IPEndPoint(NetUtils.GetNonLoopbackAddress(), NetUtils.GetFreeTcpPort()), new Router()).Router; + } + return _null_router; + } + + public IRouter UseHost(int port) + { + if (_state == ZeroServiceStatus.Running) + { + return GetServer(new IPEndPoint(NetUtils.GetNonLoopbackAddress(), port), new Router()).Router; + } + return _null_router; + } + + public IRouter UseHost(IPEndPoint endpoint) + { + if (_state == ZeroServiceStatus.Running) + { + return GetServer(endpoint, new Router()).Router; + } + return _null_router; + } + + public ExClient ConnectToService(string endpoint) + { + if (_state == ZeroServiceStatus.Running) + { + return new ExClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true)); + } + return null; + } + + public ExClient ConnectToService(IPEndPoint endpoint) + { + if (_state == ZeroServiceStatus.Running) + { + return new ExClient(GetClient(endpoint, new Router(), true)); + } + return null; + } + #endregion + + #region Service control public void Start() { - InteraciveModeWorkingFlag.Reset(); - if (_state == ZeroServiceState.Initialized) + if (_state == ZeroServiceStatus.Initialized) { try { StartAction(); - _state = ZeroServiceState.Started; + _state = ZeroServiceStatus.Running; Log.Debug($"[{Name}] Service started"); } catch (Exception ex) { Log.Fatal(ex, $"[{Name}] Failed to start service"); - InteraciveModeWorkingFlag.Set(); } } - try + } + + public void Stop() + { + if (_state == ZeroServiceStatus.Running) { - while (false == InteraciveModeWorkingFlag.WaitOne(2000)) + try + { + StopAction(); + Log.Debug($"[{Name}] Service stopped"); + } + catch (Exception ex) { + Log.Fatal(ex, $"[{Name}] Failed to stop service"); } } - catch { } - _state = ZeroServiceState.Stopped; - try + _state = ZeroServiceStatus.Stopped; + } + + public void WaitForStatus(ZeroServiceStatus status) + { + var start = DateTime.UtcNow; + while (this.Status != status) { - StopAction(); - Log.Debug($"[{Name}] Service stopped"); + Thread.Sleep(150); } - catch (Exception ex) + } + + public void WaitForStatus(ZeroServiceStatus status, TimeSpan period) + { + var start = DateTime.UtcNow; + while (this.Status != status && (DateTime.UtcNow - start) < period) { - Log.Fatal(ex, $"[{Name}] Failed to stop service"); + Thread.Sleep(150); } } - public void Stop() + public void WaitWhileStatus(ZeroServiceStatus status) + { + var start = DateTime.UtcNow; + while (this.Status == status) + { + Thread.Sleep(150); + } + } + + public void WaitWhileStatus(ZeroServiceStatus status, TimeSpan period) + { + var start = DateTime.UtcNow; + while (this.Status == status && (DateTime.UtcNow - start) < period) + { + Thread.Sleep(150); + } + } + #endregion + + #region Utils + private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _serverInstances = new ConcurrentDictionary(); + + private ISocketClient GetClient(IPEndPoint endpoint, IRouter router, bool use_cachee) { - InteraciveModeWorkingFlag.Set(); + if (use_cachee) + { + string key = $"{endpoint.Address}:{endpoint.Port}"; + ISocketClient instance = null; + if (_clientInstances.ContainsKey(key)) + { + instance = _clientInstances[key]; + if (instance.Status == SocketClientStatus.Working) + { + return instance; + } + _clientInstances.TryRemove(key, out instance); + instance.Dispose(); + instance = null; + } + instance = new SocketClient(endpoint, router); + _clientInstances[key] = instance; + return instance; + } + return new SocketClient(endpoint, router); + } + + private SocketServer GetServer(IPEndPoint endpoint, IRouter router) + { + string key = $"{endpoint.Address}:{endpoint.Port}"; + if (_serverInstances.ContainsKey(key)) + { + return _serverInstances[key]; + } + var instance = new SocketServer(endpoint, router); + _serverInstances[key] = instance; + return instance; + } + + #endregion + + public void Dispose() + { + _state = ZeroServiceStatus.Disposed; + + foreach (var client in _clientInstances) + { + try + { + client.Value.Dispose(); + } + catch (Exception ex) + { + Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketClient to endpoint {client.Key}"); + } + } + + foreach (var server in _serverInstances) + { + try + { + server.Value.Dispose(); + } + catch (Exception ex) + { + Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketServer with endpoint {server.Key}"); + } + } } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Bootstrap.cs b/ZeroLevel/Services/Bootstrap.cs index dadc148..0b97a7f 100644 --- a/ZeroLevel/Services/Bootstrap.cs +++ b/ZeroLevel/Services/Bootstrap.cs @@ -1,12 +1,87 @@ using System; using System.IO; using System.Linq; +using System.Net; using System.Reflection; namespace ZeroLevel { public static class Bootstrap { + public interface IServiceExecution + { + IServiceExecution Run(); + IServiceExecution WaitForStatus(ZeroServiceStatus status); + IServiceExecution WaitForStatus(ZeroServiceStatus status, TimeSpan period); + IServiceExecution WaitWhileStatus(ZeroServiceStatus status); + IServiceExecution WaitWhileStatus(ZeroServiceStatus status, TimeSpan period); + IServiceExecution Stop(); + IZeroService Service { get; } + ZeroServiceStatus Status { get; } + } + + public class BootstrapFluent + : IServiceExecution + { + private readonly IZeroService _service; + public IZeroService Service { get { return _service; } } + + public BootstrapFluent(IZeroService service) + { + _service = service; + } + + public BootstrapFluent UseDiscovery() { _service?.UseDiscovery(); return this; } + public BootstrapFluent UseDiscovery(string url) { _service?.UseDiscovery(url); return this; } + public BootstrapFluent UseDiscovery(IPEndPoint endpoint) { _service?.UseDiscovery(endpoint); return this; } + + /* public BootstrapFluent UseHost() { _service?.UseHost(); return this; } + public BootstrapFluent UseHost(int port) { _service?.UseHost(port); return this; } + public BootstrapFluent UseHost(IPEndPoint endpoint) { _service?.UseHost(endpoint); return this; } + + public BootstrapFluent ConnectToService(string url) { _service.ConnectToService(url); return this; } + public BootstrapFluent ConnectToService(IPEndPoint endpoint) { _service.ConnectToService(endpoint); return this; } + */ + public BootstrapFluent ReadServiceInfo() { _service?.ReadServiceInfo(); return this; } + public BootstrapFluent ReadServiceInfo(IConfigurationSet config) { _service?.ReadServiceInfo(config); return this; } + + public ZeroServiceStatus Status { get { return _service.Status; } } + public IServiceExecution Run() { _service.Start(); return this; } + public IServiceExecution Stop() + { + try + { + _service?.Stop(); + } + catch (Exception ex) + { + Log.Error(ex, $"[Bootstrap] Service {_service?.Name} dispose error"); + } + return this; + } + + public IServiceExecution WaitForStatus(ZeroServiceStatus status) + { + _service.WaitForStatus(status); + return this; + } + public IServiceExecution WaitForStatus(ZeroServiceStatus status, TimeSpan period) + { + _service.WaitForStatus(status, period); + return this; + } + public IServiceExecution WaitWhileStatus(ZeroServiceStatus status) + { + _service.WaitWhileStatus(status); + return this; + } + public IServiceExecution WaitWhileStatus(ZeroServiceStatus status, TimeSpan period) + { + _service.WaitWhileStatus(status, period); + return this; + } + } + static Bootstrap() { // Tricks for minimize config changes for dependency resolve @@ -22,10 +97,6 @@ namespace ZeroLevel { return Assembly.LoadFile(Path.Combine(Configuration.BaseDirectory, "Newtonsoft.Json.dll")); } - else if (args.Name.Equals("Microsoft.Owin", StringComparison.Ordinal)) - { - return Assembly.LoadFile(Path.Combine(Configuration.BaseDirectory, "Microsoft.Owin.dll")); - } var candidates = Directory.GetFiles(Path.Combine(Configuration.BaseDirectory), args.Name, SearchOption.TopDirectoryOnly); if (candidates != null && candidates.Any()) { @@ -39,37 +110,29 @@ namespace ZeroLevel return null; } - public static void Startup(string[] args, - Func preStartConfiguration = null, + public static BootstrapFluent Startup(string[] args, + Func preStartConfiguration = null, Func postStartConfiguration = null) where T : IZeroService { - var service = Initialize(args, Configuration.ReadSetFromApplicationConfig(), + var service = Initialize(args, Configuration.ReadSetFromApplicationConfig(), preStartConfiguration, postStartConfiguration); - if (service != null) - { - service.Start(); - Shutdown(service); - } + return new BootstrapFluent(service); } - public static void Startup(string[] args, + public static BootstrapFluent Startup(string[] args, Func configuration, Func preStartConfiguration = null, Func postStartConfiguration = null) where T : IZeroService { var service = Initialize(args, configuration(), preStartConfiguration, postStartConfiguration); - if (service != null) - { - service.Start(); - Shutdown(service); - } + return new BootstrapFluent(service); } private static IZeroService Initialize(string[] args, IConfigurationSet configurationSet, - Func preStartConfiguration = null, + Func preStartConfiguration = null, Func postStartConfiguration = null) where T : IZeroService { @@ -121,12 +184,11 @@ namespace ZeroLevel return service; } - private static void Shutdown(IZeroService service) + private static void Shutdown() { try { Sheduller.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose default sheduller error"); } try { Log.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose log error"); } try { Injector.Default.Dispose(); Injector.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose DI containers error"); } - try { (service as IDisposable)?.Dispose(); } catch (Exception ex) { Log.Error(ex, $"[Bootstrap] Service {service?.Name} dispose error"); } } } } \ No newline at end of file diff --git a/ZeroLevel/Services/IZeroService.cs b/ZeroLevel/Services/IZeroService.cs index 1b79ac2..2113af2 100644 --- a/ZeroLevel/Services/IZeroService.cs +++ b/ZeroLevel/Services/IZeroService.cs @@ -1,10 +1,38 @@ -namespace ZeroLevel +using System; +using System.Net; +using ZeroLevel.Network; + +namespace ZeroLevel { public interface IZeroService + : IDisposable { - ZeroServiceState State { get; } - string Name { get; } + string Key { get; } + string Version { get; } + string Group { get; } + string Type { get; } + + void UseDiscovery(); + void UseDiscovery(string url); + void UseDiscovery(IPEndPoint endpoint); + + IRouter UseHost(); + IRouter UseHost(int port); + IRouter UseHost(IPEndPoint endpoint); + + ExClient ConnectToService(string url); + ExClient ConnectToService(IPEndPoint endpoint); + + void ReadServiceInfo(); + void ReadServiceInfo(IConfigurationSet config); + + void WaitForStatus(ZeroServiceStatus status); + void WaitForStatus(ZeroServiceStatus status, TimeSpan period); + void WaitWhileStatus(ZeroServiceStatus status); + void WaitWhileStatus(ZeroServiceStatus status, TimeSpan period); + + ZeroServiceStatus Status { get; } void Start(); diff --git a/ZeroLevel/Services/Network/Contracts/IClient.cs b/ZeroLevel/Services/Network/Contracts/IClient.cs new file mode 100644 index 0000000..dd10a3b --- /dev/null +++ b/ZeroLevel/Services/Network/Contracts/IClient.cs @@ -0,0 +1,17 @@ +using System; +using ZeroLevel.Models; + +namespace ZeroLevel.Network +{ + public interface IClient + { + InvokeResult Send(string inbox); + InvokeResult Send(string inbox, byte[] data); + InvokeResult Send(string inbox, T message); + + InvokeResult Request(string inbox, Action callback); + InvokeResult Request(string inbox, byte[] data, Action callback); + InvokeResult Request(string inbox, Action callback); + InvokeResult Request(string inbox, Trequest request, Action callback); + } +} diff --git a/ZeroLevel/Services/Network/Contracts/IExchangeService.cs b/ZeroLevel/Services/Network/Contracts/IExchangeService.cs deleted file mode 100644 index ce7c9cb..0000000 --- a/ZeroLevel/Services/Network/Contracts/IExchangeService.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace ZeroLevel.Network -{ - public interface IExchangeService - { - string Name { get; } - string Key { get; } - string Endpoint { get; } - string Version { get; } - string Group { get; } - string Type { get; } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Contracts/IRouter.cs b/ZeroLevel/Services/Network/Contracts/IRouter.cs index e334075..f282f84 100644 --- a/ZeroLevel/Services/Network/Contracts/IRouter.cs +++ b/ZeroLevel/Services/Network/Contracts/IRouter.cs @@ -1,38 +1,9 @@ -using System; -using ZeroLevel.Models; - -namespace ZeroLevel.Network +namespace ZeroLevel.Network { public interface IRouter + : IServer { - #region Messages - void RegisterInbox(string inbox, MessageHandler handler); - void RegisterInbox(string inbox, MessageHandler handler); - - // Default inboxe - void RegisterInbox(MessageHandler handler); - void RegisterInbox(MessageHandler handler); - #endregion - - #region Requests - void RegisterInbox(string inbox, RequestHandler handler); - void RegisterInbox(string inbox, RequestHandler handler); - - // Default inboxe - void RegisterInbox(RequestHandler handler); - void RegisterInbox(RequestHandler handler); - #endregion - } - - public interface IClient - { - InvokeResult Send(string inbox); - InvokeResult Send(string inbox, byte[] data); - InvokeResult Send(string inbox, T message); - - InvokeResult Request(string inbox, Action callback); - InvokeResult Request(string inbox, byte[] data, Action callback); - InvokeResult Request(string inbox, Action callback); - InvokeResult Request(string inbox, Trequest request, Action callback); + void HandleMessage(Frame frame, ISocketClient client); + byte[] HandleRequest(Frame frame, ISocketClient client); } } diff --git a/ZeroLevel/Services/Network/Contracts/IServer.cs b/ZeroLevel/Services/Network/Contracts/IServer.cs new file mode 100644 index 0000000..d41ab3f --- /dev/null +++ b/ZeroLevel/Services/Network/Contracts/IServer.cs @@ -0,0 +1,23 @@ +namespace ZeroLevel.Network +{ + public interface IServer + { + #region Messages + void RegisterInbox(string inbox, MessageHandler handler); + void RegisterInbox(string inbox, MessageHandler handler); + + // Default inboxe + void RegisterInbox(MessageHandler handler); + void RegisterInbox(MessageHandler handler); + #endregion + + #region Requests + void RegisterInbox(string inbox, RequestHandler handler); + void RegisterInbox(string inbox, RequestHandler handler); + + // Default inboxe + void RegisterInbox(RequestHandler handler); + void RegisterInbox(RequestHandler handler); + #endregion + } +} diff --git a/ZeroLevel/Services/Network/Contracts/ISocketClient.cs b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs index 772020e..585bfd9 100644 --- a/ZeroLevel/Services/Network/Contracts/ISocketClient.cs +++ b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs @@ -12,6 +12,8 @@ namespace ZeroLevel.Network IPEndPoint Endpoint { get; } SocketClientStatus Status { get; } + IRouter Router { get; } + void ForceConnect(); void UseKeepAlive(TimeSpan period); void Send(Frame data); diff --git a/ZeroLevel/Services/Network/DiscoveryClient.cs b/ZeroLevel/Services/Network/DiscoveryClient.cs index 125199a..6b1db1d 100644 --- a/ZeroLevel/Services/Network/DiscoveryClient.cs +++ b/ZeroLevel/Services/Network/DiscoveryClient.cs @@ -7,132 +7,131 @@ using ZeroLevel.Services.Collections; namespace ZeroLevel.Network { - internal sealed class DCRouter + public class DiscoveryClient + : IDiscoveryClient { - private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(); - private IEnumerable _empty = Enumerable.Empty(); - private List _services = new List(); + private sealed class DCRouter + { + private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(); + private IEnumerable _empty = Enumerable.Empty(); + private List _services = new List(); - private Dictionary> _tableByKey; - private Dictionary> _tableByGroups; - private Dictionary> _tableByTypes; + private Dictionary> _tableByKey; + private Dictionary> _tableByGroups; + private Dictionary> _tableByTypes; - internal void Update(IEnumerable records) - { - if (records == null) + internal void Update(IEnumerable records) { - Log.Warning("[DiscoveryClient] UpdateServiceListInfo. Discrovery response is empty"); - return; - } - var services = new List(); - foreach (var service in records) - { - var key = service.ServiceKey.ToUpperInvariant(); - var type = service.ServiceType.ToUpperInvariant(); - var group = service.ServiceGroup.ToUpperInvariant(); - services.AddRange(service.Endpoints.Select(e => new ServiceEndpointInfo { Endpoint = e, Group = group, Key = key, Type = type })); - } - _lock.EnterWriteLock(); - try - { - _services = services; - _tableByKey = _services.GroupBy(r => r.Key).ToDictionary(g => g.Key, g => new RoundRobinOverCollection(g)); - _tableByTypes = _services.GroupBy(r => r.Type).ToDictionary(g => g.Key, g => new RoundRobinOverCollection(g)); - _tableByGroups = _services.GroupBy(r => r.Group).ToDictionary(g => g.Key, g => new RoundRobinOverCollection(g)); - } - catch (Exception ex) - { - Log.Error(ex, "[DiscoveryClient] UpdateServiceListInfo. Update local routing table error."); - } - finally - { - _lock.ExitWriteLock(); + if (records == null) + { + Log.Warning("[DiscoveryClient] UpdateServiceListInfo. Discrovery response is empty"); + return; + } + var services = new List(); + foreach (var service in records) + { + var key = service.ServiceKey.ToUpperInvariant(); + var type = service.ServiceType.ToUpperInvariant(); + var group = service.ServiceGroup.ToUpperInvariant(); + services.AddRange(service.Endpoints.Select(e => new ServiceEndpointInfo { Endpoint = e, Group = group, Key = key, Type = type })); + } + _lock.EnterWriteLock(); + try + { + _services = services; + _tableByKey = _services.GroupBy(r => r.Key).ToDictionary(g => g.Key, g => new RoundRobinOverCollection(g)); + _tableByTypes = _services.GroupBy(r => r.Type).ToDictionary(g => g.Key, g => new RoundRobinOverCollection(g)); + _tableByGroups = _services.GroupBy(r => r.Group).ToDictionary(g => g.Key, g => new RoundRobinOverCollection(g)); + } + catch (Exception ex) + { + Log.Error(ex, "[DiscoveryClient] UpdateServiceListInfo. Update local routing table error."); + } + finally + { + _lock.ExitWriteLock(); + } } - } - public ServiceEndpointInfo GetService(string serviceKey, string endpoint) - { - var key = serviceKey.ToUpperInvariant(); - _lock.EnterReadLock(); - try + public ServiceEndpointInfo GetService(string serviceKey, string endpoint) { - if (_tableByKey.ContainsKey(key) && !_tableByKey[key].IsEmpty) + var key = serviceKey.ToUpperInvariant(); + _lock.EnterReadLock(); + try + { + if (_tableByKey.ContainsKey(key) && !_tableByKey[key].IsEmpty) + { + return _tableByKey[key].Find(s => s.Endpoint.Equals(endpoint, StringComparison.OrdinalIgnoreCase)).FirstOrDefault(); + } + } + finally { - return _tableByKey[key].Find(s => s.Endpoint.Equals(endpoint, StringComparison.OrdinalIgnoreCase)).FirstOrDefault(); + _lock.ExitReadLock(); } + return null; } - finally - { - _lock.ExitReadLock(); - } - return null; - } - public IEnumerable GetServiceEndpoints(string serviceKey) - { - var key = serviceKey.Trim().ToUpperInvariant(); - _lock.EnterReadLock(); - try + public IEnumerable GetServiceEndpoints(string serviceKey) { - if (_tableByKey.ContainsKey(key) && !_tableByKey[key].IsEmpty) + var key = serviceKey.Trim().ToUpperInvariant(); + _lock.EnterReadLock(); + try { - return _tableByKey[key].GenerateSeq(); + if (_tableByKey.ContainsKey(key) && !_tableByKey[key].IsEmpty) + { + return _tableByKey[key].GenerateSeq(); + } } + finally + { + _lock.ExitReadLock(); + } + return _empty; } - finally - { - _lock.ExitReadLock(); - } - return _empty; - } - public IEnumerable GetServiceEndpointsByGroup(string serviceGroup) - { - var group = serviceGroup.Trim().ToUpperInvariant(); - _lock.EnterReadLock(); - try + public IEnumerable GetServiceEndpointsByGroup(string serviceGroup) { - if (_tableByGroups.ContainsKey(group) && !_tableByGroups[group].IsEmpty) + var group = serviceGroup.Trim().ToUpperInvariant(); + _lock.EnterReadLock(); + try { - return _tableByGroups[group].GenerateSeq(); + if (_tableByGroups.ContainsKey(group) && !_tableByGroups[group].IsEmpty) + { + return _tableByGroups[group].GenerateSeq(); + } } + finally + { + _lock.ExitReadLock(); + } + return _empty; } - finally - { - _lock.ExitReadLock(); - } - return _empty; - } - public IEnumerable GetServiceEndpointsByType(string serviceType) - { - var type = serviceType.Trim().ToUpperInvariant(); - _lock.EnterReadLock(); - try + public IEnumerable GetServiceEndpointsByType(string serviceType) { - if (_tableByTypes.ContainsKey(type) && !_tableByTypes[type].IsEmpty) + var type = serviceType.Trim().ToUpperInvariant(); + _lock.EnterReadLock(); + try { - return _tableByTypes[type].GenerateSeq(); + if (_tableByTypes.ContainsKey(type) && !_tableByTypes[type].IsEmpty) + { + return _tableByTypes[type].GenerateSeq(); + } } + finally + { + _lock.ExitReadLock(); + } + return _empty; } - finally - { - _lock.ExitReadLock(); - } - return _empty; } - } - - public class DiscoveryClient - : IDiscoveryClient - { private readonly DCRouter _router = new DCRouter(); - private readonly NetworkNode _discoveryServerClient; + private readonly ExClient _discoveryServerClient; - public DiscoveryClient(string endpoint) + public DiscoveryClient(ISocketClient client) { - _discoveryServerClient = ExchangeTransportFactory.GetClient(endpoint); + _discoveryServerClient = new ExClient(client); UpdateServiceListInfo(); Sheduller.RemindEvery(TimeSpan.FromSeconds(30), UpdateServiceListInfo); } @@ -140,7 +139,7 @@ namespace ZeroLevel.Network private void UpdateServiceListInfo() { _discoveryServerClient.ForceConnect(); - if (_discoveryServerClient.Status == ZTransportStatus.Working) + if (_discoveryServerClient.Status == SocketClientStatus.Working) { try { @@ -164,7 +163,7 @@ namespace ZeroLevel.Network public bool Register(ExServiceInfo info) { _discoveryServerClient.ForceConnect(); - if (_discoveryServerClient.Status == ZTransportStatus.Working) + if (_discoveryServerClient.Status == SocketClientStatus.Working) { bool result = false; try diff --git a/ZeroLevel/Services/Network/NetworkNode.cs b/ZeroLevel/Services/Network/ExClient.cs similarity index 58% rename from ZeroLevel/Services/Network/NetworkNode.cs rename to ZeroLevel/Services/Network/ExClient.cs index eaf8742..c50203e 100644 --- a/ZeroLevel/Services/Network/NetworkNode.cs +++ b/ZeroLevel/Services/Network/ExClient.cs @@ -5,45 +5,17 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { - public class NetworkNode - : IClient, IRouter, IDisposable + public sealed class ExClient + : IClient, IDisposable { - private FrameParser _parser = new FrameParser(); private readonly ISocketClient _client; - private readonly Router _router; - private DateTime _lastConnectionTime; public IPEndPoint EndPoint => _client?.Endpoint; public SocketClientStatus Status => _client.Status; + public IRouter Router => _client.Router; - public NetworkNode(ISocketClient client) + public ExClient(ISocketClient client) { - _lastConnectionTime = DateTime.UtcNow; _client = client; - _router = new Router(); - _parser.OnIncoming += _parser_OnIncoming; - _client.OnIncomingData += _readerWriter_OnIncomingData; - } - - private void _readerWriter_OnIncomingData(ISocketClient client, byte[] data, int length) - { - _parser.Push(data, length); - } - - private void _parser_OnIncoming(FrameType type, int identity, byte[] data) - { - switch (type) - { - case FrameType.KeepAlive: - _lastConnectionTime = DateTime.UtcNow; - break; - case FrameType.Message: - _router.HandleMessage(MessageSerializer.Deserialize(data), _client); - break; - case FrameType.Request: - var response = _router.HandleRequest(MessageSerializer.Deserialize(data), _client); - _client.Response(response, identity); - break; - } } public void ForceConnect() => _client.ForceConnect(); @@ -147,48 +119,6 @@ namespace ZeroLevel.Network return InvokeResult.Succeeding(); } - #region IRouter - public void RegisterInbox(string inbox, MessageHandler handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(string inbox, MessageHandler handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(MessageHandler handler) - { - _router.RegisterInbox(handler); - } - - public void RegisterInbox(MessageHandler handler) - { - _router.RegisterInbox(handler); - } - - public void RegisterInbox(string inbox, RequestHandler handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(string inbox, RequestHandler handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(RequestHandler handler) - { - _router.RegisterInbox(handler); - } - - public void RegisterInbox(RequestHandler handler) - { - _router.RegisterInbox(handler); - } - #endregion - public void Dispose() { _client.Dispose(); diff --git a/ZeroLevel/Services/Network/ExService.cs b/ZeroLevel/Services/Network/ExService.cs deleted file mode 100644 index 0a8df49..0000000 --- a/ZeroLevel/Services/Network/ExService.cs +++ /dev/null @@ -1,114 +0,0 @@ -using System; -using System.Net; - -namespace ZeroLevel.Network -{ - internal sealed class ExService - : ZBaseNetwork, IExService - { - private readonly ExRouter _router; - private readonly IZObservableServer _server; - - public event Action OnConnect = c => { }; - - public event Action OnDisconnect = c => { }; - - public ExService(IZObservableServer server) - { - _server = server ?? throw new ArgumentNullException(nameof(server)); - _router = new ExRouter(); - _server.OnMessage += _server_OnMessage; - _server.OnRequest += _server_OnRequest; - _server.OnConnect += _server_OnConnect; - _server.OnDisconnect += _server_OnDisconnect; - } - - private void _server_OnDisconnect(ISocketClient client) - { - this.OnDisconnect(client); - } - - private void _server_OnConnect(ISocketClient client) - { - this.OnConnect(client); - } - - private Frame _server_OnRequest(Frame frame, IZBackward client) - { - return _router.HandleRequest(frame, client); - } - - private void _server_OnMessage(Frame frame, IZBackward client) - { - _router.HandleMessage(frame, client); - } - - public IPEndPoint Endpoint => _server.Endpoint; - - /// - /// Registering an Inbox Handler - /// - /// Message type - /// Inbox name - /// Handler - public void RegisterInbox(string inbox, Action handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(Action handler) - { - _router.RegisterInbox(DEFAULT_MESSAGE_INBOX, handler); - } - - /// - /// Registration method responding to an incoming request - /// - /// Type of input message - /// Type of response - /// Protocol - /// Inbox name - /// Handler - public void RegisterInbox(string inbox, Func handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(Func handler) - { - _router.RegisterInbox(DEFAULT_REQUEST_INBOX, handler); - } - - /// - /// Registration of the method of responding to the incoming request, not receiving incoming data - /// - /// Type of response - /// Protocol - /// Inbox name - /// Handler - public void RegisterInbox(string inbox, Func handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(Func handler) - { - _router.RegisterInbox(DEFAULT_REQUEST_INBOX, handler); - } - - public void RegisterInbox(string inbox, Action handler) - { - _router.RegisterInbox(inbox, handler); - } - - public void RegisterInbox(Action handler) - { - _router.RegisterInbox(DEFAULT_REQUEST_INBOX, handler); - } - - public override void Dispose() - { - _server.Dispose(); - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/ExServiceHost.cs b/ZeroLevel/Services/Network/ExServiceHost.cs index 3efc3f8..272d014 100644 --- a/ZeroLevel/Services/Network/ExServiceHost.cs +++ b/ZeroLevel/Services/Network/ExServiceHost.cs @@ -7,6 +7,7 @@ using System.Reflection; namespace ZeroLevel.Network { + /* public sealed class ExServiceHost : IDisposable { @@ -512,4 +513,5 @@ namespace ZeroLevel.Network } } } + */ } \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Exchange.cs b/ZeroLevel/Services/Network/Exchange.cs index 5fc5970..b3ef3b4 100644 --- a/ZeroLevel/Services/Network/Exchange.cs +++ b/ZeroLevel/Services/Network/Exchange.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; namespace ZeroLevel.Network { + /* /// /// Provides data exchange between services /// @@ -28,12 +29,12 @@ namespace ZeroLevel.Network /// /// Registration service /// - public IExService RegisterService(IExchangeService service) + public IExchangeService RegisterService(IExchangeService service) { return _host.RegisterService(service); } - public IExService RegisterService(ExServiceInfo service) + public IExchangeService RegisterService(ExServiceInfo service) { return _host.RegisterService(service); } @@ -595,7 +596,7 @@ namespace ZeroLevel.Network #region Private - private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) + private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) @@ -623,7 +624,7 @@ namespace ZeroLevel.Network return response; } - private IEnumerable _RequestBroadcast(List clients, string inbox) + private IEnumerable _RequestBroadcast(List clients, string inbox) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) @@ -660,4 +661,5 @@ namespace ZeroLevel.Network this._host.Dispose(); } } + */ } \ No newline at end of file diff --git a/ZeroLevel/Services/Network/ExchangeTransportFactory.cs b/ZeroLevel/Services/Network/ExchangeTransportFactory.cs deleted file mode 100644 index 9008688..0000000 --- a/ZeroLevel/Services/Network/ExchangeTransportFactory.cs +++ /dev/null @@ -1,49 +0,0 @@ -using System.Collections.Concurrent; -using System.Net; - -namespace ZeroLevel.Network -{ - public static class ExchangeTransportFactory - { - private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); - - /// - /// Creates a server to receive messages using the specified protocol - /// - /// Protocol - /// Server - public static IExService GetServer(int port = -1) - { - return new ExService(new ZExSocketObservableServer(new System.Net.IPEndPoint(IPAddress.Any, port == -1 ? NetUtils.GetFreeTcpPort() : port))); - } - /// - /// Creates a client to access the server using the specified protocol - /// - /// Protocol - /// Server endpoint - /// Client - public static NetworkNode GetClientWithCache(string endpoint) - { - NetworkNode instance = null; - if (_clientInstances.ContainsKey(endpoint)) - { - instance = _clientInstances[endpoint]; - if (instance.Status == SocketClientStatus.Working) - { - return instance; - } - _clientInstances.TryRemove(endpoint, out instance); - instance.Dispose(); - instance = null; - } - instance = GetClient(endpoint); - _clientInstances[endpoint] = instance; - return instance; - } - - public static NetworkNode GetClient(string endpoint) - { - return new NetworkNode(new SocketClient(NetUtils.CreateIPEndPoint(endpoint))); - } - } -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs b/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs index 916d5dd..6b316dd 100644 --- a/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs +++ b/ZeroLevel/Services/Network/FileTransfer/BaseFileTransfer.cs @@ -27,7 +27,7 @@ namespace ZeroLevel.Network.FileTransfer _currentFileTransfers = 0;*/ } - protected void PushTransferTask(string filePath, Action completeHandler = null, Action errorHandler = null, IZBackward client = null) + protected void PushTransferTask(string filePath, Action completeHandler = null, Action errorHandler = null, ExClient client = null) { if (string.IsNullOrWhiteSpace(filePath)) { diff --git a/ZeroLevel/Services/Network/FileTransfer/FileClient.cs b/ZeroLevel/Services/Network/FileTransfer/FileClient.cs index 5271ac3..ade114e 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileClient.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileClient.cs @@ -7,12 +7,12 @@ namespace ZeroLevel.Network.FileTransfer public sealed class FileClient : BaseFileTransfer, IFileClient { - private readonly NetworkNode _client; + private readonly ExClient _client; private readonly string _baseFolder; private readonly ClientFolderNameMapper _nameMapper; private readonly bool _disposeClient; - internal FileClient(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) + internal FileClient(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) : base(baseFolder) { _client = client ?? throw new Exception(nameof(client)); @@ -20,9 +20,9 @@ namespace ZeroLevel.Network.FileTransfer _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); _disposeClient = disposeClient; - _client.RegisterInbox("__upload_file_start", (c, f) => Receiver.Incoming(f, nameMapper(c))); - _client.RegisterInbox("__upload_file_frame", (c, f) => Receiver.Incoming(f)); - _client.RegisterInbox("__upload_file_complete", (c, f) => Receiver.Incoming(f)); + _client.Router.RegisterInbox("__upload_file_start", (c, f) => Receiver.Incoming(f, nameMapper(c))); + _client.Router.RegisterInbox("__upload_file_frame", (c, f) => Receiver.Incoming(f)); + _client.Router.RegisterInbox("__upload_file_complete", (c, f) => Receiver.Incoming(f)); } public void Dispose() diff --git a/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs b/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs index 533e3ab..eab08fc 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileClientFactory.cs @@ -6,16 +6,16 @@ namespace ZeroLevel.Network.FileTransfer { public static IFileClient Create(string serverEndpoint, string baseFolder, ClientFolderNameMapper nameMapper = null) { - return CreateFileServerClient(ExchangeTransportFactory.GetClient(serverEndpoint), baseFolder, - nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true); + return null;/* CreateFileServerClient(ExchangeTransportFactory.GetClient(serverEndpoint), baseFolder, + nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);*/ } - public static IFileClient Create(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper = null) + public static IFileClient Create(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper = null) { return CreateFileServerClient(client, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false); } - private static IFileClient CreateFileServerClient(NetworkNode client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) + private static IFileClient CreateFileServerClient(ExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient) { return new FileClient(client, baseFolder, nameMapper, disposeClient); } diff --git a/ZeroLevel/Services/Network/FileTransfer/FileServer.cs b/ZeroLevel/Services/Network/FileTransfer/FileServer.cs index eaacfde..a371afc 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileServer.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileServer.cs @@ -6,12 +6,12 @@ namespace ZeroLevel.Network.FileTransfer public sealed class FileServer : BaseFileTransfer, IFileServer { - private readonly IExService _service; + private readonly IRouter _service; private readonly string _baseFolder; private readonly ServerFolderNameMapperDelegate _nameMapper; private readonly bool _disposeService; - internal FileServer(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService) + internal FileServer(IRouter service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService) : base(baseFolder) { _service = service ?? throw new Exception(nameof(service)); @@ -19,20 +19,12 @@ namespace ZeroLevel.Network.FileTransfer _nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper)); _disposeService = disposeService; - _service.RegisterInbox("__upload_file_start", (f, _, client) => Receiver.Incoming(f, nameMapper(client))); - _service.RegisterInbox("__upload_file_frame", (f, _, __) => Receiver.Incoming(f)); - _service.RegisterInbox("__upload_file_complete", (f, _, __) => Receiver.Incoming(f)); + _service.RegisterInbox("__upload_file_start", (client, f) => Receiver.Incoming(f, nameMapper(client))); + _service.RegisterInbox("__upload_file_frame", (client, f) => Receiver.Incoming(f)); + _service.RegisterInbox("__upload_file_complete", (client, f) => Receiver.Incoming(f)); } - public void Dispose() - { - if (_disposeService) - { - _service?.Dispose(); - } - } - - public void Send(ISocketClient client, string fileName, Action completeHandler = null, Action errorHandler = null) + public void Send(ExClient client, string fileName, Action completeHandler = null, Action errorHandler = null) { PushTransferTask(fileName, completeHandler, errorHandler, client); } @@ -41,18 +33,18 @@ namespace ZeroLevel.Network.FileTransfer { Log.Info($"Start upload file {reader.Path}"); var startinfo = reader.GetStartInfo(); - if (false == task.Client.SendBackward("__upload_file_start", startinfo).Success) + if (false == task.Client.Send("__upload_file_start", startinfo).Success) { return; } foreach (var chunk in reader.Read()) { - if (task.Client.SendBackward("__upload_file_frame", chunk).Success == false) + if (task.Client.Send("__upload_file_frame", chunk).Success == false) { return; } } - task.Client.SendBackward("__upload_file_complete", reader.GetCompleteInfo()); + task.Client.Send("__upload_file_complete", reader.GetCompleteInfo()); Log.Debug($"Stop upload file {reader.Path}"); } } diff --git a/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs b/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs index e8ede19..9db6e62 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileServerFactory.cs @@ -7,17 +7,17 @@ namespace ZeroLevel.Network.FileTransfer { public static IFileServer Create(int port, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null) { - return CreateFileServer(ExchangeTransportFactory.GetServer(port), baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true); + return null;// CreateFileServer(ExchangeTransportFactory.GetServer(port), baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true); } - public static IFileServer Create(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null) + public static IFileServer Create(IZeroService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null) { - return CreateFileServer(service, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false); + return null;// CreateFileServer(service, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false); } - private static IFileServer CreateFileServer(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService) + private static IFileServer CreateFileServer(IZeroService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService) { - return new FileServer(service, baseFolder, nameMapper, disposeService); + return null;// new FileServer(service, baseFolder, nameMapper, disposeService); } } } diff --git a/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs b/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs index c082d8b..d235a53 100644 --- a/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs +++ b/ZeroLevel/Services/Network/FileTransfer/IFileServer.cs @@ -4,8 +4,7 @@ using ZeroLevel.Network; namespace ZeroLevel.Network.FileTransfer { public interface IFileServer - : IDisposable { - void Send(ISocketClient client, string fileName, Action completeHandler = null, Action errorHandler = null); + void Send(ExClient client, string fileName, Action completeHandler = null, Action errorHandler = null); } } diff --git a/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs b/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs index f94e4d2..602353b 100644 --- a/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs +++ b/ZeroLevel/Services/Network/FileTransfer/Model/FileTransferTask.cs @@ -7,6 +7,6 @@ namespace ZeroLevel.Network.FileTransfer public string FilePath; public Action CompletedHandler; public Action ErrorHandler; - public ISocketClient Client; + public ExClient Client; } } diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index a2b9994..ac89d7a 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -12,6 +12,7 @@ namespace ZeroLevel.Network { #region Private + private readonly IRouter _router; private Socket _clientSocket; private NetworkStream _stream; private FrameParser _parser = new FrameParser(); @@ -33,10 +34,13 @@ namespace ZeroLevel.Network } #endregion Private + public IRouter Router { get { return _router; } } + public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } } - public SocketClient(IPEndPoint ep) + public SocketClient(IPEndPoint ep, IRouter router) { + _router = router; Endpoint = ep; _parser.OnIncoming += _parser_OnIncoming; _sendThread = new Thread(SendFramesJob); @@ -44,8 +48,9 @@ namespace ZeroLevel.Network _sendThread.Start(); } - public SocketClient(Socket socket) + public SocketClient(Socket socket, IRouter router) { + _router = router; _socket_freezed = true; _clientSocket = socket; Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint; @@ -151,16 +156,23 @@ namespace ZeroLevel.Network switch (type) { case FrameType.KeepAlive: - _last_rw_time = DateTime.UtcNow.Ticks; - break; + // Nothing + return; case FrameType.Message: + _router?.HandleMessage(MessageSerializer.Deserialize(data), this); + break; case FrameType.Request: - OnIncomingData(this, data, identity); + var response = _router?.HandleRequest(MessageSerializer.Deserialize(data), this); + if (response != null) + { + this.Response(response, identity); + } break; case FrameType.Response: _requests.Success(identity, MessageSerializer.Deserialize(data)); break; } + OnIncomingData(this, data, identity); } catch (Exception ex) { @@ -227,7 +239,7 @@ namespace ZeroLevel.Network { if (false == TryConnect()) { - throw new ObjectDisposedException("No connection"); + throw new Exception("No connection"); } } } diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs index 23981ae..a06ce93 100644 --- a/ZeroLevel/Services/Network/SocketServer.cs +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -12,11 +12,12 @@ namespace ZeroLevel.Network { private Socket _serverSocket; private ReaderWriterLockSlim _connection_set_lock = new ReaderWriterLockSlim(); - private Dictionary _connections = new Dictionary(); + private Dictionary _connections = new Dictionary(); + private readonly IRouter _router; public IPEndPoint LocalEndpoint { get; } public event Action OnDisconnect = _ => { }; - public event Action OnConnect = _ => { }; + public event Action OnConnect = _ => { }; public IEnumerable ConnectionList { get @@ -41,7 +42,7 @@ namespace ZeroLevel.Network catch { } } - private void ConnectEventRise(ISocketClient client) + private void ConnectEventRise(ExClient client) { try { @@ -51,8 +52,11 @@ namespace ZeroLevel.Network { } } - public SocketServer(IPEndPoint endpoint) + public IRouter Router { get { return _router; } } + + public SocketServer(IPEndPoint endpoint, IRouter router) { + _router = router; LocalEndpoint = endpoint; _serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); @@ -72,11 +76,11 @@ namespace ZeroLevel.Network _serverSocket.BeginAccept(BeginAcceptCallback, null); _connection_set_lock.EnterWriteLock(); - var connection = new SocketClient(client_socket); + var connection = new SocketClient(client_socket, _router); connection.OnDisconnect += Connection_OnDisconnect; - _connections[connection.Endpoint] = new NetworkNode(connection); + _connections[connection.Endpoint] = new ExClient(connection); - ConnectEventRise(connection); + ConnectEventRise(_connections[connection.Endpoint]); } catch (Exception ex) { diff --git a/ZeroLevel/Services/Network/Utils/Router.cs b/ZeroLevel/Services/Network/Utils/Router.cs index 19a0a4e..2dfb33b 100644 --- a/ZeroLevel/Services/Network/Utils/Router.cs +++ b/ZeroLevel/Services/Network/Utils/Router.cs @@ -266,4 +266,19 @@ namespace ZeroLevel.Network } #endregion } + + internal sealed class NullRouter + : IRouter + { + public void HandleMessage(Frame frame, ISocketClient client) { } + public byte[] HandleRequest(Frame frame, ISocketClient client) { return null; } + public void RegisterInbox(string inbox, MessageHandler handler) { } + public void RegisterInbox(string inbox, MessageHandler handler) { } + public void RegisterInbox(MessageHandler handler) { } + public void RegisterInbox(MessageHandler handler) { } + public void RegisterInbox(string inbox, RequestHandler handler) { } + public void RegisterInbox(string inbox, RequestHandler handler) { } + public void RegisterInbox(RequestHandler handler) { } + public void RegisterInbox(RequestHandler handler) { } + } } diff --git a/ZeroLevel/Services/ZeroServiceState.cs b/ZeroLevel/Services/ZeroServiceState.cs index fdf176d..5a5d995 100644 --- a/ZeroLevel/Services/ZeroServiceState.cs +++ b/ZeroLevel/Services/ZeroServiceState.cs @@ -3,10 +3,11 @@ namespace ZeroLevel { [Flags] - public enum ZeroServiceState : int + public enum ZeroServiceStatus : int { Initialized = 0, - Started = 1, - Stopped = 2 + Running = 1, + Stopped = 2, + Disposed = 6 } } \ No newline at end of file