diff --git a/TestApp/MyService.cs b/TestApp/MyService.cs index 6008688..bcd9f63 100644 --- a/TestApp/MyService.cs +++ b/TestApp/MyService.cs @@ -18,23 +18,22 @@ namespace TestApp protected override void StartAction() { Log.Info("Started"); - - AutoregisterInboxes(UseHost(8800)); ReadServiceInfo(); + AutoregisterInboxes(UseHost(8800)); UseHost(8801).RegisterInbox("metainfo", (c) => { Log.Info("Reqeust for metainfo"); return this.ServiceInfo; }); - StoreConnection("mytest", new IPEndPoint(IPAddress.Loopback, 8800)); - StoreConnection("mymeta", new IPEndPoint(IPAddress.Loopback, 8801)); + Exchange.RoutesStorage.Set("mytest", new IPEndPoint(IPAddress.Loopback, 8800)); + Exchange.RoutesStorage.Set("mymeta", new IPEndPoint(IPAddress.Loopback, 8801)); - int count = 0; - Sheduller.RemindWhile(TimeSpan.FromSeconds(1), () => + + Sheduller.RemindEvery(TimeSpan.FromSeconds(1), () => { - var client = ConnectToService("mytest"); + var client = Exchange.GetConnection("mytest"); client.Send("pum"); client.Send(BaseSocket.DEFAULT_MESSAGE_INBOX, "'This is message'"); client.Request("d2s", DateTime.Now, s => Log.Info($"Response: {s}")); @@ -43,14 +42,11 @@ namespace TestApp s => Log.Info($"Response: {s}")); client.Request("now", s => Log.Info($"Response date: {s}")); client.Request(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, s => Log.Info($"Response ip: {s}")); - count++; - return count > 3; }); - + Sheduller.RemindEvery(TimeSpan.FromSeconds(3), () => - { - var client = ConnectToService("mymeta"); - client.Request("metainfo", info => + { + Exchange.Request("mymeta", "metainfo", info => { var si = new StringBuilder(); si.AppendLine(info.Name); diff --git a/TestApp/Program.cs b/TestApp/Program.cs index 589882e..6178ae2 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -9,7 +9,7 @@ namespace TestApp Bootstrap.Startup(args, () => Configuration.ReadSetFromIniFile("config.ini")) .EnableConsoleLog(ZeroLevel.Services.Logging.LogLevel.System | ZeroLevel.Services.Logging.LogLevel.FullDebug) - .UseDiscovery() + //.UseDiscovery() .Run() .WaitWhileStatus(ZeroServiceStatus.Running) .Stop(); diff --git a/ZeroLevel.UnitTests/CollectionsTests.cs b/ZeroLevel.UnitTests/CollectionsTests.cs index 2f74634..9625cc2 100644 --- a/ZeroLevel.UnitTests/CollectionsTests.cs +++ b/ZeroLevel.UnitTests/CollectionsTests.cs @@ -44,18 +44,23 @@ namespace ZeroLevel.CollectionUnitTests { var arr = new int[] { 1, 2, 3 }; // Arrange - var collection = new RoundRobinOverCollection(arr); + var collection = new RoundRobinCollection(arr); var iter1 = new int[] { 1, 2, 3 }; var iter2 = new int[] { 2, 3, 1 }; var iter3 = new int[] { 3, 1, 2 }; // Act // Assert - Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GenerateSeq().ToArray(), iter1)); - Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GenerateSeq().ToArray(), iter2)); - Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GenerateSeq().ToArray(), iter3)); - Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GenerateSeq().ToArray(), iter1)); - Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GenerateSeq().ToArray(), iter2)); - Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GenerateSeq().ToArray(), iter3)); + Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GetCurrentSeq().ToArray(), iter1)); + collection.MoveNext(); + Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GetCurrentSeq().ToArray(), iter2)); + collection.MoveNext(); + Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GetCurrentSeq().ToArray(), iter3)); + collection.MoveNext(); + Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GetCurrentSeq().ToArray(), iter1)); + collection.MoveNext(); + Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GetCurrentSeq().ToArray(), iter2)); + collection.MoveNext(); + Assert.True(CollectionComparsionExtensions.OrderingEquals(collection.GetCurrentSeq().ToArray(), iter3)); } [Fact] diff --git a/ZeroLevel.UnitTests/ExchangeTests.cs b/ZeroLevel.UnitTests/ExchangeTests.cs index 6deb4b2..12e9803 100644 --- a/ZeroLevel.UnitTests/ExchangeTests.cs +++ b/ZeroLevel.UnitTests/ExchangeTests.cs @@ -31,7 +31,7 @@ namespace ZeroLevel.NetworkUnitTests }); // Act - var client = ConnectToService(IPAddress.Loopback.ToString() + ":6666"); + var client = Exchange.GetConnection(IPAddress.Loopback.ToString() + ":6666"); var ir = client.Send("register", info); locker.WaitOne(1000); @@ -70,7 +70,7 @@ namespace ZeroLevel.NetworkUnitTests server.RegisterInbox>("services", (_) => new[] { info1, info2 }); // Act - var client = ConnectToService(IPAddress.Loopback.ToString() + ":6667"); + var client = Exchange.GetConnection(IPAddress.Loopback.ToString() + ":6667"); var ir = client.Request>("services", response => { received = response; diff --git a/ZeroLevel.UnitTests/NetworkTest.cs b/ZeroLevel.UnitTests/NetworkTest.cs index 3d194fe..befabf3 100644 --- a/ZeroLevel.UnitTests/NetworkTest.cs +++ b/ZeroLevel.UnitTests/NetworkTest.cs @@ -13,7 +13,7 @@ namespace ZeroLevel.UnitTests { // Arrange var server = UseHost(8181); - var client = ConnectToService("127.0.0.1:8181"); + var client = Exchange.GetConnection("127.0.0.1:8181"); bool got_message_no_request = false; bool got_message_with_request = false; diff --git a/ZeroLevel/Services/BaseZeroService.cs b/ZeroLevel/Services/BaseZeroService.cs index aceb8be..77b41a6 100644 --- a/ZeroLevel/Services/BaseZeroService.cs +++ b/ZeroLevel/Services/BaseZeroService.cs @@ -12,7 +12,8 @@ namespace ZeroLevel.Services.Applications : IZeroService { private readonly ZeroServiceInfo _serviceInfo = new ZeroServiceInfo(); - + private readonly IExchange _exhange; + protected IExchange Exchange => _exhange; public ZeroServiceInfo ServiceInfo => _serviceInfo; public string Name { get { return _serviceInfo.Name; } private set { _serviceInfo.Name = value; } } @@ -27,11 +28,13 @@ namespace ZeroLevel.Services.Applications protected BaseZeroService() { Name = GetType().Name; + _exhange = new Exchange(this); } protected BaseZeroService(string name) { Name = name; + _exhange = new Exchange(this); } protected abstract void StartAction(); @@ -118,9 +121,7 @@ namespace ZeroLevel.Services.Applications } #endregion Config - #region Network - private readonly Exchange _exhange = new Exchange(); - + #region Network public void UseDiscovery() { @@ -179,30 +180,6 @@ namespace ZeroLevel.Services.Applications return BaseSocket.NullRouter; } - public ExClient ConnectToService(string endpoint) - { - if (_state == ZeroServiceStatus.Running - || _state == ZeroServiceStatus.Initialized) - { - if (_aliases.Contains(endpoint)) - { - return GetClient(_aliases.Get(endpoint), true); - } - return GetClient(NetUtils.CreateIPEndPoint(endpoint), true); - } - return null; - } - - public ExClient ConnectToService(IPEndPoint endpoint) - { - if (_state == ZeroServiceStatus.Running - || _state == ZeroServiceStatus.Initialized) - { - return GetClient(endpoint, true); - } - return null; - } - #region Autoregistration inboxes private static Delegate CreateDelegate(Type delegateType, MethodInfo methodInfo, object target) { diff --git a/ZeroLevel/Services/IZeroService.cs b/ZeroLevel/Services/IZeroService.cs index 2113af2..34b3fcb 100644 --- a/ZeroLevel/Services/IZeroService.cs +++ b/ZeroLevel/Services/IZeroService.cs @@ -13,6 +13,8 @@ namespace ZeroLevel string Group { get; } string Type { get; } + ZeroServiceInfo ServiceInfo { get; } + void UseDiscovery(); void UseDiscovery(string url); void UseDiscovery(IPEndPoint endpoint); @@ -21,9 +23,6 @@ namespace ZeroLevel IRouter UseHost(int port); IRouter UseHost(IPEndPoint endpoint); - ExClient ConnectToService(string url); - ExClient ConnectToService(IPEndPoint endpoint); - void ReadServiceInfo(); void ReadServiceInfo(IConfigurationSet config); diff --git a/ZeroLevel/Services/Network/Contracts/IServiceRoutesStorage.cs b/ZeroLevel/Services/Network/Contracts/IServiceRoutesStorage.cs new file mode 100644 index 0000000..0c59973 --- /dev/null +++ b/ZeroLevel/Services/Network/Contracts/IServiceRoutesStorage.cs @@ -0,0 +1,23 @@ +using System.Collections.Generic; +using System.Net; +using ZeroLevel.Models; + +namespace ZeroLevel.Network +{ + public interface IServiceRoutesStorage + { + void Set(IPEndPoint endpoint); + void Set(IEnumerable endpoints); + void Set(string key, IPEndPoint endpoint); + void Set(string key, IEnumerable endpoints); + void Set(string key, string type, string group, IPEndPoint endpoint); + void Set(string key, string type, string group, IEnumerable endpoints); + + InvokeResult Get(string key); + InvokeResult> GetAll(string key); + InvokeResult GetByType(string type); + InvokeResult> GetAllByType(string type); + InvokeResult GetByGroup(string group); + InvokeResult> GetAllByGroup(string group); + } +} diff --git a/ZeroLevel/Services/Network/Exchange.cs b/ZeroLevel/Services/Network/Exchange.cs index dd76d43..f308c7e 100644 --- a/ZeroLevel/Services/Network/Exchange.cs +++ b/ZeroLevel/Services/Network/Exchange.cs @@ -9,21 +9,41 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { + public interface IExchange + : IClientSet, IDisposable + { + void UseDiscovery(); + void UseDiscovery(string endpoint); + void UseDiscovery(IPEndPoint endpoint); + + IRouter UseHost(); + IRouter UseHost(int port); + IRouter UseHost(IPEndPoint endpoint); + + IServiceRoutesStorage RoutesStorage { get; } + + ExClient GetConnection(string alias); + ExClient GetConnection(IPEndPoint endpoint); + } + /// /// Provides data exchange between services /// - public sealed class Exchange : - IClientSet, - IDisposable + internal sealed class Exchange : + IExchange { private IDiscoveryClient _discoveryClient = null; // Feature расширить до нескольких discovery private readonly ServiceRouteStorage _aliases = new ServiceRouteStorage(); private readonly ExClientServerCachee _cachee = new ExClientServerCachee(); - #region Ctor + public IServiceRoutesStorage RoutesStorage => _aliases; + private readonly IZeroService _owner; - public Exchange() + #region Ctor + + public Exchange(IZeroService owner) { + _owner = owner; } #endregion Ctor @@ -482,16 +502,15 @@ namespace ZeroLevel.Network } RegisterServicesInDiscovery(); _update_discovery_table_task = Sheduller.RemindEvery(_update_discovery_table_period, RegisterServicesInDiscovery); - _register_in_discovery_table_task = Sheduller.RemindEvery(_register_in_discovery_table_period, () => { }); + _register_in_discovery_table_task = Sheduller.RemindEvery(_register_in_discovery_table_period, UpdateServiceListFromDiscovery); } private void RegisterServicesInDiscovery() { - var services = _serverInstances. - Values. + var services = _cachee.ServerList. Select(s => { - var info = MessageSerializer.Copy(this._serviceInfo); + var info = MessageSerializer.Copy(_owner.ServiceInfo); info.Port = s.LocalEndpoint.Port; return info; }). @@ -501,8 +520,45 @@ namespace ZeroLevel.Network _discoveryClient.Register(service); } } + + private void UpdateServiceListFromDiscovery() + { + + } #endregion + public ExClient GetConnection(string alias) + { + var address = _aliases.Get(alias); + if (address.Success) + { + return _cachee.GetClient(address.Value, true); + } + try + { + var endpoint = NetUtils.CreateIPEndPoint(alias); + return _cachee.GetClient(endpoint, true); + } + catch (Exception ex) + { + Log.Error(ex, "[Exchange.GetConnection]"); + } + return null; + } + + public ExClient GetConnection(IPEndPoint endpoint) + { + try + { + return _cachee.GetClient(endpoint, true); + } + catch (Exception ex) + { + Log.Error(ex, "[Exchange.GetConnection]"); + } + return null; + } + #region Host service public IRouter UseHost() { @@ -521,7 +577,7 @@ namespace ZeroLevel.Network #endregion #region Private - internal IEnumerable GetClientEnumerator(string serviceKey) + private IEnumerable GetClientEnumerator(string serviceKey) { InvokeResult> candidates; try @@ -556,7 +612,7 @@ namespace ZeroLevel.Network } } - internal IEnumerable GetClientEnumeratorByType(string serviceType) + private IEnumerable GetClientEnumeratorByType(string serviceType) { InvokeResult> candidates; try @@ -591,7 +647,7 @@ namespace ZeroLevel.Network } } - internal IEnumerable GetClientEnumeratorByGroup(string serviceGroup) + private IEnumerable GetClientEnumeratorByGroup(string serviceGroup) { InvokeResult> candidates; try @@ -631,7 +687,7 @@ namespace ZeroLevel.Network /// Service key /// Service call code /// true - service called succesfully - internal bool CallService(string serviceKey, Func callHandler) + private bool CallService(string serviceKey, Func callHandler) { InvokeResult> candidates; try @@ -678,7 +734,7 @@ namespace ZeroLevel.Network return success; } - internal InvokeResult CallServiceDirect(string endpoint, Func callHandler) + private InvokeResult CallServiceDirect(string endpoint, Func callHandler) { ExClient transport; try diff --git a/ZeroLevel/Services/Network/ServiceRouteStorage.cs b/ZeroLevel/Services/Network/ServiceRouteStorage.cs index fa1ba98..89f2999 100644 --- a/ZeroLevel/Services/Network/ServiceRouteStorage.cs +++ b/ZeroLevel/Services/Network/ServiceRouteStorage.cs @@ -7,7 +7,7 @@ using ZeroLevel.Services.Collections; namespace ZeroLevel.Network { - /* + /* One IPEndpoint binded with one service. Service can have one key, one type, one group. Therefore IPEndpoint can be binded with one key, one type and one group. @@ -18,6 +18,7 @@ namespace ZeroLevel.Network */ public sealed class ServiceRouteStorage + : IServiceRoutesStorage { private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(); diff --git a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs index cda5e3a..6a2d28c 100644 --- a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs +++ b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs @@ -13,6 +13,8 @@ namespace ZeroLevel.Network private readonly ConcurrentDictionary _serverInstances = new ConcurrentDictionary(); private HashSet _clients = new HashSet(); + internal IEnumerable ServerList => _serverInstances.Values; + public ExClient GetClient(IPEndPoint endpoint, bool use_cachee, IRouter router = null) { if (use_cachee)