From 2feed1901defef66868a049a89e05c1aafe7a9e4 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 2 Jul 2019 20:58:11 +0300 Subject: [PATCH] Duplex tcp channel --- TestApp/Program.cs | 4 ++- .../Network/Contracts/ISocketClient.cs | 2 +- ZeroLevel/Services/Network/ExClient.cs | 8 ++--- .../Services/Network/Model/RequestInfo.cs | 9 +++-- ZeroLevel/Services/Network/SocketClient.cs | 12 ++++--- .../Services/Network/Utils/FrameParser.cs | 5 +-- .../Network/Utils/NetworkPacketFactory.cs | 33 ++++++++++--------- .../Services/Network/Utils/RequestBuffer.cs | 6 ++-- ZeroLevel/Services/Network/Utils/Router.cs | 4 +-- 9 files changed, 46 insertions(+), 37 deletions(-) diff --git a/TestApp/Program.cs b/TestApp/Program.cs index 2a0bcfe..bb909ac 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -15,7 +15,9 @@ namespace TestApp .Run(); var router = se.Service.UseHost(8800); - router.RegisterInbox("upper", (c, s) => s.ToUpperInvariant()); + router.RegisterInbox("upper", (c, s) => s.ToUpperInvariant()); + + se.WaitWhileStatus(ZeroServiceStatus.Running) .Stop(); diff --git a/ZeroLevel/Services/Network/Contracts/ISocketClient.cs b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs index 585bfd9..b27bcd0 100644 --- a/ZeroLevel/Services/Network/Contracts/ISocketClient.cs +++ b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs @@ -17,7 +17,7 @@ namespace ZeroLevel.Network void ForceConnect(); void UseKeepAlive(TimeSpan period); void Send(Frame data); - void Request(Frame data, Action callback, Action fail = null); + void Request(Frame data, Action callback, Action fail = null); void Response(byte[] data, int identity); } } diff --git a/ZeroLevel/Services/Network/ExClient.cs b/ZeroLevel/Services/Network/ExClient.cs index c50203e..dbfc061 100644 --- a/ZeroLevel/Services/Network/ExClient.cs +++ b/ZeroLevel/Services/Network/ExClient.cs @@ -66,7 +66,7 @@ namespace ZeroLevel.Network { try { - _client.Request(Frame.FromPool(inbox), f => callback(f.Payload)); + _client.Request(Frame.FromPool(inbox), f => callback(f)); } catch (Exception ex) { @@ -80,7 +80,7 @@ namespace ZeroLevel.Network { try { - _client.Request(Frame.FromPool(inbox, data), f => callback(f.Payload)); + _client.Request(Frame.FromPool(inbox, data), f => callback(f)); } catch (Exception ex) { @@ -94,7 +94,7 @@ namespace ZeroLevel.Network { try { - _client.Request(Frame.FromPool(inbox), f => callback(MessageSerializer.DeserializeCompatible(f.Payload))); + _client.Request(Frame.FromPool(inbox), f => callback(MessageSerializer.DeserializeCompatible(f))); } catch (Exception ex) { @@ -109,7 +109,7 @@ namespace ZeroLevel.Network try { _client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible(request)), - f => callback(MessageSerializer.DeserializeCompatible(f.Payload))); + f => callback(MessageSerializer.DeserializeCompatible(f))); } catch (Exception ex) { diff --git a/ZeroLevel/Services/Network/Model/RequestInfo.cs b/ZeroLevel/Services/Network/Model/RequestInfo.cs index 5d427eb..e8fbf1d 100644 --- a/ZeroLevel/Services/Network/Model/RequestInfo.cs +++ b/ZeroLevel/Services/Network/Model/RequestInfo.cs @@ -4,14 +4,14 @@ namespace ZeroLevel.Network { internal sealed class RequestInfo { - private Action _handler; + private Action _handler; private Action _failHandler; private long _timestamp; public long Timestamp { get { return _timestamp; } } private bool _sended; public bool Sended { get { return _sended; } } - public void Reset(Action handler, Action failHandler) + public void Reset(Action handler, Action failHandler) { _sended = false; _handler = handler; @@ -24,10 +24,9 @@ namespace ZeroLevel.Network _timestamp = DateTime.UtcNow.Ticks; } - public void Success(Frame frame) + public void Success(byte[] data) { - _handler(frame); - frame?.Release(); + _handler(data); } public void Fail(string reasonPhrase) diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index ac89d7a..c95de9c 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -40,12 +40,13 @@ namespace ZeroLevel.Network public SocketClient(IPEndPoint ep, IRouter router) { - _router = router; + _router = router; Endpoint = ep; _parser.OnIncoming += _parser_OnIncoming; _sendThread = new Thread(SendFramesJob); _sendThread.IsBackground = true; _sendThread.Start(); + EnsureConnection(); } public SocketClient(Socket socket, IRouter router) @@ -53,11 +54,14 @@ namespace ZeroLevel.Network _router = router; _socket_freezed = true; _clientSocket = socket; + _stream = new NetworkStream(_clientSocket, true); Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint; _parser.OnIncoming += _parser_OnIncoming; _sendThread = new Thread(SendFramesJob); _sendThread.IsBackground = true; - _sendThread.Start(); + _sendThread.Start(); + _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); + Working(); } #region API @@ -66,7 +70,7 @@ namespace ZeroLevel.Network public event Action OnIncomingData = (_, __, ___) => { }; public IPEndPoint Endpoint { get; } - public void Request(Frame frame, Action callback, Action fail = null) + public void Request(Frame frame, Action callback, Action fail = null) { if (frame == null) throw new ArgumentNullException(nameof(frame)); if (frame != null && false == _send_queue.IsAddingCompleted) @@ -169,7 +173,7 @@ namespace ZeroLevel.Network } break; case FrameType.Response: - _requests.Success(identity, MessageSerializer.Deserialize(data)); + _requests.Success(identity, data); break; } OnIncomingData(this, data, identity); diff --git a/ZeroLevel/Services/Network/Utils/FrameParser.cs b/ZeroLevel/Services/Network/Utils/FrameParser.cs index d02b469..9050664 100644 --- a/ZeroLevel/Services/Network/Utils/FrameParser.cs +++ b/ZeroLevel/Services/Network/Utils/FrameParser.cs @@ -48,6 +48,7 @@ namespace ZeroLevel.Network } private byte[] _size_buf = new byte[4]; + private byte[] _id_buf = new byte[4]; private int offset; public int WriteSize(byte[] buf, int start, int length) @@ -74,11 +75,11 @@ namespace ZeroLevel.Network { for (; offset < 4 && start < length; offset++, start++) { - _size_buf[offset] = buf[start]; + _id_buf[offset] = buf[start]; } if (offset == 4) { - Identity = BitConverter.ToInt32(_size_buf, 0); + Identity = BitConverter.ToInt32(_id_buf, 0); IdentityFilled = true; offset = 0; } diff --git a/ZeroLevel/Services/Network/Utils/NetworkPacketFactory.cs b/ZeroLevel/Services/Network/Utils/NetworkPacketFactory.cs index 02072d3..600dd37 100644 --- a/ZeroLevel/Services/Network/Utils/NetworkPacketFactory.cs +++ b/ZeroLevel/Services/Network/Utils/NetworkPacketFactory.cs @@ -22,7 +22,7 @@ namespace ZeroLevel.Network var packet = new byte[data.Length + 6]; packet[0] = MAGIC; Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4); - packet[5] = (byte)(packet[0] ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]); + packet[5] = (byte)(MAGIC ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]); HashData(data, packet[5]); Array.Copy(data, 0, packet, 6, data.Length); return packet; @@ -32,17 +32,18 @@ namespace ZeroLevel.Network { var packet = new byte[data.Length + 6 + 4]; packet[0] = (MAGIC | MAGIC_REQUEST); - Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4); - packet[5] = (byte)(packet[0] ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]); + Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4); requestId = Interlocked.Increment(ref _current_request_id); var id = BitConverter.GetBytes(requestId); - packet[6] = id[0]; - packet[7] = id[1]; - packet[8] = id[2]; - packet[9] = id[3]; + packet[5] = id[0]; + packet[6] = id[1]; + packet[7] = id[2]; + packet[8] = id[3]; - HashData(data, packet[5]); + packet[9] = (byte)(MAGIC ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]); + + HashData(data, packet[9]); Array.Copy(data, 0, packet, 10, data.Length); return packet; } @@ -51,16 +52,18 @@ namespace ZeroLevel.Network { var packet = new byte[data.Length + 6 + 4]; packet[0] = (MAGIC | MAGIC_RESPONSE); - Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4); - packet[5] = (byte)(packet[0] ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]); + Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4); var id = BitConverter.GetBytes(requestId); - packet[6] = id[0]; - packet[7] = id[1]; - packet[8] = id[2]; - packet[9] = id[3]; + packet[5] = id[0]; + packet[6] = id[1]; + packet[7] = id[2]; + packet[8] = id[3]; - HashData(data, packet[5]); + + packet[9] = (byte)(MAGIC ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]); + + HashData(data, packet[9]); Array.Copy(data, 0, packet, 10, data.Length); return packet; } diff --git a/ZeroLevel/Services/Network/Utils/RequestBuffer.cs b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs index 00b7739..b06f13d 100644 --- a/ZeroLevel/Services/Network/Utils/RequestBuffer.cs +++ b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs @@ -10,7 +10,7 @@ namespace ZeroLevel.Network private Dictionary _requests = new Dictionary(); private static ObjectPool _ri_pool = new ObjectPool(() => new RequestInfo()); - public void RegisterForFrame(int identity, Action callback, Action fail = null) + public void RegisterForFrame(int identity, Action callback, Action fail = null) { var ri = _ri_pool.Allocate(); lock (_reqeust_lock) @@ -38,7 +38,7 @@ namespace ZeroLevel.Network } } - public void Success(long frameId, Frame frame) + public void Success(long frameId, byte[] data) { RequestInfo ri = null; lock (_reqeust_lock) @@ -51,7 +51,7 @@ namespace ZeroLevel.Network } if (ri != null) { - ri.Success(frame); + ri.Success(data); _ri_pool.Free(ri); } } diff --git a/ZeroLevel/Services/Network/Utils/Router.cs b/ZeroLevel/Services/Network/Utils/Router.cs index 2dfb33b..6f09928 100644 --- a/ZeroLevel/Services/Network/Utils/Router.cs +++ b/ZeroLevel/Services/Network/Utils/Router.cs @@ -105,7 +105,7 @@ namespace ZeroLevel.Network } else { - this._invoker.Invoke(this._instance, new object[] { incoming, client }); + this._invoker.Invoke(this._instance, new object[] { client, incoming }); } } else if (_typeReq == null) @@ -115,7 +115,7 @@ namespace ZeroLevel.Network else { var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data); - return this._invoker.Invoke(this._instance, new object[] { incoming, client }); + return this._invoker.Invoke(this._instance, new object[] { client, incoming }); } return null; }