diff --git a/ZeroLevel/Services/BaseZeroService.cs b/ZeroLevel/Services/BaseZeroService.cs index dd5268e..aceb8be 100644 --- a/ZeroLevel/Services/BaseZeroService.cs +++ b/ZeroLevel/Services/BaseZeroService.cs @@ -1,12 +1,10 @@ using System; -using System.Collections.Concurrent; using System.Linq; using System.Linq.Expressions; using System.Net; using System.Reflection; using System.Threading; using ZeroLevel.Network; -using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.Applications { @@ -121,61 +119,15 @@ namespace ZeroLevel.Services.Applications #endregion Config #region Network - private static readonly IRouter _null_router = new NullRouter(); - private IDiscoveryClient _discoveryClient = null; // Feature расширить до нескольких discovery - private long _update_discovery_table_task = -1; - private long _register_in_discovery_table_task = -1; - private readonly AliasSet _aliases = new AliasSet(); - private static TimeSpan _update_discovery_table_period = TimeSpan.FromSeconds(15); - private static TimeSpan _register_in_discovery_table_period = TimeSpan.FromSeconds(15); - private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _serverInstances = new ConcurrentDictionary(); - - private void RestartDiscoveryTasks() - { - if (_update_discovery_table_task != -1) - { - Sheduller.Remove(_update_discovery_table_task); - } - if (_register_in_discovery_table_task != -1) - { - Sheduller.Remove(_register_in_discovery_table_task); - } - RegisterServicesInDiscovery(); - _update_discovery_table_task = Sheduller.RemindEvery(_update_discovery_table_period, RegisterServicesInDiscovery); - _register_in_discovery_table_task = Sheduller.RemindEvery(_register_in_discovery_table_period, () => { }); - } - - private void RegisterServicesInDiscovery() - { - var services = _serverInstances. - Values. - Select(s => - { - var info = MessageSerializer.Copy(this._serviceInfo); - info.Port = s.LocalEndpoint.Port; - return info; - }). - ToList(); - foreach (var service in services) - { - _discoveryClient.Register(service); - } - } + private readonly Exchange _exhange = new Exchange(); + public void UseDiscovery() { if (_state == ZeroServiceStatus.Running || _state == ZeroServiceStatus.Initialized) { - if (_discoveryClient != null) - { - _discoveryClient.Dispose(); - _discoveryClient = null; - } - var discovery = Configuration.Default.First("discovery"); - _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(discovery), false, _null_router)); - RestartDiscoveryTasks(); + _exhange.UseDiscovery(); } } @@ -184,13 +136,7 @@ namespace ZeroLevel.Services.Applications if (_state == ZeroServiceStatus.Running || _state == ZeroServiceStatus.Initialized) { - if (_discoveryClient != null) - { - _discoveryClient.Dispose(); - _discoveryClient = null; - } - _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), false, _null_router)); - RestartDiscoveryTasks(); + _exhange.UseDiscovery(endpoint); } } @@ -199,13 +145,7 @@ namespace ZeroLevel.Services.Applications if (_state == ZeroServiceStatus.Running || _state == ZeroServiceStatus.Initialized) { - if (_discoveryClient != null) - { - _discoveryClient.Dispose(); - _discoveryClient = null; - } - _discoveryClient = new DiscoveryClient(GetClient(endpoint, false, _null_router)); - RestartDiscoveryTasks(); + _exhange.UseDiscovery(endpoint); } } @@ -214,9 +154,9 @@ namespace ZeroLevel.Services.Applications if (_state == ZeroServiceStatus.Running || _state == ZeroServiceStatus.Initialized) { - return GetServer(new IPEndPoint(IPAddress.Any, NetUtils.GetFreeTcpPort()), new Router()).Router; + return _exhange.UseHost(); } - return _null_router; + return BaseSocket.NullRouter; } public IRouter UseHost(int port) @@ -224,9 +164,9 @@ namespace ZeroLevel.Services.Applications if (_state == ZeroServiceStatus.Running || _state == ZeroServiceStatus.Initialized) { - return GetServer(new IPEndPoint(IPAddress.Any, port), new Router()).Router; + return _exhange.UseHost(port); } - return _null_router; + return BaseSocket.NullRouter; } public IRouter UseHost(IPEndPoint endpoint) @@ -234,9 +174,9 @@ namespace ZeroLevel.Services.Applications if (_state == ZeroServiceStatus.Running || _state == ZeroServiceStatus.Initialized) { - return GetServer(endpoint, new Router()).Router; + return _exhange.UseHost(endpoint); } - return _null_router; + return BaseSocket.NullRouter; } public ExClient ConnectToService(string endpoint) @@ -246,7 +186,7 @@ namespace ZeroLevel.Services.Applications { if (_aliases.Contains(endpoint)) { - return GetClient(_aliases.GetAddress(endpoint), true); + return GetClient(_aliases.Get(endpoint), true); } return GetClient(NetUtils.CreateIPEndPoint(endpoint), true); } @@ -417,39 +357,6 @@ namespace ZeroLevel.Services.Applications } #endregion - - public void StoreConnection(string endpoint) - { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) - { - _aliases.Set(endpoint, NetUtils.CreateIPEndPoint(endpoint)); - } - } - public void StoreConnection(string alias, string endpoint) - { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) - { - _aliases.Set(alias, NetUtils.CreateIPEndPoint(endpoint)); - } - } - public void StoreConnection(IPEndPoint endpoint) - { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) - { - _aliases.Set($"{endpoint.Address}:{endpoint.Port}", endpoint); - } - } - public void StoreConnection(string alias, IPEndPoint endpoint) - { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) - { - _aliases.Set(alias, endpoint); - } - } #endregion #region Service control @@ -524,85 +431,12 @@ namespace ZeroLevel.Services.Applications } #endregion - #region Utils - - private ExClient GetClient(IPEndPoint endpoint, bool use_cachee, IRouter router = null) - { - if (use_cachee) - { - string key = $"{endpoint.Address}:{endpoint.Port}"; - ExClient instance = null; - if (_clientInstances.ContainsKey(key)) - { - instance = _clientInstances[key]; - if (instance.Status == SocketClientStatus.Working) - { - return instance; - } - _clientInstances.TryRemove(key, out instance); - instance.Dispose(); - instance = null; - } - instance = new ExClient(new SocketClient(endpoint, router ?? new Router())); - _clientInstances[key] = instance; - return instance; - } - return new ExClient(new SocketClient(endpoint, router ?? new Router())); - } - - private SocketServer GetServer(IPEndPoint endpoint, IRouter router) - { - string key = $"{endpoint.Address}:{endpoint.Port}"; - if (_serverInstances.ContainsKey(key)) - { - return _serverInstances[key]; - } - var instance = new SocketServer(endpoint, router); - _serverInstances[key] = instance; - return instance; - } - - #endregion - public void Dispose() { if (_state != ZeroServiceStatus.Disposed) { _state = ZeroServiceStatus.Disposed; - - if (_update_discovery_table_task != -1) - { - Sheduller.Remove(_update_discovery_table_task); - } - - if (_register_in_discovery_table_task != -1) - { - Sheduller.Remove(_register_in_discovery_table_task); - } - - foreach (var client in _clientInstances) - { - try - { - client.Value.Dispose(); - } - catch (Exception ex) - { - Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketClient to endpoint {client.Key}"); - } - } - - foreach (var server in _serverInstances) - { - try - { - server.Value.Dispose(); - } - catch (Exception ex) - { - Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketServer with endpoint {server.Key}"); - } - } + _exhange.Dispose(); } } } diff --git a/ZeroLevel/Services/Collections/RoundRobinCollection.cs b/ZeroLevel/Services/Collections/RoundRobinCollection.cs index a86fa44..624347b 100644 --- a/ZeroLevel/Services/Collections/RoundRobinCollection.cs +++ b/ZeroLevel/Services/Collections/RoundRobinCollection.cs @@ -23,13 +23,26 @@ namespace ZeroLevel.Services.Collections public int Count { get { return _collection.Count; } } + public RoundRobinCollection() { } + public RoundRobinCollection(IEnumerable items) + { + if (items != null && items.Any()) + { + _collection.AddRange(items); + _index = 0; + } + } + public void Add(T item) { _lock.EnterWriteLock(); try { - _collection.Add(item); - if (_index == -1) _index = 0; + if (!_collection.Contains(item)) + { + _collection.Add(item); + if (_index == -1) _index = 0; + } } finally { @@ -121,7 +134,23 @@ namespace ZeroLevel.Services.Collections public IEnumerable Find(Func selector) { - return _collection.Where(selector); + _lock.EnterReadLock(); + try + { + var arr = new List(_collection.Count); + for (int i = _index; i < _collection.Count; i++) + { + if (selector(_collection[i])) + { + arr.Add(_collection[i]); + } + } + return arr; + } + finally + { + _lock.ExitReadLock(); + } } public void Clear() diff --git a/ZeroLevel/Services/Collections/RoundRobinOverCollection.cs b/ZeroLevel/Services/Collections/RoundRobinOverCollection.cs deleted file mode 100644 index a1485da..0000000 --- a/ZeroLevel/Services/Collections/RoundRobinOverCollection.cs +++ /dev/null @@ -1,73 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; - -namespace ZeroLevel.Services.Collections -{ - public sealed class RoundRobinOverCollection - { - private class Node - { - public T Value; - public Node Next; - } - - private int _count; - private Node _currentNode; - - public bool IsEmpty => _count <= 0; - - public RoundRobinOverCollection(IEnumerable collection) - { - if (collection.Any()) - { - _count = 1; - _currentNode = new Node { Value = collection.First() }; - var prev = _currentNode; - foreach (var e in collection.Skip(1)) - { - prev.Next = new Node { Value = e }; - prev = prev.Next; - _count++; - } - prev.Next = _currentNode; - } - else - { - _count = 0; - } - } - - public IEnumerable Find(Func selector) - { - if (_count == 0) - { - yield break; - } - var cursor = _currentNode; - for (int i = 0; i < _count; i++) - { - if (selector(cursor.Value)) - { - yield return cursor.Value; - } - cursor = cursor.Next; - } - } - - public IEnumerable GenerateSeq() - { - if (_count == 0) - { - yield break; - } - var cursor = _currentNode; - _currentNode = _currentNode.Next; - for (int i = 0; i < _count; i++) - { - yield return cursor.Value; - cursor = cursor.Next; - } - } - } -} diff --git a/ZeroLevel/Services/Network/Alias.cs b/ZeroLevel/Services/Network/AliasSet.cs similarity index 66% rename from ZeroLevel/Services/Network/Alias.cs rename to ZeroLevel/Services/Network/AliasSet.cs index 8b88f9c..549c684 100644 --- a/ZeroLevel/Services/Network/Alias.cs +++ b/ZeroLevel/Services/Network/AliasSet.cs @@ -130,54 +130,96 @@ namespace ZeroLevel.Network private readonly ConcurrentDictionary> _aliases = new ConcurrentDictionary>(); - public bool Contains(string alias) => _aliases.ContainsKey(alias); + public bool Contains(string key) => _aliases.ContainsKey(key); - public void Set(string alias, T address) + public void Append(string key, T address) { - if (_aliases.ContainsKey(alias) == false) + if (_aliases.ContainsKey(key) == false) { - if (_aliases.TryAdd(alias, new _RoundRobinCollection())) + if (_aliases.TryAdd(key, new _RoundRobinCollection())) { - _aliases[alias].Add(address); + _aliases[key].Add(address); } } else { - _aliases[alias].Add(address); + _aliases[key].Add(address); } } - public void Set(string alias, IEnumerable addresses) + public void Append(string key, IEnumerable addresses) { - if (_aliases.ContainsKey(alias) == false) + if (_aliases.ContainsKey(key) == false) { - if (_aliases.TryAdd(alias, new _RoundRobinCollection())) + if (_aliases.TryAdd(key, new _RoundRobinCollection())) { foreach (var address in addresses) - _aliases[alias].Add(address); + { + _aliases[key].Add(address); + } + } + } + else + { + foreach (var address in addresses) + { + _aliases[key].Add(address); + } + } + } + + public void Update(string key, T address) + { + if (_aliases.ContainsKey(key) == false) + { + if (_aliases.TryAdd(key, new _RoundRobinCollection())) + { + _aliases[key].Add(address); } } else { + _aliases[key].Clear(); + _aliases[key].Add(address); + } + } + + public void Update(string key, IEnumerable addresses) + { + if (_aliases.ContainsKey(key) == false) + { + if (_aliases.TryAdd(key, new _RoundRobinCollection())) + { + foreach (var address in addresses) + { + _aliases[key].Add(address); + } + } + } + else + { + _aliases[key].Clear(); foreach (var address in addresses) - _aliases[alias].Add(address); + { + _aliases[key].Add(address); + } } } - public T GetAddress(string alias) + public T Get(string key) { - if (_aliases.ContainsKey(alias) && _aliases[alias].MoveNext()) + if (_aliases.ContainsKey(key) && _aliases[key].MoveNext()) { - return _aliases[alias].Current; + return _aliases[key].Current; } return default(T); } - public IEnumerable GetAddresses(string alias) + public IEnumerable GetAll(string key) { - if (_aliases.ContainsKey(alias) && _aliases[alias].MoveNext()) + if (_aliases.ContainsKey(key) && _aliases[key].MoveNext()) { - return _aliases[alias].GetCurrentSeq(); + return _aliases[key].GetCurrentSeq(); } return Enumerable.Empty(); } diff --git a/ZeroLevel/Services/Network/BaseSocket.cs b/ZeroLevel/Services/Network/BaseSocket.cs index 2317ea6..6ea48dc 100644 --- a/ZeroLevel/Services/Network/BaseSocket.cs +++ b/ZeroLevel/Services/Network/BaseSocket.cs @@ -9,6 +9,8 @@ namespace ZeroLevel.Network MAX_FRAME_PAYLOAD_SIZE = Configuration.Default.FirstOrDefault("MAX_FRAME_PAYLOAD_SIZE", DEFAULT_MAX_FRAME_PAYLOAD_SIZE); } + public static readonly IRouter NullRouter = new NullRouter(); + public const string DEFAULT_MESSAGE_INBOX = "__message_inbox__"; public const string DEFAULT_REQUEST_INBOX = "__request_inbox__"; public const string DEFAULT_REQUEST_WITHOUT_ARGS_INBOX = "__request_no_args_inbox__"; diff --git a/ZeroLevel/Services/Network/Contracts/IClientSet.cs b/ZeroLevel/Services/Network/Contracts/IClientSet.cs index dc066ac..c3a68b4 100644 --- a/ZeroLevel/Services/Network/Contracts/IClientSet.cs +++ b/ZeroLevel/Services/Network/Contracts/IClientSet.cs @@ -6,39 +6,39 @@ namespace ZeroLevel.Network { public interface IClientSet { - InvokeResult Send(string alias, T data); - InvokeResult Send(string alias, string inbox, T data); - InvokeResult Request(string alias, Action callback); - InvokeResult Request(string alias, string inbox, Action callback); - InvokeResult Request(string alias, Trequest request, Action callback); - InvokeResult Request(string alias, string inbox, Trequest request, Action callback); - - - InvokeResult SendBroadcast(string alias, T data); - InvokeResult SendBroadcast(string alias, string inbox, T data); - - InvokeResult SendBroadcastByType(string serviceType, T data); - InvokeResult SendBroadcastByType(string serviceType, string inbox, T data); - - InvokeResult SendBroadcastByGroup(string serviceGroup, T data); - InvokeResult SendBroadcastByGroup(string serviceGroup, string inbox, T data); - - InvokeResult RequestBroadcast(string alias, Action> callback); - InvokeResult RequestBroadcast(string alias, string inbox, Action> callback); - - InvokeResult RequestBroadcast(string alias, Trequest data, Action> callback); - InvokeResult RequestBroadcast(string alias, string inbox, Trequest data, Action> callback); - - InvokeResult RequestBroadcastByType(string serviceType, Action> callback); - InvokeResult RequestBroadcastByType(string serviceType, string inbox, Action> callback); - - InvokeResult RequestBroadcastByType(string serviceType, Trequest data, Action> callback); - InvokeResult RequestBroadcastByType(string serviceType, string inbox, Trequest data, Action> callback); - - InvokeResult RequestBroadcastByGroup(string serviceGroup, Action> callback); - InvokeResult RequestBroadcastByGroup(string serviceGroup, string inbox, Action> callback); - - InvokeResult RequestBroadcastByGroup(string serviceGroup, Trequest data, Action> callback); - InvokeResult RequestBroadcastByGroup(string serviceGroup, string inbox, Trequest data, Action> callback); + bool Send(string alias, T data); + bool Send(string alias, string inbox, T data); + bool Request(string alias, Action callback); + bool Request(string alias, string inbox, Action callback); + bool Request(string alias, Trequest request, Action callback); + bool Request(string alias, string inbox, Trequest request, Action callback); + + + bool SendBroadcast(string alias, T data); + bool SendBroadcast(string alias, string inbox, T data); + + bool SendBroadcastByType(string serviceType, T data); + bool SendBroadcastByType(string serviceType, string inbox, T data); + + bool SendBroadcastByGroup(string serviceGroup, T data); + bool SendBroadcastByGroup(string serviceGroup, string inbox, T data); + + bool RequestBroadcast(string alias, Action> callback); + bool RequestBroadcast(string alias, string inbox, Action> callback); + + bool RequestBroadcast(string alias, Trequest data, Action> callback); + bool RequestBroadcast(string alias, string inbox, Trequest data, Action> callback); + + bool RequestBroadcastByType(string serviceType, Action> callback); + bool RequestBroadcastByType(string serviceType, string inbox, Action> callback); + + bool RequestBroadcastByType(string serviceType, Trequest data, Action> callback); + bool RequestBroadcastByType(string serviceType, string inbox, Trequest data, Action> callback); + + bool RequestBroadcastByGroup(string serviceGroup, Action> callback); + bool RequestBroadcastByGroup(string serviceGroup, string inbox, Action> callback); + + bool RequestBroadcastByGroup(string serviceGroup, Trequest data, Action> callback); + bool RequestBroadcastByGroup(string serviceGroup, string inbox, Trequest data, Action> callback); } } diff --git a/ZeroLevel/Services/Network/DiscoveryClient.cs b/ZeroLevel/Services/Network/DiscoveryClient.cs index 7e81d84..74c4d2a 100644 --- a/ZeroLevel/Services/Network/DiscoveryClient.cs +++ b/ZeroLevel/Services/Network/DiscoveryClient.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Net; using System.Threading; using ZeroLevel.Models; using ZeroLevel.Services.Collections; @@ -17,9 +18,9 @@ namespace ZeroLevel.Network private IEnumerable _empty = Enumerable.Empty(); private List _services = new List(); - private Dictionary> _tableByKey; - private Dictionary> _tableByGroups; - private Dictionary> _tableByTypes; + private Dictionary> _tableByKey; + private Dictionary> _tableByGroups; + private Dictionary> _tableByTypes; internal void Update(IEnumerable records) { @@ -40,9 +41,9 @@ namespace ZeroLevel.Network try { _services = services; - _tableByKey = _services.GroupBy(r => r.Key).ToDictionary(g => g.Key, g => new RoundRobinOverCollection(g)); - _tableByTypes = _services.GroupBy(r => r.Type).ToDictionary(g => g.Key, g => new RoundRobinOverCollection(g)); - _tableByGroups = _services.GroupBy(r => r.Group).ToDictionary(g => g.Key, g => new RoundRobinOverCollection(g)); + _tableByKey = _services.GroupBy(r => r.Key).ToDictionary(g => g.Key, g => new RoundRobinCollection(g)); + _tableByTypes = _services.GroupBy(r => r.Type).ToDictionary(g => g.Key, g => new RoundRobinCollection(g)); + _tableByGroups = _services.GroupBy(r => r.Group).ToDictionary(g => g.Key, g => new RoundRobinCollection(g)); } catch (Exception ex) { @@ -60,7 +61,7 @@ namespace ZeroLevel.Network _lock.EnterReadLock(); try { - if (_tableByKey.ContainsKey(key) && !_tableByKey[key].IsEmpty) + if (_tableByKey.ContainsKey(key) && _tableByKey[key].Count > 0) { return _tableByKey[key].Find(s => s.Endpoint.Equals(endpoint, StringComparison.OrdinalIgnoreCase)).FirstOrDefault(); } @@ -78,9 +79,9 @@ namespace ZeroLevel.Network _lock.EnterReadLock(); try { - if (_tableByKey.ContainsKey(key) && !_tableByKey[key].IsEmpty) + if (_tableByKey.ContainsKey(key) && _tableByKey[key].Count > 0) { - return _tableByKey[key].GenerateSeq(); + return _tableByKey[key].GetCurrentSeq(); } } finally @@ -96,9 +97,9 @@ namespace ZeroLevel.Network _lock.EnterReadLock(); try { - if (_tableByGroups.ContainsKey(group) && !_tableByGroups[group].IsEmpty) + if (_tableByGroups.ContainsKey(group) && _tableByGroups[group].Count > 0) { - return _tableByGroups[group].GenerateSeq(); + return _tableByGroups[group].GetCurrentSeq(); } } finally @@ -114,9 +115,9 @@ namespace ZeroLevel.Network _lock.EnterReadLock(); try { - if (_tableByTypes.ContainsKey(type) && !_tableByTypes[type].IsEmpty) + if (_tableByTypes.ContainsKey(type) && _tableByTypes[type].Count > 0) { - return _tableByTypes[type].GenerateSeq(); + return _tableByTypes[type].GetCurrentSeq(); } } finally diff --git a/ZeroLevel/Services/Network/ExClient.cs b/ZeroLevel/Services/Network/ExClient.cs index 310d5ad..2599bf9 100644 --- a/ZeroLevel/Services/Network/ExClient.cs +++ b/ZeroLevel/Services/Network/ExClient.cs @@ -8,195 +8,6 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { - public sealed class ExClientSet - : IClientSet, IDisposable - { - public void Dispose() - { - throw new NotImplementedException(); - } - - #region IMultiClient - public InvokeResult Request(string alias, Action callback) - { - throw new NotImplementedException(); - } - - public InvokeResult Request(string alias, string inbox, Action callback) - { - throw new NotImplementedException(); - } - - public InvokeResult Request(string alias, Trequest request, Action callback) - { - throw new NotImplementedException(); - } - - public InvokeResult Request(string alias, string inbox, Trequest request, Action callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcast(string alias, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcast(string alias, string inbox, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcast(string alias, Trequest data, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcast(string alias, string inbox, Trequest data, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcastByGroup(string serviceGroup, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcastByGroup(string serviceGroup, string inbox, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcastByGroup(string serviceGroup, Trequest data, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcastByGroup(string serviceGroup, string inbox, Trequest data, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcastByType(string serviceType, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcastByType(string serviceType, string inbox, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcastByType(string serviceType, Trequest data, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult RequestBroadcastByType(string serviceType, string inbox, Trequest data, Action> callback) - { - throw new NotImplementedException(); - } - - public InvokeResult Send(string alias, T data) - { - throw new NotImplementedException(); - } - - public InvokeResult Send(string alias, string inbox, T data) - { - throw new NotImplementedException(); - } - - public InvokeResult SendBroadcast(string alias, T data) - { - throw new NotImplementedException(); - } - - public InvokeResult SendBroadcast(string alias, string inbox, T data) - { - throw new NotImplementedException(); - } - - public InvokeResult SendBroadcastByGroup(string serviceGroup, T data) - { - throw new NotImplementedException(); - } - - public InvokeResult SendBroadcastByGroup(string serviceGroup, string inbox, T data) - { - throw new NotImplementedException(); - } - - public InvokeResult SendBroadcastByType(string serviceType, T data) - { - throw new NotImplementedException(); - } - - public InvokeResult SendBroadcastByType(string serviceType, string inbox, T data) - { - throw new NotImplementedException(); - } - #endregion - - #region Private - private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) - { - var response = new List(); - using (var waiter = new CountdownEvent(clients.Count)) - { - foreach (var client in clients) - { - Task.Run(() => - { - try - { - if (false == client.Request(inbox, data, resp => { waiter.Signal(); response.Add(resp); }).Success) - { - waiter.Signal(); - } - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); - waiter.Signal(); - } - }); - } - waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS); - } - return response; - } - - private IEnumerable _RequestBroadcast(List clients, string inbox) - { - var response = new List(); - using (var waiter = new CountdownEvent(clients.Count)) - { - foreach (var client in clients) - { - Task.Run(() => - { - try - { - if (false == client.Request(inbox, resp => { waiter.Signal(); response.Add(resp); }).Success) - { - waiter.Signal(); - } - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); - waiter.Signal(); - } - }); - } - waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS); - } - return response; - } - #endregion - } - public sealed class ExClient : IClient, IDisposable { diff --git a/ZeroLevel/Services/Network/ExServiceHost.cs b/ZeroLevel/Services/Network/ExServiceHost.cs deleted file mode 100644 index d0de213..0000000 --- a/ZeroLevel/Services/Network/ExServiceHost.cs +++ /dev/null @@ -1,229 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Linq.Expressions; -using System.Reflection; - -namespace ZeroLevel.Network -{ - /* - public sealed class ExServiceHost - : IDisposable - { - #region Transport helpers - - /// - /// Call service with round-robin balancing - /// - /// Service key - /// Service call code - /// true - service called succesfully - internal bool CallService(string serviceKey, Func callHandler) - { - if (_disposed) return false; - List candidates; - try - { - candidates = _discoveryClient.GetServiceEndpoints(serviceKey)?.ToList(); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ExServiceHost] Error when trying get endpoints for service key '{serviceKey}'"); - return false; - } - if (candidates == null || candidates.Any() == false) - { - Log.Debug($"[ExServiceHost] Not found endpoints for service key '{serviceKey}'"); - return false; - } - var success = false; - foreach (var service in candidates) - { - IExClient transport; - try - { - transport = ExchangeTransportFactory.GetClientWithCache(service.Endpoint); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ExServiceHost] Can't get transport for service '{serviceKey}'"); - continue; - } - try - { - success = callHandler(service.Endpoint, transport); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ExServiceHost] Error send/request data in service '{serviceKey}'. Endpoint '{service.Endpoint}'"); - success = false; - } - if (success) - { - break; - } - } - return success; - } - - internal bool CallServiceDirect(string endpoint, string serviceKey, Func callHandler) - { - if (_disposed) return false; - ServiceEndpointInfo candidate = null; - try - { - candidate = _discoveryClient.GetService(serviceKey, endpoint); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ExServiceHost] Error when trying get service info by key '{serviceKey}' and endpoint '{endpoint}'"); - return false; - } - if (candidate == null) - { - Log.Debug($"[ExServiceHost] Not found service info for key '{serviceKey}' and endpoint '{endpoint}'"); - return false; - } - IExClient transport; - try - { - transport = ExchangeTransportFactory.GetClientWithCache(candidate.Endpoint); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[ExServiceHost] Can't get transport for service '{serviceKey}'"); - return false; - } - return callHandler(transport); - } - - internal IEnumerable GetClientEnumerator(string serviceKey) - { - if (!_disposed) - { - List candidates; - try - { - candidates = _discoveryClient.GetServiceEndpoints(serviceKey)?.ToList(); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service key '{serviceKey}'"); - candidates = null; - } - if (candidates != null && candidates.Any()) - { - foreach (var service in candidates) - { - IExClient transport; - try - { - transport = ExchangeTransportFactory.GetClientWithCache(service.Endpoint); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Can't get transport for endpoint '{service.Endpoint}'"); - continue; - } - yield return transport; - } - } - else - { - Log.Debug($"[Exchange] Not found endpoints for service key '{serviceKey}'"); - } - } - } - - internal IEnumerable GetClientEnumeratorByType(string serviceType) - { - if (!_disposed) - { - List candidates; - try - { - candidates = _discoveryClient.GetServiceEndpointsByType(serviceType)?.ToList(); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service type '{serviceType}'"); - candidates = null; - } - if (candidates != null && candidates.Any()) - { - foreach (var service in candidates) - { - IExClient transport; - try - { - transport = ExchangeTransportFactory.GetClientWithCache(service.Endpoint); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Can't get transport for endpoint '{service.Endpoint}'"); - continue; - } - yield return transport; - } - } - else - { - Log.Debug($"[Exchange] Not found endpoints for service type '{serviceType}'"); - } - } - } - - internal IEnumerable GetClientEnumeratorByGroup(string serviceGroup) - { - if (!_disposed) - { - List candidates; - try - { - candidates = _discoveryClient.GetServiceEndpointsByGroup(serviceGroup)?.ToList(); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service group '{serviceGroup}'"); - candidates = null; - } - if (candidates != null && candidates.Any()) - { - foreach (var service in candidates) - { - IExClient transport; - try - { - transport = ExchangeTransportFactory.GetClientWithCache(service.Endpoint); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Can't get transport for endpoint '{service.Endpoint}'"); - continue; - } - yield return transport; - } - } - else - { - Log.Debug($"[Exchange] Not found endpoints for service group '{serviceGroup}'"); - } - } - } - - #endregion Transport helpers - - public void Dispose() - { - if (_disposed) return; - _disposed = true; - Sheduller.Remove(_registerTaskKey); - foreach (var s in _services) - { - s.Value.Server.Dispose(); - } - } - } - */ -} \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Exchange.cs b/ZeroLevel/Services/Network/Exchange.cs index b3ef3b4..dd76d43 100644 --- a/ZeroLevel/Services/Network/Exchange.cs +++ b/ZeroLevel/Services/Network/Exchange.cs @@ -1,184 +1,210 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Net; using System.Threading; using System.Threading.Tasks; +using ZeroLevel.Models; +using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { - /* /// /// Provides data exchange between services /// public sealed class Exchange : + IClientSet, IDisposable { - private readonly IDiscoveryClient _discoveryClient; - private readonly ExServiceHost _host; + private IDiscoveryClient _discoveryClient = null; // Feature расширить до нескольких discovery + private readonly ServiceRouteStorage _aliases = new ServiceRouteStorage(); + private readonly ExClientServerCachee _cachee = new ExClientServerCachee(); #region Ctor - public Exchange(IDiscoveryClient discoveryClient) + public Exchange() { - this._discoveryClient = discoveryClient ?? throw new ArgumentNullException(nameof(discoveryClient)); - this._host = new ExServiceHost(this._discoveryClient); } #endregion Ctor + #region IMultiClient + /// - /// Registration service + /// Sending a message to the service /// - public IExchangeService RegisterService(IExchangeService service) + /// Service key or url + /// Message + /// + public bool Send(string alias, T data) { - return _host.RegisterService(service); + return CallService(alias, (transport) => transport.Send(BaseSocket.DEFAULT_MESSAGE_INBOX, data).Success); } - public IExchangeService RegisterService(ExServiceInfo service) + /// + /// Sending a message to the service + /// + /// Service key or url + /// Inbox name + /// Message + /// + public bool Send(string alias, string inbox, T data) { - return _host.RegisterService(service); + return CallService(alias, (transport) => transport.Send(inbox, data).Success); } - #region Balanced send - /// - /// Sending a message to the service + /// Sending a message to all services with the specified key, to the default handler /// + /// Message type /// Service key + /// Message + /// true - on successful submission + public bool SendBroadcast(string serviceKey, T data) => SendBroadcast(serviceKey, BaseSocket.DEFAULT_MESSAGE_INBOX, data); + + /// + /// Sending a message to all services with the specified key to the specified handler + /// + /// Message type + /// Service key /// Inbox name /// Message - /// - public bool Send(string serviceKey, string inbox, T data) + /// true - on successful submission + public bool SendBroadcast(string alias, string inbox, T data) { try { - return _host.CallService(serviceKey, (endpoint, transport) => transport.Send(inbox, data).Success); + foreach (var client in GetClientEnumerator(alias)) + { + Task.Run(() => + { + try + { + client.Send(inbox, data); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.SendBroadcast] Error broadcast send data to services '{alias}'. Inbox '{inbox}'"); + } + }); + } } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error send data in service '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.SendBroadcast] Error broadcast send data in service '{alias}'. Inbox '{inbox}'"); } return false; } - public bool Send(string serviceKey, T data) => Send(serviceKey, BaseSocket.DEFAULT_MESSAGE_INBOX, data); - - #endregion Balanced send - - #region Balanced request - - public Tresp Request(string serviceKey, string inbox, Treq data) + /// + /// Sending a message to all services of a specific type to the specified handler + /// + /// Message type + /// Service type + /// Inbox name + /// Message + /// true - on successful submission + public bool SendBroadcastByType(string type, string inbox, T data) { - Tresp response = default(Tresp); try { - if (false == _host.CallService(serviceKey, (endpoint, transport) => - { - try - { - using (var waiter = new ManualResetEventSlim(false)) - { - if (false == transport.Request(inbox, data, resp => - { - response = resp; - waiter.Set(); - }).Success) - { - return false; - } - if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS)) - { - return false; - } - } - return true; - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); - } - return false; - })) + foreach (var client in GetClientEnumeratorByType(type)) { - Log.SystemWarning($"[Exchange] No responce on request. Service key '{serviceKey}'. Inbox '{inbox}'"); + Task.Run(() => + { + try + { + client.Send(inbox, data); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.SendBroadcastByType] Error broadcast send data to services with type '{type}'. Inbox '{inbox}'"); + } + }); } } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.SendBroadcastByType] Error broadcast send data to services with type '{type}'. Inbox '{inbox}'"); } - return response; + return false; } - public Tresp Request(string serviceKey, string inbox) + /// + /// Sending a message to all services of a particular type, to the default handler + /// + /// Message type + /// Service type + /// Message + /// true - on successful submission + public bool SendBroadcastByType(string type, T data) => + SendBroadcastByType(type, BaseSocket.DEFAULT_MESSAGE_INBOX, data); + + /// + /// Sending a message to all services of a specific group to the specified handler + /// + /// Message type + /// Service group + /// Inbox name + /// Message + /// true - on successful submission + public bool SendBroadcastByGroup(string group, string inbox, T data) { - Tresp response = default(Tresp); try { - if (false == _host.CallService(serviceKey, (endpoint, transport) => - { - try - { - using (var waiter = new ManualResetEventSlim(false)) - { - if (false == transport.Request(inbox, resp => - { - response = resp; - waiter.Set(); - }).Success) - { - return false; - } - if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS)) - { - return false; - } - } - return true; - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); - } - return false; - })) + foreach (var client in GetClientEnumeratorByGroup(group)) { - Log.SystemWarning($"[Exchange] No responce on request. Service key '{serviceKey}'. Inbox '{inbox}'"); + Task.Run(() => + { + try + { + client.Send(inbox, data); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.SendBroadcastByGroup] Error broadcast send data to services with type '{group}'. Inbox '{inbox}'"); + } + }); } } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.SendBroadcastByGroup] Error broadcast send data to services with type '{group}'. Inbox '{inbox}'"); } - return response; + return false; } - public Tresp Request(string serviceKey, Treq data) => - Request(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX, data); - - public Tresp Request(string serviceKey) => - Request(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX); - - #endregion Balanced request + /// + /// Sending a message to all services of a specific group in the default handler + /// + /// Message type + /// Service group + /// Messsage + /// true - on successful submission + public bool SendBroadcastByGroup(string serviceGroup, T data) => + SendBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_MESSAGE_INBOX, data); - #region Direct request + public bool Request(string alias, Action callback) => + Request(alias, BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, callback); - public Tresp RequestDirect(string endpoint, string serviceKey, string inbox, Treq data) + public bool Request(string alias, string inbox, Action callback) { - Tresp response = default(Tresp); + bool success = false; + Tresponse response = default(Tresponse); try { - if (false == _host.CallServiceDirect(endpoint, serviceKey, (transport) => + if (false == CallService(alias, (transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { - if (false == transport.Request(inbox, data, resp => - { - response = resp; - waiter.Set(); - }).Success) + if (false == transport.Request(inbox, resp => + { + response = resp; + success = true; + waiter.Set(); + }).Success) { return false; } @@ -191,35 +217,41 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); } return false; })) { - Log.SystemWarning($"[Exchange] No responce on direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemWarning($"[Exchange.Request] No responce on request. Service key '{alias}'. Inbox '{inbox}'"); } } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); } - return response; + callback(response); + return success; } - public Tresp RequestDirect(string endpoint, string serviceKey, string inbox) + public bool Request(string alias, Trequest request, Action callback) + => Request(alias, BaseSocket.DEFAULT_REQUEST_INBOX, callback); + + public bool Request(string alias, string inbox, Trequest request, Action callback) { - Tresp response = default(Tresp); + bool success = false; + Tresponse response = default(Tresponse); try { - if (false == _host.CallServiceDirect(endpoint, serviceKey, (transport) => + if (false == CallService(alias, (transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { - if (false == transport.Request(inbox, resp => + if (false == transport.Request(inbox, request, resp => { response = resp; + success = true; waiter.Set(); }).Success) { @@ -234,229 +266,142 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); } return false; })) { - Log.SystemWarning($"[Exchange] No responce on direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemWarning($"[Exchange.Request] No responce on request. Service key '{alias}'. Inbox '{inbox}'"); } } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error direct request to service '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); } - return response; + callback(response); + return success; } - public Tresp RequestDirect(string endpoint, string serviceKey, Treq data) => - RequestDirect(endpoint, serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX, data); - - public Tresp RequestDirect(string endpoint, string serviceKey) => - RequestDirect(endpoint, serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX); - - #endregion Direct request - - #region Broadcast + /// + /// Broadcast polling of services by key, without message of request, to default handler + /// + /// Response message type + /// Service key + /// Response handler + /// true - in case of successful mailing + public bool RequestBroadcast(string alias, Action> callback) => + RequestBroadcast(alias, BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, callback); /// - /// Sending a message to all services with the specified key to the specified handler + /// Broadcast polling services by key /// - /// Message type - /// Service key + /// Response message type + /// Service key /// Inbox name - /// Message - /// true - on successful submission - public bool SendBroadcast(string serviceKey, string inbox, T data) + /// Request message + /// Response handler + /// true - in case of successful mailing + public bool RequestBroadcast(string alias, string inbox, Action> callback) { try { - foreach (var client in _host.GetClientEnumerator(serviceKey)) - { - Task.Run(() => - { - try - { - client.Send(inbox, data); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Error broadcast send data to services '{serviceKey}'. Inbox '{inbox}'"); - } - }); - } + var clients = GetClientEnumerator(alias).ToList(); + callback(_RequestBroadcast(clients, inbox)); + return true; } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error broadcast send data in service '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.RequestBroadcast] Error broadcast request to service '{alias}'. Inbox '{inbox}'"); } return false; } - /// - /// Sending a message to all services with the specified key, to the default handler - /// - /// Message type - /// Service key - /// Message - /// true - on successful submission - public bool SendBroadcast(string serviceKey, T data) => SendBroadcast(serviceKey, BaseSocket.DEFAULT_MESSAGE_INBOX, data); + public bool RequestBroadcast(string alias, Trequest data, Action> callback) + => RequestBroadcast(alias, BaseSocket.DEFAULT_REQUEST_INBOX, data, callback); - /// - /// Sending a message to all services of a specific type to the specified handler - /// - /// Message type - /// Service type - /// Inbox name - /// Message - /// true - on successful submission - public bool SendBroadcastByType(string serviceType, string inbox, T data) + public bool RequestBroadcast(string alias, string inbox, Trequest data + , Action> callback) { try { - foreach (var client in _host.GetClientEnumeratorByType(serviceType)) - { - Task.Run(() => - { - try - { - client.Send(inbox, data); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceType}'. Inbox '{inbox}'"); - } - }); - } + var clients = GetClientEnumerator(alias).ToList(); + callback(_RequestBroadcast(clients, inbox, data)); + return true; } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceType}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.RequestBroadcast] Error broadcast request to service '{alias}'. Inbox '{inbox}'"); } return false; } - /// - /// Sending a message to all services of a particular type, to the default handler - /// - /// Message type - /// Service type - /// Message - /// true - on successful submission - public bool SendBroadcastByType(string serviceType, T data) => - SendBroadcastByType(serviceType, BaseSocket.DEFAULT_MESSAGE_INBOX, data); + public bool RequestBroadcastByGroup(string serviceGroup, Action> callback) + => RequestBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX, callback); - /// - /// Sending a message to all services of a specific group to the specified handler - /// - /// Message type - /// Service group - /// Inbox name - /// Message - /// true - on successful submission - public bool SendBroadcastByGroup(string serviceGroup, string inbox, T data) + public bool RequestBroadcastByGroup(string serviceGroup, string inbox, Action> callback) { try { - foreach (var client in _host.GetClientEnumeratorByGroup(serviceGroup)) - { - Task.Run(() => - { - try - { - client.Send(inbox, data); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceGroup}'. Inbox '{inbox}'"); - } - }); - } + var clients = GetClientEnumeratorByGroup(serviceGroup).ToList(); + callback(_RequestBroadcast(clients, inbox)); + return true; } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceGroup}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange] Error broadcast request to service by group '{serviceGroup}'. Inbox '{inbox}'"); } return false; } - /// - /// Sending a message to all services of a specific group in the default handler - /// - /// Message type - /// Service group - /// Messsage - /// true - on successful submission - public bool SendBroadcastByGroup(string serviceGroup, T data) => - SendBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_MESSAGE_INBOX, data); + public bool RequestBroadcastByGroup(string serviceGroup, Trequest data, Action> callback) + => RequestBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX, data, callback); - /// - /// Broadcast polling services by key - /// - /// Request message type - /// Response message type - /// Service key - /// Inbox name - /// Request message - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcast(string serviceKey, string inbox, Treq data) + public bool RequestBroadcastByGroup(string serviceGroup, string inbox, Trequest data + , Action> callback) { try { - var clients = _host.GetClientEnumerator(serviceKey).ToList(); - return _RequestBroadcast(clients, inbox, data); + var clients = GetClientEnumeratorByGroup(serviceGroup).ToList(); + callback(_RequestBroadcast(clients, inbox, data)); + return true; } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error broadcast request to service '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange] Error broadcast request to service by group '{serviceGroup}'. Inbox '{inbox}'"); } - return Enumerable.Empty(); + return false; } - /// - /// Broadcast polling services by key, without message request - /// - /// Response message type - /// Service key - /// Inbox name - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcast(string serviceKey, string inbox) + public bool RequestBroadcastByType(string serviceType, Action> callback) + => RequestBroadcastByType(serviceType, BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, callback); + + public bool RequestBroadcastByType(string serviceType, string inbox, Action> callback) { try { - var clients = _host.GetClientEnumerator(serviceKey).ToList(); - return _RequestBroadcast(clients, inbox); + var clients = GetClientEnumeratorByType(serviceType).ToList(); + callback(_RequestBroadcast(clients, inbox)); + return true; } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error broadcast request to service '{serviceKey}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'"); } - return Enumerable.Empty(); + return false; } /// - /// Broadcast polling services by key, to default handler + /// Broadcast polling services by type of service, to default handler /// /// Request message type /// Response message type - /// Service key + /// Service type /// Request message - /// Response handler + /// Response handler /// true - in case of successful mailing - public IEnumerable RequestBroadcast(string serviceKey, Treq data) => - RequestBroadcast(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX, data); - - /// - /// Broadcast polling of services by key, without message of request, to default handler - /// - /// Response message type - /// Service key - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcast(string serviceKey) => - RequestBroadcast(serviceKey, BaseSocket.DEFAULT_REQUEST_INBOX); + public bool RequestBroadcastByType(string serviceType, Trequest data + , Action> callback) => + RequestBroadcastByType(serviceType, BaseSocket.DEFAULT_REQUEST_INBOX, data, callback); /// /// Broadcast polling services by type of service @@ -466,135 +411,287 @@ namespace ZeroLevel.Network /// Service type /// Inbox name /// Request message - /// Response handler + /// Response handler /// true - in case of successful mailing - public IEnumerable RequestBroadcastByType(string serviceType, string inbox, Treq data) + public bool RequestBroadcastByType(string serviceType, string inbox, Trequest data + , Action> callback) { try { - var clients = _host.GetClientEnumeratorByType(serviceType).ToList(); - return _RequestBroadcast(clients, inbox, data); + var clients = GetClientEnumeratorByType(serviceType).ToList(); + callback(_RequestBroadcast(clients, inbox, data)); + return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'"); } - return Enumerable.Empty(); + return false; } + #endregion - /// - /// Broadcast polling of services by type of service, without a request message - /// - /// Response message type - /// Service type - /// Inbox name - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcastByType(string serviceType, string inbox) + #region Discovery + private long _update_discovery_table_task = -1; + private long _register_in_discovery_table_task = -1; + private static TimeSpan _update_discovery_table_period = TimeSpan.FromSeconds(15); + private static TimeSpan _register_in_discovery_table_period = TimeSpan.FromSeconds(15); + + public void UseDiscovery() { - try + if (_discoveryClient != null) { - var clients = _host.GetClientEnumeratorByType(serviceType).ToList(); - return _RequestBroadcast(clients, inbox); + _discoveryClient.Dispose(); + _discoveryClient = null; } - catch (Exception ex) + var discovery = Configuration.Default.First("discovery"); + _discoveryClient = new DiscoveryClient(_cachee.GetClient(NetUtils.CreateIPEndPoint(discovery), false, BaseSocket.NullRouter)); + RestartDiscoveryTasks(); + } + + public void UseDiscovery(string endpoint) + { + if (_discoveryClient != null) { - Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'"); + _discoveryClient.Dispose(); + _discoveryClient = null; } - return Enumerable.Empty(); + _discoveryClient = new DiscoveryClient(_cachee.GetClient(NetUtils.CreateIPEndPoint(endpoint), false, BaseSocket.NullRouter)); + RestartDiscoveryTasks(); } - /// - /// Broadcast polling services by type of service, in the default handler - /// - /// Request message type - /// Response message type - /// Service type - /// Request message - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcastByType(string serviceType, Treq data) => - RequestBroadcastByType(serviceType, BaseSocket.DEFAULT_REQUEST_INBOX, data); + public void UseDiscovery(IPEndPoint endpoint) + { + if (_discoveryClient != null) + { + _discoveryClient.Dispose(); + _discoveryClient = null; + } + _discoveryClient = new DiscoveryClient(_cachee.GetClient(endpoint, false, BaseSocket.NullRouter)); + RestartDiscoveryTasks(); + } - /// - /// Broadcast polling services by type, without message request, in the default handler - /// - /// Response message type - /// Service type - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcastByType(string serviceType) => - RequestBroadcastByType(serviceType, BaseSocket.DEFAULT_REQUEST_INBOX); + private void RestartDiscoveryTasks() + { + if (_update_discovery_table_task != -1) + { + Sheduller.Remove(_update_discovery_table_task); + } + if (_register_in_discovery_table_task != -1) + { + Sheduller.Remove(_register_in_discovery_table_task); + } + RegisterServicesInDiscovery(); + _update_discovery_table_task = Sheduller.RemindEvery(_update_discovery_table_period, RegisterServicesInDiscovery); + _register_in_discovery_table_task = Sheduller.RemindEvery(_register_in_discovery_table_period, () => { }); + } - /// - /// Broadcast polling services for a group of services - /// - /// Request message type - /// Response message type - /// Service group - /// Inbox name - /// Request message - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcastByGroup(string serviceGroup, string inbox, Treq data) + private void RegisterServicesInDiscovery() + { + var services = _serverInstances. + Values. + Select(s => + { + var info = MessageSerializer.Copy(this._serviceInfo); + info.Port = s.LocalEndpoint.Port; + return info; + }). + ToList(); + foreach (var service in services) + { + _discoveryClient.Register(service); + } + } + #endregion + + #region Host service + public IRouter UseHost() + { + return _cachee.GetServer(new IPEndPoint(IPAddress.Any, NetUtils.GetFreeTcpPort()), new Router()).Router; + } + + public IRouter UseHost(int port) + { + return _cachee.GetServer(new IPEndPoint(IPAddress.Any, port), new Router()).Router; + } + + public IRouter UseHost(IPEndPoint endpoint) + { + return _cachee.GetServer(endpoint, new Router()).Router; + } + #endregion + + #region Private + internal IEnumerable GetClientEnumerator(string serviceKey) { + InvokeResult> candidates; try { - var clients = _host.GetClientEnumeratorByGroup(serviceGroup).ToList(); - return _RequestBroadcast(clients, inbox, data); + candidates = _aliases.GetAll(serviceKey); } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceGroup}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.GetClientEnumerator] Error when trying get endpoints for service key '{serviceKey}'"); + candidates = null; + } + if (candidates != null && candidates.Success && candidates.Value.Any()) + { + foreach (var endpoint in candidates.Value) + { + ExClient transport; + try + { + transport = _cachee.GetClient(endpoint, true); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.GetClientEnumerator] Can't get transport for endpoint '{endpoint}'"); + continue; + } + yield return transport; + } + } + else + { + Log.Debug($"[Exchange.GetClientEnumerator] Not found endpoints for service key '{serviceKey}'"); } - return Enumerable.Empty(); } - /// - /// Broadcast polling services for a group of services, without prompting - /// - /// Response message type - /// Service group - /// Inbox name - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcastByGroup(string serviceGroup, string inbox) + internal IEnumerable GetClientEnumeratorByType(string serviceType) { + InvokeResult> candidates; try { - var clients = _host.GetClientEnumeratorByGroup(serviceGroup).ToList(); - return _RequestBroadcast(clients, inbox); + candidates = _aliases.GetAllByType(serviceType); } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceGroup}'. Inbox '{inbox}'"); + Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByType] Error when trying get endpoints for service type '{serviceType}'"); + candidates = null; + } + if (candidates != null && candidates.Success && candidates.Value.Any()) + { + foreach (var endpoint in candidates.Value) + { + ExClient transport; + try + { + transport = _cachee.GetClient(endpoint, true); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByType] Can't get transport for endpoint '{endpoint}'"); + continue; + } + yield return transport; + } + } + else + { + Log.Debug($"[Exchange.GetClientEnumeratorByType] Not found endpoints for service type '{serviceType}'"); } - return Enumerable.Empty(); } + internal IEnumerable GetClientEnumeratorByGroup(string serviceGroup) + { + InvokeResult> candidates; + try + { + candidates = _aliases.GetAllByGroup(serviceGroup); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByGroup] Error when trying get endpoints for service group '{serviceGroup}'"); + candidates = null; + } + if (candidates != null && candidates.Success && candidates.Value.Any()) + { + foreach (var service in candidates.Value) + { + ExClient transport; + try + { + transport = _cachee.GetClient(service, true); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByGroup] Can't get transport for endpoint '{service}'"); + continue; + } + yield return transport; + } + } + else + { + Log.Debug($"[Exchange.GetClientEnumeratorByGroup] Not found endpoints for service group '{serviceGroup}'"); + } + } /// - /// Broadcast polling services by service group to default handler - /// - /// Request message type - /// Response message type - /// Service group - /// Request message - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcastByGroup(string serviceGroup, Treq data) => - RequestBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX, data); - - /// - ///Broadcast polling services for a group of services, without sending a request, to the default handler + /// Call service with round-robin balancing /// - /// Response message type - /// Service group - /// Response handler - /// true - in case of successful mailing - public IEnumerable RequestBroadcastByGroup(string serviceGroup) => - RequestBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX); + /// Service key + /// Service call code + /// true - service called succesfully + internal bool CallService(string serviceKey, Func callHandler) + { + InvokeResult> candidates; + try + { + candidates = _aliases.GetAll(serviceKey); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.CallService] Error when trying get endpoints for service key '{serviceKey}'"); + return false; + } + if (candidates == null || !candidates.Success || candidates.Value.Any() == false) + { + Log.Debug($"[Exchange.CallService] Not found endpoints for service key '{serviceKey}'"); + return false; + } + var success = false; + foreach (var endpoint in candidates.Value) + { + ExClient transport; + try + { + transport = _cachee.GetClient(endpoint, true); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.CallService] Can't get transport for service '{serviceKey}'"); + continue; + } + try + { + success = callHandler(transport); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.CallService] Error send/request data in service '{serviceKey}'. Endpoint '{endpoint}'"); + success = false; + } + if (success) + { + break; + } + } + return success; + } - #region Private + internal InvokeResult CallServiceDirect(string endpoint, Func callHandler) + { + ExClient transport; + try + { + transport = _cachee.GetClient(NetUtils.CreateIPEndPoint(endpoint), true); + } + catch (Exception ex) + { + Log.SystemError(ex, $"[Exchange.CallServiceDirect] Can't get transport for endpoint '{endpoint}'"); + return InvokeResult.Fault(ex.Message); + } + return callHandler(transport); + } private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) { @@ -614,7 +711,7 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); + Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); @@ -642,7 +739,7 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); + Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); @@ -651,15 +748,19 @@ namespace ZeroLevel.Network } return response; } - - #endregion Private - - #endregion Broadcast + #endregion public void Dispose() { - this._host.Dispose(); + if (_update_discovery_table_task != -1) + { + Sheduller.Remove(_update_discovery_table_task); + } + if (_register_in_discovery_table_task != -1) + { + Sheduller.Remove(_register_in_discovery_table_task); + } + _cachee.Dispose(); } } - */ } \ No newline at end of file diff --git a/ZeroLevel/Services/Network/ServiceRouteStorage.cs b/ZeroLevel/Services/Network/ServiceRouteStorage.cs new file mode 100644 index 0000000..fa1ba98 --- /dev/null +++ b/ZeroLevel/Services/Network/ServiceRouteStorage.cs @@ -0,0 +1,255 @@ +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Threading; +using ZeroLevel.Models; +using ZeroLevel.Services.Collections; + +namespace ZeroLevel.Network +{ + /* + One IPEndpoint binded with one service. + Service can have one key, one type, one group. + Therefore IPEndpoint can be binded with one key, one type and one group. + + One key can refer to many IPEndPoints. + One type can refer to many IPEndPoints. + One group can refer to many IPEndPoints. + */ + + public sealed class ServiceRouteStorage + { + private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(); + + private Dictionary> _tableByKey + = new Dictionary>(); + + private Dictionary> _tableByGroups + = new Dictionary>(); + + private Dictionary> _tableByTypes + = new Dictionary>(); + + private Dictionary _endpoints + = new Dictionary(); + + public void Set(IPEndPoint endpoint) + { + _lock.EnterWriteLock(); + try + { + var key = $"{endpoint.Address}:{endpoint.Port}"; + if (_endpoints.ContainsKey(endpoint)) + { + if (_tableByKey.ContainsKey(key)) + { + return; + } + Remove(endpoint); + } + AppendByKeys(key, endpoint); + _endpoints.Add(endpoint, new string[3] { $"{endpoint.Address}:{endpoint.Port}", null, null }); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void Set(IEnumerable endpoints) + { + foreach (var ep in endpoints) + { + Set(ep); + } + } + + public void Set(string key, IPEndPoint endpoint) + { + _lock.EnterWriteLock(); + try + { + if (_endpoints.ContainsKey(endpoint)) + { + var exists = _endpoints[endpoint]; + if (exists[0] != null + && _tableByKey.ContainsKey(exists[0]) + && _tableByKey[exists[0]].Count == 1 + && _tableByKey[exists[0]].Contains(endpoint)) + { + return; + } + Remove(endpoint); + } + AppendByKeys(key, endpoint); + _endpoints.Add(endpoint, new string[3] { key, null, null }); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void Set(string key, IEnumerable endpoints) + { + _lock.EnterWriteLock(); + try + { + if (_tableByKey.ContainsKey(key)) + { + if (_tableByKey[key].Source.OrderingEquals(endpoints)) + { + return; + } + var drop = _tableByKey[key].Source.ToArray(); + for (int i = 0; i < drop.Length; i++) + { + Remove(drop[i]); + } + } + foreach (var ep in endpoints) + { + _endpoints.Add(ep, new string[3] { key, null, null }); + AppendByKeys(key, ep); + } + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void Set(string key, string type, string group, IPEndPoint endpoint) + { + _lock.EnterWriteLock(); + try + { + Remove(endpoint); + if (key == null) + { + key = $"{endpoint.Address}:{endpoint.Port}"; + } + AppendByKeys(key, endpoint); + if (type != null) + { + AppendByType(key, endpoint); + } + if (group != null) + { + AppendByGroup(key, endpoint); + } + _endpoints.Add(endpoint, new string[3] { key, null, null }); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void Set(string key, string type, string group, IEnumerable endpoints) + { + _lock.EnterWriteLock(); + try + { + foreach (var ep in endpoints) + { + Remove(ep); + Set(key, type, group, ep); + } + } + finally + { + _lock.ExitWriteLock(); + } + } + + #region GET + public InvokeResult Get(string key) + { + if (_tableByKey.ContainsKey(key)) + { + if (_tableByKey[key].MoveNext()) + return InvokeResult.Succeeding(_tableByKey[key].Current); + } + return InvokeResult.Fault($"No endpoints by key '{key}'"); + } + public InvokeResult> GetAll(string key) + { + if (_tableByKey.ContainsKey(key)) + { + if (_tableByKey[key].MoveNext()) + return InvokeResult.Succeeding(_tableByKey[key].GetCurrentSeq()); + } + return InvokeResult.Fault>($"No endpoints by key '{key}'"); + } + public InvokeResult GetByType(string type) + { + if (_tableByTypes.ContainsKey(type)) + { + if (_tableByTypes[type].MoveNext()) + return InvokeResult.Succeeding(_tableByTypes[type].Current); + } + return InvokeResult.Fault($"No endpoints by type '{type}'"); + } + public InvokeResult> GetAllByType(string type) + { + if (_tableByTypes.ContainsKey(type)) + { + if (_tableByTypes[type].MoveNext()) + return InvokeResult.Succeeding(_tableByTypes[type].GetCurrentSeq()); + } + return InvokeResult.Fault>($"No endpoints by type '{type}'"); + } + public InvokeResult GetByGroup(string group) + { + if (_tableByGroups.ContainsKey(group)) + { + if (_tableByGroups[group].MoveNext()) + return InvokeResult.Succeeding(_tableByGroups[group].Current); + } + return InvokeResult.Fault($"No endpoints by group '{group}'"); + } + public InvokeResult> GetAllByGroup(string group) + { + if (_tableByGroups.ContainsKey(group)) + { + if (_tableByGroups[group].MoveNext()) + return InvokeResult.Succeeding(_tableByGroups[group].GetCurrentSeq()); + } + return InvokeResult.Fault>($"No endpoints by group '{group}'"); + } + #endregion + + #region Private + private void AppendByKeys(string key, IPEndPoint endpoint) + { + Append(key, endpoint, _tableByKey); + } + private void AppendByType(string type, IPEndPoint endpoint) + { + Append(type, endpoint, _tableByTypes); + } + private void AppendByGroup(string group, IPEndPoint endpoint) + { + Append(group, endpoint, _tableByGroups); + } + private void Append(string key, IPEndPoint value, Dictionary> dict) + { + if (!dict.ContainsKey(key)) + { + dict.Add(key, new RoundRobinCollection()); + } + dict[key].Add(value); + } + + private void Remove(IPEndPoint endpoint) + { + var refs = _endpoints[endpoint]; + if (refs[0] != null && _tableByKey.ContainsKey(refs[0])) _tableByKey[refs[0]].Remove(endpoint); + if (refs[1] != null && _tableByTypes.ContainsKey(refs[1])) _tableByTypes[refs[1]].Remove(endpoint); + if (refs[2] != null && _tableByGroups.ContainsKey(refs[2])) _tableByGroups[refs[2]].Remove(endpoint); + _endpoints.Remove(endpoint); + } + #endregion + } +} diff --git a/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs new file mode 100644 index 0000000..cda5e3a --- /dev/null +++ b/ZeroLevel/Services/Network/Utils/ExClientServerCachee.cs @@ -0,0 +1,84 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Net; + +namespace ZeroLevel.Network +{ + internal sealed class ExClientServerCachee + : IDisposable + { + private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); + + private readonly ConcurrentDictionary _serverInstances = new ConcurrentDictionary(); + private HashSet _clients = new HashSet(); + + public ExClient GetClient(IPEndPoint endpoint, bool use_cachee, IRouter router = null) + { + if (use_cachee) + { + string key = $"{endpoint.Address}:{endpoint.Port}"; + ExClient instance = null; + if (_clientInstances.ContainsKey(key)) + { + instance = _clientInstances[key]; + if (instance.Status == SocketClientStatus.Working) + { + return instance; + } + _clientInstances.TryRemove(key, out instance); + instance.Dispose(); + instance = null; + } + instance = new ExClient(new SocketClient(endpoint, router ?? new Router())); + _clientInstances[key] = instance; + return instance; + } + return new ExClient(new SocketClient(endpoint, router ?? new Router())); + } + + public SocketServer GetServer(IPEndPoint endpoint, IRouter router) + { + string key = $"{endpoint.Address}:{endpoint.Port}"; + if (_serverInstances.ContainsKey(key)) + { + return _serverInstances[key]; + } + var instance = new SocketServer(endpoint, router); + _serverInstances[key] = instance; + return instance; + } + + public void Dispose() + { + ExClient removed; + foreach (var client in _clients) + { + try + { + if (_clientInstances.TryRemove(client, out removed)) + { + removed.Dispose(); + } + } + catch (Exception ex) + { + Log.Error(ex, $"[ExClientServerCachee.Dispose()] Dispose SocketClient to endpoint {client}"); + } + } + _clients.Clear(); + foreach (var server in _serverInstances) + { + try + { + server.Value.Dispose(); + } + catch (Exception ex) + { + Log.Error(ex, $"[ExClientServerCachee.Dispose()] Dispose SocketServer with endpoint {server.Key}"); + } + } + _serverInstances.Clear(); + } + } +}