diff --git a/ZeroLevel/Services/Network/Contracts/IClient.cs b/ZeroLevel/Services/Network/Contracts/IClient.cs index afd8aa7..f06934e 100644 --- a/ZeroLevel/Services/Network/Contracts/IClient.cs +++ b/ZeroLevel/Services/Network/Contracts/IClient.cs @@ -6,7 +6,7 @@ namespace ZeroLevel.Network public interface IClient : IDisposable { - IPEndPoint EndPoint { get; } + IPEndPoint Endpoint { get; } SocketClientStatus Status { get; } IRouter Router { get; } ISocketClient Socket { get; } diff --git a/ZeroLevel/Services/Network/ExClient.cs b/ZeroLevel/Services/Network/ExClient.cs index cb83b0d..7a4730f 100644 --- a/ZeroLevel/Services/Network/ExClient.cs +++ b/ZeroLevel/Services/Network/ExClient.cs @@ -8,7 +8,7 @@ namespace ZeroLevel.Network : IClient, IDisposable { private readonly ISocketClient _client; - public IPEndPoint EndPoint => _client?.Endpoint; + public IPEndPoint Endpoint => _client?.Endpoint; public SocketClientStatus Status => _client.Status; public IRouter Router => _client.Router; public ISocketClient Socket => _client; diff --git a/ZeroLevel/Services/Network/Exchange.cs b/ZeroLevel/Services/Network/Exchange.cs index 6767d14..9cd9d08 100644 --- a/ZeroLevel/Services/Network/Exchange.cs +++ b/ZeroLevel/Services/Network/Exchange.cs @@ -916,7 +916,7 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); + Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); @@ -944,7 +944,7 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.EndPoint}' in broadcast request. Inbox '{inbox}'"); + Log.SystemError(ex, $"[ExClientSet._RequestBroadcast] Error direct request to service '{client.Endpoint}' in broadcast request. Inbox '{inbox}'"); waiter.Signal(); } }); diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs index 12e01e9..39c8cd0 100644 --- a/ZeroLevel/Services/Network/SocketServer.cs +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -27,7 +27,7 @@ namespace ZeroLevel.Network try { _connection_set_lock.EnterReadLock(); - return _connections.Select(c => c.Value.EndPoint).ToList(); + return _connections.Select(c => c.Value.Endpoint).ToList(); } finally { @@ -101,7 +101,7 @@ namespace ZeroLevel.Network } else { - Log.Warning($"Server socket change state to: {Status}"); + Log.Warning($"[ZSocketServer.BeginAcceptCallback] Server socket change state to: {Status}"); } } diff --git a/ZeroLevel/Services/Network/Utils/Router.cs b/ZeroLevel/Services/Network/Utils/Router.cs index 18a8dc2..fe52ca1 100644 --- a/ZeroLevel/Services/Network/Utils/Router.cs +++ b/ZeroLevel/Services/Network/Utils/Router.cs @@ -220,25 +220,6 @@ namespace ZeroLevel.Network } } - public class Pack - : IBinarySerializable - { - public int Identity; - public long Timestamp; - - public void Deserialize(IBinaryReader reader) - { - this.Identity = reader.ReadInt32(); - this.Timestamp = reader.ReadLong(); - } - - public void Serialize(IBinaryWriter writer) - { - writer.WriteInt32(this.Identity); - writer.WriteLong(this.Timestamp); - } - } - public void HandleRequest(Frame frame, ISocketClient client, int identity, Action handler) { try diff --git a/ZeroLevel/Services/Utils/Multiprocessor.cs b/ZeroLevel/Services/Utils/Multiprocessor.cs index d6d3398..ea98c3e 100644 --- a/ZeroLevel/Services/Utils/Multiprocessor.cs +++ b/ZeroLevel/Services/Utils/Multiprocessor.cs @@ -11,6 +11,8 @@ namespace ZeroLevel.Utils private BlockingCollection _queue = new BlockingCollection(); private List _threads = new List(); private bool _is_disposed = false; + private int _tasks_in_progress = 0; + public int Count => _queue.Count + _tasks_in_progress; public Multiprocessor(Action handler, int size, int stackSize = 1024 * 1024) { @@ -18,20 +20,28 @@ namespace ZeroLevel.Utils { var t = new Thread(() => { - try + T item; + while (!_is_disposed) { - T item; - while (!_is_disposed && !_queue.IsCompleted) + try { if (_queue.TryTake(out item, 500)) { - handler(item); + Interlocked.Increment(ref _tasks_in_progress); + try + { + handler(item); + } + finally + { + Interlocked.Decrement(ref _tasks_in_progress); + } } } - } - catch (Exception ex) - { - Log.Error(ex, "[Multiprocessor.HandleThread]"); + catch (Exception ex) + { + Log.Error(ex, "[Multiprocessor.HandleThread]"); + } } }, stackSize); t.IsBackground = true; @@ -53,7 +63,7 @@ namespace ZeroLevel.Utils public void Dispose() { _queue.CompleteAdding(); - while (_queue.Count > 0) + while (_queue.Count > 0 || _tasks_in_progress > 0) { Thread.Sleep(100); }