diff --git a/TestPipeLine/Processor/ProcessorService.cs b/TestPipeLine/Processor/ProcessorService.cs index 7e22532..cfa7399 100644 --- a/TestPipeLine/Processor/ProcessorService.cs +++ b/TestPipeLine/Processor/ProcessorService.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Threading; using ZeroLevel; using ZeroLevel.Network; @@ -9,8 +10,14 @@ namespace Processor public class ProcessorService : BaseZeroService { + private Thread _processThread; + protected override void StartAction() { + _processThread = new Thread(HandleIncoming); + _processThread.IsBackground = true; + _processThread.Start(); + ReadServiceInfo(); AutoregisterInboxes(UseHost()); @@ -21,6 +28,16 @@ namespace Processor }); } + private void HandleIncoming() + { + while (_incoming.IsCompleted == false) + { + int data = _incoming.Take(); + var next = (int)(data ^ Interlocked.Increment(ref _proceed)); + Exchange.Request("test.consumer", "handle", next, result => { }); + } + } + protected override void StopAction() { } @@ -45,11 +62,12 @@ namespace Processor return true; } + BlockingCollection _incoming = new BlockingCollection(); + [ExchangeHandler("handle")] public void Handler(ISocketClient client, int data) { - var next = (int)(data ^ Interlocked.Increment(ref _proceed)); - Exchange.Request("test.consumer", "handle", next, result => { }); + _incoming.Add(data); } } } diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index 1095f6f..0ece6bd 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -5,17 +5,16 @@ Infrastructure layer library ogoun ogoun - 3.0.0.0 - New bootstrap concept -New network layer -No backwards compatibility with previous version + 3.0.0.1 + Fixes https://github.com/ogoun/Zero/wiki Copyright Ogoun 2019 https://opensource.org/licenses/MIT https://raw.githubusercontent.com/ogoun/Zero/master/zero.png https://github.com/ogoun/Zero GitHub - 3.0.0 + 3.0.1 + 3.0.0.1