Enable discovering

pull/1/head
a.bozhenov 5 years ago
parent 4c9b1512e6
commit 01cc1450b7

@ -21,19 +21,19 @@ namespace TestApp
ReadServiceInfo();
AutoregisterInboxes(UseHost(8800));
UseHost(8801).RegisterInbox<ZeroServiceInfo>("metainfo", (c) =>
/*UseHost(8801).RegisterInbox<ZeroServiceInfo>("metainfo", (c) =>
{
Log.Info("Reqeust for metainfo");
return this.ServiceInfo;
});
Exchange.RoutesStorage.Set("mytest", new IPEndPoint(IPAddress.Loopback, 8800));
Exchange.RoutesStorage.Set("mymeta", new IPEndPoint(IPAddress.Loopback, 8801));
});*/
//Exchange.RoutesStorage.Set("mytest", new IPEndPoint(IPAddress.Loopback, 8800));
//Exchange.RoutesStorage.Set("mymeta", new IPEndPoint(IPAddress.Loopback, 8801));
Sheduller.RemindEvery(TimeSpan.FromSeconds(1), () =>
{
var client = Exchange.GetConnection("mytest");
var client = Exchange.GetConnection("test.app");
client.Send("pum");
client.Send<string>(BaseSocket.DEFAULT_MESSAGE_INBOX, "'This is message'");
client.Request<DateTime, string>("d2s", DateTime.Now, s => Log.Info($"Response: {s}"));
@ -44,9 +44,9 @@ namespace TestApp
client.Request<string>(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, s => Log.Info($"Response ip: {s}"));
});
Sheduller.RemindEvery(TimeSpan.FromSeconds(3), () =>
/*Sheduller.RemindEvery(TimeSpan.FromSeconds(3), () =>
{
Exchange.Request<ZeroServiceInfo>("mymeta", "metainfo", info =>
Exchange.Request<ZeroServiceInfo>("test.app", "metainfo", info =>
{
var si = new StringBuilder();
si.AppendLine(info.Name);
@ -55,7 +55,7 @@ namespace TestApp
Log.Info("Service info:\r\n{0}", si.ToString());
});
});
});*/
}
[ExchangeHandler("pum")]

@ -9,7 +9,7 @@ namespace TestApp
Bootstrap.Startup<MyService>(args,
() => Configuration.ReadSetFromIniFile("config.ini"))
.EnableConsoleLog(ZeroLevel.Services.Logging.LogLevel.System | ZeroLevel.Services.Logging.LogLevel.FullDebug)
//.UseDiscovery()
.UseDiscovery()
.Run()
.WaitWhileStatus(ZeroServiceStatus.Running)
.Stop();

@ -376,7 +376,7 @@ namespace ZeroLevel.Services.Applications
var start = DateTime.UtcNow;
while (this.Status != status)
{
Thread.Sleep(150);
Thread.Sleep(1500);
}
}
@ -385,7 +385,7 @@ namespace ZeroLevel.Services.Applications
var start = DateTime.UtcNow;
while (this.Status != status && (DateTime.UtcNow - start) < period)
{
Thread.Sleep(150);
Thread.Sleep(1500);
}
}
@ -394,7 +394,7 @@ namespace ZeroLevel.Services.Applications
var start = DateTime.UtcNow;
while (this.Status == status)
{
Thread.Sleep(150);
Thread.Sleep(1500);
}
}
@ -403,7 +403,7 @@ namespace ZeroLevel.Services.Applications
var start = DateTime.UtcNow;
while (this.Status == status && (DateTime.UtcNow - start) < period)
{
Thread.Sleep(150);
Thread.Sleep(1500);
}
}
#endregion

@ -1,227 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace ZeroLevel.Network
{
public sealed class AliasSet<T>
{
public sealed class _RoundRobinCollection<T> :
IDisposable
{
private readonly List<T> _collection =
new List<T>();
private int _index = -1;
private readonly ReaderWriterLockSlim _lock =
new ReaderWriterLockSlim();
public int Count { get { return _collection.Count; } }
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
_collection.Add(item);
if (_index == -1) _index = 0;
}
finally
{
_lock.ExitWriteLock();
}
}
public void Remove(T item)
{
_lock.EnterWriteLock();
try
{
_collection.Remove(item);
if (_index >= _collection.Count)
{
if (_collection.Count == 0) _index = -1;
else _index = 0;
}
}
finally
{
_lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return _collection.Contains(item);
}
finally
{
_lock.ExitReadLock();
}
}
public bool MoveNext()
{
_lock.EnterReadLock();
try
{
if (_collection.Count > 0)
{
_index = Interlocked.Increment(ref _index) % _collection.Count;
return true;
}
}
finally
{
_lock.ExitReadLock();
}
return false;
}
public T Current
{
get
{
return _index == -1 ? default(T) : _collection[_index];
}
}
public void Clear()
{
_collection.Clear();
_index = -1;
}
public IEnumerable<T> GetCurrentSeq()
{
_lock.EnterReadLock();
try
{
var arr = new T[_collection.Count];
int p = 0;
for (int i = _index; i < _collection.Count; i++, p++)
{
arr[p] = _collection[i];
}
for (int i = 0; i < _index; i++, p++)
{
arr[p] = _collection[i];
}
return arr;
}
finally
{
_lock.ExitReadLock();
}
}
public void Dispose()
{
_collection.Clear();
_lock.Dispose();
}
}
private readonly ConcurrentDictionary<string, _RoundRobinCollection<T>> _aliases = new ConcurrentDictionary<string, _RoundRobinCollection<T>>();
public bool Contains(string key) => _aliases.ContainsKey(key);
public void Append(string key, T address)
{
if (_aliases.ContainsKey(key) == false)
{
if (_aliases.TryAdd(key, new _RoundRobinCollection<T>()))
{
_aliases[key].Add(address);
}
}
else
{
_aliases[key].Add(address);
}
}
public void Append(string key, IEnumerable<T> addresses)
{
if (_aliases.ContainsKey(key) == false)
{
if (_aliases.TryAdd(key, new _RoundRobinCollection<T>()))
{
foreach (var address in addresses)
{
_aliases[key].Add(address);
}
}
}
else
{
foreach (var address in addresses)
{
_aliases[key].Add(address);
}
}
}
public void Update(string key, T address)
{
if (_aliases.ContainsKey(key) == false)
{
if (_aliases.TryAdd(key, new _RoundRobinCollection<T>()))
{
_aliases[key].Add(address);
}
}
else
{
_aliases[key].Clear();
_aliases[key].Add(address);
}
}
public void Update(string key, IEnumerable<T> addresses)
{
if (_aliases.ContainsKey(key) == false)
{
if (_aliases.TryAdd(key, new _RoundRobinCollection<T>()))
{
foreach (var address in addresses)
{
_aliases[key].Add(address);
}
}
}
else
{
_aliases[key].Clear();
foreach (var address in addresses)
{
_aliases[key].Add(address);
}
}
}
public T Get(string key)
{
if (_aliases.ContainsKey(key) && _aliases[key].MoveNext())
{
return _aliases[key].Current;
}
return default(T);
}
public IEnumerable<T> GetAll(string key)
{
if (_aliases.ContainsKey(key) && _aliases[key].MoveNext())
{
return _aliases[key].GetCurrentSeq();
}
return Enumerable.Empty<T>();
}
}
}

@ -11,6 +11,8 @@ namespace ZeroLevel.Network
public static readonly IRouter NullRouter = new NullRouter();
public const string DISCOVERY_ALIAS = "__discovery__";
public const string DEFAULT_MESSAGE_INBOX = "__message_inbox__";
public const string DEFAULT_REQUEST_INBOX = "__request_inbox__";
public const string DEFAULT_REQUEST_WITHOUT_ARGS_INBOX = "__request_no_args_inbox__";

@ -13,6 +13,8 @@ namespace ZeroLevel.Network
void Set(string key, string type, string group, IPEndPoint endpoint);
void Set(string key, string type, string group, IEnumerable<IPEndPoint> endpoints);
void Remove(IPEndPoint endpoint);
InvokeResult<IPEndPoint> Get(string key);
InvokeResult<IEnumerable<IPEndPoint>> GetAll(string key);
InvokeResult<IPEndPoint> GetByType(string type);

@ -32,7 +32,6 @@ namespace ZeroLevel.Network
internal sealed class Exchange :
IExchange
{
private IDiscoveryClient _discoveryClient = null; // Feature расширить до нескольких discovery
private readonly ServiceRouteStorage _aliases = new ServiceRouteStorage();
private readonly ExClientServerCachee _cachee = new ExClientServerCachee();
@ -44,8 +43,8 @@ namespace ZeroLevel.Network
public Exchange(IZeroService owner)
{
_owner = owner;
_cachee.OnBrokenConnection += _cachee_OnBrokenConnection;
}
#endregion Ctor
#region IMultiClient
@ -458,36 +457,42 @@ namespace ZeroLevel.Network
public void UseDiscovery()
{
if (_discoveryClient != null)
try
{
_discoveryClient.Dispose();
_discoveryClient = null;
var discoveryEndpoint = Configuration.Default.First("discovery");
_aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint));
RestartDiscoveryTasks();
}
catch (Exception ex)
{
Log.Error(ex, "[Exchange.UseDiscovery]");
}
var discovery = Configuration.Default.First("discovery");
_discoveryClient = new DiscoveryClient(_cachee.GetClient(NetUtils.CreateIPEndPoint(discovery), false, BaseSocket.NullRouter));
RestartDiscoveryTasks();
}
public void UseDiscovery(string endpoint)
public void UseDiscovery(string discoveryEndpoint)
{
if (_discoveryClient != null)
try
{
_aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint));
RestartDiscoveryTasks();
}
catch (Exception ex)
{
_discoveryClient.Dispose();
_discoveryClient = null;
Log.Error(ex, "[Exchange.UseDiscovery]");
}
_discoveryClient = new DiscoveryClient(_cachee.GetClient(NetUtils.CreateIPEndPoint(endpoint), false, BaseSocket.NullRouter));
RestartDiscoveryTasks();
}
public void UseDiscovery(IPEndPoint endpoint)
public void UseDiscovery(IPEndPoint discoveryEndpoint)
{
if (_discoveryClient != null)
try
{
_discoveryClient.Dispose();
_discoveryClient = null;
_aliases.Set(BaseSocket.DISCOVERY_ALIAS, discoveryEndpoint);
RestartDiscoveryTasks();
}
catch (Exception ex)
{
Log.Error(ex, "[Exchange.UseDiscovery]");
}
_discoveryClient = new DiscoveryClient(_cachee.GetClient(endpoint, false, BaseSocket.NullRouter));
RestartDiscoveryTasks();
}
private void RestartDiscoveryTasks()
@ -500,30 +505,88 @@ namespace ZeroLevel.Network
{
Sheduller.Remove(_register_in_discovery_table_task);
}
RegisterServicesInDiscovery();
_update_discovery_table_task = Sheduller.RemindEvery(_update_discovery_table_period, RegisterServicesInDiscovery);
_register_in_discovery_table_task = Sheduller.RemindEvery(_register_in_discovery_table_period, UpdateServiceListFromDiscovery);
_register_in_discovery_table_task = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(500), _update_discovery_table_period, RegisterServicesInDiscovery);
_update_discovery_table_task = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(750), _register_in_discovery_table_period, UpdateServiceListFromDiscovery);
}
private void RegisterServicesInDiscovery()
{
var services = _cachee.ServerList.
Select(s =>
{
var info = MessageSerializer.Copy(_owner.ServiceInfo);
info.Port = s.LocalEndpoint.Port;
return info;
}).
ToList();
foreach (var service in services)
var discovery_endpoint = _aliases.Get(BaseSocket.DISCOVERY_ALIAS);
if (discovery_endpoint.Success)
{
_discoveryClient.Register(service);
var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true);
var services = _cachee.ServerList.
Select(s =>
{
var info = MessageSerializer.Copy(_owner.ServiceInfo);
info.Port = s.LocalEndpoint.Port;
return info;
}).
ToList();
foreach (var service in services)
{
var request = discoveryClient.Request<ZeroServiceInfo, InvokeResult>("register", service, r =>
{
if (!r.Success)
{
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled. {r.Comment}");
}
});
if (request.Success == false)
{
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled.{request.Comment}");
}
}
}
}
private void UpdateServiceListFromDiscovery()
{
var discovery_endpoint = _aliases.Get(BaseSocket.DISCOVERY_ALIAS);
if (discovery_endpoint.Success)
{
var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true);
try
{
var ir = discoveryClient.Request<IEnumerable<ServiceEndpointsInfo>>("services", records =>
{
if (records == null)
{
Log.SystemWarning("[Exchange.UpdateServiceListFromDiscovery] UpdateServiceListInfo. Discrovery response is empty");
return;
}
var endpoints = new HashSet<IPEndPoint>();
foreach (var service in records)
{
endpoints.Clear();
foreach (var ep in service.Endpoints)
{
try
{
var endpoint = NetUtils.CreateIPEndPoint(ep);
endpoints.Add(endpoint);
}
catch
{
Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint");
}
}
_aliases.Set(service.ServiceKey,
service.ServiceType,
service.ServiceGroup,
endpoints);
}
});
if (!ir.Success)
{
Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Error request to inbox 'services'. {ir.Comment}");
}
}
catch (Exception ex)
{
Log.SystemError(ex, "[Exchange.UpdateServiceListFromDiscovery] Discovery service response is absent");
}
}
}
#endregion
@ -541,7 +604,7 @@ namespace ZeroLevel.Network
}
catch (Exception ex)
{
Log.Error(ex, "[Exchange.GetConnection]");
Log.SystemError(ex, "[Exchange.GetConnection]");
}
return null;
}
@ -554,7 +617,7 @@ namespace ZeroLevel.Network
}
catch (Exception ex)
{
Log.Error(ex, "[Exchange.GetConnection]");
Log.SystemError(ex, "[Exchange.GetConnection]");
}
return null;
}
@ -734,21 +797,6 @@ namespace ZeroLevel.Network
return success;
}
private InvokeResult CallServiceDirect(string endpoint, Func<ExClient, InvokeResult> callHandler)
{
ExClient transport;
try
{
transport = _cachee.GetClient(NetUtils.CreateIPEndPoint(endpoint), true);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange.CallServiceDirect] Can't get transport for endpoint '{endpoint}'");
return InvokeResult.Fault(ex.Message);
}
return callHandler(transport);
}
private IEnumerable<Tresp> _RequestBroadcast<Treq, Tresp>(List<ExClient> clients, string inbox, Treq data)
{
var response = new List<Tresp>();
@ -804,6 +852,11 @@ namespace ZeroLevel.Network
}
return response;
}
private void _cachee_OnBrokenConnection(IPEndPoint endpoint)
{
//_aliases.Remove(endpoint); ??? no need
}
#endregion
public void Dispose()

@ -7,15 +7,15 @@ using ZeroLevel.Services.Collections;
namespace ZeroLevel.Network
{
/*
One IPEndpoint binded with one service.
Service can have one key, one type, one group.
Therefore IPEndpoint can be binded with one key, one type and one group.
/*
One IPEndpoint binded with one service.
Service can have one key, one type, one group.
Therefore IPEndpoint can be binded with one key, one type and one group.
One key can refer to many IPEndPoints.
One type can refer to many IPEndPoints.
One group can refer to many IPEndPoints.
*/
One key can refer to many IPEndPoints.
One type can refer to many IPEndPoints.
One group can refer to many IPEndPoints.
*/
public sealed class ServiceRouteStorage
: IServiceRoutesStorage
@ -46,7 +46,7 @@ namespace ZeroLevel.Network
{
return;
}
Remove(endpoint);
RemoveLocked(endpoint);
}
AppendByKeys(key, endpoint);
_endpoints.Add(endpoint, new string[3] { $"{endpoint.Address}:{endpoint.Port}", null, null });
@ -80,10 +80,10 @@ namespace ZeroLevel.Network
{
return;
}
Remove(endpoint);
RemoveLocked(endpoint);
}
AppendByKeys(key, endpoint);
_endpoints.Add(endpoint, new string[3] { key, null, null });
_endpoints.Add(endpoint, new string[3] { key.ToUpperInvariant(), null, null });
}
finally
{
@ -105,12 +105,12 @@ namespace ZeroLevel.Network
var drop = _tableByKey[key].Source.ToArray();
for (int i = 0; i < drop.Length; i++)
{
Remove(drop[i]);
RemoveLocked(drop[i]);
}
}
foreach (var ep in endpoints)
{
_endpoints.Add(ep, new string[3] { key, null, null });
_endpoints.Add(ep, new string[3] { key.ToUpperInvariant(), null, null });
AppendByKeys(key, ep);
}
}
@ -125,7 +125,7 @@ namespace ZeroLevel.Network
_lock.EnterWriteLock();
try
{
Remove(endpoint);
RemoveLocked(endpoint);
if (key == null)
{
key = $"{endpoint.Address}:{endpoint.Port}";
@ -139,7 +139,7 @@ namespace ZeroLevel.Network
{
AppendByGroup(key, endpoint);
}
_endpoints.Add(endpoint, new string[3] { key, null, null });
_endpoints.Add(endpoint, new string[3] { key.ToUpperInvariant(), type.ToUpperInvariant(), group.ToUpperInvariant() });
}
finally
{
@ -148,15 +148,20 @@ namespace ZeroLevel.Network
}
public void Set(string key, string type, string group, IEnumerable<IPEndPoint> endpoints)
{
foreach (var ep in endpoints)
{
RemoveLocked(ep);
Set(key, type, group, ep);
}
}
public void Remove(IPEndPoint endpoint)
{
_lock.EnterWriteLock();
try
{
foreach (var ep in endpoints)
{
Remove(ep);
Set(key, type, group, ep);
}
RemoveLocked(endpoint);
}
finally
{
@ -167,6 +172,7 @@ namespace ZeroLevel.Network
#region GET
public InvokeResult<IPEndPoint> Get(string key)
{
key = key.ToUpperInvariant();
if (_tableByKey.ContainsKey(key))
{
if (_tableByKey[key].MoveNext())
@ -176,6 +182,7 @@ namespace ZeroLevel.Network
}
public InvokeResult<IEnumerable<IPEndPoint>> GetAll(string key)
{
key = key.ToUpperInvariant();
if (_tableByKey.ContainsKey(key))
{
if (_tableByKey[key].MoveNext())
@ -185,6 +192,7 @@ namespace ZeroLevel.Network
}
public InvokeResult<IPEndPoint> GetByType(string type)
{
type = type.ToUpperInvariant();
if (_tableByTypes.ContainsKey(type))
{
if (_tableByTypes[type].MoveNext())
@ -194,6 +202,7 @@ namespace ZeroLevel.Network
}
public InvokeResult<IEnumerable<IPEndPoint>> GetAllByType(string type)
{
type = type.ToUpperInvariant();
if (_tableByTypes.ContainsKey(type))
{
if (_tableByTypes[type].MoveNext())
@ -203,6 +212,7 @@ namespace ZeroLevel.Network
}
public InvokeResult<IPEndPoint> GetByGroup(string group)
{
group = group.ToUpperInvariant();
if (_tableByGroups.ContainsKey(group))
{
if (_tableByGroups[group].MoveNext())
@ -212,6 +222,7 @@ namespace ZeroLevel.Network
}
public InvokeResult<IEnumerable<IPEndPoint>> GetAllByGroup(string group)
{
group = group.ToUpperInvariant();
if (_tableByGroups.ContainsKey(group))
{
if (_tableByGroups[group].MoveNext())
@ -224,15 +235,15 @@ namespace ZeroLevel.Network
#region Private
private void AppendByKeys(string key, IPEndPoint endpoint)
{
Append(key, endpoint, _tableByKey);
Append(key.ToUpperInvariant(), endpoint, _tableByKey);
}
private void AppendByType(string type, IPEndPoint endpoint)
{
Append(type, endpoint, _tableByTypes);
Append(type.ToUpperInvariant(), endpoint, _tableByTypes);
}
private void AppendByGroup(string group, IPEndPoint endpoint)
{
Append(group, endpoint, _tableByGroups);
Append(group.ToUpperInvariant(), endpoint, _tableByGroups);
}
private void Append(string key, IPEndPoint value, Dictionary<string, RoundRobinCollection<IPEndPoint>> dict)
{
@ -243,13 +254,16 @@ namespace ZeroLevel.Network
dict[key].Add(value);
}
private void Remove(IPEndPoint endpoint)
private void RemoveLocked(IPEndPoint endpoint)
{
var refs = _endpoints[endpoint];
if (refs[0] != null && _tableByKey.ContainsKey(refs[0])) _tableByKey[refs[0]].Remove(endpoint);
if (refs[1] != null && _tableByTypes.ContainsKey(refs[1])) _tableByTypes[refs[1]].Remove(endpoint);
if (refs[2] != null && _tableByGroups.ContainsKey(refs[2])) _tableByGroups[refs[2]].Remove(endpoint);
_endpoints.Remove(endpoint);
if (_endpoints.ContainsKey(endpoint))
{
var refs = _endpoints[endpoint];
if (refs[0] != null && _tableByKey.ContainsKey(refs[0])) _tableByKey[refs[0]].Remove(endpoint);
if (refs[1] != null && _tableByTypes.ContainsKey(refs[1])) _tableByTypes[refs[1]].Remove(endpoint);
if (refs[2] != null && _tableByGroups.ContainsKey(refs[2])) _tableByGroups[refs[2]].Remove(endpoint);
_endpoints.Remove(endpoint);
}
}
#endregion
}

@ -8,6 +8,16 @@ namespace ZeroLevel.Network
internal sealed class ExClientServerCachee
: IDisposable
{
internal event Action<IPEndPoint> OnBrokenConnection;
private void RiseBrokenConnectionEvent(IPEndPoint endpoint)
{
var e = OnBrokenConnection;
if (e != null)
{
e.Invoke(endpoint);
}
}
private static readonly ConcurrentDictionary<string, ExClient> _clientInstances = new ConcurrentDictionary<string, ExClient>();
private readonly ConcurrentDictionary<string, SocketServer> _serverInstances = new ConcurrentDictionary<string, SocketServer>();
@ -33,10 +43,32 @@ namespace ZeroLevel.Network
instance = null;
}
instance = new ExClient(new SocketClient(endpoint, router ?? new Router()));
_clientInstances[key] = instance;
return instance;
instance.ForceConnect();
if (instance.Status != SocketClientStatus.Initialized &&
instance.Status != SocketClientStatus.Working)
{
OnBrokenConnection(endpoint);
}
else
{
_clientInstances[key] = instance;
return instance;
}
}
else
{
var instance = new ExClient(new SocketClient(endpoint, router ?? new Router()));
if (instance.Status != SocketClientStatus.Initialized &&
instance.Status != SocketClientStatus.Working)
{
OnBrokenConnection(endpoint);
}
else
{
return instance;
}
}
return new ExClient(new SocketClient(endpoint, router ?? new Router()));
return null;
}
public SocketServer GetServer(IPEndPoint endpoint, IRouter router)

Loading…
Cancel
Save

Powered by TurnKey Linux.