using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; using ZeroLevel.Models; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { /// /// Provides data exchange between services /// public sealed class Exchange : IClientSet, IDisposable { private IDiscoveryClient _discoveryClient = null; // Feature расширить до нескольких discovery private readonly ServiceRouteStorage _aliases = new ServiceRouteStorage(); private readonly ExClientServerCachee _cachee = new ExClientServerCachee(); #region Ctor public Exchange() { } #endregion Ctor #region IMultiClient /// /// Sending a message to the service /// /// Service key or url /// Message /// public bool Send(string alias, T data) { return CallService(alias, (transport) => transport.Send(BaseSocket.DEFAULT_MESSAGE_INBOX, data).Success); } /// /// Sending a message to the service /// /// Service key or url /// Inbox name /// Message /// public bool Send(string alias, string inbox, T data) { return CallService(alias, (transport) => transport.Send(inbox, data).Success); } /// /// Sending a message to all services with the specified key, to the default handler /// /// Message type /// Service key /// Message /// true - on successful submission public bool SendBroadcast(string serviceKey, T data) => SendBroadcast(serviceKey, BaseSocket.DEFAULT_MESSAGE_INBOX, data); /// /// Sending a message to all services with the specified key to the specified handler /// /// Message type /// Service key /// Inbox name /// Message /// true - on successful submission public bool SendBroadcast(string alias, string inbox, T data) { try { foreach (var client in GetClientEnumerator(alias)) { Task.Run(() => { try { client.Send(inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.SendBroadcast] Error broadcast send data to services '{alias}'. Inbox '{inbox}'"); } }); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.SendBroadcast] Error broadcast send data in service '{alias}'. Inbox '{inbox}'"); } return false; } /// /// Sending a message to all services of a specific type to the specified handler /// /// Message type /// Service type /// Inbox name /// Message /// true - on successful submission public bool SendBroadcastByType(string type, string inbox, T data) { try { foreach (var client in GetClientEnumeratorByType(type)) { Task.Run(() => { try { client.Send(inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.SendBroadcastByType] Error broadcast send data to services with type '{type}'. Inbox '{inbox}'"); } }); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.SendBroadcastByType] Error broadcast send data to services with type '{type}'. Inbox '{inbox}'"); } return false; } /// /// Sending a message to all services of a particular type, to the default handler /// /// Message type /// Service type /// Message /// true - on successful submission public bool SendBroadcastByType(string type, T data) => SendBroadcastByType(type, BaseSocket.DEFAULT_MESSAGE_INBOX, data); /// /// Sending a message to all services of a specific group to the specified handler /// /// Message type /// Service group /// Inbox name /// Message /// true - on successful submission public bool SendBroadcastByGroup(string group, string inbox, T data) { try { foreach (var client in GetClientEnumeratorByGroup(group)) { Task.Run(() => { try { client.Send(inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.SendBroadcastByGroup] Error broadcast send data to services with type '{group}'. Inbox '{inbox}'"); } }); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.SendBroadcastByGroup] Error broadcast send data to services with type '{group}'. Inbox '{inbox}'"); } return false; } /// /// Sending a message to all services of a specific group in the default handler /// /// Message type /// Service group /// Messsage /// true - on successful submission public bool SendBroadcastByGroup(string serviceGroup, T data) => SendBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_MESSAGE_INBOX, data); public bool Request(string alias, Action callback) => Request(alias, BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, callback); public bool Request(string alias, string inbox, Action callback) { bool success = false; Tresponse response = default(Tresponse); try { if (false == CallService(alias, (transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, resp => { response = resp; success = true; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange.Request] No responce on request. Service key '{alias}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); } callback(response); return success; } public bool Request(string alias, Trequest request, Action callback) => Request(alias, BaseSocket.DEFAULT_REQUEST_INBOX, callback); public bool Request(string alias, string inbox, Trequest request, Action callback) { bool success = false; Tresponse response = default(Tresponse); try { if (false == CallService(alias, (transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, request, resp => { response = resp; success = true; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange.Request] No responce on request. Service key '{alias}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); } callback(response); return success; } /// /// Broadcast polling of services by key, without message of request, to default handler /// /// Response message type /// Service key /// Response handler /// true - in case of successful mailing public bool RequestBroadcast(string alias, Action> callback) => RequestBroadcast(alias, BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, callback); /// /// Broadcast polling services by key /// /// Response message type /// Service key /// Inbox name /// Request message /// Response handler /// true - in case of successful mailing public bool RequestBroadcast(string alias, string inbox, Action> callback) { try { var clients = GetClientEnumerator(alias).ToList(); callback(_RequestBroadcast(clients, inbox)); return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.RequestBroadcast] Error broadcast request to service '{alias}'. Inbox '{inbox}'"); } return false; } public bool RequestBroadcast(string alias, Trequest data, Action> callback) => RequestBroadcast(alias, BaseSocket.DEFAULT_REQUEST_INBOX, data, callback); public bool RequestBroadcast(string alias, string inbox, Trequest data , Action> callback) { try { var clients = GetClientEnumerator(alias).ToList(); callback(_RequestBroadcast(clients, inbox, data)); return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.RequestBroadcast] Error broadcast request to service '{alias}'. Inbox '{inbox}'"); } return false; } public bool RequestBroadcastByGroup(string serviceGroup, Action> callback) => RequestBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX, callback); public bool RequestBroadcastByGroup(string serviceGroup, string inbox, Action> callback) { try { var clients = GetClientEnumeratorByGroup(serviceGroup).ToList(); callback(_RequestBroadcast(clients, inbox)); return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by group '{serviceGroup}'. Inbox '{inbox}'"); } return false; } public bool RequestBroadcastByGroup(string serviceGroup, Trequest data, Action> callback) => RequestBroadcastByGroup(serviceGroup, BaseSocket.DEFAULT_REQUEST_INBOX, data, callback); public bool RequestBroadcastByGroup(string serviceGroup, string inbox, Trequest data , Action> callback) { try { var clients = GetClientEnumeratorByGroup(serviceGroup).ToList(); callback(_RequestBroadcast(clients, inbox, data)); return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by group '{serviceGroup}'. Inbox '{inbox}'"); } return false; } public bool RequestBroadcastByType(string serviceType, Action> callback) => RequestBroadcastByType(serviceType, BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, callback); public bool RequestBroadcastByType(string serviceType, string inbox, Action> callback) { try { var clients = GetClientEnumeratorByType(serviceType).ToList(); callback(_RequestBroadcast(clients, inbox)); return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'"); } return false; } /// /// Broadcast polling services by type of service, to default handler /// /// Request message type /// Response message type /// Service type /// Request message /// Response handler /// true - in case of successful mailing public bool RequestBroadcastByType(string serviceType, Trequest data , Action> callback) => RequestBroadcastByType(serviceType, BaseSocket.DEFAULT_REQUEST_INBOX, data, callback); /// /// Broadcast polling services by type of service /// /// Request message type /// Response message type /// Service type /// Inbox name /// Request message /// Response handler /// true - in case of successful mailing public bool RequestBroadcastByType(string serviceType, string inbox, Trequest data , Action> callback) { try { var clients = GetClientEnumeratorByType(serviceType).ToList(); callback(_RequestBroadcast(clients, inbox, data)); return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'"); } return false; } #endregion #region Discovery private long _update_discovery_table_task = -1; private long _register_in_discovery_table_task = -1; private static TimeSpan _update_discovery_table_period = TimeSpan.FromSeconds(15); private static TimeSpan _register_in_discovery_table_period = TimeSpan.FromSeconds(15); public void UseDiscovery() { if (_discoveryClient != null) { _discoveryClient.Dispose(); _discoveryClient = null; } var discovery = Configuration.Default.First("discovery"); _discoveryClient = new DiscoveryClient(_cachee.GetClient(NetUtils.CreateIPEndPoint(discovery), false, BaseSocket.NullRouter)); RestartDiscoveryTasks(); } public void UseDiscovery(string endpoint) { if (_discoveryClient != null) { _discoveryClient.Dispose(); _discoveryClient = null; } _discoveryClient = new DiscoveryClient(_cachee.GetClient(NetUtils.CreateIPEndPoint(endpoint), false, BaseSocket.NullRouter)); RestartDiscoveryTasks(); } public void UseDiscovery(IPEndPoint endpoint) { if (_discoveryClient != null) { _discoveryClient.Dispose(); _discoveryClient = null; } _discoveryClient = new DiscoveryClient(_cachee.GetClient(endpoint, false, BaseSocket.NullRouter)); RestartDiscoveryTasks(); } private void RestartDiscoveryTasks() { if (_update_discovery_table_task != -1) { Sheduller.Remove(_update_discovery_table_task); } if (_register_in_discovery_table_task != -1) { Sheduller.Remove(_register_in_discovery_table_task); } RegisterServicesInDiscovery(); _update_discovery_table_task = Sheduller.RemindEvery(_update_discovery_table_period, RegisterServicesInDiscovery); _register_in_discovery_table_task = Sheduller.RemindEvery(_register_in_discovery_table_period, () => { }); } private void RegisterServicesInDiscovery() { var services = _serverInstances. Values. Select(s => { var info = MessageSerializer.Copy(this._serviceInfo); info.Port = s.LocalEndpoint.Port; return info; }). ToList(); foreach (var service in services) { _discoveryClient.Register(service); } } #endregion #region Host service public IRouter UseHost() { return _cachee.GetServer(new IPEndPoint(IPAddress.Any, NetUtils.GetFreeTcpPort()), new Router()).Router; } public IRouter UseHost(int port) { return _cachee.GetServer(new IPEndPoint(IPAddress.Any, port), new Router()).Router; } public IRouter UseHost(IPEndPoint endpoint) { return _cachee.GetServer(endpoint, new Router()).Router; } #endregion #region Private internal IEnumerable GetClientEnumerator(string serviceKey) { InvokeResult> candidates; try { candidates = _aliases.GetAll(serviceKey); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.GetClientEnumerator] Error when trying get endpoints for service key '{serviceKey}'"); candidates = null; } if (candidates != null && candidates.Success && candidates.Value.Any()) { foreach (var endpoint in candidates.Value) { ExClient transport; try { transport = _cachee.GetClient(endpoint, true); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.GetClientEnumerator] Can't get transport for endpoint '{endpoint}'"); continue; } yield return transport; } } else { Log.Debug($"[Exchange.GetClientEnumerator] Not found endpoints for service key '{serviceKey}'"); } } internal IEnumerable GetClientEnumeratorByType(string serviceType) { InvokeResult> candidates; try { candidates = _aliases.GetAllByType(serviceType); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByType] Error when trying get endpoints for service type '{serviceType}'"); candidates = null; } if (candidates != null && candidates.Success && candidates.Value.Any()) { foreach (var endpoint in candidates.Value) { ExClient transport; try { transport = _cachee.GetClient(endpoint, true); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByType] Can't get transport for endpoint '{endpoint}'"); continue; } yield return transport; } } else { Log.Debug($"[Exchange.GetClientEnumeratorByType] Not found endpoints for service type '{serviceType}'"); } } internal IEnumerable GetClientEnumeratorByGroup(string serviceGroup) { InvokeResult> candidates; try { candidates = _aliases.GetAllByGroup(serviceGroup); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByGroup] Error when trying get endpoints for service group '{serviceGroup}'"); candidates = null; } if (candidates != null && candidates.Success && candidates.Value.Any()) { foreach (var service in candidates.Value) { ExClient transport; try { transport = _cachee.GetClient(service, true); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByGroup] Can't get transport for endpoint '{service}'"); continue; } yield return transport; } } else { Log.Debug($"[Exchange.GetClientEnumeratorByGroup] Not found endpoints for service group '{serviceGroup}'"); } } /// /// Call service with round-robin balancing /// /// Service key /// Service call code /// true - service called succesfully internal bool CallService(string serviceKey, Func callHandler) { InvokeResult> candidates; try { candidates = _aliases.GetAll(serviceKey); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.CallService] Error when trying get endpoints for service key '{serviceKey}'"); return false; } if (candidates == null || !candidates.Success || candidates.Value.Any() == false) { Log.Debug($"[Exchange.CallService] Not found endpoints for service key '{serviceKey}'"); return false; } var success = false; foreach (var endpoint in candidates.Value) { ExClient transport; try { transport = _cachee.GetClient(endpoint, true); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.CallService] Can't get transport for service '{serviceKey}'"); continue; } try { success = callHandler(transport); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.CallService] Error send/request data in service '{serviceKey}'. Endpoint '{endpoint}'"); success = false; } if (success) { break; } } return success; } internal InvokeResult CallServiceDirect(string endpoint, Func callHandler) { ExClient transport; try { transport = _cachee.GetClient(NetUtils.CreateIPEndPoint(endpoint), true); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.CallServiceDirect] Can't get transport for endpoint '{endpoint}'"); return InvokeResult.Fault(ex.Message); } return callHandler(transport); } private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) { foreach (var client in clients) { Task.Run(() => { try { if (false == client.Request(inbox, data, resp => { waiter.Signal(); response.Add(resp); }).Success) { waiter.Signal(); } } catch (Exception ex) { Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); } waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS); } return response; } private IEnumerable _RequestBroadcast(List clients, string inbox) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) { foreach (var client in clients) { Task.Run(() => { try { if (false == client.Request(inbox, resp => { waiter.Signal(); response.Add(resp); }).Success) { waiter.Signal(); } } catch (Exception ex) { Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); } waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS); } return response; } #endregion public void Dispose() { if (_update_discovery_table_task != -1) { Sheduller.Remove(_update_discovery_table_task); } if (_register_in_discovery_table_task != -1) { Sheduller.Remove(_register_in_discovery_table_task); } _cachee.Dispose(); } } }