From 886b2fd5b5532210649dcc05fdee47436440a9fd Mon Sep 17 00:00:00 2001 From: "a.bozhenov" Date: Tue, 9 Jul 2019 22:40:28 +0300 Subject: [PATCH] Add test apps --- TestPipeLine/Consumer/App.config | 17 ++ TestPipeLine/Consumer/Consumer.csproj | 60 ++++ TestPipeLine/Consumer/ConsumerService.cs | 47 ++++ TestPipeLine/Consumer/Program.cs | 18 ++ .../Consumer/Properties/AssemblyInfo.cs | 36 +++ TestPipeLine/Processor/App.config | 17 ++ TestPipeLine/Processor/Processor.csproj | 60 ++++ TestPipeLine/Processor/ProcessorService.cs | 49 ++++ TestPipeLine/Processor/Program.cs | 18 ++ .../Processor/Properties/AssemblyInfo.cs | 36 +++ TestPipeLine/Source/App.config | 17 ++ TestPipeLine/Source/Program.cs | 18 ++ .../Source/Properties/AssemblyInfo.cs | 36 +++ TestPipeLine/Source/Source.csproj | 60 ++++ TestPipeLine/Source/SourceService.cs | 50 ++++ TestPipeLine/Watcher/App.config | 17 ++ TestPipeLine/Watcher/Program.cs | 17 ++ .../Watcher/Properties/AssemblyInfo.cs | 36 +++ TestPipeLine/Watcher/Watcher.csproj | 60 ++++ TestPipeLine/Watcher/WatcherService.cs | 98 +++++++ .../Semantic.API.Proxy.csproj | 4 +- .../Semantic.API.Proxy/packages.config | 2 +- .../ZeroLevel.SqlServer.csproj | 4 +- ZeroLevel.SqlServer/packages.config | 2 +- ZeroLevel.sln | 62 ++++ .../Services/Network/Contracts/IExchange.cs | 22 ++ ZeroLevel/Services/Network/Exchange.cs | 264 +++++++++++++----- .../Services/Network/ServiceRouteStorage.cs | 180 +++++++++++- 28 files changed, 1214 insertions(+), 93 deletions(-) create mode 100644 TestPipeLine/Consumer/App.config create mode 100644 TestPipeLine/Consumer/Consumer.csproj create mode 100644 TestPipeLine/Consumer/ConsumerService.cs create mode 100644 TestPipeLine/Consumer/Program.cs create mode 100644 TestPipeLine/Consumer/Properties/AssemblyInfo.cs create mode 100644 TestPipeLine/Processor/App.config create mode 100644 TestPipeLine/Processor/Processor.csproj create mode 100644 TestPipeLine/Processor/ProcessorService.cs create mode 100644 TestPipeLine/Processor/Program.cs create mode 100644 TestPipeLine/Processor/Properties/AssemblyInfo.cs create mode 100644 TestPipeLine/Source/App.config create mode 100644 TestPipeLine/Source/Program.cs create mode 100644 TestPipeLine/Source/Properties/AssemblyInfo.cs create mode 100644 TestPipeLine/Source/Source.csproj create mode 100644 TestPipeLine/Source/SourceService.cs create mode 100644 TestPipeLine/Watcher/App.config create mode 100644 TestPipeLine/Watcher/Program.cs create mode 100644 TestPipeLine/Watcher/Properties/AssemblyInfo.cs create mode 100644 TestPipeLine/Watcher/Watcher.csproj create mode 100644 TestPipeLine/Watcher/WatcherService.cs create mode 100644 ZeroLevel/Services/Network/Contracts/IExchange.cs diff --git a/TestPipeLine/Consumer/App.config b/TestPipeLine/Consumer/App.config new file mode 100644 index 0000000..d9cdee5 --- /dev/null +++ b/TestPipeLine/Consumer/App.config @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/TestPipeLine/Consumer/Consumer.csproj b/TestPipeLine/Consumer/Consumer.csproj new file mode 100644 index 0000000..aa12fe6 --- /dev/null +++ b/TestPipeLine/Consumer/Consumer.csproj @@ -0,0 +1,60 @@ + + + + + Debug + AnyCPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6} + Exe + Consumer + Consumer + v4.7.2 + 512 + true + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + {06c9e60e-d449-41a7-9bf0-a829aaf5d214} + ZeroLevel + + + + \ No newline at end of file diff --git a/TestPipeLine/Consumer/ConsumerService.cs b/TestPipeLine/Consumer/ConsumerService.cs new file mode 100644 index 0000000..92c9576 --- /dev/null +++ b/TestPipeLine/Consumer/ConsumerService.cs @@ -0,0 +1,47 @@ +using System.Threading; +using ZeroLevel; +using ZeroLevel.Network; +using ZeroLevel.Services.Applications; + +namespace Consumer +{ + public class ConsumerService + : BaseZeroService + { + protected override void StartAction() + { + ReadServiceInfo(); + AutoregisterInboxes(UseHost()); + } + + protected override void StopAction() + { + } + + [ExchangeReplierWithoutArg("meta")] + public ZeroServiceInfo GetCounter(ISocketClient client) + { + return ServiceInfo; + } + + private long _proceed = 0; + + [ExchangeReplierWithoutArg("Proceed")] + public long GetProceedItemsCount(ISocketClient client) + { + return _proceed; + } + + [ExchangeReplierWithoutArg("ping")] + public bool Ping(ISocketClient client) + { + return true; + } + + [ExchangeReplier("handle")] + public bool Handler(ISocketClient client, int data) + { + return (data ^ Interlocked.Increment(ref _proceed)) % 2 == 0; + } + } +} diff --git a/TestPipeLine/Consumer/Program.cs b/TestPipeLine/Consumer/Program.cs new file mode 100644 index 0000000..c44e298 --- /dev/null +++ b/TestPipeLine/Consumer/Program.cs @@ -0,0 +1,18 @@ +using ZeroLevel; + +namespace Consumer +{ + static class Program + { + static void Main(string[] args) + { + Bootstrap.Startup(args) + .EnableConsoleLog(ZeroLevel.Services.Logging.LogLevel.FullStandart) + .UseDiscovery() + .Run() + .WaitWhileStatus(ZeroServiceStatus.Running) + .Stop(); + Bootstrap.Shutdown(); + } + } +} diff --git a/TestPipeLine/Consumer/Properties/AssemblyInfo.cs b/TestPipeLine/Consumer/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..679a8fd --- /dev/null +++ b/TestPipeLine/Consumer/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Consumer")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Consumer")] +[assembly: AssemblyCopyright("Copyright © 2019")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("931dea89-42d1-4c06-9cb8-a3a0412093d6")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/TestPipeLine/Processor/App.config b/TestPipeLine/Processor/App.config new file mode 100644 index 0000000..c0eba0f --- /dev/null +++ b/TestPipeLine/Processor/App.config @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/TestPipeLine/Processor/Processor.csproj b/TestPipeLine/Processor/Processor.csproj new file mode 100644 index 0000000..3e65033 --- /dev/null +++ b/TestPipeLine/Processor/Processor.csproj @@ -0,0 +1,60 @@ + + + + + Debug + AnyCPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15} + Exe + Processor + Processor + v4.7.2 + 512 + true + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + {06c9e60e-d449-41a7-9bf0-a829aaf5d214} + ZeroLevel + + + + \ No newline at end of file diff --git a/TestPipeLine/Processor/ProcessorService.cs b/TestPipeLine/Processor/ProcessorService.cs new file mode 100644 index 0000000..4aa0fd1 --- /dev/null +++ b/TestPipeLine/Processor/ProcessorService.cs @@ -0,0 +1,49 @@ +using System; +using System.Threading; +using ZeroLevel; +using ZeroLevel.Network; +using ZeroLevel.Services.Applications; + +namespace Processor +{ + public class ProcessorService + : BaseZeroService + { + protected override void StartAction() + { + ReadServiceInfo(); + AutoregisterInboxes(UseHost()); + } + + protected override void StopAction() + { + } + + [ExchangeReplierWithoutArg("meta")] + public ZeroServiceInfo GetCounter(ISocketClient client) + { + return ServiceInfo; + } + + private long _proceed = 0; + + [ExchangeReplierWithoutArg("Proceed")] + public long GetProceedItemsCount(ISocketClient client) + { + return _proceed; + } + + [ExchangeReplierWithoutArg("ping")] + public bool Ping(ISocketClient client) + { + return true; + } + + [ExchangeHandler("handle")] + public void Handler(ISocketClient client, int data) + { + var next = (int)(data ^ Interlocked.Increment(ref _proceed)); + Exchange.Request("test.consumer", "handle", next, result => { }); + } + } +} diff --git a/TestPipeLine/Processor/Program.cs b/TestPipeLine/Processor/Program.cs new file mode 100644 index 0000000..62556ed --- /dev/null +++ b/TestPipeLine/Processor/Program.cs @@ -0,0 +1,18 @@ +using ZeroLevel; + +namespace Processor +{ + static class Program + { + static void Main(string[] args) + { + Bootstrap.Startup(args) + .EnableConsoleLog(ZeroLevel.Services.Logging.LogLevel.FullStandart) + .UseDiscovery() + .Run() + .WaitWhileStatus(ZeroServiceStatus.Running) + .Stop(); + Bootstrap.Shutdown(); + } + } +} diff --git a/TestPipeLine/Processor/Properties/AssemblyInfo.cs b/TestPipeLine/Processor/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..1f15d25 --- /dev/null +++ b/TestPipeLine/Processor/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Processor")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Processor")] +[assembly: AssemblyCopyright("Copyright © 2019")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("806d0160-a4bf-4881-af33-308f4fef8e15")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/TestPipeLine/Source/App.config b/TestPipeLine/Source/App.config new file mode 100644 index 0000000..16180a0 --- /dev/null +++ b/TestPipeLine/Source/App.config @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/TestPipeLine/Source/Program.cs b/TestPipeLine/Source/Program.cs new file mode 100644 index 0000000..cb7a349 --- /dev/null +++ b/TestPipeLine/Source/Program.cs @@ -0,0 +1,18 @@ +using ZeroLevel; + +namespace Source +{ + static class Program + { + static void Main(string[] args) + { + Bootstrap.Startup(args) + .EnableConsoleLog(ZeroLevel.Services.Logging.LogLevel.FullStandart) + .UseDiscovery() + .Run() + .WaitWhileStatus(ZeroServiceStatus.Running) + .Stop(); + Bootstrap.Shutdown(); + } + } +} diff --git a/TestPipeLine/Source/Properties/AssemblyInfo.cs b/TestPipeLine/Source/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..b5131af --- /dev/null +++ b/TestPipeLine/Source/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Source")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Source")] +[assembly: AssemblyCopyright("Copyright © 2019")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("a1d60994-5744-47d1-b684-c1c0b782998b")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/TestPipeLine/Source/Source.csproj b/TestPipeLine/Source/Source.csproj new file mode 100644 index 0000000..188e0eb --- /dev/null +++ b/TestPipeLine/Source/Source.csproj @@ -0,0 +1,60 @@ + + + + + Debug + AnyCPU + {A1D60994-5744-47D1-B684-C1C0B782998B} + Exe + Source + Source + v4.7.2 + 512 + true + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + {06c9e60e-d449-41a7-9bf0-a829aaf5d214} + ZeroLevel + + + + \ No newline at end of file diff --git a/TestPipeLine/Source/SourceService.cs b/TestPipeLine/Source/SourceService.cs new file mode 100644 index 0000000..9c9befb --- /dev/null +++ b/TestPipeLine/Source/SourceService.cs @@ -0,0 +1,50 @@ +using System; +using System.Threading; +using ZeroLevel; +using ZeroLevel.Network; +using ZeroLevel.Services.Applications; + +namespace Source +{ + public class SourceService + : BaseZeroService + { + protected override void StartAction() + { + ReadServiceInfo(); + AutoregisterInboxes(UseHost()); + + Sheduller.RemindEvery(TimeSpan.FromMilliseconds(10), () => + { + if (Exchange.Send("test.processor", "handle", Environment.TickCount)) + { + Interlocked.Increment(ref _proceed); + } + }); + } + + protected override void StopAction() + { + } + + [ExchangeReplierWithoutArg("meta")] + public ZeroServiceInfo GetCounter(ISocketClient client) + { + return ServiceInfo; + } + + private long _proceed = 0; + + [ExchangeReplierWithoutArg("Proceed")] + public long GetProceedItemsCount(ISocketClient client) + { + return _proceed; + } + + [ExchangeReplierWithoutArg("ping")] + public bool Ping(ISocketClient client) + { + return true; + } + } +} diff --git a/TestPipeLine/Watcher/App.config b/TestPipeLine/Watcher/App.config new file mode 100644 index 0000000..ed6214b --- /dev/null +++ b/TestPipeLine/Watcher/App.config @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/TestPipeLine/Watcher/Program.cs b/TestPipeLine/Watcher/Program.cs new file mode 100644 index 0000000..0c9ff32 --- /dev/null +++ b/TestPipeLine/Watcher/Program.cs @@ -0,0 +1,17 @@ +using ZeroLevel; + +namespace Watcher +{ + static class Program + { + static void Main(string[] args) + { + Bootstrap.Startup(args) + .UseDiscovery() + .Run() + .WaitWhileStatus(ZeroServiceStatus.Running) + .Stop(); + Bootstrap.Shutdown(); + } + } +} diff --git a/TestPipeLine/Watcher/Properties/AssemblyInfo.cs b/TestPipeLine/Watcher/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..3ae2f28 --- /dev/null +++ b/TestPipeLine/Watcher/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Watcher")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Watcher")] +[assembly: AssemblyCopyright("Copyright © 2019")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("6e04f32a-fb90-41d2-9059-f37311f813b3")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/TestPipeLine/Watcher/Watcher.csproj b/TestPipeLine/Watcher/Watcher.csproj new file mode 100644 index 0000000..4250320 --- /dev/null +++ b/TestPipeLine/Watcher/Watcher.csproj @@ -0,0 +1,60 @@ + + + + + Debug + AnyCPU + {6E04F32A-FB90-41D2-9059-F37311F813B3} + Exe + Watcher + Watcher + v4.7.2 + 512 + true + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + {06c9e60e-d449-41a7-9bf0-a829aaf5d214} + ZeroLevel + + + + \ No newline at end of file diff --git a/TestPipeLine/Watcher/WatcherService.cs b/TestPipeLine/Watcher/WatcherService.cs new file mode 100644 index 0000000..cc01b2c --- /dev/null +++ b/TestPipeLine/Watcher/WatcherService.cs @@ -0,0 +1,98 @@ +using System; +using System.Linq; +using System.Text; +using ZeroLevel; +using ZeroLevel.Network; +using ZeroLevel.Services.Applications; + +namespace Watcher +{ + public class WatcherService + : BaseZeroService + { + protected override void StartAction() + { + ReadServiceInfo(); + AutoregisterInboxes(UseHost()); + + Sheduller.RemindEvery(TimeSpan.FromMilliseconds(350), () => + { + var sb = new StringBuilder(); + sb.AppendLine("—————————————————————————————————————————————————————————————————————————"); + + var success = Exchange.RequestBroadcastByGroup("Test", "meta", records => + { + foreach (var record in records.OrderBy(r=>r.Name)) + { + sb.Append(record.Name); + sb.Append(" ("); + sb.Append(record.Version); + sb.AppendLine(")"); + sb.AppendLine(record.ServiceKey); + sb.AppendLine(record.ServiceType); + sb.AppendLine(record.ServiceGroup); + sb.AppendLine(); + } + }); + if (!success) + { + Log.Warning("[WatcherService] Can't send broadcast reqeust for meta"); + } + + success = Exchange.RequestBroadcastByType("Sources", "Proceed", records => + { + sb.AppendLine("-----------------------------------------------------------------------------"); + sb.Append("Source send items: "); + sb.AppendLine(records.Sum().ToString()); + }); + if (!success) + { + Log.Warning("[WatcherService] Can't send broadcast reqeust to 'Sources'"); + } + + success = Exchange.RequestBroadcastByType("Core", "Proceed", records => + { + sb.AppendLine("-----------------------------------------------------------------------------"); + sb.Append("Proccessor handle and send items: "); + sb.AppendLine(records.Sum().ToString()); + }); + if (!success) + { + Log.Warning("[WatcherService] Can't send broadcast reqeust to 'Core'"); + } + + success = Exchange.RequestBroadcastByType("Destination", "Proceed", records => + { + sb.AppendLine("-----------------------------------------------------------------------------"); + sb.Append("Consumer catch: "); + sb.AppendLine(records.Sum().ToString()); + }); + if (!success) + { + Log.Warning("[WatcherService] Can't send broadcast reqeust to 'Destination'"); + } + + sb.AppendLine("—————————————————————————————————————————————————————————————————————————"); + sb.AppendLine(); + Console.Clear(); + Console.WriteLine($"Watch info: \r\n{sb}"); + }); + } + + protected override void StopAction() + { + } + + [ExchangeReplierWithoutArg("meta")] + public ZeroServiceInfo GetCounter(ISocketClient client) + { + return ServiceInfo; + } + + [ExchangeReplierWithoutArg("ping")] + public bool Ping(ISocketClient client) + { + return true; + } + } +} diff --git a/WebSemanticService/semantic/Semantic.API.Proxy/Semantic.API.Proxy.csproj b/WebSemanticService/semantic/Semantic.API.Proxy/Semantic.API.Proxy.csproj index 23948d3..c0097a3 100644 --- a/WebSemanticService/semantic/Semantic.API.Proxy/Semantic.API.Proxy.csproj +++ b/WebSemanticService/semantic/Semantic.API.Proxy/Semantic.API.Proxy.csproj @@ -47,8 +47,8 @@ - - ..\..\packages\ZeroLevel.2.0.8\lib\netstandard2.0\ZeroLevel.dll + + C:\Users\a.bozhenov\Desktop\SEOPortal\Utils\Semantic\packages\ZeroLevel.3.0.0\lib\netstandard2.0\ZeroLevel.dll diff --git a/WebSemanticService/semantic/Semantic.API.Proxy/packages.config b/WebSemanticService/semantic/Semantic.API.Proxy/packages.config index 6f6fd07..7fcda0d 100644 --- a/WebSemanticService/semantic/Semantic.API.Proxy/packages.config +++ b/WebSemanticService/semantic/Semantic.API.Proxy/packages.config @@ -1,5 +1,5 @@  - + \ No newline at end of file diff --git a/ZeroLevel.SqlServer/ZeroLevel.SqlServer.csproj b/ZeroLevel.SqlServer/ZeroLevel.SqlServer.csproj index fb33bdd..82de616 100644 --- a/ZeroLevel.SqlServer/ZeroLevel.SqlServer.csproj +++ b/ZeroLevel.SqlServer/ZeroLevel.SqlServer.csproj @@ -76,8 +76,8 @@ - - ..\packages\ZeroLevel.2.0.8\lib\netstandard2.0\ZeroLevel.dll + + C:\Users\a.bozhenov\Desktop\SEOPortal\Utils\Semantic\packages\ZeroLevel.3.0.0\lib\netstandard2.0\ZeroLevel.dll diff --git a/ZeroLevel.SqlServer/packages.config b/ZeroLevel.SqlServer/packages.config index a985e30..38f5c89 100644 --- a/ZeroLevel.SqlServer/packages.config +++ b/ZeroLevel.SqlServer/packages.config @@ -1,4 +1,4 @@  - + \ No newline at end of file diff --git a/ZeroLevel.sln b/ZeroLevel.sln index 49e866d..3af0526 100644 --- a/ZeroLevel.sln +++ b/ZeroLevel.sln @@ -19,6 +19,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FileTransferServer", "FileT EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ZeroLevel.SqlServer", "ZeroLevel.SqlServer\ZeroLevel.SqlServer.csproj", "{A8AD956F-1559-45EC-A7DB-42290494E2C5}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "TestPipeLine", "TestPipeLine", "{03ACF314-93FC-46FE-9FB8-3F46A01A5A15}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Watcher", "TestPipeLine\Watcher\Watcher.csproj", "{6E04F32A-FB90-41D2-9059-F37311F813B3}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Source", "TestPipeLine\Source\Source.csproj", "{A1D60994-5744-47D1-B684-C1C0B782998B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Processor", "TestPipeLine\Processor\Processor.csproj", "{806D0160-A4BF-4881-AF33-308F4FEF8E15}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Consumer", "TestPipeLine\Consumer\Consumer.csproj", "{931DEA89-42D1-4C06-9CB8-A3A0412093D6}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -113,6 +123,54 @@ Global {A8AD956F-1559-45EC-A7DB-42290494E2C5}.Release|x64.Build.0 = Release|x64 {A8AD956F-1559-45EC-A7DB-42290494E2C5}.Release|x86.ActiveCfg = Release|x86 {A8AD956F-1559-45EC-A7DB-42290494E2C5}.Release|x86.Build.0 = Release|x86 + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|x64.ActiveCfg = Debug|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|x64.Build.0 = Debug|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|x86.ActiveCfg = Debug|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Debug|x86.Build.0 = Debug|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|Any CPU.Build.0 = Release|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|x64.ActiveCfg = Release|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|x64.Build.0 = Release|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|x86.ActiveCfg = Release|Any CPU + {6E04F32A-FB90-41D2-9059-F37311F813B3}.Release|x86.Build.0 = Release|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|x64.ActiveCfg = Debug|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|x64.Build.0 = Debug|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|x86.ActiveCfg = Debug|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Debug|x86.Build.0 = Debug|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|Any CPU.Build.0 = Release|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|x64.ActiveCfg = Release|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|x64.Build.0 = Release|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|x86.ActiveCfg = Release|Any CPU + {A1D60994-5744-47D1-B684-C1C0B782998B}.Release|x86.Build.0 = Release|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|Any CPU.Build.0 = Debug|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|x64.ActiveCfg = Debug|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|x64.Build.0 = Debug|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|x86.ActiveCfg = Debug|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Debug|x86.Build.0 = Debug|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|Any CPU.ActiveCfg = Release|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|Any CPU.Build.0 = Release|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|x64.ActiveCfg = Release|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|x64.Build.0 = Release|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|x86.ActiveCfg = Release|Any CPU + {806D0160-A4BF-4881-AF33-308F4FEF8E15}.Release|x86.Build.0 = Release|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|x64.ActiveCfg = Debug|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|x64.Build.0 = Debug|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|x86.ActiveCfg = Debug|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Debug|x86.Build.0 = Debug|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|Any CPU.Build.0 = Release|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|x64.ActiveCfg = Release|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|x64.Build.0 = Release|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|x86.ActiveCfg = Release|Any CPU + {931DEA89-42D1-4C06-9CB8-A3A0412093D6}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -120,6 +178,10 @@ Global GlobalSection(NestedProjects) = preSolution {F8B727E1-340D-4096-A784-E570AE13FABC} = {FC074553-5D9F-4DF1-9130-7092E37DE768} {9BF859EE-EF90-4B5B-8576-E26770F2F792} = {FC074553-5D9F-4DF1-9130-7092E37DE768} + {6E04F32A-FB90-41D2-9059-F37311F813B3} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15} + {A1D60994-5744-47D1-B684-C1C0B782998B} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15} + {806D0160-A4BF-4881-AF33-308F4FEF8E15} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15} + {931DEA89-42D1-4C06-9CB8-A3A0412093D6} = {03ACF314-93FC-46FE-9FB8-3F46A01A5A15} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A65DB16F-877D-4586-A9F3-8BBBFBAF5CEB} diff --git a/ZeroLevel/Services/Network/Contracts/IExchange.cs b/ZeroLevel/Services/Network/Contracts/IExchange.cs new file mode 100644 index 0000000..7094c8e --- /dev/null +++ b/ZeroLevel/Services/Network/Contracts/IExchange.cs @@ -0,0 +1,22 @@ +using System; +using System.Net; + +namespace ZeroLevel.Network +{ + public interface IExchange + : IClientSet, IDisposable + { + void UseDiscovery(); + void UseDiscovery(string endpoint); + void UseDiscovery(IPEndPoint endpoint); + + IRouter UseHost(); + IRouter UseHost(int port); + IRouter UseHost(IPEndPoint endpoint); + + IServiceRoutesStorage RoutesStorage { get; } + + ExClient GetConnection(string alias); + ExClient GetConnection(IPEndPoint endpoint); + } +} diff --git a/ZeroLevel/Services/Network/Exchange.cs b/ZeroLevel/Services/Network/Exchange.cs index bfd3c08..9881ae0 100644 --- a/ZeroLevel/Services/Network/Exchange.cs +++ b/ZeroLevel/Services/Network/Exchange.cs @@ -9,33 +9,17 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { - public interface IExchange - : IClientSet, IDisposable - { - void UseDiscovery(); - void UseDiscovery(string endpoint); - void UseDiscovery(IPEndPoint endpoint); - - IRouter UseHost(); - IRouter UseHost(int port); - IRouter UseHost(IPEndPoint endpoint); - - IServiceRoutesStorage RoutesStorage { get; } - - ExClient GetConnection(string alias); - ExClient GetConnection(IPEndPoint endpoint); - } - /// /// Provides data exchange between services /// internal sealed class Exchange : IExchange { - private readonly ServiceRouteStorage _aliases = new ServiceRouteStorage(); + private readonly ServiceRouteStorage _dicovery_aliases = new ServiceRouteStorage(); + private readonly ServiceRouteStorage _user_aliases = new ServiceRouteStorage(); private readonly ExClientServerCachee _cachee = new ExClientServerCachee(); - public IServiceRoutesStorage RoutesStorage => _aliases; + public IServiceRoutesStorage RoutesStorage => _user_aliases; private readonly IZeroService _owner; #region Ctor @@ -325,8 +309,11 @@ namespace ZeroLevel.Network try { var clients = GetClientEnumerator(alias).ToList(); - callback(_RequestBroadcast(clients, inbox)); - return true; + if (clients.Count > 0) + { + callback(_RequestBroadcast(clients, inbox)); + return true; + } } catch (Exception ex) { @@ -344,8 +331,11 @@ namespace ZeroLevel.Network try { var clients = GetClientEnumerator(alias).ToList(); - callback(_RequestBroadcast(clients, inbox, data)); - return true; + if (clients.Count > 0) + { + callback(_RequestBroadcast(clients, inbox, data)); + return true; + } } catch (Exception ex) { @@ -362,8 +352,11 @@ namespace ZeroLevel.Network try { var clients = GetClientEnumeratorByGroup(serviceGroup).ToList(); - callback(_RequestBroadcast(clients, inbox)); - return true; + if (clients.Count > 0) + { + callback(_RequestBroadcast(clients, inbox)); + return true; + } } catch (Exception ex) { @@ -381,8 +374,11 @@ namespace ZeroLevel.Network try { var clients = GetClientEnumeratorByGroup(serviceGroup).ToList(); - callback(_RequestBroadcast(clients, inbox, data)); - return true; + if (clients.Count > 0) + { + callback(_RequestBroadcast(clients, inbox, data)); + return true; + } } catch (Exception ex) { @@ -399,8 +395,11 @@ namespace ZeroLevel.Network try { var clients = GetClientEnumeratorByType(serviceType).ToList(); - callback(_RequestBroadcast(clients, inbox)); - return true; + if (clients.Count > 0) + { + callback(_RequestBroadcast(clients, inbox)); + return true; + } } catch (Exception ex) { @@ -438,8 +437,11 @@ namespace ZeroLevel.Network try { var clients = GetClientEnumeratorByType(serviceType).ToList(); - callback(_RequestBroadcast(clients, inbox, data)); - return true; + if (clients.Count > 0) + { + callback(_RequestBroadcast(clients, inbox, data)); + return true; + } } catch (Exception ex) { @@ -460,7 +462,7 @@ namespace ZeroLevel.Network try { var discoveryEndpoint = Configuration.Default.First("discovery"); - _aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint)); + _user_aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint)); RestartDiscoveryTasks(); } catch (Exception ex) @@ -473,7 +475,7 @@ namespace ZeroLevel.Network { try { - _aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint)); + _user_aliases.Set(BaseSocket.DISCOVERY_ALIAS, NetUtils.CreateIPEndPoint(discoveryEndpoint)); RestartDiscoveryTasks(); } catch (Exception ex) @@ -486,7 +488,7 @@ namespace ZeroLevel.Network { try { - _aliases.Set(BaseSocket.DISCOVERY_ALIAS, discoveryEndpoint); + _user_aliases.Set(BaseSocket.DISCOVERY_ALIAS, discoveryEndpoint); RestartDiscoveryTasks(); } catch (Exception ex) @@ -511,7 +513,7 @@ namespace ZeroLevel.Network private void RegisterServicesInDiscovery() { - var discovery_endpoint = _aliases.Get(BaseSocket.DISCOVERY_ALIAS); + var discovery_endpoint = _user_aliases.Get(BaseSocket.DISCOVERY_ALIAS); if (discovery_endpoint.Success) { var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true); @@ -542,7 +544,7 @@ namespace ZeroLevel.Network private void UpdateServiceListFromDiscovery() { - var discovery_endpoint = _aliases.Get(BaseSocket.DISCOVERY_ALIAS); + var discovery_endpoint = _user_aliases.Get(BaseSocket.DISCOVERY_ALIAS); if (discovery_endpoint.Success) { var discoveryClient = _cachee.GetClient(discovery_endpoint.Value, true); @@ -556,25 +558,34 @@ namespace ZeroLevel.Network return; } var endpoints = new HashSet(); - foreach (var service in records) + _dicovery_aliases.BeginUpdate(); + try { - endpoints.Clear(); - foreach (var ep in service.Endpoints) + foreach (var service in records) { - try - { - var endpoint = NetUtils.CreateIPEndPoint(ep); - endpoints.Add(endpoint); - } - catch + endpoints.Clear(); + foreach (var ep in service.Endpoints) { - Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint"); + try + { + var endpoint = NetUtils.CreateIPEndPoint(ep); + endpoints.Add(endpoint); + } + catch + { + Log.SystemWarning($"[Exchange.UpdateServiceListFromDiscovery] Can't parse address {ep} as IPEndPoint"); + } } + _dicovery_aliases.Set(service.ServiceKey, + service.ServiceType, + service.ServiceGroup, + endpoints); } - _aliases.Set(service.ServiceKey, - service.ServiceType, - service.ServiceGroup, - endpoints); + _dicovery_aliases.Commit(); + } + catch + { + _dicovery_aliases.Rollback(); } }); if (!ir.Success) @@ -592,19 +603,30 @@ namespace ZeroLevel.Network public ExClient GetConnection(string alias) { - var address = _aliases.Get(alias); - if (address.Success) - { - return _cachee.GetClient(address.Value, true); - } - try + if (_update_discovery_table_task != -1) { - var endpoint = NetUtils.CreateIPEndPoint(alias); - return _cachee.GetClient(endpoint, true); + var address = _dicovery_aliases.Get(alias); + if (address.Success) + { + return _cachee.GetClient(address.Value, true); + } } - catch (Exception ex) + else { - Log.SystemError(ex, "[Exchange.GetConnection]"); + var address = _user_aliases.Get(alias); + if (address.Success) + { + return _cachee.GetClient(address.Value, true); + } + try + { + var endpoint = NetUtils.CreateIPEndPoint(alias); + return _cachee.GetClient(endpoint, true); + } + catch (Exception ex) + { + Log.SystemError(ex, "[Exchange.GetConnection]"); + } } return null; } @@ -640,21 +662,111 @@ namespace ZeroLevel.Network #endregion #region Private + private IEnumerable GetAllAddresses(string serviceKey) + { + if (_update_discovery_table_task != -1) + { + var dr = _dicovery_aliases.GetAll(serviceKey); + var ur = _user_aliases.GetAll(serviceKey); + if (dr.Success && ur.Success) + { + return Enumerable.Union(dr.Value, ur.Value); + } + else if (dr.Success) + { + return dr.Value; + } + else if (ur.Success) + { + return ur.Value; + } + } + else + { + var result = _user_aliases.GetAll(serviceKey); + if (result.Success) + { + return result.Value; + } + } + return null; + } + + private IEnumerable GetAllAddressesByType(string serviceType) + { + if (_update_discovery_table_task != -1) + { + var dr = _dicovery_aliases.GetAllByType(serviceType); + var ur = _user_aliases.GetAllByType(serviceType); + if (dr.Success && ur.Success) + { + return Enumerable.Union(dr.Value, ur.Value); + } + else if (dr.Success) + { + return dr.Value; + } + else if (ur.Success) + { + return ur.Value; + } + } + else + { + var result = _user_aliases.GetAllByType(serviceType); + if (result.Success) + { + return result.Value; + } + } + return null; + } + + private IEnumerable GetAllAddressesByGroup(string serviceGroup) + { + if (_update_discovery_table_task != -1) + { + var dr = _dicovery_aliases.GetAllByGroup(serviceGroup); + var ur = _user_aliases.GetAllByGroup(serviceGroup); + if (dr.Success && ur.Success) + { + return Enumerable.Union(dr.Value, ur.Value); + } + else if (dr.Success) + { + return dr.Value; + } + else if (ur.Success) + { + return ur.Value; + } + } + else + { + var result = _user_aliases.GetAllByGroup(serviceGroup); + if (result.Success) + { + return result.Value; + } + } + return null; + } + private IEnumerable GetClientEnumerator(string serviceKey) { - InvokeResult> candidates; + IEnumerable candidates; try { - candidates = _aliases.GetAll(serviceKey); + candidates = GetAllAddresses(serviceKey); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.GetClientEnumerator] Error when trying get endpoints for service key '{serviceKey}'"); candidates = null; } - if (candidates != null && candidates.Success && candidates.Value.Any()) + if (candidates != null && candidates.Any()) { - foreach (var endpoint in candidates.Value) + foreach (var endpoint in candidates) { ExClient transport; try @@ -677,19 +789,19 @@ namespace ZeroLevel.Network private IEnumerable GetClientEnumeratorByType(string serviceType) { - InvokeResult> candidates; + IEnumerable candidates; try { - candidates = _aliases.GetAllByType(serviceType); + candidates = GetAllAddressesByType(serviceType); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByType] Error when trying get endpoints for service type '{serviceType}'"); candidates = null; } - if (candidates != null && candidates.Success && candidates.Value.Any()) + if (candidates != null && candidates.Any()) { - foreach (var endpoint in candidates.Value) + foreach (var endpoint in candidates) { ExClient transport; try @@ -712,19 +824,19 @@ namespace ZeroLevel.Network private IEnumerable GetClientEnumeratorByGroup(string serviceGroup) { - InvokeResult> candidates; + IEnumerable candidates; try { - candidates = _aliases.GetAllByGroup(serviceGroup); + candidates = GetAllAddressesByGroup(serviceGroup); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.GetClientEnumeratorByGroup] Error when trying get endpoints for service group '{serviceGroup}'"); candidates = null; } - if (candidates != null && candidates.Success && candidates.Value.Any()) + if (candidates != null && candidates.Any()) { - foreach (var service in candidates.Value) + foreach (var service in candidates) { ExClient transport; try @@ -752,23 +864,23 @@ namespace ZeroLevel.Network /// true - service called succesfully private bool CallService(string serviceKey, Func callHandler) { - InvokeResult> candidates; + IEnumerable candidates; try { - candidates = _aliases.GetAll(serviceKey); + candidates = GetAllAddresses(serviceKey); } catch (Exception ex) { Log.SystemError(ex, $"[Exchange.CallService] Error when trying get endpoints for service key '{serviceKey}'"); return false; } - if (candidates == null || !candidates.Success || candidates.Value.Any() == false) + if (candidates == null || candidates.Any() == false) { Log.Debug($"[Exchange.CallService] Not found endpoints for service key '{serviceKey}'"); return false; } var success = false; - foreach (var endpoint in candidates.Value) + foreach (var endpoint in candidates) { ExClient transport; try diff --git a/ZeroLevel/Services/Network/ServiceRouteStorage.cs b/ZeroLevel/Services/Network/ServiceRouteStorage.cs index 8cdb01c..ae47af1 100644 --- a/ZeroLevel/Services/Network/ServiceRouteStorage.cs +++ b/ZeroLevel/Services/Network/ServiceRouteStorage.cs @@ -36,10 +36,18 @@ namespace ZeroLevel.Network public void Set(IPEndPoint endpoint) { + var key = $"{endpoint.Address}:{endpoint.Port}"; + + if (_in_transaction == 1) + { + TransAppendByKeys(key, endpoint); + _tran_endpoints[endpoint] = new string[] { key, null, null }; + return; + } + _lock.EnterWriteLock(); try { - var key = $"{endpoint.Address}:{endpoint.Port}"; if (_endpoints.ContainsKey(endpoint)) { if (_tableByKey.ContainsKey(key)) @@ -67,6 +75,15 @@ namespace ZeroLevel.Network public void Set(string key, IPEndPoint endpoint) { + key = key.ToUpperInvariant(); + + if (_in_transaction == 1) + { + TransAppendByKeys(key, endpoint); + _tran_endpoints[endpoint] = new string[] { key, null, null }; + return; + } + _lock.EnterWriteLock(); try { @@ -83,7 +100,7 @@ namespace ZeroLevel.Network RemoveLocked(endpoint); } AppendByKeys(key, endpoint); - _endpoints.Add(endpoint, new string[3] { key.ToUpperInvariant(), null, null }); + _endpoints.Add(endpoint, new string[3] { key, null, null }); } finally { @@ -93,6 +110,17 @@ namespace ZeroLevel.Network public void Set(string key, IEnumerable endpoints) { + key = key.ToUpperInvariant(); + if (_in_transaction == 1) + { + foreach (var endpoint in endpoints) + { + TransAppendByKeys(key, endpoint); + _tran_endpoints[endpoint] = new string[] { key, null, null }; + } + return; + } + _lock.EnterWriteLock(); try { @@ -122,22 +150,44 @@ namespace ZeroLevel.Network public void Set(string key, string type, string group, IPEndPoint endpoint) { + if (key == null) + { + key = $"{endpoint.Address}:{endpoint.Port}"; + } + else + { + key = key.ToUpperInvariant(); + } + type = type.ToUpperInvariant(); + group = group.ToUpperInvariant(); + + if (_in_transaction == 1) + { + TransAppendByKeys(key, endpoint); + if (type != null) + { + TransAppendByType(type, endpoint); + } + if (group != null) + { + TransAppendByGroup(group, endpoint); + } + _tran_endpoints[endpoint] = new string[] { key, type, group }; + return; + } + _lock.EnterWriteLock(); try { RemoveLocked(endpoint); - if (key == null) - { - key = $"{endpoint.Address}:{endpoint.Port}"; - } AppendByKeys(key, endpoint); if (type != null) { - AppendByType(key, endpoint); + AppendByType(type, endpoint); } if (group != null) { - AppendByGroup(key, endpoint); + AppendByGroup(group, endpoint); } _endpoints.Add(endpoint, new string[3] { key.ToUpperInvariant(), type.ToUpperInvariant(), group.ToUpperInvariant() }); } @@ -149,6 +199,28 @@ namespace ZeroLevel.Network public void Set(string key, string type, string group, IEnumerable endpoints) { + if (_in_transaction == 1) + { + key = key.ToUpperInvariant(); + type = type.ToUpperInvariant(); + group = group.ToUpperInvariant(); + + foreach (var endpoint in endpoints) + { + TransAppendByKeys(key, endpoint); + if (type != null) + { + TransAppendByType(type, endpoint); + } + if (group != null) + { + TransAppendByGroup(group, endpoint); + } + _tran_endpoints[endpoint] = new string[] { key, type, group }; + } + return; + } + foreach (var ep in endpoints) { RemoveLocked(ep); @@ -235,15 +307,15 @@ namespace ZeroLevel.Network #region Private private void AppendByKeys(string key, IPEndPoint endpoint) { - Append(key.ToUpperInvariant(), endpoint, _tableByKey); + Append(key, endpoint, _tableByKey); } private void AppendByType(string type, IPEndPoint endpoint) { - Append(type.ToUpperInvariant(), endpoint, _tableByTypes); + Append(type, endpoint, _tableByTypes); } private void AppendByGroup(string group, IPEndPoint endpoint) { - Append(group.ToUpperInvariant(), endpoint, _tableByGroups); + Append(group, endpoint, _tableByGroups); } private void Append(string key, IPEndPoint value, Dictionary> dict) { @@ -266,5 +338,91 @@ namespace ZeroLevel.Network } } #endregion + + #region Transactional + private Dictionary> _tran_tableByKey + = new Dictionary>(); + + private Dictionary> _tran_tableByGroups + = new Dictionary>(); + + private Dictionary> _tran_tableByTypes + = new Dictionary>(); + + private Dictionary _tran_endpoints + = new Dictionary(); + + private int _in_transaction = 0; + + internal void BeginUpdate() + { + if (Interlocked.Exchange(ref _in_transaction, 1) == 0) + { + _tran_endpoints.Clear(); + _tran_tableByKey.Clear(); + _tran_tableByGroups.Clear(); + _tran_tableByTypes.Clear(); + } + else + { + throw new System.Exception("Transaction started already"); + } + } + + internal void Commit() + { + if (Interlocked.Exchange(ref _in_transaction, 0) == 1) + { + _lock.EnterWriteLock(); + try + { + _endpoints = _tran_endpoints.Select(pair => pair).ToDictionary(p => p.Key, p => p.Value); + _tableByGroups = _tran_tableByGroups.Select(pair => pair).ToDictionary(p => p.Key, p => new RoundRobinCollection(p.Value)); + _tableByKey = _tran_tableByKey.Select(pair => pair).ToDictionary(p => p.Key, p => new RoundRobinCollection(p.Value)); + _tableByTypes = _tran_tableByTypes.Select(pair => pair).ToDictionary(p => p.Key, p => new RoundRobinCollection(p.Value)); + } + finally + { + _lock.ExitWriteLock(); + } + _tran_endpoints.Clear(); + _tran_tableByKey.Clear(); + _tran_tableByGroups.Clear(); + _tran_tableByTypes.Clear(); + } + } + + internal void Rollback() + { + if (Interlocked.Exchange(ref _in_transaction, 0) == 1) + { + _tran_endpoints.Clear(); + _tran_tableByKey.Clear(); + _tran_tableByGroups.Clear(); + _tran_tableByTypes.Clear(); + } + } + + private void TransAppendByKeys(string key, IPEndPoint endpoint) + { + TransAppend(key.ToUpperInvariant(), endpoint, _tran_tableByKey); + } + private void TransAppendByType(string type, IPEndPoint endpoint) + { + TransAppend(type.ToUpperInvariant(), endpoint, _tran_tableByTypes); + } + private void TransAppendByGroup(string group, IPEndPoint endpoint) + { + TransAppend(group.ToUpperInvariant(), endpoint, _tran_tableByGroups); + } + private void TransAppend(string key, IPEndPoint value, Dictionary> dict) + { + if (!dict.ContainsKey(key)) + { + dict.Add(key, new List()); + } + dict[key].Add(value); + } + #endregion } }