From 00c4cff5d61356e917d81ee6497e9f04a96fa01c Mon Sep 17 00:00:00 2001 From: "a.bozhenov" Date: Wed, 3 Jul 2019 21:14:58 +0300 Subject: [PATCH] net upd Autoregister inboxes --- TestApp/MyService.cs | 63 +++- .../ZeroServiceInfo.cs} | 18 +- ZeroLevel/Services/AtomicBoolean.cs | 73 ++++ ZeroLevel/Services/BaseZeroService.cs | 352 +++++++++++++++--- .../Services/Collections/ITransactable.cs | 9 + .../KeyListValueTransactCollection.cs | 219 +++++++++++ .../Collections/KeyValueTransactCollection.cs | 271 ++++++++++++++ ZeroLevel/Services/Network/Alias.cs | 49 ++- ZeroLevel/Services/Network/BaseSocket.cs | 1 + .../Network/Contracts/IDiscoveryClient.cs | 6 +- ZeroLevel/Services/Network/DiscoveryClient.cs | 22 +- ZeroLevel/Services/Network/ExClient.cs | 6 +- .../Serialization/MessageSerializer.cs | 40 ++ 13 files changed, 1041 insertions(+), 88 deletions(-) rename ZeroLevel/{Services/Network/Model/ExServiceInfo.cs => Models/ZeroServiceInfo.cs} (84%) create mode 100644 ZeroLevel/Services/AtomicBoolean.cs create mode 100644 ZeroLevel/Services/Collections/ITransactable.cs create mode 100644 ZeroLevel/Services/Collections/KeyListValueTransactCollection.cs create mode 100644 ZeroLevel/Services/Collections/KeyValueTransactCollection.cs diff --git a/TestApp/MyService.cs b/TestApp/MyService.cs index 08e0a61..531a8c6 100644 --- a/TestApp/MyService.cs +++ b/TestApp/MyService.cs @@ -17,23 +17,62 @@ namespace TestApp protected override void StartAction() { Log.Info("Started"); - UseHost(8800) - .RegisterInbox("upper", (c, s) => s.ToUpperInvariant()) - .RegisterInbox("ip2str", (c, ip) => $"{ip.Address}:{ip.Port}"); + AutoregisterInboxes(UseHost(8800)); + var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800)); Sheduller.RemindEvery(TimeSpan.FromSeconds(5), () => { - var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800)); - client.Request("upper", "hello", s => Log.Info(s)); + client.Send("pum"); + client.Send(BaseSocket.DEFAULT_MESSAGE_INBOX, "'This is message'"); + client.Request("d2s", DateTime.Now, s => Log.Info($"Response: {s}")); + client.Request(BaseSocket.DEFAULT_REQUEST_INBOX, + new IPEndPoint(NetUtils.GetNonLoopbackAddress(), NetUtils.GetFreeTcpPort()), + s => Log.Info($"Response: {s}")); + client.Request("now", s => Log.Info($"Response date: {s}")); + client.Request(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, s => Log.Info($"Response ip: {s}")); }); - - Sheduller.RemindEvery(TimeSpan.FromSeconds(6), () => - { - var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800)); - client.Request("ip2str", new IPEndPoint(NetUtils.GetNonLoopbackAddress(), NetUtils.GetFreeTcpPort()), s => Log.Info(s)); - }); - + } + + [ExchangeHandler("pum")] + public void MessageHandler(ISocketClient client) + { + Log.Info("Called message handler without arguments"); + } + + [ExchangeMainHandler] + public void MessageHandler(ISocketClient client, string message) + { + Log.Info($"Called message handler (DEFAULT INBOX) with argument: {message}"); + } + + [ExchangeReplier("d2s")] + public string date2str(ISocketClient client, DateTime date) + { + Log.Info($"Called reqeust handler with argument: {date}"); + return date.ToLongDateString(); + } + + [ExchangeMainReplier] + public string ip2str(ISocketClient client, IPEndPoint ip) + { + Log.Info($"Called reqeust handler (DEFAULT INBOX) with argument: {ip.Address}:{ip.Port}"); + return $"{ip.Address}:{ip.Port}"; + } + + + [ExchangeReplierWithoutArg("now")] + public string GetTime(ISocketClient client) + { + Log.Info("Called reqeust handler without arguments"); + return DateTime.Now.ToShortDateString(); + } + + [ExchangeMainReplierWithoutArg] + public string GetMyIP(ISocketClient client) + { + Log.Info("Called reqeust handler (DEFAULT INBOX) without argument"); + return NetUtils.GetNonLoopbackAddress().ToString(); } protected override void StopAction() diff --git a/ZeroLevel/Services/Network/Model/ExServiceInfo.cs b/ZeroLevel/Models/ZeroServiceInfo.cs similarity index 84% rename from ZeroLevel/Services/Network/Model/ExServiceInfo.cs rename to ZeroLevel/Models/ZeroServiceInfo.cs index 3eea854..ec1d8cf 100644 --- a/ZeroLevel/Services/Network/Model/ExServiceInfo.cs +++ b/ZeroLevel/Models/ZeroServiceInfo.cs @@ -2,16 +2,19 @@ using System.Runtime.Serialization; using ZeroLevel.Services.Serialization; -namespace ZeroLevel.Network +namespace ZeroLevel { [Serializable] [DataContract] - public sealed class ExServiceInfo : - IEquatable, IBinarySerializable + public sealed class ZeroServiceInfo : + IEquatable, IBinarySerializable { public const string DEFAULT_GROUP_NAME = "__service_default_group__"; public const string DEFAULT_TYPE_NAME = "__service_default_type__"; + [DataMember] + public string Name { get; set; } + /// /// Service key, must be unique within the business functionality. /// two services with same key will be horizontally balanced @@ -41,11 +44,12 @@ namespace ZeroLevel.Network [DataMember] public int Port { get; set; } - public bool Equals(ExServiceInfo other) + public bool Equals(ZeroServiceInfo other) { if (other == null) return false; if (object.ReferenceEquals(this, other)) return true; if (this.Port != other.Port) return false; + if (string.Compare(this.Name, other.Name, true) != 0) return false; if (string.Compare(this.ServiceKey, other.ServiceKey, true) != 0) return false; if (string.Compare(this.ServiceGroup, other.ServiceGroup, true) != 0) return false; if (string.Compare(this.ServiceType, other.ServiceType, true) != 0) return false; @@ -55,7 +59,7 @@ namespace ZeroLevel.Network public override bool Equals(object obj) { - return base.Equals(obj); + return this.Equals(obj as ZeroServiceInfo); } public override int GetHashCode() @@ -66,6 +70,7 @@ namespace ZeroLevel.Network public void Serialize(IBinaryWriter writer) { writer.WriteInt32(this.Port); + writer.WriteString(this.Name); writer.WriteString(this.ServiceKey); writer.WriteString(this.ServiceGroup); writer.WriteString(this.ServiceType); @@ -75,6 +80,7 @@ namespace ZeroLevel.Network public void Deserialize(IBinaryReader reader) { this.Port = reader.ReadInt32(); + this.Name = reader.ReadString(); this.ServiceKey = reader.ReadString(); this.ServiceGroup = reader.ReadString(); this.ServiceType = reader.ReadString(); @@ -83,7 +89,7 @@ namespace ZeroLevel.Network public override string ToString() { - return $"{ServiceKey} ({Version})"; + return $"{ServiceKey } ({Version})"; } } } \ No newline at end of file diff --git a/ZeroLevel/Services/AtomicBoolean.cs b/ZeroLevel/Services/AtomicBoolean.cs new file mode 100644 index 0000000..6c0eeb7 --- /dev/null +++ b/ZeroLevel/Services/AtomicBoolean.cs @@ -0,0 +1,73 @@ +using System.Threading; + +namespace ZeroLevel.Services +{ + /// + /// Класс реализующий потокобезопасный флаг + /// + public sealed class AtomicBoolean + { + /// + /// Локер для переключения флага указывающего идет или нет процесс обработки + /// + private SpinLock _compareLocker = new SpinLock(); + /// + /// Флаг, указывает идет ли в текущий момент процесс обработки очереди + /// + private bool _lock; + /// + /// Потокобезопасное переназначение булевой переменной + /// функция сранивает переменную со значением comparand + /// и при совпадении заменяет значение переменной на value + /// и возвращает true. + /// При несовпадении значения переменной и comparand + /// значение переменной останется прежним и функция + /// вернет false. + /// + /// Переменная + /// Значение для проставления в случае совпадения + /// Сравниваемое значение + /// true - в случае совпадения значений target и comparand + private bool CompareExchange(ref bool target, bool value, bool comparand) + { + bool lockTaked = false; + try + { + _compareLocker.Enter(ref lockTaked); + if (target == comparand) + { + target = value; + return true; + } + return false; + } + finally + { + if (lockTaked) _compareLocker.Exit(); + } + } + /// + /// Установка значения в true + /// + /// true - если значение изменилось, false - если значение занято другим потоком и не изменилось + public bool Set() + { + return CompareExchange(ref _lock, true, false); + } + /// + /// Сброс значения + /// + public void Reset() + { + CompareExchange(ref _lock, false, true); + } + + public bool State + { + get + { + return _lock; + } + } + } +} diff --git a/ZeroLevel/Services/BaseZeroService.cs b/ZeroLevel/Services/BaseZeroService.cs index 701242d..bbf61e3 100644 --- a/ZeroLevel/Services/BaseZeroService.cs +++ b/ZeroLevel/Services/BaseZeroService.cs @@ -1,19 +1,25 @@ using System; using System.Collections.Concurrent; +using System.Linq; +using System.Linq.Expressions; using System.Net; +using System.Reflection; using System.Threading; using ZeroLevel.Network; +using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.Applications { public abstract class BaseZeroService : IZeroService { - public string Name { get; protected set; } - public string Key { get; private set; } - public string Version { get; private set; } - public string Group { get; private set; } - public string Type { get; private set; } + private readonly ZeroServiceInfo _serviceInfo = new ZeroServiceInfo(); + + public string Name { get { return _serviceInfo.Name; } private set { _serviceInfo.Name = value; } } + public string Key { get { return _serviceInfo.ServiceKey; } private set { _serviceInfo.ServiceKey = value; } } + public string Version { get { return _serviceInfo.Version; } private set { _serviceInfo.Version = value; } } + public string Group { get { return _serviceInfo.ServiceGroup; } private set { _serviceInfo.ServiceGroup = value; } } + public string Type { get { return _serviceInfo.ServiceType; } private set { _serviceInfo.ServiceType = value; } } public ZeroServiceStatus Status => _state; private ZeroServiceStatus _state; @@ -113,42 +119,97 @@ namespace ZeroLevel.Services.Applications #endregion Config #region Network - private IRouter _router; - private static IRouter _null_router = new NullRouter(); - private IDiscoveryClient _discoveryClient; + private static readonly IRouter _null_router = new NullRouter(); + private IDiscoveryClient _discoveryClient = null; // Feature расширить до нескольких discovery + private long _update_discovery_table_task = -1; + private long _register_in_discovery_table_task = -1; + private readonly AliasSet _aliases = new AliasSet(); + private static TimeSpan _update_discovery_table_period = TimeSpan.FromSeconds(15); + private static TimeSpan _register_in_discovery_table_period = TimeSpan.FromSeconds(15); + private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _serverInstances = new ConcurrentDictionary(); + + private void RestartDiscoveryTasks() + { + if (_update_discovery_table_task != -1) + { + Sheduller.Remove(_update_discovery_table_task); + } + if (_register_in_discovery_table_task != -1) + { + Sheduller.Remove(_register_in_discovery_table_task); + } + _update_discovery_table_task = Sheduller.RemindEvery(_update_discovery_table_period, RegisterServicesInDiscovery); + _register_in_discovery_table_task = Sheduller.RemindEvery(_register_in_discovery_table_period, () => { }); + } + + private void RegisterServicesInDiscovery() + { + var services = _serverInstances. + Values. + Select(s => + { + var info = MessageSerializer.Copy(this._serviceInfo); + info.Port = s.LocalEndpoint.Port; + return info; + }). + ToList(); + foreach (var service in services) + { + _discoveryClient.Register(service); + } + } public void UseDiscovery() { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) + if (_state == ZeroServiceStatus.Running + || _state == ZeroServiceStatus.Initialized) { + if (_discoveryClient != null) + { + _discoveryClient.Dispose(); + _discoveryClient = null; + } var discovery = Configuration.Default.First("discovery"); _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(discovery), _null_router, false)); + RestartDiscoveryTasks(); } } public void UseDiscovery(string endpoint) { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) + if (_state == ZeroServiceStatus.Running + || _state == ZeroServiceStatus.Initialized) { + if (_discoveryClient != null) + { + _discoveryClient.Dispose(); + _discoveryClient = null; + } _discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), _null_router, false)); + RestartDiscoveryTasks(); } } public void UseDiscovery(IPEndPoint endpoint) { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) + if (_state == ZeroServiceStatus.Running + || _state == ZeroServiceStatus.Initialized) { + if (_discoveryClient != null) + { + _discoveryClient.Dispose(); + _discoveryClient = null; + } _discoveryClient = new DiscoveryClient(GetClient(endpoint, _null_router, false)); + RestartDiscoveryTasks(); } } public IRouter UseHost() { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) + if (_state == ZeroServiceStatus.Running + || _state == ZeroServiceStatus.Initialized) { return GetServer(new IPEndPoint(IPAddress.Any, NetUtils.GetFreeTcpPort()), new Router()).Router; } @@ -157,8 +218,8 @@ namespace ZeroLevel.Services.Applications public IRouter UseHost(int port) { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) + if (_state == ZeroServiceStatus.Running + || _state == ZeroServiceStatus.Initialized) { return GetServer(new IPEndPoint(IPAddress.Any, port), new Router()).Router; } @@ -167,8 +228,8 @@ namespace ZeroLevel.Services.Applications public IRouter UseHost(IPEndPoint endpoint) { - if (_state == ZeroServiceStatus.Running || - _state == ZeroServiceStatus.Initialized) + if (_state == ZeroServiceStatus.Running + || _state == ZeroServiceStatus.Initialized) { return GetServer(endpoint, new Router()).Router; } @@ -177,38 +238,210 @@ namespace ZeroLevel.Services.Applications public ExClient ConnectToService(string endpoint) { - if (_state == ZeroServiceStatus.Running) + if (_state == ZeroServiceStatus.Running + || _state == ZeroServiceStatus.Initialized) { - return new ExClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true)); + return GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true); } return null; } - public ExClient ConnectToService(string alias, string endpoint) + public ExClient ConnectToService(IPEndPoint endpoint) { - if (_state == ZeroServiceStatus.Running) + if (_state == ZeroServiceStatus.Running + || _state == ZeroServiceStatus.Initialized) { - return new ExClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true)); + return GetClient(endpoint, new Router(), true); } return null; } - public ExClient ConnectToService(IPEndPoint endpoint) + #region Autoregistration inboxes + private static Delegate CreateDelegate(Type delegateType, MethodInfo methodInfo, object target) { - if (_state == ZeroServiceStatus.Running) + Func getType; + var isAction = methodInfo.ReturnType.Equals((typeof(void))); + if (isAction) { - return new ExClient(GetClient(endpoint, new Router(), true)); + getType = Expression.GetActionType; } - return null; + else + { + getType = Expression.GetFuncType; + } + if (methodInfo.IsStatic) + { + return Delegate.CreateDelegate(delegateType, methodInfo); + } + return Delegate.CreateDelegate(delegateType, target, methodInfo.Name); } - public ExClient ConnectToService(string alias, IPEndPoint endpoint) + public void AutoregisterInboxes(IServer server) { - if (_state == ZeroServiceStatus.Running) + var type = server.GetType(); + // Search router registerinbox methods with inbox name + var register_methods = type.GetMethods(BindingFlags.Instance + | BindingFlags.Public + | BindingFlags.NonPublic + | BindingFlags.FlattenHierarchy)? + .Where(mi => mi.Name.Equals("RegisterInbox", StringComparison.Ordinal) && + mi.GetParameters().First().ParameterType == typeof(string)); + + var register_message_handler = register_methods.First(mi => mi.IsGenericMethod == false); + var register_message_handler_with_msg = register_methods.First(mi => + { + if (mi.IsGenericMethod) + { + var paremeters = mi.GetParameters().ToArray(); + if (paremeters.Length == 2 && paremeters[1].ParameterType.IsAssignableToGenericType(typeof(MessageHandler<>))) + { + return true; + } + } + return false; + }); + var register_request_handler_without_msg = register_methods.First(mi => { - return new ExClient(GetClient(endpoint, new Router(), true)); + if (mi.IsGenericMethod) + { + var paremeters = mi.GetParameters().ToArray(); + if (paremeters.Length == 2 && paremeters[1].ParameterType.IsAssignableToGenericType(typeof(RequestHandler<>))) + { + return true; + } + } + return false; + }); + var register_request_handler = register_methods.First(mi => + { + if (mi.IsGenericMethod) + { + var paremeters = mi.GetParameters().ToArray(); + if (paremeters.Length == 2 && paremeters[1].ParameterType.IsAssignableToGenericType(typeof(RequestHandler<,>))) + { + return true; + } + } + return false; + }); + + MethodInfo[] methods = this. + GetType(). + GetMethods(BindingFlags.NonPublic + | BindingFlags.Public + | BindingFlags.Instance + | BindingFlags.FlattenHierarchy + | BindingFlags.Instance); + + foreach (MethodInfo mi in methods) + { + try + { + foreach (Attribute attr in Attribute.GetCustomAttributes(mi, typeof(ExchangeAttribute))) + { + var args = mi.GetParameters().ToArray(); + if (attr.GetType() == typeof(ExchangeMainHandlerAttribute)) + { + if (args.Length == 1) + { + var handler = CreateDelegate(typeof(MessageHandler), mi, this); + register_message_handler.Invoke(server, new object[] { BaseSocket.DEFAULT_MESSAGE_INBOX, handler }); + } + else + { + var handler = CreateDelegate(typeof(MessageHandler<>).MakeGenericType(args[1].ParameterType), mi, this); + MethodInfo genericMethod = register_message_handler_with_msg.MakeGenericMethod(args[1].ParameterType); + genericMethod.Invoke(server, new object[] { BaseSocket.DEFAULT_MESSAGE_INBOX, handler }); + } + } + else if (attr.GetType() == typeof(ExchangeHandlerAttribute)) + { + if (args.Length == 1) + { + var handler = CreateDelegate(typeof(MessageHandler), mi, this); + register_message_handler.Invoke(server, new object[] { (attr as ExchangeHandlerAttribute).Inbox, handler }); + } + else + { + var handler = CreateDelegate(typeof(MessageHandler<>).MakeGenericType(args[1].ParameterType), mi, this); + MethodInfo genericMethod = register_message_handler_with_msg.MakeGenericMethod(args[1].ParameterType); + genericMethod.Invoke(server, new object[] { (attr as ExchangeHandlerAttribute).Inbox, handler }); + } + } + + else if (attr.GetType() == typeof(ExchangeMainReplierAttribute)) + { + var returnType = mi.ReturnType; + var genArgType = args[1].ParameterType; + MethodInfo genericMethod = register_request_handler.MakeGenericMethod(genArgType, returnType); + var requestHandler = CreateDelegate(typeof(RequestHandler<,>).MakeGenericType(args[1].ParameterType, returnType), mi, this); + genericMethod.Invoke(server, new object[] { BaseSocket.DEFAULT_REQUEST_INBOX, requestHandler }); + } + else if (attr.GetType() == typeof(ExchangeReplierAttribute)) + { + var returnType = mi.ReturnType; + var genArgType = args[1].ParameterType; + MethodInfo genericMethod = register_request_handler.MakeGenericMethod(genArgType, returnType); + var requestHandler = CreateDelegate(typeof(RequestHandler<,>).MakeGenericType(args[1].ParameterType, returnType), mi, this); + genericMethod.Invoke(server, new object[] { (attr as ExchangeReplierAttribute).Inbox, requestHandler }); + } + + else if (attr.GetType() == typeof(ExchangeMainReplierWithoutArgAttribute)) + { + var returnType = mi.ReturnType; + MethodInfo genericMethod = register_request_handler_without_msg.MakeGenericMethod(returnType); + var requestHandler = CreateDelegate(typeof(RequestHandler<>).MakeGenericType(returnType), mi, this); + genericMethod.Invoke(server, new object[] { BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, requestHandler }); + } + else if (attr.GetType() == typeof(ExchangeReplierWithoutArgAttribute)) + { + var returnType = mi.ReturnType; + MethodInfo genericMethod = register_request_handler_without_msg.MakeGenericMethod(returnType); + var requestHandler = CreateDelegate(typeof(RequestHandler<>).MakeGenericType(returnType), mi, this); + genericMethod.Invoke(server, new object[] { (attr as ExchangeReplierWithoutArgAttribute).Inbox, requestHandler }); + } + } + } + catch (Exception ex) + { + Log.Debug($"[ZExchange] Can't register method {mi.Name} as inbox handler. {ex}"); + } + } + } + #endregion + + + public void StoreConnection(string endpoint) + { + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) + { + _aliases.Set(endpoint, NetUtils.CreateIPEndPoint(endpoint)); + } + } + public void StoreConnection(string alias, string endpoint) + { + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) + { + _aliases.Set(alias, NetUtils.CreateIPEndPoint(endpoint)); + } + } + public void StoreConnection(IPEndPoint endpoint) + { + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) + { + _aliases.Set($"{endpoint.Address}:{endpoint.Port}", endpoint); + } + } + public void StoreConnection(string alias, IPEndPoint endpoint) + { + if (_state == ZeroServiceStatus.Running || + _state == ZeroServiceStatus.Initialized) + { + _aliases.Set(alias, endpoint); } - return null; } #endregion @@ -284,16 +517,14 @@ namespace ZeroLevel.Services.Applications } #endregion - #region Utils - private static readonly ConcurrentDictionary _clientInstances = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _serverInstances = new ConcurrentDictionary(); + #region Utils - private ISocketClient GetClient(IPEndPoint endpoint, IRouter router, bool use_cachee) + private ExClient GetClient(IPEndPoint endpoint, IRouter router, bool use_cachee) { if (use_cachee) { string key = $"{endpoint.Address}:{endpoint.Port}"; - ISocketClient instance = null; + ExClient instance = null; if (_clientInstances.ContainsKey(key)) { instance = _clientInstances[key]; @@ -305,11 +536,11 @@ namespace ZeroLevel.Services.Applications instance.Dispose(); instance = null; } - instance = new SocketClient(endpoint, router); + instance = new ExClient(new SocketClient(endpoint, router)); _clientInstances[key] = instance; return instance; } - return new SocketClient(endpoint, router); + return new ExClient(new SocketClient(endpoint, router)); } private SocketServer GetServer(IPEndPoint endpoint, IRouter router) @@ -328,29 +559,42 @@ namespace ZeroLevel.Services.Applications public void Dispose() { - _state = ZeroServiceStatus.Disposed; - - foreach (var client in _clientInstances) + if (_state != ZeroServiceStatus.Disposed) { - try + _state = ZeroServiceStatus.Disposed; + + if (_update_discovery_table_task != -1) { - client.Value.Dispose(); + Sheduller.Remove(_update_discovery_table_task); } - catch (Exception ex) + + if (_register_in_discovery_table_task != -1) { - Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketClient to endpoint {client.Key}"); + Sheduller.Remove(_register_in_discovery_table_task); } - } - foreach (var server in _serverInstances) - { - try + foreach (var client in _clientInstances) { - server.Value.Dispose(); + try + { + client.Value.Dispose(); + } + catch (Exception ex) + { + Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketClient to endpoint {client.Key}"); + } } - catch (Exception ex) + + foreach (var server in _serverInstances) { - Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketServer with endpoint {server.Key}"); + try + { + server.Value.Dispose(); + } + catch (Exception ex) + { + Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketServer with endpoint {server.Key}"); + } } } } diff --git a/ZeroLevel/Services/Collections/ITransactable.cs b/ZeroLevel/Services/Collections/ITransactable.cs new file mode 100644 index 0000000..1c6c50e --- /dev/null +++ b/ZeroLevel/Services/Collections/ITransactable.cs @@ -0,0 +1,9 @@ +namespace ZeroLevel.Services.Collections +{ + public interface ITransactable + { + bool StartTransction(); + bool Commit(); + bool Rollback(); + } +} diff --git a/ZeroLevel/Services/Collections/KeyListValueTransactCollection.cs b/ZeroLevel/Services/Collections/KeyListValueTransactCollection.cs new file mode 100644 index 0000000..b43e348 --- /dev/null +++ b/ZeroLevel/Services/Collections/KeyListValueTransactCollection.cs @@ -0,0 +1,219 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; + +namespace ZeroLevel.Services.Collections +{ + /// + /// Класс обертывает коллекцию вида ключ-значение и позволяет проводить над ней транзакционные обновления + /// + /// Тип ключа коллекции + /// Тип значения коллекции + public class KeyListValueTransactCollection : + ITransactable + { + /// + /// Коллекция + /// + readonly Dictionary> _collection = new Dictionary>(); + private ReaderWriterLockSlim _rwLock = + new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); + /// + /// Проверка наличия ключа + /// + /// + /// + public bool HasKey(TKey key) + { + try + { + _rwLock.EnterReadLock(); + return _collection.ContainsKey(key); + } + finally + { + _rwLock.ExitReadLock(); + } + } + /// + /// Получение значения коллекции по ключу + /// + /// Ключ + /// Значение + public IEnumerable this[TKey key] + { + get + { + try + { + _rwLock.EnterReadLock(); + List value; + if (_collection.TryGetValue(key, out value)) + return value; + } + finally + { + _rwLock.ExitReadLock(); + } + throw new KeyNotFoundException(); + } + } + /// + /// Коллекция ключей + /// + public IEnumerable Keys + { + get + { + try + { + _rwLock.EnterReadLock(); + return _collection.Keys; + } + finally + { + _rwLock.ExitReadLock(); + } + } + } + /// + /// Коллекция значений + /// + public IEnumerable> Values + { + get + { + try + { + _rwLock.EnterReadLock(); + return _collection.Values.ToArray(); + } + finally + { + _rwLock.ExitReadLock(); + } + } + } + + #region Transaction update + /// + /// Список не обновленных данных (т.е. тех которые удалены в базе) + /// + readonly List _removingDate = new List(); + /// + /// Обновленные данные + /// + readonly Dictionary> _updatedRecords = new Dictionary>(); + /// + /// Новые данные + /// + readonly Dictionary> _newRecords = new Dictionary>(); + + void ClearTransactionDate() + { + _removingDate.Clear(); + _updatedRecords.Clear(); + _newRecords.Clear(); + } + + /// + /// Добавление или обновления записи + /// + /// Идентификатор записи + /// Значение + public void Post(TKey id, TValue value) + { + if (_isUpdating.State == false) + { + throw new Exception("Method Post allowed only in transaction"); + } + if (!HasKey(id)) + { + if (_newRecords.ContainsKey(id) == false) + { + _newRecords.Add(id, new List()); + } + _newRecords[id].Add(value); + } + else + { + if (!_updatedRecords.ContainsKey(id)) + { + _updatedRecords.Add(id, new List()); + } + _updatedRecords[id].Add(value); + if (_removingDate.Contains(id)) + _removingDate.Remove(id); + } + return; + } + readonly AtomicBoolean _isUpdating = new AtomicBoolean(); + + public bool Commit() + { + if (_isUpdating.State == false) return false; + try + { + _rwLock.EnterWriteLock(); + foreach (TKey id in _removingDate) + { + _collection.Remove(id); + } + foreach (TKey key in _newRecords.Keys) + { + _collection.Add(key, _newRecords[key]); + } + foreach (TKey key in _updatedRecords.Keys) + { + _collection[key] = _updatedRecords[key]; + } + } + finally + { + _rwLock.ExitWriteLock(); + ClearTransactionDate(); + _isUpdating.Reset(); + } + return true; + } + + public bool Rollback() + { + if (_isUpdating.State == false) return false; + ClearTransactionDate(); + _isUpdating.Reset(); + return true; + } + + public bool StartTransction() + { + if (_isUpdating.Set()) + { + _removingDate.AddRange(_collection.Keys.ToArray()); + return true; + } + return false; + } + #endregion + + #region IDisposable + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + if (_collection != null) + { + _collection.Clear(); + } + } + } + #endregion + } +} diff --git a/ZeroLevel/Services/Collections/KeyValueTransactCollection.cs b/ZeroLevel/Services/Collections/KeyValueTransactCollection.cs new file mode 100644 index 0000000..924a740 --- /dev/null +++ b/ZeroLevel/Services/Collections/KeyValueTransactCollection.cs @@ -0,0 +1,271 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; + +namespace ZeroLevel.Services.Collections +{ + /// + /// Класс обертывает коллекцию вида ключ-значение и позволяет проводить над ней транзакционные обновления + /// + /// Тип ключа коллекции + /// Тип значения коллекции + public class KeyValueTransactCollection : + ITransactable + { + private ReaderWriterLockSlim _rwLock = + new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); + /// + /// Коллекция + /// + readonly Dictionary _collection = new Dictionary(); + + public KeyValueTransactCollection() { } + /// + /// Проверка наличия ключа + /// + /// + /// + public bool HasKey(TKey key) + { + try + { + _rwLock.EnterReadLock(); + return _collection.ContainsKey(key); + } + finally + { + _rwLock.ExitReadLock(); + } + } + /// + /// Получение значения коллекции по ключу + /// + /// Ключ + /// Значение + public TValue this[TKey key] + { + get + { + try + { + _rwLock.EnterReadLock(); + TValue value; + if (_collection.TryGetValue(key, out value)) + return (value); + } + finally + { + _rwLock.ExitReadLock(); + } + throw new KeyNotFoundException(); + } + } + /// + /// Количество записей + /// + public int Count + { + get + { + try + { + _rwLock.EnterReadLock(); + return _collection.Count; + } + finally + { + _rwLock.ExitReadLock(); + } + } + } + /// + /// Коллекция ключей + /// + public IEnumerable Keys + { + get + { + try + { + _rwLock.EnterReadLock(); + return _collection.Keys; + } + finally + { + _rwLock.ExitReadLock(); + } + } + } + /// + /// Список ключ-значений + /// + public IEnumerable> Items + { + get + { + try + { + _rwLock.EnterReadLock(); + return _collection; + } + finally + { + _rwLock.ExitReadLock(); + } + } + } + /// + /// Коллекция значений + /// + public IEnumerable Values + { + get + { + try + { + _rwLock.EnterReadLock(); + return _collection.Values; + } + finally + { + _rwLock.ExitReadLock(); + } + } + } + + #region Transaction update + + /// + /// Список не обновленных данных (т.е. тех которые удалены в базе) + /// + readonly List _removingDate = new List(); + /// + /// Обновленные данные + /// + readonly Dictionary _updatedRecords = new Dictionary(); + /// + /// Новые данные + /// + readonly Dictionary _newRecords = new Dictionary(); + + void ClearTransactionDate() + { + _removingDate.Clear(); + _updatedRecords.Clear(); + _newRecords.Clear(); + } + /// + /// Добавление или обновления записи + /// + /// Идентификатор записи + /// Значение + public void Post(TKey id, TValue value) + { + if (_isUpdating.State == false) + { + throw new Exception("Method Post allowed only in transaction"); + } + if (!HasKey(id)) + { + if (_newRecords.ContainsKey(id) == false) + { + _newRecords.Add(id, value); + } + else + { + _newRecords[id] = value; + } + } + else + { + if (!_collection[id].Equals(value)) + { + if (false == _updatedRecords.ContainsKey(id)) + { + _updatedRecords.Add(id, value); + } + else + { + _updatedRecords[id] = value; + } + } + if (_removingDate.Contains(id)) + _removingDate.Remove(id); + } + return; + } + #endregion + + readonly AtomicBoolean _isUpdating = new AtomicBoolean(); + + public bool StartTransction() + { + if (_isUpdating.Set()) + { + _removingDate.AddRange(_collection.Keys.ToArray()); + return true; + } + return false; + } + public bool Commit() + { + if (_isUpdating.State == false) return false; + try + { + _rwLock.EnterWriteLock(); + foreach (TKey id in _removingDate) + { + _collection.Remove(id); + } + foreach (TKey key in _newRecords.Keys) + { + _collection.Add(key, _newRecords[key]); + } + foreach (TKey key in _updatedRecords.Keys) + { + _collection[key] = _updatedRecords[key]; + } + } + finally + { + _rwLock.ExitWriteLock(); + ClearTransactionDate(); + _isUpdating.Reset(); + } + return true; + } + public bool Rollback() + { + if (_isUpdating.State == false) return false; + ClearTransactionDate(); + _isUpdating.Reset(); + return true; + } + + #region IDisposable + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + if (_collection != null) + { + foreach (TKey key in _collection.Keys) + { + var disposable = _collection[key] as IDisposable; + if (disposable != null) + disposable.Dispose(); + } + _collection.Clear(); + } + } + } + #endregion + } +} diff --git a/ZeroLevel/Services/Network/Alias.cs b/ZeroLevel/Services/Network/Alias.cs index 8331061..86a4e48 100644 --- a/ZeroLevel/Services/Network/Alias.cs +++ b/ZeroLevel/Services/Network/Alias.cs @@ -1,11 +1,12 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Threading; namespace ZeroLevel.Network { - public sealed class AliasSet + public sealed class AliasSet { public sealed class _RoundRobinCollection : IDisposable @@ -97,6 +98,29 @@ namespace ZeroLevel.Network _index = -1; } + public IEnumerable GetCurrentSeq() + { + _lock.EnterReadLock(); + try + { + var arr = new T[_collection.Count]; + int p = 0; + for (int i = _index; i < _collection.Count; i++, p++) + { + arr[p] = _collection[i]; + } + for (int i = 0; i < _index; i++, p++) + { + arr[p] = _collection[i]; + } + return arr; + } + finally + { + _lock.ExitReadLock(); + } + } + public void Dispose() { _collection.Clear(); @@ -104,13 +128,13 @@ namespace ZeroLevel.Network } } - private readonly ConcurrentDictionary> _aliases = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary> _aliases = new ConcurrentDictionary>(); - public void Set(string alias, string address) + public void Set(string alias, T address) { if (_aliases.ContainsKey(alias) == false) { - if (_aliases.TryAdd(alias, new _RoundRobinCollection())) + if (_aliases.TryAdd(alias, new _RoundRobinCollection())) { _aliases[alias].Add(address); } @@ -122,11 +146,11 @@ namespace ZeroLevel.Network } } - public void Set(string alias, IEnumerable addresses) + public void Set(string alias, IEnumerable addresses) { if (_aliases.ContainsKey(alias) == false) { - if (_aliases.TryAdd(alias, new _RoundRobinCollection())) + if (_aliases.TryAdd(alias, new _RoundRobinCollection())) { foreach (var address in addresses) _aliases[alias].Add(address); @@ -140,13 +164,22 @@ namespace ZeroLevel.Network } } - public string GetAddress(string alias) + public T GetAddress(string alias) { if (_aliases.ContainsKey(alias) && _aliases[alias].MoveNext()) { return _aliases[alias].Current; } - return null; + return default(T); + } + + public IEnumerable GetAddresses(string alias) + { + if (_aliases.ContainsKey(alias) && _aliases[alias].MoveNext()) + { + return _aliases[alias].GetCurrentSeq(); + } + return Enumerable.Empty(); } } } diff --git a/ZeroLevel/Services/Network/BaseSocket.cs b/ZeroLevel/Services/Network/BaseSocket.cs index 3466e51..2317ea6 100644 --- a/ZeroLevel/Services/Network/BaseSocket.cs +++ b/ZeroLevel/Services/Network/BaseSocket.cs @@ -11,6 +11,7 @@ namespace ZeroLevel.Network public const string DEFAULT_MESSAGE_INBOX = "__message_inbox__"; public const string DEFAULT_REQUEST_INBOX = "__request_inbox__"; + public const string DEFAULT_REQUEST_WITHOUT_ARGS_INBOX = "__request_no_args_inbox__"; protected const string DEFAULT_REQUEST_ERROR_INBOX = "__request_error__"; /// diff --git a/ZeroLevel/Services/Network/Contracts/IDiscoveryClient.cs b/ZeroLevel/Services/Network/Contracts/IDiscoveryClient.cs index 70e94f3..b1dbe06 100644 --- a/ZeroLevel/Services/Network/Contracts/IDiscoveryClient.cs +++ b/ZeroLevel/Services/Network/Contracts/IDiscoveryClient.cs @@ -1,10 +1,12 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; namespace ZeroLevel.Network { public interface IDiscoveryClient + : IDisposable { - bool Register(ExServiceInfo info); + bool Register(ZeroServiceInfo info); IEnumerable GetServiceEndpoints(string serviceKey); diff --git a/ZeroLevel/Services/Network/DiscoveryClient.cs b/ZeroLevel/Services/Network/DiscoveryClient.cs index 6b1db1d..7e81d84 100644 --- a/ZeroLevel/Services/Network/DiscoveryClient.cs +++ b/ZeroLevel/Services/Network/DiscoveryClient.cs @@ -10,7 +10,8 @@ namespace ZeroLevel.Network public class DiscoveryClient : IDiscoveryClient { - private sealed class DCRouter + private sealed class DCRouter: + IDisposable { private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(); private IEnumerable _empty = Enumerable.Empty(); @@ -124,14 +125,19 @@ namespace ZeroLevel.Network } return _empty; } + + public void Dispose() + { + _lock.Dispose(); + } } private readonly DCRouter _router = new DCRouter(); private readonly ExClient _discoveryServerClient; - public DiscoveryClient(ISocketClient client) + public DiscoveryClient(ExClient client) { - _discoveryServerClient = new ExClient(client); + _discoveryServerClient = client; UpdateServiceListInfo(); Sheduller.RemindEvery(TimeSpan.FromSeconds(30), UpdateServiceListInfo); } @@ -160,7 +166,7 @@ namespace ZeroLevel.Network } } - public bool Register(ExServiceInfo info) + public bool Register(ZeroServiceInfo info) { _discoveryServerClient.ForceConnect(); if (_discoveryServerClient.Status == SocketClientStatus.Working) @@ -168,7 +174,7 @@ namespace ZeroLevel.Network bool result = false; try { - _discoveryServerClient.Request("register", info, r => + _discoveryServerClient.Request("register", info, r => { result = r.Success; if (!result) @@ -194,5 +200,11 @@ namespace ZeroLevel.Network public IEnumerable GetServiceEndpointsByGroup(string serviceGroup) => _router.GetServiceEndpointsByGroup(serviceGroup); public IEnumerable GetServiceEndpointsByType(string serviceType) => _router.GetServiceEndpointsByType(serviceType); public ServiceEndpointInfo GetService(string serviceKey, string endpoint) => _router.GetService(serviceKey, endpoint); + + public void Dispose() + { + _router.Dispose(); + _discoveryServerClient.Dispose(); + } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Network/ExClient.cs b/ZeroLevel/Services/Network/ExClient.cs index dbfc061..aebe15f 100644 --- a/ZeroLevel/Services/Network/ExClient.cs +++ b/ZeroLevel/Services/Network/ExClient.cs @@ -1,6 +1,10 @@ using System; +using System.Linq; +using System.Linq.Expressions; using System.Net; +using System.Reflection; using ZeroLevel.Models; +using ZeroLevel.Services.Invokation; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network @@ -108,7 +112,7 @@ namespace ZeroLevel.Network { try { - _client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible(request)), + _client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible(request)), f => callback(MessageSerializer.DeserializeCompatible(f))); } catch (Exception ex) diff --git a/ZeroLevel/Services/Serialization/MessageSerializer.cs b/ZeroLevel/Services/Serialization/MessageSerializer.cs index 27669bc..89b80fa 100644 --- a/ZeroLevel/Services/Serialization/MessageSerializer.cs +++ b/ZeroLevel/Services/Serialization/MessageSerializer.cs @@ -155,5 +155,45 @@ namespace ZeroLevel.Services.Serialization return PrimitiveTypeSerializer.Deserialize(reader, type); } } + + public static T Copy(T value) + where T : IBinarySerializable + { + using (var writer = new MemoryStreamWriter()) + { + value.Serialize(writer); + using (var reader = new MemoryStreamReader(writer.Complete())) + { + var direct = (IBinarySerializable)Activator.CreateInstance(); + direct.Deserialize(reader); + return (T)direct; + } + } + } + + public static T CopyCompatible(T value) + { + if (typeof(IBinarySerializable).IsAssignableFrom(typeof(T))) + { + using (var writer = new MemoryStreamWriter()) + { + ((IBinarySerializable)value).Serialize(writer); + using (var reader = new MemoryStreamReader(writer.Complete())) + { + var direct = (IBinarySerializable)Activator.CreateInstance(); + direct.Deserialize(reader); + return (T)direct; + } + } + } + using (var writer = new MemoryStreamWriter()) + { + PrimitiveTypeSerializer.Serialize(writer, value); + using (var reader = new MemoryStreamReader(writer.Complete())) + { + return PrimitiveTypeSerializer.Deserialize(reader); + } + } + } } } \ No newline at end of file