diff --git a/Tests/ZeroLevel.UnitTests/SerializationTests.cs b/Tests/ZeroLevel.UnitTests/SerializationTests.cs
index 0304db0..6661c9b 100644
--- a/Tests/ZeroLevel.UnitTests/SerializationTests.cs
+++ b/Tests/ZeroLevel.UnitTests/SerializationTests.cs
@@ -1,9 +1,7 @@
-using Newtonsoft.Json.Linq;
-using System;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
-using System.Threading.Tasks;
using Xunit;
using ZeroLevel.DocumentObjectModel;
using ZeroLevel.Network;
diff --git a/ZeroLevel/Services/Network/AdaptiveBufferManager.cs b/ZeroLevel/Services/Network/AdaptiveBufferManager.cs
new file mode 100644
index 0000000..c124639
--- /dev/null
+++ b/ZeroLevel/Services/Network/AdaptiveBufferManager.cs
@@ -0,0 +1,132 @@
+using System;
+using System.Buffers;
+
+namespace ZeroLevel.Services.Network
+{
+ ///
+ /// Управляет адаптивным буфером для сетевых операций
+ ///
+ public class AdaptiveBufferManager : IDisposable
+ {
+ private byte[] _buffer;
+ private int _currentSize;
+ private readonly int _minSize;
+ private readonly int _maxSize;
+
+ // Счетчики для адаптивного изменения размера
+ private int _consecutiveSmallReads = 0;
+ private int _consecutiveLargeReads = 0;
+
+ // Настройки адаптации
+ private readonly double _increaseThreshold;
+ private readonly double _decreaseThreshold;
+ private readonly int _increaseAfterReads;
+ private readonly int _decreaseAfterReads;
+
+ public byte[] Buffer => _buffer;
+ public int CurrentSize => _currentSize;
+
+ public AdaptiveBufferManager(
+ int minSize = 4096,
+ int maxSize = 65536,
+ double increaseThreshold = 0.9,
+ double decreaseThreshold = 0.25,
+ int increaseAfterReads = 3,
+ int decreaseAfterReads = 10)
+ {
+ _minSize = minSize;
+ _maxSize = maxSize;
+ _currentSize = minSize;
+ _increaseThreshold = increaseThreshold;
+ _decreaseThreshold = decreaseThreshold;
+ _increaseAfterReads = increaseAfterReads;
+ _decreaseAfterReads = decreaseAfterReads;
+
+ AllocateBuffer(_currentSize);
+ }
+
+ ///
+ /// Обрабатывает результат чтения и адаптирует размер буфера
+ ///
+ public void ProcessReadResult(int bytesRead)
+ {
+ AdjustBufferSize(bytesRead);
+ }
+
+ ///
+ /// Принудительно изменить размер буфера
+ ///
+ public void ResizeBuffer(int newSize)
+ {
+ if (newSize < _minSize || newSize > _maxSize)
+ {
+ throw new ArgumentOutOfRangeException(nameof(newSize),
+ $"Size must be between {_minSize} and {_maxSize}");
+ }
+
+ AllocateBuffer(newSize);
+ }
+
+ private void AllocateBuffer(int size)
+ {
+ // Возвращаем старый буфер если есть
+ if (_buffer != null)
+ {
+ ArrayPool.Shared.Return(_buffer, clearArray: false);
+ }
+
+ // Арендуем новый буфер нужного размера
+ _buffer = ArrayPool.Shared.Rent(size);
+ _currentSize = _buffer.Length; // ArrayPool может вернуть буфер большего размера
+ }
+
+ private void AdjustBufferSize(int bytesRead)
+ {
+ // Если прочитали почти весь буфер, возможно нужен больший размер
+ if (bytesRead >= _currentSize * _increaseThreshold)
+ {
+ _consecutiveLargeReads++;
+ _consecutiveSmallReads = 0;
+
+ // После N больших чтений подряд увеличиваем буфер
+ if (_consecutiveLargeReads >= _increaseAfterReads && _currentSize < _maxSize)
+ {
+ var newSize = Math.Min(_currentSize * 2, _maxSize);
+ Log.Debug($"[AdaptiveBuffer] Increasing size from {_currentSize} to {newSize}");
+ AllocateBuffer(newSize);
+ _consecutiveLargeReads = 0;
+ }
+ }
+ // Если прочитали мало данных, возможно можно уменьшить буфер
+ else if (bytesRead < _currentSize * _decreaseThreshold)
+ {
+ _consecutiveSmallReads++;
+ _consecutiveLargeReads = 0;
+
+ // После N маленьких чтений подряд уменьшаем буфер
+ if (_consecutiveSmallReads >= _decreaseAfterReads && _currentSize > _minSize)
+ {
+ var newSize = Math.Max(_currentSize / 2, _minSize);
+ Log.Debug($"[AdaptiveBuffer] Decreasing size from {_currentSize} to {newSize}");
+ AllocateBuffer(newSize);
+ _consecutiveSmallReads = 0;
+ }
+ }
+ else
+ {
+ // Сброс счетчиков при среднем использовании
+ _consecutiveSmallReads = 0;
+ _consecutiveLargeReads = 0;
+ }
+ }
+
+ public void Dispose()
+ {
+ if (_buffer != null)
+ {
+ ArrayPool.Shared.Return(_buffer, clearArray: false);
+ _buffer = null!;
+ }
+ }
+ }
+}
diff --git a/ZeroLevel/Services/Network/BaseSocket.cs b/ZeroLevel/Services/Network/BaseSocket.cs
index b46d790..c743766 100644
--- a/ZeroLevel/Services/Network/BaseSocket.cs
+++ b/ZeroLevel/Services/Network/BaseSocket.cs
@@ -21,7 +21,7 @@ namespace ZeroLevel.Network
///
/// Buffer size for receiving data
///
- protected const int DEFAULT_RECEIVE_BUFFER_SIZE = 4096;
+ // protected const int DEFAULT_RECEIVE_BUFFER_SIZE = 4096;
///
/// If during the specified period there was no network activity, send a ping-request
diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs
index 3dbd918..54a395e 100644
--- a/ZeroLevel/Services/Network/SocketClient.cs
+++ b/ZeroLevel/Services/Network/SocketClient.cs
@@ -4,6 +4,7 @@ using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
+using ZeroLevel.Services.Network;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
@@ -35,24 +36,40 @@ namespace ZeroLevel.Network
private Socket _clientSocket;
private FrameParser _parser;
- private readonly RequestBuffer _requests = new RequestBuffer();
- private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
+ private readonly RequestBuffer _requests = new RequestBuffer();
private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета
private readonly object _reconnection_lock = new object();
private long _heartbeat_key;
private Thread _receiveThread;
private Thread _sendingThread;
+ //private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
+ private readonly AdaptiveBufferManager _bufferManager;
+
#endregion Private
public IRouter Router { get; }
public SocketClient(IPEndPoint ep, IRouter router)
{
+ _bufferManager = new AdaptiveBufferManager(
+ minSize: 4096,
+ maxSize: 65536,
+ increaseThreshold: 0.9,
+ decreaseThreshold: 0.25,
+ increaseAfterReads: 10,
+ decreaseAfterReads: 100
+ );
+
try
{
_clientSocket = new Socket(ep.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- _clientSocket.Connect(ep);
+ _clientSocket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); // Отключение алгоритма Nagle
+ _clientSocket.SendBufferSize = 65536; // Увеличение буфера отправки
+ _clientSocket.ReceiveBufferSize = 65536; // Увеличение буфера приема
+ //_clientSocket.Connect(ep);
+ OpenConnection(_clientSocket, ep);
+ _clientSocket.SetKeepAlive(true, 30000, 10000);
OnConnect(this);
}
catch (Exception ex)
@@ -71,8 +88,28 @@ namespace ZeroLevel.Network
StartReceive();
}
+ private static void OpenConnection(Socket socket, IPEndPoint ep)
+ {
+ var connectResult = socket.BeginConnect(ep, null, null);
+ if (!connectResult.AsyncWaitHandle.WaitOne(5000, false))
+ {
+ socket.Close();
+ throw new TimeoutException("Connection timeout");
+ }
+ socket.EndConnect(connectResult);
+ }
+
public SocketClient(Socket socket, IRouter router)
{
+ _bufferManager = new AdaptiveBufferManager(
+ minSize: 4096,
+ maxSize: 65536,
+ increaseThreshold: 0.9,
+ decreaseThreshold: 0.25,
+ increaseAfterReads: 10,
+ decreaseAfterReads: 100
+ );
+
Router = router;
_clientSocket = socket;
Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint;
@@ -99,26 +136,6 @@ namespace ZeroLevel.Network
_heartbeat_key = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat);
}
- private void StartReceive()
- {
- try
- {
- _clientSocket.BeginReceive(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, SocketFlags.None, ReceiveAsyncCallback, null);
- }
- catch (NullReferenceException)
- {
- Broken();
- Log.SystemError("[SocketClient.TryConnect] Client : Null Reference Exception - On Connect (begin receive section)");
- _clientSocket.Disconnect(false);
- }
- catch (SocketException e)
- {
- Broken();
- Log.SystemError(e, "[SocketClient.TryConnect] Client : Exception - On Connect (begin receive section)");
- _clientSocket.Disconnect(false);
- }
- }
-
#region API
public event Action OnConnect = (_) => { };
public event Action OnDisconnect = (_) => { };
@@ -167,6 +184,26 @@ namespace ZeroLevel.Network
}
}
+ private void StartReceive()
+ {
+ try
+ {
+ _clientSocket.BeginReceive(_bufferManager.Buffer, 0, _bufferManager.CurrentSize, SocketFlags.None, ReceiveAsyncCallback, null);
+ }
+ catch (NullReferenceException)
+ {
+ Broken();
+ Log.SystemError("[SocketClient.TryConnect] Client : Null Reference Exception - On Connect (begin receive section)");
+ _clientSocket.Disconnect(false);
+ }
+ catch (SocketException e)
+ {
+ Broken();
+ Log.SystemError(e, "[SocketClient.TryConnect] Client : Exception - On Connect (begin receive section)");
+ _clientSocket.Disconnect(false);
+ }
+ }
+
public void ReceiveAsyncCallback(IAsyncResult ar)
{
try
@@ -174,7 +211,8 @@ namespace ZeroLevel.Network
var count = _clientSocket.EndReceive(ar);
if (count > 0)
{
- _parser.Push(_buffer, count);
+ _parser.Push(_bufferManager.Buffer, count);
+ _bufferManager.ProcessReadResult(count);
}
else
{
@@ -339,6 +377,17 @@ namespace ZeroLevel.Network
}
Disposed();
Sheduller.Remove(_heartbeat_key);
+
+ _bufferManager?.Dispose();
+
+ try
+ {
+ _clientSocket?.Shutdown(SocketShutdown.Both);
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "[SocketClient.Dispose] Shutdown");
+ }
try
{
_clientSocket?.Close();
diff --git a/ZeroLevel/Services/Network/SocketExtensions.cs b/ZeroLevel/Services/Network/SocketExtensions.cs
new file mode 100644
index 0000000..39d9448
--- /dev/null
+++ b/ZeroLevel/Services/Network/SocketExtensions.cs
@@ -0,0 +1,68 @@
+using System;
+using System.Net.Sockets;
+using System.Runtime.InteropServices;
+
+namespace ZeroLevel.Services.Network
+{
+ public static class SocketExtensions
+ {
+ // Структура для настройки TCP KeepAlive (Windows)
+ [StructLayout(LayoutKind.Sequential)]
+ private struct TcpKeepAlive
+ {
+ public uint OnOff;
+ public uint KeepAliveTime;
+ public uint KeepAliveInterval;
+ }
+
+ // Метод расширения для настройки KeepAlive
+ public static void SetKeepAlive(this Socket socket, bool on, uint keepAliveTime, uint keepAliveInterval)
+ {
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ // Windows-специфичная настройка через IOControl
+ var keepAlive = new TcpKeepAlive
+ {
+ OnOff = on ? 1u : 0u,
+ KeepAliveTime = keepAliveTime, // Время в миллисекундах до первой проверки
+ KeepAliveInterval = keepAliveInterval // Интервал между проверками в миллисекундах
+ };
+
+ int size = Marshal.SizeOf(keepAlive);
+ IntPtr ptr = Marshal.AllocHGlobal(size);
+ try
+ {
+ Marshal.StructureToPtr(keepAlive, ptr, false);
+ byte[] buffer = new byte[size];
+ Marshal.Copy(ptr, buffer, 0, size);
+
+ socket.IOControl(IOControlCode.KeepAliveValues, buffer, null);
+ }
+ finally
+ {
+ Marshal.FreeHGlobal(ptr);
+ }
+ }
+ else
+ {
+ // Для Linux/MacOS используем socket options
+ socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, on);
+
+ if (on)
+ {
+ // На Linux эти параметры настраиваются через TCP socket options
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
+ {
+ // TCP_KEEPIDLE (секунды до первой проверки)
+ socket.SetSocketOption(SocketOptionLevel.Tcp, (SocketOptionName)4, (int)(keepAliveTime / 1000));
+ // TCP_KEEPINTVL (интервал между проверками в секундах)
+ socket.SetSocketOption(SocketOptionLevel.Tcp, (SocketOptionName)5, (int)(keepAliveInterval / 1000));
+ // TCP_KEEPCNT (количество проверок)
+ socket.SetSocketOption(SocketOptionLevel.Tcp, (SocketOptionName)6, 9);
+ }
+ }
+ }
+ }
+ }
+
+}
diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs
index c5390c9..5bb36ad 100644
--- a/ZeroLevel/Services/Network/SocketServer.cs
+++ b/ZeroLevel/Services/Network/SocketServer.cs
@@ -5,6 +5,7 @@ using System.Net;
using System.Net.Sockets;
using System.Threading;
using ZeroLevel.Network.SDL;
+using ZeroLevel.Services.Network;
namespace ZeroLevel.Network
{
@@ -62,6 +63,9 @@ namespace ZeroLevel.Network
LocalEndpoint = endpoint;
_serverSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
_serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
+ _serverSocket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); // Отключение алгоритма Nagle
+ _serverSocket.SendBufferSize = 65536; // Увеличение буфера отправки
+ _serverSocket.ReceiveBufferSize = 65536; // Увеличение буфера приема
_serverSocket.Bind(endpoint);
_serverSocket.Listen(100);
Working();
@@ -75,6 +79,14 @@ namespace ZeroLevel.Network
try
{
var client_socket = _serverSocket.EndAccept(ar);
+ // Настройка клиентского сокета
+ client_socket.NoDelay = true;
+ client_socket.SendBufferSize = 65536;
+ client_socket.ReceiveBufferSize = 65536;
+
+ // Настройка KeepAlive для клиентского соединения
+ client_socket.SetKeepAlive(true, 30000, 10000);
+
var ep = client_socket.RemoteEndPoint as IPEndPoint;
Log.SystemInfo($"[ZSocketServer.BeginAcceptCallback] Incoming connection {(ep?.Address?.ToString() ?? string.Empty)}:{(ep?.Port.ToString() ?? string.Empty)}");
_connection_set_lock.EnterWriteLock();
diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs
index 634d429..582ef01 100644
--- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs
+++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs
@@ -249,7 +249,7 @@ namespace ZeroLevel.Services.Serialization
long deserialized = BitConverter.ToInt64(buffer, 0);
return DateTime.FromBinary(deserialized);
}
-
+
public IPAddress ReadIP()
{
var exists = ReadByte();
@@ -575,7 +575,7 @@ namespace ZeroLevel.Services.Serialization
}
public partial class MemoryStreamReader
- : IAsyncBinaryReader
+ : IAsyncBinaryReader
{
///
/// Reading byte-package (read the size of the specified number of bytes, and then the packet itself read size)