diff --git a/TestApp/MyService.cs b/TestApp/MyService.cs index 209f9fe..7c0efdf 100644 --- a/TestApp/MyService.cs +++ b/TestApp/MyService.cs @@ -18,7 +18,7 @@ namespace TestApp protected override void StartAction() { - var client = Exchange.GetConnection("192.168.51.104:50223"); + var client = Exchange.GetConnection("192.168.51.104:49672"); client?.Request("__service_description__", record => { Log.Info(record.ServiceInfo.ServiceKey); diff --git a/ZeroLevel/Services/Logging/Implementation/TextFileLogger.cs b/ZeroLevel/Services/Logging/Implementation/TextFileLogger.cs index d835757..d672c28 100644 --- a/ZeroLevel/Services/Logging/Implementation/TextFileLogger.cs +++ b/ZeroLevel/Services/Logging/Implementation/TextFileLogger.cs @@ -4,11 +4,11 @@ using System.Globalization; using System.IO; using System.IO.Compression; using System.Linq; -using System.Security.AccessControl; using System.Text; using ZeroLevel.Services.FileSystem; +using ZeroLevel.Services.Logging; -namespace ZeroLevel.Services.Logging.Implementation +namespace ZeroLevel.Logging { public sealed class TextFileLoggerOptions { diff --git a/ZeroLevel/Services/Logging/Log.cs b/ZeroLevel/Services/Logging/Log.cs index ccdca00..84c3702 100644 --- a/ZeroLevel/Services/Logging/Log.cs +++ b/ZeroLevel/Services/Logging/Log.cs @@ -1,4 +1,5 @@ using System; +using ZeroLevel.Logging; using ZeroLevel.Services.Logging; using ZeroLevel.Services.Logging.Implementation; diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index f5317d0..18b1906 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -1,13 +1,8 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; -using System.IO; using System.Net; using System.Net.Sockets; -using System.Runtime.ExceptionServices; using System.Threading; -using System.Threading.Tasks; -using ZeroLevel.Services; using ZeroLevel.Services.Pools; using ZeroLevel.Services.Serialization; @@ -17,17 +12,24 @@ namespace ZeroLevel.Network : BaseSocket, ISocketClient { #region Private - private struct IncomingFrame + private class IncomingFrame { + private IncomingFrame() { } public FrameType type; public int identity; public byte[] data; + + public static IncomingFrame NewFrame() => new IncomingFrame(); } - private struct SendFrame + private class SendFrame { + private SendFrame() { } + public bool isRequest; public int identity; public byte[] data; + + public static SendFrame NewFrame() => new SendFrame(); } private Socket _clientSocket; @@ -47,6 +49,8 @@ namespace ZeroLevel.Network private Thread _receiveThread; private BlockingCollection _incoming_queue = new BlockingCollection(); private BlockingCollection _send_queue = new BlockingCollection(BaseSocket.MAX_SEND_QUEUE_SIZE); + private ObjectPool _incoming_frames_pool = new ObjectPool(() => IncomingFrame.NewFrame()); + private ObjectPool _send_frames_pool = new ObjectPool(() => SendFrame.NewFrame()); #endregion Private public IRouter Router { get; } @@ -105,12 +109,11 @@ namespace ZeroLevel.Network Thread.Sleep(1); } _requests.RegisterForFrame(id, callback, fail); - _send_queue.Add(new SendFrame - { - isRequest = true, - data = data, - identity = id - }); + var sf = _send_frames_pool.Allocate(); + sf.isRequest = true; + sf.identity = id; + sf.data = data; + _send_queue.Add(sf); } } @@ -132,15 +135,14 @@ namespace ZeroLevel.Network { Thread.Sleep(1); } - _send_queue.Add(new SendFrame - { - isRequest = false, - identity = 0, - data = data - }); + var sf = _send_frames_pool.Allocate(); + sf.isRequest = false; + sf.identity = 0; + sf.data = data; + _send_queue.Add(sf); } } - + public void Response(byte[] data, int identity) { if (data == null) throw new ArgumentNullException(nameof(data)); @@ -150,12 +152,11 @@ namespace ZeroLevel.Network { Thread.Sleep(1); } - _send_queue.Add(new SendFrame - { - isRequest = false, - identity = 0, - data = NetworkPacketFactory.Response(data, identity) - }); + var sf = _send_frames_pool.Allocate(); + sf.isRequest = false; + sf.identity = 0; + sf.data = NetworkPacketFactory.Response(data, identity); + _send_queue.Add(sf); } } @@ -184,12 +185,11 @@ namespace ZeroLevel.Network try { if (type == FrameType.KeepAlive) return; - _incoming_queue.Add(new IncomingFrame - { - data = data, - type = type, - identity = identity - }); + var inc_frame = _incoming_frames_pool.Allocate(); + inc_frame.data = data; + inc_frame.type = type; + inc_frame.identity = identity; + _incoming_queue.Add(inc_frame); } catch (Exception ex) { @@ -278,12 +278,10 @@ namespace ZeroLevel.Network _requests.TestForTimeouts(); try { - var info = new SendFrame - { - isRequest = false, - identity = 0, - data = NetworkPacketFactory.KeepAliveMessage() - }; + var info = _send_frames_pool.Allocate(); + info.isRequest = false; + info.identity = 0; + info.data = NetworkPacketFactory.KeepAliveMessage(); _send_queue.Add(info); } catch (Exception ex) @@ -346,6 +344,10 @@ namespace ZeroLevel.Network _incoming_queue.Dispose(); _incoming_queue = new BlockingCollection(); } + if (frame != null) + { + _incoming_frames_pool.Free(frame); + } continue; } try @@ -377,12 +379,16 @@ namespace ZeroLevel.Network { Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame"); } + finally + { + _incoming_frames_pool.Free(frame); + } } } private void SendFramesJob() { - SendFrame frame; + SendFrame frame = null; int unsuccess = 0; while (Status != SocketClientStatus.Disposed && !_send_queue.IsCompleted) { @@ -398,6 +404,10 @@ namespace ZeroLevel.Network _send_queue.Dispose(); _send_queue = new BlockingCollection(); } + if (frame != null) + { + _send_frames_pool.Free(frame); + } continue; } while (_stream?.CanWrite == false || Status != SocketClientStatus.Working) @@ -441,6 +451,10 @@ namespace ZeroLevel.Network Broken(); OnDisconnect(this); } + finally + { + _send_frames_pool.Free(frame); + } } } diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs index 1231203..2b60296 100644 --- a/ZeroLevel/Services/Network/SocketServer.cs +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -76,9 +76,7 @@ namespace ZeroLevel.Network try { var client_socket = _serverSocket.EndAccept(ar); - _serverSocket.BeginAccept(BeginAcceptCallback, null); _connection_set_lock.EnterWriteLock(); - var connection = new SocketClient(client_socket, _router); connection.OnDisconnect += Connection_OnDisconnect; _connections[connection.Endpoint] = new ExClient(connection); @@ -88,12 +86,24 @@ namespace ZeroLevel.Network catch (Exception ex) { Broken(); - Log.SystemError(ex, "[ZSocketServer] Error with connect accepting"); + Log.SystemError(ex, "[ZSocketServer.BeginAcceptCallback] Error with connect accepting"); } finally { - _connection_set_lock.ExitWriteLock(); + _connection_set_lock.ExitWriteLock(); + } + try + { + _serverSocket.BeginAccept(BeginAcceptCallback, null); } + catch (Exception ex) + { + Log.SystemError(ex, "[ZSocketServer.BeginAcceptCallback] BeginAccept error"); + } + } + else + { + Log.Warning($"Server socket change state to: {Status}"); } } diff --git a/ZeroLevel/Services/Pools/ObjectPool.cs b/ZeroLevel/Services/Pools/ObjectPool.cs index 16b8f61..e4f2691 100644 --- a/ZeroLevel/Services/Pools/ObjectPool.cs +++ b/ZeroLevel/Services/Pools/ObjectPool.cs @@ -33,6 +33,8 @@ namespace ZeroLevel.Services.Pools // than "new T()". private readonly Factory _factory; + public int Count => _items?.Length ?? 0; + public ObjectPool(Factory factory) : this(factory, Environment.ProcessorCount * 2) { }