From 9a7d3d6a36a204eb66bad1e1ba62ca36a73f2387 Mon Sep 17 00:00:00 2001 From: "a.bozhenov" Date: Fri, 5 Jul 2019 21:19:48 +0300 Subject: [PATCH] Discovery update --- ZeroLevel/Services/BaseZeroService.cs | 3 +- .../Services/Network/Contracts/IClientSet.cs | 44 +++ ZeroLevel/Services/Network/ExClient.cs | 192 ++++++++++++ ZeroLevel/Services/Network/ExServiceHost.cs | 288 ------------------ 4 files changed, 238 insertions(+), 289 deletions(-) create mode 100644 ZeroLevel/Services/Network/Contracts/IClientSet.cs diff --git a/ZeroLevel/Services/BaseZeroService.cs b/ZeroLevel/Services/BaseZeroService.cs index b84464e..dd5268e 100644 --- a/ZeroLevel/Services/BaseZeroService.cs +++ b/ZeroLevel/Services/BaseZeroService.cs @@ -141,6 +141,7 @@ namespace ZeroLevel.Services.Applications { Sheduller.Remove(_register_in_discovery_table_task); } + RegisterServicesInDiscovery(); _update_discovery_table_task = Sheduller.RemindEvery(_update_discovery_table_period, RegisterServicesInDiscovery); _register_in_discovery_table_task = Sheduller.RemindEvery(_register_in_discovery_table_period, () => { }); } @@ -418,7 +419,7 @@ namespace ZeroLevel.Services.Applications public void StoreConnection(string endpoint) - { + { if (_state == ZeroServiceStatus.Running || _state == ZeroServiceStatus.Initialized) { diff --git a/ZeroLevel/Services/Network/Contracts/IClientSet.cs b/ZeroLevel/Services/Network/Contracts/IClientSet.cs new file mode 100644 index 0000000..dc066ac --- /dev/null +++ b/ZeroLevel/Services/Network/Contracts/IClientSet.cs @@ -0,0 +1,44 @@ +using System; +using System.Collections.Generic; +using ZeroLevel.Models; + +namespace ZeroLevel.Network +{ + public interface IClientSet + { + InvokeResult Send(string alias, T data); + InvokeResult Send(string alias, string inbox, T data); + InvokeResult Request(string alias, Action callback); + InvokeResult Request(string alias, string inbox, Action callback); + InvokeResult Request(string alias, Trequest request, Action callback); + InvokeResult Request(string alias, string inbox, Trequest request, Action callback); + + + InvokeResult SendBroadcast(string alias, T data); + InvokeResult SendBroadcast(string alias, string inbox, T data); + + InvokeResult SendBroadcastByType(string serviceType, T data); + InvokeResult SendBroadcastByType(string serviceType, string inbox, T data); + + InvokeResult SendBroadcastByGroup(string serviceGroup, T data); + InvokeResult SendBroadcastByGroup(string serviceGroup, string inbox, T data); + + InvokeResult RequestBroadcast(string alias, Action> callback); + InvokeResult RequestBroadcast(string alias, string inbox, Action> callback); + + InvokeResult RequestBroadcast(string alias, Trequest data, Action> callback); + InvokeResult RequestBroadcast(string alias, string inbox, Trequest data, Action> callback); + + InvokeResult RequestBroadcastByType(string serviceType, Action> callback); + InvokeResult RequestBroadcastByType(string serviceType, string inbox, Action> callback); + + InvokeResult RequestBroadcastByType(string serviceType, Trequest data, Action> callback); + InvokeResult RequestBroadcastByType(string serviceType, string inbox, Trequest data, Action> callback); + + InvokeResult RequestBroadcastByGroup(string serviceGroup, Action> callback); + InvokeResult RequestBroadcastByGroup(string serviceGroup, string inbox, Action> callback); + + InvokeResult RequestBroadcastByGroup(string serviceGroup, Trequest data, Action> callback); + InvokeResult RequestBroadcastByGroup(string serviceGroup, string inbox, Trequest data, Action> callback); + } +} diff --git a/ZeroLevel/Services/Network/ExClient.cs b/ZeroLevel/Services/Network/ExClient.cs index f2a1ba5..310d5ad 100644 --- a/ZeroLevel/Services/Network/ExClient.cs +++ b/ZeroLevel/Services/Network/ExClient.cs @@ -1,10 +1,202 @@ using System; +using System.Collections.Generic; using System.Net; +using System.Threading; +using System.Threading.Tasks; using ZeroLevel.Models; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { + public sealed class ExClientSet + : IClientSet, IDisposable + { + public void Dispose() + { + throw new NotImplementedException(); + } + + #region IMultiClient + public InvokeResult Request(string alias, Action callback) + { + throw new NotImplementedException(); + } + + public InvokeResult Request(string alias, string inbox, Action callback) + { + throw new NotImplementedException(); + } + + public InvokeResult Request(string alias, Trequest request, Action callback) + { + throw new NotImplementedException(); + } + + public InvokeResult Request(string alias, string inbox, Trequest request, Action callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcast(string alias, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcast(string alias, string inbox, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcast(string alias, Trequest data, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcast(string alias, string inbox, Trequest data, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcastByGroup(string serviceGroup, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcastByGroup(string serviceGroup, string inbox, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcastByGroup(string serviceGroup, Trequest data, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcastByGroup(string serviceGroup, string inbox, Trequest data, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcastByType(string serviceType, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcastByType(string serviceType, string inbox, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcastByType(string serviceType, Trequest data, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult RequestBroadcastByType(string serviceType, string inbox, Trequest data, Action> callback) + { + throw new NotImplementedException(); + } + + public InvokeResult Send(string alias, T data) + { + throw new NotImplementedException(); + } + + public InvokeResult Send(string alias, string inbox, T data) + { + throw new NotImplementedException(); + } + + public InvokeResult SendBroadcast(string alias, T data) + { + throw new NotImplementedException(); + } + + public InvokeResult SendBroadcast(string alias, string inbox, T data) + { + throw new NotImplementedException(); + } + + public InvokeResult SendBroadcastByGroup(string serviceGroup, T data) + { + throw new NotImplementedException(); + } + + public InvokeResult SendBroadcastByGroup(string serviceGroup, string inbox, T data) + { + throw new NotImplementedException(); + } + + public InvokeResult SendBroadcastByType(string serviceType, T data) + { + throw new NotImplementedException(); + } + + public InvokeResult SendBroadcastByType(string serviceType, string inbox, T data) + { + throw new NotImplementedException(); + } + #endregion + + #region Private + private IEnumerable _RequestBroadcast(List clients, string inbox, Treq data) + { + var response = new List(); + using (var waiter = new CountdownEvent(clients.Count)) + { + foreach (var client in clients) + { + Task.Run(() => + { + try + { + if (false == client.Request(inbox, data, resp => { waiter.Signal(); response.Add(resp); }).Success) + { + waiter.Signal(); + } + } + catch (Exception ex) + { + Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); + waiter.Signal(); + } + }); + } + waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS); + } + return response; + } + + private IEnumerable _RequestBroadcast(List clients, string inbox) + { + var response = new List(); + using (var waiter = new CountdownEvent(clients.Count)) + { + foreach (var client in clients) + { + Task.Run(() => + { + try + { + if (false == client.Request(inbox, resp => { waiter.Signal(); response.Add(resp); }).Success) + { + waiter.Signal(); + } + } + catch (Exception ex) + { + Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); + waiter.Signal(); + } + }); + } + waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS); + } + return response; + } + #endregion + } + public sealed class ExClient : IClient, IDisposable { diff --git a/ZeroLevel/Services/Network/ExServiceHost.cs b/ZeroLevel/Services/Network/ExServiceHost.cs index 272d014..d0de213 100644 --- a/ZeroLevel/Services/Network/ExServiceHost.cs +++ b/ZeroLevel/Services/Network/ExServiceHost.cs @@ -11,294 +11,6 @@ namespace ZeroLevel.Network public sealed class ExServiceHost : IDisposable { - private class MetaService - { - public ExServiceInfo ServiceInfo { get; set; } - public IExService Server { get; set; } - } - - private bool _disposed = false; - private readonly long _registerTaskKey = -1; - private readonly IDiscoveryClient _discoveryClient; - - private readonly ConcurrentDictionary _services - = new ConcurrentDictionary(); - - public ExServiceHost(IDiscoveryClient client) - { - _discoveryClient = client; - _registerTaskKey = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(55), RegisterServicesInDiscovery); - } - - public IExService RegisterService(IExchangeService service) - { - try - { - if (_disposed) throw new ObjectDisposedException("ExServiceHost"); - if (service == null) throw new ArgumentNullException(nameof(service)); - ValidateService(service); - if (_services.ContainsKey(service.Key)) - { - throw new Exception($"[ExServiceHost] Service {service.Key} already registered"); - } - var server = ExchangeTransportFactory.GetServer(); - if (false == _services.TryAdd(service.Key, new MetaService - { - Server = server, - ServiceInfo = new ExServiceInfo - { - Port = server.Endpoint.Port, - ServiceKey = service.Key, - Version = service.Version, - ServiceGroup = service.Group, - ServiceType = service.Type - } - })) - { - server.Dispose(); - return null; - } - - RegisterServiceInboxes(service); - - return server; - } - catch (Exception ex) - { - Log.SystemError(ex, "[ExServiceHost] Fault register service"); - return null; - } - } - - public IExService RegisterService(ExServiceInfo serviceInfo) - { - try - { - if (_disposed) throw new ObjectDisposedException("ExServiceHost"); - if (serviceInfo == null) throw new ArgumentNullException(nameof(serviceInfo)); - ValidateService(serviceInfo); - - if (_services.ContainsKey(serviceInfo.ServiceKey)) - { - throw new Exception($"[ExServiceHost] Service {serviceInfo.ServiceKey} already registered"); - } - - var server = ExchangeTransportFactory.GetServer(); - if (false == _services.TryAdd(serviceInfo.ServiceKey, new MetaService - { - Server = server, - ServiceInfo = new ExServiceInfo - { - Port = server.Endpoint.Port, - ServiceKey = serviceInfo.ServiceKey, - Version = serviceInfo.Version, - ServiceGroup = serviceInfo.ServiceGroup, - ServiceType = serviceInfo.ServiceType - } - })) - { - server.Dispose(); - return null; - } - return server; - } - catch (Exception ex) - { - Log.SystemError(ex, "[ExServiceHost] Fault register service"); - return null; - } - } - - #region Private methods - - private void ValidateService(IExchangeService service) - { - if (string.IsNullOrWhiteSpace(service.Key)) - { - throw new ArgumentNullException("Service.Key"); - } - } - - private void ValidateService(ExServiceInfo service) - { - if (string.IsNullOrWhiteSpace(service.ServiceKey)) - { - throw new ArgumentNullException("ServiceKey"); - } - } - - private void RegisterServiceInboxes(IExchangeService service) - { - MethodInfo[] methods = service. - GetType(). - GetMethods(BindingFlags.NonPublic | BindingFlags.Public | - BindingFlags.Instance | - BindingFlags.FlattenHierarchy | - BindingFlags.Instance); - - var registerHandler = this.GetType().GetMethod("RegisterHandler"); - var registerReplier = this.GetType().GetMethod("RegisterReplier"); - var registerReplierWithNoRequestBody = this.GetType().GetMethod("RegisterReplierWithNoRequestBody"); - - foreach (MethodInfo mi in methods) - { - try - { - foreach (Attribute attr in Attribute.GetCustomAttributes(mi, typeof(ExchangeAttribute))) - { - if (attr.GetType() == typeof(ExchangeMainHandlerAttribute)) - { - var firstArgType = mi.GetParameters().First().ParameterType; - MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType); - genericMethod.Invoke(this, new object[] { ZBaseNetwork.DEFAULT_MESSAGE_INBOX, CreateDelegate(mi, service) }); - } - else if (attr.GetType() == typeof(ExchangeHandlerAttribute)) - { - var firstArgType = mi.GetParameters().First().ParameterType; - MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType); - genericMethod.Invoke(this, new object[] { (attr as ExchangeHandlerAttribute).Inbox, CreateDelegate(mi, service) }); - } - else if (attr.GetType() == typeof(ExchangeMainReplierAttribute)) - { - var returnType = mi.ReturnType; - var firstArgType = mi.GetParameters().First().ParameterType; - MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType); - genericMethod.Invoke(this, new object[] { ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) }); - } - else if (attr.GetType() == typeof(ExchangeReplierAttribute)) - { - var returnType = mi.ReturnType; - var firstArgType = mi.GetParameters().First().ParameterType; - MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType); - genericMethod.Invoke(this, new object[] { (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) }); - } - else if (attr.GetType() == typeof(ExchangeMainReplierWithoutArgAttribute)) - { - var returnType = mi.ReturnType; - var firstArgType = mi.GetParameters().First().ParameterType; - MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType); - genericMethod.Invoke(this, new object[] { ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) }); - } - else if (attr.GetType() == typeof(ExchangeReplierWithoutArgAttribute)) - { - var returnType = mi.ReturnType; - var firstArgType = mi.GetParameters().First().ParameterType; - MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType); - genericMethod.Invoke(this, new object[] { (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) }); - } - } - } - catch (Exception ex) - { - Log.Debug($"[ZExchange] Can't register method {mi.Name} as inbox handler. {ex}"); - } - } - } - - private void RegisterServicesInDiscovery() - { - var services = _services. - Values. - Select(s => s.ServiceInfo). - ToList(); - foreach (var service in services) - { - _discoveryClient.Register(service); - } - } - - #endregion Private methods - - #region Utils - - private static Delegate CreateDelegate(MethodInfo methodInfo, object target) - { - Func getType; - var isAction = methodInfo.ReturnType.Equals((typeof(void))); - var types = methodInfo.GetParameters().Select(p => p.ParameterType); - if (isAction) - { - getType = Expression.GetActionType; - } - else - { - getType = Expression.GetFuncType; - types = types.Concat(new[] { methodInfo.ReturnType }); - } - if (methodInfo.IsStatic) - { - return Delegate.CreateDelegate(getType(types.ToArray()), methodInfo); - } - return Delegate.CreateDelegate(getType(types.ToArray()), target, methodInfo.Name); - } - - #endregion Utils - - #region Inboxes - - /// - /// Registering an Inbox Handler - /// - /// Message type - /// Protocol - /// Inbox name - /// Handler - private void RegisterHandler(MetaService meta, string inbox, Action handler) - { - if (_disposed) return; - try - { - meta.Server.RegisterInbox(inbox, handler); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Register inbox handler error. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'"); - } - } - - /// - /// Registration method responding to an incoming request - /// - /// Request message type - /// Response message type - /// Protocol - /// Inbox name - /// Handler - private void RegisterReplier(MetaService meta, string inbox, Func handler) - { - if (_disposed) return; - try - { - meta.Server.RegisterInbox(inbox, handler); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Register inbox replier error. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'"); - } - } - - /// - /// Registration of the method of responding to the incoming request, not receiving incoming data - /// - /// Response message type - /// Protocol - /// Inbox name - /// Handler - private void RegisterReplierWithNoRequestBody(MetaService meta, string inbox, Func handler) - { - if (_disposed) return; - try - { - meta.Server.RegisterInbox(inbox, handler); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[Exchange] Register inbox replier error. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'"); - } - } - - #endregion Inboxes - #region Transport helpers ///