using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; using System.Reflection; using ZeroLevel.Microservices.Contracts; using ZeroLevel.Network.Microservices; using ZeroLevel.Services.Network; namespace ZeroLevel.Microservices { internal sealed class ExServiceHost : IDisposable { private class MetaService { public MicroserviceInfo ServiceInfo { get; set; } public ExService Server { get; set; } } private bool _disposed = false; private readonly long _registerTaskKey = -1; private readonly IDiscoveryClient _discoveryClient; private readonly ConcurrentDictionary _services = new ConcurrentDictionary(); internal ExServiceHost(IDiscoveryClient client) { _discoveryClient = client; _registerTaskKey = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(15), RegisterServicesInDiscovery); } internal IExService RegisterService(IExchangeService service) { try { if (_disposed) throw new ObjectDisposedException("ExServiceHost"); if (service == null) throw new ArgumentNullException(nameof(service)); ValidateService(service); var key = $"{service.Key}.{service.Protocol}"; if (_services.ContainsKey(key)) { throw new Exception($"[ExServiceHost] Service {key} already registered"); } var server = ExchangeTransportFactory.GetServer(service.Protocol); if (false == _services.TryAdd(key, new MetaService { Server = server, ServiceInfo = new MicroserviceInfo { Endpoint = $"{server.Endpoint.Address}:{server.Endpoint.Port}", Protocol = service.Protocol, ServiceKey = service.Key, Version = service.Version, ServiceGroup = service.Group, ServiceType = service.Type } })) { server.Dispose(); return null; } RegisterServiceInboxes(service); return server; } catch (Exception ex) { Log.SystemError(ex, "[ExServiceHost] Fault register service"); return null; } } internal IExService RegisterService(MicroserviceInfo serviceInfo) { try { if (_disposed) throw new ObjectDisposedException("ExServiceHost"); if (serviceInfo == null) throw new ArgumentNullException(nameof(serviceInfo)); ValidateService(serviceInfo); var key = $"{serviceInfo.ServiceKey}.{serviceInfo.Protocol}"; if (_services.ContainsKey(key)) { throw new Exception($"[ExServiceHost] Service {key} already registered"); } var server = ExchangeTransportFactory.GetServer(serviceInfo.Protocol); if (false == _services.TryAdd(key, new MetaService { Server = server, ServiceInfo = new MicroserviceInfo { Endpoint = $"{server.Endpoint.Address}:{server.Endpoint.Port}", Protocol = serviceInfo.Protocol, ServiceKey = serviceInfo.ServiceKey, Version = serviceInfo.Version, ServiceGroup = serviceInfo.ServiceGroup, ServiceType = serviceInfo.ServiceType } })) { server.Dispose(); return null; } return server; } catch (Exception ex) { Log.SystemError(ex, "[ExServiceHost] Fault register service"); return null; } } #region Private methods private void ValidateService(IExchangeService service) { if (string.IsNullOrWhiteSpace(service.Protocol)) { throw new ArgumentNullException("Service.Protocol"); } if (string.IsNullOrWhiteSpace(service.Key)) { throw new ArgumentNullException("Service.Key"); } } private void ValidateService(MicroserviceInfo service) { if (string.IsNullOrWhiteSpace(service.Protocol)) { throw new ArgumentNullException("Service.Protocol"); } if (string.IsNullOrWhiteSpace(service.ServiceKey)) { throw new ArgumentNullException("ServiceKey"); } } private void RegisterServiceInboxes(IExchangeService service) { MethodInfo[] methods = service. GetType(). GetMethods(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance | BindingFlags.FlattenHierarchy | BindingFlags.Instance); var registerHandler = this.GetType().GetMethod("RegisterHandler"); var registerReplier = this.GetType().GetMethod("RegisterReplier"); var registerReplierWithNoRequestBody = this.GetType().GetMethod("RegisterReplierWithNoRequestBody"); foreach (MethodInfo mi in methods) { try { foreach (Attribute attr in Attribute.GetCustomAttributes(mi, typeof(ExchangeAttribute))) { if (attr.GetType() == typeof(ExchangeMainHandlerAttribute)) { var firstArgType = mi.GetParameters().First().ParameterType; MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType); genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, CreateDelegate(mi, service) }); } else if (attr.GetType() == typeof(ExchangeHandlerAttribute)) { var firstArgType = mi.GetParameters().First().ParameterType; MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType); genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeHandlerAttribute).Inbox, CreateDelegate(mi, service) }); } else if (attr.GetType() == typeof(ExchangeMainReplierAttribute)) { var returnType = mi.ReturnType; var firstArgType = mi.GetParameters().First().ParameterType; MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType); genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) }); } else if (attr.GetType() == typeof(ExchangeReplierAttribute)) { var returnType = mi.ReturnType; var firstArgType = mi.GetParameters().First().ParameterType; MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType); genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) }); } else if (attr.GetType() == typeof(ExchangeMainReplierWithoutArgAttribute)) { var returnType = mi.ReturnType; var firstArgType = mi.GetParameters().First().ParameterType; MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType); genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) }); } else if (attr.GetType() == typeof(ExchangeReplierWithoutArgAttribute)) { var returnType = mi.ReturnType; var firstArgType = mi.GetParameters().First().ParameterType; MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType); genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) }); } } } catch (Exception ex) { Log.Debug($"[ZExchange] Can't register method {mi.Name} as inbox handler. {ex.ToString()}"); } } } private void RegisterServicesInDiscovery() { var services = _services. Values. Select(s => s.ServiceInfo). ToList(); foreach (var service in services) { _discoveryClient.Register(service); } } #endregion #region Utils private static Delegate CreateDelegate(MethodInfo methodInfo, object target) { Func getType; var isAction = methodInfo.ReturnType.Equals((typeof(void))); var types = methodInfo.GetParameters().Select(p => p.ParameterType); if (isAction) { getType = Expression.GetActionType; } else { getType = Expression.GetFuncType; types = types.Concat(new[] { methodInfo.ReturnType }); } if (methodInfo.IsStatic) { return Delegate.CreateDelegate(getType(types.ToArray()), methodInfo); } return Delegate.CreateDelegate(getType(types.ToArray()), target, methodInfo.Name); } #endregion #region Inboxes /// /// Регистрация обработчика входящих сообщений /// /// Тип сообщения /// Транспортный протокол /// Имя точки приема /// Обработчик private void RegisterHandler(MetaService meta, string inbox, Action handler) { if (_disposed) return; try { meta.Server.RegisterInbox(inbox, handler); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Register inbox handler error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'"); } } /// /// Регистрация метода отдающего ответ на входящий запрос /// /// Тип входного сообщения /// Тип ответа /// Транспортный протокол /// Имя точки приема /// Обработчик private void RegisterReplier(MetaService meta, string inbox, Func handler) { if (_disposed) return; try { meta.Server.RegisterInbox(inbox, handler); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Register inbox replier error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'"); } } /// /// Регистрация метода отдающего ответ на входящий запрос, не принимающего входящих данных /// /// Тип ответа /// Транспортный протокол /// Имя точки приема /// Обработчик private void RegisterReplierWithNoRequestBody(MetaService meta, string inbox, Func handler) { if (_disposed) return; try { meta.Server.RegisterInbox(inbox, handler); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Register inbox replier error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'"); } } #endregion #region Transport helpers /// /// Call service with round-robin balancing /// /// Service key /// Service call code /// true - service called succesfully internal bool CallService(string serviceKey, Func callHandler) { if (_disposed) return false; List candidates; try { candidates = _discoveryClient.GetServiceEndpoints(serviceKey)?.ToList(); } catch (Exception ex) { Log.SystemError(ex, $"[ExServiceHost] Error when trying get endpoints for service key '{serviceKey}'"); return false; } if (candidates == null || candidates.Any() == false) { Log.Debug($"[ExServiceHost] Not found endpoints for service key '{serviceKey}'"); return false; } var success = false; foreach (var service in candidates) { IExClient transport; try { transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint); } catch (Exception ex) { Log.SystemError(ex, "[ExServiceHost] Can't get transport for protocol '{0}', service '{1}'", service.Protocol, serviceKey); continue; } try { success = callHandler(service.Endpoint, transport); } catch (Exception ex) { Log.SystemError(ex, $"[ExServiceHost] Error send/request data in service '{serviceKey}'. Endpoint '{service.Endpoint}'"); success = false; } if (success) { break; } } return success; } internal bool CallServiceDirect(string endpoint, string serviceKey, Func callHandler) { if (_disposed) return false; ServiceEndpointInfo candidate = null; try { candidate = _discoveryClient.GetService(serviceKey, endpoint); } catch (Exception ex) { Log.SystemError(ex, $"[ExServiceHost] Error when trying get service info by key '{serviceKey}' and endpoint '{endpoint}'"); return false; } if (candidate == null) { Log.Debug($"[ExServiceHost] Not found service info for key '{serviceKey}' and endpoint '{endpoint}'"); return false; } IExClient transport; try { transport = ExchangeTransportFactory.GetClient(candidate.Protocol, candidate.Endpoint); } catch (Exception ex) { Log.SystemError(ex, $"[ExServiceHost] Can't get transport for protocol '{candidate.Protocol}', service '{serviceKey}'"); return false; } return callHandler(transport); } internal IEnumerable GetClientEnumerator(string serviceKey) { if (!_disposed) { List candidates; try { candidates = _discoveryClient.GetServiceEndpoints(serviceKey)?.ToList(); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service key '{serviceKey}'"); candidates = null; } if (candidates != null && candidates.Any()) { foreach (var service in candidates) { IExClient transport; try { transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint); } catch (Exception ex) { Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint); continue; } yield return transport; } } else { Log.Debug($"[Exchange] Not found endpoints for service key '{serviceKey}'"); } } } internal IEnumerable GetClientEnumeratorByType(string serviceType) { if (!_disposed) { List candidates; try { candidates = _discoveryClient.GetServiceEndpointsByType(serviceType)?.ToList(); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service type '{serviceType}'"); candidates = null; } if (candidates != null && candidates.Any()) { foreach (var service in candidates) { IExClient transport; try { transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint); } catch (Exception ex) { Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint); continue; } yield return transport; } } else { Log.Debug($"[Exchange] Not found endpoints for service type '{serviceType}'"); } } } internal IEnumerable GetClientEnumeratorByGroup(string serviceGroup) { if (!_disposed) { List candidates; try { candidates = _discoveryClient.GetServiceEndpointsByGroup(serviceGroup)?.ToList(); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange] Error when trying get endpoints for service group '{serviceGroup}'"); candidates = null; } if (candidates != null && candidates.Any()) { foreach (var service in candidates) { IExClient transport; try { transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint); } catch (Exception ex) { Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint); continue; } yield return transport; } } else { Log.Debug($"[Exchange] Not found endpoints for service group '{serviceGroup}'"); } } } #endregion public void Dispose() { if (_disposed) return; _disposed = true; Sheduller.Remove(_registerTaskKey); foreach (var s in _services) { s.Value.Server.Dispose(); } } } }