You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Zero/ZeroLevel/Services/Network/SocketClient.cs

351 lines
12 KiB

2 years ago
using MemoryPools;
using System;
6 years ago
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
6 years ago
{
6 years ago
public class SocketClient
: BaseSocket, ISocketClient
6 years ago
{
#region Private
5 years ago
#region Queues
/// <summary>
/// A frame handed over by the parser callback and queued until
/// <c>IncomingFramesJob</c> dequeues and dispatches it.
/// </summary>
private struct IncomingFrame
{
    /// <summary>Frame kind; the dispatcher switches on this value.</summary>
    public FrameType type;

    /// <summary>Identity reported by the parser; used for Request and Response frames.</summary>
    public int identity;

    /// <summary>Frame payload as delivered by the parser.</summary>
    public byte[] data;
}
2 years ago
/// <summary>
/// A packet queued by <c>Send(int, bool, byte[])</c> and written to the socket
/// by <c>OutcomingFramesJob</c>.
/// </summary>
private struct OutcomingFrame
{
// When true, the sending job calls _requests.StartSend(identity) before writing the packet.
public bool is_request;
// Request id for request packets; callers in this class pass 0 for everything else.
public int identity;
// The bytes passed to Socket.Send.
public byte[] data;
}
2 years ago
// Pool of outgoing frame values: Send(int, bool, byte[]) takes one, OutcomingFramesJob returns it after the send attempt.
private readonly JetValPool<OutcomingFrame> _outcomingFramesPool = new JetValPool<OutcomingFrame>();
// Filled by _parser_OnIncoming, drained by IncomingFramesJob.
private ConcurrentQueue<IncomingFrame> _incoming_queue = new ConcurrentQueue<IncomingFrame>();
// Filled by Send(int, bool, byte[]), drained by OutcomingFramesJob.
private ConcurrentQueue<OutcomingFrame> _outcoming_queue = new ConcurrentQueue<OutcomingFrame>();
// Set whenever a frame is enqueued; the sending job waits on it (with a 100 ms timeout) and resets it.
private ManualResetEventSlim _outcomingFrameEvent = new ManualResetEventSlim(false);
5 years ago
#endregion
5 years ago
5 years ago
// Underlying TCP socket: created by the endpoint constructor or supplied to the socket constructor.
private Socket _clientSocket;

// Parser fed with received bytes; it reports complete frames through _parser_OnIncoming.
private FrameParser _parser;

// Registry of pending requests (callbacks, send start, timeouts).
private readonly RequestBuffer _requests = new RequestBuffer();

// Receive buffer passed to BeginReceive and then pushed into the parser.
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];

// Used for server-to-client links: forbids re-creating the socket.
// Set to true by the constructor that takes an existing Socket; makes EnsureConnection return immediately.
private bool _socket_freezed = false;

// Held by EnsureConnection while it inspects the status.
private readonly object _reconnection_lock = new object();

// Key returned by Sheduller.RemindEvery for the heartbeat; passed to Sheduller.Remove in Dispose.
private long _heartbeat_key;

// Background thread running IncomingFramesJob (dispatch of already-parsed frames).
private Thread _receiveThread;

// Background thread running OutcomingFramesJob (writes queued packets to the socket).
private Thread _sendingThread;
5 years ago
#endregion Private
6 years ago
5 years ago
/// <summary>
/// Router that receives incoming Message and Request frames from <c>IncomingFramesJob</c>.
/// It is accessed with the null-conditional operator there, so a null router means such frames are not handled.
/// </summary>
public IRouter Router { get; }
/// <summary>
/// Creates a client and synchronously connects a new TCP socket to <paramref name="ep"/>.
/// On success the client is switched to the working state, the worker threads and the
/// heartbeat are started and the first asynchronous receive is armed.
/// On a connection failure the error is logged, the client is marked broken and the
/// constructor returns without starting any background work.
/// </summary>
/// <param name="ep">Remote endpoint to connect to.</param>
/// <param name="router">Router for incoming Message/Request frames; may be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="ep"/> is null.</exception>
public SocketClient(IPEndPoint ep, IRouter router)
{
    // Without this guard a null endpoint surfaced as a NullReferenceException thrown
    // from inside the catch block below (while formatting the log message).
    if (ep == null) throw new ArgumentNullException(nameof(ep));

    // Assigned before connecting so that a client that ends up broken still
    // reports which endpoint it was created for.
    Router = router;
    Endpoint = ep;

    try
    {
        _clientSocket = new Socket(ep.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
        _clientSocket.Connect(ep);
        OnConnect(this);
    }
    catch (Exception ex)
    {
        Log.SystemError(ex, $"[SocketClient.ctor] connection fault. Endpoint: {ep.Address}:{ep.Port}");
        Broken();
        return;
    }

    _parser = new FrameParser(_parser_OnIncoming);
    Working();
    StartInternalThreads();
    StartReceive();
}
6 years ago
/// <summary>
/// Wraps an already existing socket. The endpoint is taken from the socket's
/// <c>RemoteEndPoint</c>, the socket is flagged as frozen (so <c>EnsureConnection</c>
/// becomes a no-op), and the worker threads, heartbeat and first receive are started.
/// </summary>
/// <param name="socket">Socket to wrap.</param>
/// <param name="router">Router for incoming Message/Request frames; may be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="socket"/> is null.</exception>
public SocketClient(Socket socket, IRouter router)
{
    // Fail with a descriptive exception instead of a NullReferenceException on RemoteEndPoint.
    if (socket == null) throw new ArgumentNullException(nameof(socket));

    Router = router;
    _clientSocket = socket;
    Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint;
    _parser = new FrameParser(_parser_OnIncoming);
    _socket_freezed = true;
    Working();
    StartInternalThreads();
    StartReceive();
}
/// <summary>
/// Starts the two background worker threads (incoming frame dispatch and outgoing
/// frame sending) and registers the periodic heartbeat with the scheduler.
/// </summary>
private void StartInternalThreads()
{
    _receiveThread = new Thread(IncomingFramesJob) { IsBackground = true };
    _receiveThread.Start();

    _sendingThread = new Thread(OutcomingFramesJob) { IsBackground = true };
    _sendingThread.Start();

    var heartbeatPeriod = TimeSpan.FromMilliseconds(MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS);
    _heartbeat_key = Sheduller.RemindEvery(heartbeatPeriod, Heartbeat);
}
/// <summary>
/// Arms one asynchronous receive into <c>_buffer</c>; completion is delivered to
/// <see cref="ReceiveAsyncCallback"/>. If the receive cannot be started because of a
/// <see cref="NullReferenceException"/> or <see cref="SocketException"/>, the client is
/// marked broken, the failure is logged, the socket is disconnected on a best-effort
/// basis and <c>OnDisconnect</c> is raised. Other exception types propagate to the caller.
/// </summary>
private void StartReceive()
{
    try
    {
        _clientSocket.BeginReceive(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, SocketFlags.None, ReceiveAsyncCallback, null);
    }
    catch (NullReferenceException)
    {
        Broken();
        Log.SystemError("[SocketClient.TryConnect] Client : Null Reference Exception - On Connect (begin receive section)");
        DisconnectAfterReceiveFault();
    }
    catch (SocketException e)
    {
        Broken();
        Log.SystemError(e, "[SocketClient.TryConnect] Client : Exception - On Connect (begin receive section)");
        DisconnectAfterReceiveFault();
    }
}

/// <summary>
/// Best-effort disconnect used by the failure handlers of <see cref="StartReceive"/>.
/// Never throws: the socket may be null or already unusable at this point, and an
/// exception escaping from a catch handler would leak out of the constructor.
/// </summary>
private void DisconnectAfterReceiveFault()
{
    try
    {
        _clientSocket?.Disconnect(false);
    }
    catch (Exception ex)
    {
        Log.SystemError(ex, "[SocketClient.StartReceive] Disconnect after receive fault failed");
    }
    // Same notification the other failure paths (receive callback, sender, heartbeat) raise after Broken().
    OnDisconnect(this);
}
6 years ago
#region API
/// <summary>
/// Raised after the socket has connected. In this class it is invoked only from the
/// endpoint constructor, i.e. before a caller has had a chance to subscribe.
/// Initialized with a no-op handler so it can be invoked without a null check.
/// </summary>
public event Action<ISocketClient> OnConnect = (_) => { };
/// <summary>
/// Raised when the connection is considered lost (receive error, failed send, failed
/// heartbeat check) and from Dispose when the client was still working.
/// Initialized with a no-op handler so it can be invoked without a null check.
/// </summary>
public event Action<ISocketClient> OnDisconnect = (_) => { };
6 years ago
/// <summary>
/// Remote endpoint of this connection: the target passed to the endpoint constructor,
/// or the <c>RemoteEndPoint</c> of the socket passed to the socket constructor.
/// </summary>
public IPEndPoint Endpoint { get; }
2 years ago
/// <summary>
/// Serializes <paramref name="frame"/> into a request packet, registers the callbacks
/// under the packet's request id and queues the packet for sending.
/// </summary>
/// <param name="frame">Frame to send as a request.</param>
/// <param name="callback">Callback registered for the response payload.</param>
/// <param name="fail">Optional callback registered for failure.</param>
/// <exception cref="Exception">The client is not in the working state.</exception>
public void Request(Frame frame, Action<byte[]> callback, Action<string> fail = null)
{
    if (Status != SocketClientStatus.Working)
    {
        throw new Exception($"[SocketClient.Request] Socket status: {Status}");
    }

    var payload = MessageSerializer.Serialize(frame);
    var packet = NetworkPacketFactory.Reqeust(payload, out int requestId);

    // Register before queueing so the callbacks are in place when the sender picks the packet up.
    _requests.RegisterForFrame(requestId, callback, fail);
    Send(requestId, true, packet);
}
2 years ago
/// <summary>
/// Serializes <paramref name="frame"/> into a message packet and queues it for sending.
/// No response is tracked for it.
/// </summary>
/// <param name="frame">Frame to send.</param>
/// <exception cref="Exception">The client is not in the working state.</exception>
public void Send(Frame frame)
{
    if (Status == SocketClientStatus.Working)
    {
        var payload = MessageSerializer.Serialize(frame);
        var packet = NetworkPacketFactory.Message(payload);
        Send(0, false, packet);
        return;
    }

    throw new Exception($"[SocketClient.Send] Socket status: {Status}");
}
5 years ago
2 years ago
/// <summary>
/// Wraps <paramref name="data"/> into a response packet for the request identified by
/// <paramref name="identity"/> and queues it for sending.
/// </summary>
/// <param name="data">Response payload; must not be null.</param>
/// <param name="identity">Identity of the request being answered.</param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
/// <exception cref="Exception">The client is not in the working state.</exception>
public void Response(byte[] data, int identity)
{
    if (data == null)
    {
        throw new ArgumentNullException(nameof(data));
    }
    if (Status != SocketClientStatus.Working)
    {
        throw new Exception($"[SocketClient.Response] Socket status: {Status}");
    }

    var packet = NetworkPacketFactory.Response(data, identity);
    Send(0, false, packet);
}
6 years ago
#endregion
6 years ago
5 years ago
#region Private methods
5 years ago
/// <summary>
/// Parser callback: queues every parsed frame except keep-alives for
/// <c>IncomingFramesJob</c>. Any exception is logged and not propagated to the parser.
/// </summary>
private void _parser_OnIncoming(FrameType type, int identity, byte[] data)
{
    try
    {
        // Keep-alive frames carry nothing to dispatch.
        if (type != FrameType.KeepAlive)
        {
            var incoming = new IncomingFrame
            {
                type = type,
                identity = identity,
                data = data
            };
            _incoming_queue.Enqueue(incoming);
        }
    }
    catch (Exception ex)
    {
        Log.Error(ex, $"[SocketClient._parser_OnIncoming]");
    }
}
5 years ago
/// <summary>
/// Completion callback for the asynchronous receive armed by <c>StartReceive</c>.
/// Received bytes are pushed into the frame parser and the next receive is armed.
/// A zero-byte completion is treated as the remote side having closed the connection:
/// the client is marked broken and <c>OnDisconnect</c> is raised instead of re-arming.
/// </summary>
/// <param name="ar">Async result passed by the socket.</param>
public void ReceiveAsyncCallback(IAsyncResult ar)
{
    try
    {
        var received = _clientSocket.EndReceive(ar);
        if (received <= 0)
        {
            // On a stream socket EndReceive returns 0 only when the peer has shut the
            // connection down. Re-arming the receive here (as the code used to do after
            // a 1 ms sleep) completes immediately with 0 again and spins forever.
            if (Status != SocketClientStatus.Disposed)
            {
                Broken();
                OnDisconnect(this);
            }
            return;
        }

        _parser.Push(_buffer, received);
        StartReceive();
    }
    catch (ObjectDisposedException)
    {
        // The socket was closed (e.g. by Dispose) while the receive was pending; nothing to do.
    }
    catch (Exception ex)
    {
        Log.SystemError(ex, $"[SocketClient.ReceiveAsyncCallback] Error read data");
        Broken();
        OnDisconnect(this);
    }
}
5 years ago
/// <summary>
/// Verifies that the client is usable. Does nothing for frozen sockets; otherwise,
/// under the reconnection lock, throws if the client is disposed or not working.
/// </summary>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
/// <exception cref="Exception">The client is in any other non-working state.</exception>
private void EnsureConnection()
{
    if (!_socket_freezed)
    {
        lock (_reconnection_lock)
        {
            if (Status == SocketClientStatus.Disposed) throw new ObjectDisposedException("connection");
            if (Status != SocketClientStatus.Working) throw new Exception("No connection");
        }
    }
}
6 years ago
/// <summary>
/// Periodic heartbeat scheduled by <c>StartInternalThreads</c>. Checks the connection;
/// on failure logs, marks the client broken and raises <c>OnDisconnect</c>. Otherwise
/// expires timed-out requests and queues a keep-alive packet.
/// </summary>
private void Heartbeat()
{
    var connectionOk = true;
    try
    {
        EnsureConnection();
    }
    catch (Exception ex)
    {
        connectionOk = false;
        Log.SystemError(ex, "[SocketClient.Heartbeat.EnsureConnection]");
        Broken();
        OnDisconnect(this);
    }

    if (connectionOk)
    {
        _requests.TestForTimeouts();
        var keepAlive = NetworkPacketFactory.KeepAliveMessage();
        Send(0, false, keepAlive);
    }
}
5 years ago
5 years ago
/// <summary>
/// Body of the incoming-frames worker thread. Until the client is disposed it dequeues
/// parsed frames and dispatches them: messages and requests go to the router (if any),
/// responses complete the matching pending request. When the queue is empty it sleeps
/// for 100 ms. Exceptions raised while handling a frame are logged and do not stop the loop.
/// </summary>
private void IncomingFramesJob()
{
    while (Status != SocketClientStatus.Disposed)
    {
        if (!_incoming_queue.TryDequeue(out IncomingFrame incoming))
        {
            Thread.Sleep(100);
            continue;
        }

        try
        {
            if (incoming.type == FrameType.Message)
            {
                Router?.HandleMessage(MessageSerializer.Deserialize<Frame>(incoming.data), this);
            }
            else if (incoming.type == FrameType.Request)
            {
                Router?.HandleRequest(MessageSerializer.Deserialize<Frame>(incoming.data), this, incoming.identity, (id, response) =>
                {
                    // A null response means the handler has nothing to send back.
                    if (response != null)
                    {
                        this.Response(response, id);
                    }
                });
            }
            else if (incoming.type == FrameType.Response)
            {
                _requests.Success(incoming.identity, incoming.data);
            }
        }
        catch (Exception ex)
        {
            Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame");
        }
    }
}
2 years ago
/// <summary>
/// Body of the sending worker thread. Until the client is disposed: while the client
/// is working it waits (up to 100 ms) for the "frame queued" signal and then writes
/// queued packets to the socket; in any other state it sleeps for 400 ms.
/// For request packets the request buffer is told that sending has started.
/// A failed send is logged, marks the client broken, raises <c>OnDisconnect</c> once
/// and stops the current drain; frames still queued are left in the queue.
/// Every dequeued frame is returned to the pool whether or not the send succeeded.
/// </summary>
private void OutcomingFramesJob()
{
    while (Status != SocketClientStatus.Disposed)
    {
        if (Status != SocketClientStatus.Working)
        {
            Thread.Sleep(400);
            continue;
        }

        if (_outcomingFrameEvent.Wait(100))
        {
            _outcomingFrameEvent.Reset();
        }

        while (_outcoming_queue.TryDequeue(out var frame))
        {
            var sendFailed = false;
            try
            {
                if (frame.is_request)
                {
                    _requests.StartSend(frame.identity);
                }
                _clientSocket.Send(frame.data, frame.data.Length, SocketFlags.None);
            }
            catch (Exception ex)
            {
                Log.SystemError(ex, "[SocketClient.OutcomingFramesJob] _clientSocket.Send");
                Broken();
                OnDisconnect(this);
                sendFailed = true;
            }
            finally
            {
                _outcomingFramesPool.Return(frame);
            }

            if (sendFailed)
            {
                // Do not keep pushing the remaining frames into a dead socket: each one
                // would fail the same way and produce another error log and OnDisconnect.
                break;
            }
        }
    }
}
/// <summary>
/// Queues a ready-made packet for the sending thread and signals that thread.
/// </summary>
/// <param name="id">Request id for request packets; callers pass 0 otherwise.</param>
/// <param name="is_request">True when the packet is a request tracked in the request buffer.</param>
/// <param name="data">Packet bytes to write to the socket.</param>
private void Send(int id, bool is_request, byte[] data)
{
    var outgoing = _outcomingFramesPool.Get();
    outgoing.is_request = is_request;
    outgoing.identity = id;
    outgoing.data = data;

    _outcoming_queue.Enqueue(outgoing);
    // Wake the sending thread, which otherwise waits with a timeout on this event.
    _outcomingFrameEvent.Set();
}
6 years ago
#endregion
6 years ago
/// <summary>
/// Shuts the client down: raises <c>OnDisconnect</c> if the client was still working,
/// switches to the disposed state (which lets the worker loops exit), removes the
/// heartbeat from the scheduler and closes the socket. Errors while closing the socket
/// are logged and not rethrown.
/// </summary>
public override void Dispose()
{
    var wasWorking = Status == SocketClientStatus.Working;
    if (wasWorking)
    {
        OnDisconnect(this);
    }

    Disposed();
    Sheduller.Remove(_heartbeat_key);

    try
    {
        if (_clientSocket != null)
        {
            _clientSocket.Close();
        }
        if (_clientSocket != null)
        {
            _clientSocket.Dispose();
        }
    }
    catch (Exception ex)
    {
        Log.Error(ex, "[SocketClient.Dispose]");
    }
}
}
6 years ago
}

Powered by TurnKey Linux.