diff --git a/TestPipeLine/Consumer/App.config b/TestPipeLine/Consumer/App.config
new file mode 100644
index 0000000..d9cdee5
--- /dev/null
+++ b/TestPipeLine/Consumer/App.config
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/TestPipeLine/Consumer/Consumer.csproj b/TestPipeLine/Consumer/Consumer.csproj
new file mode 100644
index 0000000..aa12fe6
--- /dev/null
+++ b/TestPipeLine/Consumer/Consumer.csproj
@@ -0,0 +1,60 @@
+
+
+
+
+ Debug
+ AnyCPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}
+ Exe
+ Consumer
+ Consumer
+ v4.7.2
+ 512
+ true
+ true
+
+
+ AnyCPU
+ true
+ full
+ false
+ bin\Debug\
+ DEBUG;TRACE
+ prompt
+ 4
+
+
+ AnyCPU
+ pdbonly
+ true
+ bin\Release\
+ TRACE
+ prompt
+ 4
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {06c9e60e-d449-41a7-9bf0-a829aaf5d214}
+ ZeroLevel
+
+
+
+
\ No newline at end of file
diff --git a/TestPipeLine/Consumer/ConsumerService.cs b/TestPipeLine/Consumer/ConsumerService.cs
new file mode 100644
index 0000000..92c9576
--- /dev/null
+++ b/TestPipeLine/Consumer/ConsumerService.cs
@@ -0,0 +1,47 @@
+using System.Threading;
+using ZeroLevel;
+using ZeroLevel.Network;
+using ZeroLevel.Services.Applications;
+
+namespace Consumer
+{
+ public class ConsumerService
+ : BaseZeroService
+ {
+ protected override void StartAction()
+ {
+ ReadServiceInfo();
+ AutoregisterInboxes(UseHost());
+ }
+
+ protected override void StopAction()
+ {
+ }
+
+ [ExchangeReplierWithoutArg("meta")]
+ public ZeroServiceInfo GetCounter(ISocketClient client)
+ {
+ return ServiceInfo;
+ }
+
+ private long _proceed = 0;
+
+ [ExchangeReplierWithoutArg("Proceed")]
+ public long GetProceedItemsCount(ISocketClient client)
+ {
+ return _proceed;
+ }
+
+ [ExchangeReplierWithoutArg("ping")]
+ public bool Ping(ISocketClient client)
+ {
+ return true;
+ }
+
+ [ExchangeReplier("handle")]
+ public bool Handler(ISocketClient client, int data)
+ {
+ return (data ^ Interlocked.Increment(ref _proceed)) % 2 == 0;
+ }
+ }
+}
diff --git a/TestPipeLine/Consumer/Program.cs b/TestPipeLine/Consumer/Program.cs
new file mode 100644
index 0000000..c44e298
--- /dev/null
+++ b/TestPipeLine/Consumer/Program.cs
@@ -0,0 +1,18 @@
+using ZeroLevel;
+
+namespace Consumer
+{
+ static class Program
+ {
+ static void Main(string[] args)
+ {
+ Bootstrap.Startup(args)
+ .EnableConsoleLog(ZeroLevel.Services.Logging.LogLevel.FullStandart)
+ .UseDiscovery()
+ .Run()
+ .WaitWhileStatus(ZeroServiceStatus.Running)
+ .Stop();
+ Bootstrap.Shutdown();
+ }
+ }
+}
diff --git a/TestPipeLine/Consumer/Properties/AssemblyInfo.cs b/TestPipeLine/Consumer/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..679a8fd
--- /dev/null
+++ b/TestPipeLine/Consumer/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Consumer")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Consumer")]
+[assembly: AssemblyCopyright("Copyright © 2019")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("931dea89-42d1-4c06-9cb8-a3a0412093d6")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
diff --git a/TestPipeLine/Processor/App.config b/TestPipeLine/Processor/App.config
new file mode 100644
index 0000000..c0eba0f
--- /dev/null
+++ b/TestPipeLine/Processor/App.config
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/TestPipeLine/Processor/Processor.csproj b/TestPipeLine/Processor/Processor.csproj
new file mode 100644
index 0000000..3e65033
--- /dev/null
+++ b/TestPipeLine/Processor/Processor.csproj
@@ -0,0 +1,60 @@
+
+
+
+
+ Debug
+ AnyCPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}
+ Exe
+ Processor
+ Processor
+ v4.7.2
+ 512
+ true
+ true
+
+
+ AnyCPU
+ true
+ full
+ false
+ bin\Debug\
+ DEBUG;TRACE
+ prompt
+ 4
+
+
+ AnyCPU
+ pdbonly
+ true
+ bin\Release\
+ TRACE
+ prompt
+ 4
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {06c9e60e-d449-41a7-9bf0-a829aaf5d214}
+ ZeroLevel
+
+
+
+
\ No newline at end of file
diff --git a/TestPipeLine/Processor/ProcessorService.cs b/TestPipeLine/Processor/ProcessorService.cs
new file mode 100644
index 0000000..4aa0fd1
--- /dev/null
+++ b/TestPipeLine/Processor/ProcessorService.cs
@@ -0,0 +1,49 @@
+using System;
+using System.Threading;
+using ZeroLevel;
+using ZeroLevel.Network;
+using ZeroLevel.Services.Applications;
+
+namespace Processor
+{
+ public class ProcessorService
+ : BaseZeroService
+ {
+ protected override void StartAction()
+ {
+ ReadServiceInfo();
+ AutoregisterInboxes(UseHost());
+ }
+
+ protected override void StopAction()
+ {
+ }
+
+ [ExchangeReplierWithoutArg("meta")]
+ public ZeroServiceInfo GetCounter(ISocketClient client)
+ {
+ return ServiceInfo;
+ }
+
+ private long _proceed = 0;
+
+ [ExchangeReplierWithoutArg("Proceed")]
+ public long GetProceedItemsCount(ISocketClient client)
+ {
+ return _proceed;
+ }
+
+ [ExchangeReplierWithoutArg("ping")]
+ public bool Ping(ISocketClient client)
+ {
+ return true;
+ }
+
+ [ExchangeHandler("handle")]
+ public void Handler(ISocketClient client, int data)
+ {
+ var next = (int)(data ^ Interlocked.Increment(ref _proceed));
+ Exchange.Request("test.consumer", "handle", next, result => { });
+ }
+ }
+}
diff --git a/TestPipeLine/Processor/Program.cs b/TestPipeLine/Processor/Program.cs
new file mode 100644
index 0000000..62556ed
--- /dev/null
+++ b/TestPipeLine/Processor/Program.cs
@@ -0,0 +1,18 @@
+using ZeroLevel;
+
+namespace Processor
+{
+ static class Program
+ {
+ static void Main(string[] args)
+ {
+ Bootstrap.Startup(args)
+ .EnableConsoleLog(ZeroLevel.Services.Logging.LogLevel.FullStandart)
+ .UseDiscovery()
+ .Run()
+ .WaitWhileStatus(ZeroServiceStatus.Running)
+ .Stop();
+ Bootstrap.Shutdown();
+ }
+ }
+}
diff --git a/TestPipeLine/Processor/Properties/AssemblyInfo.cs b/TestPipeLine/Processor/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..1f15d25
--- /dev/null
+++ b/TestPipeLine/Processor/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Processor")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Processor")]
+[assembly: AssemblyCopyright("Copyright © 2019")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("806d0160-a4bf-4881-af33-308f4fef8e15")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
diff --git a/TestPipeLine/Source/App.config b/TestPipeLine/Source/App.config
new file mode 100644
index 0000000..16180a0
--- /dev/null
+++ b/TestPipeLine/Source/App.config
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/TestPipeLine/Source/Program.cs b/TestPipeLine/Source/Program.cs
new file mode 100644
index 0000000..cb7a349
--- /dev/null
+++ b/TestPipeLine/Source/Program.cs
@@ -0,0 +1,18 @@
+using ZeroLevel;
+
+namespace Source
+{
+ static class Program
+ {
+ static void Main(string[] args)
+ {
+ Bootstrap.Startup(args)
+ .EnableConsoleLog(ZeroLevel.Services.Logging.LogLevel.FullStandart)
+ .UseDiscovery()
+ .Run()
+ .WaitWhileStatus(ZeroServiceStatus.Running)
+ .Stop();
+ Bootstrap.Shutdown();
+ }
+ }
+}
diff --git a/TestPipeLine/Source/Properties/AssemblyInfo.cs b/TestPipeLine/Source/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..b5131af
--- /dev/null
+++ b/TestPipeLine/Source/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Source")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Source")]
+[assembly: AssemblyCopyright("Copyright © 2019")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("a1d60994-5744-47d1-b684-c1c0b782998b")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
diff --git a/TestPipeLine/Source/Source.csproj b/TestPipeLine/Source/Source.csproj
new file mode 100644
index 0000000..188e0eb
--- /dev/null
+++ b/TestPipeLine/Source/Source.csproj
@@ -0,0 +1,60 @@
+
+
+
+
+ Debug
+ AnyCPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}
+ Exe
+ Source
+ Source
+ v4.7.2
+ 512
+ true
+ true
+
+
+ AnyCPU
+ true
+ full
+ false
+ bin\Debug\
+ DEBUG;TRACE
+ prompt
+ 4
+
+
+ AnyCPU
+ pdbonly
+ true
+ bin\Release\
+ TRACE
+ prompt
+ 4
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {06c9e60e-d449-41a7-9bf0-a829aaf5d214}
+ ZeroLevel
+
+
+
+
\ No newline at end of file
diff --git a/TestPipeLine/Source/SourceService.cs b/TestPipeLine/Source/SourceService.cs
new file mode 100644
index 0000000..9c9befb
--- /dev/null
+++ b/TestPipeLine/Source/SourceService.cs
@@ -0,0 +1,50 @@
+using System;
+using System.Threading;
+using ZeroLevel;
+using ZeroLevel.Network;
+using ZeroLevel.Services.Applications;
+
+namespace Source
+{
+ public class SourceService
+ : BaseZeroService
+ {
+ protected override void StartAction()
+ {
+ ReadServiceInfo();
+ AutoregisterInboxes(UseHost());
+
+ Sheduller.RemindEvery(TimeSpan.FromMilliseconds(10), () =>
+ {
+ if (Exchange.Send("test.processor", "handle", Environment.TickCount))
+ {
+ Interlocked.Increment(ref _proceed);
+ }
+ });
+ }
+
+ protected override void StopAction()
+ {
+ }
+
+ [ExchangeReplierWithoutArg("meta")]
+ public ZeroServiceInfo GetCounter(ISocketClient client)
+ {
+ return ServiceInfo;
+ }
+
+ private long _proceed = 0;
+
+ [ExchangeReplierWithoutArg("Proceed")]
+ public long GetProceedItemsCount(ISocketClient client)
+ {
+ return _proceed;
+ }
+
+ [ExchangeReplierWithoutArg("ping")]
+ public bool Ping(ISocketClient client)
+ {
+ return true;
+ }
+ }
+}
diff --git a/TestPipeLine/Watcher/App.config b/TestPipeLine/Watcher/App.config
new file mode 100644
index 0000000..ed6214b
--- /dev/null
+++ b/TestPipeLine/Watcher/App.config
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/TestPipeLine/Watcher/Program.cs b/TestPipeLine/Watcher/Program.cs
new file mode 100644
index 0000000..0c9ff32
--- /dev/null
+++ b/TestPipeLine/Watcher/Program.cs
@@ -0,0 +1,17 @@
+using ZeroLevel;
+
+namespace Watcher
+{
+ static class Program
+ {
+ static void Main(string[] args)
+ {
+ Bootstrap.Startup(args)
+ .UseDiscovery()
+ .Run()
+ .WaitWhileStatus(ZeroServiceStatus.Running)
+ .Stop();
+ Bootstrap.Shutdown();
+ }
+ }
+}
diff --git a/TestPipeLine/Watcher/Properties/AssemblyInfo.cs b/TestPipeLine/Watcher/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..3ae2f28
--- /dev/null
+++ b/TestPipeLine/Watcher/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Watcher")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Watcher")]
+[assembly: AssemblyCopyright("Copyright © 2019")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("6e04f32a-fb90-41d2-9059-f37311f813b3")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
diff --git a/TestPipeLine/Watcher/Watcher.csproj b/TestPipeLine/Watcher/Watcher.csproj
new file mode 100644
index 0000000..4250320
--- /dev/null
+++ b/TestPipeLine/Watcher/Watcher.csproj
@@ -0,0 +1,60 @@
+
+
+
+
+ Debug
+ AnyCPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}
+ Exe
+ Watcher
+ Watcher
+ v4.7.2
+ 512
+ true
+ true
+
+
+ AnyCPU
+ true
+ full
+ false
+ bin\Debug\
+ DEBUG;TRACE
+ prompt
+ 4
+
+
+ AnyCPU
+ pdbonly
+ true
+ bin\Release\
+ TRACE
+ prompt
+ 4
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {06c9e60e-d449-41a7-9bf0-a829aaf5d214}
+ ZeroLevel
+
+
+
+
\ No newline at end of file
diff --git a/TestPipeLine/Watcher/WatcherService.cs b/TestPipeLine/Watcher/WatcherService.cs
new file mode 100644
index 0000000..cc01b2c
--- /dev/null
+++ b/TestPipeLine/Watcher/WatcherService.cs
@@ -0,0 +1,98 @@
+using System;
+using System.Linq;
+using System.Text;
+using ZeroLevel;
+using ZeroLevel.Network;
+using ZeroLevel.Services.Applications;
+
+namespace Watcher
+{
+ public class WatcherService
+ : BaseZeroService
+ {
+ protected override void StartAction()
+ {
+ ReadServiceInfo();
+ AutoregisterInboxes(UseHost());
+
+ Sheduller.RemindEvery(TimeSpan.FromMilliseconds(350), () =>
+ {
+ var sb = new StringBuilder();
+ sb.AppendLine("—————————————————————————————————————————————————————————————————————————");
+
+ var success = Exchange.RequestBroadcastByGroup("Test", "meta", records =>
+ {
+ foreach (var record in records.OrderBy(r=>r.Name))
+ {
+ sb.Append(record.Name);
+ sb.Append(" (");
+ sb.Append(record.Version);
+ sb.AppendLine(")");
+ sb.AppendLine(record.ServiceKey);
+ sb.AppendLine(record.ServiceType);
+ sb.AppendLine(record.ServiceGroup);
+ sb.AppendLine();
+ }
+ });
+ if (!success)
+ {
+ Log.Warning("[WatcherService] Can't send broadcast reqeust for meta");
+ }
+
+ success = Exchange.RequestBroadcastByType("Sources", "Proceed", records =>
+ {
+ sb.AppendLine("-----------------------------------------------------------------------------");
+ sb.Append("Source send items: ");
+ sb.AppendLine(records.Sum().ToString());
+ });
+ if (!success)
+ {
+ Log.Warning("[WatcherService] Can't send broadcast reqeust to 'Sources'");
+ }
+
+ success = Exchange.RequestBroadcastByType("Core", "Proceed", records =>
+ {
+ sb.AppendLine("-----------------------------------------------------------------------------");
+ sb.Append("Proccessor handle and send items: ");
+ sb.AppendLine(records.Sum().ToString());
+ });
+ if (!success)
+ {
+ Log.Warning("[WatcherService] Can't send broadcast reqeust to 'Core'");
+ }
+
+ success = Exchange.RequestBroadcastByType("Destination", "Proceed", records =>
+ {
+ sb.AppendLine("-----------------------------------------------------------------------------");
+ sb.Append("Consumer catch: ");
+ sb.AppendLine(records.Sum().ToString());
+ });
+ if (!success)
+ {
+ Log.Warning("[WatcherService] Can't send broadcast reqeust to 'Destination'");
+ }
+
+ sb.AppendLine("—————————————————————————————————————————————————————————————————————————");
+ sb.AppendLine();
+ Console.Clear();
+ Console.WriteLine($"Watch info: \r\n{sb}");
+ });
+ }
+
+ protected override void StopAction()
+ {
+ }
+
+ [ExchangeReplierWithoutArg("meta")]
+ public ZeroServiceInfo GetCounter(ISocketClient client)
+ {
+ return ServiceInfo;
+ }
+
+ [ExchangeReplierWithoutArg("ping")]
+ public bool Ping(ISocketClient client)
+ {
+ return true;
+ }
+ }
+}
diff --git a/WebSemanticService/semantic/Semantic.API.Proxy/Semantic.API.Proxy.csproj b/WebSemanticService/semantic/Semantic.API.Proxy/Semantic.API.Proxy.csproj
index 23948d3..c0097a3 100644
--- a/WebSemanticService/semantic/Semantic.API.Proxy/Semantic.API.Proxy.csproj
+++ b/WebSemanticService/semantic/Semantic.API.Proxy/Semantic.API.Proxy.csproj
@@ -47,8 +47,8 @@
-
- ..\..\packages\ZeroLevel.2.0.8\lib\netstandard2.0\ZeroLevel.dll
+
+ C:\Users\a.bozhenov\Desktop\SEOPortal\Utils\Semantic\packages\ZeroLevel.3.0.0\lib\netstandard2.0\ZeroLevel.dll
diff --git a/WebSemanticService/semantic/Semantic.API.Proxy/packages.config b/WebSemanticService/semantic/Semantic.API.Proxy/packages.config
index 6f6fd07..7fcda0d 100644
--- a/WebSemanticService/semantic/Semantic.API.Proxy/packages.config
+++ b/WebSemanticService/semantic/Semantic.API.Proxy/packages.config
@@ -1,5 +1,5 @@
-
+
\ No newline at end of file
diff --git a/ZeroLevel.SqlServer/ZeroLevel.SqlServer.csproj b/ZeroLevel.SqlServer/ZeroLevel.SqlServer.csproj
index fb33bdd..82de616 100644
--- a/ZeroLevel.SqlServer/ZeroLevel.SqlServer.csproj
+++ b/ZeroLevel.SqlServer/ZeroLevel.SqlServer.csproj
@@ -76,8 +76,8 @@
-
- ..\packages\ZeroLevel.2.0.8\lib\netstandard2.0\ZeroLevel.dll
+
+ C:\Users\a.bozhenov\Desktop\SEOPortal\Utils\Semantic\packages\ZeroLevel.3.0.0\lib\netstandard2.0\ZeroLevel.dll
diff --git a/ZeroLevel.SqlServer/packages.config b/ZeroLevel.SqlServer/packages.config
index a985e30..38f5c89 100644
--- a/ZeroLevel.SqlServer/packages.config
+++ b/ZeroLevel.SqlServer/packages.config
@@ -1,4 +1,4 @@
-
+
\ No newline at end of file
diff --git a/ZeroLevel.sln b/ZeroLevel.sln
index 49e866d..3af0526 100644
--- a/ZeroLevel.sln
+++ b/ZeroLevel.sln
@@ -19,6 +19,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FileTransferServer", "FileT
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ZeroLevel.SqlServer", "ZeroLevel.SqlServer\ZeroLevel.SqlServer.csproj", "{A8AD956F-1559-45EC-A7DB-42290494E2C5}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "TestPipeLine", "TestPipeLine", "{03ACF314-93FC-46FE-9FB8-3F46A01A5A15}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Watcher", "TestPipeLine\Watcher\Watcher.csproj", "{6E04F32A-FB90-41D2-9059-F37311F813B3}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Source", "TestPipeLine\Source\Source.csproj", "{A1D60994-5744-47D1-B684-C1C0B782998B}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Processor", "TestPipeLine\Processor\Processor.csproj", "{806D0160-A4BF-4881-AF33-308F4FEF8E15}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Consumer", "TestPipeLine\Consumer\Consumer.csproj", "{931DEA89-42D1-4C06-9CB8-A3A0412093D6}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -113,6 +123,54 @@ Global
{A8AD956F-1559-45EC-A7DB-42290494E2C5}.Release|x64.Build.0 = Release|x64
{A8AD956F-1559-45EC-A7DB-42290494E2C5}.Release|x86.ActiveCfg = Release|x86
{A8AD956F-1559-45EC-A7DB-42290494E2C5}.Release|x86.Build.0 = Release|x86
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|x64.Build.0 = Debug|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|x86.Build.0 = Debug|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|Any CPU.Build.0 = Release|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|x64.ActiveCfg = Release|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|x64.Build.0 = Release|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|x86.ActiveCfg = Release|Any CPU
+ {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|x86.Build.0 = Release|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|x64.Build.0 = Debug|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|x86.Build.0 = Debug|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|x64.ActiveCfg = Release|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|x64.Build.0 = Release|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|x86.ActiveCfg = Release|Any CPU
+ {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|x86.Build.0 = Release|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|x64.Build.0 = Debug|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|x86.Build.0 = Debug|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|Any CPU.Build.0 = Release|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|x64.ActiveCfg = Release|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|x64.Build.0 = Release|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|x86.ActiveCfg = Release|Any CPU
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|x86.Build.0 = Release|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|x64.Build.0 = Debug|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|x86.Build.0 = Debug|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|Any CPU.Build.0 = Release|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|x64.ActiveCfg = Release|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|x64.Build.0 = Release|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|x86.ActiveCfg = Release|Any CPU
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -120,6 +178,10 @@ Global
GlobalSection(NestedProjects) = preSolution
{F8B727E1-340D-4096-A784-E570AE13FABC} = {FC074553-5D9F-4DF1-9130-7092E37DE768}
{9BF859EE-EF90-4B5B-8576-E26770F2F792} = {FC074553-5D9F-4DF1-9130-7092E37DE768}
+ {6E04F32A-FB90-41D2-9059-F37311F813B3} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15}
+ {A1D60994-5744-47D1-B684-C1C0B782998B} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15}
+ {806D0160-A4BF-4881-AF33-308F4FEF8E15} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15}
+ {931DEA89-42D1-4C06-9CB8-A3A0412093D6} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB}
diff --git a/ZeroLevel/Services/Network/Contracts/IExchange.cs b/ZeroLevel/Services/Network/Contracts/IExchange.cs
new file mode 100644
index 0000000..7094c8e
--- /dev/null
+++ b/ZeroLevel/Services/Network/Contracts/IExchange.cs
@@ -0,0 +1,22 @@
+using System;
+using System.Net;
+
+namespace ZeroLevel.Network
+{
+ public interface IExchange
+ : IClientSet, IDisposable
+ {
+ void UseDiscovery();
+ void UseDiscovery(string endpoint);
+ void UseDiscovery(IPEndPoint endpoint);
+
+ IRouter UseHost();
+ IRouter UseHost(int port);
+ IRouter UseHost(IPEndPoint endpoint);
+
+ IServiceRoutesStorage RoutesStorage { get; }
+
+ ExClient GetConnection(string alias);
+ ExClient GetConnection(IPEndPoint endpoint);
+ }
+}
diff --git a/ZeroLevel/Services/Network/Exchange.cs b/ZeroLevel/Services/Network/Exchange.cs
index bfd3c08..9881ae0 100644
--- a/ZeroLevel/Services/Network/Exchange.cs
+++ b/ZeroLevel/Services/Network/Exchange.cs
@@ -9,33 +9,17 @@ using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
{
- public interface IExchange
- : IClientSet, IDisposable
- {
- void UseDiscovery();
- void UseDiscovery(string endpoint);
- void UseDiscovery(IPEndPoint endpoint);
-
- IRouter UseHost();
- IRouter UseHost(int port);
- IRouter UseHost(IPEndPoint endpoint);
-
- IServiceRoutesStorage RoutesStorage { get; }
-
- ExClient GetConnection(string alias);
- ExClient GetConnection(IPEndPoint endpoint);
- }
-
///
/// Provides data exchange between services
///
internal sealed class Exchange :
IExchange
{
- private readonly ServiceRouteStorage _aliases = new ServiceRouteStorage();
+ private readonly ServiceRouteStorage _dicovery_aliases = new ServiceRouteStorage();
+ private readonly ServiceRouteStorage _user_aliases = new ServiceRouteStorage();
private readonly ExClientServerCachee _cachee = new ExClientServerCachee();
- public IServiceRoutesStorage RoutesStorage => _aliases;
+ public IServiceRoutesStorage RoutesStorage => _user_aliases;
private readonly IZeroService _owner;
#region Ctor
@@ -325,8 +309,11 @@ namespace ZeroLevel.Network
try
{
var clients = GetClientEnumerator(alias).ToList();
- callback(_RequestBroadcast(clients, inbox));
- return true;
+ if (clients.Count > 0)
+ {
+ callback(_RequestBroadcast(clients, inbox));
+ return true;
+ }
}
catch (Exception ex)
{
@@ -344,8 +331,11 @@ namespace ZeroLevel.Network
try
{
var clients = GetClientEnumerator(alias).ToList();
- callback(_RequestBroadcast(clients, inbox, data));
- return true;
+ if (clients.Count > 0)
+ {
+ callback(_RequestBroadcast(clients, inbox, data));
+ return true;
+ }
}
catch (Exception ex)
{
@@ -362,8 +352,11 @@ namespace ZeroLevel.Network
try
{
var clients = GetClientEnumeratorByGroup(serviceGroup).ToList();
- callback(_RequestBroadcast(clients, inbox));
- return true;
+ if (clients.Count > 0)
+ {
+ callback(_RequestBroadcast(clients, inbox));
+ return true;
+ }
}
catch (Exception ex)
{
@@ -381,8 +374,11 @@ namespace ZeroLevel.Network
try
{
var clients = GetClientEnumeratorByGroup(serviceGroup).ToList();
- callback(_RequestBroadcast(clients, inbox, data));
- return true;
+ if (clients.Count > 0)
+ {
+ callback(_RequestBroadcast(clients, inbox, data));
+ return true;
+ }
}
catch (Exception ex)
{
@@ -399,8 +395,11 @@ namespace ZeroLevel.Network
try
{
var clients = GetClientEnumeratorByType(serviceType).ToList();
- callback(_RequestBroadcast(clients, inbox));
- return true;
+ if (clients.Count > 0)
+ {
+ callback(_RequestBroadcast(clients, inbox));
+ return true;
+ }
}
catch (Exception ex)
{
@@ -438,8 +437,11 @@ namespace ZeroLevel.Network
try
{
var clients = GetClientEnumeratorByType(serviceType).ToList();
- callback(_RequestBroadcast(clients, inbox, data));
- return true;
+ if (clients.Count > 0)
+ {
+ callback(_RequestBroadcast(clients, inbox, data));
+ return true;
+ }
}
catch (Exception ex)
{
@@ -460,7 +462,7 @@ namespace ZeroLevel.Network
try
{
var discoveryEndpoint = Configuration.Default.First("discovery");
- _aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint));
+ _user_aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint));
RestartDiscoveryTasks();
}
catch (Exception ex)
@@ -473,7 +475,7 @@ namespace ZeroLevel.Network
{
try
{
- _aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint));
+ _user_aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint));
RestartDiscoveryTasks();
}
catch (Exception ex)
@@ -486,7 +488,7 @@ namespace ZeroLevel.Network
{
try
{
- _aliases.Set(BaseSocket.DISCOVERY_ALIAS, discoveryEndpoint);
+ _user_aliases.Set(BaseSocket.DISCOVERY_ALIAS, discoveryEndpoint);
RestartDiscoveryTasks();
}
catch (Exception ex)
@@ -511,7 +513,7 @@ namespace ZeroLevel.Network
private void RegisterServicesInDiscovery()
{
- var discovery_endpoint = _aliases.Get(BaseSocket.DISCOVERY_ALIAS);
+ var discovery_endpoint = _user_aliases.Get(BaseSocket.DISCOVERY_ALIAS);
if (discovery_endpoint.Success)
{
var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true);
@@ -542,7 +544,7 @@ namespace ZeroLevel.Network
private void UpdateServiceListFromDiscovery()
{
- var discovery_endpoint = _aliases.Get(BaseSocket.DISCOVERY_ALIAS);
+ var discovery_endpoint = _user_aliases.Get(BaseSocket.DISCOVERY_ALIAS);
if (discovery_endpoint.Success)
{
var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true);
@@ -556,25 +558,34 @@ namespace ZeroLevel.Network
return;
}
var endpoints = new HashSet();
- foreach (var service in records)
+ _dicovery_aliases.BeginUpdate();
+ try
{
- endpoints.Clear();
- foreach (var ep in service.Endpoints)
+ foreach (var service in records)
{
- try
- {
- var endpoint = NetUtils.CreateIPEndPoint(ep);
- endpoints.Add(endpoint);
- }
- catch
+ endpoints.Clear();
+ foreach (var ep in service.Endpoints)
{
- Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint");
+ try
+ {
+ var endpoint = NetUtils.CreateIPEndPoint(ep);
+ endpoints.Add(endpoint);
+ }
+ catch
+ {
+ Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint");
+ }
}
+ _dicovery_aliases.Set(service.ServiceKey,
+ service.ServiceType,
+ service.ServiceGroup,
+ endpoints);
}
- _aliases.Set(service.ServiceKey,
- service.ServiceType,
- service.ServiceGroup,
- endpoints);
+ _dicovery_aliases.Commit();
+ }
+ catch
+ {
+ _dicovery_aliases.Rollback();
}
});
if (!ir.Success)
@@ -592,19 +603,30 @@ namespace ZeroLevel.Network
public ExClient GetConnection(string alias)
{
- var address = _aliases.Get(alias);
- if (address.Success)
- {
- return _cachee.GetClient(address.Value, true);
- }
- try
+ if (_update_discovery_table_task != -1)
{
- var endpoint = NetUtils.CreateIPEndPoint(alias);
- return _cachee.GetClient(endpoint, true);
+ var address = _dicovery_aliases.Get(alias);
+ if (address.Success)
+ {
+ return _cachee.GetClient(address.Value, true);
+ }
}
- catch (Exception ex)
+ else
{
- Log.SystemError(ex, "[Exchange.GetConnection]");
+ var address = _user_aliases.Get(alias);
+ if (address.Success)
+ {
+ return _cachee.GetClient(address.Value, true);
+ }
+ try
+ {
+ var endpoint = NetUtils.CreateIPEndPoint(alias);
+ return _cachee.GetClient(endpoint, true);
+ }
+ catch (Exception ex)
+ {
+ Log.SystemError(ex, "[Exchange.GetConnection]");
+ }
}
return null;
}
@@ -640,21 +662,111 @@ namespace ZeroLevel.Network
#endregion
#region Private
+ private IEnumerable GetAllAddresses(string serviceKey)
+ {
+ if (_update_discovery_table_task != -1)
+ {
+ var dr = _dicovery_aliases.GetAll(serviceKey);
+ var ur = _user_aliases.GetAll(serviceKey);
+ if (dr.Success && ur.Success)
+ {
+ return Enumerable.Union(dr.Value, ur.Value);
+ }
+ else if (dr.Success)
+ {
+ return dr.Value;
+ }
+ else if (ur.Success)
+ {
+ return ur.Value;
+ }
+ }
+ else
+ {
+ var result = _user_aliases.GetAll(serviceKey);
+ if (result.Success)
+ {
+ return result.Value;
+ }
+ }
+ return null;
+ }
+
+ private IEnumerable GetAllAddressesByType(string serviceType)
+ {
+ if (_update_discovery_table_task != -1)
+ {
+ var dr = _dicovery_aliases.GetAllByType(serviceType);
+ var ur = _user_aliases.GetAllByType(serviceType);
+ if (dr.Success && ur.Success)
+ {
+ return Enumerable.Union(dr.Value, ur.Value);
+ }
+ else if (dr.Success)
+ {
+ return dr.Value;
+ }
+ else if (ur.Success)
+ {
+ return ur.Value;
+ }
+ }
+ else
+ {
+ var result = _user_aliases.GetAllByType(serviceType);
+ if (result.Success)
+ {
+ return result.Value;
+ }
+ }
+ return null;
+ }
+
+ private IEnumerable GetAllAddressesByGroup(string serviceGroup)
+ {
+ if (_update_discovery_table_task != -1)
+ {
+ var dr = _dicovery_aliases.GetAllByGroup(serviceGroup);
+ var ur = _user_aliases.GetAllByGroup(serviceGroup);
+ if (dr.Success && ur.Success)
+ {
+ return Enumerable.Union(dr.Value, ur.Value);
+ }
+ else if (dr.Success)
+ {
+ return dr.Value;
+ }
+ else if (ur.Success)
+ {
+ return ur.Value;
+ }
+ }
+ else
+ {
+ var result = _user_aliases.GetAllByGroup(serviceGroup);
+ if (result.Success)
+ {
+ return result.Value;
+ }
+ }
+ return null;
+ }
+
private IEnumerable GetClientEnumerator(string serviceKey)
{
- InvokeResult> candidates;
+ IEnumerable candidates;
try
{
- candidates = _aliases.GetAll(serviceKey);
+ candidates = GetAllAddresses(serviceKey);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange.GetClientEnumerator] Error when trying get endpoints for service key '{serviceKey}'");
candidates = null;
}
- if (candidates != null && candidates.Success && candidates.Value.Any())
+ if (candidates != null && candidates.Any())
{
- foreach (var endpoint in candidates.Value)
+ foreach (var endpoint in candidates)
{
ExClient transport;
try
@@ -677,19 +789,19 @@ namespace ZeroLevel.Network
private IEnumerable GetClientEnumeratorByType(string serviceType)
{
- InvokeResult> candidates;
+ IEnumerable candidates;
try
{
- candidates = _aliases.GetAllByType(serviceType);
+ candidates = GetAllAddressesByType(serviceType);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByType] Error when trying get endpoints for service type '{serviceType}'");
candidates = null;
}
- if (candidates != null && candidates.Success && candidates.Value.Any())
+ if (candidates != null && candidates.Any())
{
- foreach (var endpoint in candidates.Value)
+ foreach (var endpoint in candidates)
{
ExClient transport;
try
@@ -712,19 +824,19 @@ namespace ZeroLevel.Network
private IEnumerable GetClientEnumeratorByGroup(string serviceGroup)
{
- InvokeResult> candidates;
+ IEnumerable candidates;
try
{
- candidates = _aliases.GetAllByGroup(serviceGroup);
+ candidates = GetAllAddressesByGroup(serviceGroup);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByGroup] Error when trying get endpoints for service group '{serviceGroup}'");
candidates = null;
}
- if (candidates != null && candidates.Success && candidates.Value.Any())
+ if (candidates != null && candidates.Any())
{
- foreach (var service in candidates.Value)
+ foreach (var service in candidates)
{
ExClient transport;
try
@@ -752,23 +864,23 @@ namespace ZeroLevel.Network
/// true - service called succesfully
private bool CallService(string serviceKey, Func callHandler)
{
- InvokeResult> candidates;
+ IEnumerable candidates;
try
{
- candidates = _aliases.GetAll(serviceKey);
+ candidates = GetAllAddresses(serviceKey);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[Exchange.CallService] Error when trying get endpoints for service key '{serviceKey}'");
return false;
}
- if (candidates == null || !candidates.Success || candidates.Value.Any() == false)
+ if (candidates == null || candidates.Any() == false)
{
Log.Debug($"[Exchange.CallService] Not found endpoints for service key '{serviceKey}'");
return false;
}
var success = false;
- foreach (var endpoint in candidates.Value)
+ foreach (var endpoint in candidates)
{
ExClient transport;
try
diff --git a/ZeroLevel/Services/Network/ServiceRouteStorage.cs b/ZeroLevel/Services/Network/ServiceRouteStorage.cs
index 8cdb01c..ae47af1 100644
--- a/ZeroLevel/Services/Network/ServiceRouteStorage.cs
+++ b/ZeroLevel/Services/Network/ServiceRouteStorage.cs
@@ -36,10 +36,18 @@ namespace ZeroLevel.Network
public void Set(IPEndPoint endpoint)
{
+ var key = $"{endpoint.Address}:{endpoint.Port}";
+
+ if (_in_transaction == 1)
+ {
+ TransAppendByKeys(key, endpoint);
+ _tran_endpoints[endpoint] = new string[] { key, null, null };
+ return;
+ }
+
_lock.EnterWriteLock();
try
{
- var key = $"{endpoint.Address}:{endpoint.Port}";
if (_endpoints.ContainsKey(endpoint))
{
if (_tableByKey.ContainsKey(key))
@@ -67,6 +75,15 @@ namespace ZeroLevel.Network
public void Set(string key, IPEndPoint endpoint)
{
+ key = key.ToUpperInvariant();
+
+ if (_in_transaction == 1)
+ {
+ TransAppendByKeys(key, endpoint);
+ _tran_endpoints[endpoint] = new string[] { key, null, null };
+ return;
+ }
+
_lock.EnterWriteLock();
try
{
@@ -83,7 +100,7 @@ namespace ZeroLevel.Network
RemoveLocked(endpoint);
}
AppendByKeys(key, endpoint);
- _endpoints.Add(endpoint, new string[3] { key.ToUpperInvariant(), null, null });
+ _endpoints.Add(endpoint, new string[3] { key, null, null });
}
finally
{
@@ -93,6 +110,17 @@ namespace ZeroLevel.Network
public void Set(string key, IEnumerable endpoints)
{
+ key = key.ToUpperInvariant();
+ if (_in_transaction == 1)
+ {
+ foreach (var endpoint in endpoints)
+ {
+ TransAppendByKeys(key, endpoint);
+ _tran_endpoints[endpoint] = new string[] { key, null, null };
+ }
+ return;
+ }
+
_lock.EnterWriteLock();
try
{
@@ -122,22 +150,44 @@ namespace ZeroLevel.Network
public void Set(string key, string type, string group, IPEndPoint endpoint)
{
+ if (key == null)
+ {
+ key = $"{endpoint.Address}:{endpoint.Port}";
+ }
+ else
+ {
+ key = key.ToUpperInvariant();
+ }
+ type = type.ToUpperInvariant();
+ group = group.ToUpperInvariant();
+
+ if (_in_transaction == 1)
+ {
+ TransAppendByKeys(key, endpoint);
+ if (type != null)
+ {
+ TransAppendByType(type, endpoint);
+ }
+ if (group != null)
+ {
+ TransAppendByGroup(group, endpoint);
+ }
+ _tran_endpoints[endpoint] = new string[] { key, type, group };
+ return;
+ }
+
_lock.EnterWriteLock();
try
{
RemoveLocked(endpoint);
- if (key == null)
- {
- key = $"{endpoint.Address}:{endpoint.Port}";
- }
AppendByKeys(key, endpoint);
if (type != null)
{
- AppendByType(key, endpoint);
+ AppendByType(type, endpoint);
}
if (group != null)
{
- AppendByGroup(key, endpoint);
+ AppendByGroup(group, endpoint);
}
_endpoints.Add(endpoint, new string[3] { key.ToUpperInvariant(), type.ToUpperInvariant(), group.ToUpperInvariant() });
}
@@ -149,6 +199,28 @@ namespace ZeroLevel.Network
public void Set(string key, string type, string group, IEnumerable endpoints)
{
+ if (_in_transaction == 1)
+ {
+ key = key.ToUpperInvariant();
+ type = type.ToUpperInvariant();
+ group = group.ToUpperInvariant();
+
+ foreach (var endpoint in endpoints)
+ {
+ TransAppendByKeys(key, endpoint);
+ if (type != null)
+ {
+ TransAppendByType(type, endpoint);
+ }
+ if (group != null)
+ {
+ TransAppendByGroup(group, endpoint);
+ }
+ _tran_endpoints[endpoint] = new string[] { key, type, group };
+ }
+ return;
+ }
+
foreach (var ep in endpoints)
{
RemoveLocked(ep);
@@ -235,15 +307,15 @@ namespace ZeroLevel.Network
#region Private
private void AppendByKeys(string key, IPEndPoint endpoint)
{
- Append(key.ToUpperInvariant(), endpoint, _tableByKey);
+ Append(key, endpoint, _tableByKey);
}
private void AppendByType(string type, IPEndPoint endpoint)
{
- Append(type.ToUpperInvariant(), endpoint, _tableByTypes);
+ Append(type, endpoint, _tableByTypes);
}
private void AppendByGroup(string group, IPEndPoint endpoint)
{
- Append(group.ToUpperInvariant(), endpoint, _tableByGroups);
+ Append(group, endpoint, _tableByGroups);
}
private void Append(string key, IPEndPoint value, Dictionary> dict)
{
@@ -266,5 +338,91 @@ namespace ZeroLevel.Network
}
}
#endregion
+
+ #region Transactional
+ private Dictionary> _tran_tableByKey
+ = new Dictionary>();
+
+ private Dictionary> _tran_tableByGroups
+ = new Dictionary>();
+
+ private Dictionary> _tran_tableByTypes
+ = new Dictionary>();
+
+ private Dictionary _tran_endpoints
+ = new Dictionary();
+
+ private int _in_transaction = 0;
+
+ internal void BeginUpdate()
+ {
+ if (Interlocked.Exchange(ref _in_transaction, 1) == 0)
+ {
+ _tran_endpoints.Clear();
+ _tran_tableByKey.Clear();
+ _tran_tableByGroups.Clear();
+ _tran_tableByTypes.Clear();
+ }
+ else
+ {
+ throw new System.Exception("Transaction started already");
+ }
+ }
+
+ internal void Commit()
+ {
+ if (Interlocked.Exchange(ref _in_transaction, 0) == 1)
+ {
+ _lock.EnterWriteLock();
+ try
+ {
+ _endpoints = _tran_endpoints.Select(pair => pair).ToDictionary(p => p.Key, p => p.Value);
+ _tableByGroups = _tran_tableByGroups.Select(pair => pair).ToDictionary(p => p.Key, p => new RoundRobinCollection(p.Value));
+ _tableByKey = _tran_tableByKey.Select(pair => pair).ToDictionary(p => p.Key, p => new RoundRobinCollection(p.Value));
+ _tableByTypes = _tran_tableByTypes.Select(pair => pair).ToDictionary(p => p.Key, p => new RoundRobinCollection(p.Value));
+ }
+ finally
+ {
+ _lock.ExitWriteLock();
+ }
+ _tran_endpoints.Clear();
+ _tran_tableByKey.Clear();
+ _tran_tableByGroups.Clear();
+ _tran_tableByTypes.Clear();
+ }
+ }
+
+ internal void Rollback()
+ {
+ if (Interlocked.Exchange(ref _in_transaction, 0) == 1)
+ {
+ _tran_endpoints.Clear();
+ _tran_tableByKey.Clear();
+ _tran_tableByGroups.Clear();
+ _tran_tableByTypes.Clear();
+ }
+ }
+
+ private void TransAppendByKeys(string key, IPEndPoint endpoint)
+ {
+ TransAppend(key.ToUpperInvariant(), endpoint, _tran_tableByKey);
+ }
+ private void TransAppendByType(string type, IPEndPoint endpoint)
+ {
+ TransAppend(type.ToUpperInvariant(), endpoint, _tran_tableByTypes);
+ }
+ private void TransAppendByGroup(string group, IPEndPoint endpoint)
+ {
+ TransAppend(group.ToUpperInvariant(), endpoint, _tran_tableByGroups);
+ }
+ private void TransAppend(string key, IPEndPoint value, Dictionary> dict)
+ {
+ if (!dict.ContainsKey(key))
+ {
+ dict.Add(key, new List());
+ }
+ dict[key].Add(value);
+ }
+ #endregion
}
}