// Zero/ZeroLevel/Services/Network/ZSocketClient.cs
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
6 years ago
{
public class ZSocketClient
: ZBaseNetwork, IZTransport
{
6 years ago
6 years ago
#region Private

// --- Connection state (replaced on every reconnect by TryConnect) ---

// Socket of the current connection.
private Socket _clientSocket;
// Stream over _clientSocket; used by the receive callback and by the send thread.
private NetworkStream _stream;

// --- Long-lived helpers ---

// Receives raw bytes through Push() and reports complete frames via OnIncomingFrame.
private FrameParser _parser = new FrameParser();
// Background thread running SendFramesJob.
private readonly Thread _sendThread;
// Key returned by Sheduller.RemindEvery; handed back to Sheduller.Remove on Dispose.
private readonly long _heartbeat_key;
// Target buffer of every BeginRead call.
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
// Serializes the connection check / reconnect performed by EnsureConnection.
private readonly object _reconnection_lock = new object();
// Frames waiting to be written to the stream by the send thread.
private readonly BlockingCollection<Frame> _send_queue = new BlockingCollection<Frame>();
// Bookkeeping for outstanding requests (registration, send start, success, timeouts).
private readonly RequestBuffer _requests = new RequestBuffer();

// UTC ticks of the last successful read or write; Heartbeat compares it with the current time.
private long _last_rw_time = DateTime.UtcNow.Ticks;

#endregion Private
6 years ago
/// <summary>
/// Raised for an incoming frame that is not flagged <c>IsRequest</c> and is not addressed
/// to the ping inbox. The frame is released right after the handlers return.
/// </summary>
public event EventHandler<Frame> OnServerMessage = delegate { };

/// <summary>Raised by the connect routine after a connection has been established.</summary>
public event Action OnConnect = delegate { };

/// <summary>
/// Raised when a read or send failure breaks the connection, and on <see cref="Dispose"/>
/// if the client was in the working state.
/// </summary>
public event Action OnDisconnect = delegate { };

/// <summary>Remote endpoint this client connects to.</summary>
public IPEndPoint Endpoint { get; }

/// <summary>True when no frames are waiting in the send queue.</summary>
public bool IsEmptySendQueue => _send_queue.Count == 0;
/// <summary>
/// Creates a client for the given endpoint, subscribes to parsed frames, schedules the
/// periodic heartbeat and starts the background send thread. The connection itself is
/// established lazily by <see cref="EnsureConnection"/>.
/// </summary>
/// <param name="ep">Remote endpoint to connect to.</param>
/// <exception cref="ArgumentNullException"><paramref name="ep"/> is null.</exception>
public ZSocketClient(IPEndPoint ep)
{
    // Fail fast: with a null endpoint every later connection attempt would throw and be logged.
    if (ep == null) throw new ArgumentNullException(nameof(ep));
    Endpoint = ep;
    _parser.OnIncomingFrame += _parser_OnIncomingFrame;
    _heartbeat_key = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat);
    // Background thread, so it never keeps the process alive on its own.
    _sendThread = new Thread(SendFramesJob) { IsBackground = true };
    _sendThread.Start();
}
#region Private methods
6 years ago
/// <summary>
/// Periodic task scheduled in the constructor. Makes sure the connection is up, lets the
/// request buffer check for timeouts, sends a ping request and marks the connection as
/// broken when nothing was read or written for more than two heartbeat periods.
/// </summary>
private void Heartbeat()
{
    try
    {
        EnsureConnection();
    }
    catch
    {
        Broken();
        return;
    }
    _requests.TestForTimeouts();
    try
    {
        Request(FrameBuilder.BuildFrame(DEFAULT_PING_INBOX), r => { });
    }
    catch (Exception ex)
    {
        Log.SystemError(ex, "Fault ping reauest");
    }
    var diff_request_ms = ((DateTime.UtcNow.Ticks - _last_rw_time) / TimeSpan.TicksPerMillisecond);
    if (diff_request_ms > (HEARTBEAT_UPDATE_PERIOD_MS * 2))
    {
        // The port is diagnostic only. The socket can be replaced or disposed by another
        // thread at this point, so reading it must never prevent the Broken() call below.
        int? port = null;
        try
        {
            port = (_clientSocket?.LocalEndPoint as IPEndPoint)?.Port;
        }
        catch (ObjectDisposedException)
        {
            /* socket already closed - log without the port */
        }
        catch (SocketException)
        {
            /* socket in a failed state - log without the port */
        }
        Log.Debug($"[ZClient] server disconnected, because last data was more thas {diff_request_ms} ms ago. Client port {port}");
        Broken();
    }
}
/// <summary>
/// Handles a frame reported by the parser. Frames flagged <c>IsRequest</c> are passed to the
/// request buffer as the result for their <c>FrameId</c>; frames addressed to the ping inbox
/// only refresh the activity timestamp; everything else is forwarded to
/// <see cref="OnServerMessage"/>. Frames without an inbox are dropped.
/// </summary>
/// <param name="frame">Parsed frame; null is ignored.</param>
private void _parser_OnIncomingFrame(Frame frame)
{
    if (frame == null) return;
    try
    {
        // A frame without an inbox is ignored, but it is still released in the finally block.
        if (frame.Inbox == null) return;
        _last_rw_time = DateTime.UtcNow.Ticks;
        if (frame.IsRequest)
        {
            // Got response on request with id = packet_id
            try
            {
                _requests.Success(frame.FrameId, frame);
            }
            catch (Exception ex)
            {
                Log.SystemError(ex, $"[ZClient] Fault handle response");
            }
        }
        else if (false == frame.Inbox.Equals(DEFAULT_PING_INBOX, StringComparison.Ordinal))
        {
            // Got server command
            try
            {
                OnServerMessage?.Invoke(this, frame);
            }
            catch (Exception ex)
            {
                Log.SystemError(ex, $"[ZClient] Fault handle server message");
            }
        }
        // Ping frames need no further handling: the timestamp was refreshed above.
    }
    finally
    {
        // Every frame that reaches this handler is released exactly once.
        frame.Release();
    }
}
/// <summary>
/// Completion callback of <c>NetworkStream.BeginRead</c>. Finishes the read on the stream that
/// started it, pushes received bytes into the frame parser and starts the next read while the
/// client is in the working state. A zero-length read is treated as a disconnect.
/// </summary>
/// <param name="ar">Async result of the completed read.</param>
private void ReceiveAsyncCallback(IAsyncResult ar)
{
    // Reads re-armed below carry their own stream as async state, so a completion is never
    // finished on a stream created by a later reconnect. Reads started with a null state
    // fall back to the current stream.
    var stream = (ar.AsyncState as NetworkStream) ?? _stream;
    if (stream == null)
    {
        // The connection is being replaced right now; nothing to finish.
        return;
    }
    try
    {
        // No reconnect attempt here: EndRead must run against the stream that issued the read.
        var count = stream.EndRead(ar);
        if (count > 0)
        {
            _parser.Push(_buffer, 0, count);
            _last_rw_time = DateTime.UtcNow.Ticks;
        }
        bool isCurrent;
        bool isWorking;
        // Taking the reconnection lock makes this callback wait for a connect that is still
        // in progress, so the status read below is the one set after that connect finished.
        lock (_reconnection_lock)
        {
            isCurrent = ReferenceEquals(stream, _stream);
            isWorking = Status == ZTransportStatus.Working;
        }
        if (false == isCurrent)
        {
            // Completion of a read that belongs to a connection which was already replaced.
            return;
        }
        if (count == 0)
        {
            // Zero bytes means the remote side closed the connection; re-arming the read
            // would complete immediately again and spin.
            Log.Debug("[ZSocketClient] Connection closed by remote host");
            Broken();
            OnDisconnect();
            return;
        }
        if (isWorking)
        {
            stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, stream);
        }
    }
    catch (ObjectDisposedException)
    {
        /// Nothing
    }
    catch (Exception ex)
    {
        if (false == ReferenceEquals(stream, _stream))
        {
            // Failure of a stale read must not break the connection that replaced it.
            return;
        }
        Log.SystemError(ex, $"[ZSocketClient] Error read data");
        Broken();
        OnDisconnect();
    }
}
/// <summary>
/// Body of the background send thread. While the client is not disposed it restores the
/// connection when needed, takes frames from the send queue, serializes them and writes them
/// to the stream. Every taken frame is released after the attempt, whether it succeeded or not.
/// </summary>
private void SendFramesJob()
{
    // Wait slice for the queue; bounds how long the thread can miss a Dispose() call.
    const int takeTimeoutMs = 100;
    while (Status != ZTransportStatus.Disposed)
    {
        if (_send_queue.IsCompleted)
        {
            return;
        }
        if (Status != ZTransportStatus.Working)
        {
            Thread.Sleep(100);
            try
            {
                EnsureConnection();
            }
            catch (Exception ex)
            {
                Log.SystemError(ex, "[ZSocketClient] Send next frame fault");
            }
            if (Status == ZTransportStatus.Disposed) return;
            continue;
        }
        // Declared per iteration so the finally block can never release a frame
        // that belonged to a previous iteration.
        Frame frame = null;
        try
        {
            // Same as a blocking take, but wakes up periodically so a disposed client
            // does not leave this thread parked on the queue forever.
            while (false == _send_queue.TryTake(out frame, takeTimeoutMs))
            {
                if (Status == ZTransportStatus.Disposed || _send_queue.IsCompleted)
                {
                    return;
                }
            }
            var data = NetworkStreamFastObfuscator.PrepareData(MessageSerializer.Serialize(frame));
            if (data != null && data.Length > 0)
            {
                if (frame.IsRequest)
                {
                    _requests.StartSend(frame.FrameId);
                }
                _stream.Write(data, 0, data.Length);
                _last_rw_time = DateTime.UtcNow.Ticks;
                //NetworkStats.Send(data);
            }
        }
        catch (Exception ex)
        {
            Log.SystemError(ex, $"[ZSocketClient] Backward send error.");
            Broken();
            OnDisconnect();
        }
        finally
        {
            frame?.Release();
        }
    }
}
#endregion Private methods
6 years ago
#region API
6 years ago
/// <summary>
/// Opens a new connection to <see cref="Endpoint"/> unless the client is already working or
/// disposed. Any previous socket/stream pair is closed first. On success the first read is
/// started, the status is switched to working and <see cref="OnConnect"/> is raised.
/// </summary>
/// <returns>
/// True when the client is in the working state on return; false when it is disposed or the
/// connection attempt failed (the failure is logged and the status is set to broken).
/// </returns>
private bool TryConnect()
{
    if (Status == ZTransportStatus.Working) return true;
    if (Status == ZTransportStatus.Disposed) return false;

    // Tear down whatever is left from the previous connection.
    if (null != _clientSocket)
    {
        try
        {
            _stream?.Close();
            _stream?.Dispose();
            _clientSocket.Dispose();
        }
        catch
        {
            /* ignore */
        }
        _clientSocket = null;
        _stream = null;
    }

    // Build the new connection and start receiving.
    try
    {
        _clientSocket = MakeClientSocket();
        _clientSocket.Connect(Endpoint);
        // 'true' = the stream owns the socket and closes it together with itself.
        _stream = new NetworkStream(_clientSocket, true);
        _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
    }
    catch (Exception connectError)
    {
        Log.SystemError(connectError, $"[ZSocketClient] Connection fault");
        Broken();
        return false;
    }

    Working();
    OnConnect();
    return true;
}
/// <summary>
/// Makes sure the client is connected, reconnecting under the reconnection lock when the
/// status is not working.
/// </summary>
/// <exception cref="ObjectDisposedException">
/// The client is disposed, or the connection attempt failed.
/// </exception>
public void EnsureConnection()
{
    lock (_reconnection_lock)
    {
        if (Status == ZTransportStatus.Disposed)
        {
            throw new ObjectDisposedException("connection");
        }
        // Already working: nothing to do. Otherwise a single connect attempt decides.
        if (Status != ZTransportStatus.Working && !TryConnect())
        {
            throw new ObjectDisposedException("No connection");
        }
    }
}
/// <summary>
/// Queues a frame for the background send thread. Blocks while the queue holds
/// <c>MAX_SEND_QUEUE_SIZE</c> frames or more. The frame is silently dropped when the queue
/// no longer accepts items.
/// </summary>
/// <param name="frame">Frame to send.</param>
/// <exception cref="ArgumentNullException"><paramref name="frame"/> is null.</exception>
/// <exception cref="ObjectDisposedException">
/// The client is disposed (also while waiting for queue space) or no connection could be established.
/// </exception>
public void Send(Frame frame)
{
    if (frame == null) throw new ArgumentNullException(nameof(frame));
    EnsureConnection();
    if (_send_queue.IsAddingCompleted)
    {
        return;
    }
    while (_send_queue.Count >= ZBaseNetwork.MAX_SEND_QUEUE_SIZE)
    {
        // Once the client is disposed nobody drains the queue any more,
        // so waiting for free space would never end.
        if (Status == ZTransportStatus.Disposed)
        {
            throw new ObjectDisposedException("connection");
        }
        Thread.Sleep(50);
    }
    _send_queue.Add(frame);
}
/// <summary>
/// Sends a frame as a request: registers <paramref name="callback"/> and
/// <paramref name="fail"/> for the frame in the request buffer and queues the frame for sending.
/// </summary>
/// <param name="frame">Request frame.</param>
/// <param name="callback">Registered with the request buffer together with the frame.</param>
/// <param name="fail">
/// Optional failure handler; also invoked here with the exception message when the connection
/// cannot be ensured or queuing the frame fails.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="frame"/> is null.</exception>
public void Request(Frame frame, Action<Frame> callback, Action<string> fail = null)
{
    if (null == frame)
    {
        throw new ArgumentNullException(nameof(frame));
    }

    // Step 1: no connection -> report and give up before anything is registered.
    try
    {
        EnsureConnection();
    }
    catch (Exception connectError)
    {
        if (fail != null) fail(connectError.Message);
        return;
    }

    // Step 2: register first, so the request is known before the frame is queued.
    _requests.RegisterForFrame(frame, callback, fail);

    // Step 3: queue the frame; a failure here marks the connection as broken.
    try
    {
        Send(frame);
    }
    catch (Exception sendError)
    {
        if (fail != null) fail(sendError.Message);
        Broken();
        OnDisconnect();
        Log.SystemError(sendError, $"[ZSocketClient] Request error. Frame '{frame.FrameId}'. Inbox '{frame.Inbox}'");
    }
}
#endregion API
6 years ago
#region Helper
6 years ago
/// <summary>
/// Creates an unconnected IPv4 TCP socket with an unrestricted IP protection level and the
/// DontLinger option enabled.
/// </summary>
/// <returns>The configured socket; the caller owns it.</returns>
private static Socket MakeClientSocket()
{
    var s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    try
    {
        s.SetIPProtectionLevel(IPProtectionLevel.Unrestricted);
        s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);
    }
    catch
    {
        // Do not leak the handle when an option cannot be applied; the caller still sees the error.
        s.Dispose();
        throw;
    }
    return s;
}
#endregion Helper
6 years ago
/// <summary>
/// Shuts the client down: raises <see cref="OnDisconnect"/> if it was working, switches the
/// status to disposed, removes the heartbeat from the scheduler and closes the stream and socket.
/// </summary>
public override void Dispose()
{
    if (Status == ZTransportStatus.Working)
    {
        OnDisconnect();
    }
    Disposed();
    Sheduller.Remove(_heartbeat_key);
    // Disposing the stream is enough to close it; errors while closing must not escape Dispose.
    try
    {
        _stream?.Dispose();
    }
    catch
    {
        /* ignore */
    }
    // The stream closes the socket it owns, but a socket whose connect attempt failed
    // never got a stream and would otherwise stay open.
    try
    {
        _clientSocket?.Dispose();
    }
    catch
    {
        /* ignore */
    }
}
}
}

Powered by TurnKey Linux.