using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Microservices.Contracts;
using ZeroLevel.Microservices.Model;
using ZeroLevel.Network.Microservices;
using ZeroLevel.Services.Network;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Microservices
{
///
/// Обеспечивает обмен данными между сервисами
///
public sealed class Exchange :
IDisposable
{
private readonly IDiscoveryClient _discoveryClient;
private readonly ExServiceHost _host;
#region Ctor
public Exchange(IDiscoveryClient discoveryClient)
{
this._discoveryClient = discoveryClient ?? throw new ArgumentNullException(nameof(discoveryClient));
this._host = new ExServiceHost(this._discoveryClient);
}
#endregion
///
/// Регистрация сервиса
///
public IExService RegisterService(IExchangeService service)
{
return _host.RegisterService(service);
}
public IExService RegisterService(MicroserviceInfo service)
{
return _host.RegisterService(service);
}
#region Balanced send
///
/// Отправка сообщения сервису
///
/// Ключ сервиса
/// Имя точки приема сообщений
/// Сообщение
///
public bool Send(string serviceKey, string inbox, T data)
{
try
{
return _host.CallService(serviceKey, (endpoint, transport) => transport.Send(inbox, data).Success);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error send data in service '{serviceKey}'. Inbox '{inbox}'");
}
return false;
}
public bool Send(string serviceKey, T data) => Send(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data);
#endregion
#region Balanced request
public Tresp Request(string serviceKey, string inbox, Treq data)
{
Tresp response = default(Tresp);
try
{
if (false == _host.CallService(serviceKey, (endpoint, transport) =>
{
try
{
using (var waiter = new ManualResetEventSlim(false))
{
if (false == transport.Request(inbox, data, resp =>
{
response = resp;
waiter.Set();
}).Success)
{
return false;
}
if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS))
{
return false;
}
}
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'");
}
return false;
}))
{
Log.SystemWarning($"[Exchange] No responce on request. Service key '{serviceKey}'. Inbox '{inbox}'");
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'");
}
return response;
}
public Tresp Request(string serviceKey, string inbox)
{
Tresp response = default(Tresp);
try
{
if (false == _host.CallService(serviceKey, (endpoint, transport) =>
{
try
{
using (var waiter = new ManualResetEventSlim(false))
{
if (false == transport.Request(inbox, resp =>
{
response = resp;
waiter.Set();
}).Success)
{
return false;
}
if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS))
{
return false;
}
}
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'");
}
return false;
}))
{
Log.SystemWarning($"[Exchange] No responce on request. Service key '{serviceKey}'. Inbox '{inbox}'");
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error request to service '{serviceKey}'. Inbox '{inbox}'");
}
return response;
}
public Tresp Request(string serviceKey, Treq data) =>
Request(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
public Tresp Request(string serviceKey) =>
Request(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
#endregion
#region Direct request
public Tresp RequestDirect(string endpoint, string serviceKey, string inbox, Treq data)
{
Tresp response = default(Tresp);
try
{
if (false == _host.CallServiceDirect(endpoint, serviceKey, (transport) =>
{
try
{
using (var waiter = new ManualResetEventSlim(false))
{
if (false == transport.Request(inbox, data, resp =>
{
response = resp;
waiter.Set();
}).Success)
{
return false;
}
if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS))
{
return false;
}
}
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'");
}
return false;
}))
{
Log.SystemWarning($"[Exchange] No responce on direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'");
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'");
}
return response;
}
public Tresp RequestDirect(string endpoint, string serviceKey, string inbox)
{
Tresp response = default(Tresp);
try
{
if (false == _host.CallServiceDirect(endpoint, serviceKey, (transport) =>
{
try
{
using (var waiter = new ManualResetEventSlim(false))
{
if (false == transport.Request(inbox, resp =>
{
response = resp;
waiter.Set();
}).Success)
{
return false;
}
if (false == waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS))
{
return false;
}
}
return true;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'");
}
return false;
}))
{
Log.SystemWarning($"[Exchange] No responce on direct request to '{endpoint}'. Service key '{serviceKey}'. Inbox '{inbox}'");
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error direct request to service '{serviceKey}'. Inbox '{inbox}'");
}
return response;
}
public Tresp RequestDirect(string endpoint, string serviceKey, Treq data) =>
RequestDirect(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
public Tresp RequestDirect(string endpoint, string serviceKey) =>
RequestDirect(endpoint, serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
#endregion
#region Broadcast
///
/// Отправка сообщения всем сервисам с указанным ключом в указанный обработчик
///
/// Тип сообщения
/// Ключ сервиса
/// Имя обработчика
/// Сообщение
/// true - при успешной отправке
public bool SendBroadcast(string serviceKey, string inbox, T data)
{
try
{
foreach (var client in _host.GetClientEnumerator(serviceKey))
{
Task.Run(() =>
{
try
{
client.Send(inbox, data);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast send data to services '{serviceKey}'. Inbox '{inbox}'");
}
});
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast send data in service '{serviceKey}'. Inbox '{inbox}'");
}
return false;
}
///
/// Отправка сообщения всем сервисам с указанным ключом, в обработчик по умолчанию
///
/// Тип сообщения
/// Ключ сервиса
/// Сообщение
/// true - при успешной отправке
public bool SendBroadcast(string serviceKey, T data) => SendBroadcast(serviceKey, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data);
///
/// Отправка сообщения всем сервисам конкретного типа в указанный обработчик
///
/// Тип сообщения
/// Тип сервиса
/// Имя обработчика
/// Сообщение
/// true - при успешной отправке
public bool SendBroadcastByType(string serviceType, string inbox, T data)
{
try
{
foreach (var client in _host.GetClientEnumeratorByType(serviceType))
{
Task.Run(() =>
{
try
{
client.Send(inbox, data);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceType}'. Inbox '{inbox}'");
}
});
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceType}'. Inbox '{inbox}'");
}
return false;
}
///
/// Отправка сообщения всем сервисам конкретного типа, в обработчик по умолчанию
///
/// Тип сообщения
/// Тип сервиса
/// Сообщение
/// true - при успешной отправке
public bool SendBroadcastByType(string serviceType, T data) =>
SendBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data);
///
/// Отправка сообщения всем сервисам конкретной группы в указанный обработчик
///
/// Тип сообщения
/// Группа сервиса
/// Имя обработчика
/// Сообщение
/// true - при успешной отправке
public bool SendBroadcastByGroup(string serviceGroup, string inbox, T data)
{
try
{
foreach (var client in _host.GetClientEnumeratorByGroup(serviceGroup))
{
Task.Run(() =>
{
try
{
client.Send(inbox, data);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceGroup}'. Inbox '{inbox}'");
}
});
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast send data to services with type '{serviceGroup}'. Inbox '{inbox}'");
}
return false;
}
///
/// Отправка сообщения всем сервисам конкретной группы, в обработчик по умолчанию
///
/// Тип сообщения
/// Группа сервиса
/// Сообщение
/// true - при успешной отправке
public bool SendBroadcastByGroup(string serviceGroup, T data) =>
SendBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, data);
///
/// Широковещательный опрос сервисов по ключу
///
/// Тип запроса
/// Тип ответа
/// Ключ сервиса
/// Имя обработчика
/// Запрос
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcast(string serviceKey, string inbox, Treq data)
{
try
{
var clients = _host.GetClientEnumerator(serviceKey).ToList();
return _RequestBroadcast(clients, inbox, data);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast request to service '{serviceKey}'. Inbox '{inbox}'");
}
return Enumerable.Empty();
}
///
/// Широковещательный опрос сервисов по ключу, без сообщеня запроса
///
/// Тип ответа
/// Ключ сервиса
/// Имя обработчика
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcast(string serviceKey, string inbox)
{
try
{
var clients = _host.GetClientEnumerator(serviceKey).ToList();
return _RequestBroadcast(clients, inbox);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast request to service '{serviceKey}'. Inbox '{inbox}'");
}
return Enumerable.Empty();
}
///
/// Широковещательный опрос сервисов по ключу, в обработчик по умолчанию
///
/// Тип запроса
/// Тип ответа
/// Ключ сервиса
/// Запрос
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcast(string serviceKey, Treq data) =>
RequestBroadcast(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
///
/// Широковещательный опрос сервисов по ключу, без сообщеня запроса, в обработчик по умолчанию
///
/// Тип ответа
/// Ключ сервиса
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcast(string serviceKey) =>
RequestBroadcast(serviceKey, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
///
/// Широковещательный опрос сервисов по типу сервису
///
/// Тип запроса
/// Тип ответа
/// Тип сервиса
/// Имя обработчика
/// Запрос
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcastByType(string serviceType, string inbox, Treq data)
{
try
{
var clients = _host.GetClientEnumeratorByType(serviceType).ToList();
return _RequestBroadcast(clients, inbox, data);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'");
}
return Enumerable.Empty();
}
///
/// Широковещательный опрос сервисов по типу сервису, без сообщеня запроса
///
/// Тип ответа
/// Тип сервиса
/// Имя обработчика
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcastByType(string serviceType, string inbox)
{
try
{
var clients = _host.GetClientEnumeratorByType(serviceType).ToList();
return _RequestBroadcast(clients, inbox);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceType}'. Inbox '{inbox}'");
}
return Enumerable.Empty();
}
///
/// Широковещательный опрос сервисов по типу сервису, в обработчик по умолчанию
///
/// Тип запроса
/// Тип ответа
/// Тип сервиса
/// Запрос
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcastByType(string serviceType, Treq data) =>
RequestBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
///
/// Широковещательный опрос сервисов по типу, без сообщеня запроса, в обработчик по умолчанию
///
/// Тип ответа
/// Тип сервиса
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcastByType(string serviceType) =>
RequestBroadcastByType(serviceType, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
///
/// Широковещательный опрос сервисов по группе сервисов
///
/// Тип запроса
/// Тип ответа
/// Группа сервиса
/// Имя обработчика
/// Запрос
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcastByGroup(string serviceGroup, string inbox, Treq data)
{
try
{
var clients = _host.GetClientEnumeratorByGroup(serviceGroup).ToList();
return _RequestBroadcast(clients, inbox, data);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceGroup}'. Inbox '{inbox}'");
}
return Enumerable.Empty();
}
///
/// Широковещательный опрос сервисов по группе сервисов, без сообщения запроса
///
/// Тип ответа
/// Группа сервиса
/// Имя обработчика
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcastByGroup(string serviceGroup, string inbox)
{
try
{
var clients = _host.GetClientEnumeratorByGroup(serviceGroup).ToList();
return _RequestBroadcast(clients, inbox);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error broadcast request to service by type '{serviceGroup}'. Inbox '{inbox}'");
}
return Enumerable.Empty();
}
///
/// Широковещательный опрос сервисов по группе сервисов в обработчик по умолчанию
///
/// Тип запроса
/// Тип ответа
/// Группа сервиса
/// Запрос
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcastByGroup(string serviceGroup, Treq data) =>
RequestBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX, data);
///
/// Широковещательный опрос сервисов по группе сервисов, без сообщения запроса, в обработчик по умолчанию
///
/// Тип ответа
/// Группа сервиса
/// Обработчик ответа
/// true - в случае успешной рассылки
public IEnumerable RequestBroadcastByGroup(string serviceGroup) =>
RequestBroadcastByGroup(serviceGroup, ZBaseNetwork.DEFAULT_REQUEST_INBOX);
#region Private
private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data)
{
var response = new List();
using (var waiter = new CountdownEvent(clients.Count))
{
foreach (var client in clients)
{
Task.Run(() =>
{
try
{
if (false == client.Request(inbox, data, resp => { waiter.Signal(); response.Add(resp); }).Success)
{
waiter.Signal();
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'");
waiter.Signal();
}
});
}
waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS);
}
return response;
}
private IEnumerable _RequestBroadcast(List clients, string inbox)
{
var response = new List();
using (var waiter = new CountdownEvent(clients.Count))
{
foreach (var client in clients)
{
Task.Run(() =>
{
try
{
if (false == client.Request(inbox, resp => { waiter.Signal(); response.Add(resp); }).Success)
{
waiter.Signal();
}
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'");
waiter.Signal();
}
});
}
waiter.Wait(ZBaseNetwork.MAX_REQUEST_TIME_MS);
}
return response;
}
#endregion
#endregion
public void Dispose()
{
this._host.Dispose();
}
}
}