You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Zero/ZeroLevel.Microservices/ExServiceHost.cs

532 lines
22 KiB

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using ZeroLevel.Microservices.Contracts;
using ZeroLevel.Network.Microservices;
using ZeroLevel.Services.Network;
namespace ZeroLevel.Microservices
{
internal sealed class ExServiceHost
: IDisposable
{
private class MetaService
{
public MicroserviceInfo ServiceInfo { get; set; }
public ExService Server { get; set; }
}
private bool _disposed = false;
private readonly long _registerTaskKey = -1;
private readonly IDiscoveryClient _discoveryClient;
private readonly ConcurrentDictionary<string, MetaService> _services
= new ConcurrentDictionary<string, MetaService>();
internal ExServiceHost(IDiscoveryClient client)
{
_discoveryClient = client;
_registerTaskKey = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(15), RegisterServicesInDiscovery);
}
internal IExService RegisterService(IExchangeService service)
{
try
{
if (_disposed) throw new ObjectDisposedException("ExServiceHost");
if (service == null) throw new ArgumentNullException(nameof(service));
ValidateService(service);
var key = $"{service.Key}.{service.Protocol}";
if (_services.ContainsKey(key))
{
throw new Exception($"[ExServiceHost] Service {key} already registered");
}
var server = ExchangeTransportFactory.GetServer(service.Protocol);
if (false == _services.TryAdd(key, new MetaService
{
Server = server,
ServiceInfo = new MicroserviceInfo
{
Endpoint = $"{server.Endpoint.Address}:{server.Endpoint.Port}",
Protocol = service.Protocol,
ServiceKey = service.Key,
Version = service.Version,
ServiceGroup = service.Group,
ServiceType = service.Type
}
}))
{
server.Dispose();
return null;
}
RegisterServiceInboxes(service);
return server;
}
catch (Exception ex)
{
Log.SystemError(ex, "[ExServiceHost] Fault register service");
return null;
}
}
internal IExService RegisterService(MicroserviceInfo serviceInfo)
{
try
{
if (_disposed) throw new ObjectDisposedException("ExServiceHost");
if (serviceInfo == null) throw new ArgumentNullException(nameof(serviceInfo));
ValidateService(serviceInfo);
var key = $"{serviceInfo.ServiceKey}.{serviceInfo.Protocol}";
if (_services.ContainsKey(key))
{
throw new Exception($"[ExServiceHost] Service {key} already registered");
}
var server = ExchangeTransportFactory.GetServer(serviceInfo.Protocol);
if (false == _services.TryAdd(key, new MetaService
{
Server = server,
ServiceInfo = new MicroserviceInfo
{
Endpoint = $"{server.Endpoint.Address}:{server.Endpoint.Port}",
Protocol = serviceInfo.Protocol,
ServiceKey = serviceInfo.ServiceKey,
Version = serviceInfo.Version,
ServiceGroup = serviceInfo.ServiceGroup,
ServiceType = serviceInfo.ServiceType
}
}))
{
server.Dispose();
return null;
}
return server;
}
catch (Exception ex)
{
Log.SystemError(ex, "[ExServiceHost] Fault register service");
return null;
}
}
#region Private methods
private void ValidateService(IExchangeService service)
{
if (string.IsNullOrWhiteSpace(service.Protocol))
{
throw new ArgumentNullException("Service.Protocol");
}
if (string.IsNullOrWhiteSpace(service.Key))
{
throw new ArgumentNullException("Service.Key");
}
}
private void ValidateService(MicroserviceInfo service)
{
if (string.IsNullOrWhiteSpace(service.Protocol))
{
throw new ArgumentNullException("Service.Protocol");
}
if (string.IsNullOrWhiteSpace(service.ServiceKey))
{
throw new ArgumentNullException("ServiceKey");
}
}
private void RegisterServiceInboxes(IExchangeService service)
{
MethodInfo[] methods = service.
GetType().
GetMethods(BindingFlags.NonPublic | BindingFlags.Public |
BindingFlags.Instance |
BindingFlags.FlattenHierarchy |
BindingFlags.Instance);
var registerHandler = this.GetType().GetMethod("RegisterHandler");
var registerReplier = this.GetType().GetMethod("RegisterReplier");
var registerReplierWithNoRequestBody = this.GetType().GetMethod("RegisterReplierWithNoRequestBody");
foreach (MethodInfo mi in methods)
{
try
{
foreach (Attribute attr in Attribute.GetCustomAttributes(mi, typeof(ExchangeAttribute)))
{
if (attr.GetType() == typeof(ExchangeMainHandlerAttribute))
{
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType);
genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeHandlerAttribute))
{
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType);
genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeHandlerAttribute).Inbox, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeMainReplierAttribute))
{
var returnType = mi.ReturnType;
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType);
genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeReplierAttribute))
{
var returnType = mi.ReturnType;
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType);
genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeMainReplierWithoutArgAttribute))
{
var returnType = mi.ReturnType;
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType);
genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeReplierWithoutArgAttribute))
{
var returnType = mi.ReturnType;
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType);
genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) });
}
}
}
catch (Exception ex)
{
Log.Debug($"[ZExchange] Can't register method {mi.Name} as inbox handler. {ex.ToString()}");
}
}
}
private void RegisterServicesInDiscovery()
{
var services = _services.
Values.
Select(s => s.ServiceInfo).
ToList();
foreach (var service in services)
{
_discoveryClient.Register(service);
}
}
#endregion Private methods
#region Utils
private static Delegate CreateDelegate(MethodInfo methodInfo, object target)
{
Func<Type[], Type> getType;
var isAction = methodInfo.ReturnType.Equals((typeof(void)));
var types = methodInfo.GetParameters().Select(p => p.ParameterType);
if (isAction)
{
getType = Expression.GetActionType;
}
else
{
getType = Expression.GetFuncType;
types = types.Concat(new[] { methodInfo.ReturnType });
}
if (methodInfo.IsStatic)
{
return Delegate.CreateDelegate(getType(types.ToArray()), methodInfo);
}
return Delegate.CreateDelegate(getType(types.ToArray()), target, methodInfo.Name);
}
#endregion Utils
#region Inboxes
/// <summary>
/// Регистрация обработчика входящих сообщений
/// </summary>
/// <typeparam name="T">Тип сообщения</typeparam>
/// <param name="protocol">Транспортный протокол</param>
/// <param name="inbox">Имя точки приема</param>
/// <param name="handler">Обработчик</param>
private void RegisterHandler<T>(MetaService meta, string inbox, Action<T, long, IZBackward> handler)
{
if (_disposed) return;
try
{
meta.Server.RegisterInbox(inbox, handler);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Register inbox handler error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
}
}
/// <summary>
/// Регистрация метода отдающего ответ на входящий запрос
/// </summary>
/// <typeparam name="Treq">Тип входного сообщения</typeparam>
/// <typeparam name="Tresp">Тип ответа</typeparam>
/// <param name="protocol">Транспортный протокол</param>
/// <param name="inbox">Имя точки приема</param>
/// <param name="replier">Обработчик</param>
private void RegisterReplier<Treq, Tresp>(MetaService meta, string inbox, Func<Treq, long, IZBackward, Tresp> handler)
{
if (_disposed) return;
try
{
meta.Server.RegisterInbox(inbox, handler);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Register inbox replier error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
}
}
/// <summary>
/// Регистрация метода отдающего ответ на входящий запрос, не принимающего входящих данных
/// </summary>
/// <typeparam name="Tresp">Тип ответа</typeparam>
/// <param name="protocol">Транспортный протокол</param>
/// <param name="inbox">Имя точки приема</param>
/// <param name="replier">Обработчик</param>
private void RegisterReplierWithNoRequestBody<Tresp>(MetaService meta, string inbox, Func<long, IZBackward, Tresp> handler)
{
if (_disposed) return;
try
{
meta.Server.RegisterInbox(inbox, handler);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Register inbox replier error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
}
}
#endregion Inboxes
#region Transport helpers
/// <summary>
/// Call service with round-robin balancing
/// </summary>
/// <param name="serviceKey">Service key</param>
/// <param name="callHandler">Service call code</param>
/// <returns>true - service called succesfully</returns>
internal bool CallService(string serviceKey, Func<string, IExClient, bool> callHandler)
{
if (_disposed) return false;
List<ServiceEndpointInfo> candidates;
try
{
candidates = _discoveryClient.GetServiceEndpoints(serviceKey)?.ToList();
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExServiceHost] Error when trying get endpoints for service key '{serviceKey}'");
return false;
}
if (candidates == null || candidates.Any() == false)
{
Log.Debug($"[ExServiceHost] Not found endpoints for service key '{serviceKey}'");
return false;
}
var success = false;
foreach (var service in candidates)
{
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, "[ExServiceHost] Can't get transport for protocol '{0}', service '{1}'", service.Protocol, serviceKey);
continue;
}
try
{
success = callHandler(service.Endpoint, transport);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExServiceHost] Error send/request data in service '{serviceKey}'. Endpoint '{service.Endpoint}'");
success = false;
}
if (success)
{
break;
}
}
return success;
}
internal bool CallServiceDirect(string endpoint, string serviceKey, Func<IExClient, bool> callHandler)
{
if (_disposed) return false;
ServiceEndpointInfo candidate = null;
try
{
candidate = _discoveryClient.GetService(serviceKey, endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExServiceHost] Error when trying get service info by key '{serviceKey}' and endpoint '{endpoint}'");
return false;
}
if (candidate == null)
{
Log.Debug($"[ExServiceHost] Not found service info for key '{serviceKey}' and endpoint '{endpoint}'");
return false;
}
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(candidate.Protocol, candidate.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExServiceHost] Can't get transport for protocol '{candidate.Protocol}', service '{serviceKey}'");
return false;
}
return callHandler(transport);
}
internal IEnumerable<IExClient> GetClientEnumerator(string serviceKey)
{
if (!_disposed)
{
List<ServiceEndpointInfo> candidates;
try
{
candidates = _discoveryClient.GetServiceEndpoints(serviceKey)?.ToList();
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service key '{serviceKey}'");
candidates = null;
}
if (candidates != null && candidates.Any())
{
foreach (var service in candidates)
{
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint);
continue;
}
yield return transport;
}
}
else
{
Log.Debug($"[Exchange] Not found endpoints for service key '{serviceKey}'");
}
}
}
internal IEnumerable<IExClient> GetClientEnumeratorByType(string serviceType)
{
if (!_disposed)
{
List<ServiceEndpointInfo> candidates;
try
{
candidates = _discoveryClient.GetServiceEndpointsByType(serviceType)?.ToList();
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service type '{serviceType}'");
candidates = null;
}
if (candidates != null && candidates.Any())
{
foreach (var service in candidates)
{
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint);
continue;
}
yield return transport;
}
}
else
{
Log.Debug($"[Exchange] Not found endpoints for service type '{serviceType}'");
}
}
}
internal IEnumerable<IExClient> GetClientEnumeratorByGroup(string serviceGroup)
{
if (!_disposed)
{
List<ServiceEndpointInfo> candidates;
try
{
candidates = _discoveryClient.GetServiceEndpointsByGroup(serviceGroup)?.ToList();
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service group '{serviceGroup}'");
candidates = null;
}
if (candidates != null && candidates.Any())
{
foreach (var service in candidates)
{
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint);
continue;
}
yield return transport;
}
}
else
{
Log.Debug($"[Exchange] Not found endpoints for service group '{serviceGroup}'");
}
}
}
#endregion Transport helpers
public void Dispose()
{
if (_disposed) return;
_disposed = true;
Sheduller.Remove(_registerTaskKey);
foreach (var s in _services)
{
s.Value.Server.Dispose();
}
}
}
}

Powered by TurnKey Linux.