diff --git a/ZeroLevel/Services/Network/Contracts/ISocketClient.cs b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs index b27bcd0..1a7b30a 100644 --- a/ZeroLevel/Services/Network/Contracts/ISocketClient.cs +++ b/ZeroLevel/Services/Network/Contracts/ISocketClient.cs @@ -6,7 +6,6 @@ namespace ZeroLevel.Network public interface ISocketClient: IDisposable { - event Action OnIncomingData; event Action OnConnect; event Action OnDisconnect; IPEndPoint Endpoint { get; } diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index 8d705f4..9d2ed97 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -3,6 +3,7 @@ using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; using System.Threading; +using ZeroLevel.Services.Pools; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network @@ -11,64 +12,81 @@ namespace ZeroLevel.Network : BaseSocket, ISocketClient { #region Private + private class IncomingFrame + { + public FrameType type; + public int identity; + public byte[] data; + } + private class SendFrame + { + public bool isRequest; + public int identity; + public byte[] data; + } - private readonly IRouter _router; private Socket _clientSocket; private NetworkStream _stream; + private FrameParser _parser = new FrameParser(); - private Thread _sendThread; + private readonly RequestBuffer _requests = new RequestBuffer(); + + private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета + private int _current_heartbeat_period_in_ms = 0; private long _heartbeat_key = -1; private long _last_rw_time = DateTime.UtcNow.Ticks; private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE]; private readonly object _reconnection_lock = new object(); - private BlockingCollection _send_queue = new BlockingCollection(); - private readonly RequestBuffer _requests = new RequestBuffer(); - private int _current_heartbeat_period_in_ms = 0; - private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета - private struct SendInfo - { - public bool isRequest; - public int identity; - public byte[] data; - } + private Thread _sendThread; + private Thread _receiveThread; + private BlockingCollection _incoming_queue = new BlockingCollection(); + private BlockingCollection _send_queue = new BlockingCollection(BaseSocket.MAX_SEND_QUEUE_SIZE); + private static ObjectPool _incoming_pool = new ObjectPool(() => new IncomingFrame()); + private static ObjectPool _sendinfo_pool = new ObjectPool(() => new SendFrame()); #endregion Private - public IRouter Router { get { return _router; } } + public IRouter Router { get; } public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } } public SocketClient(IPEndPoint ep, IRouter router) { - _router = router; + Router = router; Endpoint = ep; _parser.OnIncoming += _parser_OnIncoming; - _sendThread = new Thread(SendFramesJob); - _sendThread.IsBackground = true; - _sendThread.Start(); + StartInternalThreads(); EnsureConnection(); } public SocketClient(Socket socket, IRouter router) { - _router = router; + Router = router; _socket_freezed = true; _clientSocket = socket; _stream = new NetworkStream(_clientSocket, true); Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint; _parser.OnIncoming += _parser_OnIncoming; + StartInternalThreads(); + Working(); + + _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); + } + + private void StartInternalThreads() + { _sendThread = new Thread(SendFramesJob); _sendThread.IsBackground = true; _sendThread.Start(); - Working(); - _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); + _receiveThread = new Thread(IncomingFramesJob); + _receiveThread.IsBackground = true; + _receiveThread.Start(); } #region API public event Action OnConnect = (_) => { }; public event Action OnDisconnect = (_) => { }; - public event Action OnIncomingData = (_, __, ___) => { }; public IPEndPoint Endpoint { get; } public void Request(Frame frame, Action callback, Action fail = null) @@ -78,13 +96,11 @@ namespace ZeroLevel.Network { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { - Thread.Sleep(50); + Thread.Sleep(1); } - var sendInfo = new SendInfo - { - isRequest = true, - data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id) - }; + var sendInfo = _sendinfo_pool.Allocate(); + sendInfo.isRequest = true; + sendInfo.data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id); sendInfo.identity = id; _requests.RegisterForFrame(id, callback, fail); _send_queue.Add(sendInfo); @@ -104,14 +120,13 @@ namespace ZeroLevel.Network { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { - Thread.Sleep(50); + Thread.Sleep(1); } - _send_queue.Add(new SendInfo - { - isRequest = false, - identity = 0, - data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame)) - }); + var info = _sendinfo_pool.Allocate(); + info.isRequest = false; + info.identity = 0; + info.data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame)); + _send_queue.Add(info); frame.Release(); } } @@ -123,14 +138,13 @@ namespace ZeroLevel.Network { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { - Thread.Sleep(50); + Thread.Sleep(1); } - _send_queue.Add(new SendInfo - { - isRequest = false, - identity = 0, - data = NetworkPacketFactory.Response(data, identity) - }); + var info = _sendinfo_pool.Allocate(); + info.isRequest = false; + info.identity = 0; + info.data = NetworkPacketFactory.Response(data, identity); + _send_queue.Add(info); } } @@ -153,30 +167,66 @@ namespace ZeroLevel.Network #endregion #region Private methods - private void _parser_OnIncoming(FrameType type, int identity, byte[] data) + private void IncomingFramesJob() { - try + IncomingFrame frame = default(IncomingFrame); + while (Status != SocketClientStatus.Disposed) { - switch (type) + if (_send_queue.IsCompleted) { - case FrameType.KeepAlive: - // Nothing - return; - case FrameType.Message: - _router?.HandleMessage(MessageSerializer.Deserialize(data), this); - break; - case FrameType.Request: - var response = _router?.HandleRequest(MessageSerializer.Deserialize(data), this); - if (response != null) - { - this.Response(response, identity); - } - break; - case FrameType.Response: - _requests.Success(identity, data); - break; + return; + } + try + { + frame = _incoming_queue.Take(); + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketClient.IncomingFramesJob] _incoming_queue.Take"); + _incoming_queue.Dispose(); + _incoming_queue = new BlockingCollection(); + continue; + } + try + { + switch (frame.type) + { + case FrameType.Message: + Router?.HandleMessage(MessageSerializer.Deserialize(frame.data), this); + break; + case FrameType.Request: + var response = Router?.HandleRequest(MessageSerializer.Deserialize(frame.data), this); + if (response != null) + { + this.Response(response, frame.identity); + } + break; + case FrameType.Response: + _requests.Success(frame.identity, frame.data); + break; + } + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame"); } - OnIncomingData(this, data, identity); + finally + { + _incoming_pool.Free(frame); + } + } + } + + private void _parser_OnIncoming(FrameType type, int identity, byte[] data) + { + try + { + if (type == FrameType.KeepAlive) return; + var incoming = _incoming_pool.Allocate(); + incoming.data = data; + incoming.type = type; + incoming.identity = identity; + _incoming_queue.Add(incoming); } catch (Exception ex) { @@ -265,12 +315,11 @@ namespace ZeroLevel.Network _requests.TestForTimeouts(); try { - _send_queue.Add(new SendInfo - { - identity = 0, - isRequest = false, - data = NetworkPacketFactory.KeepAliveMessage() - }); + var info = _sendinfo_pool.Allocate(); + info.isRequest = false; + info.identity = 0; + info.data = NetworkPacketFactory.KeepAliveMessage(); + _send_queue.Add(info); } catch (Exception ex) { @@ -298,7 +347,7 @@ namespace ZeroLevel.Network } else { - // TODO!!!!! + // TODO or not TODO Thread.Sleep(1); } if (Status == SocketClientStatus.Working @@ -318,63 +367,10 @@ namespace ZeroLevel.Network OnDisconnect(this); } } - /* - private void SendFramesJob() - { - SendInfo frame; - int unsuccess = 0; - while (Status != SocketClientStatus.Disposed) - { - if (_send_queue.IsCompleted) - { - return; - } - if (Status != SocketClientStatus.Working) - { - Thread.Sleep(100); - try - { - EnsureConnection(); - } - catch (Exception ex) - { - Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault"); - } - if (Status == SocketClientStatus.Disposed) return; - if (Status == SocketClientStatus.Broken) - { - unsuccess++; - if (unsuccess > 30) unsuccess = 30; - } - if (Status == SocketClientStatus.Working) - { - unsuccess = 0; - } - Thread.Sleep(unsuccess * 100); - continue; - } - try - { - frame = _send_queue.Take(); - if (frame.isRequest) - { - _requests.StartSend(frame.identity); - } - _stream.Write(frame.data, 0, frame.data.Length); - _last_rw_time = DateTime.UtcNow.Ticks; - } - catch (Exception ex) - { - Log.SystemError(ex, $"[SocketClient.SendFramesJob] Backward send error."); - Broken(); - OnDisconnect(this); - } - } - } - */ + private void SendFramesJob() { - SendInfo frame = default(SendInfo); + SendFrame frame; int unsuccess = 0; while (Status != SocketClientStatus.Disposed) { @@ -390,7 +386,7 @@ namespace ZeroLevel.Network { Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.Take"); _send_queue.Dispose(); - _send_queue = new BlockingCollection(); + _send_queue = new BlockingCollection(); continue; } while (_stream?.CanWrite == false || Status != SocketClientStatus.Working) @@ -401,7 +397,7 @@ namespace ZeroLevel.Network } catch (Exception ex) { - Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault"); + Log.SystemError(ex, "[SocketClient.SendFramesJob] Connection broken"); } if (Status == SocketClientStatus.Disposed) { @@ -415,23 +411,31 @@ namespace ZeroLevel.Network if (Status == SocketClientStatus.Working) { unsuccess = 0; + break; } Thread.Sleep(unsuccess * 128); } - try + if (frame != null) { - if (frame.isRequest) + try { - _requests.StartSend(frame.identity); + if (frame.isRequest) + { + _requests.StartSend(frame.identity); + } + _stream.Write(frame.data, 0, frame.data.Length); + _last_rw_time = DateTime.UtcNow.Ticks; + } + catch (Exception ex) + { + Log.SystemError(ex, $"[SocketClient.SendFramesJob] _stream.Write"); + Broken(); + OnDisconnect(this); + } + finally + { + _sendinfo_pool.Free(frame); } - _stream.Write(frame.data, 0, frame.data.Length); - _last_rw_time = DateTime.UtcNow.Ticks; - } - catch (Exception ex) - { - Log.SystemError(ex, $"[SocketClient.SendFramesJob] _stream.Write"); - Broken(); - OnDisconnect(this); } } } diff --git a/ZeroLevel/Services/Network/Utils/RequestBuffer.cs b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs index b06f13d..7a59c9d 100644 --- a/ZeroLevel/Services/Network/Utils/RequestBuffer.cs +++ b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs @@ -1,36 +1,49 @@ using System; using System.Collections.Generic; +using System.Threading; using ZeroLevel.Services.Pools; namespace ZeroLevel.Network { internal sealed class RequestBuffer { - private readonly object _reqeust_lock = new object(); + private SpinLock _reqeust_lock = new SpinLock(); private Dictionary _requests = new Dictionary(); private static ObjectPool _ri_pool = new ObjectPool(() => new RequestInfo()); public void RegisterForFrame(int identity, Action callback, Action fail = null) { var ri = _ri_pool.Allocate(); - lock (_reqeust_lock) + bool take = false; + try { + _reqeust_lock.Enter(ref take); ri.Reset(callback, fail); _requests.Add(identity, ri); } + finally + { + if (take) _reqeust_lock.Exit(false); + } } public void Fail(long frameId, string message) { RequestInfo ri = null; - lock (_reqeust_lock) + bool take = false; + try { + _reqeust_lock.Enter(ref take); if (_requests.ContainsKey(frameId)) { ri = _requests[frameId]; _requests.Remove(frameId); } } + finally + { + if (take) _reqeust_lock.Exit(false); + } if (ri != null) { ri.Fail(message); @@ -41,14 +54,20 @@ namespace ZeroLevel.Network public void Success(long frameId, byte[] data) { RequestInfo ri = null; - lock (_reqeust_lock) + bool take = false; + try { + _reqeust_lock.Enter(ref take); if (_requests.ContainsKey(frameId)) { ri = _requests[frameId]; _requests.Remove(frameId); } } + finally + { + if (take) _reqeust_lock.Exit(false); + } if (ri != null) { ri.Success(data); @@ -59,13 +78,19 @@ namespace ZeroLevel.Network public void StartSend(long frameId) { RequestInfo ri = null; - lock (_reqeust_lock) + bool take = false; + try { + _reqeust_lock.Enter(ref take); if (_requests.ContainsKey(frameId)) { ri = _requests[frameId]; } } + finally + { + if (take) _reqeust_lock.Exit(false); + } if (ri != null) { ri.StartSend(); @@ -76,8 +101,10 @@ namespace ZeroLevel.Network { var now_ticks = DateTime.UtcNow.Ticks; var to_remove = new List(); - lock (_reqeust_lock) + bool take = false; + try { + _reqeust_lock.Enter(ref take); foreach (var pair in _requests) { if (pair.Value.Sended == false) continue; @@ -88,6 +115,10 @@ namespace ZeroLevel.Network } } } + finally + { + if (take) _reqeust_lock.Exit(false); + } foreach (var key in to_remove) { Fail(key, "Timeout");