pull/1/head
a.bozhenov 5 years ago
parent 1d24a48ac2
commit 23b95c4848

@ -18,7 +18,7 @@ namespace TestApp
protected override void StartAction() protected override void StartAction()
{ {
var client = Exchange.GetConnection("192.168.51.104:50223"); var client = Exchange.GetConnection("192.168.51.104:49672");
client?.Request<ServiceDescription>("__service_description__", record => client?.Request<ServiceDescription>("__service_description__", record =>
{ {
Log.Info(record.ServiceInfo.ServiceKey); Log.Info(record.ServiceInfo.ServiceKey);

@ -4,11 +4,11 @@ using System.Globalization;
using System.IO; using System.IO;
using System.IO.Compression; using System.IO.Compression;
using System.Linq; using System.Linq;
using System.Security.AccessControl;
using System.Text; using System.Text;
using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Logging;
namespace ZeroLevel.Services.Logging.Implementation namespace ZeroLevel.Logging
{ {
public sealed class TextFileLoggerOptions public sealed class TextFileLoggerOptions
{ {

@ -1,4 +1,5 @@
using System; using System;
using ZeroLevel.Logging;
using ZeroLevel.Services.Logging; using ZeroLevel.Services.Logging;
using ZeroLevel.Services.Logging.Implementation; using ZeroLevel.Services.Logging.Implementation;

@ -1,13 +1,8 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Runtime.ExceptionServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Services;
using ZeroLevel.Services.Pools; using ZeroLevel.Services.Pools;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
@ -17,17 +12,24 @@ namespace ZeroLevel.Network
: BaseSocket, ISocketClient : BaseSocket, ISocketClient
{ {
#region Private #region Private
private struct IncomingFrame private class IncomingFrame
{ {
private IncomingFrame() { }
public FrameType type; public FrameType type;
public int identity; public int identity;
public byte[] data; public byte[] data;
public static IncomingFrame NewFrame() => new IncomingFrame();
} }
private struct SendFrame private class SendFrame
{ {
private SendFrame() { }
public bool isRequest; public bool isRequest;
public int identity; public int identity;
public byte[] data; public byte[] data;
public static SendFrame NewFrame() => new SendFrame();
} }
private Socket _clientSocket; private Socket _clientSocket;
@ -47,6 +49,8 @@ namespace ZeroLevel.Network
private Thread _receiveThread; private Thread _receiveThread;
private BlockingCollection<IncomingFrame> _incoming_queue = new BlockingCollection<IncomingFrame>(); private BlockingCollection<IncomingFrame> _incoming_queue = new BlockingCollection<IncomingFrame>();
private BlockingCollection<SendFrame> _send_queue = new BlockingCollection<SendFrame>(BaseSocket.MAX_SEND_QUEUE_SIZE); private BlockingCollection<SendFrame> _send_queue = new BlockingCollection<SendFrame>(BaseSocket.MAX_SEND_QUEUE_SIZE);
private ObjectPool<IncomingFrame> _incoming_frames_pool = new ObjectPool<IncomingFrame>(() => IncomingFrame.NewFrame());
private ObjectPool<SendFrame> _send_frames_pool = new ObjectPool<SendFrame>(() => SendFrame.NewFrame());
#endregion Private #endregion Private
public IRouter Router { get; } public IRouter Router { get; }
@ -105,12 +109,11 @@ namespace ZeroLevel.Network
Thread.Sleep(1); Thread.Sleep(1);
} }
_requests.RegisterForFrame(id, callback, fail); _requests.RegisterForFrame(id, callback, fail);
_send_queue.Add(new SendFrame var sf = _send_frames_pool.Allocate();
{ sf.isRequest = true;
isRequest = true, sf.identity = id;
data = data, sf.data = data;
identity = id _send_queue.Add(sf);
});
} }
} }
@ -132,15 +135,14 @@ namespace ZeroLevel.Network
{ {
Thread.Sleep(1); Thread.Sleep(1);
} }
_send_queue.Add(new SendFrame var sf = _send_frames_pool.Allocate();
{ sf.isRequest = false;
isRequest = false, sf.identity = 0;
identity = 0, sf.data = data;
data = data _send_queue.Add(sf);
});
} }
} }
public void Response(byte[] data, int identity) public void Response(byte[] data, int identity)
{ {
if (data == null) throw new ArgumentNullException(nameof(data)); if (data == null) throw new ArgumentNullException(nameof(data));
@ -150,12 +152,11 @@ namespace ZeroLevel.Network
{ {
Thread.Sleep(1); Thread.Sleep(1);
} }
_send_queue.Add(new SendFrame var sf = _send_frames_pool.Allocate();
{ sf.isRequest = false;
isRequest = false, sf.identity = 0;
identity = 0, sf.data = NetworkPacketFactory.Response(data, identity);
data = NetworkPacketFactory.Response(data, identity) _send_queue.Add(sf);
});
} }
} }
@ -184,12 +185,11 @@ namespace ZeroLevel.Network
try try
{ {
if (type == FrameType.KeepAlive) return; if (type == FrameType.KeepAlive) return;
_incoming_queue.Add(new IncomingFrame var inc_frame = _incoming_frames_pool.Allocate();
{ inc_frame.data = data;
data = data, inc_frame.type = type;
type = type, inc_frame.identity = identity;
identity = identity _incoming_queue.Add(inc_frame);
});
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -278,12 +278,10 @@ namespace ZeroLevel.Network
_requests.TestForTimeouts(); _requests.TestForTimeouts();
try try
{ {
var info = new SendFrame var info = _send_frames_pool.Allocate();
{ info.isRequest = false;
isRequest = false, info.identity = 0;
identity = 0, info.data = NetworkPacketFactory.KeepAliveMessage();
data = NetworkPacketFactory.KeepAliveMessage()
};
_send_queue.Add(info); _send_queue.Add(info);
} }
catch (Exception ex) catch (Exception ex)
@ -346,6 +344,10 @@ namespace ZeroLevel.Network
_incoming_queue.Dispose(); _incoming_queue.Dispose();
_incoming_queue = new BlockingCollection<IncomingFrame>(); _incoming_queue = new BlockingCollection<IncomingFrame>();
} }
if (frame != null)
{
_incoming_frames_pool.Free(frame);
}
continue; continue;
} }
try try
@ -377,12 +379,16 @@ namespace ZeroLevel.Network
{ {
Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame"); Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame");
} }
finally
{
_incoming_frames_pool.Free(frame);
}
} }
} }
private void SendFramesJob() private void SendFramesJob()
{ {
SendFrame frame; SendFrame frame = null;
int unsuccess = 0; int unsuccess = 0;
while (Status != SocketClientStatus.Disposed && !_send_queue.IsCompleted) while (Status != SocketClientStatus.Disposed && !_send_queue.IsCompleted)
{ {
@ -398,6 +404,10 @@ namespace ZeroLevel.Network
_send_queue.Dispose(); _send_queue.Dispose();
_send_queue = new BlockingCollection<SendFrame>(); _send_queue = new BlockingCollection<SendFrame>();
} }
if (frame != null)
{
_send_frames_pool.Free(frame);
}
continue; continue;
} }
while (_stream?.CanWrite == false || Status != SocketClientStatus.Working) while (_stream?.CanWrite == false || Status != SocketClientStatus.Working)
@ -441,6 +451,10 @@ namespace ZeroLevel.Network
Broken(); Broken();
OnDisconnect(this); OnDisconnect(this);
} }
finally
{
_send_frames_pool.Free(frame);
}
} }
} }

@ -76,9 +76,7 @@ namespace ZeroLevel.Network
try try
{ {
var client_socket = _serverSocket.EndAccept(ar); var client_socket = _serverSocket.EndAccept(ar);
_serverSocket.BeginAccept(BeginAcceptCallback, null);
_connection_set_lock.EnterWriteLock(); _connection_set_lock.EnterWriteLock();
var connection = new SocketClient(client_socket, _router); var connection = new SocketClient(client_socket, _router);
connection.OnDisconnect += Connection_OnDisconnect; connection.OnDisconnect += Connection_OnDisconnect;
_connections[connection.Endpoint] = new ExClient(connection); _connections[connection.Endpoint] = new ExClient(connection);
@ -88,12 +86,24 @@ namespace ZeroLevel.Network
catch (Exception ex) catch (Exception ex)
{ {
Broken(); Broken();
Log.SystemError(ex, "[ZSocketServer] Error with connect accepting"); Log.SystemError(ex, "[ZSocketServer.BeginAcceptCallback] Error with connect accepting");
} }
finally finally
{ {
_connection_set_lock.ExitWriteLock(); _connection_set_lock.ExitWriteLock();
}
try
{
_serverSocket.BeginAccept(BeginAcceptCallback, null);
} }
catch (Exception ex)
{
Log.SystemError(ex, "[ZSocketServer.BeginAcceptCallback] BeginAccept error");
}
}
else
{
Log.Warning($"Server socket change state to: {Status}");
} }
} }

@ -33,6 +33,8 @@ namespace ZeroLevel.Services.Pools
// than "new T()". // than "new T()".
private readonly Factory _factory; private readonly Factory _factory;
public int Count => _items?.Length ?? 0;
public ObjectPool(Factory factory) public ObjectPool(Factory factory)
: this(factory, Environment.ProcessorCount * 2) : this(factory, Environment.ProcessorCount * 2)
{ } { }

Loading…
Cancel
Save

Powered by TurnKey Linux.