using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using ZeroLevel.Microservices.Contracts; using ZeroLevel.Microservices.Model; using ZeroLevel.Network.Microservices; using ZeroLevel.Services.Network; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Microservices { /// /// Обеспечивает обмен данными между сервисами /// public sealed class Exchange : IDisposable { private readonly IDiscoveryClient _discoveryClient; private readonly ExServiceHost _host; #region Ctor public Exchange(IDiscoveryClient discoveryClient) { this._discoveryClient = discoveryClient ?? throw new ArgumentNullException(nameof(discoveryClient)); this._host = new ExServiceHost(this._discoveryClient); } #endregion /// /// Регистрация сервиса /// public IExService RegisterService(IExchangeService service) { return _host.RegisterService(service); } public IExService RegisterService(MicroserviceInfo service) { return _host.RegisterService(service); } #region Balanced send /// /// Отправка сообщения сервису /// /// Ключ сервиса /// Имя точки приема сообщений /// Сообщение /// public bool Send(string serviceKey, string inbox, T data) { try { return _host.CallService(serviceKey, (endpoint, transport) => transport.Send(inbox, data).Success); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error send data in service '{serviceKey}'. Inbox '{inbox}'"); } return false; } public bool Send(string serviceKey, T data) => Send(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); #endregion #region Balanced request public Tresp Request(string serviceKey, string inbox, Treq data) { Tresp response = default(Tresp); try { if (false == _host.CallService(serviceKey, (endpoint, transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, data, resp => { response = resp; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange] No responce on request. Service key '{serviceKey}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); } return response; } public Tresp Request(string serviceKey, string inbox) { Tresp response = default(Tresp); try { if (false == _host.CallService(serviceKey, (endpoint, transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, resp => { response = resp; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange] No responce on request. Service key '{serviceKey}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'"); } return response; } public Tresp Request(string serviceKey, Treq data) => Request(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); public Tresp Request(string serviceKey) => Request(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX); #endregion #region Direct request public Tresp RequestDirect(string endpoint, string serviceKey, string inbox, Treq data) { Tresp response = default(Tresp); try { if (false == _host.CallServiceDirect(endpoint, serviceKey, (transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, data, resp => { response = resp; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange] No responce on direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } return response; } public Tresp RequestDirect(string endpoint, string serviceKey, string inbox) { Tresp response = default(Tresp); try { if (false == _host.CallServiceDirect(endpoint, serviceKey, (transport) => { try { using (var waiter = new ManualResetEventSlim(false)) { if (false == transport.Request(inbox, resp => { response = resp; waiter.Set(); }).Success) { return false; } if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS)) { return false; } } return true; } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } return false; })) { Log.SystemWarning($"[Exchange] No responce on direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'"); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to service '{serviceKey}'. Inbox '{inbox}'"); } return response; } public Tresp RequestDirect(string endpoint, string serviceKey, Treq data) => RequestDirect(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); public Tresp RequestDirect(string endpoint, string serviceKey) => RequestDirect(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX); #endregion #region Broadcast /// /// Отправка сообщения всем сервисам с указанным ключом в указанный обработчик /// /// Тип сообщения /// Ключ сервиса /// Имя обработчика /// Сообщение /// true - при успешной отправке public bool SendBroadcast(string serviceKey, string inbox, T data) { try { foreach (var client in _host.GetClientEnumerator(serviceKey)) { Task.Run(() => { try { client.Send(inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services '{serviceKey}'. Inbox '{inbox}'"); } }); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data in service '{serviceKey}'. Inbox '{inbox}'"); } return false; } /// /// Отправка сообщения всем сервисам с указанным ключом, в обработчик по умолчанию /// /// Тип сообщения /// Ключ сервиса /// Сообщение /// true - при успешной отправке public bool SendBroadcast(string serviceKey, T data) => SendBroadcast(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); /// /// Отправка сообщения всем сервисам конкретного типа в указанный обработчик /// /// Тип сообщения /// Тип сервиса /// Имя обработчика /// Сообщение /// true - при успешной отправке public bool SendBroadcastByType(string serviceType, string inbox, T data) { try { foreach (var client in _host.GetClientEnumeratorByType(serviceType)) { Task.Run(() => { try { client.Send(inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceType}'. Inbox '{inbox}'"); } }); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceType}'. Inbox '{inbox}'"); } return false; } /// /// Отправка сообщения всем сервисам конкретного типа, в обработчик по умолчанию /// /// Тип сообщения /// Тип сервиса /// Сообщение /// true - при успешной отправке public bool SendBroadcastByType(string serviceType, T data) => SendBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); /// /// Отправка сообщения всем сервисам конкретной группы в указанный обработчик /// /// Тип сообщения /// Группа сервиса /// Имя обработчика /// Сообщение /// true - при успешной отправке public bool SendBroadcastByGroup(string serviceGroup, string inbox, T data) { try { foreach (var client in _host.GetClientEnumeratorByGroup(serviceGroup)) { Task.Run(() => { try { client.Send(inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceGroup}'. Inbox '{inbox}'"); } }); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceGroup}'. Inbox '{inbox}'"); } return false; } /// /// Отправка сообщения всем сервисам конкретной группы, в обработчик по умолчанию /// /// Тип сообщения /// Группа сервиса /// Сообщение /// true - при успешной отправке public bool SendBroadcastByGroup(string serviceGroup, T data) => SendBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data); /// /// Широковещательный опрос сервисов по ключу /// /// Тип запроса /// Тип ответа /// Ключ сервиса /// Имя обработчика /// Запрос /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcast(string serviceKey, string inbox, Treq data) { try { var clients = _host.GetClientEnumerator(serviceKey).ToList(); return _RequestBroadcast(clients, inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service '{serviceKey}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Широковещательный опрос сервисов по ключу, без сообщеня запроса /// /// Тип ответа /// Ключ сервиса /// Имя обработчика /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcast(string serviceKey, string inbox) { try { var clients = _host.GetClientEnumerator(serviceKey).ToList(); return _RequestBroadcast(clients, inbox); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service '{serviceKey}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Широковещательный опрос сервисов по ключу, в обработчик по умолчанию /// /// Тип запроса /// Тип ответа /// Ключ сервиса /// Запрос /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcast(string serviceKey, Treq data) => RequestBroadcast(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); /// /// Широковещательный опрос сервисов по ключу, без сообщеня запроса, в обработчик по умолчанию /// /// Тип ответа /// Ключ сервиса /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcast(string serviceKey) => RequestBroadcast(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX); /// /// Широковещательный опрос сервисов по типу сервису /// /// Тип запроса /// Тип ответа /// Тип сервиса /// Имя обработчика /// Запрос /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcastByType(string serviceType, string inbox, Treq data) { try { var clients = _host.GetClientEnumeratorByType(serviceType).ToList(); return _RequestBroadcast(clients, inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Широковещательный опрос сервисов по типу сервису, без сообщеня запроса /// /// Тип ответа /// Тип сервиса /// Имя обработчика /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcastByType(string serviceType, string inbox) { try { var clients = _host.GetClientEnumeratorByType(serviceType).ToList(); return _RequestBroadcast(clients, inbox); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Широковещательный опрос сервисов по типу сервису, в обработчик по умолчанию /// /// Тип запроса /// Тип ответа /// Тип сервиса /// Запрос /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcastByType(string serviceType, Treq data) => RequestBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); /// /// Широковещательный опрос сервисов по типу, без сообщеня запроса, в обработчик по умолчанию /// /// Тип ответа /// Тип сервиса /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcastByType(string serviceType) => RequestBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX); /// /// Широковещательный опрос сервисов по группе сервисов /// /// Тип запроса /// Тип ответа /// Группа сервиса /// Имя обработчика /// Запрос /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcastByGroup(string serviceGroup, string inbox, Treq data) { try { var clients = _host.GetClientEnumeratorByGroup(serviceGroup).ToList(); return _RequestBroadcast(clients, inbox, data); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceGroup}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Широковещательный опрос сервисов по группе сервисов, без сообщения запроса /// /// Тип ответа /// Группа сервиса /// Имя обработчика /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcastByGroup(string serviceGroup, string inbox) { try { var clients = _host.GetClientEnumeratorByGroup(serviceGroup).ToList(); return _RequestBroadcast(clients, inbox); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceGroup}'. Inbox '{inbox}'"); } return Enumerable.Empty(); } /// /// Широковещательный опрос сервисов по группе сервисов в обработчик по умолчанию /// /// Тип запроса /// Тип ответа /// Группа сервиса /// Запрос /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcastByGroup(string serviceGroup, Treq data) => RequestBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data); /// /// Широковещательный опрос сервисов по группе сервисов, без сообщения запроса, в обработчик по умолчанию /// /// Тип ответа /// Группа сервиса /// Обработчик ответа /// true - в случае успешной рассылки public IEnumerable RequestBroadcastByGroup(string serviceGroup) => RequestBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX); #region Private private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) { foreach (var client in clients) { Task.Run(() => { try { if (false == client.Request(inbox, data, resp => { waiter.Signal(); response.Add(resp); }).Success) { waiter.Signal(); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); } waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS); } return response; } private IEnumerable _RequestBroadcast(List clients, string inbox) { var response = new List(); using (var waiter = new CountdownEvent(clients.Count)) { foreach (var client in clients) { Task.Run(() => { try { if (false == client.Request(inbox, resp => { waiter.Signal(); response.Add(resp); }).Success) { waiter.Signal(); } } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); } waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS); } return response; } #endregion #endregion public void Dispose() { this._host.Dispose(); } } }