Update network

pull/1/head
Ogoun 5 years ago
parent 9e96c150cb
commit 138cfc652e

@ -32,25 +32,19 @@ namespace TestApp
}); });
Exchange.RoutesStorage.Set("test.app", new IPEndPoint(IPAddress.Loopback, 8800)); Exchange.RoutesStorage.Set("test.app", new IPEndPoint(IPAddress.Loopback, 8800));
using (var waiter = new ManualResetEventSlim(false))
{
while (true) while (true)
{ {
try try
{ {
Exchange.GetConnection("test.app")?.Request<int>("counter", s => //var s = Exchange.Request<int>("test.app", "counter");
{ Interlocked.Add(ref counter, Exchange.Request<int>("test.app", "counter"));
waiter.Set();
Interlocked.Add(ref counter, s);
});
} }
catch catch(Exception ex)
{ {
Log.Error(ex, "Request fault");
Thread.Sleep(300); Thread.Sleep(300);
} }
waiter.Wait();
waiter.Reset();
}
} }
} }

@ -1,6 +1,7 @@
using Newtonsoft.Json; using Newtonsoft.Json;
using System; using System;
using ZeroLevel; using ZeroLevel;
using ZeroLevel.Logging;
using ZeroLevel.Services.Web; using ZeroLevel.Services.Web;
namespace TestApp namespace TestApp
@ -21,17 +22,6 @@ namespace TestApp
private static void Main(string[] args) private static void Main(string[] args)
{ {
var t = new TestQuery
{
Age = 133,
Roles = new[] { "ad\"\"\"min", "user", "operator" },
Name = "su"
};
var builder = new UrlBuilder(Serialize);
var url = builder.BuildRequestUrlFromDTO("http://google.com", "/api/v0/getuserinfo", t);
Console.WriteLine(url);
Console.ReadKey();
/*var fiber = new Fiber(); /*var fiber = new Fiber();
fiber fiber
.Add((s) => { Console.WriteLine("1"); s.Add<int>("1", 1); return s; }) .Add((s) => { Console.WriteLine("1"); s.Add<int>("1", 1); return s; })
@ -51,15 +41,15 @@ namespace TestApp
/*Configuration.Save(Configuration.ReadFromApplicationConfig()); Configuration.Save(Configuration.ReadFromApplicationConfig());
Bootstrap.Startup<MyService>(args, Bootstrap.Startup<MyService>(args,
() => Configuration.ReadSetFromIniFile("config.ini")) () => Configuration.ReadSetFromIniFile("config.ini"))
.EnableConsoleLog(ZeroLevel.Services.Logging.LogLevel.System | ZeroLevel.Services.Logging.LogLevel.FullDebug) .EnableConsoleLog(LogLevel.System | LogLevel.FullDebug)
//.UseDiscovery() //.UseDiscovery()
.Run() .Run()
.WaitWhileStatus(ZeroServiceStatus.Running) .WaitWhileStatus(ZeroServiceStatus.Running)
.Stop(); .Stop();
Bootstrap.Shutdown();*/ Bootstrap.Shutdown();
} }
} }
} }

@ -40,7 +40,7 @@ namespace Source
Interlocked.Increment(ref _proceed); Interlocked.Increment(ref _proceed);
waiter.Set(); waiter.Set();
}); });
if (ir == null || ir.Success == false) if (ir == null || ir == false)
{ {
Thread.Sleep(300); Thread.Sleep(300);
waiter.Set(); waiter.Set();

@ -9,6 +9,7 @@ namespace ZeroLevel.UnitTests
// In developing, not working! // In developing, not working!
public class DumpTests public class DumpTests
{ {
/*
[Fact] [Fact]
public void DumpStorageTest() public void DumpStorageTest()
{ {
@ -59,5 +60,6 @@ namespace ZeroLevel.UnitTests
} }
Assert.True(0 == storage.ReadAndTruncate().ToArray().Length); Assert.True(0 == storage.ReadAndTruncate().ToArray().Length);
} }
*/
} }
} }

@ -37,7 +37,7 @@ namespace ZeroLevel.NetworkUnitTests
locker.WaitOne(1000); locker.WaitOne(1000);
// Assert // Assert
Assert.True(ir.Success); Assert.True(ir);
Assert.True(info.Equals(received)); Assert.True(info.Equals(received));
// Dispose // Dispose
@ -85,7 +85,7 @@ namespace ZeroLevel.NetworkUnitTests
locker.WaitOne(1000); locker.WaitOne(1000);
// Assert // Assert
Assert.True(ir.Success); Assert.True(ir);
Assert.True(CollectionComparsionExtensions.OrderingEquals(new[] { info1, info2 }, received, (a, b) => a.Equals(b))); Assert.True(CollectionComparsionExtensions.OrderingEquals(new[] { info1, info2 }, received, (a, b) => a.Equals(b)));
// Dispose // Dispose

@ -3,7 +3,7 @@ using System.Threading;
using Xunit; using Xunit;
using ZeroLevel.Services.Applications; using ZeroLevel.Services.Applications;
namespace ZeroLevel.UnitTests namespace ZeroLevel.Network
{ {
public class NetworkTest public class NetworkTest
: BaseZeroService : BaseZeroService
@ -13,46 +13,55 @@ namespace ZeroLevel.UnitTests
{ {
// Arrange // Arrange
var server = UseHost(8181); var server = UseHost(8181);
var client = Exchange.GetConnection("127.0.0.1:8181"); Exchange.RoutesStorage.Set("test", NetUtils.CreateIPEndPoint("127.0.0.1:8181"));
bool got_message_no_request = false; bool got_message_no_request = false;
bool got_message_with_request = false; bool got_message_with_request = false;
bool got_response_message_no_request = false; bool got_response_message_no_request = false;
bool got_response_message_with_request = false; bool got_response_message_with_request = false;
using (var signal = new ManualResetEvent(false)) server.RegisterInbox("empty", (_) =>
{ {
server.RegisterInbox("empty", (_) => { signal.Set(); got_message_no_request = true; }); got_message_no_request = true;
server.RegisterInbox<string>((_, ___) => { signal.Set(); got_message_with_request = true; }); });
server.RegisterInbox<string>((_, ___) =>
{
got_message_with_request = true;
});
server.RegisterInbox<string>("get_response", (_) => "Hello"); server.RegisterInbox<string>("get_response", (_) => "Hello");
server.RegisterInbox<int, string>("convert", (__, num) => num.ToString()); server.RegisterInbox<int, string>("convert", (__, num) => num.ToString());
// Act Thread.Sleep(200);
signal.Reset();
client.Send("empty");
signal.WaitOne(1000);
signal.Reset();
client.Send<string>("hello");
signal.WaitOne(100);
signal.Reset(); Assert.True(Exchange.Peek("test", "empty"));
client.Request<string>("get_response", (s) => { signal.Set(); got_response_message_no_request = s.Equals("Hello", StringComparison.Ordinal); }); Assert.True(Exchange.Send<string>("test", "hello"));
signal.WaitOne(1000);
signal.Reset(); int repeat = 10;
client.Request<int, string>("convert", 10, (s) => { signal.Set(); got_response_message_with_request = s.Equals("10", StringComparison.Ordinal); }); while (!got_message_no_request || !got_message_with_request)
signal.WaitOne(1000); {
Thread.Sleep(200);
repeat--;
if (repeat == 0) break;
} }
// Assert // Assert
Assert.True(got_message_no_request, "No signal for no request default inbox"); Assert.True(got_message_no_request, "No signal for no request default inbox");
Assert.True(got_message_with_request, "No signal for default inbox"); Assert.True(got_message_with_request, "No signal for default inbox");
for (int i = 0; i < 100; i++)
{
got_response_message_no_request =
Exchange.Request<string>("test", "get_response")
.Equals("Hello", StringComparison.Ordinal);
got_response_message_with_request =
Exchange.Request<int, string>("test", "convert", 10)
.Equals("10", StringComparison.Ordinal);
Assert.True(got_response_message_no_request, "No response without request"); Assert.True(got_response_message_no_request, "No response without request");
Assert.True(got_response_message_with_request, "No response with request"); Assert.True(got_response_message_with_request, "No response with request");
} }
}
protected override void StartAction() protected override void StartAction()
{ {

@ -1,11 +1,8 @@
using System; using System.Linq;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Xunit; using Xunit;
using ZeroLevel.Services.Semantic; using ZeroLevel.Services.Semantic;
namespace ZeroLevel.UnitTests namespace ZeroLevel.Semantic
{ {
public class SemanticTests public class SemanticTests
{ {

@ -2,7 +2,7 @@
using ZeroLevel.Services.Semantic; using ZeroLevel.Services.Semantic;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
namespace ZeroLevel.UnitTests namespace ZeroLevel.Tries
{ {
public class TrieTests public class TrieTests
{ {

@ -99,6 +99,8 @@ namespace ZeroLevel.Models
#region Ctor #region Ctor
public InvokeResult() { }
public InvokeResult(bool success, string comment) public InvokeResult(bool success, string comment)
{ {
Success = success; Success = success;

@ -1,6 +1,5 @@
using System; using System;
using System.Net; using System.Net;
using ZeroLevel.Models;
namespace ZeroLevel.Network namespace ZeroLevel.Network
{ {
@ -12,15 +11,14 @@ namespace ZeroLevel.Network
IRouter Router { get; } IRouter Router { get; }
ISocketClient Socket { get; } ISocketClient Socket { get; }
InvokeResult Send<T>(T message); bool Send<T>(T message);
bool Send(string inbox);
bool Send(string inbox, byte[] data);
bool Send<T>(string inbox, T message);
InvokeResult Send(string inbox); bool Request(string inbox, Action<byte[]> callback);
InvokeResult Send(string inbox, byte[] data); bool Request(string inbox, byte[] data, Action<byte[]> callback);
InvokeResult Send<T>(string inbox, T message); bool Request<Tresponse>(string inbox, Action<Tresponse> callback);
bool Request<Trequest, Tresponse>(string inbox, Trequest request, Action<Tresponse> callback);
InvokeResult Request(string inbox, Action<byte[]> callback);
InvokeResult Request(string inbox, byte[] data, Action<byte[]> callback);
InvokeResult Request<Tresponse>(string inbox, Action<Tresponse> callback);
InvokeResult Request<Trequest, Tresponse>(string inbox, Trequest request, Action<Tresponse> callback);
} }
} }

@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using ZeroLevel.Models;
namespace ZeroLevel.Network namespace ZeroLevel.Network
{ {
@ -8,6 +7,9 @@ namespace ZeroLevel.Network
{ {
bool Send<T>(string alias, T data); bool Send<T>(string alias, T data);
bool Send<T>(string alias, string inbox, T data); bool Send<T>(string alias, string inbox, T data);
bool Peek(string alias, string inbox);
bool Request<Tresponse>(string alias, Action<Tresponse> callback); bool Request<Tresponse>(string alias, Action<Tresponse> callback);
bool Request<Tresponse>(string alias, string inbox, Action<Tresponse> callback); bool Request<Tresponse>(string alias, string inbox, Action<Tresponse> callback);
bool Request<Trequest, Tresponse>(string alias, Trequest request, Action<Tresponse> callback); bool Request<Trequest, Tresponse>(string alias, Trequest request, Action<Tresponse> callback);

@ -1,6 +1,5 @@
using System; using System;
using System.Net; using System.Net;
using ZeroLevel.Models;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network namespace ZeroLevel.Network
@ -19,146 +18,146 @@ namespace ZeroLevel.Network
_client = client; _client = client;
} }
public InvokeResult Send(string inbox) public bool Send(string inbox)
{ {
try try
{ {
_client.Send(Frame.FromPool(inbox)); _client.Send(Frame.FromPool(inbox));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Send]"); Log.Error(ex, "[NetworkNode.Send(inbox)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public InvokeResult Send(string inbox, byte[] data) public bool Send(string inbox, byte[] data)
{ {
try try
{ {
_client.Send(Frame.FromPool(inbox, data)); _client.Send(Frame.FromPool(inbox, data));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Send]"); Log.Error(ex, "[NetworkNode.Send(inbox, data)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public InvokeResult Send<T>(T message) public bool Send<T>(T message)
{ {
try try
{ {
_client.Send(Frame.FromPool(BaseSocket.DEFAULT_MESSAGE_INBOX, MessageSerializer.SerializeCompatible<T>(message))); _client.Send(Frame.FromPool(BaseSocket.DEFAULT_MESSAGE_INBOX, MessageSerializer.SerializeCompatible<T>(message)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Send]"); Log.Error(ex, "[NetworkNode.Send(message)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public InvokeResult Send<T>(string inbox, T message) public bool Send<T>(string inbox, T message)
{ {
try try
{ {
_client.Send(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible<T>(message))); _client.Send(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible<T>(message)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Send]"); Log.Error(ex, "[NetworkNode.Send(inbox, message)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public InvokeResult Request(string inbox, Action<byte[]> callback) public bool Request(string inbox, Action<byte[]> callback)
{ {
try try
{ {
_client.Request(Frame.FromPool(inbox), f => callback(f)); _client.Request(Frame.FromPool(inbox), f => callback(f));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Request]"); Log.Error(ex, "[NetworkNode.Request(inbox, callback)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public InvokeResult Request(string inbox, byte[] data, Action<byte[]> callback) public bool Request(string inbox, byte[] data, Action<byte[]> callback)
{ {
try try
{ {
_client.Request(Frame.FromPool(inbox, data), f => callback(f)); _client.Request(Frame.FromPool(inbox, data), f => callback(f));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Request]"); Log.Error(ex, "[NetworkNode.Request(inbox, data, callback)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public InvokeResult Request<Tresponse>(string inbox, Action<Tresponse> callback) public bool Request<Tresponse>(string inbox, Action<Tresponse> callback)
{ {
try try
{ {
_client.Request(Frame.FromPool(inbox), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f))); _client.Request(Frame.FromPool(inbox), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Request]"); Log.Error(ex, "[NetworkNode.Request(inbox, callback)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public InvokeResult Request<Tresponse>(Action<Tresponse> callback) public bool Request<Tresponse>(Action<Tresponse> callback)
{ {
try try
{ {
_client.Request(Frame.FromPool(BaseSocket.DEFAULT_REQUEST_INBOX), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f))); _client.Request(Frame.FromPool(BaseSocket.DEFAULT_REQUEST_INBOX), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Request]"); Log.Error(ex, "[NetworkNode.Request(callback)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public InvokeResult Request<Trequest, Tresponse>(string inbox, Trequest request, Action<Tresponse> callback) public bool Request<Trequest, Tresponse>(string inbox, Trequest request, Action<Tresponse> callback)
{ {
try try
{ {
_client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible<Trequest>(request)), _client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible<Trequest>(request)),
f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f))); f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Request]"); Log.Error(ex, "[NetworkNode.Request(inbox, request, callback)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public InvokeResult Request<Trequest, Tresponse>(Trequest request, Action<Tresponse> callback) public bool Request<Trequest, Tresponse>(Trequest request, Action<Tresponse> callback)
{ {
try try
{ {
_client.Request(Frame.FromPool(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, MessageSerializer.SerializeCompatible<Trequest>(request)), _client.Request(Frame.FromPool(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, MessageSerializer.SerializeCompatible<Trequest>(request)),
f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f))); f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.Error(ex, "[NetworkNode.Request]"); Log.Error(ex, "[NetworkNode.Request(request, callback)]");
return InvokeResult.Fault(ex.Message);
} }
return InvokeResult.Succeeding(); return false;
} }
public void Dispose() public void Dispose()

@ -32,7 +32,10 @@ namespace ZeroLevel.Network
#endregion Ctor #endregion Ctor
#region IMultiClient #region IMultiClient
public bool Peek(string alias, string inbox)
{
return CallService(alias, (transport) => transport.Send(inbox));
}
/// <summary> /// <summary>
/// Sending a message to the service /// Sending a message to the service
/// </summary> /// </summary>
@ -41,7 +44,7 @@ namespace ZeroLevel.Network
/// <returns></returns> /// <returns></returns>
public bool Send<T>(string alias, T data) public bool Send<T>(string alias, T data)
{ {
return CallService(alias, (transport) => transport.Send<T>(BaseSocket.DEFAULT_MESSAGE_INBOX, data).Success); return CallService(alias, (transport) => transport.Send<T>(BaseSocket.DEFAULT_MESSAGE_INBOX, data));
} }
/// <summary> /// <summary>
@ -53,7 +56,7 @@ namespace ZeroLevel.Network
/// <returns></returns> /// <returns></returns>
public bool Send<T>(string alias, string inbox, T data) public bool Send<T>(string alias, string inbox, T data)
{ {
return CallService(alias, (transport) => transport.Send<T>(inbox, data).Success); return CallService(alias, (transport) => transport.Send<T>(inbox, data));
} }
/// <summary> /// <summary>
@ -75,15 +78,19 @@ namespace ZeroLevel.Network
/// <returns>true - on successful submission</returns> /// <returns>true - on successful submission</returns>
public bool SendBroadcast<T>(string alias, string inbox, T data) public bool SendBroadcast<T>(string alias, string inbox, T data)
{ {
var result = false;
try try
{ {
foreach (var client in GetClientEnumerator(alias)) var clients = GetClientEnumerator(alias).ToArray();
var tasks = new Task[clients.Length];
int index = 0;
foreach (var client in clients)
{ {
Task.Run(() => tasks[index++] = Task.Run(() =>
{ {
try try
{ {
client.Send(inbox, data); result |= client.Send(inbox, data);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -91,12 +98,13 @@ namespace ZeroLevel.Network
} }
}); });
} }
Task.WaitAll(tasks);
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, $"[Exchange.SendBroadcast] Error broadcast send data in service '{alias}'. Inbox '{inbox}'"); Log.SystemError(ex, $"[Exchange.SendBroadcast] Error broadcast send data in service '{alias}'. Inbox '{inbox}'");
} }
return false; return result;
} }
/// <summary> /// <summary>
@ -109,15 +117,19 @@ namespace ZeroLevel.Network
/// <returns>true - on successful submission</returns> /// <returns>true - on successful submission</returns>
public bool SendBroadcastByType<T>(string type, string inbox, T data) public bool SendBroadcastByType<T>(string type, string inbox, T data)
{ {
var result = false;
try try
{ {
foreach (var client in GetClientEnumeratorByType(type)) var clients = GetClientEnumeratorByType(type).ToArray();
var tasks = new Task[clients.Length];
int index = 0;
foreach (var client in clients)
{ {
Task.Run(() => tasks[index++] = Task.Run(() =>
{ {
try try
{ {
client.Send(inbox, data); result |= client.Send(inbox, data);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -125,12 +137,13 @@ namespace ZeroLevel.Network
} }
}); });
} }
Task.WaitAll(tasks);
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, $"[Exchange.SendBroadcastByType] Error broadcast send data to services with type '{type}'. Inbox '{inbox}'"); Log.SystemError(ex, $"[Exchange.SendBroadcastByType] Error broadcast send data to services with type '{type}'. Inbox '{inbox}'");
} }
return false; return result;
} }
/// <summary> /// <summary>
@ -153,15 +166,19 @@ namespace ZeroLevel.Network
/// <returns>true - on successful submission</returns> /// <returns>true - on successful submission</returns>
public bool SendBroadcastByGroup<T>(string group, string inbox, T data) public bool SendBroadcastByGroup<T>(string group, string inbox, T data)
{ {
var result = false;
try try
{ {
foreach (var client in GetClientEnumeratorByGroup(group)) var clients = GetClientEnumeratorByGroup(group).ToArray();
var tasks = new Task[clients.Length];
int index = 0;
foreach (var client in clients)
{ {
Task.Run(() => tasks[index++] = Task.Run(() =>
{ {
try try
{ {
client.Send(inbox, data); result |= client.Send(inbox, data);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -169,12 +186,13 @@ namespace ZeroLevel.Network
} }
}); });
} }
Task.WaitAll(tasks);
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, $"[Exchange.SendBroadcastByGroup] Error broadcast send data to services with type '{group}'. Inbox '{inbox}'"); Log.SystemError(ex, $"[Exchange.SendBroadcastByGroup] Error broadcast send data to services with type '{group}'. Inbox '{inbox}'");
} }
return false; return result;
} }
/// <summary> /// <summary>
@ -192,48 +210,26 @@ namespace ZeroLevel.Network
public bool Request<Tresponse>(string alias, string inbox, Action<Tresponse> callback) public bool Request<Tresponse>(string alias, string inbox, Action<Tresponse> callback)
{ {
bool success = false;
Tresponse response = default(Tresponse);
try try
{ {
if (false == CallService(alias, (transport) => return CallService(alias, (transport) =>
{ {
try try
{ {
using (var waiter = new ManualResetEventSlim(false)) return transport.Request<Tresponse>(inbox, callback);
{
if (false == transport.Request<Tresponse>(inbox, resp =>
{
response = resp;
success = true;
waiter.Set();
}).Success)
{
return false;
}
if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS))
{
return false;
}
}
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'");
} }
return false; return false;
})) });
{
Log.SystemWarning($"[Exchange.Request] No responce on request. Service key '{alias}'. Inbox '{inbox}'");
}
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'");
} }
callback(response); return false;
return success;
} }
public bool Request<Trequest, Tresponse>(string alias, Trequest request, Action<Tresponse> callback) public bool Request<Trequest, Tresponse>(string alias, Trequest request, Action<Tresponse> callback)
@ -241,48 +237,26 @@ namespace ZeroLevel.Network
public bool Request<Trequest, Tresponse>(string alias, string inbox, Trequest request, Action<Tresponse> callback) public bool Request<Trequest, Tresponse>(string alias, string inbox, Trequest request, Action<Tresponse> callback)
{ {
bool success = false;
Tresponse response = default(Tresponse);
try try
{ {
if (false == CallService(alias, (transport) => return CallService(alias, (transport) =>
{ {
try try
{ {
using (var waiter = new ManualResetEventSlim(false)) return transport.Request(inbox, request, callback);
{
if (false == transport.Request<Trequest, Tresponse>(inbox, request, resp =>
{
response = resp;
success = true;
waiter.Set();
}).Success)
{
return false;
}
if (false == waiter.Wait(BaseSocket.MAX_REQUEST_TIME_MS))
{
return false;
}
}
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'");
} }
return false; return false;
})) });
{
Log.SystemWarning($"[Exchange.Request] No responce on request. Service key '{alias}'. Inbox '{inbox}'");
}
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'"); Log.SystemError(ex, $"[Exchange.Request] Error request to service '{alias}'. Inbox '{inbox}'");
} }
callback(response); return false;
return success;
} }
/// <summary> /// <summary>
@ -535,9 +509,9 @@ namespace ZeroLevel.Network
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled. {r.Comment}"); Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled. {r.Comment}");
} }
}); });
if (request.Success == false) if (request == false)
{ {
Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled.{request.Comment}"); Log.SystemWarning($"[Exchange.RegisterServicesInDiscovery] Register canceled.");
} }
} }
} }
@ -578,9 +552,9 @@ namespace ZeroLevel.Network
_dicovery_aliases.Rollback(); _dicovery_aliases.Rollback();
} }
}); });
if (!ir.Success) if (!ir)
{ {
Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Error request to inbox 'services'. {ir.Comment}"); Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Error request to inbox 'services'.");
} }
} }
catch (Exception ex) catch (Exception ex)
@ -935,7 +909,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
if (false == client.Request<Treq, Tresp>(inbox, data, resp => { response.Add(resp); waiter.Signal(); }).Success) if (false == client.Request<Treq, Tresp>(inbox, data, resp => { response.Add(resp); waiter.Signal(); }))
{ {
waiter.Signal(); waiter.Signal();
} }
@ -963,7 +937,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
if (false == client.Request<Tresp>(inbox, resp => { response.Add(resp); waiter.Signal(); }).Success) if (false == client.Request<Tresp>(inbox, resp => { response.Add(resp); waiter.Signal(); }))
{ {
waiter.Signal(); waiter.Signal();
} }

@ -0,0 +1,173 @@
using System;
using System.Threading;
using ZeroLevel.Services.Pools;
namespace ZeroLevel.Network
{
public static class ExchangeExtension
{
static ObjectPool<AutoResetEvent> _mrePool = new ObjectPool<AutoResetEvent>(() => new AutoResetEvent(false), 16);
public static Tresponse Request<Tresponse>(this IClientSet exchange, string alias, TimeSpan timeout)
{
Tresponse response = default;
var ev = _mrePool.Allocate();
try
{
if (exchange.Request<Tresponse>(alias,
_response => { response = _response; ev.Set(); }))
{
ev.WaitOne(timeout);
}
}
finally
{
_mrePool.Free(ev);
}
return response;
}
public static Tresponse Request<Tresponse>(this IClientSet exchange, string alias, string inbox, TimeSpan timeout)
{
Tresponse response = default;
var ev = _mrePool.Allocate();
try
{
if (exchange.Request<Tresponse>(alias, inbox,
_response => {
response = _response;
ev.Set();
}))
{
ev.WaitOne(timeout);
}
}
finally
{
_mrePool.Free(ev);
}
return response;
}
public static Tresponse Request<Trequest, Tresponse>(this IClientSet exchange, string alias, Trequest request, TimeSpan timeout)
{
Tresponse response = default;
var ev = _mrePool.Allocate();
try
{
if (exchange.Request<Trequest, Tresponse>(alias, request,
_response => { response = _response; ev.Set(); }))
{
ev.WaitOne(timeout);
}
}
finally
{
_mrePool.Free(ev);
}
return response;
}
public static Tresponse Request<Trequest, Tresponse>(this IClientSet exchange, string alias, string inbox
, Trequest request, TimeSpan timeout)
{
Tresponse response = default;
var ev = _mrePool.Allocate();
try
{
if (exchange.Request<Trequest, Tresponse>(alias, inbox, request,
_response => { response = _response; ev.Set(); }))
{
ev.WaitOne(timeout);
}
}
finally
{
_mrePool.Free(ev);
}
return response;
}
public static Tresponse Request<Tresponse>(this IClientSet exchange, string alias)
{
Tresponse response = default;
var ev = _mrePool.Allocate();
try
{
if (exchange.Request<Tresponse>(alias,
_response => { response = _response; ev.Set(); }))
{
ev.WaitOne(Network.BaseSocket.MAX_REQUEST_TIME_MS);
}
}
finally
{
_mrePool.Free(ev);
}
return response;
}
public static Tresponse Request<Tresponse>(this IClientSet exchange, string alias, string inbox)
{
Tresponse response = default;
var ev = _mrePool.Allocate();
try
{
if (exchange.Request<Tresponse>(alias, inbox,
_response => {
response = _response;
ev.Set();
}))
{
ev.WaitOne(Network.BaseSocket.MAX_REQUEST_TIME_MS);
}
}
finally
{
_mrePool.Free(ev);
}
return response;
}
public static Tresponse Request<Trequest, Tresponse>(this IClientSet exchange, string alias, Trequest request)
{
Tresponse response = default;
var ev = _mrePool.Allocate();
try
{
if (exchange.Request<Trequest, Tresponse>(alias, request,
_response => { response = _response; ev.Set(); }))
{
ev.WaitOne(Network.BaseSocket.MAX_REQUEST_TIME_MS);
}
}
finally
{
_mrePool.Free(ev);
}
return response;
}
public static Tresponse Request<Trequest, Tresponse>(this IClientSet exchange, string alias, string inbox
, Trequest request)
{
Tresponse response = default;
var ev = _mrePool.Allocate();
try
{
if (exchange.Request<Trequest, Tresponse>(alias, inbox, request,
_response => { response = _response; ev.Set(); }))
{
ev.WaitOne(Network.BaseSocket.MAX_REQUEST_TIME_MS);
}
}
finally
{
_mrePool.Free(ev);
}
return response;
}
}
}

@ -96,10 +96,10 @@ namespace ZeroLevel.Network.FileTransfer
Send<T>(client, inbox, frame, resendWhenConnectionError, false); Send<T>(client, inbox, frame, resendWhenConnectionError, false);
} }
}); });
sended = client.Request<T, InvokeResult>(inbox, frame, handle).Success; sended = client.Request<T, InvokeResult>(inbox, frame, handle);
if (sended == false && resendWhenConnectionError) if (sended == false && resendWhenConnectionError)
{ {
sended = client.Request<T, InvokeResult>(inbox, frame, handle).Success; sended = client.Request<T, InvokeResult>(inbox, frame, handle);
} }
return sended; return sended;
} }

@ -169,7 +169,7 @@ namespace ZeroLevel.Services.Serialization
public static T Copy<T>(T value) public static T Copy<T>(T value)
where T : IBinarySerializable where T : IBinarySerializable
{ {
if (default == value) return default; if (null == value) return default;
using (var writer = new MemoryStreamWriter()) using (var writer = new MemoryStreamWriter())
{ {
value.Serialize(writer); value.Serialize(writer);

Loading…
Cancel
Save

Powered by TurnKey Linux.