diff --git a/ZeroLevel/Services/Collections/RoundRobinCollection.cs b/ZeroLevel/Services/Collections/RoundRobinCollection.cs index 624347b..f6555ea 100644 --- a/ZeroLevel/Services/Collections/RoundRobinCollection.cs +++ b/ZeroLevel/Services/Collections/RoundRobinCollection.cs @@ -99,6 +99,25 @@ namespace ZeroLevel.Services.Collections return false; } + public bool MoveNextAndHandle(Action handler) + { + _lock.EnterReadLock(); + try + { + if (_collection.Count > 0) + { + _index = Interlocked.Increment(ref _index) % _collection.Count; + handler.Invoke(Current); + return true; + } + } + finally + { + _lock.ExitReadLock(); + } + return false; + } + public T Current { get diff --git a/ZeroLevel/Services/Utils/MultiHandler.cs b/ZeroLevel/Services/Utils/MultiHandler.cs new file mode 100644 index 0000000..7bc649c --- /dev/null +++ b/ZeroLevel/Services/Utils/MultiHandler.cs @@ -0,0 +1,87 @@ +using System; +using System.Linq; +using System.Collections.Generic; +using System.Threading; +using ZeroLevel.Services.Collections; +using System.Collections.Concurrent; + +namespace ZeroLevel.Services.Utils +{ + public class MultiHandler + : IDisposable + { + private class Executor + { + public Thread Thread; + public Action Handle; + public Func Count; + public bool Canceled; + } + + private RoundRobinCollection _handlers = new RoundRobinCollection(); + private readonly object _lock_handlers = new object(); + + public MultiHandler(Action handler, int size) + { + for (int i = 0; i < size; i++) + { + var t = new Thread((s) => + { + var queue = new BlockingCollection(); + var executor = new Executor + { + Handle = data => + { + queue.Add(data); + }, + Count = () => queue.Count, + Thread = Thread.CurrentThread, + Canceled = false + }; + lock (_lock_handlers) + { + ((RoundRobinCollection)s).Add(executor); + } + try + { + while (!executor.Canceled) + { + T obj; + if (queue.TryTake(out obj, 500)) + { + handler(obj); + } + } + queue.Dispose(); + } + catch (Exception ex) + { + Log.Error(ex, "[MultiHandler] Loop fault"); + } + }); + t.IsBackground = true; + t.Start(_handlers); + } + } + + public void Append(T t) + { + _handlers.MoveNextAndHandle(h => h.Handle(t)); + } + + private int Count => _handlers.Source.Sum(h => h.Count()); + + public void WaitForEmpty() + { + while (Count > 0) + { + Thread.Sleep(300); + } + } + + public void Dispose() + { + foreach (var h in _handlers.Source) h.Canceled = true; + } + } +} diff --git a/ZeroLevel/Services/Utils/Multiprocessor.cs b/ZeroLevel/Services/Utils/Multiprocessor.cs index a9a60df..d6d3398 100644 --- a/ZeroLevel/Services/Utils/Multiprocessor.cs +++ b/ZeroLevel/Services/Utils/Multiprocessor.cs @@ -23,7 +23,7 @@ namespace ZeroLevel.Utils T item; while (!_is_disposed && !_queue.IsCompleted) { - if (_queue.TryTake(out item, 200)) + if (_queue.TryTake(out item, 500)) { handler(item); }