Refactoring

pull/1/head
a.bozhenov 5 years ago
parent 1203662604
commit 98a8f402ae

@ -6,7 +6,6 @@ namespace ZeroLevel.Network
public interface ISocketClient: public interface ISocketClient:
IDisposable IDisposable
{ {
event Action<ISocketClient, byte[], int> OnIncomingData;
event Action<ISocketClient> OnConnect; event Action<ISocketClient> OnConnect;
event Action<ISocketClient> OnDisconnect; event Action<ISocketClient> OnDisconnect;
IPEndPoint Endpoint { get; } IPEndPoint Endpoint { get; }

@ -3,6 +3,7 @@ using System.Collections.Concurrent;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading; using System.Threading;
using ZeroLevel.Services.Pools;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network namespace ZeroLevel.Network
@ -11,64 +12,81 @@ namespace ZeroLevel.Network
: BaseSocket, ISocketClient : BaseSocket, ISocketClient
{ {
#region Private #region Private
private class IncomingFrame
{
public FrameType type;
public int identity;
public byte[] data;
}
private class SendFrame
{
public bool isRequest;
public int identity;
public byte[] data;
}
private readonly IRouter _router;
private Socket _clientSocket; private Socket _clientSocket;
private NetworkStream _stream; private NetworkStream _stream;
private FrameParser _parser = new FrameParser(); private FrameParser _parser = new FrameParser();
private Thread _sendThread; private readonly RequestBuffer _requests = new RequestBuffer();
private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета
private int _current_heartbeat_period_in_ms = 0;
private long _heartbeat_key = -1; private long _heartbeat_key = -1;
private long _last_rw_time = DateTime.UtcNow.Ticks; private long _last_rw_time = DateTime.UtcNow.Ticks;
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE]; private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
private readonly object _reconnection_lock = new object(); private readonly object _reconnection_lock = new object();
private BlockingCollection<SendInfo> _send_queue = new BlockingCollection<SendInfo>();
private readonly RequestBuffer _requests = new RequestBuffer();
private int _current_heartbeat_period_in_ms = 0;
private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета
private struct SendInfo private Thread _sendThread;
{ private Thread _receiveThread;
public bool isRequest; private BlockingCollection<IncomingFrame> _incoming_queue = new BlockingCollection<IncomingFrame>();
public int identity; private BlockingCollection<SendFrame> _send_queue = new BlockingCollection<SendFrame>(BaseSocket.MAX_SEND_QUEUE_SIZE);
public byte[] data; private static ObjectPool<IncomingFrame> _incoming_pool = new ObjectPool<IncomingFrame>(() => new IncomingFrame());
} private static ObjectPool<SendFrame> _sendinfo_pool = new ObjectPool<SendFrame>(() => new SendFrame());
#endregion Private #endregion Private
public IRouter Router { get { return _router; } } public IRouter Router { get; }
public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } } public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } }
public SocketClient(IPEndPoint ep, IRouter router) public SocketClient(IPEndPoint ep, IRouter router)
{ {
_router = router; Router = router;
Endpoint = ep; Endpoint = ep;
_parser.OnIncoming += _parser_OnIncoming; _parser.OnIncoming += _parser_OnIncoming;
_sendThread = new Thread(SendFramesJob); StartInternalThreads();
_sendThread.IsBackground = true;
_sendThread.Start();
EnsureConnection(); EnsureConnection();
} }
public SocketClient(Socket socket, IRouter router) public SocketClient(Socket socket, IRouter router)
{ {
_router = router; Router = router;
_socket_freezed = true; _socket_freezed = true;
_clientSocket = socket; _clientSocket = socket;
_stream = new NetworkStream(_clientSocket, true); _stream = new NetworkStream(_clientSocket, true);
Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint; Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint;
_parser.OnIncoming += _parser_OnIncoming; _parser.OnIncoming += _parser_OnIncoming;
StartInternalThreads();
Working();
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
}
private void StartInternalThreads()
{
_sendThread = new Thread(SendFramesJob); _sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true; _sendThread.IsBackground = true;
_sendThread.Start(); _sendThread.Start();
Working();
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); _receiveThread = new Thread(IncomingFramesJob);
_receiveThread.IsBackground = true;
_receiveThread.Start();
} }
#region API #region API
public event Action<ISocketClient> OnConnect = (_) => { }; public event Action<ISocketClient> OnConnect = (_) => { };
public event Action<ISocketClient> OnDisconnect = (_) => { }; public event Action<ISocketClient> OnDisconnect = (_) => { };
public event Action<ISocketClient, byte[], int> OnIncomingData = (_, __, ___) => { };
public IPEndPoint Endpoint { get; } public IPEndPoint Endpoint { get; }
public void Request(Frame frame, Action<byte[]> callback, Action<string> fail = null) public void Request(Frame frame, Action<byte[]> callback, Action<string> fail = null)
@ -78,13 +96,11 @@ namespace ZeroLevel.Network
{ {
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{ {
Thread.Sleep(50); Thread.Sleep(1);
} }
var sendInfo = new SendInfo var sendInfo = _sendinfo_pool.Allocate();
{ sendInfo.isRequest = true;
isRequest = true, sendInfo.data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id);
data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id)
};
sendInfo.identity = id; sendInfo.identity = id;
_requests.RegisterForFrame(id, callback, fail); _requests.RegisterForFrame(id, callback, fail);
_send_queue.Add(sendInfo); _send_queue.Add(sendInfo);
@ -104,14 +120,13 @@ namespace ZeroLevel.Network
{ {
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{ {
Thread.Sleep(50); Thread.Sleep(1);
} }
_send_queue.Add(new SendInfo var info = _sendinfo_pool.Allocate();
{ info.isRequest = false;
isRequest = false, info.identity = 0;
identity = 0, info.data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame));
data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame)) _send_queue.Add(info);
});
frame.Release(); frame.Release();
} }
} }
@ -123,14 +138,13 @@ namespace ZeroLevel.Network
{ {
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{ {
Thread.Sleep(50); Thread.Sleep(1);
} }
_send_queue.Add(new SendInfo var info = _sendinfo_pool.Allocate();
{ info.isRequest = false;
isRequest = false, info.identity = 0;
identity = 0, info.data = NetworkPacketFactory.Response(data, identity);
data = NetworkPacketFactory.Response(data, identity) _send_queue.Add(info);
});
} }
} }
@ -153,30 +167,66 @@ namespace ZeroLevel.Network
#endregion #endregion
#region Private methods #region Private methods
private void _parser_OnIncoming(FrameType type, int identity, byte[] data) private void IncomingFramesJob()
{ {
try IncomingFrame frame = default(IncomingFrame);
while (Status != SocketClientStatus.Disposed)
{ {
switch (type) if (_send_queue.IsCompleted)
{ {
case FrameType.KeepAlive: return;
// Nothing }
return; try
case FrameType.Message: {
_router?.HandleMessage(MessageSerializer.Deserialize<Frame>(data), this); frame = _incoming_queue.Take();
break; }
case FrameType.Request: catch (Exception ex)
var response = _router?.HandleRequest(MessageSerializer.Deserialize<Frame>(data), this); {
if (response != null) Log.SystemError(ex, "[SocketClient.IncomingFramesJob] _incoming_queue.Take");
{ _incoming_queue.Dispose();
this.Response(response, identity); _incoming_queue = new BlockingCollection<IncomingFrame>();
} continue;
break; }
case FrameType.Response: try
_requests.Success(identity, data); {
break; switch (frame.type)
{
case FrameType.Message:
Router?.HandleMessage(MessageSerializer.Deserialize<Frame>(frame.data), this);
break;
case FrameType.Request:
var response = Router?.HandleRequest(MessageSerializer.Deserialize<Frame>(frame.data), this);
if (response != null)
{
this.Response(response, frame.identity);
}
break;
case FrameType.Response:
_requests.Success(frame.identity, frame.data);
break;
}
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame");
} }
OnIncomingData(this, data, identity); finally
{
_incoming_pool.Free(frame);
}
}
}
private void _parser_OnIncoming(FrameType type, int identity, byte[] data)
{
try
{
if (type == FrameType.KeepAlive) return;
var incoming = _incoming_pool.Allocate();
incoming.data = data;
incoming.type = type;
incoming.identity = identity;
_incoming_queue.Add(incoming);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -265,12 +315,11 @@ namespace ZeroLevel.Network
_requests.TestForTimeouts(); _requests.TestForTimeouts();
try try
{ {
_send_queue.Add(new SendInfo var info = _sendinfo_pool.Allocate();
{ info.isRequest = false;
identity = 0, info.identity = 0;
isRequest = false, info.data = NetworkPacketFactory.KeepAliveMessage();
data = NetworkPacketFactory.KeepAliveMessage() _send_queue.Add(info);
});
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -298,7 +347,7 @@ namespace ZeroLevel.Network
} }
else else
{ {
// TODO!!!!! // TODO or not TODO
Thread.Sleep(1); Thread.Sleep(1);
} }
if (Status == SocketClientStatus.Working if (Status == SocketClientStatus.Working
@ -318,63 +367,10 @@ namespace ZeroLevel.Network
OnDisconnect(this); OnDisconnect(this);
} }
} }
/*
private void SendFramesJob()
{
SendInfo frame;
int unsuccess = 0;
while (Status != SocketClientStatus.Disposed)
{
if (_send_queue.IsCompleted)
{
return;
}
if (Status != SocketClientStatus.Working)
{
Thread.Sleep(100);
try
{
EnsureConnection();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault");
}
if (Status == SocketClientStatus.Disposed) return;
if (Status == SocketClientStatus.Broken)
{
unsuccess++;
if (unsuccess > 30) unsuccess = 30;
}
if (Status == SocketClientStatus.Working)
{
unsuccess = 0;
}
Thread.Sleep(unsuccess * 100);
continue;
}
try
{
frame = _send_queue.Take();
if (frame.isRequest)
{
_requests.StartSend(frame.identity);
}
_stream.Write(frame.data, 0, frame.data.Length);
_last_rw_time = DateTime.UtcNow.Ticks;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[SocketClient.SendFramesJob] Backward send error.");
Broken();
OnDisconnect(this);
}
}
}
*/
private void SendFramesJob() private void SendFramesJob()
{ {
SendInfo frame = default(SendInfo); SendFrame frame;
int unsuccess = 0; int unsuccess = 0;
while (Status != SocketClientStatus.Disposed) while (Status != SocketClientStatus.Disposed)
{ {
@ -390,7 +386,7 @@ namespace ZeroLevel.Network
{ {
Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.Take"); Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.Take");
_send_queue.Dispose(); _send_queue.Dispose();
_send_queue = new BlockingCollection<SendInfo>(); _send_queue = new BlockingCollection<SendFrame>();
continue; continue;
} }
while (_stream?.CanWrite == false || Status != SocketClientStatus.Working) while (_stream?.CanWrite == false || Status != SocketClientStatus.Working)
@ -401,7 +397,7 @@ namespace ZeroLevel.Network
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault"); Log.SystemError(ex, "[SocketClient.SendFramesJob] Connection broken");
} }
if (Status == SocketClientStatus.Disposed) if (Status == SocketClientStatus.Disposed)
{ {
@ -415,23 +411,31 @@ namespace ZeroLevel.Network
if (Status == SocketClientStatus.Working) if (Status == SocketClientStatus.Working)
{ {
unsuccess = 0; unsuccess = 0;
break;
} }
Thread.Sleep(unsuccess * 128); Thread.Sleep(unsuccess * 128);
} }
try if (frame != null)
{ {
if (frame.isRequest) try
{ {
_requests.StartSend(frame.identity); if (frame.isRequest)
{
_requests.StartSend(frame.identity);
}
_stream.Write(frame.data, 0, frame.data.Length);
_last_rw_time = DateTime.UtcNow.Ticks;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[SocketClient.SendFramesJob] _stream.Write");
Broken();
OnDisconnect(this);
}
finally
{
_sendinfo_pool.Free(frame);
} }
_stream.Write(frame.data, 0, frame.data.Length);
_last_rw_time = DateTime.UtcNow.Ticks;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[SocketClient.SendFramesJob] _stream.Write");
Broken();
OnDisconnect(this);
} }
} }
} }

@ -1,36 +1,49 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using ZeroLevel.Services.Pools; using ZeroLevel.Services.Pools;
namespace ZeroLevel.Network namespace ZeroLevel.Network
{ {
internal sealed class RequestBuffer internal sealed class RequestBuffer
{ {
private readonly object _reqeust_lock = new object(); private SpinLock _reqeust_lock = new SpinLock();
private Dictionary<long, RequestInfo> _requests = new Dictionary<long, RequestInfo>(); private Dictionary<long, RequestInfo> _requests = new Dictionary<long, RequestInfo>();
private static ObjectPool<RequestInfo> _ri_pool = new ObjectPool<RequestInfo>(() => new RequestInfo()); private static ObjectPool<RequestInfo> _ri_pool = new ObjectPool<RequestInfo>(() => new RequestInfo());
public void RegisterForFrame(int identity, Action<byte[]> callback, Action<string> fail = null) public void RegisterForFrame(int identity, Action<byte[]> callback, Action<string> fail = null)
{ {
var ri = _ri_pool.Allocate(); var ri = _ri_pool.Allocate();
lock (_reqeust_lock) bool take = false;
try
{ {
_reqeust_lock.Enter(ref take);
ri.Reset(callback, fail); ri.Reset(callback, fail);
_requests.Add(identity, ri); _requests.Add(identity, ri);
} }
finally
{
if (take) _reqeust_lock.Exit(false);
}
} }
public void Fail(long frameId, string message) public void Fail(long frameId, string message)
{ {
RequestInfo ri = null; RequestInfo ri = null;
lock (_reqeust_lock) bool take = false;
try
{ {
_reqeust_lock.Enter(ref take);
if (_requests.ContainsKey(frameId)) if (_requests.ContainsKey(frameId))
{ {
ri = _requests[frameId]; ri = _requests[frameId];
_requests.Remove(frameId); _requests.Remove(frameId);
} }
} }
finally
{
if (take) _reqeust_lock.Exit(false);
}
if (ri != null) if (ri != null)
{ {
ri.Fail(message); ri.Fail(message);
@ -41,14 +54,20 @@ namespace ZeroLevel.Network
public void Success(long frameId, byte[] data) public void Success(long frameId, byte[] data)
{ {
RequestInfo ri = null; RequestInfo ri = null;
lock (_reqeust_lock) bool take = false;
try
{ {
_reqeust_lock.Enter(ref take);
if (_requests.ContainsKey(frameId)) if (_requests.ContainsKey(frameId))
{ {
ri = _requests[frameId]; ri = _requests[frameId];
_requests.Remove(frameId); _requests.Remove(frameId);
} }
} }
finally
{
if (take) _reqeust_lock.Exit(false);
}
if (ri != null) if (ri != null)
{ {
ri.Success(data); ri.Success(data);
@ -59,13 +78,19 @@ namespace ZeroLevel.Network
public void StartSend(long frameId) public void StartSend(long frameId)
{ {
RequestInfo ri = null; RequestInfo ri = null;
lock (_reqeust_lock) bool take = false;
try
{ {
_reqeust_lock.Enter(ref take);
if (_requests.ContainsKey(frameId)) if (_requests.ContainsKey(frameId))
{ {
ri = _requests[frameId]; ri = _requests[frameId];
} }
} }
finally
{
if (take) _reqeust_lock.Exit(false);
}
if (ri != null) if (ri != null)
{ {
ri.StartSend(); ri.StartSend();
@ -76,8 +101,10 @@ namespace ZeroLevel.Network
{ {
var now_ticks = DateTime.UtcNow.Ticks; var now_ticks = DateTime.UtcNow.Ticks;
var to_remove = new List<long>(); var to_remove = new List<long>();
lock (_reqeust_lock) bool take = false;
try
{ {
_reqeust_lock.Enter(ref take);
foreach (var pair in _requests) foreach (var pair in _requests)
{ {
if (pair.Value.Sended == false) continue; if (pair.Value.Sended == false) continue;
@ -88,6 +115,10 @@ namespace ZeroLevel.Network
} }
} }
} }
finally
{
if (take) _reqeust_lock.Exit(false);
}
foreach (var key in to_remove) foreach (var key in to_remove)
{ {
Fail(key, "Timeout"); Fail(key, "Timeout");

Loading…
Cancel
Save

Powered by TurnKey Linux.