diff --git a/ZeroLevel/Services/Collections/RoundRobinCollection.cs b/ZeroLevel/Services/Collections/RoundRobinCollection.cs index f6555ea..715ad30 100644 --- a/ZeroLevel/Services/Collections/RoundRobinCollection.cs +++ b/ZeroLevel/Services/Collections/RoundRobinCollection.cs @@ -99,6 +99,31 @@ namespace ZeroLevel.Services.Collections return false; } + public IEnumerable MoveNextSeq() + { + _lock.EnterReadLock(); + try + { + if (_collection.Count > 0) + { + _index = Interlocked.Increment(ref _index) % _collection.Count; + int p = 0; + for (int i = _index; i < _collection.Count; i++, p++) + { + yield return _collection[i]; + } + for (int i = 0; i < _index; i++, p++) + { + yield return _collection[i]; + } + } + } + finally + { + _lock.ExitReadLock(); + } + } + public bool MoveNextAndHandle(Action handler) { _lock.EnterReadLock(); @@ -133,17 +158,15 @@ namespace ZeroLevel.Services.Collections _lock.EnterReadLock(); try { - var arr = new T[_collection.Count]; int p = 0; for (int i = _index; i < _collection.Count; i++, p++) { - arr[p] = _collection[i]; + yield return _collection[i]; } for (int i = 0; i < _index; i++, p++) { - arr[p] = _collection[i]; + yield return _collection[i]; } - return arr; } finally { diff --git a/ZeroLevel/Services/Network/Proxies/Proxy.cs b/ZeroLevel/Services/Network/Proxies/Proxy.cs new file mode 100644 index 0000000..68844a0 --- /dev/null +++ b/ZeroLevel/Services/Network/Proxies/Proxy.cs @@ -0,0 +1,45 @@ +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; + +namespace ZeroLevel.Services.Network.Proxies +{ + public class Proxy + { + private readonly ProxyBalancer _balancer = new ProxyBalancer(); + + public void AppendServer(IPEndPoint ep) => _balancer.AddEndpoint(ep); + + private Socket _incomingSocket; + + public Proxy(IPEndPoint listenEndpoint) + { + _incomingSocket = new Socket(listenEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + _incomingSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); + _incomingSocket.Bind(listenEndpoint); + _incomingSocket.Listen(100); + } + + public async Task Run() + { + while (true) + { + var socket = await _incomingSocket.AcceptAsync(); + // no await! + CreateProxyConnection(socket); + } + } + + public async Task CreateProxyConnection(Socket connection) + { + var endpoint = _balancer.GetServerProxy(); + var server = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); + server.Bind(endpoint); + using (var bind = new ProxyBinding(connection, server)) + { + await bind.Bind(); + } + } + } +} diff --git a/ZeroLevel/Services/Network/Proxies/ProxyBalancer.cs b/ZeroLevel/Services/Network/Proxies/ProxyBalancer.cs new file mode 100644 index 0000000..3eadcaf --- /dev/null +++ b/ZeroLevel/Services/Network/Proxies/ProxyBalancer.cs @@ -0,0 +1,32 @@ +using System.Collections.Generic; +using System.Net; +using ZeroLevel.Services.Collections; + +namespace ZeroLevel.Services.Network.Proxies +{ + internal sealed class ProxyBalancer + { + private RoundRobinCollection _servers; + + public ProxyBalancer() + { + _servers = new RoundRobinCollection(); + } + + public ProxyBalancer(IEnumerable endpoints) + { + _servers = new RoundRobinCollection(endpoints); + } + + public void AddEndpoint(IPEndPoint ep) => _servers.Add(ep); + + public IPEndPoint GetServerProxy() + { + if (_servers.MoveNext()) + { + return _servers.Current; + } + return null; + } + } +} diff --git a/ZeroLevel/Services/Network/Proxies/ProxyBinding.cs b/ZeroLevel/Services/Network/Proxies/ProxyBinding.cs new file mode 100644 index 0000000..2a35875 --- /dev/null +++ b/ZeroLevel/Services/Network/Proxies/ProxyBinding.cs @@ -0,0 +1,107 @@ +using System; +using System.Net.Sockets; +using System.Threading.Tasks; + +namespace ZeroLevel.Services.Network.Proxies +{ + public class ProxyBinding + : IDisposable + { + private readonly Socket _left; + private readonly Socket _right; + + public ProxyBinding(Socket left, Socket right) + { + _left = left; + _right = right; + } + + public async Task Bind() + { + using (var serverStream = new NetworkStream(_left)) + { + using (var clientStream = new NetworkStream(_right)) + { + while (IsConnected(_left) && IsConnected(_right)) + { + if (await Request(clientStream, serverStream) == 0 + && + await Response(clientStream, serverStream) == 0) + { + await Task.Delay(50); + } + } + } + } + } + + private static bool IsConnected(Socket socket) + { + try + { + return !(socket.Poll(1, SelectMode.SelectRead) && socket.Available == 0); + } + catch (SocketException) { return false; } + } + + private static async Task Request(NetworkStream left, NetworkStream right) + { + int total = 0; + if (left.DataAvailable && left.CanRead && right.CanWrite) + { + int count; + byte[] buffer = new byte[32 * 1024]; + while (left.DataAvailable && (count = await left.ReadAsync(buffer, 0, buffer.Length)) > 0) + { + await right.WriteAsync(buffer, 0, count); + total += count; + } + } + return total; + } + + private async static Task Response(NetworkStream left, NetworkStream right) + { + int total = 0; + if (right.DataAvailable && right.CanRead && left.CanWrite) + { + int count; + byte[] buffer = new byte[32 * 1024]; + while (right.DataAvailable && (count = await right.ReadAsync(buffer, 0, buffer.Length)) > 0) + { + await left.WriteAsync(buffer, 0, count); + total += count; + } + } + return total; + } + + public void Dispose() + { + if (_left != null) + { + try + { + if (_left.Connected) + { + _left.Shutdown(SocketShutdown.Both); + } + _left.Dispose(); + } + catch { } + } + if (_right != null) + { + try + { + if (_right.Connected) + { + _right.Shutdown(SocketShutdown.Both); + } + _right.Dispose(); + } + catch { } + } + } + } +}