From e5415d8dbc1720c4dfec6cc5cd1309b2520139a1 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 3 Jul 2019 03:07:07 +0300 Subject: [PATCH] Prepare to discovery Added aliases --- TestApp/MyService.cs | 18 ++- TestApp/Program.cs | 17 +- ZeroLevel/Services/BaseZeroService.cs | 47 +++++- ZeroLevel/Services/Bootstrap.cs | 9 +- ZeroLevel/Services/Network/Alias.cs | 152 ++++++++++++++++++ .../Services/Network/Contracts/IServer.cs | 16 +- ZeroLevel/Services/Network/Utils/Router.cs | 40 +++-- 7 files changed, 247 insertions(+), 52 deletions(-) create mode 100644 ZeroLevel/Services/Network/Alias.cs diff --git a/TestApp/MyService.cs b/TestApp/MyService.cs index a127697..08e0a61 100644 --- a/TestApp/MyService.cs +++ b/TestApp/MyService.cs @@ -1,6 +1,7 @@ using System; using System.Net; using ZeroLevel; +using ZeroLevel.Network; using ZeroLevel.Services.Applications; namespace TestApp @@ -9,17 +10,30 @@ namespace TestApp : BaseZeroService { public MyService() - :base() + : base() { } protected override void StartAction() { Log.Info("Started"); - Sheduller.RemindEvery(TimeSpan.FromSeconds(5), () => { + UseHost(8800) + .RegisterInbox("upper", (c, s) => s.ToUpperInvariant()) + .RegisterInbox("ip2str", (c, ip) => $"{ip.Address}:{ip.Port}"); + + + Sheduller.RemindEvery(TimeSpan.FromSeconds(5), () => + { var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800)); client.Request("upper", "hello", s => Log.Info(s)); }); + + Sheduller.RemindEvery(TimeSpan.FromSeconds(6), () => + { + var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800)); + client.Request("ip2str", new IPEndPoint(NetUtils.GetNonLoopbackAddress(), NetUtils.GetFreeTcpPort()), s => Log.Info(s)); + }); + } protected override void StopAction() diff --git a/TestApp/Program.cs b/TestApp/Program.cs index bb909ac..6dab700 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -8,19 +8,14 @@ namespace TestApp { private static void Main(string[] args) { - var se = Bootstrap.Startup(args, + Bootstrap.Startup(args, () => Configuration.ReadSetFromIniFile("config.ini")) - .ReadServiceInfo() + //.ReadServiceInfo() //.UseDiscovery() - .Run(); - - var router = se.Service.UseHost(8800); - router.RegisterInbox("upper", (c, s) => s.ToUpperInvariant()); - - - - se.WaitWhileStatus(ZeroServiceStatus.Running) - .Stop(); + .Run() + .WaitWhileStatus(ZeroServiceStatus.Running) + .Stop(); + Bootstrap.Shutdown(); } } } diff --git a/ZeroLevel/Services/BaseZeroService.cs b/ZeroLevel/Services/BaseZeroService.cs index 9b667b8..701242d 100644 --- a/ZeroLevel/Services/BaseZeroService.cs +++ b/ZeroLevel/Services/BaseZeroService.cs @@ -119,23 +119,36 @@ namespace ZeroLevel.Services.Applications public void UseDiscovery() { - var discovery = Configuration.Default.First("discovery"); - _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(discovery), _null_router, false)); + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) + { + var discovery = Configuration.Default.First("discovery"); + _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(discovery), _null_router, false)); + } } public void UseDiscovery(string endpoint) { - _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), _null_router, false)); + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) + { + _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), _null_router, false)); + } } public void UseDiscovery(IPEndPoint endpoint) { - _discoveryClient = new DiscoveryClient(GetClient(endpoint, _null_router, false)); + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) + { + _discoveryClient = new DiscoveryClient(GetClient(endpoint, _null_router, false)); + } } public IRouter UseHost() { - if (_state == ZeroServiceStatus.Running) + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) { return GetServer(new IPEndPoint(IPAddress.Any, NetUtils.GetFreeTcpPort()), new Router()).Router; } @@ -144,7 +157,8 @@ namespace ZeroLevel.Services.Applications public IRouter UseHost(int port) { - if (_state == ZeroServiceStatus.Running) + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) { return GetServer(new IPEndPoint(IPAddress.Any, port), new Router()).Router; } @@ -153,7 +167,8 @@ namespace ZeroLevel.Services.Applications public IRouter UseHost(IPEndPoint endpoint) { - if (_state == ZeroServiceStatus.Running) + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) { return GetServer(endpoint, new Router()).Router; } @@ -169,6 +184,15 @@ namespace ZeroLevel.Services.Applications return null; } + public ExClient ConnectToService(string alias, string endpoint) + { + if (_state == ZeroServiceStatus.Running) + { + return new ExClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true)); + } + return null; + } + public ExClient ConnectToService(IPEndPoint endpoint) { if (_state == ZeroServiceStatus.Running) @@ -177,6 +201,15 @@ namespace ZeroLevel.Services.Applications } return null; } + + public ExClient ConnectToService(string alias, IPEndPoint endpoint) + { + if (_state == ZeroServiceStatus.Running) + { + return new ExClient(GetClient(endpoint, new Router(), true)); + } + return null; + } #endregion #region Service control diff --git a/ZeroLevel/Services/Bootstrap.cs b/ZeroLevel/Services/Bootstrap.cs index 0b97a7f..cc94c7c 100644 --- a/ZeroLevel/Services/Bootstrap.cs +++ b/ZeroLevel/Services/Bootstrap.cs @@ -35,13 +35,6 @@ namespace ZeroLevel public BootstrapFluent UseDiscovery(string url) { _service?.UseDiscovery(url); return this; } public BootstrapFluent UseDiscovery(IPEndPoint endpoint) { _service?.UseDiscovery(endpoint); return this; } - /* public BootstrapFluent UseHost() { _service?.UseHost(); return this; } - public BootstrapFluent UseHost(int port) { _service?.UseHost(port); return this; } - public BootstrapFluent UseHost(IPEndPoint endpoint) { _service?.UseHost(endpoint); return this; } - - public BootstrapFluent ConnectToService(string url) { _service.ConnectToService(url); return this; } - public BootstrapFluent ConnectToService(IPEndPoint endpoint) { _service.ConnectToService(endpoint); return this; } - */ public BootstrapFluent ReadServiceInfo() { _service?.ReadServiceInfo(); return this; } public BootstrapFluent ReadServiceInfo(IConfigurationSet config) { _service?.ReadServiceInfo(config); return this; } @@ -184,7 +177,7 @@ namespace ZeroLevel return service; } - private static void Shutdown() + public static void Shutdown() { try { Sheduller.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose default sheduller error"); } try { Log.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose log error"); } diff --git a/ZeroLevel/Services/Network/Alias.cs b/ZeroLevel/Services/Network/Alias.cs new file mode 100644 index 0000000..8331061 --- /dev/null +++ b/ZeroLevel/Services/Network/Alias.cs @@ -0,0 +1,152 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; + +namespace ZeroLevel.Network +{ + public sealed class AliasSet + { + public sealed class _RoundRobinCollection : + IDisposable + { + private readonly List _collection = + new List(); + + private int _index = -1; + + private readonly ReaderWriterLockSlim _lock = + new ReaderWriterLockSlim(); + + public int Count { get { return _collection.Count; } } + + public void Add(T item) + { + _lock.EnterWriteLock(); + try + { + _collection.Add(item); + if (_index == -1) _index = 0; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void Remove(T item) + { + _lock.EnterWriteLock(); + try + { + _collection.Remove(item); + if (_index >= _collection.Count) + { + if (_collection.Count == 0) _index = -1; + else _index = 0; + } + } + finally + { + _lock.ExitWriteLock(); + } + } + + public bool Contains(T item) + { + _lock.EnterReadLock(); + try + { + return _collection.Contains(item); + } + finally + { + _lock.ExitReadLock(); + } + } + + public bool MoveNext() + { + _lock.EnterReadLock(); + try + { + if (_collection.Count > 0) + { + _index = Interlocked.Increment(ref _index) % _collection.Count; + return true; + } + } + finally + { + _lock.ExitReadLock(); + } + return false; + } + + public T Current + { + get + { + return _index == -1 ? default(T) : _collection[_index]; + } + } + + public void Clear() + { + _collection.Clear(); + _index = -1; + } + + public void Dispose() + { + _collection.Clear(); + _lock.Dispose(); + } + } + + private readonly ConcurrentDictionary> _aliases = new ConcurrentDictionary>(); + + public void Set(string alias, string address) + { + if (_aliases.ContainsKey(alias) == false) + { + if (_aliases.TryAdd(alias, new _RoundRobinCollection())) + { + _aliases[alias].Add(address); + } + } + else + { + _aliases[alias].Clear(); + _aliases[alias].Add(address); + } + } + + public void Set(string alias, IEnumerable addresses) + { + if (_aliases.ContainsKey(alias) == false) + { + if (_aliases.TryAdd(alias, new _RoundRobinCollection())) + { + foreach (var address in addresses) + _aliases[alias].Add(address); + } + } + else + { + _aliases[alias].Clear(); + foreach (var address in addresses) + _aliases[alias].Add(address); + } + } + + public string GetAddress(string alias) + { + if (_aliases.ContainsKey(alias) && _aliases[alias].MoveNext()) + { + return _aliases[alias].Current; + } + return null; + } + } +} diff --git a/ZeroLevel/Services/Network/Contracts/IServer.cs b/ZeroLevel/Services/Network/Contracts/IServer.cs index d41ab3f..aaa5eba 100644 --- a/ZeroLevel/Services/Network/Contracts/IServer.cs +++ b/ZeroLevel/Services/Network/Contracts/IServer.cs @@ -3,21 +3,21 @@ public interface IServer { #region Messages - void RegisterInbox(string inbox, MessageHandler handler); - void RegisterInbox(string inbox, MessageHandler handler); + IServer RegisterInbox(string inbox, MessageHandler handler); + IServer RegisterInbox(string inbox, MessageHandler handler); // Default inboxe - void RegisterInbox(MessageHandler handler); - void RegisterInbox(MessageHandler handler); + IServer RegisterInbox(MessageHandler handler); + IServer RegisterInbox(MessageHandler handler); #endregion #region Requests - void RegisterInbox(string inbox, RequestHandler handler); - void RegisterInbox(string inbox, RequestHandler handler); + IServer RegisterInbox(string inbox, RequestHandler handler); + IServer RegisterInbox(string inbox, RequestHandler handler); // Default inboxe - void RegisterInbox(RequestHandler handler); - void RegisterInbox(RequestHandler handler); + IServer RegisterInbox(RequestHandler handler); + IServer RegisterInbox(RequestHandler handler); #endregion } } diff --git a/ZeroLevel/Services/Network/Utils/Router.cs b/ZeroLevel/Services/Network/Utils/Router.cs index 6f09928..58610ba 100644 --- a/ZeroLevel/Services/Network/Utils/Router.cs +++ b/ZeroLevel/Services/Network/Utils/Router.cs @@ -179,45 +179,49 @@ namespace ZeroLevel.Network #endregion Invokation #region Message handlers registration - public void RegisterInbox(string inbox, MessageHandler handler) + public IServer RegisterInbox(string inbox, MessageHandler handler) { if (false == _handlers.ContainsKey(inbox)) { _handlers.Add(inbox, new List()); } _handlers[inbox].Add(MRInvoker.Create(handler)); + return this; } - public void RegisterInbox(string inbox, MessageHandler handler) + public IServer RegisterInbox(string inbox, MessageHandler handler) { if (false == _handlers.ContainsKey(inbox)) { _handlers.Add(inbox, new List()); } _handlers[inbox].Add(MRInvoker.Create(handler)); + return this; } - public void RegisterInbox(MessageHandler handler) + public IServer RegisterInbox(MessageHandler handler) { if (false == _handlers.ContainsKey(BaseSocket.DEFAULT_MESSAGE_INBOX)) { _handlers.Add(BaseSocket.DEFAULT_MESSAGE_INBOX, new List()); } _handlers[BaseSocket.DEFAULT_MESSAGE_INBOX].Add(MRInvoker.Create(handler)); + return this; } - public void RegisterInbox(MessageHandler handler) + public IServer RegisterInbox(MessageHandler handler) { if (false == _handlers.ContainsKey(BaseSocket.DEFAULT_MESSAGE_INBOX)) { _handlers.Add(BaseSocket.DEFAULT_MESSAGE_INBOX, new List()); } _handlers[BaseSocket.DEFAULT_MESSAGE_INBOX].Add(MRInvoker.Create(handler)); + return this; } #endregion #region Request handlers registration - public void RegisterInbox(string inbox, RequestHandler handler) + public IServer RegisterInbox(string inbox, RequestHandler handler) { if (false == _requestors.ContainsKey(inbox)) { @@ -227,9 +231,10 @@ namespace ZeroLevel.Network { throw new Exception($"[SocketExchangeServer] Inbox {inbox} already exists"); } + return this; } - public void RegisterInbox(string inbox, RequestHandler handler) + public IServer RegisterInbox(string inbox, RequestHandler handler) { if (false == _requestors.ContainsKey(inbox)) { @@ -239,9 +244,10 @@ namespace ZeroLevel.Network { throw new Exception($"[SocketExchangeServer] Inbox {inbox} already exists"); } + return this; } - public void RegisterInbox(RequestHandler handler) + public IServer RegisterInbox(RequestHandler handler) { if (false == _requestors.ContainsKey(BaseSocket.DEFAULT_REQUEST_INBOX)) { @@ -251,9 +257,10 @@ namespace ZeroLevel.Network { throw new Exception($"[SocketExchangeServer] Inbox {BaseSocket.DEFAULT_REQUEST_INBOX} already exists"); } + return this; } - public void RegisterInbox(RequestHandler handler) + public IServer RegisterInbox(RequestHandler handler) { if (false == _requestors.ContainsKey(BaseSocket.DEFAULT_REQUEST_INBOX)) { @@ -263,6 +270,7 @@ namespace ZeroLevel.Network { throw new Exception($"[SocketExchangeServer] Inbox {BaseSocket.DEFAULT_REQUEST_INBOX} already exists"); } + return this; } #endregion } @@ -272,13 +280,13 @@ namespace ZeroLevel.Network { public void HandleMessage(Frame frame, ISocketClient client) { } public byte[] HandleRequest(Frame frame, ISocketClient client) { return null; } - public void RegisterInbox(string inbox, MessageHandler handler) { } - public void RegisterInbox(string inbox, MessageHandler handler) { } - public void RegisterInbox(MessageHandler handler) { } - public void RegisterInbox(MessageHandler handler) { } - public void RegisterInbox(string inbox, RequestHandler handler) { } - public void RegisterInbox(string inbox, RequestHandler handler) { } - public void RegisterInbox(RequestHandler handler) { } - public void RegisterInbox(RequestHandler handler) { } + public IServer RegisterInbox(string inbox, MessageHandler handler) { return this; } + public IServer RegisterInbox(string inbox, MessageHandler handler) { return this; } + public IServer RegisterInbox(MessageHandler handler) { return this; } + public IServer RegisterInbox(MessageHandler handler) { return this; } + public IServer RegisterInbox(string inbox, RequestHandler handler) { return this; } + public IServer RegisterInbox(string inbox, RequestHandler handler) { return this; } + public IServer RegisterInbox(RequestHandler handler) { return this; } + public IServer RegisterInbox(RequestHandler handler) { return this; } } }