pull/1/head
a.bozhenov 5 years ago
parent 902ae03885
commit 55d2e76eed

@ -1,4 +1,5 @@
using System; using System;
using System.Net;
using System.Threading; using System.Threading;
using ZeroLevel; using ZeroLevel;
using ZeroLevel.Network; using ZeroLevel.Network;
@ -16,6 +17,7 @@ namespace TestApp
protected override void StartAction() protected override void StartAction()
{ {
Log.Info("Started"); Log.Info("Started");
ReadServiceInfo(); ReadServiceInfo();
@ -31,16 +33,15 @@ namespace TestApp
{ {
try try
{ {
Exchange.GetConnection("test.app").Request<int>("counter", s => Exchange.GetConnection("test.app")?.Request<int>("counter", s => Interlocked.Add(ref counter, s));
{
Interlocked.Add(ref counter, s);
});
} }
catch catch
{ {
Thread.Sleep(300); Thread.Sleep(300);
} }
} }
/* /*

@ -516,26 +516,29 @@ namespace ZeroLevel.Network
if (discovery_endpoint.Success) if (discovery_endpoint.Success)
{ {
var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true); var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true);
var services = _cachee.ServerList. if (discoveryClient != null)
Select(s => {
{ var services = _cachee.ServerList.
var info = MessageSerializer.Copy(_owner.ServiceInfo); Select(s =>
info.Port = s.LocalEndpoint.Port; {
return info; var info = MessageSerializer.Copy(_owner.ServiceInfo);
}). info.Port = s.LocalEndpoint.Port;
ToList(); return info;
foreach (var service in services) }).
{ ToList();
var request = discoveryClient.Request<ZeroServiceInfo, InvokeResult>("register", service, r => foreach (var service in services)
{ {
if (!r.Success) var request = discoveryClient.Request<ZeroServiceInfo, InvokeResult>("register", service, r =>
{ {
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled. {r.Comment}"); if (!r.Success)
{
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled. {r.Comment}");
}
});
if (request.Success == false)
{
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled.{request.Comment}");
} }
});
if (request.Success == false)
{
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled.{request.Comment}");
} }
} }
} }
@ -547,55 +550,58 @@ namespace ZeroLevel.Network
if (discovery_endpoint.Success) if (discovery_endpoint.Success)
{ {
var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true); var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true);
try if (discoveryClient != null)
{ {
var ir = discoveryClient.Request<IEnumerable<ServiceEndpointsInfo>>("services", records => try
{ {
if (records == null) var ir = discoveryClient.Request<IEnumerable<ServiceEndpointsInfo>>("services", records =>
{
Log.SystemWarning("[Exchange.UpdateServiceListFromDiscovery] UpdateServiceListInfo. Discrovery response is empty");
return;
}
var endpoints = new HashSet<IPEndPoint>();
_dicovery_aliases.BeginUpdate();
try
{ {
foreach (var service in records) if (records == null)
{ {
endpoints.Clear(); Log.SystemWarning("[Exchange.UpdateServiceListFromDiscovery] UpdateServiceListInfo. Discrovery response is empty");
foreach (var ep in service.Endpoints) return;
}
var endpoints = new HashSet<IPEndPoint>();
_dicovery_aliases.BeginUpdate();
try
{
foreach (var service in records)
{ {
try endpoints.Clear();
{ foreach (var ep in service.Endpoints)
var endpoint = NetUtils.CreateIPEndPoint(ep);
endpoints.Add(endpoint);
}
catch
{ {
Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint"); try
{
var endpoint = NetUtils.CreateIPEndPoint(ep);
endpoints.Add(endpoint);
}
catch
{
Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint");
}
} }
_dicovery_aliases.Set(service.ServiceKey,
service.ServiceType,
service.ServiceGroup,
endpoints);
} }
_dicovery_aliases.Set(service.ServiceKey, _dicovery_aliases.Commit();
service.ServiceType,
service.ServiceGroup,
endpoints);
} }
_dicovery_aliases.Commit(); catch
} {
catch _dicovery_aliases.Rollback();
}
});
if (!ir.Success)
{ {
_dicovery_aliases.Rollback(); Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Error request to inbox 'services'. {ir.Comment}");
} }
}); }
if (!ir.Success) catch (Exception ex)
{ {
Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Error request to inbox 'services'. {ir.Comment}"); Log.SystemError(ex, "[Exchange.UpdateServiceListFromDiscovery] Discovery service response is absent");
} }
} }
catch (Exception ex)
{
Log.SystemError(ex, "[Exchange.UpdateServiceListFromDiscovery] Discovery service response is absent");
}
} }
} }
#endregion #endregion
@ -777,6 +783,7 @@ namespace ZeroLevel.Network
Log.SystemError(ex, $"[Exchange.GetClientEnumerator] Can't get transport for endpoint '{endpoint}'"); Log.SystemError(ex, $"[Exchange.GetClientEnumerator] Can't get transport for endpoint '{endpoint}'");
continue; continue;
} }
if (transport == null) continue;
yield return transport; yield return transport;
} }
} }
@ -812,6 +819,7 @@ namespace ZeroLevel.Network
Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByType] Can't get transport for endpoint '{endpoint}'"); Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByType] Can't get transport for endpoint '{endpoint}'");
continue; continue;
} }
if (transport == null) continue;
yield return transport; yield return transport;
} }
} }
@ -847,6 +855,7 @@ namespace ZeroLevel.Network
Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByGroup] Can't get transport for endpoint '{service}'"); Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByGroup] Can't get transport for endpoint '{service}'");
continue; continue;
} }
if (transport == null) continue;
yield return transport; yield return transport;
} }
} }
@ -891,6 +900,7 @@ namespace ZeroLevel.Network
Log.SystemError(ex, $"[Exchange.CallService] Can't get transport for service '{serviceKey}'"); Log.SystemError(ex, $"[Exchange.CallService] Can't get transport for service '{serviceKey}'");
continue; continue;
} }
if (transport == null) continue;
try try
{ {
success = callHandler(transport); success = callHandler(transport);

@ -27,6 +27,7 @@ namespace ZeroLevel.Network.FileTransfer
public void Send(ExClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null) public void Send(ExClient client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
{ {
if (client == null) return;
PushTransferTask(client, fileName, completeHandler, errorHandler); PushTransferTask(client, fileName, completeHandler, errorHandler);
} }

@ -21,7 +21,7 @@ namespace ZeroLevel.Network
private long _last_rw_time = DateTime.UtcNow.Ticks; private long _last_rw_time = DateTime.UtcNow.Ticks;
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE]; private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
private readonly object _reconnection_lock = new object(); private readonly object _reconnection_lock = new object();
private readonly BlockingCollection<SendInfo> _send_queue = new BlockingCollection<SendInfo>(); private BlockingCollection<SendInfo> _send_queue = new BlockingCollection<SendInfo>();
private readonly RequestBuffer _requests = new RequestBuffer(); private readonly RequestBuffer _requests = new RequestBuffer();
private int _current_heartbeat_period_in_ms = 0; private int _current_heartbeat_period_in_ms = 0;
private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета
@ -318,10 +318,11 @@ namespace ZeroLevel.Network
OnDisconnect(this); OnDisconnect(this);
} }
} }
/*
private void SendFramesJob() private void SendFramesJob()
{ {
SendInfo frame; SendInfo frame;
int unsuccess = 0;
while (Status != SocketClientStatus.Disposed) while (Status != SocketClientStatus.Disposed)
{ {
if (_send_queue.IsCompleted) if (_send_queue.IsCompleted)
@ -340,6 +341,16 @@ namespace ZeroLevel.Network
Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault"); Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault");
} }
if (Status == SocketClientStatus.Disposed) return; if (Status == SocketClientStatus.Disposed) return;
if (Status == SocketClientStatus.Broken)
{
unsuccess++;
if (unsuccess > 30) unsuccess = 30;
}
if (Status == SocketClientStatus.Working)
{
unsuccess = 0;
}
Thread.Sleep(unsuccess * 100);
continue; continue;
} }
try try
@ -360,6 +371,70 @@ namespace ZeroLevel.Network
} }
} }
} }
*/
private void SendFramesJob()
{
SendInfo frame = default(SendInfo);
int unsuccess = 0;
while (Status != SocketClientStatus.Disposed)
{
if (_send_queue.IsCompleted)
{
return;
}
try
{
frame = _send_queue.Take();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.Take");
_send_queue.Dispose();
_send_queue = new BlockingCollection<SendInfo>();
continue;
}
while (_stream.CanWrite == false || Status != SocketClientStatus.Working)
{
try
{
EnsureConnection();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault");
}
if (Status == SocketClientStatus.Disposed)
{
return;
}
if (Status == SocketClientStatus.Broken)
{
unsuccess++;
if (unsuccess > 30) unsuccess = 30;
}
if (Status == SocketClientStatus.Working)
{
unsuccess = 0;
}
Thread.Sleep(unsuccess * 128);
}
try
{
if (frame.isRequest)
{
_requests.StartSend(frame.identity);
}
_stream.Write(frame.data, 0, frame.data.Length);
_last_rw_time = DateTime.UtcNow.Ticks;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[SocketClient.SendFramesJob] _stream.Write");
Broken();
OnDisconnect(this);
}
}
}
#endregion #endregion

@ -9,6 +9,7 @@ namespace ZeroLevel.Network
: IDisposable : IDisposable
{ {
private static readonly ConcurrentDictionary<string, ExClient> _clientInstances = new ConcurrentDictionary<string, ExClient>(); private static readonly ConcurrentDictionary<string, ExClient> _clientInstances = new ConcurrentDictionary<string, ExClient>();
private static readonly ConcurrentDictionary<string, object> _clientLocks = new ConcurrentDictionary<string, object>();
private readonly ConcurrentDictionary<string, SocketServer> _serverInstances = new ConcurrentDictionary<string, SocketServer>(); private readonly ConcurrentDictionary<string, SocketServer> _serverInstances = new ConcurrentDictionary<string, SocketServer>();
@ -19,26 +20,40 @@ namespace ZeroLevel.Network
if (use_cachee) if (use_cachee)
{ {
string key = $"{endpoint.Address}:{endpoint.Port}"; string key = $"{endpoint.Address}:{endpoint.Port}";
ExClient instance = null; if (false == _clientLocks.ContainsKey(key))
if (_clientInstances.ContainsKey(key))
{ {
instance = _clientInstances[key]; _clientLocks.TryAdd(key, new object());
if (instance.Status == SocketClientStatus.Working)
{
return instance;
}
_clientInstances.TryRemove(key, out instance);
instance.Dispose();
instance = null;
} }
instance = new ExClient(new SocketClient(endpoint, router ?? new Router())); lock (_clientLocks[key])
instance.ForceConnect();
if (instance.Status == SocketClientStatus.Initialized
|| instance.Status == SocketClientStatus.Working)
{ {
_clientInstances[key] = instance; try
instance.Socket.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)); {
return instance; ExClient instance = null;
if (_clientInstances.ContainsKey(key))
{
instance = _clientInstances[key];
if (instance.Status == SocketClientStatus.Working)
{
return instance;
}
_clientInstances.TryRemove(key, out instance);
instance.Dispose();
instance = null;
}
instance = new ExClient(new SocketClient(endpoint, router ?? new Router()));
instance.ForceConnect();
if (instance.Status == SocketClientStatus.Initialized
|| instance.Status == SocketClientStatus.Working)
{
_clientInstances[key] = instance;
instance.Socket.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS));
return instance;
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExClientServerCachee.GetClient] Can't create ExClient for {key}");
}
} }
} }
else else

Loading…
Cancel
Save

Powered by TurnKey Linux.