From fadb9689afd03830564043e3e07c3535b0b0cd10 Mon Sep 17 00:00:00 2001 From: "a.bozhenov" Date: Wed, 14 Aug 2019 20:40:26 +0300 Subject: [PATCH] Fix networking --- .../PublishProfiles/FolderProfile.pubxml | 13 + ZeroLevel/Services/Bootstrap.cs | 1 - ZeroLevel/Services/Dbg.cs | 79 ------ ZeroLevel/Services/FileSystem/FileArchive.cs | 53 ++-- .../Services/Network/Contracts/IRouter.cs | 2 +- .../Services/Network/DbgNetworkEvents.cs | 20 -- ZeroLevel/Services/Network/SocketClient.cs | 229 ++++++++---------- ZeroLevel/Services/Network/SocketServer.cs | 6 +- .../Services/Network/Utils/FrameParser.cs | 5 +- .../Services/Network/Utils/RequestBuffer.cs | 41 +++- ZeroLevel/Services/Network/Utils/Router.cs | 86 +++---- ZeroLevel/ZeroLevel.csproj | 8 +- .../ServiceControlPanel.xaml.cs | 2 +- 13 files changed, 217 insertions(+), 328 deletions(-) create mode 100644 TestApp/Properties/PublishProfiles/FolderProfile.pubxml delete mode 100644 ZeroLevel/Services/Dbg.cs delete mode 100644 ZeroLevel/Services/Network/DbgNetworkEvents.cs diff --git a/TestApp/Properties/PublishProfiles/FolderProfile.pubxml b/TestApp/Properties/PublishProfiles/FolderProfile.pubxml new file mode 100644 index 0000000..acc147b --- /dev/null +++ b/TestApp/Properties/PublishProfiles/FolderProfile.pubxml @@ -0,0 +1,13 @@ + + + + + FileSystem + Release + Any CPU + netcoreapp2.2 + bin\Release\netcoreapp2.2\publish\ + + \ No newline at end of file diff --git a/ZeroLevel/Services/Bootstrap.cs b/ZeroLevel/Services/Bootstrap.cs index 481b6ba..3a1c54c 100644 --- a/ZeroLevel/Services/Bootstrap.cs +++ b/ZeroLevel/Services/Bootstrap.cs @@ -195,7 +195,6 @@ namespace ZeroLevel try { Sheduller.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose default sheduller error"); } try { Log.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose log error"); } try { Injector.Default.Dispose(); Injector.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose DI containers error"); } - Dbg.Shutdown(); } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Dbg.cs b/ZeroLevel/Services/Dbg.cs deleted file mode 100644 index 8e88cf1..0000000 --- a/ZeroLevel/Services/Dbg.cs +++ /dev/null @@ -1,79 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.IO; -using System.Threading; -using ZeroLevel.Services.Serialization; - -namespace ZeroLevel.Services -{ - public static class Dbg - { - private static BlockingCollection> _timestamps = - new BlockingCollection>(); - - private static Thread _writeThread; - private static readonly bool _started; - - static Dbg() - { - if (Configuration.Default.Contains("dbg")) - { - try - { - if (false == Directory.Exists(Configuration.Default.First("dbg"))) - { - Directory.CreateDirectory(Configuration.Default.First("dbg")); - } - } - catch (Exception ex) - { - Log.SystemError(ex, "[Dbg] Fault initialize, dbg files directory not exists and not may be created"); - _started = false; - return; - } - _writeThread = new Thread(HandleQueue); - _writeThread.IsBackground = true; - _writeThread.Start(); - _started = true; - } - else - { - _started = false; - } - } - - private static void HandleQueue() - { - using (var fs = new FileStream(Path.Combine(Configuration.Default.First("dbg"), $"{DateTime.Now.ToString("yyyyMMdd_HHmmss")}.dbg"), FileMode.Create, FileAccess.Write, FileShare.None)) - { - using (var writer = new MemoryStreamWriter(fs)) - { - while (_timestamps.IsCompleted == false) - { - var pair = _timestamps.Take(); - writer.WriteInt32(pair.Item1); - writer.WriteLong(pair.Item2); - writer.WriteString(pair.Item3); - } - fs.Flush(); - } - } - } - - internal static void Shutdown() - { - if (_started) - { - _timestamps.CompleteAdding(); - } - } - - internal static void Timestamp(int eventType, string description) - { - if (_started && _timestamps.IsAddingCompleted == false) - { - _timestamps.Add(Tuple.Create(eventType, DateTime.UtcNow.Ticks, description)); - } - } - } -} diff --git a/ZeroLevel/Services/FileSystem/FileArchive.cs b/ZeroLevel/Services/FileSystem/FileArchive.cs index dc158c9..323149e 100644 --- a/ZeroLevel/Services/FileSystem/FileArchive.cs +++ b/ZeroLevel/Services/FileSystem/FileArchive.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Concurrent; using System.IO; -using System.Security.AccessControl; using System.Threading; using System.Threading.Tasks; @@ -11,11 +10,11 @@ namespace ZeroLevel.Services.FileSystem { protected const int DEFAULT_STREAM_BUFFER_SIZE = 16384; - public async Task Store() + public void Store() { try { - await StoreImpl().ConfigureAwait(false); + StoreImpl(); } catch (Exception ex) { @@ -23,7 +22,7 @@ namespace ZeroLevel.Services.FileSystem } } - protected static async Task TransferAsync(Stream input, Stream output) + protected static void Transfer(Stream input, Stream output) { if (input.CanRead == false) { @@ -35,7 +34,7 @@ namespace ZeroLevel.Services.FileSystem } var readed = 0; var buffer = new byte[DEFAULT_STREAM_BUFFER_SIZE]; - while ((readed = await input.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) != 0) + while ((readed = input.Read(buffer, 0, buffer.Length)) != 0) { output.Write(buffer, 0, readed); } @@ -52,7 +51,7 @@ namespace ZeroLevel.Services.FileSystem return stream; } - protected abstract Task StoreImpl(); + protected abstract void StoreImpl(); } internal sealed class StoreText : @@ -67,13 +66,13 @@ namespace ZeroLevel.Services.FileSystem _path = path; } - protected override async Task StoreImpl() + protected override void StoreImpl() { using (var input_stream = GenerateStreamFromString(_text)) { using (var out_stream = File.Create(_path)) { - await TransferAsync(input_stream, out_stream).ConfigureAwait(false); + Transfer(input_stream, out_stream); } } } @@ -91,7 +90,7 @@ namespace ZeroLevel.Services.FileSystem _path = path; } - protected override async Task StoreImpl() + protected override void StoreImpl() { using (var input_stream = new MemoryStream(_data)) { @@ -99,7 +98,7 @@ namespace ZeroLevel.Services.FileSystem DEFAULT_STREAM_BUFFER_SIZE, FileOptions.Asynchronous)) { - await TransferAsync(input_stream, out_stream).ConfigureAwait(false); + Transfer(input_stream, out_stream); } } } @@ -117,7 +116,7 @@ namespace ZeroLevel.Services.FileSystem _path = path; } - protected override async Task StoreImpl() + protected override void StoreImpl() { using (_stream) { @@ -125,7 +124,7 @@ namespace ZeroLevel.Services.FileSystem DEFAULT_STREAM_BUFFER_SIZE, FileOptions.Asynchronous)) { - await TransferAsync(_stream, out_stream).ConfigureAwait(false); + Transfer(_stream, out_stream); } } } @@ -143,7 +142,7 @@ namespace ZeroLevel.Services.FileSystem _path = path; } - protected override async Task StoreImpl() + protected override void StoreImpl() { using (var input_stream = File.Open(_input_path, FileMode.Open, FileAccess.Read, FileShare.Read)) @@ -152,7 +151,7 @@ namespace ZeroLevel.Services.FileSystem DEFAULT_STREAM_BUFFER_SIZE, FileOptions.Asynchronous)) { - await TransferAsync(input_stream, out_stream).ConfigureAwait(false); + Transfer(input_stream, out_stream); } } } @@ -179,6 +178,7 @@ namespace ZeroLevel.Services.FileSystem private volatile bool _stopped = false; private string _currentArchivePath; private bool _split_by_date = false; + private Thread _backThread; public FileArchive(string base_path, string ext, @@ -196,7 +196,9 @@ namespace ZeroLevel.Services.FileSystem () => (DateTime.Now.AddDays(1).Date - DateTime.Now).Add(TimeSpan.FromMilliseconds(100)), RenewArchivePath); } - Task.Run(Consume); + _backThread = new Thread(Consume); + _backThread.IsBackground = true; + _backThread.Start(); } private void RenewArchivePath() @@ -216,14 +218,14 @@ namespace ZeroLevel.Services.FileSystem } } - private async Task Consume() + private void Consume() { do { StoreTask result; while (_tasks.TryDequeue(out result)) { - await result.Store().ConfigureAwait(false); + result.Store(); } Thread.Sleep(100); } while (_disposed == false); @@ -255,7 +257,7 @@ namespace ZeroLevel.Services.FileSystem { if (immediate) { - new StoreFile(file_path, CreateArchiveFilePath(subfolder_name, file_name)).Store().Wait(); + new StoreFile(file_path, CreateArchiveFilePath(subfolder_name, file_name)).Store(); } else { @@ -391,13 +393,14 @@ namespace ZeroLevel.Services.FileSystem while (_disposed == false) { result = _tasks.Take(); - result.Store().ContinueWith(t => + try { - if (t.IsFaulted) - { - Log.SystemError(t.Exception, "[FileBuffer] Fault store file"); - } - }); + result.Store(); + } + catch (Exception ex) + { + Log.SystemError(ex, "[FileBuffer] Fault store file"); + } } } @@ -433,7 +436,7 @@ namespace ZeroLevel.Services.FileSystem { if (immediate) { - new StoreFile(file_path, CreateArchiveFilePath(name)).Store().Wait(); + new StoreFile(file_path, CreateArchiveFilePath(name)).Store(); } else { diff --git a/ZeroLevel/Services/Network/Contracts/IRouter.cs b/ZeroLevel/Services/Network/Contracts/IRouter.cs index df6befa..194e7f6 100644 --- a/ZeroLevel/Services/Network/Contracts/IRouter.cs +++ b/ZeroLevel/Services/Network/Contracts/IRouter.cs @@ -6,6 +6,6 @@ namespace ZeroLevel.Network : IServer { void HandleMessage(Frame frame, ISocketClient client); - void HandleRequest(Frame frame, ISocketClient client, Action handler); + void HandleRequest(Frame frame, ISocketClient client, int identity, Action handler); } } diff --git a/ZeroLevel/Services/Network/DbgNetworkEvents.cs b/ZeroLevel/Services/Network/DbgNetworkEvents.cs deleted file mode 100644 index c0b29a5..0000000 --- a/ZeroLevel/Services/Network/DbgNetworkEvents.cs +++ /dev/null @@ -1,20 +0,0 @@ -namespace ZeroLevel.Network -{ - public enum DbgNetworkEvents - : int - { - ServerClientConnected = 0, - ServerClientDisconnect = 1, - ClientStartPushRequest = 2, - ClientCompletePushRequest = 3, - ClientStartSendResponse = 4, - ClientCompleteSendResponse = 5, - ClientStartHandleRequest = 6, - ClientCompleteHandleRequest = 7, - ClientGotResponse = 8, - ClientLostConnection = 9, - - ClientStartSendRequest = 10, - ClientCompleteSendRequest = 11 - } -} diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index fac4819..a8f70f0 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Net; using System.Net.Sockets; using System.Threading; @@ -14,13 +15,13 @@ namespace ZeroLevel.Network : BaseSocket, ISocketClient { #region Private - private class IncomingFrame + private struct IncomingFrame { public FrameType type; public int identity; public byte[] data; } - private class SendFrame + private struct SendFrame { public bool isRequest; public int identity; @@ -44,8 +45,6 @@ namespace ZeroLevel.Network private Thread _receiveThread; private BlockingCollection _incoming_queue = new BlockingCollection(); private BlockingCollection _send_queue = new BlockingCollection(BaseSocket.MAX_SEND_QUEUE_SIZE); - private static ObjectPool _incoming_pool = new ObjectPool(() => new IncomingFrame()); - private static ObjectPool _sendinfo_pool = new ObjectPool(() => new SendFrame()); #endregion Private public IRouter Router { get; } @@ -95,7 +94,6 @@ namespace ZeroLevel.Network { if (frame == null) throw new ArgumentNullException(nameof(frame)); var data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id); - Dbg.Timestamp((int)DbgNetworkEvents.ClientStartPushRequest, id.ToString()); frame.Release(); if (!_send_queue.IsAddingCompleted) @@ -104,15 +102,15 @@ namespace ZeroLevel.Network { Thread.Sleep(1); } - var sendInfo = _sendinfo_pool.Allocate(); - sendInfo.isRequest = true; - sendInfo.data = data; - sendInfo.identity = id; _requests.RegisterForFrame(id, callback, fail); - _send_queue.Add(sendInfo); + _send_queue.Add(new SendFrame + { + isRequest = true, + data = data, + identity = id + }); } - Dbg.Timestamp((int)DbgNetworkEvents.ClientCompletePushRequest, id.ToString()); } public void ForceConnect() @@ -132,31 +130,31 @@ namespace ZeroLevel.Network { Thread.Sleep(1); } - var info = _sendinfo_pool.Allocate(); - info.isRequest = false; - info.identity = 0; - info.data = data; - _send_queue.Add(info); + _send_queue.Add(new SendFrame + { + isRequest = false, + identity = 0, + data = data + }); } } - + public void Response(byte[] data, int identity) { if (data == null) throw new ArgumentNullException(nameof(data)); - Dbg.Timestamp((int)DbgNetworkEvents.ClientStartSendResponse, identity.ToString()); if (!_send_queue.IsAddingCompleted) { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { Thread.Sleep(1); } - var info = _sendinfo_pool.Allocate(); - info.isRequest = false; - info.identity = 0; - info.data = NetworkPacketFactory.Response(data, identity); - _send_queue.Add(info); + _send_queue.Add(new SendFrame + { + isRequest = false, + identity = 0, + data = NetworkPacketFactory.Response(data, identity) + }); } - Dbg.Timestamp((int)DbgNetworkEvents.ClientCompleteSendResponse, identity.ToString()); } public void UseKeepAlive(TimeSpan period) @@ -177,76 +175,19 @@ namespace ZeroLevel.Network } #endregion - #region Private methods - private void IncomingFramesJob() - { - IncomingFrame frame = default(IncomingFrame); - while (Status != SocketClientStatus.Disposed) - { - if (_send_queue.IsCompleted) - { - return; - } - try - { - frame = _incoming_queue.Take(); - } - catch (Exception ex) - { - Log.SystemError(ex, "[SocketClient.IncomingFramesJob] _incoming_queue.Take"); - _incoming_queue.Dispose(); - _incoming_queue = new BlockingCollection(); - continue; - } - try - { - switch (frame.type) - { - case FrameType.Message: - Router?.HandleMessage(MessageSerializer.Deserialize(frame.data), this); - break; - case FrameType.Request: - { - Dbg.Timestamp((int)DbgNetworkEvents.ClientStartHandleRequest, frame.identity.ToString()); - Router?.HandleRequest(MessageSerializer.Deserialize(frame.data), this, response => - { - if (response != null) - { - this.Response(response, frame.identity); - } - Dbg.Timestamp((int)DbgNetworkEvents.ClientCompleteHandleRequest, frame.identity.ToString()); - }); - } - break; - case FrameType.Response: - { - Dbg.Timestamp((int)DbgNetworkEvents.ClientGotResponse, frame.identity.ToString()); - _requests.Success(frame.identity, frame.data); - } - break; - } - } - catch (Exception ex) - { - Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame"); - } - finally - { - _incoming_pool.Free(frame); - } - } - } + #region Private methods private void _parser_OnIncoming(FrameType type, int identity, byte[] data) { try { if (type == FrameType.KeepAlive) return; - var incoming = _incoming_pool.Allocate(); - incoming.data = data; - incoming.type = type; - incoming.identity = identity; - _incoming_queue.Add(incoming); + _incoming_queue.Add(new IncomingFrame + { + data = data, + type = type, + identity = identity + }); } catch (Exception ex) { @@ -266,8 +207,6 @@ namespace ZeroLevel.Network } if (_clientSocket != null) { - Dbg.Timestamp((int)DbgNetworkEvents.ClientLostConnection, $"{(_clientSocket.RemoteEndPoint as IPEndPoint).Address}:{(_clientSocket.RemoteEndPoint as IPEndPoint).Port}"); - try { _stream?.Close(); @@ -337,10 +276,12 @@ namespace ZeroLevel.Network _requests.TestForTimeouts(); try { - var info = _sendinfo_pool.Allocate(); - info.isRequest = false; - info.identity = 0; - info.data = NetworkPacketFactory.KeepAliveMessage(); + var info = new SendFrame + { + isRequest = false, + identity = 0, + data = NetworkPacketFactory.KeepAliveMessage() + }; _send_queue.Add(info); } catch (Exception ex) @@ -386,16 +327,63 @@ namespace ZeroLevel.Network } } + private void IncomingFramesJob() + { + IncomingFrame frame = default(IncomingFrame); + while (Status != SocketClientStatus.Disposed && !_send_queue.IsCompleted) + { + try + { + frame = _incoming_queue.Take(); + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketClient.IncomingFramesJob] _incoming_queue.Take"); + if (Status != SocketClientStatus.Disposed) + { + _incoming_queue.Dispose(); + _incoming_queue = new BlockingCollection(); + } + continue; + } + try + { + switch (frame.type) + { + case FrameType.Message: + Router?.HandleMessage(MessageSerializer.Deserialize(frame.data), this); + break; + case FrameType.Request: + { + Router?.HandleRequest(MessageSerializer.Deserialize(frame.data), this, frame.identity, (id, response) => + { + if (response != null) + { + this.Response(response, id); + } + }); + } + break; + case FrameType.Response: + { + _requests.Success(frame.identity, frame.data); + } + break; + } + } + catch (Exception ex) + { + Log.SystemError(ex, "[SocketClient.IncomingFramesJob] Handle frame"); + } + } + } + private void SendFramesJob() { SendFrame frame; int unsuccess = 0; - while (Status != SocketClientStatus.Disposed) + while (Status != SocketClientStatus.Disposed && !_send_queue.IsCompleted) { - if (_send_queue.IsCompleted) - { - return; - } try { frame = _send_queue.Take(); @@ -403,8 +391,11 @@ namespace ZeroLevel.Network catch (Exception ex) { Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.Take"); - _send_queue.Dispose(); - _send_queue = new BlockingCollection(); + if (Status != SocketClientStatus.Disposed) + { + _send_queue.Dispose(); + _send_queue = new BlockingCollection(); + } continue; } while (_stream?.CanWrite == false || Status != SocketClientStatus.Working) @@ -433,34 +424,20 @@ namespace ZeroLevel.Network } Thread.Sleep(unsuccess * 128); } - if (frame != null) + try { - try - { - if (frame.isRequest) - { - Dbg.Timestamp((int)DbgNetworkEvents.ClientStartSendRequest, frame.identity.ToString()); - - _requests.StartSend(frame.identity); - } - _stream.Write(frame.data, 0, frame.data.Length); - _last_rw_time = DateTime.UtcNow.Ticks; - - if (frame.isRequest) - { - Dbg.Timestamp((int)DbgNetworkEvents.ClientCompleteSendRequest, frame.identity.ToString()); - } - } - catch (Exception ex) - { - Log.SystemError(ex, $"[SocketClient.SendFramesJob] _stream.Write"); - Broken(); - OnDisconnect(this); - } - finally + if (frame.isRequest) { - _sendinfo_pool.Free(frame); + _requests.StartSend(frame.identity); } + _stream.Write(frame.data, 0, frame.data.Length); + _last_rw_time = DateTime.UtcNow.Ticks; + } + catch (Exception ex) + { + Log.SystemError(ex, $"[SocketClient.SendFramesJob] _stream.Write"); + Broken(); + OnDisconnect(this); } } } diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs index 358d483..1231203 100644 --- a/ZeroLevel/Services/Network/SocketServer.cs +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -84,8 +84,6 @@ namespace ZeroLevel.Network _connections[connection.Endpoint] = new ExClient(connection); connection.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)); ConnectEventRise(_connections[connection.Endpoint]); - - Dbg.Timestamp((int)DbgNetworkEvents.ServerClientConnected, $"{connection.Endpoint.Address}:{connection.Endpoint.Port}"); } catch (Exception ex) { @@ -107,8 +105,6 @@ namespace ZeroLevel.Network _connection_set_lock.EnterWriteLock(); _connections[client.Endpoint].Dispose(); _connections.Remove(client.Endpoint); - - Dbg.Timestamp((int)DbgNetworkEvents.ServerClientDisconnect, $"{client.Endpoint.Address}:{client.Endpoint.Port}"); } finally { @@ -135,7 +131,7 @@ namespace ZeroLevel.Network #region IRouter public void HandleMessage(Frame frame, ISocketClient client) => _router.HandleMessage(frame, client); - public void HandleRequest(Frame frame, ISocketClient client, Action handler) => _router.HandleRequest(frame, client, handler); + public void HandleRequest(Frame frame, ISocketClient client, int identity, Action handler) => _router.HandleRequest(frame, client, identity, handler); public IServer RegisterInbox(string inbox, MessageHandler handler) => _router.RegisterInbox(inbox, handler); public IServer RegisterInbox(MessageHandler handler) => _router.RegisterInbox(handler); diff --git a/ZeroLevel/Services/Network/Utils/FrameParser.cs b/ZeroLevel/Services/Network/Utils/FrameParser.cs index 9050664..e877d92 100644 --- a/ZeroLevel/Services/Network/Utils/FrameParser.cs +++ b/ZeroLevel/Services/Network/Utils/FrameParser.cs @@ -1,5 +1,6 @@ using System; using System.Threading.Tasks; +using ZeroLevel.Services; namespace ZeroLevel.Network { @@ -25,7 +26,7 @@ namespace ZeroLevel.Network public bool IdentityFilled; public bool PayloadFilled; public bool Corrupted; - + public void Reset(byte magic) { @@ -151,7 +152,7 @@ namespace ZeroLevel.Network { try { - Task.Run(() => OnIncoming?.Invoke(type, identity, payload)); + OnIncoming?.Invoke(type, identity, payload); } catch (Exception ex) { diff --git a/ZeroLevel/Services/Network/Utils/RequestBuffer.cs b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs index 7a59c9d..29ff36f 100644 --- a/ZeroLevel/Services/Network/Utils/RequestBuffer.cs +++ b/ZeroLevel/Services/Network/Utils/RequestBuffer.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using ZeroLevel.Services.Pools; @@ -97,21 +98,18 @@ namespace ZeroLevel.Network } } - public void TestForTimeouts() + public void Timeout(List frameIds) { - var now_ticks = DateTime.UtcNow.Ticks; - var to_remove = new List(); bool take = false; try { _reqeust_lock.Enter(ref take); - foreach (var pair in _requests) + for (int i = 0; i < frameIds.Count; i++) { - if (pair.Value.Sended == false) continue; - var diff = now_ticks - pair.Value.Timestamp; - if (diff > BaseSocket.MAX_REQUEST_TIME_TICKS) + if (_requests.ContainsKey(frameIds[i])) { - to_remove.Add(pair.Key); + _ri_pool.Free(_requests[frameIds[i]]); + _requests.Remove(frameIds[i]); } } } @@ -119,10 +117,33 @@ namespace ZeroLevel.Network { if (take) _reqeust_lock.Exit(false); } - foreach (var key in to_remove) + } + + public void TestForTimeouts() + { + var now_ticks = DateTime.UtcNow.Ticks; + var to_remove = new List(); + KeyValuePair[] to_proceed; + bool take = false; + try + { + _reqeust_lock.Enter(ref take); + to_proceed = _requests.Select(x => x).ToArray(); + } + finally + { + if (take) _reqeust_lock.Exit(false); + } + for (int i = 0; i < to_proceed.Length; i++) { - Fail(key, "Timeout"); + if (to_proceed[i].Value.Sended == false) continue; + var diff = now_ticks - to_proceed[i].Value.Timestamp; + if (diff > BaseSocket.MAX_REQUEST_TIME_TICKS) + { + to_remove.Add(to_proceed[i].Key); + } } + Timeout(to_remove); } } } diff --git a/ZeroLevel/Services/Network/Utils/Router.cs b/ZeroLevel/Services/Network/Utils/Router.cs index d1563ff..a8866ef 100644 --- a/ZeroLevel/Services/Network/Utils/Router.cs +++ b/ZeroLevel/Services/Network/Utils/Router.cs @@ -99,34 +99,6 @@ namespace ZeroLevel.Network }; } - /* - public object Invoke(byte[] data, ISocketClient client) - { - if (_typeResp == null) - { - var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data); - if (_noArguments) - { - this._invoker.Invoke(this._instance, new object[] { client }); - } - else - { - this._invoker.Invoke(this._instance, new object[] { client, incoming }); - } - } - else if (_typeReq == null) - { - return this._invoker.Invoke(this._instance, new object[] { client }); - } - else - { - var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data); - return this._invoker.Invoke(this._instance, new object[] { client, incoming }); - } - return null; - } - */ - public void InvokeAsync(byte[] data, ISocketClient client) { if (_typeResp == null) @@ -134,9 +106,6 @@ namespace ZeroLevel.Network if (_noArguments) { Task.Run(() => this._invoker.Invoke(this._instance, new object[] { client })); - /* F**kin .net core not support asyn delegate invoking - this._invoker.BeginInvoke(this._instance, new object[] { client }, null, null); - */ } else { @@ -145,9 +114,6 @@ namespace ZeroLevel.Network var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data); this._invoker.Invoke(this._instance, new object[] { client, incoming }); }); - /* F**kin .net core not support asyn delegate invoking - this._invoker.BeginInvoke(this._instance, new object[] { client, incoming }, null, null); - */ } } } @@ -157,12 +123,6 @@ namespace ZeroLevel.Network if (_typeReq == null) { Task.Run(() => { callback(this._invoker.Invoke(this._instance, new object[] { client })); }); - /* F**kin .net core not support asyn delegate invoking - this._invoker.BeginInvoke(this._instance, new object[] { client }, ar => - { - callback(ar.AsyncState); - }, null); - */ } else { @@ -171,12 +131,6 @@ namespace ZeroLevel.Network var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data); callback(this._invoker.Invoke(this._instance, new object[] { client, incoming })); }); - /* F**kin .net core not support asyn delegate invoking - this._invoker.BeginInvoke(this._instance, new object[] { client, incoming }, ar => - { - callback(ar.AsyncState); - }, null); - */ } } @@ -244,13 +198,14 @@ namespace ZeroLevel.Network { try { - if (_handlers.ContainsKey(frame.Inbox)) + List invokers; + if (_handlers.TryGetValue(frame.Inbox, out invokers)) { - foreach (var handler in _handlers[frame.Inbox]) + foreach (var invoker in invokers) { try { - handler.InvokeAsync(frame.Payload, client); + invoker.InvokeAsync(frame.Payload, client); } catch (Exception ex) { @@ -265,14 +220,37 @@ namespace ZeroLevel.Network } } - public void HandleRequest(Frame frame, ISocketClient client, Action handler) + public class Pack + : IBinarySerializable { + public int Identity; + public long Timestamp; + + public void Deserialize(IBinaryReader reader) + { + this.Identity = reader.ReadInt32(); + this.Timestamp = reader.ReadLong(); + } + + public void Serialize(IBinaryWriter writer) + { + writer.WriteInt32(this.Identity); + writer.WriteLong(this.Timestamp); + } + } + + public void HandleRequest(Frame frame, ISocketClient client, int identity, Action handler) + { try { - if (_requestors.ContainsKey(frame.Inbox)) + MRInvoker invoker; + if (_requestors.TryGetValue(frame.Inbox, out invoker)) { - _requestors[frame.Inbox].InvokeAsync(frame.Payload, client - , result => handler(MessageSerializer.SerializeCompatible(result))); + invoker.InvokeAsync(frame.Payload, client + , result => + { + handler(identity, MessageSerializer.SerializeCompatible(result)); + }); } else { @@ -422,7 +400,7 @@ namespace ZeroLevel.Network public event Action OnDisconnect = _ => { }; public event Action OnConnect = _ => { }; public void HandleMessage(Frame frame, ISocketClient client) { } - public void HandleRequest(Frame frame, ISocketClient client, Action handler) { } + public void HandleRequest(Frame frame, ISocketClient client, int identity, Action handler) { } public IServer RegisterInbox(string inbox, MessageHandler handler) { return this; } public IServer RegisterInbox(string inbox, MessageHandler handler) { return this; } public IServer RegisterInbox(MessageHandler handler) { return this; } diff --git a/ZeroLevel/ZeroLevel.csproj b/ZeroLevel/ZeroLevel.csproj index 33ed333..6ecb9de 100644 --- a/ZeroLevel/ZeroLevel.csproj +++ b/ZeroLevel/ZeroLevel.csproj @@ -6,16 +6,16 @@ ogoun ogoun - 3.0.0.8 - SDL and NetworkMonitor for discovering + 3.0.0.9 + Fix networking https://github.com/ogoun/Zero/wiki Copyright Ogoun 2019 https://opensource.org/licenses/MIT https://raw.githubusercontent.com/ogoun/Zero/master/zero.png https://github.com/ogoun/Zero GitHub - 3.0.8 - 3.0.0.8 + 3.0.9 + 3.0.0.9 diff --git a/ZeroNetworkMonitor/ServiceControlPanel.xaml.cs b/ZeroNetworkMonitor/ServiceControlPanel.xaml.cs index 6563daa..1cb3611 100644 --- a/ZeroNetworkMonitor/ServiceControlPanel.xaml.cs +++ b/ZeroNetworkMonitor/ServiceControlPanel.xaml.cs @@ -36,7 +36,7 @@ namespace ZeroNetworkMonitor { var exchange = Injector.Default.Resolve(); var client = exchange.GetConnection(serviceKey); - client.Request(SDL_INBOX, desc => + client?.Request(SDL_INBOX, desc => { _description = desc; UpdateDescriptionView();