Fix network

pull/1/head
Ogoun 6 years ago
parent fc03d49fb4
commit 2a021162a6

@ -90,24 +90,6 @@ namespace ZeroLevel.Discovery
} }
#endregion Snapshot #endregion Snapshot
private bool Ping(string endpoint, string msg)
{
try
{
using (var client = ExchangeTransportFactory.GetClient(endpoint))
{
client.ForceConnect();
return client.Status == ZTransportStatus.Working;
}
}
catch (Exception ex)
{
Log.Error(ex, $"[RouteTable] Fault ping endpoint {endpoint}");
return false;
}
}
private void Heartbeat(long taskid) private void Heartbeat(long taskid)
{ {
try try
@ -121,7 +103,7 @@ namespace ZeroLevel.Discovery
var endpointsToRemove = new List<string>(); var endpointsToRemove = new List<string>();
foreach (var e in pair.Value.Endpoints) foreach (var e in pair.Value.Endpoints)
{ {
if (Ping(e, "HELLO") == false) if (NetUtils.TestConnection(NetUtils.CreateIPEndPoint(e)) == false)
{ {
if (false == removeEntities.ContainsKey(pair.Key)) if (false == removeEntities.ContainsKey(pair.Key))
{ {
@ -169,8 +151,9 @@ namespace ZeroLevel.Discovery
public InvokeResult Append(ExServiceInfo serviceInfo, IZBackward client) public InvokeResult Append(ExServiceInfo serviceInfo, IZBackward client)
{ {
InvokeResult result = null; InvokeResult result = null;
var endpoint = $"{client.Endpoint.Address}:{client.Endpoint.Port}"; var endpoint = $"{client.Endpoint.Address}:{serviceInfo.Port}";
if (Ping(endpoint, serviceInfo.ServiceKey)) Log.Info($"Regiter request from {endpoint}. Service {serviceInfo?.ServiceKey}");
if (NetUtils.TestConnection(NetUtils.CreateIPEndPoint(endpoint)))
{ {
var key = $"{serviceInfo.ServiceGroup}:{serviceInfo.ServiceType}:{serviceInfo.ServiceKey.Trim().ToLowerInvariant()}"; var key = $"{serviceInfo.ServiceGroup}:{serviceInfo.ServiceType}:{serviceInfo.ServiceKey.Trim().ToLowerInvariant()}";
_lock.EnterWriteLock(); _lock.EnterWriteLock();
@ -201,7 +184,7 @@ namespace ZeroLevel.Discovery
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "Fault append service ({0} {1}) endpoint '{2}'", serviceInfo.ServiceKey, serviceInfo.Version, endpoint); Log.Error(ex, $"Fault append service ({serviceInfo.ServiceKey} {serviceInfo.Version}) endpoint '{endpoint}'");
result = InvokeResult.Fault(ex.Message); result = InvokeResult.Fault(ex.Message);
} }
finally finally

@ -35,12 +35,17 @@ namespace ZeroLevel.Network
/// </summary> /// </summary>
[DataMember] [DataMember]
public string Version { get; set; } public string Version { get; set; }
/// <summary>
/// Service port
/// </summary>
[DataMember]
public int Port { get; set; }
public bool Equals(ExServiceInfo other) public bool Equals(ExServiceInfo other)
{ {
if (other == null) return false; if (other == null) return false;
if (object.ReferenceEquals(this, other)) return true; if (object.ReferenceEquals(this, other)) return true;
if (this.Port != other.Port) return false;
if (string.Compare(this.ServiceKey, other.ServiceKey, true) != 0) return false; if (string.Compare(this.ServiceKey, other.ServiceKey, true) != 0) return false;
if (string.Compare(this.ServiceGroup, other.ServiceGroup, true) != 0) return false; if (string.Compare(this.ServiceGroup, other.ServiceGroup, true) != 0) return false;
if (string.Compare(this.ServiceType, other.ServiceType, true) != 0) return false; if (string.Compare(this.ServiceType, other.ServiceType, true) != 0) return false;
@ -60,6 +65,7 @@ namespace ZeroLevel.Network
public void Serialize(IBinaryWriter writer) public void Serialize(IBinaryWriter writer)
{ {
writer.WriteInt32(this.Port);
writer.WriteString(this.ServiceKey); writer.WriteString(this.ServiceKey);
writer.WriteString(this.ServiceGroup); writer.WriteString(this.ServiceGroup);
writer.WriteString(this.ServiceType); writer.WriteString(this.ServiceType);
@ -68,6 +74,7 @@ namespace ZeroLevel.Network
public void Deserialize(IBinaryReader reader) public void Deserialize(IBinaryReader reader)
{ {
this.Port = reader.ReadInt32();
this.ServiceKey = reader.ReadString(); this.ServiceKey = reader.ReadString();
this.ServiceGroup = reader.ReadString(); this.ServiceGroup = reader.ReadString();
this.ServiceType = reader.ReadString(); this.ServiceType = reader.ReadString();

@ -8,6 +8,34 @@ namespace ZeroLevel.Network
{ {
public static class NetUtils public static class NetUtils
{ {
public static bool TestConnection(IPEndPoint endpoint)
{
using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
socket.SetIPProtectionLevel(IPProtectionLevel.Unrestricted);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, false);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoChecksum, true);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseUnicastPort, true);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, 100);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, 100);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontRoute, true);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ExclusiveAddressUse, false);
try
{
socket.Connect(endpoint);
socket.Close();
return true;
}
catch
{
}
}
return false;
}
public static int Compare(this IPEndPoint x, IPEndPoint y) public static int Compare(this IPEndPoint x, IPEndPoint y)
{ {
var xx = x.Address.ToString(); var xx = x.Address.ToString();

@ -27,7 +27,7 @@ namespace ZeroLevel.Network
public ExServiceHost(IDiscoveryClient client) public ExServiceHost(IDiscoveryClient client)
{ {
_discoveryClient = client; _discoveryClient = client;
_registerTaskKey = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(15), RegisterServicesInDiscovery); _registerTaskKey = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(55), RegisterServicesInDiscovery);
} }
public IExService RegisterService(IExchangeService service) public IExService RegisterService(IExchangeService service)
@ -47,6 +47,7 @@ namespace ZeroLevel.Network
Server = server, Server = server,
ServiceInfo = new ExServiceInfo ServiceInfo = new ExServiceInfo
{ {
Port = server.Endpoint.Port,
ServiceKey = service.Key, ServiceKey = service.Key,
Version = service.Version, Version = service.Version,
ServiceGroup = service.Group, ServiceGroup = service.Group,
@ -88,6 +89,7 @@ namespace ZeroLevel.Network
Server = server, Server = server,
ServiceInfo = new ExServiceInfo ServiceInfo = new ExServiceInfo
{ {
Port = server.Endpoint.Port,
ServiceKey = serviceInfo.ServiceKey, ServiceKey = serviceInfo.ServiceKey,
Version = serviceInfo.Version, Version = serviceInfo.Version,
ServiceGroup = serviceInfo.ServiceGroup, ServiceGroup = serviceInfo.ServiceGroup,
@ -329,7 +331,7 @@ namespace ZeroLevel.Network
IExClient transport; IExClient transport;
try try
{ {
transport = ExchangeTransportFactory.GetClient(service.Endpoint); transport = ExchangeTransportFactory.GetClientWithCache(service.Endpoint);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -374,7 +376,7 @@ namespace ZeroLevel.Network
IExClient transport; IExClient transport;
try try
{ {
transport = ExchangeTransportFactory.GetClient(candidate.Endpoint); transport = ExchangeTransportFactory.GetClientWithCache(candidate.Endpoint);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -405,7 +407,7 @@ namespace ZeroLevel.Network
IExClient transport; IExClient transport;
try try
{ {
transport = ExchangeTransportFactory.GetClient(service.Endpoint); transport = ExchangeTransportFactory.GetClientWithCache(service.Endpoint);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -443,7 +445,7 @@ namespace ZeroLevel.Network
IExClient transport; IExClient transport;
try try
{ {
transport = ExchangeTransportFactory.GetClient(service.Endpoint); transport = ExchangeTransportFactory.GetClientWithCache(service.Endpoint);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -481,7 +483,7 @@ namespace ZeroLevel.Network
IExClient transport; IExClient transport;
try try
{ {
transport = ExchangeTransportFactory.GetClient(service.Endpoint); transport = ExchangeTransportFactory.GetClientWithCache(service.Endpoint);
} }
catch (Exception ex) catch (Exception ex)
{ {

@ -132,6 +132,8 @@ namespace ZeroLevel.Network
_heartbeat_task = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat); _heartbeat_task = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat);
Working(); Working();
_serverSocket.BeginAccept(BeginAcceptCallback, null); _serverSocket.BeginAccept(BeginAcceptCallback, null);
Sheduller.RemindEvery(TimeSpan.FromSeconds(5), () => Log.Info($"Connections: {ConnectionList.Count()}"));
} }
protected abstract void Handle(Frame frame, IZBackward client); protected abstract void Handle(Frame frame, IZBackward client);

@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading; using System.Threading;
@ -20,8 +19,7 @@ namespace ZeroLevel.Network
private readonly BlockingCollection<byte[]> _send_queue = new BlockingCollection<byte[]>(); private readonly BlockingCollection<byte[]> _send_queue = new BlockingCollection<byte[]>();
private Thread _sendThread; private Thread _sendThread;
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE]; private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
private long _last_rw_time = DateTime.UtcNow.Ticks; internal long LastNetworkActionTimestamp { get; private set; } = DateTime.UtcNow.Ticks;
internal long LastNetworkActionTimestamp => _last_rw_time;
public event Action<ZSocketServerClient> OnConnectionBroken = (c) => { }; public event Action<ZSocketServerClient> OnConnectionBroken = (c) => { };
@ -89,7 +87,7 @@ namespace ZeroLevel.Network
_stream.Write(data, 0, data.Length); _stream.Write(data, 0, data.Length);
_stream.Flush(); _stream.Flush();
//Thread.Sleep(1); //Thread.Sleep(1);
_last_rw_time = DateTime.UtcNow.Ticks; LastNetworkActionTimestamp = DateTime.UtcNow.Ticks;
//NetworkStats.Send(data); //NetworkStats.Send(data);
} }
} }
@ -110,7 +108,7 @@ namespace ZeroLevel.Network
if (count > 0) if (count > 0)
{ {
_parser.Push(_buffer, 0, count); _parser.Push(_buffer, 0, count);
_last_rw_time = DateTime.UtcNow.Ticks; LastNetworkActionTimestamp = DateTime.UtcNow.Ticks;
} }
if (Status == ZTransportStatus.Working) if (Status == ZTransportStatus.Working)
{ {

Loading…
Cancel
Save

Powered by TurnKey Linux.