Fixes Network

master
ogoun 2 weeks ago
parent 1692324219
commit 6bb9e87d3b

@ -1,9 +1,7 @@
using Newtonsoft.Json.Linq;
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Xunit;
using ZeroLevel.DocumentObjectModel;
using ZeroLevel.Network;

@ -0,0 +1,132 @@
using System;
using System.Buffers;
namespace ZeroLevel.Services.Network
{
/// <summary>
/// Управляет адаптивным буфером для сетевых операций
/// </summary>
public class AdaptiveBufferManager : IDisposable
{
private byte[] _buffer;
private int _currentSize;
private readonly int _minSize;
private readonly int _maxSize;
// Счетчики для адаптивного изменения размера
private int _consecutiveSmallReads = 0;
private int _consecutiveLargeReads = 0;
// Настройки адаптации
private readonly double _increaseThreshold;
private readonly double _decreaseThreshold;
private readonly int _increaseAfterReads;
private readonly int _decreaseAfterReads;
public byte[] Buffer => _buffer;
public int CurrentSize => _currentSize;
public AdaptiveBufferManager(
int minSize = 4096,
int maxSize = 65536,
double increaseThreshold = 0.9,
double decreaseThreshold = 0.25,
int increaseAfterReads = 3,
int decreaseAfterReads = 10)
{
_minSize = minSize;
_maxSize = maxSize;
_currentSize = minSize;
_increaseThreshold = increaseThreshold;
_decreaseThreshold = decreaseThreshold;
_increaseAfterReads = increaseAfterReads;
_decreaseAfterReads = decreaseAfterReads;
AllocateBuffer(_currentSize);
}
/// <summary>
/// Обрабатывает результат чтения и адаптирует размер буфера
/// </summary>
public void ProcessReadResult(int bytesRead)
{
AdjustBufferSize(bytesRead);
}
/// <summary>
/// Принудительно изменить размер буфера
/// </summary>
public void ResizeBuffer(int newSize)
{
if (newSize < _minSize || newSize > _maxSize)
{
throw new ArgumentOutOfRangeException(nameof(newSize),
$"Size must be between {_minSize} and {_maxSize}");
}
AllocateBuffer(newSize);
}
private void AllocateBuffer(int size)
{
// Возвращаем старый буфер если есть
if (_buffer != null)
{
ArrayPool<byte>.Shared.Return(_buffer, clearArray: false);
}
// Арендуем новый буфер нужного размера
_buffer = ArrayPool<byte>.Shared.Rent(size);
_currentSize = _buffer.Length; // ArrayPool может вернуть буфер большего размера
}
private void AdjustBufferSize(int bytesRead)
{
// Если прочитали почти весь буфер, возможно нужен больший размер
if (bytesRead >= _currentSize * _increaseThreshold)
{
_consecutiveLargeReads++;
_consecutiveSmallReads = 0;
// После N больших чтений подряд увеличиваем буфер
if (_consecutiveLargeReads >= _increaseAfterReads && _currentSize < _maxSize)
{
var newSize = Math.Min(_currentSize * 2, _maxSize);
Log.Debug($"[AdaptiveBuffer] Increasing size from {_currentSize} to {newSize}");
AllocateBuffer(newSize);
_consecutiveLargeReads = 0;
}
}
// Если прочитали мало данных, возможно можно уменьшить буфер
else if (bytesRead < _currentSize * _decreaseThreshold)
{
_consecutiveSmallReads++;
_consecutiveLargeReads = 0;
// После N маленьких чтений подряд уменьшаем буфер
if (_consecutiveSmallReads >= _decreaseAfterReads && _currentSize > _minSize)
{
var newSize = Math.Max(_currentSize / 2, _minSize);
Log.Debug($"[AdaptiveBuffer] Decreasing size from {_currentSize} to {newSize}");
AllocateBuffer(newSize);
_consecutiveSmallReads = 0;
}
}
else
{
// Сброс счетчиков при среднем использовании
_consecutiveSmallReads = 0;
_consecutiveLargeReads = 0;
}
}
public void Dispose()
{
if (_buffer != null)
{
ArrayPool<byte>.Shared.Return(_buffer, clearArray: false);
_buffer = null!;
}
}
}
}

@ -21,7 +21,7 @@ namespace ZeroLevel.Network
/// <summary>
/// Buffer size for receiving data
/// </summary>
protected const int DEFAULT_RECEIVE_BUFFER_SIZE = 4096;
// protected const int DEFAULT_RECEIVE_BUFFER_SIZE = 4096;
/// <summary>
/// If during the specified period there was no network activity, send a ping-request

@ -4,6 +4,7 @@ using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using ZeroLevel.Services.Network;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Network
@ -36,23 +37,39 @@ namespace ZeroLevel.Network
private Socket _clientSocket;
private FrameParser _parser;
private readonly RequestBuffer _requests = new RequestBuffer();
private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
private bool _socket_freezed = false; // используется для связи сервер-клиент, запрещает пересоздание сокета
private readonly object _reconnection_lock = new object();
private long _heartbeat_key;
private Thread _receiveThread;
private Thread _sendingThread;
//private readonly byte[] _buffer = new byte[DEFAULT_RECEIVE_BUFFER_SIZE];
private readonly AdaptiveBufferManager _bufferManager;
#endregion Private
public IRouter Router { get; }
public SocketClient(IPEndPoint ep, IRouter router)
{
_bufferManager = new AdaptiveBufferManager(
minSize: 4096,
maxSize: 65536,
increaseThreshold: 0.9,
decreaseThreshold: 0.25,
increaseAfterReads: 10,
decreaseAfterReads: 100
);
try
{
_clientSocket = new Socket(ep.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
_clientSocket.Connect(ep);
_clientSocket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); // Отключение алгоритма Nagle
_clientSocket.SendBufferSize = 65536; // Увеличение буфера отправки
_clientSocket.ReceiveBufferSize = 65536; // Увеличение буфера приема
//_clientSocket.Connect(ep);
OpenConnection(_clientSocket, ep);
_clientSocket.SetKeepAlive(true, 30000, 10000);
OnConnect(this);
}
catch (Exception ex)
@ -71,8 +88,28 @@ namespace ZeroLevel.Network
StartReceive();
}
private static void OpenConnection(Socket socket, IPEndPoint ep)
{
var connectResult = socket.BeginConnect(ep, null, null);
if (!connectResult.AsyncWaitHandle.WaitOne(5000, false))
{
socket.Close();
throw new TimeoutException("Connection timeout");
}
socket.EndConnect(connectResult);
}
public SocketClient(Socket socket, IRouter router)
{
_bufferManager = new AdaptiveBufferManager(
minSize: 4096,
maxSize: 65536,
increaseThreshold: 0.9,
decreaseThreshold: 0.25,
increaseAfterReads: 10,
decreaseAfterReads: 100
);
Router = router;
_clientSocket = socket;
Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint;
@ -99,26 +136,6 @@ namespace ZeroLevel.Network
_heartbeat_key = Sheduller.RemindEvery(TimeSpan.FromMilliseconds(MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS), Heartbeat);
}
private void StartReceive()
{
try
{
_clientSocket.BeginReceive(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, SocketFlags.None, ReceiveAsyncCallback, null);
}
catch (NullReferenceException)
{
Broken();
Log.SystemError("[SocketClient.TryConnect] Client : Null Reference Exception - On Connect (begin receive section)");
_clientSocket.Disconnect(false);
}
catch (SocketException e)
{
Broken();
Log.SystemError(e, "[SocketClient.TryConnect] Client : Exception - On Connect (begin receive section)");
_clientSocket.Disconnect(false);
}
}
#region API
public event Action<ISocketClient> OnConnect = (_) => { };
public event Action<ISocketClient> OnDisconnect = (_) => { };
@ -167,6 +184,26 @@ namespace ZeroLevel.Network
}
}
private void StartReceive()
{
try
{
_clientSocket.BeginReceive(_bufferManager.Buffer, 0, _bufferManager.CurrentSize, SocketFlags.None, ReceiveAsyncCallback, null);
}
catch (NullReferenceException)
{
Broken();
Log.SystemError("[SocketClient.TryConnect] Client : Null Reference Exception - On Connect (begin receive section)");
_clientSocket.Disconnect(false);
}
catch (SocketException e)
{
Broken();
Log.SystemError(e, "[SocketClient.TryConnect] Client : Exception - On Connect (begin receive section)");
_clientSocket.Disconnect(false);
}
}
public void ReceiveAsyncCallback(IAsyncResult ar)
{
try
@ -174,7 +211,8 @@ namespace ZeroLevel.Network
var count = _clientSocket.EndReceive(ar);
if (count > 0)
{
_parser.Push(_buffer, count);
_parser.Push(_bufferManager.Buffer, count);
_bufferManager.ProcessReadResult(count);
}
else
{
@ -339,6 +377,17 @@ namespace ZeroLevel.Network
}
Disposed();
Sheduller.Remove(_heartbeat_key);
_bufferManager?.Dispose();
try
{
_clientSocket?.Shutdown(SocketShutdown.Both);
}
catch (Exception ex)
{
Log.Error(ex, "[SocketClient.Dispose] Shutdown");
}
try
{
_clientSocket?.Close();

@ -0,0 +1,68 @@
using System;
using System.Net.Sockets;
using System.Runtime.InteropServices;
namespace ZeroLevel.Services.Network
{
public static class SocketExtensions
{
// Структура для настройки TCP KeepAlive (Windows)
[StructLayout(LayoutKind.Sequential)]
private struct TcpKeepAlive
{
public uint OnOff;
public uint KeepAliveTime;
public uint KeepAliveInterval;
}
// Метод расширения для настройки KeepAlive
public static void SetKeepAlive(this Socket socket, bool on, uint keepAliveTime, uint keepAliveInterval)
{
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
// Windows-специфичная настройка через IOControl
var keepAlive = new TcpKeepAlive
{
OnOff = on ? 1u : 0u,
KeepAliveTime = keepAliveTime, // Время в миллисекундах до первой проверки
KeepAliveInterval = keepAliveInterval // Интервал между проверками в миллисекундах
};
int size = Marshal.SizeOf(keepAlive);
IntPtr ptr = Marshal.AllocHGlobal(size);
try
{
Marshal.StructureToPtr(keepAlive, ptr, false);
byte[] buffer = new byte[size];
Marshal.Copy(ptr, buffer, 0, size);
socket.IOControl(IOControlCode.KeepAliveValues, buffer, null);
}
finally
{
Marshal.FreeHGlobal(ptr);
}
}
else
{
// Для Linux/MacOS используем socket options
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, on);
if (on)
{
// На Linux эти параметры настраиваются через TCP socket options
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
{
// TCP_KEEPIDLE (секунды до первой проверки)
socket.SetSocketOption(SocketOptionLevel.Tcp, (SocketOptionName)4, (int)(keepAliveTime / 1000));
// TCP_KEEPINTVL (интервал между проверками в секундах)
socket.SetSocketOption(SocketOptionLevel.Tcp, (SocketOptionName)5, (int)(keepAliveInterval / 1000));
// TCP_KEEPCNT (количество проверок)
socket.SetSocketOption(SocketOptionLevel.Tcp, (SocketOptionName)6, 9);
}
}
}
}
}
}

@ -5,6 +5,7 @@ using System.Net;
using System.Net.Sockets;
using System.Threading;
using ZeroLevel.Network.SDL;
using ZeroLevel.Services.Network;
namespace ZeroLevel.Network
{
@ -62,6 +63,9 @@ namespace ZeroLevel.Network
LocalEndpoint = endpoint;
_serverSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
_serverSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
_serverSocket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true); // Отключение алгоритма Nagle
_serverSocket.SendBufferSize = 65536; // Увеличение буфера отправки
_serverSocket.ReceiveBufferSize = 65536; // Увеличение буфера приема
_serverSocket.Bind(endpoint);
_serverSocket.Listen(100);
Working();
@ -75,6 +79,14 @@ namespace ZeroLevel.Network
try
{
var client_socket = _serverSocket.EndAccept(ar);
// Настройка клиентского сокета
client_socket.NoDelay = true;
client_socket.SendBufferSize = 65536;
client_socket.ReceiveBufferSize = 65536;
// Настройка KeepAlive для клиентского соединения
client_socket.SetKeepAlive(true, 30000, 10000);
var ep = client_socket.RemoteEndPoint as IPEndPoint;
Log.SystemInfo($"[ZSocketServer.BeginAcceptCallback] Incoming connection {(ep?.Address?.ToString() ?? string.Empty)}:{(ep?.Port.ToString() ?? string.Empty)}");
_connection_set_lock.EnterWriteLock();

@ -575,7 +575,7 @@ namespace ZeroLevel.Services.Serialization
}
public partial class MemoryStreamReader
: IAsyncBinaryReader
: IAsyncBinaryReader
{
/// <summary>
/// Reading byte-package (read the size of the specified number of bytes, and then the packet itself read size)

Loading…
Cancel
Save

Powered by TurnKey Linux.