No stable fix

pull/1/head
Ogoun 6 years ago
parent c12a54f3b2
commit 09cb770765

@ -4,7 +4,6 @@
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.7.2" />
</startup>
<appSettings>
<add key="console" value="true" />
<add key="apiport" value="8885" />
<add key="socketport" value="8884" />
</appSettings>

@ -2,6 +2,7 @@
using ZeroLevel.Models;
using ZeroLevel.Network;
using ZeroLevel.Services.Applications;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Discovery
{
@ -36,7 +37,7 @@ namespace ZeroLevel.Discovery
Startup.StartWebPanel(port, false);
var socketPort = Configuration.Default.First<int>("socketport");
_exInbox = ExchangeTransportFactory.GetServer("socket", socketPort);
_exInbox = ExchangeTransportFactory.GetServer(socketPort);
_exInbox.RegisterInbox<IEnumerable<ServiceEndpointsInfo>>("services", (_, __) => routeTable.Get());
_exInbox.RegisterInbox<ExServiceInfo, InvokeResult>("register", (info, _, __) => routeTable.Append(info));

@ -4,6 +4,7 @@
{
private static void Main(string[] args)
{
Log.AddConsoleLogger(Services.Logging.LogLevel.System | Services.Logging.LogLevel.FullDebug);
Bootstrap.Startup<DiscoveryService>(args);
}
}

@ -91,11 +91,11 @@ namespace ZeroLevel.Discovery
#endregion Snapshot
private bool Ping(string protocol, string endpoint, string msg)
private bool Ping(string endpoint, string msg)
{
try
{
using (var client = ExchangeTransportFactory.GetClient(protocol, endpoint))
using (var client = ExchangeTransportFactory.GetClient(endpoint))
{
client.ForceConnect();
return client.Status == ZTransportStatus.Working;
@ -103,7 +103,7 @@ namespace ZeroLevel.Discovery
}
catch (Exception ex)
{
Log.Error(ex, $"[RouteTable] Fault ping endpoint {endpoint}, protocol {protocol}");
Log.Error(ex, $"[RouteTable] Fault ping endpoint {endpoint}");
return false;
}
}
@ -112,20 +112,20 @@ namespace ZeroLevel.Discovery
{
try
{
var removeEntities = new Dictionary<string, List<ServiceEndpointInfo>>();
var removeEntities = new Dictionary<string, List<string>>();
_lock.EnterReadLock();
try
{
foreach (var pair in _table)
{
var endpointsToRemove = new List<ServiceEndpointInfo>();
var endpointsToRemove = new List<string>();
foreach (var e in pair.Value.Endpoints)
{
if (Ping(e.Protocol, e.Endpoint, "HELLO") == false)
if (Ping(e, "HELLO") == false)
{
if (false == removeEntities.ContainsKey(pair.Key))
{
removeEntities.Add(pair.Key, new List<ServiceEndpointInfo>());
removeEntities.Add(pair.Key, new List<string>());
}
removeEntities[pair.Key].Add(e);
}
@ -169,7 +169,7 @@ namespace ZeroLevel.Discovery
public InvokeResult Append(ExServiceInfo serviceInfo)
{
InvokeResult result = null;
if (Ping(serviceInfo.Protocol, serviceInfo.Endpoint, serviceInfo.ServiceKey))
if (Ping(serviceInfo.Endpoint, serviceInfo.ServiceKey))
{
var key = $"{serviceInfo.ServiceGroup}:{serviceInfo.ServiceType}:{serviceInfo.ServiceKey.Trim().ToLowerInvariant()}";
_lock.EnterWriteLock();
@ -183,27 +183,18 @@ namespace ZeroLevel.Discovery
Version = serviceInfo.Version,
ServiceGroup = serviceInfo.ServiceGroup,
ServiceType = serviceInfo.ServiceType,
Endpoints = new List<ServiceEndpointInfo>()
Endpoints = new List<string>()
});
_table[key].Endpoints.Add(new ServiceEndpointInfo
{
Endpoint = serviceInfo.Endpoint,
Protocol = serviceInfo.Protocol
});
Log.Info($"The service '{serviceInfo.ServiceKey}' registered on protocol {serviceInfo.Protocol}, endpoint: {serviceInfo.Endpoint}");
_table[key].Endpoints.Add(serviceInfo.Endpoint);
Log.Info($"The service '{serviceInfo.ServiceKey}' registered on endpoint: {serviceInfo.Endpoint}");
}
else
{
var exists = _table[key];
var endpoint = new ServiceEndpointInfo
{
Endpoint = serviceInfo.Endpoint,
Protocol = serviceInfo.Protocol
};
if (exists.Endpoints.Contains(endpoint) == false)
if (exists.Endpoints.Contains(serviceInfo.Endpoint) == false)
{
Log.Info($"The service '{serviceInfo.ServiceKey}' register endpoint: {serviceInfo.Endpoint} on protocol {serviceInfo.Protocol}");
exists.Endpoints.Add(endpoint);
Log.Info($"The service '{serviceInfo.ServiceKey}' register endpoint: {serviceInfo.Endpoint}");
exists.Endpoints.Add(serviceInfo.Endpoint);
}
}
}

@ -7,7 +7,6 @@ namespace ZeroLevel.Services.Applications
{
public string Key { get; private set; }
public string Version { get; private set; }
public string Protocol { get; private set; }
public string Group { get; private set; }
public string Type { get; private set; }
@ -21,14 +20,12 @@ namespace ZeroLevel.Services.Applications
base.Name = ReadName(_config);
this.Key = ReadKey(_config);
this.Version = ReadVersion(_config);
this.Protocol = ReadProtocol(_config);
this.Group = ReadServiceGroup(_config);
this.Type = ReadServiceType(_config);
var discovery = _config.First("discovery");
var discoveryProtocol = _config.FirstOrDefault("discoveryProtocol", "socket");
_exchange = new Exchange(new DiscoveryClient(discoveryProtocol, discovery));
_exchange = new Exchange(new DiscoveryClient(discovery));
}
private IExService _self_service = null;
@ -80,15 +77,6 @@ namespace ZeroLevel.Services.Applications
return "1.0";
}
private string ReadProtocol(IConfiguration configuration)
{
if (_config.Contains("Protocol"))
return _config.First("Protocol");
if (_config.Contains("Transport"))
return _config.First("Transport");
return null;
}
private string ReadServiceGroup(IConfiguration configuration)
{
if (_config.Contains("DiscoveryGroup"))

@ -0,0 +1,73 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace ZeroLevel.Services.Collections
{
public sealed class RoundRobinOverCollection<T>
{
private class Node
{
public T Value;
public Node Next;
}
private int _count;
private Node _currentNode;
public bool IsEmpty => _count <= 0;
public RoundRobinOverCollection(IEnumerable<T> collection)
{
if (collection.Any())
{
_count = 1;
_currentNode = new Node { Value = collection.First() };
var prev = _currentNode;
foreach (var e in collection.Skip(1))
{
prev.Next = new Node { Value = e };
prev = prev.Next;
_count++;
}
prev.Next = _currentNode;
}
else
{
_count = 0;
}
}
public IEnumerable<T> Find(Func<T, bool> selector)
{
if (_count == 0)
{
yield break;
}
var cursor = _currentNode;
for (int i = 0; i < _count; i++)
{
if (selector(cursor.Value))
{
yield return cursor.Value;
}
cursor = cursor.Next;
}
}
public IEnumerable<T> GenerateSeq()
{
if (_count == 0)
{
yield break;
}
var cursor = _currentNode;
_currentNode = _currentNode.Next;
for (int i = 0; i < _count; i++)
{
yield return cursor.Value;
cursor = cursor.Next;
}
}
}
}

@ -102,9 +102,9 @@ namespace ZeroLevel.Services.Invokation
return Configure(typeof(T), filter);
}
public IEnumerable<string> Configure(Type type)
public IEnumerable<string> Configure(Type instanceType)
{
var result = type.GetMethods(BindingFlags.Static |
var result = instanceType.GetMethods(BindingFlags.Static |
BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.NonPublic |
@ -113,9 +113,9 @@ namespace ZeroLevel.Services.Invokation
return result.Select(r => r.Item1).ToList();
}
public IEnumerable<string> Configure(Type type, string methodName)
public IEnumerable<string> Configure(Type instanceType, string methodName)
{
var result = type.GetMethods(BindingFlags.Static |
var result = instanceType.GetMethods(BindingFlags.Static |
BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.NonPublic |
@ -125,9 +125,9 @@ namespace ZeroLevel.Services.Invokation
return result.Select(r => r.Item1).ToList();
}
public IEnumerable<string> ConfigureGeneric<T>(Type type, string methodName)
public IEnumerable<string> ConfigureGeneric<T>(Type instanceType, string methodName)
{
var result = type.GetMethods(BindingFlags.Static |
var result = instanceType.GetMethods(BindingFlags.Static |
BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.NonPublic |
@ -149,9 +149,9 @@ namespace ZeroLevel.Services.Invokation
return result.Select(r => r.Item1).ToList();
}
public IEnumerable<string> ConfigureGeneric<T>(Type type, Func<MethodInfo, bool> filter)
public IEnumerable<string> ConfigureGeneric<T>(Type instanceType, Func<MethodInfo, bool> filter)
{
var result = type.GetMethods(BindingFlags.Static |
var result = instanceType.GetMethods(BindingFlags.Static |
BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.NonPublic |
@ -173,9 +173,9 @@ namespace ZeroLevel.Services.Invokation
return result.Select(r => r.Item1).ToList();
}
public IEnumerable<string> Configure(Type type, Func<MethodInfo, bool> filter)
public IEnumerable<string> Configure(Type instanceType, Func<MethodInfo, bool> filter)
{
var result = type.GetMethods(BindingFlags.Static |
var result = instanceType.GetMethods(BindingFlags.Static |
BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.NonPublic |

@ -6,7 +6,6 @@
string Key { get; }
string Endpoint { get; }
string Version { get; }
string Protocol { get; }
string Group { get; }
string Type { get; }
}

@ -1,113 +1,48 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;
using System.Collections.Concurrent;
namespace ZeroLevel.Network
{
public static class ExchangeTransportFactory
{
private static readonly Dictionary<string, Type> _customServers = new Dictionary<string, Type>();
private static readonly Dictionary<string, Type> _customClients = new Dictionary<string, Type>();
private static readonly ConcurrentDictionary<string, IExClient> _clientInstances = new ConcurrentDictionary<string, IExClient>();
/// <summary>
/// Scanning the specified assembly to find the types that implement the IExchangeServer or IExchangeClient interfaces
/// </summary>
internal static void ScanAndRegisterCustomTransport(Assembly asm)
{
foreach (var type in asm.GetExportedTypes())
{
var serverAttr = type.GetCustomAttribute<ExchangeServerAttribute>();
if (serverAttr != null &&
string.IsNullOrWhiteSpace(serverAttr.Protocol) == false &&
typeof(IZObservableServer).IsAssignableFrom(type))
{
_customServers[serverAttr.Protocol] = type;
}
var clientAttr = type.GetCustomAttribute<ExchangeClientAttribute>();
if (clientAttr != null &&
string.IsNullOrWhiteSpace(clientAttr.Protocol) == false &&
typeof(IZTransport).IsAssignableFrom(type))
{
_customClients[clientAttr.Protocol] = type;
}
}
}
/// <summary>
/// Creates a server to receive messages using the specified protocol
/// </summary>
/// <param name="protocol">Protocol</param>
/// <returns>Server</returns>
public static IExService GetServer(string protocol, int port = -1)
public static IExService GetServer(int port = -1)
{
ExService instance = null;
if (protocol.Equals("socket", StringComparison.OrdinalIgnoreCase))
{
instance = new ExService(new ZExSocketObservableServer(new System.Net.IPEndPoint(NetUtils.GetNonLoopbackAddress(), port == -1 ? NetUtils.GetFreeTcpPort() : port)));
}
else
{
var key = protocol.Trim().ToLowerInvariant();
if (_customServers.ContainsKey(key))
{
instance = new ExService((IZObservableServer)Activator.CreateInstance(_customServers[key]));
}
}
if (instance != null)
{
return instance;
}
throw new NotSupportedException($"Protocol {protocol} not supported");
return new ExService(new ZExSocketObservableServer(new System.Net.IPEndPoint(NetUtils.GetNonLoopbackAddress(), port == -1 ? NetUtils.GetFreeTcpPort() : port)));
}
/// <summary>
/// Creates a client to access the server using the specified protocol
/// </summary>
/// <param name="protocol">Protocol</param>
/// <param name="endpoint">Server endpoint</param>
/// <returns>Client</returns>
public static IExClient GetClientWithCache(string protocol, string endpoint)
public static IExClient GetClientWithCache(string endpoint)
{
IExClient instance = null;
var cachee_key = $"{protocol}:{endpoint}";
if (_clientInstances.ContainsKey(cachee_key))
if (_clientInstances.ContainsKey(endpoint))
{
instance = _clientInstances[cachee_key];
instance = _clientInstances[endpoint];
if (instance.Status == ZTransportStatus.Working)
{
return instance;
}
_clientInstances.TryRemove(cachee_key, out instance);
_clientInstances.TryRemove(endpoint, out instance);
instance.Dispose();
instance = null;
}
instance = GetClient(protocol, endpoint);
_clientInstances[cachee_key] = instance;
instance = GetClient(endpoint);
_clientInstances[endpoint] = instance;
return instance;
}
public static IExClient GetClient(string protocol, string endpoint)
public static IExClient GetClient(string endpoint)
{
ExClient instance = null;
if (protocol.Equals("socket", StringComparison.OrdinalIgnoreCase))
{
instance = new ExClient(new ZSocketClient(NetUtils.CreateIPEndPoint(endpoint)));
}
else
{
var key = protocol.Trim().ToLowerInvariant();
if (_customClients.ContainsKey(key))
{
instance = new ExClient((IZTransport)Activator.CreateInstance(_customClients[key], new object[] { endpoint }));
}
}
if (instance != null)
{
return instance;
}
throw new NotSupportedException($"Protocol {protocol} not supported");
return new ExClient(new ZSocketClient(NetUtils.CreateIPEndPoint(endpoint)));
}
}
}

@ -31,12 +31,6 @@ namespace ZeroLevel.Network
[DataMember]
public string ServiceType { get; set; } = DEFAULT_TYPE_NAME;
/// <summary>
/// Protocol on which access to the service API is allowed
/// </summary>
[DataMember]
public string Protocol { get; set; }
/// <summary>
/// Connection point, address
/// </summary>
@ -60,7 +54,6 @@ namespace ZeroLevel.Network
if (string.Compare(this.Endpoint, other.Endpoint, true) != 0) return false;
if (string.Compare(this.Version, other.Version, true) != 0) return false;
if (string.Compare(this.Protocol, other.Protocol, true) != 0) return false;
return true;
}
@ -71,7 +64,7 @@ namespace ZeroLevel.Network
public override int GetHashCode()
{
return this.ServiceKey.GetHashCode() ^ this.Protocol.GetHashCode() ^ this.Endpoint.GetHashCode();
return this.ServiceKey.GetHashCode() ^ this.Endpoint.GetHashCode();
}
public void Serialize(IBinaryWriter writer)
@ -79,7 +72,6 @@ namespace ZeroLevel.Network
writer.WriteString(this.ServiceKey);
writer.WriteString(this.ServiceGroup);
writer.WriteString(this.ServiceType);
writer.WriteString(this.Protocol);
writer.WriteString(this.Endpoint);
writer.WriteString(this.Version);
}
@ -89,7 +81,6 @@ namespace ZeroLevel.Network
this.ServiceKey = reader.ReadString();
this.ServiceGroup = reader.ReadString();
this.ServiceType = reader.ReadString();
this.Protocol = reader.ReadString();
this.Endpoint = reader.ReadString();
this.Version = reader.ReadString();
}

@ -7,16 +7,17 @@ namespace ZeroLevel.Network
/// Endpoint
/// </summary>
public class ServiceEndpointInfo :
IEquatable<ServiceEndpointInfo>, IBinarySerializable
IEquatable<ServiceEndpointInfo>
{
public string Key { get; set; }
public string Type { get; set; }
public string Group { get; set; }
public string Endpoint { get; set; }
public string Protocol { get; set; }
public bool Equals(ServiceEndpointInfo other)
{
if (other == null) return false;
if (string.Compare(this.Endpoint, other.Endpoint, true) != 0) return false;
if (string.Compare(this.Protocol, other.Protocol, true) != 0) return false;
return true;
}
@ -27,18 +28,16 @@ namespace ZeroLevel.Network
public override int GetHashCode()
{
return Endpoint?.GetHashCode() ?? 0 ^ Protocol?.GetHashCode() ?? 0;
return Endpoint?.GetHashCode() ?? 0;
}
public void Serialize(IBinaryWriter writer)
{
writer.WriteString(this.Protocol);
writer.WriteString(this.Endpoint);
}
public void Deserialize(IBinaryReader reader)
{
this.Protocol = reader.ReadString();
this.Endpoint = reader.ReadString();
}
}

@ -14,7 +14,7 @@ namespace ZeroLevel.Network
public string Version { get; set; }
public string ServiceGroup { get; set; }
public string ServiceType { get; set; }
public List<ServiceEndpointInfo> Endpoints { get; set; }
public List<string> Endpoints { get; set; }
public bool Equals(ServiceEndpointsInfo other)
{
@ -23,7 +23,7 @@ namespace ZeroLevel.Network
if (string.Compare(this.Version, other.Version, true) != 0) return false;
if (string.Compare(this.ServiceGroup, other.ServiceGroup, true) != 0) return false;
if (string.Compare(this.ServiceType, other.ServiceType, true) != 0) return false;
if (false == CollectionComparsionExtensions.OrderingEquals(this.Endpoints, other.Endpoints, (a, b) => a.Equals(b))) return false;
if (!CollectionComparsionExtensions.NoOrderingEquals(this.Endpoints, other.Endpoints, (a, b) => a.Equals(b))) return false;
return true;
}
@ -52,7 +52,7 @@ namespace ZeroLevel.Network
this.Version = reader.ReadString();
this.ServiceGroup = reader.ReadString();
this.ServiceType = reader.ReadString();
this.Endpoints = reader.ReadCollection<ServiceEndpointInfo>();
this.Endpoints = reader.ReadStringCollection();
}
}
}

@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@ -8,57 +7,134 @@ using ZeroLevel.Services.Collections;
namespace ZeroLevel.Network
{
public class DiscoveryClient
: IDiscoveryClient
internal sealed class DCRouter
{
private readonly ConcurrentDictionary<string, RoundRobinCollection<ServiceEndpointInfo>> _tableByKey =
new ConcurrentDictionary<string, RoundRobinCollection<ServiceEndpointInfo>>();
private readonly ConcurrentDictionary<string, RoundRobinCollection<ServiceEndpointInfo>> _tableByGroups =
new ConcurrentDictionary<string, RoundRobinCollection<ServiceEndpointInfo>>();
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private IEnumerable<ServiceEndpointInfo> _empty = Enumerable.Empty<ServiceEndpointInfo>();
private List<ServiceEndpointInfo> _services = new List<ServiceEndpointInfo>();
private readonly ConcurrentDictionary<string, RoundRobinCollection<ServiceEndpointInfo>> _tableByTypes =
new ConcurrentDictionary<string, RoundRobinCollection<ServiceEndpointInfo>>();
private Dictionary<string, RoundRobinOverCollection<ServiceEndpointInfo>> _tableByKey;
private Dictionary<string, RoundRobinOverCollection<ServiceEndpointInfo>> _tableByGroups;
private Dictionary<string, RoundRobinOverCollection<ServiceEndpointInfo>> _tableByTypes;
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private readonly IExClient _discoveryServerClient;
internal void Update(IEnumerable<ServiceEndpointsInfo> records)
{
if (records == null)
{
Log.Warning("[DiscoveryClient] UpdateServiceListInfo. Discrovery response is empty");
return;
}
var services = new List<ServiceEndpointInfo>();
foreach (var service in records)
{
var key = service.ServiceKey.ToUpperInvariant();
var type = service.ServiceType.ToUpperInvariant();
var group = service.ServiceGroup.ToUpperInvariant();
services.AddRange(service.Endpoints.Select(e => new ServiceEndpointInfo { Endpoint = e, Group = group, Key = key, Type = type }));
}
_lock.EnterWriteLock();
try
{
_services = services;
_tableByKey = _services.GroupBy(r => r.Key).ToDictionary(g => g.Key, g => new RoundRobinOverCollection<ServiceEndpointInfo>(g));
_tableByTypes = _services.GroupBy(r => r.Type).ToDictionary(g => g.Key, g => new RoundRobinOverCollection<ServiceEndpointInfo>(g));
_tableByGroups = _services.GroupBy(r => r.Group).ToDictionary(g => g.Key, g => new RoundRobinOverCollection<ServiceEndpointInfo>(g));
}
catch (Exception ex)
{
Log.Error(ex, "[DiscoveryClient] UpdateServiceListInfo. Update local routing table error.");
}
finally
{
_lock.ExitWriteLock();
}
}
public DiscoveryClient(string protocol, string endpoint)
public ServiceEndpointInfo GetService(string serviceKey, string endpoint)
{
_discoveryServerClient = ExchangeTransportFactory.GetClient(protocol, endpoint);
UpdateServiceListInfo();
Sheduller.RemindEvery(TimeSpan.FromSeconds(30), UpdateServiceListInfo);
var key = serviceKey.ToUpperInvariant();
_lock.EnterReadLock();
try
{
if (_tableByKey.ContainsKey(key) && !_tableByKey[key].IsEmpty)
{
return _tableByKey[key].Find(s => s.Endpoint.Equals(endpoint, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();
}
}
finally
{
_lock.ExitReadLock();
}
return null;
}
private void UpdateOrAddRecord(string key, ServiceEndpointsInfo info)
public IEnumerable<ServiceEndpointInfo> GetServiceEndpoints(string serviceKey)
{
var groupName = info.ServiceGroup.ToLowerInvariant();
var typeName = info.ServiceType.ToLowerInvariant();
if (_tableByKey.ContainsKey(key) == false)
var key = serviceKey.Trim().ToLowerInvariant();
_lock.EnterReadLock();
try
{
_tableByKey.TryAdd(key, new RoundRobinCollection<ServiceEndpointInfo>());
if (_tableByKey.ContainsKey(key) && !_tableByKey[key].IsEmpty)
{
return _tableByKey[key].GenerateSeq();
}
}
else
finally
{
_tableByKey[key].Clear();
_lock.ExitReadLock();
}
if (_tableByGroups.ContainsKey(groupName) == false)
return _empty;
}
public IEnumerable<ServiceEndpointInfo> GetServiceEndpointsByGroup(string serviceGroup)
{
var group = serviceGroup.Trim().ToLowerInvariant();
_lock.EnterReadLock();
try
{
_tableByGroups.TryAdd(groupName, new RoundRobinCollection<ServiceEndpointInfo>());
if (_tableByGroups.ContainsKey(group) && !_tableByGroups[group].IsEmpty)
{
return _tableByGroups[group].GenerateSeq();
}
}
if (_tableByTypes.ContainsKey(typeName) == false)
finally
{
_tableByTypes.TryAdd(typeName, new RoundRobinCollection<ServiceEndpointInfo>());
_lock.ExitReadLock();
}
foreach (var e in info.Endpoints)
return _empty;
}
public IEnumerable<ServiceEndpointInfo> GetServiceEndpointsByType(string serviceType)
{
var type = serviceType.Trim().ToLowerInvariant();
_lock.EnterReadLock();
try
{
if (false == _tableByKey[key].Contains(e))
if (_tableByTypes.ContainsKey(type) && !_tableByTypes[type].IsEmpty)
{
_tableByKey[key].Add(e);
_tableByGroups[groupName].Add(e);
_tableByTypes[typeName].Add(e);
return _tableByTypes[type].GenerateSeq();
}
}
finally
{
_lock.ExitReadLock();
}
return _empty;
}
}
public class DiscoveryClient
: IDiscoveryClient
{
private readonly DCRouter _router = new DCRouter();
private readonly IExClient _discoveryServerClient;
public DiscoveryClient(string endpoint)
{
_discoveryServerClient = ExchangeTransportFactory.GetClient(endpoint);
UpdateServiceListInfo();
Sheduller.RemindEvery(TimeSpan.FromSeconds(30), UpdateServiceListInfo);
}
private void UpdateServiceListInfo()
@ -68,50 +144,15 @@ namespace ZeroLevel.Network
{
try
{
var ir = _discoveryServerClient.Request<IEnumerable<ServiceEndpointsInfo>>("services", records =>
{
if (records == null)
{
Log.Warning("[DiscoveryClient] UpdateServiceListInfo. Discrovery response is empty");
return;
}
_lock.EnterWriteLock();
try
{
_tableByGroups.Clear();
_tableByTypes.Clear();
var keysToRemove = new List<string>(_tableByKey.Keys);
foreach (var info in records)
{
var key = info.ServiceKey.Trim().ToLowerInvariant();
UpdateOrAddRecord(key, info);
keysToRemove.Remove(key);
}
foreach (var key in keysToRemove)
{
_tableByKey.TryRemove(key, out RoundRobinCollection<ServiceEndpointInfo> removed);
removed.Dispose();
}
}
catch (Exception ex)
{
Log.Error(ex, "[DiscoveryClient] UpdateServiceListInfo. Update local routing table error.");
}
finally
{
_lock.ExitWriteLock();
}
});
var ir = _discoveryServerClient.Request<IEnumerable<ServiceEndpointsInfo>>("services", records => _router.Update(records));
if (!ir.Success)
{
Log.Warning($"[DiscoveryClient] UpdateServiceListInfo. Error request to inbox 'services'. {ir.Comment}");
return;
}
}
catch (Exception ex)
{
Log.Error(ex, "[DiscoveryClient] UpdateServiceListInfo. Discrovery service response is absent");
return;
}
}
else
@ -150,44 +191,9 @@ namespace ZeroLevel.Network
}
}
public ServiceEndpointInfo GetService(string serviceKey, string endpoint)
{
var key = serviceKey.Trim().ToLowerInvariant();
if (_tableByKey.ContainsKey(key) && _tableByKey[key].MoveNext())
{
return _tableByKey[key].Find(s => s.Endpoint.Equals(endpoint, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();
}
return null;
}
public IEnumerable<ServiceEndpointInfo> GetServiceEndpoints(string serviceKey)
{
var key = serviceKey.Trim().ToLowerInvariant();
if (_tableByKey.ContainsKey(key) && _tableByKey[key].MoveNext())
{
return _tableByKey[key].GetCurrentSeq();
}
return Enumerable.Empty<ServiceEndpointInfo>();
}
public IEnumerable<ServiceEndpointInfo> GetServiceEndpointsByGroup(string serviceGroup)
{
var group = serviceGroup.Trim().ToLowerInvariant();
if (_tableByGroups.ContainsKey(group) && _tableByGroups[group].MoveNext())
{
return _tableByGroups[group].GetCurrentSeq();
}
return Enumerable.Empty<ServiceEndpointInfo>();
}
public IEnumerable<ServiceEndpointInfo> GetServiceEndpointsByType(string serviceType)
{
var type = serviceType.Trim().ToLowerInvariant();
if (_tableByTypes.ContainsKey(type) && _tableByTypes[type].MoveNext())
{
return _tableByTypes[type].GetCurrentSeq();
}
return Enumerable.Empty<ServiceEndpointInfo>();
}
public IEnumerable<ServiceEndpointInfo> GetServiceEndpoints(string serviceKey) => _router.GetServiceEndpoints(serviceKey);
public IEnumerable<ServiceEndpointInfo> GetServiceEndpointsByGroup(string serviceGroup) => _router.GetServiceEndpointsByGroup(serviceGroup);
public IEnumerable<ServiceEndpointInfo> GetServiceEndpointsByType(string serviceType) => _router.GetServiceEndpointsByType(serviceType);
public ServiceEndpointInfo GetService(string serviceKey, string endpoint) => _router.GetService(serviceKey, endpoint);
}
}

@ -36,21 +36,17 @@ namespace ZeroLevel.Network
if (_disposed) throw new ObjectDisposedException("ExServiceHost");
if (service == null) throw new ArgumentNullException(nameof(service));
ValidateService(service);
var key = $"{service.Key}.{service.Protocol}";
if (_services.ContainsKey(key))
if (_services.ContainsKey(service.Key))
{
throw new Exception($"[ExServiceHost] Service {key} already registered");
throw new Exception($"[ExServiceHost] Service {service.Key} already registered");
}
var server = ExchangeTransportFactory.GetServer(service.Protocol);
if (false == _services.TryAdd(key, new MetaService
var server = ExchangeTransportFactory.GetServer();
if (false == _services.TryAdd(service.Key, new MetaService
{
Server = server,
ServiceInfo = new ExServiceInfo
{
Endpoint = $"{server.Endpoint.Address}:{server.Endpoint.Port}",
Protocol = service.Protocol,
ServiceKey = service.Key,
Version = service.Version,
ServiceGroup = service.Group,
@ -81,20 +77,18 @@ namespace ZeroLevel.Network
if (serviceInfo == null) throw new ArgumentNullException(nameof(serviceInfo));
ValidateService(serviceInfo);
var key = $"{serviceInfo.ServiceKey}.{serviceInfo.Protocol}";
if (_services.ContainsKey(key))
if (_services.ContainsKey(serviceInfo.ServiceKey))
{
throw new Exception($"[ExServiceHost] Service {key} already registered");
throw new Exception($"[ExServiceHost] Service {serviceInfo.ServiceKey} already registered");
}
var server = ExchangeTransportFactory.GetServer(serviceInfo.Protocol);
if (false == _services.TryAdd(key, new MetaService
var server = ExchangeTransportFactory.GetServer();
if (false == _services.TryAdd(serviceInfo.ServiceKey, new MetaService
{
Server = server,
ServiceInfo = new ExServiceInfo
{
Endpoint = $"{server.Endpoint.Address}:{server.Endpoint.Port}",
Protocol = serviceInfo.Protocol,
ServiceKey = serviceInfo.ServiceKey,
Version = serviceInfo.Version,
ServiceGroup = serviceInfo.ServiceGroup,
@ -118,10 +112,6 @@ namespace ZeroLevel.Network
private void ValidateService(IExchangeService service)
{
if (string.IsNullOrWhiteSpace(service.Protocol))
{
throw new ArgumentNullException("Service.Protocol");
}
if (string.IsNullOrWhiteSpace(service.Key))
{
throw new ArgumentNullException("Service.Key");
@ -130,10 +120,6 @@ namespace ZeroLevel.Network
private void ValidateService(ExServiceInfo service)
{
if (string.IsNullOrWhiteSpace(service.Protocol))
{
throw new ArgumentNullException("Service.Protocol");
}
if (string.IsNullOrWhiteSpace(service.ServiceKey))
{
throw new ArgumentNullException("ServiceKey");
@ -163,47 +149,47 @@ namespace ZeroLevel.Network
{
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType);
genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_MESSAGE_INBOX, CreateDelegate(mi, service) });
genericMethod.Invoke(this, new object[] { ZBaseNetwork.DEFAULT_MESSAGE_INBOX, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeHandlerAttribute))
{
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerHandler.MakeGenericMethod(firstArgType);
genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeHandlerAttribute).Inbox, CreateDelegate(mi, service) });
genericMethod.Invoke(this, new object[] { (attr as ExchangeHandlerAttribute).Inbox, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeMainReplierAttribute))
{
var returnType = mi.ReturnType;
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType);
genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) });
genericMethod.Invoke(this, new object[] { ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeReplierAttribute))
{
var returnType = mi.ReturnType;
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerReplier.MakeGenericMethod(firstArgType, returnType);
genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) });
genericMethod.Invoke(this, new object[] { (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeMainReplierWithoutArgAttribute))
{
var returnType = mi.ReturnType;
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType);
genericMethod.Invoke(this, new object[] { service.Protocol, ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) });
genericMethod.Invoke(this, new object[] { ZBaseNetwork.DEFAULT_REQUEST_INBOX, CreateDelegate(mi, service) });
}
else if (attr.GetType() == typeof(ExchangeReplierWithoutArgAttribute))
{
var returnType = mi.ReturnType;
var firstArgType = mi.GetParameters().First().ParameterType;
MethodInfo genericMethod = registerReplierWithNoRequestBody.MakeGenericMethod(returnType);
genericMethod.Invoke(this, new object[] { service.Protocol, (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) });
genericMethod.Invoke(this, new object[] { (attr as ExchangeReplierAttribute).Inbox, CreateDelegate(mi, service) });
}
}
}
catch (Exception ex)
{
Log.Debug($"[ZExchange] Can't register method {mi.Name} as inbox handler. {ex.ToString()}");
Log.Debug($"[ZExchange] Can't register method {mi.Name} as inbox handler. {ex}");
}
}
}
@ -265,7 +251,7 @@ namespace ZeroLevel.Network
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Register inbox handler error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
Log.SystemError(ex, $"[Exchange] Register inbox handler error. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
}
}
@ -286,7 +272,7 @@ namespace ZeroLevel.Network
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Register inbox replier error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
Log.SystemError(ex, $"[Exchange] Register inbox replier error. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
}
}
@ -306,7 +292,7 @@ namespace ZeroLevel.Network
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange] Register inbox replier error. Protocol '{meta.ServiceInfo.Protocol}'. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
Log.SystemError(ex, $"[Exchange] Register inbox replier error. Inbox '{inbox}'. Service '{meta.ServiceInfo.ServiceKey}'");
}
}
@ -344,11 +330,11 @@ namespace ZeroLevel.Network
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
transport = ExchangeTransportFactory.GetClient(service.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, "[ExServiceHost] Can't get transport for protocol '{0}', service '{1}'", service.Protocol, serviceKey);
Log.SystemError(ex, $"[ExServiceHost] Can't get transport for service '{serviceKey}'");
continue;
}
try
@ -389,11 +375,11 @@ namespace ZeroLevel.Network
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(candidate.Protocol, candidate.Endpoint);
transport = ExchangeTransportFactory.GetClient(candidate.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[ExServiceHost] Can't get transport for protocol '{candidate.Protocol}', service '{serviceKey}'");
Log.SystemError(ex, $"[ExServiceHost] Can't get transport for service '{serviceKey}'");
return false;
}
return callHandler(transport);
@ -420,11 +406,11 @@ namespace ZeroLevel.Network
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
transport = ExchangeTransportFactory.GetClient(service.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint);
Log.SystemError(ex, $"[Exchange] Can't get transport for endpoint '{service.Endpoint}'");
continue;
}
yield return transport;
@ -458,11 +444,11 @@ namespace ZeroLevel.Network
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
transport = ExchangeTransportFactory.GetClient(service.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint);
Log.SystemError(ex, $"[Exchange] Can't get transport for endpoint '{service.Endpoint}'");
continue;
}
yield return transport;
@ -496,11 +482,11 @@ namespace ZeroLevel.Network
IExClient transport;
try
{
transport = ExchangeTransportFactory.GetClient(service.Protocol, service.Endpoint);
transport = ExchangeTransportFactory.GetClient(service.Endpoint);
}
catch (Exception ex)
{
Log.SystemError(ex, "[Exchange] Can't get transport for protocol '{0}', endpoint '{1}'", service.Protocol, service.Endpoint);
Log.SystemError(ex, $"[Exchange] Can't get transport for endpoint '{service.Endpoint}'");
continue;
}
yield return transport;

@ -48,8 +48,11 @@ namespace ZeroLevel.Network
/// </summary>
public const int MAX_SEND_QUEUE_SIZE = 1024;
protected ZTransportStatus _status = ZTransportStatus.Initialized;
public ZTransportStatus Status { get { return _status; } }
private ZTransportStatus _socket_status = ZTransportStatus.Initialized;
protected void Broken() => _socket_status = _socket_status == ZTransportStatus.Disposed ? _socket_status : ZTransportStatus.Broken;
protected void Disposed() => _socket_status = ZTransportStatus.Disposed;
protected void Working() => _socket_status = _socket_status == ZTransportStatus.Disposed ? _socket_status : ZTransportStatus.Working;
public ZTransportStatus Status { get { return _socket_status; } }
public abstract void Dispose();
}

@ -114,6 +114,7 @@ namespace ZeroLevel.Network
private long _last_rw_time = DateTime.UtcNow.Ticks;
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
private readonly object _reconnection_lock = new object();
private readonly BlockingCollection<Frame> _send_queue = new BlockingCollection<Frame>();
private readonly RequestBuffer _requests = new RequestBuffer();
@ -131,7 +132,6 @@ namespace ZeroLevel.Network
public ZSocketClient(IPEndPoint ep)
{
_status = ZTransportStatus.Initialized;
_endpoint = ep;
_parser.OnIncomingFrame += _parser_OnIncomingFrame;
@ -152,6 +152,7 @@ namespace ZeroLevel.Network
}
catch
{
Broken();
return;
}
_requests.TestForTimeouts();
@ -168,7 +169,7 @@ namespace ZeroLevel.Network
{
var port = (_clientSocket.LocalEndPoint as IPEndPoint)?.Port;
Log.Debug($"[ZClient] server disconnected, because last data was more thas {diff_request_ms} ms ago. Client port {port}");
_status = ZTransportStatus.Broken;
Broken();
}
}
@ -221,7 +222,7 @@ namespace ZeroLevel.Network
_parser.Push(_buffer, 0, count);
_last_rw_time = DateTime.UtcNow.Ticks;
}
if (_status == ZTransportStatus.Working)
if (Status == ZTransportStatus.Working)
{
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
}
@ -233,7 +234,7 @@ namespace ZeroLevel.Network
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketServerClient] Error read data");
_status = ZTransportStatus.Broken;
Broken();
OnDisconnect();
}
}
@ -241,22 +242,24 @@ namespace ZeroLevel.Network
private void SendFramesJob()
{
Frame frame = null;
while (_status != ZTransportStatus.Disposed)
while (Status != ZTransportStatus.Disposed)
{
if (_send_queue.IsCompleted)
{
return;
}
if (_status != ZTransportStatus.Working)
if (Status != ZTransportStatus.Working)
{
Thread.Sleep(100);
try
{
EnsureConnection();
}
catch
catch (Exception ex)
{
Log.SystemError(ex, "[ZSocketClient] Send next frame fault");
}
if (Status == ZTransportStatus.Disposed) return;
continue;
}
try
@ -277,7 +280,7 @@ namespace ZeroLevel.Network
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketServerClient] Backward send error.");
_status = ZTransportStatus.Broken;
Broken();
OnDisconnect();
}
finally
@ -293,7 +296,14 @@ namespace ZeroLevel.Network
private bool TryConnect()
{
if (_status == ZTransportStatus.Working) return true;
if (Status == ZTransportStatus.Working)
{
return true;
}
if (Status == ZTransportStatus.Disposed)
{
return false;
}
if (_clientSocket != null)
{
try
@ -319,10 +329,10 @@ namespace ZeroLevel.Network
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketClient] Connection fault");
_status = ZTransportStatus.Broken;
Broken();
return false;
}
_status = ZTransportStatus.Working;
Working();
OnConnect();
return true;
}
@ -379,7 +389,7 @@ namespace ZeroLevel.Network
catch (Exception ex)
{
fail?.Invoke(ex.Message);
_status = ZTransportStatus.Broken;
Broken();
OnDisconnect();
Log.SystemError(ex, $"[ZSocketClient] Request error. Frame '{frame.FrameId}'. Inbox '{frame.Inbox}'");
}
@ -401,15 +411,11 @@ namespace ZeroLevel.Network
public override void Dispose()
{
if (_status == ZTransportStatus.Disposed)
{
return;
}
if (_status == ZTransportStatus.Working)
if (Status == ZTransportStatus.Working)
{
OnDisconnect();
}
_status = ZTransportStatus.Disposed;
Disposed();
Sheduller.Remove(_heartbeat_key);
_stream?.Close();
_stream?.Dispose();

@ -81,7 +81,7 @@ namespace ZeroLevel.Network
private void BeginAcceptCallback(IAsyncResult ar)
{
if (_status == ZTransportStatus.Working)
if (Status == ZTransportStatus.Working)
{
try
{
@ -95,7 +95,7 @@ namespace ZeroLevel.Network
}
catch (Exception ex)
{
_status = ZTransportStatus.Broken;
Broken();
Log.SystemError(ex, $"[ZSocketServer] Error with connect accepting");
}
finally
@ -130,7 +130,7 @@ namespace ZeroLevel.Network
_serverSocket.Bind(endpoint);
_serverSocket.Listen(100);
_heartbeat_task = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat);
_status = ZTransportStatus.Working;
Working();
_serverSocket.BeginAccept(BeginAcceptCallback, null);
}
@ -140,12 +140,12 @@ namespace ZeroLevel.Network
public override void Dispose()
{
if (_status == ZTransportStatus.Disposed)
if (Status == ZTransportStatus.Disposed)
{
return;
}
Sheduller.Remove(_heartbeat_task);
_status = ZTransportStatus.Disposed;
Disposed();
_serverSocket.Close();
_serverSocket.Dispose();
try

@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Threading;
@ -42,7 +43,7 @@ namespace ZeroLevel.Network
_parser = new FrameParser();
_parser.OnIncomingFrame += _parser_OnIncomingFrame;
_status = ZTransportStatus.Working;
Working();
_sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true;
@ -53,7 +54,7 @@ namespace ZeroLevel.Network
public void SendBackward(Frame frame)
{
if (frame != null && _status == ZTransportStatus.Working && false == _send_queue.IsCompleted && false == _send_queue.IsAddingCompleted)
if (frame != null && Status == ZTransportStatus.Working && false == _send_queue.IsCompleted && false == _send_queue.IsAddingCompleted)
{
var data = MessageSerializer.Serialize(frame);
try
@ -74,7 +75,7 @@ namespace ZeroLevel.Network
private void SendFramesJob()
{
byte[] data;
while (_status == ZTransportStatus.Working)
while (Status == ZTransportStatus.Working)
{
if (_send_queue.IsCompleted)
{
@ -95,7 +96,7 @@ namespace ZeroLevel.Network
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketServerClient] Backward send error.");
_status = ZTransportStatus.Broken;
Broken();
RizeConnectionBrokenEvent();
}
}
@ -111,7 +112,7 @@ namespace ZeroLevel.Network
_parser.Push(_buffer, 0, count);
_last_rw_time = DateTime.UtcNow.Ticks;
}
if (_status == ZTransportStatus.Working)
if (Status == ZTransportStatus.Working)
{
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
}
@ -123,7 +124,7 @@ namespace ZeroLevel.Network
catch (Exception ex)
{
Log.SystemError(ex, $"[ZSocketServerClient] Error read data");
_status = ZTransportStatus.Broken;
Broken();
RizeConnectionBrokenEvent();
}
}
@ -140,7 +141,7 @@ namespace ZeroLevel.Network
Frame response;
try
{
response = _requestor(frame, this);
response = _requestor?.Invoke(frame, this);
}
catch (Exception ex)
{
@ -160,7 +161,7 @@ namespace ZeroLevel.Network
{
try
{
_handler(frame, this);
_handler?.Invoke(frame, this);
}
catch (Exception ex)
{
@ -175,11 +176,11 @@ namespace ZeroLevel.Network
public override void Dispose()
{
if (_status == ZTransportStatus.Disposed)
if (Status == ZTransportStatus.Disposed)
{
return;
}
_status = ZTransportStatus.Disposed;
Disposed();
_send_queue.CompleteAdding();
_send_queue.Dispose();

@ -13,6 +13,21 @@ namespace ZeroLevel.Services.Reflection
/// </summary>
public static class TypeHelpers
{
public static bool IsAssignableToGenericType(Type givenType, Type genericType)
{
var interfaceTypes = givenType.GetInterfaces();
foreach (var it in interfaceTypes)
{
if (it.IsGenericType && it.GetGenericTypeDefinition() == genericType)
return true;
}
if (givenType.IsGenericType && givenType.GetGenericTypeDefinition() == genericType)
return true;
Type baseType = givenType.BaseType;
if (baseType == null) return false;
return IsAssignableToGenericType(baseType, genericType);
}
public static bool IsArray(Type type)
{
return type.Return(t => t.IsArray, false);

@ -14,6 +14,8 @@ namespace ZeroLevel.Services.Serialization
Double ReadDouble();
float ReadFloat();
Int32 ReadInt32();
Int64 ReadLong();
@ -52,6 +54,12 @@ namespace ZeroLevel.Services.Serialization
List<Double> ReadDoubleCollection();
List<Decimal> ReadDecimalCollection();
List<TimeSpan> ReadTimeSpanCollection();
List<float> ReadFloatCollection();
List<bool> ReadBooleanCollection();
List<byte> ReadByteCollection();

@ -16,6 +16,8 @@ namespace ZeroLevel.Services.Serialization
void WriteDouble(double val);
void WriteFloat(float val);
void WriteInt32(Int32 number);
void WriteLong(Int64 number);
@ -51,6 +53,12 @@ namespace ZeroLevel.Services.Serialization
void WriteCollection(IEnumerable<Double> collection);
void WriteCollection(IEnumerable<Decimal> collection);
void WriteCollection(IEnumerable<TimeSpan> collection);
void WriteCollection(IEnumerable<float> collection);
void WriteCollection(IEnumerable<bool> collection);
void WriteCollection(IEnumerable<byte> collection);

@ -81,6 +81,12 @@ namespace ZeroLevel.Services.Serialization
return new TimeSpan(ReadLong());
}
public float ReadFloat()
{
var buffer = ReadBuffer(4);
return BitConverter.ToSingle(buffer, 0);
}
public double ReadDouble()
{
var buffer = ReadBuffer(8);
@ -287,6 +293,20 @@ namespace ZeroLevel.Services.Serialization
return collection;
}
public List<float> ReadFloatCollection()
{
int count = ReadInt32();
var collection = new List<float>(count);
if (count > 0)
{
for (int i = 0; i < count; i++)
{
collection.Add(ReadFloat());
}
}
return collection;
}
public List<Double> ReadDoubleCollection()
{
int count = ReadInt32();
@ -343,6 +363,34 @@ namespace ZeroLevel.Services.Serialization
return collection;
}
public List<decimal> ReadDecimalCollection()
{
int count = ReadInt32();
var collection = new List<decimal>(count);
if (count > 0)
{
for (int i = 0; i < count; i++)
{
collection.Add(ReadDecimal());
}
}
return collection;
}
public List<TimeSpan> ReadTimeSpanCollection()
{
int count = ReadInt32();
var collection = new List<TimeSpan>(count);
if (count > 0)
{
for (int i = 0; i < count; i++)
{
collection.Add(ReadTimeSpan());
}
}
return collection;
}
#endregion Extensions
public void Dispose()

@ -93,6 +93,11 @@ namespace ZeroLevel.Services.Serialization
_stream.Write(BitConverter.GetBytes(val), 0, 8);
}
public void WriteFloat(float val)
{
_stream.Write(BitConverter.GetBytes(val), 0, 4);
}
/// <summary>
/// Write string (4 bytes long + Length bytes)
/// </summary>
@ -271,6 +276,18 @@ namespace ZeroLevel.Services.Serialization
}
}
public void WriteCollection(IEnumerable<float> collection)
{
WriteInt32(collection?.Count() ?? 0);
if (collection != null)
{
foreach (var item in collection)
{
WriteFloat(item);
}
}
}
public void WriteCollection(IEnumerable<Double> collection)
{
WriteInt32(collection?.Count() ?? 0);
@ -324,6 +341,30 @@ namespace ZeroLevel.Services.Serialization
WriteBytes(MessageSerializer.SerializeCompatible(item));
}
public void WriteCollection(IEnumerable<decimal> collection)
{
WriteInt32(collection?.Count() ?? 0);
if (collection != null)
{
foreach (var item in collection)
{
WriteDecimal(item);
}
}
}
public void WriteCollection(IEnumerable<TimeSpan> collection)
{
WriteInt32(collection?.Count() ?? 0);
if (collection != null)
{
foreach (var item in collection)
{
WriteTimeSpan(item);
}
}
}
#endregion Extension
}
}

@ -1,13 +1,10 @@
using System;
using System.Collections.Generic;
using ZeroLevel.Services.Reflection;
namespace ZeroLevel.Services.Serialization
{
public static class MessageSerializer
{
private readonly static Type _wgt = typeof(SerializedObjectWrapper<>);
public static T Deserialize<T>(byte[] data)
where T : IBinarySerializable
{
@ -77,94 +74,6 @@ namespace ZeroLevel.Services.Serialization
}
}
public static bool TrySerialize<T>(T obj, out byte[] data)
{
if (null == obj)
{
data = null;
return false;
}
try
{
var direct_seriazlizable = (obj as IBinarySerializable);
if (direct_seriazlizable != null)
{
using (var writer = new MemoryStreamWriter())
{
direct_seriazlizable.Serialize(writer);
data = writer.Complete();
return true;
}
}
}
catch (Exception ex)
{
Log.Debug($"[MessageSerializer] Fault direct serialization object.\r\n{ex.ToString()}");
data = null;
return false;
}
try
{
var wrapper = new SerializedObjectWrapper<T>(obj);
using (var writer = new MemoryStreamWriter())
{
wrapper.Serialize(writer);
data = writer.Complete();
return true;
}
}
catch (Exception ex)
{
Log.Debug($"[MessageSerializer] Can't serialize object. {ex.Message}");
}
data = null;
return false;
}
public static bool TryDeserialize<T>(byte[] data, out T result)
{
if (data == null || data.Length == 0)
{
result = default(T);
return false;
}
try
{
if (typeof(IBinarySerializable).IsAssignableFrom(typeof(T)))
{
using (var reader = new MemoryStreamReader(data))
{
var direct = (IBinarySerializable)Activator.CreateInstance<T>();
direct.Deserialize(reader);
result = (T)direct;
return true;
}
}
}
catch (Exception ex)
{
Log.Debug($"[MessageSerializer] Fault direct deserialization object.\r\n{ex.ToString()}");
result = default(T);
return false;
}
try
{
var wrapper = new SerializedObjectWrapper<T>();
using (var reader = new MemoryStreamReader(data))
{
wrapper.Deserialize(reader);
result = wrapper.Value;
return true;
}
}
catch (Exception ex)
{
Log.Debug($"[MessageSerializer] Can't deserialize object. {ex.Message}");
}
result = default(T);
return false;
}
public static byte[] SerializeCompatible(object obj)
{
var direct_seriazlizable = (obj as IBinarySerializable);
@ -176,11 +85,9 @@ namespace ZeroLevel.Services.Serialization
return writer.Complete();
}
}
var rt = _wgt.MakeGenericType(obj.GetType());
var instance = (IBinarySerializable)Activator.CreateInstance(rt, new object[] { obj });
using (var writer = new MemoryStreamWriter())
{
instance.Serialize(writer);
PrimitiveTypeSerializer.Serialize(writer, obj);
return writer.Complete();
}
}
@ -196,10 +103,9 @@ namespace ZeroLevel.Services.Serialization
return writer.Complete();
}
}
var wrapper = new SerializedObjectWrapper<T>(obj);
using (var writer = new MemoryStreamWriter())
{
wrapper.Serialize(writer);
PrimitiveTypeSerializer.Serialize<T>(writer, obj);
return writer.Complete();
}
}
@ -215,11 +121,9 @@ namespace ZeroLevel.Services.Serialization
return (T)direct;
}
}
var wrapper = new SerializedObjectWrapper<T>();
using (var reader = new MemoryStreamReader(data))
{
wrapper.Deserialize(reader);
return wrapper.Value;
return PrimitiveTypeSerializer.Deserialize<T>(reader);
}
}
@ -231,9 +135,7 @@ namespace ZeroLevel.Services.Serialization
direct.Deserialize(reader);
return (T)direct;
}
var wrapper = new SerializedObjectWrapper<T>();
wrapper.Deserialize(reader);
return wrapper.Value;
return PrimitiveTypeSerializer.Deserialize<T>(reader);
}
public static object DeserializeCompatible(Type type, byte[] data)
@ -248,13 +150,10 @@ namespace ZeroLevel.Services.Serialization
return direct;
}
}
var rt = _wgt.MakeGenericType(type);
var instance = (IBinarySerializable)Activator.CreateInstance(rt);
using (var reader = new MemoryStreamReader(data))
{
instance.Deserialize(reader);
return PrimitiveTypeSerializer.Deserialize(reader, type);
}
return TypeGetterSetterBuilder.BuildGetter(rt.GetProperty("Value"))(instance);
}
}
}

@ -0,0 +1,336 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection;
using System.Runtime.CompilerServices;
using ZeroLevel.Services.Invokation;
using ZeroLevel.Services.Reflection;
namespace ZeroLevel.Services.Serialization
{
public static class PrimitiveTypeSerializer
{
static PrimitiveTypeSerializer()
{
PreloadCachee();
}
#region Cachee
private class Wrapper
{
public string ReadId;
public string WriteId;
public IInvokeWrapper Invoker;
public T Read<T>(IBinaryReader reader)
{
return (T)Invoker.Invoke(reader, ReadId);
}
public object ReadObject(IBinaryReader reader)
{
return Invoker.Invoke(reader, ReadId);
}
public void Write<T>(IBinaryWriter writer, T value)
{
Invoker.Invoke(writer, WriteId, new object[] { value });
}
public void WriteObject(IBinaryWriter writer, object value)
{
Invoker.Invoke(writer, WriteId, new object[] { value });
}
}
private readonly static Dictionary<Type, Wrapper> _cachee = new Dictionary<Type, Wrapper>();
private readonly static Dictionary<Type, Type> _enumTypesCachee = new Dictionary<Type, Type>();
private static void PreloadCachee()
{
_cachee.Add(typeof(Boolean), Create<Boolean>());
_cachee.Add(typeof(Byte), Create<Byte>());
_cachee.Add(typeof(Byte[]), Create<Byte[]>());
_cachee.Add(typeof(Int32), Create<Int32>());
_cachee.Add(typeof(Int64), Create<Int64>());
_cachee.Add(typeof(Double), Create<Double>());
_cachee.Add(typeof(float), Create<float>());
_cachee.Add(typeof(Decimal), Create<Decimal>());
_cachee.Add(typeof(DateTime), Create<DateTime>());
_cachee.Add(typeof(Guid), Create<Guid>());
_cachee.Add(typeof(String), Create<String>());
_cachee.Add(typeof(TimeSpan), Create<TimeSpan>());
_cachee.Add(typeof(IPEndPoint), Create<IPEndPoint>());
_cachee.Add(typeof(IPAddress), Create<IPAddress>());
_cachee.Add(typeof(IEnumerable<Boolean>), Create<IEnumerable<Boolean>>());
_cachee.Add(typeof(IEnumerable<Byte>), Create<IEnumerable<Byte>>());
_cachee.Add(typeof(IEnumerable<Byte[]>), Create<IEnumerable<Byte[]>>());
_cachee.Add(typeof(IEnumerable<Int32>), Create<IEnumerable<Int32>>());
_cachee.Add(typeof(IEnumerable<Int64>), Create<IEnumerable<Int64>>());
_cachee.Add(typeof(IEnumerable<Double>), Create<IEnumerable<Double>>());
_cachee.Add(typeof(IEnumerable<float>), Create<IEnumerable<float>>());
_cachee.Add(typeof(IEnumerable<Decimal>), Create<IEnumerable<Decimal>>());
_cachee.Add(typeof(IEnumerable<DateTime>), Create<IEnumerable<DateTime>>());
_cachee.Add(typeof(IEnumerable<Guid>), Create<IEnumerable<Guid>>());
_cachee.Add(typeof(IEnumerable<String>), Create<IEnumerable<String>>());
_cachee.Add(typeof(IEnumerable<TimeSpan>), Create<IEnumerable<TimeSpan>>());
_cachee.Add(typeof(IEnumerable<IPEndPoint>), Create<IEnumerable<IPEndPoint>>());
_cachee.Add(typeof(IEnumerable<IPAddress>), Create<IEnumerable<IPAddress>>());
_enumTypesCachee.Add(typeof(Boolean), typeof(IEnumerable<Boolean>));
_enumTypesCachee.Add(typeof(Byte), typeof(IEnumerable<Byte>));
_enumTypesCachee.Add(typeof(Byte[]), typeof(IEnumerable<Byte[]>));
_enumTypesCachee.Add(typeof(Int32), typeof(IEnumerable<Int32>));
_enumTypesCachee.Add(typeof(Int64), typeof(IEnumerable<Int64>));
_enumTypesCachee.Add(typeof(Double), typeof(IEnumerable<Double>));
_enumTypesCachee.Add(typeof(float), typeof(IEnumerable<float>));
_enumTypesCachee.Add(typeof(Decimal), typeof(IEnumerable<Decimal>));
_enumTypesCachee.Add(typeof(DateTime), typeof(IEnumerable<DateTime>));
_enumTypesCachee.Add(typeof(Guid), typeof(IEnumerable<Guid>));
_enumTypesCachee.Add(typeof(String), typeof(IEnumerable<String>));
_enumTypesCachee.Add(typeof(TimeSpan), typeof(IEnumerable<TimeSpan>));
_enumTypesCachee.Add(typeof(IPEndPoint), typeof(IEnumerable<IPEndPoint>));
_enumTypesCachee.Add(typeof(IPAddress), typeof(IEnumerable<IPAddress>));
}
private static Wrapper Create<Tw>()
{
var type = typeof(Tw);
var wrapper = new Wrapper { Invoker = InvokeWrapper.Create() };
if (type == typeof(Int32))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadInt32").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteInt32").First();
}
else if (type == typeof(Boolean))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadBoolean").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteBoolean").First();
}
else if (type == typeof(Byte))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadByte").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteByte").First();
}
else if (type == typeof(Byte[]))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadBytes").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteBytes").First();
}
else if (type == typeof(DateTime))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDateTime").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteDateTime").First();
}
else if (type == typeof(Decimal))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDecimal").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteDecimal").First();
}
else if (type == typeof(Double))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDouble").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteDouble").First();
}
else if (type == typeof(float))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadFloat").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteFloat").First();
}
else if (type == typeof(Guid))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadGuid").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteGuid").First();
}
else if (type == typeof(IPAddress))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadIP").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteIP").First();
}
else if (type == typeof(IPEndPoint))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadIPEndpoint").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteIPEndpoint").First();
}
else if (type == typeof(Int64))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadLong").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteLong").First();
}
else if (type == typeof(String))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadString").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteString").First();
}
else if (type == typeof(TimeSpan))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadTimeSpan").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteTimeSpan").First();
}
//
// Collections
//
else if (type == typeof(IEnumerable<Int32>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadInt32Collection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<Boolean>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadBooleanCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<Byte>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadByteCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<Byte[]>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadByteArrayCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<DateTime>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDateTimeCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<Double>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDoubleCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<float>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadFloatCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<Guid>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadGuidCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<IPAddress>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadIPCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<IPEndPoint>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadIPEndPointCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<Int64>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadInt64Collection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<String>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadStringCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<Decimal>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDecimalCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
else if (type == typeof(IEnumerable<TimeSpan>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadTimeSpanCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate<Tw>()).First();
}
//
// Not supported
//
else
{
throw new NotSupportedException($"Type {type.Name} not supported");
}
return wrapper;
}
private static Func<MethodInfo, bool> CreatePredicate<T>()
{
var typeArg = typeof(T).GetGenericArguments().First();
return mi => mi.Name.Equals("WriteCollection", StringComparison.Ordinal) &&
mi.GetParameters().First().ParameterType.GetGenericArguments().First().IsAssignableFrom(typeArg);
}
private readonly static Dictionary<Type, Wrapper> _concrete_type_cachee = new Dictionary<Type, Wrapper>();
private readonly static object _concrete_type_cachee_locker = new object();
private static Wrapper Find<T>()
{
return Find(typeof(T));
}
private static Wrapper Find(Type type)
{
if (_concrete_type_cachee.ContainsKey(type) == false)
{
lock (_concrete_type_cachee_locker)
{
if (_concrete_type_cachee.ContainsKey(type) == false)
{
if (_cachee.ContainsKey(type))
{
_concrete_type_cachee[type] = _cachee[type];
}
else if (TypeHelpers.IsAssignableToGenericType(type, typeof(IEnumerable<>)))
{
var typeArg = type.GetGenericArguments().First();
if (_enumTypesCachee.ContainsKey(typeArg))
{
_concrete_type_cachee[type] = _cachee[_enumTypesCachee[typeArg]];
}
else if (typeof(IBinarySerializable).IsAssignableFrom(typeArg))
{
var wrapper = new Wrapper { Invoker = InvokeWrapper.Create() };
wrapper.ReadId = wrapper.Invoker.ConfigureGeneric(typeof(MemoryStreamReader), typeArg, "ReadCollection").First();
wrapper.WriteId = wrapper.Invoker.ConfigureGeneric(typeof(MemoryStreamWriter), typeArg,
mi => mi.Name.Equals("WriteCollection") && mi.IsGenericMethod).First();
_concrete_type_cachee[type] = wrapper;
}
}
}
}
}
if (_concrete_type_cachee.ContainsKey(type) == false)
{
throw new NotSupportedException($"Type {type.Name} not supported");
}
return _concrete_type_cachee[type];
}
#endregion Cachee
public static T Deserialize<T>(IBinaryReader reader)
{
var wrapper = Find<T>();
return wrapper.Read<T>(reader);
}
public static void Serialize<T>(IBinaryWriter writer, T value)
{
var wrapper = Find<T>();
wrapper.Write<T>(writer, value);
}
public static object Deserialize(IBinaryReader reader, Type type)
{
var wrapper = Find(type);
return wrapper.ReadObject(reader);
}
public static void Serialize(IBinaryWriter writer, object value)
{
var wrapper = Find(value.GetType());
wrapper.WriteObject(writer, value);
}
}
}

@ -1,227 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection;
using ZeroLevel.Services.Invokation;
namespace ZeroLevel.Services.Serialization
{
public class SerializedObjectWrapper<T>
: IBinarySerializable
{
#region Cachee
private class Wrapper
{
public string ReadId;
public string WriteId;
public IInvokeWrapper Invoker;
public T Read(IBinaryReader reader)
{
return (T)Invoker.Invoke(reader, ReadId);
}
public void Write(IBinaryWriter writer, T value)
{
Invoker.Invoke(writer, WriteId, new object[] { value });
}
}
private readonly static object _creation_lock = new object();
private readonly static Dictionary<Type, Wrapper> _cachee =
new Dictionary<Type, Wrapper>();
private static Wrapper Create<Tw>()
{
var type = typeof(Tw);
if (_cachee.ContainsKey(type) == false)
{
lock (_creation_lock)
{
if (_cachee.ContainsKey(type) == false)
{
var wrapper = new Wrapper { Invoker = InvokeWrapper.Create() };
if (type == typeof(Int32))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadInt32").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteInt32").First();
}
else if (typeof(T) == typeof(Boolean))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadBoolean").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteBoolean").First();
}
else if (typeof(T) == typeof(Byte))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadByte").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteByte").First();
}
else if (typeof(T) == typeof(Byte[]))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadBytes").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteBytes").First();
}
else if (typeof(T) == typeof(DateTime))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDateTime").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteDateTime").First();
}
else if (typeof(T) == typeof(Decimal))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDecimal").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteDecimal").First();
}
else if (typeof(T) == typeof(Double))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDouble").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteDouble").First();
}
else if (typeof(T) == typeof(Guid))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadGuid").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteGuid").First();
}
else if (typeof(T) == typeof(IPAddress))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadIP").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteIP").First();
}
else if (typeof(T) == typeof(IPEndPoint))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadIPEndpoint").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteIPEndpoint").First();
}
else if (typeof(T) == typeof(Int64))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadLong").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteLong").First();
}
else if (typeof(T) == typeof(String))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadString").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteString").First();
}
else if (typeof(T) == typeof(TimeSpan))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadTimeSpan").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), "WriteTimeSpan").First();
}
//
// Collections
//
else if (typeof(T) == typeof(IEnumerable<Int32>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadInt32Collection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<Boolean>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadBooleanCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<Byte>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadByteCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<Byte[]>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadByteArrayCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<DateTime>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDateTimeCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<Double>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadDoubleCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<Guid>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadGuidCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<IPAddress>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadIPCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<IPEndPoint>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadIPEndPointCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<Int64>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadInt64Collection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
else if (typeof(T) == typeof(IEnumerable<String>))
{
wrapper.ReadId = wrapper.Invoker.Configure(typeof(MemoryStreamReader), "ReadStringCollection").First();
wrapper.WriteId = wrapper.Invoker.Configure(typeof(MemoryStreamWriter), CreatePredicate()).First();
}
//
// Generic collection
//
else if (typeof(System.Collections.IEnumerable).IsAssignableFrom(typeof(T)) &&
typeof(IBinarySerializable).IsAssignableFrom(typeof(T).GetGenericArguments().FirstOrDefault()))
{
var typeArg = typeof(T).GetGenericArguments().First();
wrapper.ReadId = wrapper.Invoker.ConfigureGeneric(typeof(MemoryStreamReader), typeArg, "ReadCollection").First();
wrapper.WriteId = wrapper.Invoker.ConfigureGeneric(typeof(MemoryStreamWriter), typeArg,
mi => mi.Name.Equals("WriteCollection") && mi.IsGenericMethod).First();
}
//
// Not supported
//
else
{
throw new NotSupportedException($"Type {typeof(T).Name} not supported");
}
_cachee.Add(type, wrapper);
}
}
}
return _cachee[type];
}
private Wrapper _wrapper;
#endregion Cachee
public SerializedObjectWrapper()
{
_wrapper = Create<T>();
}
public SerializedObjectWrapper(T obj) : this()
{
Value = obj;
}
public T Value { get; set; }
private static Func<MethodInfo, bool> CreatePredicate()
{
return mi => mi.Name.Equals("WriteCollection", StringComparison.Ordinal) &&
mi.GetParameters().First().ParameterType.GetGenericArguments().First() == typeof(T);
}
public void Deserialize(IBinaryReader reader)
{
this.Value = _wrapper.Read(reader);
}
public void Serialize(IBinaryWriter writer)
{
_wrapper.Write(writer, this.Value);
}
}
}

@ -1,60 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net;
namespace ZeroLevel.Services.Serialization
{
public static class SerializedObjectWrapperExtension
{
public static SerializedObjectWrapper<Int32> WrapToSerialized(this Int32 value) => new SerializedObjectWrapper<Int32> { Value = value };
public static SerializedObjectWrapper<Boolean> WrapToSerialized(this Boolean value) => new SerializedObjectWrapper<Boolean> { Value = value };
public static SerializedObjectWrapper<Byte> WrapToSerialized(this Byte value) => new SerializedObjectWrapper<Byte> { Value = value };
public static SerializedObjectWrapper<Byte[]> WrapToSerialized(this Byte[] value) => new SerializedObjectWrapper<Byte[]> { Value = value };
public static SerializedObjectWrapper<DateTime> WrapToSerialized(this DateTime value) => new SerializedObjectWrapper<DateTime> { Value = value };
public static SerializedObjectWrapper<Decimal> WrapToSerialized(this Decimal value) => new SerializedObjectWrapper<Decimal> { Value = value };
public static SerializedObjectWrapper<Double> WrapToSerialized(this Double value) => new SerializedObjectWrapper<Double> { Value = value };
public static SerializedObjectWrapper<Guid> WrapToSerialized(this Guid value) => new SerializedObjectWrapper<Guid> { Value = value };
public static SerializedObjectWrapper<IPAddress> WrapToSerialized(this IPAddress value) => new SerializedObjectWrapper<IPAddress> { Value = value };
public static SerializedObjectWrapper<IPEndPoint> WrapToSerialized(this IPEndPoint value) => new SerializedObjectWrapper<IPEndPoint> { Value = value };
public static SerializedObjectWrapper<Int64> WrapToSerialized(this Int64 value) => new SerializedObjectWrapper<Int64> { Value = value };
public static SerializedObjectWrapper<String> WrapToSerialized(this String value) => new SerializedObjectWrapper<String> { Value = value };
public static SerializedObjectWrapper<TimeSpan> WrapToSerialized(this TimeSpan value) => new SerializedObjectWrapper<TimeSpan> { Value = value };
public static SerializedObjectWrapper<IEnumerable<Int32>> WrapToSerialized(this IEnumerable<Int32> value) => new SerializedObjectWrapper<IEnumerable<Int32>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<Boolean>> WrapToSerialized(this IEnumerable<Boolean> value) => new SerializedObjectWrapper<IEnumerable<Boolean>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<Byte>> WrapToSerialized(this IEnumerable<Byte> value) => new SerializedObjectWrapper<IEnumerable<Byte>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<Byte[]>> WrapToSerialized(this IEnumerable<Byte[]> value) => new SerializedObjectWrapper<IEnumerable<Byte[]>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<DateTime>> WrapToSerialized(this IEnumerable<DateTime> value) => new SerializedObjectWrapper<IEnumerable<DateTime>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<Double>> WrapToSerialized(this IEnumerable<Double> value) => new SerializedObjectWrapper<IEnumerable<Double>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<Guid>> WrapToSerialized(this IEnumerable<Guid> value) => new SerializedObjectWrapper<IEnumerable<Guid>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<IPAddress>> WrapToSerialized(this IEnumerable<IPAddress> value) => new SerializedObjectWrapper<IEnumerable<IPAddress>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<IPEndPoint>> WrapToSerialized(this IEnumerable<IPEndPoint> value) => new SerializedObjectWrapper<IEnumerable<IPEndPoint>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<Int64>> WrapToSerialized(this IEnumerable<Int64> value) => new SerializedObjectWrapper<IEnumerable<Int64>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<String>> WrapToSerialized(this IEnumerable<String> value) => new SerializedObjectWrapper<IEnumerable<String>> { Value = value };
public static SerializedObjectWrapper<IEnumerable<T>> WrapToSerialized<T>(this IEnumerable<T> value) where T : IBinarySerializable
=> new SerializedObjectWrapper<IEnumerable<T>> { Value = value };
}
}

@ -90,6 +90,7 @@
<Compile Include="Services\Collections\EverythingStorage.cs" />
<Compile Include="Services\Collections\IEverythingStorage.cs" />
<Compile Include="Services\Collections\RoundRobinCollection.cs" />
<Compile Include="Services\Collections\RoundRobinOverCollection.cs" />
<Compile Include="Services\DependencyInjection\Container.cs" />
<Compile Include="Services\DependencyInjection\ContainerFactory.cs" />
<Compile Include="Services\DependencyInjection\Contracts\ICompositionProvider.cs" />
@ -330,8 +331,7 @@
<Compile Include="Services\Serialization\MemoryStreamReader.cs" />
<Compile Include="Services\Serialization\MemoryStreamWriter.cs" />
<Compile Include="Services\Serialization\MessageSerializer.cs" />
<Compile Include="Services\Serialization\SerializedObjectWrapper.cs" />
<Compile Include="Services\Serialization\SerializedObjectWrapperExtension.cs" />
<Compile Include="Services\Serialization\PrimitiveTypeSerializer.cs" />
<Compile Include="Services\Shedulling\AsyncShedullerImpl.cs" />
<Compile Include="Services\Shedulling\DateTimeAsyncSheduller.cs" />
<Compile Include="Services\Shedulling\DateTimeSheduller.cs" />

@ -1 +1 @@
44e8c5c8420f166261f61c4c11d319a8acdc2c79
23b05e1da25edde99ed56a899f894605a2243d39

Loading…
Cancel
Save

Powered by TurnKey Linux.