From 6bb9e87d3bc17f53dcf78febcbaa0e233f8284ed Mon Sep 17 00:00:00 2001 From: ogoun Date: Thu, 19 Jun 2025 19:17:56 +0300 Subject: [PATCH] Fixes Network --- .../ZeroLevel.UnitTests/SerializationTests.cs | 4 +- .../Services/Network/AdaptiveBufferManager.cs | 132 ++++++++++++++++++ ZeroLevel/Services/Network/BaseSocket.cs | 2 +- ZeroLevel/Services/Network/SocketClient.cs | 97 +++++++++---- .../Services/Network/SocketExtensions.cs | 68 +++++++++ ZeroLevel/Services/Network/SocketServer.cs | 12 ++ .../Serialization/MemoryStreamReader.cs | 4 +- 7 files changed, 289 insertions(+), 30 deletions(-) create mode 100644 ZeroLevel/Services/Network/AdaptiveBufferManager.cs create mode 100644 ZeroLevel/Services/Network/SocketExtensions.cs diff --git a/Tests/ZeroLevel.UnitTests/SerializationTests.cs b/Tests/ZeroLevel.UnitTests/SerializationTests.cs index 0304db0..6661c9b 100644 --- a/Tests/ZeroLevel.UnitTests/SerializationTests.cs +++ b/Tests/ZeroLevel.UnitTests/SerializationTests.cs @@ -1,9 +1,7 @@ -using Newtonsoft.Json.Linq; -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Net; -using System.Threading.Tasks; using Xunit; using ZeroLevel.DocumentObjectModel; using ZeroLevel.Network; diff --git a/ZeroLevel/Services/Network/AdaptiveBufferManager.cs b/ZeroLevel/Services/Network/AdaptiveBufferManager.cs new file mode 100644 index 0000000..c124639 --- /dev/null +++ b/ZeroLevel/Services/Network/AdaptiveBufferManager.cs @@ -0,0 +1,132 @@ +using System; +using System.Buffers; + +namespace ZeroLevel.Services.Network +{ + /// + /// Управляет адаптивным буфером для сетевых операций + /// + public class AdaptiveBufferManager : IDisposable + { + private byte[] _buffer; + private int _currentSize; + private readonly int _minSize; + private readonly int _maxSize; + + // Счетчики для адаптивного изменения размера + private int _consecutiveSmallReads = 0; + private int _consecutiveLargeReads = 0; + + // Настройки адаптации + private readonly double _increaseThreshold; + private readonly double _decreaseThreshold; + private readonly int _increaseAfterReads; + private readonly int _decreaseAfterReads; + + public byte[] Buffer => _buffer; + public int CurrentSize => _currentSize; + + public AdaptiveBufferManager( + int minSize = 4096, + int maxSize = 65536, + double increaseThreshold = 0.9, + double decreaseThreshold = 0.25, + int increaseAfterReads = 3, + int decreaseAfterReads = 10) + { + _minSize = minSize; + _maxSize = maxSize; + _currentSize = minSize; + _increaseThreshold = increaseThreshold; + _decreaseThreshold = decreaseThreshold; + _increaseAfterReads = increaseAfterReads; + _decreaseAfterReads = decreaseAfterReads; + + AllocateBuffer(_currentSize); + } + + /// + /// Обрабатывает результат чтения и адаптирует размер буфера + /// + public void ProcessReadResult(int bytesRead) + { + AdjustBufferSize(bytesRead); + } + + /// + /// Принудительно изменить размер буфера + /// + public void ResizeBuffer(int newSize) + { + if (newSize < _minSize || newSize > _maxSize) + { + throw new ArgumentOutOfRangeException(nameof(newSize), + $"Size must be between {_minSize} and {_maxSize}"); + } + + AllocateBuffer(newSize); + } + + private void AllocateBuffer(int size) + { + // Возвращаем старый буфер если есть + if (_buffer != null) + { + ArrayPool.Shared.Return(_buffer, clearArray: false); + } + + // Арендуем новый буфер нужного размера + _buffer = ArrayPool.Shared.Rent(size); + _currentSize = _buffer.Length; // ArrayPool может вернуть буфер большего размера + } + + private void AdjustBufferSize(int bytesRead) + { + // Если прочитали почти весь буфер, возможно нужен больший размер + if (bytesRead >= _currentSize * _increaseThreshold) + { + _consecutiveLargeReads++; + _consecutiveSmallReads = 0; + + // После N больших чтений подряд увеличиваем буфер + if (_consecutiveLargeReads >= _increaseAfterReads && _currentSize < _maxSize) + { + var newSize = Math.Min(_currentSize * 2, _maxSize); + Log.Debug($"[AdaptiveBuffer] Increasing size from {_currentSize} to {newSize}"); + AllocateBuffer(newSize); + _consecutiveLargeReads = 0; + } + } + // Если прочитали мало данных, возможно можно уменьшить буфер + else if (bytesRead < _currentSize * _decreaseThreshold) + { + _consecutiveSmallReads++; + _consecutiveLargeReads = 0; + + // После N маленьких чтений подряд уменьшаем буфер + if (_consecutiveSmallReads >= _decreaseAfterReads && _currentSize > _minSize) + { + var newSize = Math.Max(_currentSize / 2, _minSize); + Log.Debug($"[AdaptiveBuffer] Decreasing size from {_currentSize} to {newSize}"); + AllocateBuffer(newSize); + _consecutiveSmallReads = 0; + } + } + else + { + // Сброс счетчиков при среднем использовании + _consecutiveSmallReads = 0; + _consecutiveLargeReads = 0; + } + } + + public void Dispose() + { + if (_buffer != null) + { + ArrayPool.Shared.Return(_buffer, clearArray: false); + _buffer = null!; + } + } + } +} diff --git a/ZeroLevel/Services/Network/BaseSocket.cs b/ZeroLevel/Services/Network/BaseSocket.cs index b46d790..c743766 100644 --- a/ZeroLevel/Services/Network/BaseSocket.cs +++ b/ZeroLevel/Services/Network/BaseSocket.cs @@ -21,7 +21,7 @@ namespace ZeroLevel.Network /// /// Buffer size for receiving data /// - protected const int DEFAULT_RECEIVE_BUFFER_SIZE = 4096; + // protected const int DEFAULT_RECEIVE_BUFFER_SIZE = 4096; /// /// If during the specified period there was no network activity, send a ping-request diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index 3dbd918..54a395e 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -4,6 +4,7 @@ using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; using System.Threading; +using ZeroLevel.Services.Network; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Network @@ -35,24 +36,40 @@ namespace ZeroLevel.Network private Socket _clientSocket; private FrameParser _parser; - private readonly RequestBuffer _requests = new RequestBuffer(); - private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE]; + private readonly RequestBuffer _requests = new RequestBuffer(); private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета private readonly object _reconnection_lock = new object(); private long _heartbeat_key; private Thread _receiveThread; private Thread _sendingThread; + //private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE]; + private readonly AdaptiveBufferManager _bufferManager; + #endregion Private public IRouter Router { get; } public SocketClient(IPEndPoint ep, IRouter router) { + _bufferManager = new AdaptiveBufferManager( + minSize: 4096, + maxSize: 65536, + increaseThreshold: 0.9, + decreaseThreshold: 0.25, + increaseAfterReads: 10, + decreaseAfterReads: 100 + ); + try { _clientSocket = new Socket(ep.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - _clientSocket.Connect(ep); + _clientSocket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); // Отключение алгоритма Nagle + _clientSocket.SendBufferSize = 65536; // Увеличение буфера отправки + _clientSocket.ReceiveBufferSize = 65536; // Увеличение буфера приема + //_clientSocket.Connect(ep); + OpenConnection(_clientSocket, ep); + _clientSocket.SetKeepAlive(true, 30000, 10000); OnConnect(this); } catch (Exception ex) @@ -71,8 +88,28 @@ namespace ZeroLevel.Network StartReceive(); } + private static void OpenConnection(Socket socket, IPEndPoint ep) + { + var connectResult = socket.BeginConnect(ep, null, null); + if (!connectResult.AsyncWaitHandle.WaitOne(5000, false)) + { + socket.Close(); + throw new TimeoutException("Connection timeout"); + } + socket.EndConnect(connectResult); + } + public SocketClient(Socket socket, IRouter router) { + _bufferManager = new AdaptiveBufferManager( + minSize: 4096, + maxSize: 65536, + increaseThreshold: 0.9, + decreaseThreshold: 0.25, + increaseAfterReads: 10, + decreaseAfterReads: 100 + ); + Router = router; _clientSocket = socket; Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint; @@ -99,26 +136,6 @@ namespace ZeroLevel.Network _heartbeat_key = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat); } - private void StartReceive() - { - try - { - _clientSocket.BeginReceive(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, SocketFlags.None, ReceiveAsyncCallback, null); - } - catch (NullReferenceException) - { - Broken(); - Log.SystemError("[SocketClient.TryConnect] Client : Null Reference Exception - On Connect (begin receive section)"); - _clientSocket.Disconnect(false); - } - catch (SocketException e) - { - Broken(); - Log.SystemError(e, "[SocketClient.TryConnect] Client : Exception - On Connect (begin receive section)"); - _clientSocket.Disconnect(false); - } - } - #region API public event Action OnConnect = (_) => { }; public event Action OnDisconnect = (_) => { }; @@ -167,6 +184,26 @@ namespace ZeroLevel.Network } } + private void StartReceive() + { + try + { + _clientSocket.BeginReceive(_bufferManager.Buffer, 0, _bufferManager.CurrentSize, SocketFlags.None, ReceiveAsyncCallback, null); + } + catch (NullReferenceException) + { + Broken(); + Log.SystemError("[SocketClient.TryConnect] Client : Null Reference Exception - On Connect (begin receive section)"); + _clientSocket.Disconnect(false); + } + catch (SocketException e) + { + Broken(); + Log.SystemError(e, "[SocketClient.TryConnect] Client : Exception - On Connect (begin receive section)"); + _clientSocket.Disconnect(false); + } + } + public void ReceiveAsyncCallback(IAsyncResult ar) { try @@ -174,7 +211,8 @@ namespace ZeroLevel.Network var count = _clientSocket.EndReceive(ar); if (count > 0) { - _parser.Push(_buffer, count); + _parser.Push(_bufferManager.Buffer, count); + _bufferManager.ProcessReadResult(count); } else { @@ -339,6 +377,17 @@ namespace ZeroLevel.Network } Disposed(); Sheduller.Remove(_heartbeat_key); + + _bufferManager?.Dispose(); + + try + { + _clientSocket?.Shutdown(SocketShutdown.Both); + } + catch (Exception ex) + { + Log.Error(ex, "[SocketClient.Dispose] Shutdown"); + } try { _clientSocket?.Close(); diff --git a/ZeroLevel/Services/Network/SocketExtensions.cs b/ZeroLevel/Services/Network/SocketExtensions.cs new file mode 100644 index 0000000..39d9448 --- /dev/null +++ b/ZeroLevel/Services/Network/SocketExtensions.cs @@ -0,0 +1,68 @@ +using System; +using System.Net.Sockets; +using System.Runtime.InteropServices; + +namespace ZeroLevel.Services.Network +{ + public static class SocketExtensions + { + // Структура для настройки TCP KeepAlive (Windows) + [StructLayout(LayoutKind.Sequential)] + private struct TcpKeepAlive + { + public uint OnOff; + public uint KeepAliveTime; + public uint KeepAliveInterval; + } + + // Метод расширения для настройки KeepAlive + public static void SetKeepAlive(this Socket socket, bool on, uint keepAliveTime, uint keepAliveInterval) + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + // Windows-специфичная настройка через IOControl + var keepAlive = new TcpKeepAlive + { + OnOff = on ? 1u : 0u, + KeepAliveTime = keepAliveTime, // Время в миллисекундах до первой проверки + KeepAliveInterval = keepAliveInterval // Интервал между проверками в миллисекундах + }; + + int size = Marshal.SizeOf(keepAlive); + IntPtr ptr = Marshal.AllocHGlobal(size); + try + { + Marshal.StructureToPtr(keepAlive, ptr, false); + byte[] buffer = new byte[size]; + Marshal.Copy(ptr, buffer, 0, size); + + socket.IOControl(IOControlCode.KeepAliveValues, buffer, null); + } + finally + { + Marshal.FreeHGlobal(ptr); + } + } + else + { + // Для Linux/MacOS используем socket options + socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, on); + + if (on) + { + // На Linux эти параметры настраиваются через TCP socket options + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) + { + // TCP_KEEPIDLE (секунды до первой проверки) + socket.SetSocketOption(SocketOptionLevel.Tcp, (SocketOptionName)4, (int)(keepAliveTime / 1000)); + // TCP_KEEPINTVL (интервал между проверками в секундах) + socket.SetSocketOption(SocketOptionLevel.Tcp, (SocketOptionName)5, (int)(keepAliveInterval / 1000)); + // TCP_KEEPCNT (количество проверок) + socket.SetSocketOption(SocketOptionLevel.Tcp, (SocketOptionName)6, 9); + } + } + } + } + } + +} diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs index c5390c9..5bb36ad 100644 --- a/ZeroLevel/Services/Network/SocketServer.cs +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -5,6 +5,7 @@ using System.Net; using System.Net.Sockets; using System.Threading; using ZeroLevel.Network.SDL; +using ZeroLevel.Services.Network; namespace ZeroLevel.Network { @@ -62,6 +63,9 @@ namespace ZeroLevel.Network LocalEndpoint = endpoint; _serverSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); _serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + _serverSocket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); // Отключение алгоритма Nagle + _serverSocket.SendBufferSize = 65536; // Увеличение буфера отправки + _serverSocket.ReceiveBufferSize = 65536; // Увеличение буфера приема _serverSocket.Bind(endpoint); _serverSocket.Listen(100); Working(); @@ -75,6 +79,14 @@ namespace ZeroLevel.Network try { var client_socket = _serverSocket.EndAccept(ar); + // Настройка клиентского сокета + client_socket.NoDelay = true; + client_socket.SendBufferSize = 65536; + client_socket.ReceiveBufferSize = 65536; + + // Настройка KeepAlive для клиентского соединения + client_socket.SetKeepAlive(true, 30000, 10000); + var ep = client_socket.RemoteEndPoint as IPEndPoint; Log.SystemInfo($"[ZSocketServer.BeginAcceptCallback] Incoming connection {(ep?.Address?.ToString() ?? string.Empty)}:{(ep?.Port.ToString() ?? string.Empty)}"); _connection_set_lock.EnterWriteLock(); diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs index 634d429..582ef01 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs @@ -249,7 +249,7 @@ namespace ZeroLevel.Services.Serialization long deserialized = BitConverter.ToInt64(buffer, 0); return DateTime.FromBinary(deserialized); } - + public IPAddress ReadIP() { var exists = ReadByte(); @@ -575,7 +575,7 @@ namespace ZeroLevel.Services.Serialization } public partial class MemoryStreamReader - : IAsyncBinaryReader + : IAsyncBinaryReader { /// /// Reading byte-package (read the size of the specified number of bytes, and then the packet itself read size)