discovery refactoring

pull/1/head
a.bozhenov 5 years ago
parent fadb9689af
commit 1d24a48ac2

@ -3,6 +3,7 @@ using System.Net;
using System.Threading;
using ZeroLevel;
using ZeroLevel.Network;
using ZeroLevel.Network.SDL;
using ZeroLevel.Services.Applications;
namespace TestApp
@ -17,6 +18,13 @@ namespace TestApp
protected override void StartAction()
{
var client = Exchange.GetConnection("192.168.51.104:50223");
client?.Request<ServiceDescription>("__service_description__", record =>
{
Log.Info(record.ServiceInfo.ServiceKey);
});
return;
Log.Info("Started");
ReadServiceInfo();
var host = UseHost(8800);

@ -1,4 +1,5 @@
using ZeroLevel.Models;
using System.Linq;
using ZeroLevel.Models;
using ZeroLevel.Network;
using ZeroLevel.Services.Applications;
@ -8,6 +9,7 @@ namespace ZeroLevel.Discovery
: BaseZeroService
{
private IRouter _exInbox;
private ServiceEndpointsTable _table;
public DiscoveryService()
: base("Discovery")
@ -16,12 +18,11 @@ namespace ZeroLevel.Discovery
protected override void StartAction()
{
var routeTable = new RouteTable();
Injector.Default.Register<RouteTable>(routeTable);
_table = new ServiceEndpointsTable();
var servicePort = Configuration.Default.First<int>("port");
_exInbox = UseHost(servicePort);
_exInbox.RegisterInbox("services", (_) => routeTable.Get());
_exInbox.RegisterInbox<ZeroServiceInfo, InvokeResult>("register", (client, info) => routeTable.Append(info, client));
_exInbox.RegisterInbox("services", (_) => _table.GetRoutingTable().ToList());
_exInbox.RegisterInbox<ServiceRegisterInfo, InvokeResult>("register", (client, info) => _table.AppendOrUpdate(info, client));
}
protected override void StopAction()

@ -1,225 +0,0 @@
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using ZeroLevel.Models;
using ZeroLevel.Network;
namespace ZeroLevel.Discovery
{
public class RouteTable
: IDisposable
{
private readonly Dictionary<string, ServiceEndpointsInfo> _table = new Dictionary<string, ServiceEndpointsInfo>();
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
public RouteTable()
{
Load();
Sheduller.RemindEvery(TimeSpan.FromSeconds(10), Heartbeat);
}
#region Snapshot
private static readonly object _snapshot_lock = new object();
private void Save()
{
string snapshot;
_lock.EnterReadLock();
try
{
snapshot = JsonConvert.SerializeObject(_table);
}
catch (Exception ex)
{
Log.Error(ex, "Fault make snapshot");
return;
}
finally
{
_lock.ExitReadLock();
}
try
{
var snapshot_path = Path.Combine(Configuration.BaseDirectory, "snapshot.snp");
lock (_snapshot_lock)
{
File.WriteAllText(snapshot_path, snapshot);
}
}
catch (Exception ex)
{
Log.Error(ex, "Fault save shapshot");
}
}
private void Load()
{
try
{
var path = Path.Combine(Configuration.BaseDirectory, "snapshot.snp");
if (File.Exists(path))
{
var snapshot = File.ReadAllText(path);
if (string.IsNullOrWhiteSpace(snapshot) == false)
{
var restored = JsonConvert.DeserializeObject<Dictionary<string, ServiceEndpointsInfo>>(snapshot);
_lock.EnterWriteLock();
try
{
_table.Clear();
foreach (var r in restored)
{
_table.Add(r.Key, r.Value);
}
}
finally
{
_lock.ExitWriteLock();
}
}
}
}
catch (Exception ex)
{
Log.Error(ex, "Fault load snapshot");
}
}
#endregion Snapshot
private void Heartbeat(long taskid)
{
try
{
var removeEntities = new Dictionary<string, List<string>>();
_lock.EnterReadLock();
try
{
foreach (var pair in _table)
{
var endpointsToRemove = new List<string>();
foreach (var e in pair.Value.Endpoints)
{
if (NetUtils.TestConnection(NetUtils.CreateIPEndPoint(e)) == false)
{
if (false == removeEntities.ContainsKey(pair.Key))
{
removeEntities.Add(pair.Key, new List<string>());
}
removeEntities[pair.Key].Add(e);
}
}
}
}
finally
{
_lock.ExitReadLock();
}
_lock.EnterWriteLock();
try
{
foreach (var pair in removeEntities)
{
foreach (var ep in pair.Value)
{
_table[pair.Key].Endpoints.Remove(ep);
Log.Debug($"Removed address {ep}");
}
}
var badKeys = _table.Where(f => f.Value.Endpoints.Count == 0)
.Select(pair => pair.Key)
.ToList();
foreach (var badKey in badKeys)
{
_table.Remove(badKey);
Log.Debug($"Removed service {badKey}");
}
}
finally
{
_lock.ExitWriteLock();
}
}
catch (Exception ex)
{
Log.Error(ex, "Fault heartbeat");
}
Save();
}
public InvokeResult Append(ZeroServiceInfo serviceInfo, ISocketClient client)
{
InvokeResult result = null;
var endpoint = $"{client.Endpoint.Address}:{serviceInfo.Port}";
Log.Info($"Regiter request from {endpoint}. Service {serviceInfo?.ServiceKey}");
if (NetUtils.TestConnection(NetUtils.CreateIPEndPoint(endpoint)))
{
var key = $"{serviceInfo.ServiceGroup}:{serviceInfo.ServiceType}:{serviceInfo.ServiceKey.Trim().ToLowerInvariant()}";
_lock.EnterWriteLock();
try
{
if (false == _table.ContainsKey(key))
{
_table.Add(key, new ServiceEndpointsInfo
{
ServiceKey = serviceInfo.ServiceKey,
Version = serviceInfo.Version,
ServiceGroup = serviceInfo.ServiceGroup,
ServiceType = serviceInfo.ServiceType,
Endpoints = new List<string>()
});
_table[key].Endpoints.Add(endpoint);
Log.Info($"The service '{serviceInfo.ServiceKey}' registered on endpoint: {endpoint}");
}
else
{
var exists = _table[key];
if (exists.Endpoints.Contains(endpoint) == false)
{
Log.Info($"The service '{serviceInfo.ServiceKey}' register endpoint: {endpoint}");
exists.Endpoints.Add(endpoint);
}
}
}
catch (Exception ex)
{
Log.Error(ex, $"Fault append service ({serviceInfo.ServiceKey} {serviceInfo.Version}) endpoint '{endpoint}'");
result = InvokeResult.Fault(ex.Message);
}
finally
{
_lock.ExitWriteLock();
}
Save();
result = InvokeResult.Succeeding();
}
else
{
result = InvokeResult.Fault($"Appending endpoint '{endpoint}' canceled for service {serviceInfo.ServiceKey} ({serviceInfo.Version}) because endpoind no avaliable");
}
return result;
}
public IEnumerable<ServiceEndpointsInfo> Get()
{
_lock.EnterReadLock();
try
{
return _table.Values.ToList();
}
finally
{
_lock.ExitReadLock();
}
}
public void Dispose()
{
_lock.Dispose();
}
}
}

@ -0,0 +1,130 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Models;
using ZeroLevel.Network;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Discovery
{
public class ServiceEndpointsTable
{
private ConcurrentDictionary<string, ZeroServiceInfo> _records;
public ServiceEndpointsTable()
{
if (!TryLoad())
{
_records = new ConcurrentDictionary<string, ZeroServiceInfo>();
}
Sheduller.RemindEvery(TimeSpan.FromSeconds(10), Heartbeat);
}
public InvokeResult AppendOrUpdate(ServiceRegisterInfo registerInfo, ISocketClient client)
{
if (registerInfo == null || registerInfo.ServiceInfo == null) return InvokeResult.Fault();
var endpoint = $"{client.Endpoint.Address}:{registerInfo.Port}";
Log.Info($"[ServiceEndpointsTable.AppendOrUpdate]\t{registerInfo.ServiceInfo.ServiceKey}\t{endpoint}");
if (NetUtils.TestConnection(NetUtils.CreateIPEndPoint(endpoint)))
{
_records.AddOrUpdate(endpoint, registerInfo.ServiceInfo, (key, oldValue) => registerInfo.ServiceInfo);
Save();
return InvokeResult.Succeeding();
}
else
{
Log.Warning($"[ServiceEndpointsTable.AppendOrUpdate]\t{registerInfo.ServiceInfo.ServiceKey}\t{endpoint} no avaliable");
}
return InvokeResult.Fault();
}
public IEnumerable<ServiceEndpointInfo> GetRoutingTable()
{
foreach (var pair in _records) yield return new ServiceEndpointInfo { Endpoint = pair.Key, ServiceInfo = pair.Value };
}
#region Snapshot
private void Save()
{
try
{
using (var fs = new FileStream(Path.Combine(Configuration.BaseDirectory, "snapshot.snp")
, FileMode.Create
, FileAccess.Write
, FileShare.None))
{
using (var writer = new MemoryStreamWriter(fs))
{
writer.WriteDictionary(_records);
writer.Stream.Flush();
}
}
}
catch (Exception ex)
{
Log.Error(ex, "[ServiceEndpointsTable.Save]");
}
}
private bool TryLoad()
{
try
{
var path = Path.Combine(Configuration.BaseDirectory, "snapshot.snp");
if (File.Exists(path))
{
using (var fs = new FileStream(path
, FileMode.Open
, FileAccess.Read
, FileShare.None))
{
using (var reader = new MemoryStreamReader(fs))
{
_records = reader.ReadDictionaryAsConcurrent<string, ZeroServiceInfo>();
return _records != null;
}
}
}
}
catch (Exception ex)
{
Log.Error(ex, "[ServiceEndpointsTable.Load]");
}
return false;
}
#endregion Snapshot
#region Heartbeat
private void Heartbeat(long taskid)
{
try
{
var toRemove = new List<string>();
foreach (var pair in _records)
{
if (NetUtils.TestConnection(NetUtils.CreateIPEndPoint(pair.Key)) == false)
{
toRemove.Add(pair.Key);
}
}
ZeroServiceInfo service;
foreach (var key in toRemove)
{
if (_records.TryRemove(key, out service))
{
Log.Info($"[ServiceEndpointsTable.Heartbeat] {service.ServiceKey} on {key} was removed because not answer for ping");
}
}
}
catch (Exception ex)
{
Log.Error(ex, $"[ServiceEndpointsTable.Heartbeat]");
}
Save();
}
#endregion
}
}

@ -38,20 +38,12 @@ namespace ZeroLevel
/// </summary>
[DataMember]
public string Version { get; set; }
/// <summary>
/// Service port
///
/// TODO move port out to new class for discovery service
///
/// </summary>
[DataMember]
public int Port { get; set; }
public bool Equals(ZeroServiceInfo other)
{
if (other == null) return false;
if (object.ReferenceEquals(this, other)) return true;
if (this.Port != other.Port) return false;
if (string.Compare(this.Name, other.Name, true) != 0) return false;
if (string.Compare(this.ServiceKey, other.ServiceKey, true) != 0) return false;
if (string.Compare(this.ServiceGroup, other.ServiceGroup, true) != 0) return false;
@ -72,7 +64,6 @@ namespace ZeroLevel
public void Serialize(IBinaryWriter writer)
{
writer.WriteInt32(this.Port);
writer.WriteString(this.Name);
writer.WriteString(this.ServiceKey);
writer.WriteString(this.ServiceGroup);
@ -82,7 +73,6 @@ namespace ZeroLevel
public void Deserialize(IBinaryReader reader)
{
this.Port = reader.ReadInt32();
this.Name = reader.ReadString();
this.ServiceKey = reader.ReadString();
this.ServiceGroup = reader.ReadString();

@ -49,7 +49,7 @@ namespace ZeroLevel.Network
/// <summary>
/// The size of the message queue to send
/// </summary>
public const int MAX_SEND_QUEUE_SIZE = 1024;
public const int MAX_SEND_QUEUE_SIZE = 256;
protected void Broken() => Status = Status == SocketClientStatus.Disposed ? Status : SocketClientStatus.Broken;
protected void Disposed() => Status = SocketClientStatus.Disposed;

@ -6,7 +6,6 @@ using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Models;
using ZeroLevel.Network.SDL;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
@ -521,23 +520,21 @@ namespace ZeroLevel.Network
var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true);
if (discoveryClient != null)
{
var services = _cachee.ServerList.
Select(s =>
{
var info = MessageSerializer.Copy(_owner.ServiceInfo);
info.Port = s.LocalEndpoint.Port;
return info;
}).
ToList();
foreach (var service in services)
foreach (var service in _cachee.ServerList)
{
var request = discoveryClient.Request<ZeroServiceInfo, InvokeResult>("register", service, r =>
{
if (!r.Success)
var request = discoveryClient.Request<ServiceRegisterInfo, InvokeResult>("register"
, new ServiceRegisterInfo
{
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled. {r.Comment}");
Port = service.LocalEndpoint.Port,
ServiceInfo = _owner.ServiceInfo
}
});
, r =>
{
if (!r.Success)
{
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled. {r.Comment}");
}
});
if (request.Success == false)
{
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled.{request.Comment}");
@ -557,36 +554,22 @@ namespace ZeroLevel.Network
{
try
{
var ir = discoveryClient.Request<IEnumerable<ServiceEndpointsInfo>>("services", records =>
var ir = discoveryClient.Request<IEnumerable<ServiceEndpointInfo>>("services", records =>
{
if (records == null)
{
Log.SystemWarning("[Exchange.UpdateServiceListFromDiscovery] UpdateServiceListInfo. Discrovery response is empty");
return;
}
var endpoints = new HashSet<IPEndPoint>();
_dicovery_aliases.BeginUpdate();
try
{
foreach (var service in records)
{
endpoints.Clear();
foreach (var ep in service.Endpoints)
{
try
{
var endpoint = NetUtils.CreateIPEndPoint(ep);
endpoints.Add(endpoint);
}
catch
{
Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint");
}
}
_dicovery_aliases.Set(service.ServiceKey,
service.ServiceType,
service.ServiceGroup,
endpoints);
_dicovery_aliases.Set(service.ServiceInfo.ServiceKey
, service.ServiceInfo.ServiceType
, service.ServiceInfo.ServiceGroup
, NetUtils.CreateIPEndPoint(service.Endpoint));
}
_dicovery_aliases.Commit();
}
@ -997,7 +980,7 @@ namespace ZeroLevel.Network
{
i.Port = se.LocalEndpoint.Port;
return i;
}))
})).ToList()
};
}

@ -7,7 +7,7 @@ namespace ZeroLevel.Network.SDL
: IBinarySerializable
{
public ZeroServiceInfo ServiceInfo { get; set; }
public IEnumerable<InboxServiceDescription> Inboxes { get; set; }
public List<InboxServiceDescription> Inboxes { get; set; }
public void Deserialize(IBinaryReader reader)
{

@ -7,18 +7,16 @@ namespace ZeroLevel.Network
/// Endpoint
/// </summary>
public class ServiceEndpointInfo :
IEquatable<ServiceEndpointInfo>
IBinarySerializable, IEquatable<ServiceEndpointInfo>
{
public string Key { get; set; }
public string Type { get; set; }
public string Group { get; set; }
public string Endpoint { get; set; }
public ZeroServiceInfo ServiceInfo { get; set; }
public bool Equals(ServiceEndpointInfo other)
{
if (other == null) return false;
if (string.Compare(this.Endpoint, other.Endpoint, true) != 0) return false;
return true;
return this.ServiceInfo?.Equals(other.ServiceInfo) ?? other != null ? false : true;
}
public override bool Equals(object obj)
@ -28,17 +26,19 @@ namespace ZeroLevel.Network
public override int GetHashCode()
{
return Endpoint?.GetHashCode() ?? 0;
return this.ServiceInfo?.GetHashCode() ?? 0 ^ Endpoint?.GetHashCode() ?? 0;
}
public void Serialize(IBinaryWriter writer)
{
writer.WriteString(this.Endpoint);
writer.Write(this.ServiceInfo);
}
public void Deserialize(IBinaryReader reader)
{
this.Endpoint = reader.ReadString();
this.ServiceInfo = reader.Read<ZeroServiceInfo>();
}
}
}

@ -1,58 +0,0 @@
using System;
using System.Collections.Generic;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
/// <summary>
/// Information about service connection points
/// </summary>
public class ServiceEndpointsInfo :
IEquatable<ServiceEndpointsInfo>, IBinarySerializable
{
public string ServiceKey { get; set; }
public string Version { get; set; }
public string ServiceGroup { get; set; }
public string ServiceType { get; set; }
public List<string> Endpoints { get; set; }
public bool Equals(ServiceEndpointsInfo other)
{
if (other == null) return false;
if (string.Compare(this.ServiceKey, other.ServiceKey, true) != 0) return false;
if (string.Compare(this.Version, other.Version, true) != 0) return false;
if (string.Compare(this.ServiceGroup, other.ServiceGroup, true) != 0) return false;
if (string.Compare(this.ServiceType, other.ServiceType, true) != 0) return false;
if (!CollectionComparsionExtensions.NoOrderingEquals(this.Endpoints, other.Endpoints, (a, b) => a.Equals(b))) return false;
return true;
}
public override bool Equals(object obj)
{
return this.Equals(obj as ServiceEndpointsInfo);
}
public override int GetHashCode()
{
return ServiceKey?.GetHashCode() ?? 0 ^ Version?.GetHashCode() ?? 0 ^ ServiceGroup?.GetHashCode() ?? 0 ^ ServiceType?.GetHashCode() ?? 0;
}
public void Serialize(IBinaryWriter writer)
{
writer.WriteString(this.ServiceKey);
writer.WriteString(this.Version);
writer.WriteString(this.ServiceGroup);
writer.WriteString(this.ServiceType);
writer.WriteCollection(this.Endpoints);
}
public void Deserialize(IBinaryReader reader)
{
this.ServiceKey = reader.ReadString();
this.Version = reader.ReadString();
this.ServiceGroup = reader.ReadString();
this.ServiceType = reader.ReadString();
this.Endpoints = reader.ReadStringCollection();
}
}
}

@ -0,0 +1,41 @@
using System;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
public class ServiceRegisterInfo :
IBinarySerializable, IEquatable<ServiceRegisterInfo>
{
public int Port { get; set; }
public ZeroServiceInfo ServiceInfo { get; set; }
public bool Equals(ServiceRegisterInfo other)
{
if (other == null) return false;
if (this.Port != other.Port) return false;
return this.ServiceInfo?.Equals(other.ServiceInfo) ?? other != null ? false : true;
}
public override bool Equals(object obj)
{
return this.Equals(obj as ServiceRegisterInfo);
}
public override int GetHashCode()
{
return Port.GetHashCode() ^ this.ServiceInfo.GetHashCode();
}
public void Serialize(IBinaryWriter writer)
{
writer.WriteInt32(this.Port);
writer.Write(this.ServiceInfo);
}
public void Deserialize(IBinaryReader reader)
{
this.Port = reader.ReadInt32();
this.ServiceInfo = reader.Read<ZeroServiceInfo>();
}
}
}

@ -1,8 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Services;
@ -453,7 +455,33 @@ namespace ZeroLevel.Network
s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);
return s;
}
/* TODO to test
public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
try
{
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(Dispose))
{
if (cancellationToken.IsCancellationRequested)
{
return 0;
}
return await _stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
}
catch (IOException exception)
{
if (exception.InnerException is SocketException socketException)
{
ExceptionDispatchInfo.Capture(socketException).Throw();
}
throw;
}
}
*/
#endregion Helper
public override void Dispose()

@ -26,6 +26,7 @@ namespace ZeroLevel.Network
try
{
socket.Connect(endpoint);
socket.Shutdown(SocketShutdown.Both);
socket.Close();
return true;
}

@ -10,6 +10,7 @@
<Grid.RowDefinitions>
<RowDefinition Height="81"/>
<RowDefinition Height="27"/>
<RowDefinition Height="27"/>
<RowDefinition Height="*"/>
<RowDefinition Height="300"/>
</Grid.RowDefinitions>
@ -37,8 +38,9 @@
<Label Grid.Row="1" Grid.Column="2" Content="Service Group" VerticalAlignment="Center" HorizontalAlignment="Stretch"/>
<TextBlock x:Name="tbGroup" Grid.Row="1" Grid.Column="3" VerticalAlignment="Center" HorizontalAlignment="Stretch"/>
</Grid>
<Label Grid.Row="1" Content="Inbox list"/>
<ListBox x:Name="lbInboxes" ItemsSource="{Binding}" Grid.Row="2" SelectionChanged="LbInboxes_SelectionChanged">
<ComboBox Grid.Row="1" x:Name="cbEndpoints" ItemsSource="{Binding}"/>
<Label Grid.Row="2" Content="Inbox list"/>
<ListBox x:Name="lbInboxes" ItemsSource="{Binding}" Grid.Row="3" SelectionChanged="LbInboxes_SelectionChanged">
<ListBox.ItemTemplate>
<DataTemplate>
<WrapPanel>
@ -50,7 +52,7 @@
</DataTemplate>
</ListBox.ItemTemplate>
</ListBox>
<Grid Grid.Row="3">
<Grid Grid.Row="4">
<TextBlock HorizontalAlignment="Stretch" TextWrapping="Wrap" x:Name="tbInboxDescription" VerticalAlignment="Stretch"/>
</Grid>
</Grid>

@ -36,11 +36,29 @@ namespace ZeroNetworkMonitor
{
var exchange = Injector.Default.Resolve<IExchange>();
var client = exchange.GetConnection(serviceKey);
client?.Request<ServiceDescription>(SDL_INBOX, desc =>
exchange.RequestBroadcast<ServiceDescription>(serviceKey, SDL_INBOX, records =>
{
_description = desc;
if (records != null && records.Any())
{
_description = records.First();
foreach (var r in records.Skip(1))
{
_description.Inboxes.AddRange(r.Inboxes);
}
}
else
{
_description = null;
}
UpdateDescriptionView();
});
/*client?.Request<ServiceDescription>(SDL_INBOX, desc =>
{
_description = desc;
UpdateDescriptionView();
});*/
}
private void UpdateDescriptionView()

Loading…
Cancel
Save

Powered by TurnKey Linux.