From 3dd4d22c229866538c5507eca7172b9d0380c7c8 Mon Sep 17 00:00:00 2001 From: Ogoun Date: Tue, 5 May 2020 03:26:29 +0300 Subject: [PATCH] Network refactoring --- TestApp/Program.cs | 38 +++++ .../Network/Contracts/ISocketClient.cs | 6 +- ZeroLevel/Services/Network/ExClient.cs | 30 ++-- ZeroLevel/Services/Network/SocketClient.cs | 153 ++++-------------- ZeroLevel/Services/Network/SocketServer.cs | 2 +- .../Services/Network/Utils/RequestBuffer.cs | 1 - ZeroLevel/Services/Network/Utils/Router.cs | 2 +- 7 files changed, 83 insertions(+), 149 deletions(-) diff --git a/TestApp/Program.cs b/TestApp/Program.cs index 04923dd..06a9951 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -1,6 +1,11 @@ using Newtonsoft.Json; +using System; +using System.Net; +using System.Threading; using ZeroLevel; using ZeroLevel.Logging; +using ZeroLevel.Network; +using ZeroLevel.Services.Serialization; namespace TestApp { @@ -30,5 +35,38 @@ namespace TestApp .Stop(); Bootstrap.Shutdown(); } + + static void SimpleCSTest() + { + var server_router = new Router(); + server_router.RegisterInbox("test", (c, line) => + { + Console.WriteLine(line); + }); + + server_router.RegisterInbox("req", (c, line) => + { + Console.WriteLine($"Request: {line}"); + return line.ToUpperInvariant(); + }); + + var server = new SocketServer(new System.Net.IPEndPoint(IPAddress.Any, 666), server_router); + + + var client_router = new Router(); + var client = new SocketClient(new IPEndPoint(IPAddress.Loopback, 666), client_router); + + var frm = FrameFactory.Create("req", MessageSerializer.SerializeCompatible("Hello world")); + + while (Console.KeyAvailable == false) + { + client.Request(frm, data => + { + var line = MessageSerializer.DeserializeCompatible(data); + Console.WriteLine($"Response: {line}"); + }); + Thread.Sleep(2000); + } + } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Network/Contracts/ISocketClient.cs b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs index a2c0480..5cee702 100644 --- a/ZeroLevel/Services/Network/Contracts/ISocketClient.cs +++ b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs @@ -13,8 +13,8 @@ namespace ZeroLevel.Network IRouter Router { get; } - void Send(Frame data); - void Request(Frame data, Action callback, Action fail = null); - void Response(byte[] data, int identity); + bool Send(Frame data); + bool Request(Frame data, Action callback, Action fail = null); + bool Response(byte[] data, int identity); } } diff --git a/ZeroLevel/Services/Network/ExClient.cs b/ZeroLevel/Services/Network/ExClient.cs index 8b1d99b..81bc9cf 100644 --- a/ZeroLevel/Services/Network/ExClient.cs +++ b/ZeroLevel/Services/Network/ExClient.cs @@ -22,8 +22,7 @@ namespace ZeroLevel.Network { try { - _client.Send(FrameFactory.Create(inbox)); - return true; + return _client.Send(FrameFactory.Create(inbox)); } catch (Exception ex) { @@ -36,8 +35,7 @@ namespace ZeroLevel.Network { try { - _client.Send(FrameFactory.Create(inbox, data)); - return true; + return _client.Send(FrameFactory.Create(inbox, data)); } catch (Exception ex) { @@ -50,8 +48,7 @@ namespace ZeroLevel.Network { try { - _client.Send(FrameFactory.Create(BaseSocket.DEFAULT_MESSAGE_INBOX, MessageSerializer.SerializeCompatible(message))); - return true; + return _client.Send(FrameFactory.Create(BaseSocket.DEFAULT_MESSAGE_INBOX, MessageSerializer.SerializeCompatible(message))); } catch (Exception ex) { @@ -64,8 +61,7 @@ namespace ZeroLevel.Network { try { - _client.Send(FrameFactory.Create(inbox, MessageSerializer.SerializeCompatible(message))); - return true; + return _client.Send(FrameFactory.Create(inbox, MessageSerializer.SerializeCompatible(message))); } catch (Exception ex) { @@ -78,8 +74,7 @@ namespace ZeroLevel.Network { try { - _client.Request(FrameFactory.Create(inbox), f => callback(f)); - return true; + return _client.Request(FrameFactory.Create(inbox), f => callback(f)); } catch (Exception ex) { @@ -92,8 +87,7 @@ namespace ZeroLevel.Network { try { - _client.Request(FrameFactory.Create(inbox, data), f => callback(f)); - return true; + return _client.Request(FrameFactory.Create(inbox, data), f => callback(f)); } catch (Exception ex) { @@ -106,8 +100,7 @@ namespace ZeroLevel.Network { try { - _client.Request(FrameFactory.Create(inbox), f => callback(MessageSerializer.DeserializeCompatible(f))); - return true; + return _client.Request(FrameFactory.Create(inbox), f => callback(MessageSerializer.DeserializeCompatible(f))); } catch (Exception ex) { @@ -120,8 +113,7 @@ namespace ZeroLevel.Network { try { - _client.Request(FrameFactory.Create(BaseSocket.DEFAULT_REQUEST_INBOX), f => callback(MessageSerializer.DeserializeCompatible(f))); - return true; + return _client.Request(FrameFactory.Create(BaseSocket.DEFAULT_REQUEST_INBOX), f => callback(MessageSerializer.DeserializeCompatible(f))); } catch (Exception ex) { @@ -134,9 +126,8 @@ namespace ZeroLevel.Network { try { - _client.Request(FrameFactory.Create(inbox, MessageSerializer.SerializeCompatible(request)), + return _client.Request(FrameFactory.Create(inbox, MessageSerializer.SerializeCompatible(request)), f => callback(MessageSerializer.DeserializeCompatible(f))); - return true; } catch (Exception ex) { @@ -149,9 +140,8 @@ namespace ZeroLevel.Network { try { - _client.Request(FrameFactory.Create(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, MessageSerializer.SerializeCompatible(request)), + return _client.Request(FrameFactory.Create(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, MessageSerializer.SerializeCompatible(request)), f => callback(MessageSerializer.DeserializeCompatible(f))); - return true; } catch (Exception ex) { diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index e695a7a..fded75e 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -14,32 +14,14 @@ namespace ZeroLevel.Network #region Private #region Queues - private class IncomingFrame + private struct IncomingFrame { - private IncomingFrame() { } public FrameType type; public int identity; public byte[] data; - - public static IncomingFrame NewFrame() => new IncomingFrame(); - } - - private class SendFrame - { - private SendFrame() { } - - public bool isRequest; - public int identity; - public byte[] data; - - public static SendFrame NewFrame() => new SendFrame(); } - private ObjectPool _incoming_frames_pool = new ObjectPool(() => IncomingFrame.NewFrame()); - private ObjectPool _send_frames_pool = new ObjectPool(() => SendFrame.NewFrame()); - private BlockingCollection _incoming_queue = new BlockingCollection(); - private BlockingCollection _send_queue = new BlockingCollection(BaseSocket.MAX_SEND_QUEUE_SIZE); #endregion private Socket _clientSocket; @@ -49,15 +31,12 @@ namespace ZeroLevel.Network private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета private readonly object _reconnection_lock = new object(); private long _heartbeat_key; - private Thread _sendThread; private Thread _receiveThread; #endregion Private public IRouter Router { get; } - public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } } - public SocketClient(IPEndPoint ep, IRouter router) { try @@ -100,10 +79,6 @@ namespace ZeroLevel.Network private void StartInternalThreads() { - _sendThread = new Thread(SendFramesJob); - _sendThread.IsBackground = true; - _sendThread.Start(); - _receiveThread = new Thread(IncomingFramesJob); _receiveThread.IsBackground = true; _receiveThread.Start(); @@ -136,61 +111,27 @@ namespace ZeroLevel.Network public event Action OnDisconnect = (_) => { }; public IPEndPoint Endpoint { get; } - public void Request(Frame frame, Action callback, Action fail = null) + public bool Request(Frame frame, Action callback, Action fail = null) { - if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Request] Socket status: {Status}"); + if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Request] Socket status: {Status}"); var data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id); - - if (!_send_queue.IsAddingCompleted) - { - while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) - { - Thread.Sleep(1); - } - _requests.RegisterForFrame(id, callback, fail); - var sf = _send_frames_pool.Allocate(); - sf.isRequest = true; - sf.identity = id; - sf.data = data; - _send_queue.Add(sf); - } + _requests.RegisterForFrame(id, callback, fail); + return Send(id, true, data); } - public void Send(Frame frame) + public bool Send(Frame frame) { if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Send] Socket status: {Status}"); var data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame)); - - if (!_send_queue.IsAddingCompleted) - { - while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) - { - Thread.Sleep(1); - } - var sf = _send_frames_pool.Allocate(); - sf.isRequest = false; - sf.identity = 0; - sf.data = data; - _send_queue.Add(sf); - } + return Send(0, false, data); } - public void Response(byte[] data, int identity) + public bool Response(byte[] data, int identity) { if (data == null) throw new ArgumentNullException(nameof(data)); if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Response] Socket status: {Status}"); - if (!_send_queue.IsAddingCompleted) - { - while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) - { - Thread.Sleep(1); - } - var sf = _send_frames_pool.Allocate(); - sf.isRequest = false; - sf.identity = 0; - sf.data = NetworkPacketFactory.Response(data, identity); - _send_queue.Add(sf); - } + + return Send(0, false, NetworkPacketFactory.Response(data, identity)); } #endregion @@ -201,11 +142,12 @@ namespace ZeroLevel.Network try { if (type == FrameType.KeepAlive) return; - var inc_frame = _incoming_frames_pool.Allocate(); - inc_frame.data = data; - inc_frame.type = type; - inc_frame.identity = identity; - _incoming_queue.Add(inc_frame); + _incoming_queue.Add(new IncomingFrame + { + data = data, + type = type, + identity = identity + }); } catch (Exception ex) { @@ -274,17 +216,14 @@ namespace ZeroLevel.Network return; } _requests.TestForTimeouts(); - var info = _send_frames_pool.Allocate(); - info.isRequest = false; - info.identity = 0; - info.data = NetworkPacketFactory.KeepAliveMessage(); - _send_queue.Add(info); + + Send(0, false, NetworkPacketFactory.KeepAliveMessage()); } private void IncomingFramesJob() { IncomingFrame frame = default(IncomingFrame); - while (Status != SocketClientStatus.Disposed && !_send_queue.IsCompleted) + while (Status != SocketClientStatus.Disposed) { try { @@ -319,10 +258,6 @@ namespace ZeroLevel.Network { Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame"); } - finally - { - _incoming_frames_pool.Free(frame); - } } } catch (Exception ex) @@ -333,61 +268,33 @@ namespace ZeroLevel.Network _incoming_queue.Dispose(); _incoming_queue = new BlockingCollection(); } - if (frame != null) - { - _incoming_frames_pool.Free(frame); - } continue; } } } - private void SendFramesJob() + private bool Send(int id, bool is_request, byte[] data) { - SendFrame frame = null; - while (Status != SocketClientStatus.Disposed && !_send_queue.IsCompleted) + if (Status == SocketClientStatus.Working) { try { - if (_send_queue.TryTake(out frame, 100)) - { - try - { - if (frame.isRequest) - { - _requests.StartSend(frame.identity); - } - _clientSocket.Send(frame.data, frame.data.Length, SocketFlags.None); - } - catch (Exception ex) - { - Log.SystemError(ex, $"[SocketClient.SendFramesJob] _str_clientSocketeam.Send"); - Broken(); - OnDisconnect(this); - } - finally - { - _send_frames_pool.Free(frame); - } + if (is_request) + { + _requests.StartSend(id); } + var sended = _clientSocket.Send(data, data.Length, SocketFlags.None); + return sended == data.Length; } catch (Exception ex) { - Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.TryTake"); - if (Status != SocketClientStatus.Disposed) - { - _send_queue.Dispose(); - _send_queue = new BlockingCollection(); - } - if (frame != null) - { - _send_frames_pool.Free(frame); - } - continue; + Log.SystemError(ex, $"[SocketClient.SendFramesJob] _str_clientSocketeam.Send"); + Broken(); + OnDisconnect(this); } } + return false; } - #endregion public override void Dispose() diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs index 9433680..90b23c5 100644 --- a/ZeroLevel/Services/Network/SocketServer.cs +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -8,7 +8,7 @@ using ZeroLevel.Network.SDL; namespace ZeroLevel.Network { - internal sealed class SocketServer + public sealed class SocketServer : BaseSocket, IRouter { private Socket _serverSocket; diff --git a/ZeroLevel/Services/Network/Utils/RequestBuffer.cs b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs index 13bc930..2f5d2e7 100644 --- a/ZeroLevel/Services/Network/Utils/RequestBuffer.cs +++ b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Linq; using ZeroLevel.Services.Pools; namespace ZeroLevel.Network diff --git a/ZeroLevel/Services/Network/Utils/Router.cs b/ZeroLevel/Services/Network/Utils/Router.cs index fe52ca1..9f669c4 100644 --- a/ZeroLevel/Services/Network/Utils/Router.cs +++ b/ZeroLevel/Services/Network/Utils/Router.cs @@ -10,7 +10,7 @@ using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network { - internal sealed class Router + public sealed class Router : IRouter { public event Action OnDisconnect = _ => { }; // must be never rised