From 55d2e76eedf036a2c88464688ec09efc62092302 Mon Sep 17 00:00:00 2001 From: "a.bozhenov" Date: Fri, 12 Jul 2019 16:40:41 +0300 Subject: [PATCH] fix --- TestApp/MyService.cs | 9 +- ZeroLevel/Services/Network/Exchange.cs | 114 ++++++++++-------- .../Network/FileTransfer/FileSender.cs | 1 + ZeroLevel/Services/Network/SocketClient.cs | 79 +++++++++++- .../Network/Utils/ExClientServerCachee.cs | 49 +++++--- 5 files changed, 177 insertions(+), 75 deletions(-) diff --git a/TestApp/MyService.cs b/TestApp/MyService.cs index 35f02dd..532866b 100644 --- a/TestApp/MyService.cs +++ b/TestApp/MyService.cs @@ -1,4 +1,5 @@ using System; +using System.Net; using System.Threading; using ZeroLevel; using ZeroLevel.Network; @@ -16,6 +17,7 @@ namespace TestApp protected override void StartAction() { + Log.Info("Started"); ReadServiceInfo(); @@ -31,16 +33,15 @@ namespace TestApp { try { - Exchange.GetConnection("test.app").Request("counter", s => - { - Interlocked.Add(ref counter, s); - }); + Exchange.GetConnection("test.app")?.Request("counter", s => Interlocked.Add(ref counter, s)); } catch { Thread.Sleep(300); } } + + /* diff --git a/ZeroLevel/Services/Network/Exchange.cs b/ZeroLevel/Services/Network/Exchange.cs index 6e8cf91..bedef10 100644 --- a/ZeroLevel/Services/Network/Exchange.cs +++ b/ZeroLevel/Services/Network/Exchange.cs @@ -516,26 +516,29 @@ namespace ZeroLevel.Network if (discovery_endpoint.Success) { var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true); - var services = _cachee.ServerList. - Select(s => - { - var info = MessageSerializer.Copy(_owner.ServiceInfo); - info.Port = s.LocalEndpoint.Port; - return info; - }). - ToList(); - foreach (var service in services) - { - var request = discoveryClient.Request("register", service, r => + if (discoveryClient != null) + { + var services = _cachee.ServerList. + Select(s => + { + var info = MessageSerializer.Copy(_owner.ServiceInfo); + info.Port = s.LocalEndpoint.Port; + return info; + }). + ToList(); + foreach (var service in services) { - if (!r.Success) + var request = discoveryClient.Request("register", service, r => { - Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled. {r.Comment}"); + if (!r.Success) + { + Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled. {r.Comment}"); + } + }); + if (request.Success == false) + { + Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled.{request.Comment}"); } - }); - if (request.Success == false) - { - Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled.{request.Comment}"); } } } @@ -547,55 +550,58 @@ namespace ZeroLevel.Network if (discovery_endpoint.Success) { var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true); - try + if (discoveryClient != null) { - var ir = discoveryClient.Request>("services", records => + try { - if (records == null) - { - Log.SystemWarning("[Exchange.UpdateServiceListFromDiscovery] UpdateServiceListInfo. Discrovery response is empty"); - return; - } - var endpoints = new HashSet(); - _dicovery_aliases.BeginUpdate(); - try + var ir = discoveryClient.Request>("services", records => { - foreach (var service in records) + if (records == null) { - endpoints.Clear(); - foreach (var ep in service.Endpoints) + Log.SystemWarning("[Exchange.UpdateServiceListFromDiscovery] UpdateServiceListInfo. Discrovery response is empty"); + return; + } + var endpoints = new HashSet(); + _dicovery_aliases.BeginUpdate(); + try + { + foreach (var service in records) { - try - { - var endpoint = NetUtils.CreateIPEndPoint(ep); - endpoints.Add(endpoint); - } - catch + endpoints.Clear(); + foreach (var ep in service.Endpoints) { - Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint"); + try + { + var endpoint = NetUtils.CreateIPEndPoint(ep); + endpoints.Add(endpoint); + } + catch + { + Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint"); + } } + _dicovery_aliases.Set(service.ServiceKey, + service.ServiceType, + service.ServiceGroup, + endpoints); } - _dicovery_aliases.Set(service.ServiceKey, - service.ServiceType, - service.ServiceGroup, - endpoints); + _dicovery_aliases.Commit(); } - _dicovery_aliases.Commit(); - } - catch + catch + { + _dicovery_aliases.Rollback(); + } + }); + if (!ir.Success) { - _dicovery_aliases.Rollback(); + Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Error request to inbox 'services'. {ir.Comment}"); } - }); - if (!ir.Success) + } + catch (Exception ex) { - Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Error request to inbox 'services'. {ir.Comment}"); + Log.SystemError(ex, "[Exchange.UpdateServiceListFromDiscovery] Discovery service response is absent"); } } - catch (Exception ex) - { - Log.SystemError(ex, "[Exchange.UpdateServiceListFromDiscovery] Discovery service response is absent"); - } } } #endregion @@ -777,6 +783,7 @@ namespace ZeroLevel.Network Log.SystemError(ex, $"[Exchange.GetClientEnumerator] Can't get transport for endpoint '{endpoint}'"); continue; } + if (transport == null) continue; yield return transport; } } @@ -812,6 +819,7 @@ namespace ZeroLevel.Network Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByType] Can't get transport for endpoint '{endpoint}'"); continue; } + if (transport == null) continue; yield return transport; } } @@ -847,6 +855,7 @@ namespace ZeroLevel.Network Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByGroup] Can't get transport for endpoint '{service}'"); continue; } + if (transport == null) continue; yield return transport; } } @@ -891,6 +900,7 @@ namespace ZeroLevel.Network Log.SystemError(ex, $"[Exchange.CallService] Can't get transport for service '{serviceKey}'"); continue; } + if (transport == null) continue; try { success = callHandler(transport); diff --git a/ZeroLevel/Services/Network/FileTransfer/FileSender.cs b/ZeroLevel/Services/Network/FileTransfer/FileSender.cs index 4b2af64..4dea1b7 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileSender.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileSender.cs @@ -27,6 +27,7 @@ namespace ZeroLevel.Network.FileTransfer public void Send(ExClient client, string fileName, Action completeHandler = null, Action errorHandler = null) { + if (client == null) return; PushTransferTask(client, fileName, completeHandler, errorHandler); } diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index 014952e..7e99cc9 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -21,7 +21,7 @@ namespace ZeroLevel.Network private long _last_rw_time = DateTime.UtcNow.Ticks; private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE]; private readonly object _reconnection_lock = new object(); - private readonly BlockingCollection _send_queue = new BlockingCollection(); + private BlockingCollection _send_queue = new BlockingCollection(); private readonly RequestBuffer _requests = new RequestBuffer(); private int _current_heartbeat_period_in_ms = 0; private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета @@ -318,10 +318,11 @@ namespace ZeroLevel.Network OnDisconnect(this); } } - + /* private void SendFramesJob() { SendInfo frame; + int unsuccess = 0; while (Status != SocketClientStatus.Disposed) { if (_send_queue.IsCompleted) @@ -340,6 +341,16 @@ namespace ZeroLevel.Network Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault"); } if (Status == SocketClientStatus.Disposed) return; + if (Status == SocketClientStatus.Broken) + { + unsuccess++; + if (unsuccess > 30) unsuccess = 30; + } + if (Status == SocketClientStatus.Working) + { + unsuccess = 0; + } + Thread.Sleep(unsuccess * 100); continue; } try @@ -360,6 +371,70 @@ namespace ZeroLevel.Network } } } + */ + private void SendFramesJob() + { + SendInfo frame = default(SendInfo); + int unsuccess = 0; + while (Status != SocketClientStatus.Disposed) + { + if (_send_queue.IsCompleted) + { + return; + } + try + { + frame = _send_queue.Take(); + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.Take"); + _send_queue.Dispose(); + _send_queue = new BlockingCollection(); + continue; + } + while (_stream.CanWrite == false || Status != SocketClientStatus.Working) + { + try + { + EnsureConnection(); + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault"); + } + if (Status == SocketClientStatus.Disposed) + { + return; + } + if (Status == SocketClientStatus.Broken) + { + unsuccess++; + if (unsuccess > 30) unsuccess = 30; + } + if (Status == SocketClientStatus.Working) + { + unsuccess = 0; + } + Thread.Sleep(unsuccess * 128); + } + try + { + if (frame.isRequest) + { + _requests.StartSend(frame.identity); + } + _stream.Write(frame.data, 0, frame.data.Length); + _last_rw_time = DateTime.UtcNow.Ticks; + } + catch (Exception ex) + { + Log.SystemError(ex, $"[SocketClient.SendFramesJob] _stream.Write"); + Broken(); + OnDisconnect(this); + } + } + } #endregion diff --git a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs index e9982af..ac183c8 100644 --- a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs +++ b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs @@ -9,6 +9,7 @@ namespace ZeroLevel.Network : IDisposable { private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); + private static readonly ConcurrentDictionary _clientLocks = new ConcurrentDictionary(); private readonly ConcurrentDictionary _serverInstances = new ConcurrentDictionary(); @@ -19,26 +20,40 @@ namespace ZeroLevel.Network if (use_cachee) { string key = $"{endpoint.Address}:{endpoint.Port}"; - ExClient instance = null; - if (_clientInstances.ContainsKey(key)) + if (false == _clientLocks.ContainsKey(key)) { - instance = _clientInstances[key]; - if (instance.Status == SocketClientStatus.Working) - { - return instance; - } - _clientInstances.TryRemove(key, out instance); - instance.Dispose(); - instance = null; + _clientLocks.TryAdd(key, new object()); } - instance = new ExClient(new SocketClient(endpoint, router ?? new Router())); - instance.ForceConnect(); - if (instance.Status == SocketClientStatus.Initialized - || instance.Status == SocketClientStatus.Working) + lock (_clientLocks[key]) { - _clientInstances[key] = instance; - instance.Socket.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)); - return instance; + try + { + ExClient instance = null; + if (_clientInstances.ContainsKey(key)) + { + instance = _clientInstances[key]; + if (instance.Status == SocketClientStatus.Working) + { + return instance; + } + _clientInstances.TryRemove(key, out instance); + instance.Dispose(); + instance = null; + } + instance = new ExClient(new SocketClient(endpoint, router ?? new Router())); + instance.ForceConnect(); + if (instance.Status == SocketClientStatus.Initialized + || instance.Status == SocketClientStatus.Working) + { + _clientInstances[key] = instance; + instance.Socket.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)); + return instance; + } + } + catch (Exception ex) + { + Log.SystemError(ex, $"[ExClientServerCachee.GetClient] Can't create ExClient for {key}"); + } } } else