Autoregister inboxes
pull/1/head
a.bozhenov 6 years ago
parent e5415d8dbc
commit 00c4cff5d6

@ -17,23 +17,62 @@ namespace TestApp
protected override void StartAction()
{
Log.Info("Started");
UseHost(8800)
.RegisterInbox<string, string>("upper", (c, s) => s.ToUpperInvariant())
.RegisterInbox<IPEndPoint, string>("ip2str", (c, ip) => $"{ip.Address}:{ip.Port}");
AutoregisterInboxes(UseHost(8800));
var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800));
Sheduller.RemindEvery(TimeSpan.FromSeconds(5), () =>
{
var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800));
client.Request<string, string>("upper", "hello", s => Log.Info(s));
client.Send("pum");
client.Send<string>(BaseSocket.DEFAULT_MESSAGE_INBOX, "'This is message'");
client.Request<DateTime, string>("d2s", DateTime.Now, s => Log.Info($"Response: {s}"));
client.Request<IPEndPoint, string>(BaseSocket.DEFAULT_REQUEST_INBOX,
new IPEndPoint(NetUtils.GetNonLoopbackAddress(), NetUtils.GetFreeTcpPort()),
s => Log.Info($"Response: {s}"));
client.Request<string>("now", s => Log.Info($"Response date: {s}"));
client.Request<string>(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, s => Log.Info($"Response ip: {s}"));
});
Sheduller.RemindEvery(TimeSpan.FromSeconds(6), () =>
{
var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800));
client.Request<IPEndPoint, string>("ip2str", new IPEndPoint(NetUtils.GetNonLoopbackAddress(), NetUtils.GetFreeTcpPort()), s => Log.Info(s));
});
}
[ExchangeHandler("pum")]
public void MessageHandler(ISocketClient client)
{
Log.Info("Called message handler without arguments");
}
[ExchangeMainHandler]
public void MessageHandler(ISocketClient client, string message)
{
Log.Info($"Called message handler (DEFAULT INBOX) with argument: {message}");
}
[ExchangeReplier("d2s")]
public string date2str(ISocketClient client, DateTime date)
{
Log.Info($"Called reqeust handler with argument: {date}");
return date.ToLongDateString();
}
[ExchangeMainReplier]
public string ip2str(ISocketClient client, IPEndPoint ip)
{
Log.Info($"Called reqeust handler (DEFAULT INBOX) with argument: {ip.Address}:{ip.Port}");
return $"{ip.Address}:{ip.Port}";
}
[ExchangeReplierWithoutArg("now")]
public string GetTime(ISocketClient client)
{
Log.Info("Called reqeust handler without arguments");
return DateTime.Now.ToShortDateString();
}
[ExchangeMainReplierWithoutArg]
public string GetMyIP(ISocketClient client)
{
Log.Info("Called reqeust handler (DEFAULT INBOX) without argument");
return NetUtils.GetNonLoopbackAddress().ToString();
}
protected override void StopAction()

@ -2,16 +2,19 @@
using System.Runtime.Serialization;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
namespace ZeroLevel
{
[Serializable]
[DataContract]
public sealed class ExServiceInfo :
IEquatable<ExServiceInfo>, IBinarySerializable
public sealed class ZeroServiceInfo :
IEquatable<ZeroServiceInfo>, IBinarySerializable
{
public const string DEFAULT_GROUP_NAME = "__service_default_group__";
public const string DEFAULT_TYPE_NAME = "__service_default_type__";
[DataMember]
public string Name { get; set; }
/// <summary>
/// Service key, must be unique within the business functionality.
/// two services with same key will be horizontally balanced
@ -41,11 +44,12 @@ namespace ZeroLevel.Network
[DataMember]
public int Port { get; set; }
public bool Equals(ExServiceInfo other)
public bool Equals(ZeroServiceInfo other)
{
if (other == null) return false;
if (object.ReferenceEquals(this, other)) return true;
if (this.Port != other.Port) return false;
if (string.Compare(this.Name, other.Name, true) != 0) return false;
if (string.Compare(this.ServiceKey, other.ServiceKey, true) != 0) return false;
if (string.Compare(this.ServiceGroup, other.ServiceGroup, true) != 0) return false;
if (string.Compare(this.ServiceType, other.ServiceType, true) != 0) return false;
@ -55,7 +59,7 @@ namespace ZeroLevel.Network
public override bool Equals(object obj)
{
return base.Equals(obj);
return this.Equals(obj as ZeroServiceInfo);
}
public override int GetHashCode()
@ -66,6 +70,7 @@ namespace ZeroLevel.Network
public void Serialize(IBinaryWriter writer)
{
writer.WriteInt32(this.Port);
writer.WriteString(this.Name);
writer.WriteString(this.ServiceKey);
writer.WriteString(this.ServiceGroup);
writer.WriteString(this.ServiceType);
@ -75,6 +80,7 @@ namespace ZeroLevel.Network
public void Deserialize(IBinaryReader reader)
{
this.Port = reader.ReadInt32();
this.Name = reader.ReadString();
this.ServiceKey = reader.ReadString();
this.ServiceGroup = reader.ReadString();
this.ServiceType = reader.ReadString();
@ -83,7 +89,7 @@ namespace ZeroLevel.Network
public override string ToString()
{
return $"{ServiceKey} ({Version})";
return $"{ServiceKey } ({Version})";
}
}
}

@ -0,0 +1,73 @@
using System.Threading;
namespace ZeroLevel.Services
{
/// <summary>
/// Класс реализующий потокобезопасный флаг
/// </summary>
public sealed class AtomicBoolean
{
/// <summary>
/// Локер для переключения флага указывающего идет или нет процесс обработки
/// </summary>
private SpinLock _compareLocker = new SpinLock();
/// <summary>
/// Флаг, указывает идет ли в текущий момент процесс обработки очереди
/// </summary>
private bool _lock;
/// <summary>
/// Потокобезопасное переназначение булевой переменной
/// функция сранивает переменную со значением comparand
/// и при совпадении заменяет значение переменной на value
/// и возвращает true.
/// При несовпадении значения переменной и comparand
/// значение переменной останется прежним и функция
/// вернет false.
/// </summary>
/// <param name="target">Переменная</param>
/// <param name="value">Значение для проставления в случае совпадения</param>
/// <param name="comparand">Сравниваемое значение</param>
/// <returns>true - в случае совпадения значений target и comparand</returns>
private bool CompareExchange(ref bool target, bool value, bool comparand)
{
bool lockTaked = false;
try
{
_compareLocker.Enter(ref lockTaked);
if (target == comparand)
{
target = value;
return true;
}
return false;
}
finally
{
if (lockTaked) _compareLocker.Exit();
}
}
/// <summary>
/// Установка значения в true
/// </summary>
/// <returns>true - если значение изменилось, false - если значение занято другим потоком и не изменилось</returns>
public bool Set()
{
return CompareExchange(ref _lock, true, false);
}
/// <summary>
/// Сброс значения
/// </summary>
public void Reset()
{
CompareExchange(ref _lock, false, true);
}
public bool State
{
get
{
return _lock;
}
}
}
}

@ -1,19 +1,25 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Reflection;
using System.Threading;
using ZeroLevel.Network;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.Applications
{
public abstract class BaseZeroService
: IZeroService
{
public string Name { get; protected set; }
public string Key { get; private set; }
public string Version { get; private set; }
public string Group { get; private set; }
public string Type { get; private set; }
private readonly ZeroServiceInfo _serviceInfo = new ZeroServiceInfo();
public string Name { get { return _serviceInfo.Name; } private set { _serviceInfo.Name = value; } }
public string Key { get { return _serviceInfo.ServiceKey; } private set { _serviceInfo.ServiceKey = value; } }
public string Version { get { return _serviceInfo.Version; } private set { _serviceInfo.Version = value; } }
public string Group { get { return _serviceInfo.ServiceGroup; } private set { _serviceInfo.ServiceGroup = value; } }
public string Type { get { return _serviceInfo.ServiceType; } private set { _serviceInfo.ServiceType = value; } }
public ZeroServiceStatus Status => _state;
private ZeroServiceStatus _state;
@ -113,42 +119,97 @@ namespace ZeroLevel.Services.Applications
#endregion Config
#region Network
private IRouter _router;
private static IRouter _null_router = new NullRouter();
private IDiscoveryClient _discoveryClient;
private static readonly IRouter _null_router = new NullRouter();
private IDiscoveryClient _discoveryClient = null; // Feature расширить до нескольких discovery
private long _update_discovery_table_task = -1;
private long _register_in_discovery_table_task = -1;
private readonly AliasSet<IPEndPoint> _aliases = new AliasSet<IPEndPoint>();
private static TimeSpan _update_discovery_table_period = TimeSpan.FromSeconds(15);
private static TimeSpan _register_in_discovery_table_period = TimeSpan.FromSeconds(15);
private static readonly ConcurrentDictionary<string, ExClient> _clientInstances = new ConcurrentDictionary<string, ExClient>();
private readonly ConcurrentDictionary<string, SocketServer> _serverInstances = new ConcurrentDictionary<string, SocketServer>();
private void RestartDiscoveryTasks()
{
if (_update_discovery_table_task != -1)
{
Sheduller.Remove(_update_discovery_table_task);
}
if (_register_in_discovery_table_task != -1)
{
Sheduller.Remove(_register_in_discovery_table_task);
}
_update_discovery_table_task = Sheduller.RemindEvery(_update_discovery_table_period, RegisterServicesInDiscovery);
_register_in_discovery_table_task = Sheduller.RemindEvery(_register_in_discovery_table_period, () => { });
}
private void RegisterServicesInDiscovery()
{
var services = _serverInstances.
Values.
Select(s =>
{
var info = MessageSerializer.Copy(this._serviceInfo);
info.Port = s.LocalEndpoint.Port;
return info;
}).
ToList();
foreach (var service in services)
{
_discoveryClient.Register(service);
}
}
public void UseDiscovery()
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
if (_state == ZeroServiceStatus.Running
|| _state == ZeroServiceStatus.Initialized)
{
if (_discoveryClient != null)
{
_discoveryClient.Dispose();
_discoveryClient = null;
}
var discovery = Configuration.Default.First("discovery");
_discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(discovery), _null_router, false));
RestartDiscoveryTasks();
}
}
public void UseDiscovery(string endpoint)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
if (_state == ZeroServiceStatus.Running
|| _state == ZeroServiceStatus.Initialized)
{
if (_discoveryClient != null)
{
_discoveryClient.Dispose();
_discoveryClient = null;
}
_discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), _null_router, false));
RestartDiscoveryTasks();
}
}
public void UseDiscovery(IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
if (_state == ZeroServiceStatus.Running
|| _state == ZeroServiceStatus.Initialized)
{
if (_discoveryClient != null)
{
_discoveryClient.Dispose();
_discoveryClient = null;
}
_discoveryClient = new DiscoveryClient(GetClient(endpoint, _null_router, false));
RestartDiscoveryTasks();
}
}
public IRouter UseHost()
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
if (_state == ZeroServiceStatus.Running
|| _state == ZeroServiceStatus.Initialized)
{
return GetServer(new IPEndPoint(IPAddress.Any, NetUtils.GetFreeTcpPort()), new Router()).Router;
}
@ -157,8 +218,8 @@ namespace ZeroLevel.Services.Applications
public IRouter UseHost(int port)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
if (_state == ZeroServiceStatus.Running
|| _state == ZeroServiceStatus.Initialized)
{
return GetServer(new IPEndPoint(IPAddress.Any, port), new Router()).Router;
}
@ -167,8 +228,8 @@ namespace ZeroLevel.Services.Applications
public IRouter UseHost(IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
if (_state == ZeroServiceStatus.Running
|| _state == ZeroServiceStatus.Initialized)
{
return GetServer(endpoint, new Router()).Router;
}
@ -177,38 +238,210 @@ namespace ZeroLevel.Services.Applications
public ExClient ConnectToService(string endpoint)
{
if (_state == ZeroServiceStatus.Running)
if (_state == ZeroServiceStatus.Running
|| _state == ZeroServiceStatus.Initialized)
{
return new ExClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true));
return GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true);
}
return null;
}
public ExClient ConnectToService(string alias, string endpoint)
public ExClient ConnectToService(IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running)
if (_state == ZeroServiceStatus.Running
|| _state == ZeroServiceStatus.Initialized)
{
return new ExClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true));
return GetClient(endpoint, new Router(), true);
}
return null;
}
public ExClient ConnectToService(IPEndPoint endpoint)
#region Autoregistration inboxes
private static Delegate CreateDelegate(Type delegateType, MethodInfo methodInfo, object target)
{
if (_state == ZeroServiceStatus.Running)
Func<Type[], Type> getType;
var isAction = methodInfo.ReturnType.Equals((typeof(void)));
if (isAction)
{
return new ExClient(GetClient(endpoint, new Router(), true));
getType = Expression.GetActionType;
}
return null;
else
{
getType = Expression.GetFuncType;
}
if (methodInfo.IsStatic)
{
return Delegate.CreateDelegate(delegateType, methodInfo);
}
return Delegate.CreateDelegate(delegateType, target, methodInfo.Name);
}
public ExClient ConnectToService(string alias, IPEndPoint endpoint)
public void AutoregisterInboxes(IServer server)
{
if (_state == ZeroServiceStatus.Running)
var type = server.GetType();
// Search router registerinbox methods with inbox name
var register_methods = type.GetMethods(BindingFlags.Instance
| BindingFlags.Public
| BindingFlags.NonPublic
| BindingFlags.FlattenHierarchy)?
.Where(mi => mi.Name.Equals("RegisterInbox", StringComparison.Ordinal) &&
mi.GetParameters().First().ParameterType == typeof(string));
var register_message_handler = register_methods.First(mi => mi.IsGenericMethod == false);
var register_message_handler_with_msg = register_methods.First(mi =>
{
if (mi.IsGenericMethod)
{
var paremeters = mi.GetParameters().ToArray();
if (paremeters.Length == 2 && paremeters[1].ParameterType.IsAssignableToGenericType(typeof(MessageHandler<>)))
{
return true;
}
}
return false;
});
var register_request_handler_without_msg = register_methods.First(mi =>
{
return new ExClient(GetClient(endpoint, new Router(), true));
if (mi.IsGenericMethod)
{
var paremeters = mi.GetParameters().ToArray();
if (paremeters.Length == 2 && paremeters[1].ParameterType.IsAssignableToGenericType(typeof(RequestHandler<>)))
{
return true;
}
}
return false;
});
var register_request_handler = register_methods.First(mi =>
{
if (mi.IsGenericMethod)
{
var paremeters = mi.GetParameters().ToArray();
if (paremeters.Length == 2 && paremeters[1].ParameterType.IsAssignableToGenericType(typeof(RequestHandler<,>)))
{
return true;
}
}
return false;
});
MethodInfo[] methods = this.
GetType().
GetMethods(BindingFlags.NonPublic
| BindingFlags.Public
| BindingFlags.Instance
| BindingFlags.FlattenHierarchy
| BindingFlags.Instance);
foreach (MethodInfo mi in methods)
{
try
{
foreach (Attribute attr in Attribute.GetCustomAttributes(mi, typeof(ExchangeAttribute)))
{
var args = mi.GetParameters().ToArray();
if (attr.GetType() == typeof(ExchangeMainHandlerAttribute))
{
if (args.Length == 1)
{
var handler = CreateDelegate(typeof(MessageHandler), mi, this);
register_message_handler.Invoke(server, new object[] { BaseSocket.DEFAULT_MESSAGE_INBOX, handler });
}
else
{
var handler = CreateDelegate(typeof(MessageHandler<>).MakeGenericType(args[1].ParameterType), mi, this);
MethodInfo genericMethod = register_message_handler_with_msg.MakeGenericMethod(args[1].ParameterType);
genericMethod.Invoke(server, new object[] { BaseSocket.DEFAULT_MESSAGE_INBOX, handler });
}
}
else if (attr.GetType() == typeof(ExchangeHandlerAttribute))
{
if (args.Length == 1)
{
var handler = CreateDelegate(typeof(MessageHandler), mi, this);
register_message_handler.Invoke(server, new object[] { (attr as ExchangeHandlerAttribute).Inbox, handler });
}
else
{
var handler = CreateDelegate(typeof(MessageHandler<>).MakeGenericType(args[1].ParameterType), mi, this);
MethodInfo genericMethod = register_message_handler_with_msg.MakeGenericMethod(args[1].ParameterType);
genericMethod.Invoke(server, new object[] { (attr as ExchangeHandlerAttribute).Inbox, handler });
}
}
else if (attr.GetType() == typeof(ExchangeMainReplierAttribute))
{
var returnType = mi.ReturnType;
var genArgType = args[1].ParameterType;
MethodInfo genericMethod = register_request_handler.MakeGenericMethod(genArgType, returnType);
var requestHandler = CreateDelegate(typeof(RequestHandler<,>).MakeGenericType(args[1].ParameterType, returnType), mi, this);
genericMethod.Invoke(server, new object[] { BaseSocket.DEFAULT_REQUEST_INBOX, requestHandler });
}
else if (attr.GetType() == typeof(ExchangeReplierAttribute))
{
var returnType = mi.ReturnType;
var genArgType = args[1].ParameterType;
MethodInfo genericMethod = register_request_handler.MakeGenericMethod(genArgType, returnType);
var requestHandler = CreateDelegate(typeof(RequestHandler<,>).MakeGenericType(args[1].ParameterType, returnType), mi, this);
genericMethod.Invoke(server, new object[] { (attr as ExchangeReplierAttribute).Inbox, requestHandler });
}
else if (attr.GetType() == typeof(ExchangeMainReplierWithoutArgAttribute))
{
var returnType = mi.ReturnType;
MethodInfo genericMethod = register_request_handler_without_msg.MakeGenericMethod(returnType);
var requestHandler = CreateDelegate(typeof(RequestHandler<>).MakeGenericType(returnType), mi, this);
genericMethod.Invoke(server, new object[] { BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, requestHandler });
}
else if (attr.GetType() == typeof(ExchangeReplierWithoutArgAttribute))
{
var returnType = mi.ReturnType;
MethodInfo genericMethod = register_request_handler_without_msg.MakeGenericMethod(returnType);
var requestHandler = CreateDelegate(typeof(RequestHandler<>).MakeGenericType(returnType), mi, this);
genericMethod.Invoke(server, new object[] { (attr as ExchangeReplierWithoutArgAttribute).Inbox, requestHandler });
}
}
}
catch (Exception ex)
{
Log.Debug($"[ZExchange] Can't register method {mi.Name} as inbox handler. {ex}");
}
}
}
#endregion
public void StoreConnection(string endpoint)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
_aliases.Set(endpoint, NetUtils.CreateIPEndPoint(endpoint));
}
}
public void StoreConnection(string alias, string endpoint)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
_aliases.Set(alias, NetUtils.CreateIPEndPoint(endpoint));
}
}
public void StoreConnection(IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
_aliases.Set($"{endpoint.Address}:{endpoint.Port}", endpoint);
}
}
public void StoreConnection(string alias, IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
_aliases.Set(alias, endpoint);
}
return null;
}
#endregion
@ -284,16 +517,14 @@ namespace ZeroLevel.Services.Applications
}
#endregion
#region Utils
private static readonly ConcurrentDictionary<string, ISocketClient> _clientInstances = new ConcurrentDictionary<string, ISocketClient>();
private readonly ConcurrentDictionary<string, SocketServer> _serverInstances = new ConcurrentDictionary<string, SocketServer>();
#region Utils
private ISocketClient GetClient(IPEndPoint endpoint, IRouter router, bool use_cachee)
private ExClient GetClient(IPEndPoint endpoint, IRouter router, bool use_cachee)
{
if (use_cachee)
{
string key = $"{endpoint.Address}:{endpoint.Port}";
ISocketClient instance = null;
ExClient instance = null;
if (_clientInstances.ContainsKey(key))
{
instance = _clientInstances[key];
@ -305,11 +536,11 @@ namespace ZeroLevel.Services.Applications
instance.Dispose();
instance = null;
}
instance = new SocketClient(endpoint, router);
instance = new ExClient(new SocketClient(endpoint, router));
_clientInstances[key] = instance;
return instance;
}
return new SocketClient(endpoint, router);
return new ExClient(new SocketClient(endpoint, router));
}
private SocketServer GetServer(IPEndPoint endpoint, IRouter router)
@ -328,29 +559,42 @@ namespace ZeroLevel.Services.Applications
public void Dispose()
{
_state = ZeroServiceStatus.Disposed;
foreach (var client in _clientInstances)
if (_state != ZeroServiceStatus.Disposed)
{
try
_state = ZeroServiceStatus.Disposed;
if (_update_discovery_table_task != -1)
{
client.Value.Dispose();
Sheduller.Remove(_update_discovery_table_task);
}
catch (Exception ex)
if (_register_in_discovery_table_task != -1)
{
Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketClient to endpoint {client.Key}");
Sheduller.Remove(_register_in_discovery_table_task);
}
}
foreach (var server in _serverInstances)
{
try
foreach (var client in _clientInstances)
{
server.Value.Dispose();
try
{
client.Value.Dispose();
}
catch (Exception ex)
{
Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketClient to endpoint {client.Key}");
}
}
catch (Exception ex)
foreach (var server in _serverInstances)
{
Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketServer with endpoint {server.Key}");
try
{
server.Value.Dispose();
}
catch (Exception ex)
{
Log.Error(ex, $"[BaseZeroService`{Name ?? string.Empty}.Dispose()] Dispose SocketServer with endpoint {server.Key}");
}
}
}
}

@ -0,0 +1,9 @@
namespace ZeroLevel.Services.Collections
{
public interface ITransactable
{
bool StartTransction();
bool Commit();
bool Rollback();
}
}

@ -0,0 +1,219 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace ZeroLevel.Services.Collections
{
/// <summary>
/// Класс обертывает коллекцию вида ключ-значение и позволяет проводить над ней транзакционные обновления
/// </summary>
/// <typeparam name="TKey">Тип ключа коллекции</typeparam>
/// <typeparam name="TValue">Тип значения коллекции</typeparam>
public class KeyListValueTransactCollection<TKey, TValue> :
ITransactable
{
/// <summary>
/// Коллекция
/// </summary>
readonly Dictionary<TKey, List<TValue>> _collection = new Dictionary<TKey, List<TValue>>();
private ReaderWriterLockSlim _rwLock =
new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
/// <summary>
/// Проверка наличия ключа
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public bool HasKey(TKey key)
{
try
{
_rwLock.EnterReadLock();
return _collection.ContainsKey(key);
}
finally
{
_rwLock.ExitReadLock();
}
}
/// <summary>
/// Получение значения коллекции по ключу
/// </summary>
/// <param name="key">Ключ</param>
/// <returns>Значение</returns>
public IEnumerable<TValue> this[TKey key]
{
get
{
try
{
_rwLock.EnterReadLock();
List<TValue> value;
if (_collection.TryGetValue(key, out value))
return value;
}
finally
{
_rwLock.ExitReadLock();
}
throw new KeyNotFoundException();
}
}
/// <summary>
/// Коллекция ключей
/// </summary>
public IEnumerable<TKey> Keys
{
get
{
try
{
_rwLock.EnterReadLock();
return _collection.Keys;
}
finally
{
_rwLock.ExitReadLock();
}
}
}
/// <summary>
/// Коллекция значений
/// </summary>
public IEnumerable<List<TValue>> Values
{
get
{
try
{
_rwLock.EnterReadLock();
return _collection.Values.ToArray();
}
finally
{
_rwLock.ExitReadLock();
}
}
}
#region Transaction update
/// <summary>
/// Список не обновленных данных (т.е. тех которые удалены в базе)
/// </summary>
readonly List<TKey> _removingDate = new List<TKey>();
/// <summary>
/// Обновленные данные
/// </summary>
readonly Dictionary<TKey, List<TValue>> _updatedRecords = new Dictionary<TKey, List<TValue>>();
/// <summary>
/// Новые данные
/// </summary>
readonly Dictionary<TKey, List<TValue>> _newRecords = new Dictionary<TKey, List<TValue>>();
void ClearTransactionDate()
{
_removingDate.Clear();
_updatedRecords.Clear();
_newRecords.Clear();
}
/// <summary>
/// Добавление или обновления записи
/// </summary>
/// <param name="id">Идентификатор записи</param>
/// <param name="value">Значение</param>
public void Post(TKey id, TValue value)
{
if (_isUpdating.State == false)
{
throw new Exception("Method Post allowed only in transaction");
}
if (!HasKey(id))
{
if (_newRecords.ContainsKey(id) == false)
{
_newRecords.Add(id, new List<TValue>());
}
_newRecords[id].Add(value);
}
else
{
if (!_updatedRecords.ContainsKey(id))
{
_updatedRecords.Add(id, new List<TValue>());
}
_updatedRecords[id].Add(value);
if (_removingDate.Contains(id))
_removingDate.Remove(id);
}
return;
}
readonly AtomicBoolean _isUpdating = new AtomicBoolean();
public bool Commit()
{
if (_isUpdating.State == false) return false;
try
{
_rwLock.EnterWriteLock();
foreach (TKey id in _removingDate)
{
_collection.Remove(id);
}
foreach (TKey key in _newRecords.Keys)
{
_collection.Add(key, _newRecords[key]);
}
foreach (TKey key in _updatedRecords.Keys)
{
_collection[key] = _updatedRecords[key];
}
}
finally
{
_rwLock.ExitWriteLock();
ClearTransactionDate();
_isUpdating.Reset();
}
return true;
}
public bool Rollback()
{
if (_isUpdating.State == false) return false;
ClearTransactionDate();
_isUpdating.Reset();
return true;
}
public bool StartTransction()
{
if (_isUpdating.Set())
{
_removingDate.AddRange(_collection.Keys.ToArray());
return true;
}
return false;
}
#endregion
#region IDisposable
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (_collection != null)
{
_collection.Clear();
}
}
}
#endregion
}
}

@ -0,0 +1,271 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace ZeroLevel.Services.Collections
{
/// <summary>
/// Класс обертывает коллекцию вида ключ-значение и позволяет проводить над ней транзакционные обновления
/// </summary>
/// <typeparam name="TKey">Тип ключа коллекции</typeparam>
/// <typeparam name="TValue">Тип значения коллекции</typeparam>
public class KeyValueTransactCollection<TKey, TValue> :
ITransactable
{
private ReaderWriterLockSlim _rwLock =
new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
/// <summary>
/// Коллекция
/// </summary>
readonly Dictionary<TKey, TValue> _collection = new Dictionary<TKey, TValue>();
public KeyValueTransactCollection() { }
/// <summary>
/// Проверка наличия ключа
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public bool HasKey(TKey key)
{
try
{
_rwLock.EnterReadLock();
return _collection.ContainsKey(key);
}
finally
{
_rwLock.ExitReadLock();
}
}
/// <summary>
/// Получение значения коллекции по ключу
/// </summary>
/// <param name="key">Ключ</param>
/// <returns>Значение</returns>
public TValue this[TKey key]
{
get
{
try
{
_rwLock.EnterReadLock();
TValue value;
if (_collection.TryGetValue(key, out value))
return (value);
}
finally
{
_rwLock.ExitReadLock();
}
throw new KeyNotFoundException();
}
}
/// <summary>
/// Количество записей
/// </summary>
public int Count
{
get
{
try
{
_rwLock.EnterReadLock();
return _collection.Count;
}
finally
{
_rwLock.ExitReadLock();
}
}
}
/// <summary>
/// Коллекция ключей
/// </summary>
public IEnumerable<TKey> Keys
{
get
{
try
{
_rwLock.EnterReadLock();
return _collection.Keys;
}
finally
{
_rwLock.ExitReadLock();
}
}
}
/// <summary>
/// Список ключ-значений
/// </summary>
public IEnumerable<KeyValuePair<TKey, TValue>> Items
{
get
{
try
{
_rwLock.EnterReadLock();
return _collection;
}
finally
{
_rwLock.ExitReadLock();
}
}
}
/// <summary>
/// Коллекция значений
/// </summary>
public IEnumerable<TValue> Values
{
get
{
try
{
_rwLock.EnterReadLock();
return _collection.Values;
}
finally
{
_rwLock.ExitReadLock();
}
}
}
#region Transaction update
/// <summary>
/// Список не обновленных данных (т.е. тех которые удалены в базе)
/// </summary>
readonly List<TKey> _removingDate = new List<TKey>();
/// <summary>
/// Обновленные данные
/// </summary>
readonly Dictionary<TKey, TValue> _updatedRecords = new Dictionary<TKey, TValue>();
/// <summary>
/// Новые данные
/// </summary>
readonly Dictionary<TKey, TValue> _newRecords = new Dictionary<TKey, TValue>();
void ClearTransactionDate()
{
_removingDate.Clear();
_updatedRecords.Clear();
_newRecords.Clear();
}
/// <summary>
/// Добавление или обновления записи
/// </summary>
/// <param name="id">Идентификатор записи</param>
/// <param name="value">Значение</param>
public void Post(TKey id, TValue value)
{
if (_isUpdating.State == false)
{
throw new Exception("Method Post allowed only in transaction");
}
if (!HasKey(id))
{
if (_newRecords.ContainsKey(id) == false)
{
_newRecords.Add(id, value);
}
else
{
_newRecords[id] = value;
}
}
else
{
if (!_collection[id].Equals(value))
{
if (false == _updatedRecords.ContainsKey(id))
{
_updatedRecords.Add(id, value);
}
else
{
_updatedRecords[id] = value;
}
}
if (_removingDate.Contains(id))
_removingDate.Remove(id);
}
return;
}
#endregion
readonly AtomicBoolean _isUpdating = new AtomicBoolean();
public bool StartTransction()
{
if (_isUpdating.Set())
{
_removingDate.AddRange(_collection.Keys.ToArray());
return true;
}
return false;
}
public bool Commit()
{
if (_isUpdating.State == false) return false;
try
{
_rwLock.EnterWriteLock();
foreach (TKey id in _removingDate)
{
_collection.Remove(id);
}
foreach (TKey key in _newRecords.Keys)
{
_collection.Add(key, _newRecords[key]);
}
foreach (TKey key in _updatedRecords.Keys)
{
_collection[key] = _updatedRecords[key];
}
}
finally
{
_rwLock.ExitWriteLock();
ClearTransactionDate();
_isUpdating.Reset();
}
return true;
}
public bool Rollback()
{
if (_isUpdating.State == false) return false;
ClearTransactionDate();
_isUpdating.Reset();
return true;
}
#region IDisposable
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (_collection != null)
{
foreach (TKey key in _collection.Keys)
{
var disposable = _collection[key] as IDisposable;
if (disposable != null)
disposable.Dispose();
}
_collection.Clear();
}
}
}
#endregion
}
}

@ -1,11 +1,12 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace ZeroLevel.Network
{
public sealed class AliasSet
public sealed class AliasSet<T>
{
public sealed class _RoundRobinCollection<T> :
IDisposable
@ -97,6 +98,29 @@ namespace ZeroLevel.Network
_index = -1;
}
public IEnumerable<T> GetCurrentSeq()
{
_lock.EnterReadLock();
try
{
var arr = new T[_collection.Count];
int p = 0;
for (int i = _index; i < _collection.Count; i++, p++)
{
arr[p] = _collection[i];
}
for (int i = 0; i < _index; i++, p++)
{
arr[p] = _collection[i];
}
return arr;
}
finally
{
_lock.ExitReadLock();
}
}
public void Dispose()
{
_collection.Clear();
@ -104,13 +128,13 @@ namespace ZeroLevel.Network
}
}
private readonly ConcurrentDictionary<string, _RoundRobinCollection<string>> _aliases = new ConcurrentDictionary<string, _RoundRobinCollection<string>>();
private readonly ConcurrentDictionary<string, _RoundRobinCollection<T>> _aliases = new ConcurrentDictionary<string, _RoundRobinCollection<T>>();
public void Set(string alias, string address)
public void Set(string alias, T address)
{
if (_aliases.ContainsKey(alias) == false)
{
if (_aliases.TryAdd(alias, new _RoundRobinCollection<string>()))
if (_aliases.TryAdd(alias, new _RoundRobinCollection<T>()))
{
_aliases[alias].Add(address);
}
@ -122,11 +146,11 @@ namespace ZeroLevel.Network
}
}
public void Set(string alias, IEnumerable<string> addresses)
public void Set(string alias, IEnumerable<T> addresses)
{
if (_aliases.ContainsKey(alias) == false)
{
if (_aliases.TryAdd(alias, new _RoundRobinCollection<string>()))
if (_aliases.TryAdd(alias, new _RoundRobinCollection<T>()))
{
foreach (var address in addresses)
_aliases[alias].Add(address);
@ -140,13 +164,22 @@ namespace ZeroLevel.Network
}
}
public string GetAddress(string alias)
public T GetAddress(string alias)
{
if (_aliases.ContainsKey(alias) && _aliases[alias].MoveNext())
{
return _aliases[alias].Current;
}
return null;
return default(T);
}
public IEnumerable<T> GetAddresses(string alias)
{
if (_aliases.ContainsKey(alias) && _aliases[alias].MoveNext())
{
return _aliases[alias].GetCurrentSeq();
}
return Enumerable.Empty<T>();
}
}
}

@ -11,6 +11,7 @@ namespace ZeroLevel.Network
public const string DEFAULT_MESSAGE_INBOX = "__message_inbox__";
public const string DEFAULT_REQUEST_INBOX = "__request_inbox__";
public const string DEFAULT_REQUEST_WITHOUT_ARGS_INBOX = "__request_no_args_inbox__";
protected const string DEFAULT_REQUEST_ERROR_INBOX = "__request_error__";
/// <summary>

@ -1,10 +1,12 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
namespace ZeroLevel.Network
{
public interface IDiscoveryClient
: IDisposable
{
bool Register(ExServiceInfo info);
bool Register(ZeroServiceInfo info);
IEnumerable<ServiceEndpointInfo> GetServiceEndpoints(string serviceKey);

@ -10,7 +10,8 @@ namespace ZeroLevel.Network
public class DiscoveryClient
: IDiscoveryClient
{
private sealed class DCRouter
private sealed class DCRouter:
IDisposable
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private IEnumerable<ServiceEndpointInfo> _empty = Enumerable.Empty<ServiceEndpointInfo>();
@ -124,14 +125,19 @@ namespace ZeroLevel.Network
}
return _empty;
}
public void Dispose()
{
_lock.Dispose();
}
}
private readonly DCRouter _router = new DCRouter();
private readonly ExClient _discoveryServerClient;
public DiscoveryClient(ISocketClient client)
public DiscoveryClient(ExClient client)
{
_discoveryServerClient = new ExClient(client);
_discoveryServerClient = client;
UpdateServiceListInfo();
Sheduller.RemindEvery(TimeSpan.FromSeconds(30), UpdateServiceListInfo);
}
@ -160,7 +166,7 @@ namespace ZeroLevel.Network
}
}
public bool Register(ExServiceInfo info)
public bool Register(ZeroServiceInfo info)
{
_discoveryServerClient.ForceConnect();
if (_discoveryServerClient.Status == SocketClientStatus.Working)
@ -168,7 +174,7 @@ namespace ZeroLevel.Network
bool result = false;
try
{
_discoveryServerClient.Request<ExServiceInfo, InvokeResult>("register", info, r =>
_discoveryServerClient.Request<ZeroServiceInfo, InvokeResult>("register", info, r =>
{
result = r.Success;
if (!result)
@ -194,5 +200,11 @@ namespace ZeroLevel.Network
public IEnumerable<ServiceEndpointInfo> GetServiceEndpointsByGroup(string serviceGroup) => _router.GetServiceEndpointsByGroup(serviceGroup);
public IEnumerable<ServiceEndpointInfo> GetServiceEndpointsByType(string serviceType) => _router.GetServiceEndpointsByType(serviceType);
public ServiceEndpointInfo GetService(string serviceKey, string endpoint) => _router.GetService(serviceKey, endpoint);
public void Dispose()
{
_router.Dispose();
_discoveryServerClient.Dispose();
}
}
}

@ -1,6 +1,10 @@
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Reflection;
using ZeroLevel.Models;
using ZeroLevel.Services.Invokation;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
@ -108,7 +112,7 @@ namespace ZeroLevel.Network
{
try
{
_client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible<Trequest>(request)),
_client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible<Trequest>(request)),
f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
}
catch (Exception ex)

@ -155,5 +155,45 @@ namespace ZeroLevel.Services.Serialization
return PrimitiveTypeSerializer.Deserialize(reader, type);
}
}
public static T Copy<T>(T value)
where T : IBinarySerializable
{
using (var writer = new MemoryStreamWriter())
{
value.Serialize(writer);
using (var reader = new MemoryStreamReader(writer.Complete()))
{
var direct = (IBinarySerializable)Activator.CreateInstance<T>();
direct.Deserialize(reader);
return (T)direct;
}
}
}
public static T CopyCompatible<T>(T value)
{
if (typeof(IBinarySerializable).IsAssignableFrom(typeof(T)))
{
using (var writer = new MemoryStreamWriter())
{
((IBinarySerializable)value).Serialize(writer);
using (var reader = new MemoryStreamReader(writer.Complete()))
{
var direct = (IBinarySerializable)Activator.CreateInstance<T>();
direct.Deserialize(reader);
return (T)direct;
}
}
}
using (var writer = new MemoryStreamWriter())
{
PrimitiveTypeSerializer.Serialize<T>(writer, value);
using (var reader = new MemoryStreamReader(writer.Complete()))
{
return PrimitiveTypeSerializer.Deserialize<T>(reader);
}
}
}
}
}
Loading…
Cancel
Save

Powered by TurnKey Linux.