using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace ZeroLevel.Utils
{
    /// <summary>
    /// Runs a fixed number of dedicated background threads that take items from an
    /// internal queue and pass each one to a single handler delegate.
    /// </summary>
    /// <typeparam name="T">Type of the queued work items.</typeparam>
    public class Multiprocessor<T> : IDisposable
    {
        /// <summary>How long a worker blocks on the queue before re-checking the stop flag.</summary>
        private const int TakeTimeoutMs = 500;

        /// <summary>Polling interval used by <see cref="WaitForEmpty"/>.</summary>
        private const int WaitPollIntervalMs = 100;

        private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
        private readonly List<Thread> _threads = new List<Thread>();

        // Written by Dispose and read by every worker loop, hence volatile.
        private volatile bool _is_disposed = false;

        // Items appended but not yet fully handled (queued + currently in a handler).
        // A single counter avoids the window in which an item has left the queue
        // but has not yet been counted as "in progress".
        private int _pending_count = 0;

        /// <summary>
        /// Number of items that were appended and whose handler has not finished yet.
        /// </summary>
        public int Count => Volatile.Read(ref _pending_count);

        /// <summary>
        /// Creates the worker threads and starts them.
        /// </summary>
        /// <param name="handler">Delegate invoked for every item taken from the queue.</param>
        /// <param name="size">Number of worker threads to create.</param>
        /// <param name="stackSize">Maximum stack size passed to each <see cref="Thread"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
        public Multiprocessor(Action<T> handler, int size, int stackSize = 1024 * 1024)
        {
            if (handler == null)
            {
                throw new ArgumentNullException(nameof(handler));
            }

            for (int i = 0; i < size; i++)
            {
                var worker = new Thread(() => ProcessLoop(handler), stackSize)
                {
                    IsBackground = true
                };
                _threads.Add(worker);
            }

            // Start only after every thread object has been created.
            foreach (var worker in _threads)
            {
                worker.Start();
            }
        }

        /// <summary>
        /// Adds an item to the queue.
        /// </summary>
        /// <param name="t">Item to process.</param>
        public void Append(T t)
        {
            // Count the item before it becomes visible to workers so that Count
            // never under-reports pending work.
            Interlocked.Increment(ref _pending_count);
            try
            {
                _queue.Add(t);
            }
            catch
            {
                // The item never entered the queue; undo the reservation.
                Interlocked.Decrement(ref _pending_count);
                throw;
            }
        }

        /// <summary>
        /// Blocks until <see cref="Count"/> reaches zero.
        /// </summary>
        /// <param name="timeoutInMs">
        /// Maximum time to wait in milliseconds; a value of zero or less waits without a time limit.
        /// </param>
        /// <returns>
        /// true when no pending items remain; false when the timeout elapsed or the
        /// instance was disposed while items were still pending.
        /// </returns>
        public bool WaitForEmpty(int timeoutInMs)
        {
            // Stopwatch is monotonic, so system clock adjustments cannot distort the timeout.
            var elapsed = Stopwatch.StartNew();
            while (Count > 0)
            {
                if (_is_disposed)
                {
                    // Workers stop after disposal, so remaining items will never drain.
                    return false;
                }
                if (timeoutInMs > 0 && elapsed.ElapsedMilliseconds > timeoutInMs)
                {
                    return false;
                }
                Thread.Sleep(WaitPollIntervalMs);
            }
            return true;
        }

        /// <summary>
        /// Stops the workers and releases the queue. Items still queued are not processed.
        /// A handler that is currently running is allowed to finish; this method waits for it.
        /// </summary>
        public void Dispose()
        {
            if (_is_disposed)
            {
                return;
            }
            _is_disposed = true;

            try
            {
                // Lets workers blocked in TryTake on an empty queue return without
                // waiting for the full take timeout.
                _queue.CompleteAdding();

                foreach (var thread in _threads)
                {
                    // Joining the calling thread would never return (Dispose invoked from a handler).
                    if (thread == Thread.CurrentThread)
                    {
                        continue;
                    }
                    try
                    {
                        thread.Join();
                    }
                    catch (Exception ex)
                    {
                        Log.Error(ex, "[Multiprocessor.Dispose] Join worker thread");
                    }
                }

                // Dispose the queue only after the workers have stopped, so no worker
                // can be inside TryTake on an already disposed collection.
                _queue.Dispose();
            }
            catch (Exception ex)
            {
                // Dispose must not throw; record the failure instead of hiding it.
                Log.Error(ex, "[Multiprocessor.Dispose]");
            }
        }

        /// <summary>
        /// Worker thread body: takes items until the instance is disposed and hands
        /// each one to <paramref name="handler"/>. Exceptions are logged and the loop continues.
        /// </summary>
        private void ProcessLoop(Action<T> handler)
        {
            while (!_is_disposed)
            {
                try
                {
                    T item;
                    if (_queue.TryTake(out item, TakeTimeoutMs))
                    {
                        try
                        {
                            handler(item);
                        }
                        finally
                        {
                            // The item is finished whether or not the handler threw.
                            Interlocked.Decrement(ref _pending_count);
                        }
                    }
                }
                catch (Exception ex)
                {
                    Log.Error(ex, "[Multiprocessor.HandleThread]");
                }
            }
        }
    }
}