// You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
// Zero/ZeroLevel/Services/Utils/Multiprocessor.cs
//
// 88 lines
// 2.6 KiB
//
// 5 years ago
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
namespace ZeroLevel.Utils
{
/// <summary>
/// Fixed-size pool of dedicated background threads that feed queued items of
/// type <typeparamref name="T"/> to a single handler delegate.
/// </summary>
/// <typeparam name="T">Type of the queued work items.</typeparam>
public class Multiprocessor<T>
    : IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly List<Thread> _threads = new List<Thread>();

    // Serializes the shutdown hand-off between Dispose() and exiting workers so
    // the queue is disposed exactly once, and only after its last user is gone.
    private readonly object _dispose_gate = new object();
    private volatile bool _is_disposed = false;

    // Number of worker threads that have not left their loop yet (guarded by _dispose_gate
    // once the threads are running).
    private int _alive_workers = 0;

    // Items accepted by Append whose handler call has not finished yet.
    private int _pending = 0;

    /// <summary>
    /// Number of items that were appended but whose handler has not returned yet
    /// (queued items plus items currently being handled).
    /// </summary>
    /// <remarks>
    /// Backed by a single counter rather than "queue length + in progress", so an
    /// item that has just been taken from the queue is never missed.
    /// </remarks>
    public int Count => Volatile.Read(ref _pending);

    /// <summary>
    /// Creates the worker threads and starts them.
    /// </summary>
    /// <param name="handler">Delegate invoked for every item; when null, items are taken and discarded.</param>
    /// <param name="size">Number of worker threads; no thread is created when it is not positive.</param>
    /// <param name="stackSize">Maximum stack size passed to each <see cref="Thread"/>.</param>
    public Multiprocessor(Action<T> handler, int size, int stackSize = 1024 * 1024)
    {
        for (int i = 0; i < size; i++)
        {
            var t = new Thread(() => WorkerLoop(handler), stackSize);
            t.IsBackground = true;
            _threads.Add(t);
        }
        // Must be set before any worker can reach its exit path.
        _alive_workers = _threads.Count;
        foreach (var t in _threads)
        {
            t.Start();
        }
    }

    /// <summary>
    /// Queues an item for processing. The item is silently dropped when the
    /// instance is disposed, including when disposal races with this call.
    /// </summary>
    /// <param name="t">Item to hand to the handler.</param>
    public void Append(T t)
    {
        if (_is_disposed)
        {
            return;
        }
        // Count the item before it becomes visible to the workers so that the
        // matching decrement can never run first.
        Interlocked.Increment(ref _pending);
        bool queued = false;
        try
        {
            _queue.Add(t);
            queued = true;
        }
        catch (InvalidOperationException) when (_is_disposed)
        {
            // Dispose() won the race: the queue is completed or already disposed
            // (ObjectDisposedException derives from InvalidOperationException).
        }
        finally
        {
            if (!queued)
            {
                Interlocked.Decrement(ref _pending);
            }
        }
    }

    /// <summary>
    /// Blocks until every appended item has been handled or the timeout elapses.
    /// </summary>
    /// <param name="timeoutInMs">Timeout in milliseconds; zero or negative means no timeout.</param>
    /// <returns>
    /// true when no work is outstanding; false when the timeout elapsed or the
    /// instance was disposed while work was still outstanding.
    /// </returns>
    public bool WaitForEmpty(int timeoutInMs)
    {
        return WaitUntilDrained(timeoutInMs, 100);
    }

    /// <summary>
    /// Blocks until every appended item has been handled, or until the instance
    /// is disposed.
    /// </summary>
    public void WaitForEmpty()
    {
        WaitUntilDrained(0, 200);
    }

    /// <summary>
    /// Stops accepting items and asks the workers to exit after the item they are
    /// currently handling. Items still queued are not processed. Does not block
    /// and may be called more than once.
    /// </summary>
    public void Dispose()
    {
        lock (_dispose_gate)
        {
            if (_is_disposed)
            {
                return;
            }
            _is_disposed = true;
            // Wakes workers that are blocked in TryTake on an empty queue.
            _queue.CompleteAdding();
            // Workers may still be using the queue; the last one to exit disposes it.
            // Only dispose here when there is no worker left to do so.
            if (_alive_workers == 0)
            {
                _queue.Dispose();
            }
            _threads.Clear();
        }
    }

    // Body of every worker thread: take an item, run the handler, repeat until disposed.
    private void WorkerLoop(Action<T> handler)
    {
        try
        {
            while (!_is_disposed)
            {
                try
                {
                    T item;
                    // Bounded wait so the disposed flag is re-checked regularly.
                    if (_queue.TryTake(out item, 500))
                    {
                        try
                        {
                            handler?.Invoke(item);
                        }
                        finally
                        {
                            Interlocked.Decrement(ref _pending);
                        }
                    }
                }
                catch (Exception ex)
                {
                    // A failing handler must not kill the worker thread.
                    Log.Error(ex, "[Multiprocessor.HandleThread]");
                }
            }
        }
        finally
        {
            lock (_dispose_gate)
            {
                _alive_workers--;
                if (_alive_workers == 0 && _is_disposed)
                {
                    _queue.Dispose();
                }
            }
        }
    }

    // Polls Count until it reaches zero; gives up on timeout or disposal.
    private bool WaitUntilDrained(int timeoutInMs, int pollIntervalMs)
    {
        // Stopwatch is monotonic, unlike DateTime.UtcNow which follows clock adjustments.
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (Count > 0)
        {
            if (_is_disposed)
            {
                // Remaining items will never be processed; do not wait forever.
                return false;
            }
            if (timeoutInMs > 0 && elapsed.ElapsedMilliseconds > timeoutInMs)
            {
                return false;
            }
            Thread.Sleep(pollIntervalMs);
        }
        return true;
    }
}
}

// Powered by TurnKey Linux.