using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using ZeroLevel.Microservices.Contracts; using ZeroLevel.Network.Microservices; using ZeroLevel.Services.Network; namespace ZeroLevel.Microservices { /// /// Provides data exchange between services /// public sealed class Exchange : IDisposable { private readonly IDiscoveryClient _discoveryClient; private readonly ExServiceHost _host; #region Ctor public Exchange(IDiscoveryClient discoveryClient) { this._discoveryClient = discoveryClient ?? throw new ArgumentNullException(nameof(discoveryClient)); this._host = new ExServiceHost(this._discoveryClient); } #endregion Ctor /// /// Registration service /// public IExService RegisterService(IExchangeService service) { return _host.RegisterService(service); } public IExService RegisterService(MicroserviceInfo service) { return _host.RegisterService(service); } #region Balanced send /// /// Sending a message to the service /// /// Service key /// Inbox name /// Message /// public bool Send(string serviceKey, string inbox, T data) { try { return _host.CallService(serviceKey, (endpoint, transport) => transport.Send(inbox, data).Success); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error send data in service '{serviceKey}'. Inbox '{inbox}'"); } return false; } public bool Send(string serviceKey, T data) => Send(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); #endregion Balanced send #region Balanced request public Tresp Request(string serviceKey, string inbox, Treq data) { Tresp response = default(Tresp); try { if (false == _host.CallService(serviceKey, (endpoint, transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, data, resp => { response = resp; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange] No responce on request. Service key '{serviceKey}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); } return response; } public Tresp Request(string serviceKey, string inbox) { Tresp response = default(Tresp); try { if (false == _host.CallService(serviceKey, (endpoint, transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, resp => { response = resp; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange] No responce on request. Service key '{serviceKey}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); } return response; } public Tresp Request(string serviceKey, Treq data) => Request(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); public Tresp Request(string serviceKey) => Request(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX); #endregion Balanced request #region Direct request public Tresp RequestDirect(string endpoint, string serviceKey, string inbox, Treq data) { Tresp response = default(Tresp); try { if (false == _host.CallServiceDirect(endpoint, serviceKey, (transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, data, resp => { response = resp; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange] No responce on direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } return response; } public Tresp RequestDirect(string endpoint, string serviceKey, string inbox) { Tresp response = default(Tresp); try { if (false == _host.CallServiceDirect(endpoint, serviceKey, (transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, resp => { response = resp; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange] No responce on direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to service '{serviceKey}'. Inbox '{inbox}'"); } return response; } public Tresp RequestDirect(string endpoint, string serviceKey, Treq data) => RequestDirect(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); public Tresp RequestDirect(string endpoint, string serviceKey) => RequestDirect(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX); #endregion Direct request #region Broadcast /// /// Sending a message to all services with the specified key to the specified handler /// /// Message type /// Service key /// Inbox name /// Message /// true - on successful submission public bool SendBroadcast(string serviceKey, string inbox, T data) { try { foreach (var client in _host.GetClientEnumerator(serviceKey)) { Task.Run(() => { try { client.Send(inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services '{serviceKey}'. Inbox '{inbox}'"); } }); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data in service '{serviceKey}'. Inbox '{inbox}'"); } return false; } /// /// Sending a message to all services with the specified key, to the default handler /// /// Message type /// Service key /// Message /// true - on successful submission public bool SendBroadcast(string serviceKey, T data) => SendBroadcast(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); /// /// Sending a message to all services of a specific type to the specified handler /// /// Message type /// Service type /// Inbox name /// Message /// true - on successful submission public bool SendBroadcastByType(string serviceType, string inbox, T data) { try { foreach (var client in _host.GetClientEnumeratorByType(serviceType)) { Task.Run(() => { try { client.Send(inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceType}'. Inbox '{inbox}'"); } }); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceType}'. Inbox '{inbox}'"); } return false; } /// /// Sending a message to all services of a particular type, to the default handler /// /// Message type /// Service type /// Message /// true - on successful submission public bool SendBroadcastByType(string serviceType, T data) => SendBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); /// /// Sending a message to all services of a specific group to the specified handler /// /// Message type /// Service group /// Inbox name /// Message /// true - on successful submission public bool SendBroadcastByGroup(string serviceGroup, string inbox, T data) { try { foreach (var client in _host.GetClientEnumeratorByGroup(serviceGroup)) { Task.Run(() => { try { client.Send(inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceGroup}'. Inbox '{inbox}'"); } }); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceGroup}'. Inbox '{inbox}'"); } return false; } /// /// Sending a message to all services of a specific group in the default handler /// /// Message type /// Service group /// Messsage /// true - on successful submission public bool SendBroadcastByGroup(string serviceGroup, T data) => SendBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); /// /// Broadcast polling services by key /// /// Request message type /// Response message type /// Service key /// Inbox name /// Request message /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcast(string serviceKey, string inbox, Treq data) { try { var clients = _host.GetClientEnumerator(serviceKey).ToList(); return _RequestBroadcast(clients, inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service '{serviceKey}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Broadcast polling services by key, without message request /// /// Response message type /// Service key /// Inbox name /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcast(string serviceKey, string inbox) { try { var clients = _host.GetClientEnumerator(serviceKey).ToList(); return _RequestBroadcast(clients, inbox); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service '{serviceKey}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Broadcast polling services by key, to default handler /// /// Request message type /// Response message type /// Service key /// Request message /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcast(string serviceKey, Treq data) => RequestBroadcast(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); /// /// Broadcast polling of services by key, without message of request, to default handler /// /// Response message type /// Service key /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcast(string serviceKey) => RequestBroadcast(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX); /// /// Broadcast polling services by type of service /// /// Request message type /// Response message type /// Service type /// Inbox name /// Request message /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByType(string serviceType, string inbox, Treq data) { try { var clients = _host.GetClientEnumeratorByType(serviceType).ToList(); return _RequestBroadcast(clients, inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Broadcast polling of services by type of service, without a request message /// /// Response message type /// Service type /// Inbox name /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByType(string serviceType, string inbox) { try { var clients = _host.GetClientEnumeratorByType(serviceType).ToList(); return _RequestBroadcast(clients, inbox); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Broadcast polling services by type of service, in the default handler /// /// Request message type /// Response message type /// Service type /// Request message /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByType(string serviceType, Treq data) => RequestBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); /// /// Broadcast polling services by type, without message request, in the default handler /// /// Response message type /// Service type /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByType(string serviceType) => RequestBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX); /// /// Broadcast polling services for a group of services /// /// Request message type /// Response message type /// Service group /// Inbox name /// Request message /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByGroup(string serviceGroup, string inbox, Treq data) { try { var clients = _host.GetClientEnumeratorByGroup(serviceGroup).ToList(); return _RequestBroadcast(clients, inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceGroup}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Broadcast polling services for a group of services, without prompting /// /// Response message type /// Service group /// Inbox name /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByGroup(string serviceGroup, string inbox) { try { var clients = _host.GetClientEnumeratorByGroup(serviceGroup).ToList(); return _RequestBroadcast(clients, inbox); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceGroup}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Broadcast polling services by service group to default handler /// /// Request message type /// Response message type /// Service group /// Request message /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByGroup(string serviceGroup, Treq data) => RequestBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); /// ///Broadcast polling services for a group of services, without sending a request, to the default handler /// /// Response message type /// Service group /// Response handler /// true - in case of successful mailing public IEnumerable RequestBroadcastByGroup(string serviceGroup) => RequestBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX); #region Private private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) { foreach (var client in clients) { Task.Run(() => { try { if (false == client.Request(inbox, data, resp => { waiter.Signal(); response.Add(resp); }).Success) { waiter.Signal(); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); } waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS); } return response; } private IEnumerable _RequestBroadcast(List clients, string inbox) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) { foreach (var client in clients) { Task.Run(() => { try { if (false == client.Request(inbox, resp => { waiter.Signal(); response.Add(resp); }).Success) { waiter.Signal(); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); } waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS); } return response; } #endregion Private #endregion Broadcast public void Dispose() { this._host.Dispose(); } } }