mirror of https://github.com/ogoun/Zero.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
532 lines
21 KiB
532 lines
21 KiB
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Linq.Expressions;
|
|
using System.Reflection;
|
|
using ZeroLevel.Microservices.Contracts;
|
|
using ZeroLevel.Network.Microservices;
|
|
using ZeroLevel.Services.Network;
|
|
|
|
namespace ZeroLevel.Microservices
|
|
{
|
|
internal sealed class ExServiceHost
|
|
: IDisposable
|
|
{
|
|
private class MetaService
|
|
{
|
|
public MicroserviceInfo ServiceInfo { get; set; }
|
|
public ExService Server { get; set; }
|
|
}
|
|
|
|
private bool _disposed = false;
|
|
private readonly long _registerTaskKey = -1;
|
|
private readonly IDiscoveryClient _discoveryClient;
|
|
|
|
private readonly ConcurrentDictionary<string, MetaService> _services
|
|
= new ConcurrentDictionary<string, MetaService>();
|
|
|
|
internal ExServiceHost(IDiscoveryClient client)
|
|
{
|
|
_discoveryClient = client;
|
|
_registerTaskKey = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(15), RegisterServicesInDiscovery);
|
|
}
|
|
|
|
internal IExService RegisterService(IExchangeService service)
|
|
{
|
|
try
|
|
{
|
|
if (_disposed) throw new ObjectDisposedException("ExServiceHost");
|
|
if (service == null) throw new ArgumentNullException(nameof(service));
|
|
ValidateService(service);
|
|
|
|
var key = $"{service.Key}.{service.Protocol}";
|
|
if (_services.ContainsKey(key))
|
|
{
|
|
throw new Exception($"[ExServiceHost] Service {key} already registered");
|
|
}
|
|
|
|
var server = ExchangeTransportFactory.GetServer(service.Protocol);
|
|
if (false == _services.TryAdd(key, new MetaService
|
|
{
|
|
Server = server,
|
|
ServiceInfo = new MicroserviceInfo
|
|
{
|
|
Endpoint = $"{server.Endpoint.Address}:{server.Endpoint.Port}",
|
|
Protocol = service.Protocol,
|
|
ServiceKey = service.Key,
|
|
Version = service.Version,
|
|
ServiceGroup = service.Group,
|
|
ServiceType = service.Type
|
|
}
|
|
}))
|
|
{
|
|
server.Dispose();
|
|
return null;
|
|
}
|
|
|
|
RegisterServiceInboxes(service);
|
|
|
|
return server;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, "[ExServiceHost] Fault register service");
|
|
return null;
|
|
}
|
|
}
|
|
|
|
internal IExService RegisterService(MicroserviceInfo serviceInfo)
|
|
{
|
|
try
|
|
{
|
|
if (_disposed) throw new ObjectDisposedException("ExServiceHost");
|
|
if (serviceInfo == null) throw new ArgumentNullException(nameof(serviceInfo));
|
|
ValidateService(serviceInfo);
|
|
|
|
var key = $"{serviceInfo.ServiceKey}.{serviceInfo.Protocol}";
|
|
if (_services.ContainsKey(key))
|
|
{
|
|
throw new Exception($"[ExServiceHost] Service {key} already registered");
|
|
}
|
|
|
|
var server = ExchangeTransportFactory.GetServer(serviceInfo.Protocol);
|
|
if (false == _services.TryAdd(key, new MetaService
|
|
{
|
|
Server = server,
|
|
ServiceInfo = new MicroserviceInfo
|
|
{
|
|
Endpoint = $"{server.Endpoint.Address}:{server.Endpoint.Port}",
|
|
Protocol = serviceInfo.Protocol,
|
|
ServiceKey = serviceInfo.ServiceKey,
|
|
Version = serviceInfo.Version,
|
|
ServiceGroup = serviceInfo.ServiceGroup,
|
|
ServiceType = serviceInfo.ServiceType
|
|
}
|
|
}))
|
|
{
|
|
server.Dispose();
|
|
return null;
|
|
}
|
|
return server;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, "[ExServiceHost] Fault register service");
|
|
return null;
|
|
}
|
|
}
|
|
|
|
#region Private methods
|
|
|
|
private void ValidateService(IExchangeService service)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(service.Protocol))
|
|
{
|
|
throw new ArgumentNullException("Service.Protocol");
|
|
}
|
|
if (string.IsNullOrWhiteSpace(service.Key))
|
|
{
|
|
throw new ArgumentNullException("Service.Key");
|
|
}
|
|
}
|
|
|
|
private void ValidateService(MicroserviceInfo service)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(service.Protocol))
|
|
{
|
|
throw new ArgumentNullException("Service.Protocol");
|
|
}
|
|
if (string.IsNullOrWhiteSpace(service.ServiceKey))
|
|
{
|
|
throw new ArgumentNullException("ServiceKey");
|
|
}
|
|
}
|
|
|
|
private void RegisterServiceInboxes(IExchangeService service)
|
|
{
|
|
MethodInfo[] methods = service.
|
|
GetType().
|
|
GetMethods(BindingFlags.NonPublic | BindingFlags.Public |
|
|
BindingFlags.Instance |
|
|
BindingFlags.FlattenHierarchy |
|
|
BindingFlags.Instance);
|
|
|
|
var registerHandler = this.GetType().GetMethod("RegisterHandler");
|
|
var registerReplier = this.GetType().GetMethod("RegisterReplier");
|
|
var registerReplierWithNoRequestBody = this.GetType().GetMethod("RegisterReplierWithNoRequestBody");
|
|
|
|
foreach (MethodInfo mi in methods)
|
|
{
|
|
try
|
|
{
|
|
foreach (Attribute attr in Attribute.GetCustomAttributes(mi, typeof(ExchangeAttribute)))
|
|
{
|
|
if (attr.GetType() == typeof(ExchangeMainHandlerAttribute))
|
|
{
|
|
var firstArgType = mi.GetParameters().First().ParameterType;
|
|
MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType);
|
|
genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, CreateDelegate(mi, service) });
|
|
}
|
|
else if (attr.GetType() == typeof(ExchangeHandlerAttribute))
|
|
{
|
|
var firstArgType = mi.GetParameters().First().ParameterType;
|
|
MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType);
|
|
genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeHandlerAttribute).Inbox, CreateDelegate(mi, service) });
|
|
}
|
|
else if (attr.GetType() == typeof(ExchangeMainReplierAttribute))
|
|
{
|
|
var returnType = mi.ReturnType;
|
|
var firstArgType = mi.GetParameters().First().ParameterType;
|
|
MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType);
|
|
genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) });
|
|
}
|
|
else if (attr.GetType() == typeof(ExchangeReplierAttribute))
|
|
{
|
|
var returnType = mi.ReturnType;
|
|
var firstArgType = mi.GetParameters().First().ParameterType;
|
|
MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType);
|
|
genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) });
|
|
}
|
|
else if (attr.GetType() == typeof(ExchangeMainReplierWithoutArgAttribute))
|
|
{
|
|
var returnType = mi.ReturnType;
|
|
var firstArgType = mi.GetParameters().First().ParameterType;
|
|
MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType);
|
|
genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) });
|
|
}
|
|
else if (attr.GetType() == typeof(ExchangeReplierWithoutArgAttribute))
|
|
{
|
|
var returnType = mi.ReturnType;
|
|
var firstArgType = mi.GetParameters().First().ParameterType;
|
|
MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType);
|
|
genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) });
|
|
}
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.Debug($"[ZExchange] Can't register method {mi.Name} as inbox handler. {ex.ToString()}");
|
|
}
|
|
}
|
|
}
|
|
|
|
private void RegisterServicesInDiscovery()
|
|
{
|
|
var services = _services.
|
|
Values.
|
|
Select(s => s.ServiceInfo).
|
|
ToList();
|
|
foreach (var service in services)
|
|
{
|
|
_discoveryClient.Register(service);
|
|
}
|
|
}
|
|
|
|
#endregion Private methods
|
|
|
|
#region Utils
|
|
|
|
private static Delegate CreateDelegate(MethodInfo methodInfo, object target)
|
|
{
|
|
Func<Type[], Type> getType;
|
|
var isAction = methodInfo.ReturnType.Equals((typeof(void)));
|
|
var types = methodInfo.GetParameters().Select(p => p.ParameterType);
|
|
if (isAction)
|
|
{
|
|
getType = Expression.GetActionType;
|
|
}
|
|
else
|
|
{
|
|
getType = Expression.GetFuncType;
|
|
types = types.Concat(new[] { methodInfo.ReturnType });
|
|
}
|
|
if (methodInfo.IsStatic)
|
|
{
|
|
return Delegate.CreateDelegate(getType(types.ToArray()), methodInfo);
|
|
}
|
|
return Delegate.CreateDelegate(getType(types.ToArray()), target, methodInfo.Name);
|
|
}
|
|
|
|
#endregion Utils
|
|
|
|
#region Inboxes
|
|
|
|
/// <summary>
|
|
/// Registering an Inbox Handler
|
|
/// </summary>
|
|
/// <typeparam name="T">Message type</typeparam>
|
|
/// <param name="protocol">Protocol</param>
|
|
/// <param name="inbox">Inbox name</param>
|
|
/// <param name="handler">Handler</param>
|
|
private void RegisterHandler<T>(MetaService meta, string inbox, Action<T, long, IZBackward> handler)
|
|
{
|
|
if (_disposed) return;
|
|
try
|
|
{
|
|
meta.Server.RegisterInbox(inbox, handler);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[Exchange] Register inbox handler error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Registration method responding to an incoming request
|
|
/// </summary>
|
|
/// <typeparam name="Treq">Request message type</typeparam>
|
|
/// <typeparam name="Tresp">Response message type</typeparam>
|
|
/// <param name="protocol">Protocol</param>
|
|
/// <param name="inbox">Inbox name</param>
|
|
/// <param name="replier">Handler</param>
|
|
private void RegisterReplier<Treq, Tresp>(MetaService meta, string inbox, Func<Treq, long, IZBackward, Tresp> handler)
|
|
{
|
|
if (_disposed) return;
|
|
try
|
|
{
|
|
meta.Server.RegisterInbox(inbox, handler);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[Exchange] Register inbox replier error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Registration of the method of responding to the incoming request, not receiving incoming data
|
|
/// </summary>
|
|
/// <typeparam name="Tresp">Response message type</typeparam>
|
|
/// <param name="protocol">Protocol</param>
|
|
/// <param name="inbox">Inbox name</param>
|
|
/// <param name="replier">Handler</param>
|
|
private void RegisterReplierWithNoRequestBody<Tresp>(MetaService meta, string inbox, Func<long, IZBackward, Tresp> handler)
|
|
{
|
|
if (_disposed) return;
|
|
try
|
|
{
|
|
meta.Server.RegisterInbox(inbox, handler);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[Exchange] Register inbox replier error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
|
|
}
|
|
}
|
|
|
|
#endregion Inboxes
|
|
|
|
#region Transport helpers
|
|
|
|
/// <summary>
|
|
/// Call service with round-robin balancing
|
|
/// </summary>
|
|
/// <param name="serviceKey">Service key</param>
|
|
/// <param name="callHandler">Service call code</param>
|
|
/// <returns>true - service called succesfully</returns>
|
|
internal bool CallService(string serviceKey, Func<string, IExClient, bool> callHandler)
|
|
{
|
|
if (_disposed) return false;
|
|
List<ServiceEndpointInfo> candidates;
|
|
try
|
|
{
|
|
candidates = _discoveryClient.GetServiceEndpoints(serviceKey)?.ToList();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[ExServiceHost] Error when trying get endpoints for service key '{serviceKey}'");
|
|
return false;
|
|
}
|
|
if (candidates == null || candidates.Any() == false)
|
|
{
|
|
Log.Debug($"[ExServiceHost] Not found endpoints for service key '{serviceKey}'");
|
|
return false;
|
|
}
|
|
var success = false;
|
|
foreach (var service in candidates)
|
|
{
|
|
IExClient transport;
|
|
try
|
|
{
|
|
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, "[ExServiceHost] Can't get transport for protocol '{0}', service '{1}'", service.Protocol, serviceKey);
|
|
continue;
|
|
}
|
|
try
|
|
{
|
|
success = callHandler(service.Endpoint, transport);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[ExServiceHost] Error send/request data in service '{serviceKey}'. Endpoint '{service.Endpoint}'");
|
|
success = false;
|
|
}
|
|
if (success)
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
return success;
|
|
}
|
|
|
|
internal bool CallServiceDirect(string endpoint, string serviceKey, Func<IExClient, bool> callHandler)
|
|
{
|
|
if (_disposed) return false;
|
|
ServiceEndpointInfo candidate = null;
|
|
try
|
|
{
|
|
candidate = _discoveryClient.GetService(serviceKey, endpoint);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[ExServiceHost] Error when trying get service info by key '{serviceKey}' and endpoint '{endpoint}'");
|
|
return false;
|
|
}
|
|
if (candidate == null)
|
|
{
|
|
Log.Debug($"[ExServiceHost] Not found service info for key '{serviceKey}' and endpoint '{endpoint}'");
|
|
return false;
|
|
}
|
|
IExClient transport;
|
|
try
|
|
{
|
|
transport = ExchangeTransportFactory.GetClient(candidate.Protocol, candidate.Endpoint);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[ExServiceHost] Can't get transport for protocol '{candidate.Protocol}', service '{serviceKey}'");
|
|
return false;
|
|
}
|
|
return callHandler(transport);
|
|
}
|
|
|
|
internal IEnumerable<IExClient> GetClientEnumerator(string serviceKey)
|
|
{
|
|
if (!_disposed)
|
|
{
|
|
List<ServiceEndpointInfo> candidates;
|
|
try
|
|
{
|
|
candidates = _discoveryClient.GetServiceEndpoints(serviceKey)?.ToList();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service key '{serviceKey}'");
|
|
candidates = null;
|
|
}
|
|
if (candidates != null && candidates.Any())
|
|
{
|
|
foreach (var service in candidates)
|
|
{
|
|
IExClient transport;
|
|
try
|
|
{
|
|
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint);
|
|
continue;
|
|
}
|
|
yield return transport;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
Log.Debug($"[Exchange] Not found endpoints for service key '{serviceKey}'");
|
|
}
|
|
}
|
|
}
|
|
|
|
internal IEnumerable<IExClient> GetClientEnumeratorByType(string serviceType)
|
|
{
|
|
if (!_disposed)
|
|
{
|
|
List<ServiceEndpointInfo> candidates;
|
|
try
|
|
{
|
|
candidates = _discoveryClient.GetServiceEndpointsByType(serviceType)?.ToList();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service type '{serviceType}'");
|
|
candidates = null;
|
|
}
|
|
if (candidates != null && candidates.Any())
|
|
{
|
|
foreach (var service in candidates)
|
|
{
|
|
IExClient transport;
|
|
try
|
|
{
|
|
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint);
|
|
continue;
|
|
}
|
|
yield return transport;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
Log.Debug($"[Exchange] Not found endpoints for service type '{serviceType}'");
|
|
}
|
|
}
|
|
}
|
|
|
|
internal IEnumerable<IExClient> GetClientEnumeratorByGroup(string serviceGroup)
|
|
{
|
|
if (!_disposed)
|
|
{
|
|
List<ServiceEndpointInfo> candidates;
|
|
try
|
|
{
|
|
candidates = _discoveryClient.GetServiceEndpointsByGroup(serviceGroup)?.ToList();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service group '{serviceGroup}'");
|
|
candidates = null;
|
|
}
|
|
if (candidates != null && candidates.Any())
|
|
{
|
|
foreach (var service in candidates)
|
|
{
|
|
IExClient transport;
|
|
try
|
|
{
|
|
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint);
|
|
continue;
|
|
}
|
|
yield return transport;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
Log.Debug($"[Exchange] Not found endpoints for service group '{serviceGroup}'");
|
|
}
|
|
}
|
|
}
|
|
|
|
#endregion Transport helpers
|
|
|
|
public void Dispose()
|
|
{
|
|
if (_disposed) return;
|
|
_disposed = true;
|
|
Sheduller.Remove(_registerTaskKey);
|
|
foreach (var s in _services)
|
|
{
|
|
s.Value.Server.Dispose();
|
|
}
|
|
}
|
|
}
|
|
} |