Network refactoring

pull/1/head
Ogoun 5 years ago
parent 00677aa03b
commit 3dd4d22c22

@ -1,6 +1,11 @@
using Newtonsoft.Json; using Newtonsoft.Json;
using System;
using System.Net;
using System.Threading;
using ZeroLevel; using ZeroLevel;
using ZeroLevel.Logging; using ZeroLevel.Logging;
using ZeroLevel.Network;
using ZeroLevel.Services.Serialization;
namespace TestApp namespace TestApp
{ {
@ -30,5 +35,38 @@ namespace TestApp
.Stop(); .Stop();
Bootstrap.Shutdown(); Bootstrap.Shutdown();
} }
static void SimpleCSTest()
{
var server_router = new Router();
server_router.RegisterInbox<string>("test", (c, line) =>
{
Console.WriteLine(line);
});
server_router.RegisterInbox<string, string>("req", (c, line) =>
{
Console.WriteLine($"Request: {line}");
return line.ToUpperInvariant();
});
var server = new SocketServer(new System.Net.IPEndPoint(IPAddress.Any, 666), server_router);
var client_router = new Router();
var client = new SocketClient(new IPEndPoint(IPAddress.Loopback, 666), client_router);
var frm = FrameFactory.Create("req", MessageSerializer.SerializeCompatible("Hello world"));
while (Console.KeyAvailable == false)
{
client.Request(frm, data =>
{
var line = MessageSerializer.DeserializeCompatible<string>(data);
Console.WriteLine($"Response: {line}");
});
Thread.Sleep(2000);
}
}
} }
} }

@ -13,8 +13,8 @@ namespace ZeroLevel.Network
IRouter Router { get; } IRouter Router { get; }
void Send(Frame data); bool Send(Frame data);
void Request(Frame data, Action<byte[]> callback, Action<string> fail = null); bool Request(Frame data, Action<byte[]> callback, Action<string> fail = null);
void Response(byte[] data, int identity); bool Response(byte[] data, int identity);
} }
} }

@ -22,8 +22,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Send(FrameFactory.Create(inbox)); return _client.Send(FrameFactory.Create(inbox));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -36,8 +35,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Send(FrameFactory.Create(inbox, data)); return _client.Send(FrameFactory.Create(inbox, data));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -50,8 +48,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Send(FrameFactory.Create(BaseSocket.DEFAULT_MESSAGE_INBOX, MessageSerializer.SerializeCompatible<T>(message))); return _client.Send(FrameFactory.Create(BaseSocket.DEFAULT_MESSAGE_INBOX, MessageSerializer.SerializeCompatible<T>(message)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -64,8 +61,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Send(FrameFactory.Create(inbox, MessageSerializer.SerializeCompatible<T>(message))); return _client.Send(FrameFactory.Create(inbox, MessageSerializer.SerializeCompatible<T>(message)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -78,8 +74,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Request(FrameFactory.Create(inbox), f => callback(f)); return _client.Request(FrameFactory.Create(inbox), f => callback(f));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -92,8 +87,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Request(FrameFactory.Create(inbox, data), f => callback(f)); return _client.Request(FrameFactory.Create(inbox, data), f => callback(f));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -106,8 +100,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Request(FrameFactory.Create(inbox), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f))); return _client.Request(FrameFactory.Create(inbox), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -120,8 +113,7 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Request(FrameFactory.Create(BaseSocket.DEFAULT_REQUEST_INBOX), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f))); return _client.Request(FrameFactory.Create(BaseSocket.DEFAULT_REQUEST_INBOX), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -134,9 +126,8 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Request(FrameFactory.Create(inbox, MessageSerializer.SerializeCompatible<Trequest>(request)), return _client.Request(FrameFactory.Create(inbox, MessageSerializer.SerializeCompatible<Trequest>(request)),
f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f))); f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -149,9 +140,8 @@ namespace ZeroLevel.Network
{ {
try try
{ {
_client.Request(FrameFactory.Create(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, MessageSerializer.SerializeCompatible<Trequest>(request)), return _client.Request(FrameFactory.Create(BaseSocket.DEFAULT_REQUEST_WITHOUT_ARGS_INBOX, MessageSerializer.SerializeCompatible<Trequest>(request)),
f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f))); f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
return true;
} }
catch (Exception ex) catch (Exception ex)
{ {

@ -14,32 +14,14 @@ namespace ZeroLevel.Network
#region Private #region Private
#region Queues #region Queues
private class IncomingFrame private struct IncomingFrame
{ {
private IncomingFrame() { }
public FrameType type; public FrameType type;
public int identity; public int identity;
public byte[] data; public byte[] data;
public static IncomingFrame NewFrame() => new IncomingFrame();
}
private class SendFrame
{
private SendFrame() { }
public bool isRequest;
public int identity;
public byte[] data;
public static SendFrame NewFrame() => new SendFrame();
} }
private ObjectPool<IncomingFrame> _incoming_frames_pool = new ObjectPool<IncomingFrame>(() => IncomingFrame.NewFrame());
private ObjectPool<SendFrame> _send_frames_pool = new ObjectPool<SendFrame>(() => SendFrame.NewFrame());
private BlockingCollection<IncomingFrame> _incoming_queue = new BlockingCollection<IncomingFrame>(); private BlockingCollection<IncomingFrame> _incoming_queue = new BlockingCollection<IncomingFrame>();
private BlockingCollection<SendFrame> _send_queue = new BlockingCollection<SendFrame>(BaseSocket.MAX_SEND_QUEUE_SIZE);
#endregion #endregion
private Socket _clientSocket; private Socket _clientSocket;
@ -49,15 +31,12 @@ namespace ZeroLevel.Network
private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета
private readonly object _reconnection_lock = new object(); private readonly object _reconnection_lock = new object();
private long _heartbeat_key; private long _heartbeat_key;
private Thread _sendThread;
private Thread _receiveThread; private Thread _receiveThread;
#endregion Private #endregion Private
public IRouter Router { get; } public IRouter Router { get; }
public bool IsEmptySendQueue { get { return _send_queue.Count == 0; } }
public SocketClient(IPEndPoint ep, IRouter router) public SocketClient(IPEndPoint ep, IRouter router)
{ {
try try
@ -100,10 +79,6 @@ namespace ZeroLevel.Network
private void StartInternalThreads() private void StartInternalThreads()
{ {
_sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true;
_sendThread.Start();
_receiveThread = new Thread(IncomingFramesJob); _receiveThread = new Thread(IncomingFramesJob);
_receiveThread.IsBackground = true; _receiveThread.IsBackground = true;
_receiveThread.Start(); _receiveThread.Start();
@ -136,61 +111,27 @@ namespace ZeroLevel.Network
public event Action<ISocketClient> OnDisconnect = (_) => { }; public event Action<ISocketClient> OnDisconnect = (_) => { };
public IPEndPoint Endpoint { get; } public IPEndPoint Endpoint { get; }
public void Request(Frame frame, Action<byte[]> callback, Action<string> fail = null) public bool Request(Frame frame, Action<byte[]> callback, Action<string> fail = null)
{ {
if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Request] Socket status: {Status}"); if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Request] Socket status: {Status}");
var data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id); var data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id);
_requests.RegisterForFrame(id, callback, fail);
if (!_send_queue.IsAddingCompleted) return Send(id, true, data);
{
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{
Thread.Sleep(1);
}
_requests.RegisterForFrame(id, callback, fail);
var sf = _send_frames_pool.Allocate();
sf.isRequest = true;
sf.identity = id;
sf.data = data;
_send_queue.Add(sf);
}
} }
public void Send(Frame frame) public bool Send(Frame frame)
{ {
if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Send] Socket status: {Status}"); if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Send] Socket status: {Status}");
var data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame)); var data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame));
return Send(0, false, data);
if (!_send_queue.IsAddingCompleted)
{
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{
Thread.Sleep(1);
}
var sf = _send_frames_pool.Allocate();
sf.isRequest = false;
sf.identity = 0;
sf.data = data;
_send_queue.Add(sf);
}
} }
public void Response(byte[] data, int identity) public bool Response(byte[] data, int identity)
{ {
if (data == null) throw new ArgumentNullException(nameof(data)); if (data == null) throw new ArgumentNullException(nameof(data));
if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Response] Socket status: {Status}"); if (Status != SocketClientStatus.Working) throw new Exception($"[SocketClient.Response] Socket status: {Status}");
if (!_send_queue.IsAddingCompleted)
{ return Send(0, false, NetworkPacketFactory.Response(data, identity));
while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE)
{
Thread.Sleep(1);
}
var sf = _send_frames_pool.Allocate();
sf.isRequest = false;
sf.identity = 0;
sf.data = NetworkPacketFactory.Response(data, identity);
_send_queue.Add(sf);
}
} }
#endregion #endregion
@ -201,11 +142,12 @@ namespace ZeroLevel.Network
try try
{ {
if (type == FrameType.KeepAlive) return; if (type == FrameType.KeepAlive) return;
var inc_frame = _incoming_frames_pool.Allocate(); _incoming_queue.Add(new IncomingFrame
inc_frame.data = data; {
inc_frame.type = type; data = data,
inc_frame.identity = identity; type = type,
_incoming_queue.Add(inc_frame); identity = identity
});
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -274,17 +216,14 @@ namespace ZeroLevel.Network
return; return;
} }
_requests.TestForTimeouts(); _requests.TestForTimeouts();
var info = _send_frames_pool.Allocate();
info.isRequest = false; Send(0, false, NetworkPacketFactory.KeepAliveMessage());
info.identity = 0;
info.data = NetworkPacketFactory.KeepAliveMessage();
_send_queue.Add(info);
} }
private void IncomingFramesJob() private void IncomingFramesJob()
{ {
IncomingFrame frame = default(IncomingFrame); IncomingFrame frame = default(IncomingFrame);
while (Status != SocketClientStatus.Disposed && !_send_queue.IsCompleted) while (Status != SocketClientStatus.Disposed)
{ {
try try
{ {
@ -319,10 +258,6 @@ namespace ZeroLevel.Network
{ {
Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame"); Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame");
} }
finally
{
_incoming_frames_pool.Free(frame);
}
} }
} }
catch (Exception ex) catch (Exception ex)
@ -333,61 +268,33 @@ namespace ZeroLevel.Network
_incoming_queue.Dispose(); _incoming_queue.Dispose();
_incoming_queue = new BlockingCollection<IncomingFrame>(); _incoming_queue = new BlockingCollection<IncomingFrame>();
} }
if (frame != null)
{
_incoming_frames_pool.Free(frame);
}
continue; continue;
} }
} }
} }
private void SendFramesJob() private bool Send(int id, bool is_request, byte[] data)
{ {
SendFrame frame = null; if (Status == SocketClientStatus.Working)
while (Status != SocketClientStatus.Disposed && !_send_queue.IsCompleted)
{ {
try try
{ {
if (_send_queue.TryTake(out frame, 100)) if (is_request)
{ {
try _requests.StartSend(id);
{
if (frame.isRequest)
{
_requests.StartSend(frame.identity);
}
_clientSocket.Send(frame.data, frame.data.Length, SocketFlags.None);
}
catch (Exception ex)
{
Log.SystemError(ex, $"[SocketClient.SendFramesJob] _str_clientSocketeam.Send");
Broken();
OnDisconnect(this);
}
finally
{
_send_frames_pool.Free(frame);
}
} }
var sended = _clientSocket.Send(data, data.Length, SocketFlags.None);
return sended == data.Length;
} }
catch (Exception ex) catch (Exception ex)
{ {
Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.TryTake"); Log.SystemError(ex, $"[SocketClient.SendFramesJob] _str_clientSocketeam.Send");
if (Status != SocketClientStatus.Disposed) Broken();
{ OnDisconnect(this);
_send_queue.Dispose();
_send_queue = new BlockingCollection<SendFrame>();
}
if (frame != null)
{
_send_frames_pool.Free(frame);
}
continue;
} }
} }
return false;
} }
#endregion #endregion
public override void Dispose() public override void Dispose()

@ -8,7 +8,7 @@ using ZeroLevel.Network.SDL;
namespace ZeroLevel.Network namespace ZeroLevel.Network
{ {
internal sealed class SocketServer public sealed class SocketServer
: BaseSocket, IRouter : BaseSocket, IRouter
{ {
private Socket _serverSocket; private Socket _serverSocket;

@ -1,7 +1,6 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using ZeroLevel.Services.Pools; using ZeroLevel.Services.Pools;
namespace ZeroLevel.Network namespace ZeroLevel.Network

@ -10,7 +10,7 @@ using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network namespace ZeroLevel.Network
{ {
internal sealed class Router public sealed class Router
: IRouter : IRouter
{ {
public event Action<ISocketClient> OnDisconnect = _ => { }; // must be never rised public event Action<ISocketClient> OnDisconnect = _ => { }; // must be never rised

Loading…
Cancel
Save

Powered by TurnKey Linux.