You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Zero/ZeroLevel/Services/Network/SocketClient.cs

466 lines
16 KiB

6 years ago
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
6 years ago
{
6 years ago
public class SocketClient
: BaseSocket, ISocketClient
6 years ago
{
#region Private
private readonly IRouter _router;
6 years ago
private Socket _clientSocket;
private NetworkStream _stream;
private FrameParser _parser = new FrameParser();
private Thread _sendThread;
6 years ago
private long _heartbeat_key = -1;
6 years ago
private long _last_rw_time = DateTime.UtcNow.Ticks;
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
private readonly object _reconnection_lock = new object();
5 years ago
private BlockingCollection<SendInfo> _send_queue = new BlockingCollection<SendInfo>();
6 years ago
private readonly RequestBuffer _requests = new RequestBuffer();
6 years ago
private int _current_heartbeat_period_in_ms = 0;
private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета
6 years ago
private struct SendInfo
{
public bool isRequest;
public int identity;
public byte[] data;
}
#endregion Private
6 years ago
public IRouter Router { get { return _router; } }
6 years ago
public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } }
public SocketClient(IPEndPoint ep, IRouter router)
6 years ago
{
5 years ago
_router = router;
6 years ago
Endpoint = ep;
6 years ago
_parser.OnIncoming += _parser_OnIncoming;
_sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true;
_sendThread.Start();
EnsureConnection();
6 years ago
}
6 years ago
public SocketClient(Socket socket, IRouter router)
6 years ago
{
_router = router;
6 years ago
_socket_freezed = true;
_clientSocket = socket;
_stream = new NetworkStream(_clientSocket, true);
6 years ago
Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint;
_parser.OnIncoming += _parser_OnIncoming;
6 years ago
_sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true;
5 years ago
_sendThread.Start();
Working();
5 years ago
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
6 years ago
}
6 years ago
#region API
public event Action<ISocketClient> OnConnect = (_) => { };
public event Action<ISocketClient> OnDisconnect = (_) => { };
6 years ago
public event Action<ISocketClient, byte[], int> OnIncomingData = (_, __, ___) => { };
public IPEndPoint Endpoint { get; }
public void Request(Frame frame, Action<byte[]> callback, Action<string> fail = null)
6 years ago
{
6 years ago
if (frame == null) throw new ArgumentNullException(nameof(frame));
if (frame != null && !_send_queue.IsAddingCompleted)
6 years ago
{
6 years ago
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{
Thread.Sleep(50);
}
var sendInfo = new SendInfo
{
isRequest = true,
data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id)
6 years ago
};
sendInfo.identity = id;
_requests.RegisterForFrame(id, callback, fail);
_send_queue.Add(sendInfo);
frame.Release();
6 years ago
}
}
6 years ago
public void ForceConnect()
{
EnsureConnection();
}
public void Send(Frame frame)
6 years ago
{
6 years ago
if (frame == null) throw new ArgumentNullException(nameof(frame));
if (frame != null && !_send_queue.IsAddingCompleted)
6 years ago
{
6 years ago
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
6 years ago
{
6 years ago
Thread.Sleep(50);
6 years ago
}
6 years ago
_send_queue.Add(new SendInfo
6 years ago
{
6 years ago
isRequest = false,
identity = 0,
data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame))
});
frame.Release();
6 years ago
}
6 years ago
}
public void Response(byte[] data, int identity)
{
if (data == null) throw new ArgumentNullException(nameof(data));
if (!_send_queue.IsAddingCompleted)
6 years ago
{
6 years ago
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
6 years ago
{
6 years ago
Thread.Sleep(50);
6 years ago
}
6 years ago
_send_queue.Add(new SendInfo
6 years ago
{
6 years ago
isRequest = false,
identity = 0,
data = NetworkPacketFactory.Response(data, identity)
});
6 years ago
}
}
6 years ago
public void UseKeepAlive(TimeSpan period)
6 years ago
{
6 years ago
if (_heartbeat_key != -1)
6 years ago
{
6 years ago
Sheduller.Remove(_heartbeat_key);
6 years ago
}
6 years ago
if (period != TimeSpan.Zero && period.TotalMilliseconds > MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)
6 years ago
{
6 years ago
_current_heartbeat_period_in_ms = (int)period.TotalMilliseconds;
_heartbeat_key = Sheduller.RemindEvery(period, Heartbeat);
6 years ago
}
6 years ago
else
6 years ago
{
6 years ago
_current_heartbeat_period_in_ms = 0;
6 years ago
}
}
6 years ago
#endregion
6 years ago
6 years ago
#region Private methods
private void _parser_OnIncoming(FrameType type, int identity, byte[] data)
6 years ago
{
6 years ago
try
6 years ago
{
6 years ago
switch (type)
6 years ago
{
6 years ago
case FrameType.KeepAlive:
// Nothing
return;
6 years ago
case FrameType.Message:
_router?.HandleMessage(MessageSerializer.Deserialize<Frame>(data), this);
break;
6 years ago
case FrameType.Request:
var response = _router?.HandleRequest(MessageSerializer.Deserialize<Frame>(data), this);
if (response != null)
{
this.Response(response, identity);
}
6 years ago
break;
case FrameType.Response:
_requests.Success(identity, data);
6 years ago
break;
6 years ago
}
OnIncomingData(this, data, identity);
6 years ago
}
6 years ago
catch (Exception ex)
{
Log.Error(ex, $"[SocketClient._parser_OnIncoming]");
}
6 years ago
}
6 years ago
private bool TryConnect()
{
6 years ago
if (Status == SocketClientStatus.Working)
6 years ago
{
return true;
}
6 years ago
if (Status == SocketClientStatus.Disposed)
6 years ago
{
return false;
}
6 years ago
if (_clientSocket != null)
{
try
{
_stream?.Close();
_stream?.Dispose();
_clientSocket.Dispose();
}
catch
{
/* ignore */
}
_clientSocket = null;
_stream = null;
}
try
{
_clientSocket = MakeClientSocket();
6 years ago
_clientSocket.Connect(Endpoint);
6 years ago
_stream = new NetworkStream(_clientSocket, true);
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.TryConnect] Connection fault");
6 years ago
Broken();
6 years ago
return false;
}
6 years ago
Working();
6 years ago
OnConnect(this);
6 years ago
return true;
}
public void EnsureConnection()
{
6 years ago
if (_socket_freezed)
{
return;
}
6 years ago
lock (_reconnection_lock)
{
6 years ago
if (Status == SocketClientStatus.Disposed)
6 years ago
{
throw new ObjectDisposedException("connection");
}
6 years ago
if (Status != SocketClientStatus.Working)
6 years ago
{
if (false == TryConnect())
{
throw new Exception("No connection");
6 years ago
}
}
}
}
6 years ago
private void Heartbeat()
6 years ago
{
6 years ago
try
{
EnsureConnection();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.Heartbeat.EnsureConnection]");
Broken();
OnDisconnect(this);
6 years ago
return;
}
_requests.TestForTimeouts();
try
6 years ago
{
6 years ago
_send_queue.Add(new SendInfo
6 years ago
{
6 years ago
identity = 0,
isRequest = false,
data = NetworkPacketFactory.KeepAliveMessage()
});
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.Heartbeat.Request]");
}
var diff_request_ms = ((DateTime.UtcNow.Ticks - _last_rw_time) / TimeSpan.TicksPerMillisecond);
if (diff_request_ms > (_current_heartbeat_period_in_ms * 2))
{
var port = (_clientSocket.LocalEndPoint as IPEndPoint)?.Port;
Log.Debug($"[SocketClient.Heartbeat] server disconnected, because last data was more thas {diff_request_ms} ms ago. Client port {port}");
Broken();
6 years ago
}
}
6 years ago
private void ReceiveAsyncCallback(IAsyncResult ar)
6 years ago
{
try
{
EnsureConnection();
6 years ago
var count = _stream.EndRead(ar);
if (count > 0)
{
_parser.Push(_buffer, count);
_last_rw_time = DateTime.UtcNow.Ticks;
}
else
{
// TODO!!!!!
Thread.Sleep(1);
}
5 years ago
if (Status == SocketClientStatus.Working
|| Status == SocketClientStatus.Initialized)
6 years ago
{
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
}
6 years ago
}
6 years ago
catch (ObjectDisposedException)
6 years ago
{
6 years ago
/// Nothing
6 years ago
}
catch (Exception ex)
{
6 years ago
Log.SystemError(ex, $"[SocketClient.ReceiveAsyncCallback] Error read data");
6 years ago
Broken();
6 years ago
OnDisconnect(this);
}
}
5 years ago
/*
6 years ago
private void SendFramesJob()
{
SendInfo frame;
5 years ago
int unsuccess = 0;
6 years ago
while (Status != SocketClientStatus.Disposed)
{
if (_send_queue.IsCompleted)
{
return;
}
if (Status != SocketClientStatus.Working)
{
Thread.Sleep(100);
try
{
EnsureConnection();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault");
}
if (Status == SocketClientStatus.Disposed) return;
5 years ago
if (Status == SocketClientStatus.Broken)
{
unsuccess++;
if (unsuccess > 30) unsuccess = 30;
}
if (Status == SocketClientStatus.Working)
{
unsuccess = 0;
}
Thread.Sleep(unsuccess * 100);
6 years ago
continue;
}
try
{
frame = _send_queue.Take();
if (frame.isRequest)
{
_requests.StartSend(frame.identity);
}
_stream.Write(frame.data, 0, frame.data.Length);
_last_rw_time = DateTime.UtcNow.Ticks;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[SocketClient.SendFramesJob] Backward send error.");
Broken();
OnDisconnect(this);
}
6 years ago
}
}
5 years ago
*/
private void SendFramesJob()
{
SendInfo frame = default(SendInfo);
int unsuccess = 0;
while (Status != SocketClientStatus.Disposed)
{
if (_send_queue.IsCompleted)
{
return;
}
try
{
frame = _send_queue.Take();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.Take");
_send_queue.Dispose();
_send_queue = new BlockingCollection<SendInfo>();
continue;
}
5 years ago
while (_stream?.CanWrite == false || Status != SocketClientStatus.Working)
5 years ago
{
try
{
EnsureConnection();
}
catch (Exception ex)
{
Log.SystemError(ex, "[SocketClient.SendFramesJob] Send next frame fault");
}
if (Status == SocketClientStatus.Disposed)
{
return;
}
if (Status == SocketClientStatus.Broken)
{
unsuccess++;
if (unsuccess > 30) unsuccess = 30;
}
if (Status == SocketClientStatus.Working)
{
unsuccess = 0;
}
Thread.Sleep(unsuccess * 128);
}
try
{
if (frame.isRequest)
{
_requests.StartSend(frame.identity);
}
_stream.Write(frame.data, 0, frame.data.Length);
_last_rw_time = DateTime.UtcNow.Ticks;
}
catch (Exception ex)
{
Log.SystemError(ex, $"[SocketClient.SendFramesJob] _stream.Write");
Broken();
OnDisconnect(this);
}
}
}
6 years ago
#endregion
6 years ago
#region Helper
6 years ago
private static Socket MakeClientSocket()
{
var s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
s.SetIPProtectionLevel(IPProtectionLevel.Unrestricted);
s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);
return s;
}
#endregion Helper
6 years ago
public override void Dispose()
{
6 years ago
if (Status == SocketClientStatus.Working)
6 years ago
{
6 years ago
OnDisconnect(this);
6 years ago
}
6 years ago
Disposed();
6 years ago
Sheduller.Remove(_heartbeat_key);
_stream?.Close();
_stream?.Dispose();
}
}
6 years ago
}

Powered by TurnKey Linux.