diff --git a/TestPipeLine/Consumer/ConsumerService.cs b/TestPipeLine/Consumer/ConsumerService.cs index 92c9576..f8a0614 100644 --- a/TestPipeLine/Consumer/ConsumerService.cs +++ b/TestPipeLine/Consumer/ConsumerService.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using ZeroLevel; using ZeroLevel.Network; using ZeroLevel.Services.Applications; @@ -12,6 +13,12 @@ namespace Consumer { ReadServiceInfo(); AutoregisterInboxes(UseHost()); + + Sheduller.RemindEvery(TimeSpan.FromSeconds(1), () => + { + Console.SetCursorPosition(0, 0); + Console.WriteLine(_proceed); + }); } protected override void StopAction() diff --git a/TestPipeLine/Processor/ProcessorService.cs b/TestPipeLine/Processor/ProcessorService.cs index 4aa0fd1..7e22532 100644 --- a/TestPipeLine/Processor/ProcessorService.cs +++ b/TestPipeLine/Processor/ProcessorService.cs @@ -13,6 +13,12 @@ namespace Processor { ReadServiceInfo(); AutoregisterInboxes(UseHost()); + + Sheduller.RemindEvery(TimeSpan.FromSeconds(1), () => + { + Console.SetCursorPosition(0, 0); + Console.WriteLine(_proceed); + }); } protected override void StopAction() diff --git a/TestPipeLine/Source/SourceService.cs b/TestPipeLine/Source/SourceService.cs index 9c9befb..692bc4e 100644 --- a/TestPipeLine/Source/SourceService.cs +++ b/TestPipeLine/Source/SourceService.cs @@ -14,7 +14,7 @@ namespace Source ReadServiceInfo(); AutoregisterInboxes(UseHost()); - Sheduller.RemindEvery(TimeSpan.FromMilliseconds(10), () => + Sheduller.RemindEvery(TimeSpan.FromMilliseconds(100), () => { if (Exchange.Send("test.processor", "handle", Environment.TickCount)) { diff --git a/TestPipeLine/Watcher/WatcherService.cs b/TestPipeLine/Watcher/WatcherService.cs index cc01b2c..29bfbd3 100644 --- a/TestPipeLine/Watcher/WatcherService.cs +++ b/TestPipeLine/Watcher/WatcherService.cs @@ -22,6 +22,11 @@ namespace Watcher var success = Exchange.RequestBroadcastByGroup("Test", "meta", records => { + if (records.Any() == false) + { + Log.Info("No services"); + } + foreach (var record in records.OrderBy(r=>r.Name)) { sb.Append(record.Name); diff --git a/ZeroLevel.Discovery/RouteTable.cs b/ZeroLevel.Discovery/RouteTable.cs index a412cd0..56b299a 100644 --- a/ZeroLevel.Discovery/RouteTable.cs +++ b/ZeroLevel.Discovery/RouteTable.cs @@ -127,6 +127,7 @@ namespace ZeroLevel.Discovery foreach (var ep in pair.Value) { _table[pair.Key].Endpoints.Remove(ep); + Log.Debug($"Removed address {ep}"); } } var badKeys = _table.Where(f => f.Value.Endpoints.Count == 0) @@ -135,6 +136,7 @@ namespace ZeroLevel.Discovery foreach (var badKey in badKeys) { _table.Remove(badKey); + Log.Debug($"Removed service {badKey}"); } } finally diff --git a/ZeroLevel/Services/BaseZeroService.cs b/ZeroLevel/Services/BaseZeroService.cs index 4496382..340eff2 100644 --- a/ZeroLevel/Services/BaseZeroService.cs +++ b/ZeroLevel/Services/BaseZeroService.cs @@ -343,13 +343,14 @@ namespace ZeroLevel.Services.Applications { try { - StartAction(); _state = ZeroServiceStatus.Running; + StartAction(); Log.Debug($"[{Name}] Service started"); } catch (Exception ex) { Log.Fatal(ex, $"[{Name}] Failed to start service"); + Stop(); } } } diff --git a/ZeroLevel/Services/Bootstrap.cs b/ZeroLevel/Services/Bootstrap.cs index b5540d5..991b6ad 100644 --- a/ZeroLevel/Services/Bootstrap.cs +++ b/ZeroLevel/Services/Bootstrap.cs @@ -42,7 +42,11 @@ namespace ZeroLevel public BootstrapFluent EnableConsoleLog(LogLevel level = LogLevel.FullStandart) { Log.AddConsoleLogger(level); return this; } public ZeroServiceStatus Status { get { return _service.Status; } } - public IServiceExecution Run() { _service.Start(); return this; } + public IServiceExecution Run() + { + _service.Start(); + return this; + } public IServiceExecution Stop() { try diff --git a/ZeroLevel/Services/Network/Exchange.cs b/ZeroLevel/Services/Network/Exchange.cs index 9881ae0..6e8cf91 100644 --- a/ZeroLevel/Services/Network/Exchange.cs +++ b/ZeroLevel/Services/Network/Exchange.cs @@ -27,7 +27,6 @@ namespace ZeroLevel.Network public Exchange(IZeroService owner) { _owner = owner; - _cachee.OnBrokenConnection += _cachee_OnBrokenConnection; } #endregion Ctor @@ -920,7 +919,7 @@ namespace ZeroLevel.Network { try { - if (false == client.Request(inbox, data, resp => { waiter.Signal(); response.Add(resp); }).Success) + if (false == client.Request(inbox, data, resp => { response.Add(resp); waiter.Signal(); }).Success) { waiter.Signal(); } @@ -948,7 +947,7 @@ namespace ZeroLevel.Network { try { - if (false == client.Request(inbox, resp => { waiter.Signal(); response.Add(resp); }).Success) + if (false == client.Request(inbox, resp => { response.Add(resp); waiter.Signal(); }).Success) { waiter.Signal(); } @@ -964,11 +963,6 @@ namespace ZeroLevel.Network } return response; } - - private void _cachee_OnBrokenConnection(IPEndPoint endpoint) - { - //_aliases.Remove(endpoint); ??? no need - } #endregion public void Dispose() diff --git a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs index f2a6ec1..cc24412 100644 --- a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs +++ b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs @@ -8,16 +8,6 @@ namespace ZeroLevel.Network internal sealed class ExClientServerCachee : IDisposable { - internal event Action OnBrokenConnection; - private void RiseBrokenConnectionEvent(IPEndPoint endpoint) - { - var e = OnBrokenConnection; - if (e != null) - { - e.Invoke(endpoint); - } - } - private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); private readonly ConcurrentDictionary _serverInstances = new ConcurrentDictionary(); @@ -44,12 +34,8 @@ namespace ZeroLevel.Network } instance = new ExClient(new SocketClient(endpoint, router ?? new Router())); instance.ForceConnect(); - if (instance.Status != SocketClientStatus.Initialized && - instance.Status != SocketClientStatus.Working) - { - OnBrokenConnection(endpoint); - } - else + if (instance.Status == SocketClientStatus.Initialized + ||instance.Status == SocketClientStatus.Working) { _clientInstances[key] = instance; return instance; @@ -58,12 +44,8 @@ namespace ZeroLevel.Network else { var instance = new ExClient(new SocketClient(endpoint, router ?? new Router())); - if (instance.Status != SocketClientStatus.Initialized && - instance.Status != SocketClientStatus.Working) - { - OnBrokenConnection(endpoint); - } - else + if (instance.Status == SocketClientStatus.Initialized + || instance.Status == SocketClientStatus.Working) { return instance; }