diff --git a/TestApp/MyService.cs b/TestApp/MyService.cs index b4cc0ee..543cb4b 100644 --- a/TestApp/MyService.cs +++ b/TestApp/MyService.cs @@ -32,16 +32,24 @@ namespace TestApp }); Exchange.RoutesStorage.Set("test.app", new IPEndPoint(IPAddress.Loopback, 8800)); - - while (true) + using (var waiter = new ManualResetEventSlim(false)) { - try - { - Exchange.GetConnection("test.app")?.Request("counter", s => Interlocked.Add(ref counter, s)); - } - catch + while (true) { - Thread.Sleep(300); + try + { + Exchange.GetConnection("test.app")?.Request("counter", s => + { + waiter.Set(); + Interlocked.Add(ref counter, s); + }); + } + catch + { + Thread.Sleep(300); + } + waiter.Wait(); + waiter.Reset(); } } } diff --git a/ZeroLevel/Services/Bootstrap.cs b/ZeroLevel/Services/Bootstrap.cs index db302b3..481b6ba 100644 --- a/ZeroLevel/Services/Bootstrap.cs +++ b/ZeroLevel/Services/Bootstrap.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Net; using System.Reflection; using ZeroLevel.Network; +using ZeroLevel.Services; using ZeroLevel.Services.Logging; namespace ZeroLevel @@ -194,6 +195,7 @@ namespace ZeroLevel try { Sheduller.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose default sheduller error"); } try { Log.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose log error"); } try { Injector.Default.Dispose(); Injector.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose DI containers error"); } + Dbg.Shutdown(); } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Dbg.cs b/ZeroLevel/Services/Dbg.cs new file mode 100644 index 0000000..8e88cf1 --- /dev/null +++ b/ZeroLevel/Services/Dbg.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Threading; +using ZeroLevel.Services.Serialization; + +namespace ZeroLevel.Services +{ + public static class Dbg + { + private static BlockingCollection> _timestamps = + new BlockingCollection>(); + + private static Thread _writeThread; + private static readonly bool _started; + + static Dbg() + { + if (Configuration.Default.Contains("dbg")) + { + try + { + if (false == Directory.Exists(Configuration.Default.First("dbg"))) + { + Directory.CreateDirectory(Configuration.Default.First("dbg")); + } + } + catch (Exception ex) + { + Log.SystemError(ex, "[Dbg] Fault initialize, dbg files directory not exists and not may be created"); + _started = false; + return; + } + _writeThread = new Thread(HandleQueue); + _writeThread.IsBackground = true; + _writeThread.Start(); + _started = true; + } + else + { + _started = false; + } + } + + private static void HandleQueue() + { + using (var fs = new FileStream(Path.Combine(Configuration.Default.First("dbg"), $"{DateTime.Now.ToString("yyyyMMdd_HHmmss")}.dbg"), FileMode.Create, FileAccess.Write, FileShare.None)) + { + using (var writer = new MemoryStreamWriter(fs)) + { + while (_timestamps.IsCompleted == false) + { + var pair = _timestamps.Take(); + writer.WriteInt32(pair.Item1); + writer.WriteLong(pair.Item2); + writer.WriteString(pair.Item3); + } + fs.Flush(); + } + } + } + + internal static void Shutdown() + { + if (_started) + { + _timestamps.CompleteAdding(); + } + } + + internal static void Timestamp(int eventType, string description) + { + if (_started && _timestamps.IsAddingCompleted == false) + { + _timestamps.Add(Tuple.Create(eventType, DateTime.UtcNow.Ticks, description)); + } + } + } +} diff --git a/ZeroLevel/Services/Network/Contracts/IRouter.cs b/ZeroLevel/Services/Network/Contracts/IRouter.cs index fd40663..df6befa 100644 --- a/ZeroLevel/Services/Network/Contracts/IRouter.cs +++ b/ZeroLevel/Services/Network/Contracts/IRouter.cs @@ -6,6 +6,6 @@ namespace ZeroLevel.Network : IServer { void HandleMessage(Frame frame, ISocketClient client); - byte[] HandleRequest(Frame frame, ISocketClient client); + void HandleRequest(Frame frame, ISocketClient client, Action handler); } } diff --git a/ZeroLevel/Services/Network/DbgNetworkEvents.cs b/ZeroLevel/Services/Network/DbgNetworkEvents.cs new file mode 100644 index 0000000..c0b29a5 --- /dev/null +++ b/ZeroLevel/Services/Network/DbgNetworkEvents.cs @@ -0,0 +1,20 @@ +namespace ZeroLevel.Network +{ + public enum DbgNetworkEvents + : int + { + ServerClientConnected = 0, + ServerClientDisconnect = 1, + ClientStartPushRequest = 2, + ClientCompletePushRequest = 3, + ClientStartSendResponse = 4, + ClientCompleteSendResponse = 5, + ClientStartHandleRequest = 6, + ClientCompleteHandleRequest = 7, + ClientGotResponse = 8, + ClientLostConnection = 9, + + ClientStartSendRequest = 10, + ClientCompleteSendRequest = 11 + } +} diff --git a/ZeroLevel/Services/Network/SocketClient.cs b/ZeroLevel/Services/Network/SocketClient.cs index cd7cd1a..fac4819 100644 --- a/ZeroLevel/Services/Network/SocketClient.cs +++ b/ZeroLevel/Services/Network/SocketClient.cs @@ -4,6 +4,7 @@ using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; +using ZeroLevel.Services; using ZeroLevel.Services.Pools; using ZeroLevel.Services.Serialization; @@ -93,7 +94,11 @@ namespace ZeroLevel.Network public void Request(Frame frame, Action callback, Action fail = null) { if (frame == null) throw new ArgumentNullException(nameof(frame)); - if (frame != null && !_send_queue.IsAddingCompleted) + var data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id); + Dbg.Timestamp((int)DbgNetworkEvents.ClientStartPushRequest, id.ToString()); + frame.Release(); + + if (!_send_queue.IsAddingCompleted) { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { @@ -101,12 +106,13 @@ namespace ZeroLevel.Network } var sendInfo = _sendinfo_pool.Allocate(); sendInfo.isRequest = true; - sendInfo.data = NetworkPacketFactory.Reqeust(MessageSerializer.Serialize(frame), out int id); + sendInfo.data = data; sendInfo.identity = id; _requests.RegisterForFrame(id, callback, fail); _send_queue.Add(sendInfo); - frame.Release(); + } + Dbg.Timestamp((int)DbgNetworkEvents.ClientCompletePushRequest, id.ToString()); } public void ForceConnect() @@ -117,7 +123,10 @@ namespace ZeroLevel.Network public void Send(Frame frame) { if (frame == null) throw new ArgumentNullException(nameof(frame)); - if (frame != null && !_send_queue.IsAddingCompleted) + var data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame)); + frame.Release(); + + if (!_send_queue.IsAddingCompleted) { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) { @@ -126,15 +135,15 @@ namespace ZeroLevel.Network var info = _sendinfo_pool.Allocate(); info.isRequest = false; info.identity = 0; - info.data = NetworkPacketFactory.Message(MessageSerializer.Serialize(frame)); + info.data = data; _send_queue.Add(info); - frame.Release(); } } public void Response(byte[] data, int identity) { if (data == null) throw new ArgumentNullException(nameof(data)); + Dbg.Timestamp((int)DbgNetworkEvents.ClientStartSendResponse, identity.ToString()); if (!_send_queue.IsAddingCompleted) { while (_send_queue.Count >= MAX_SEND_QUEUE_SIZE) @@ -147,6 +156,7 @@ namespace ZeroLevel.Network info.data = NetworkPacketFactory.Response(data, identity); _send_queue.Add(info); } + Dbg.Timestamp((int)DbgNetworkEvents.ClientCompleteSendResponse, identity.ToString()); } public void UseKeepAlive(TimeSpan period) @@ -196,14 +206,23 @@ namespace ZeroLevel.Network Router?.HandleMessage(MessageSerializer.Deserialize(frame.data), this); break; case FrameType.Request: - var response = Router?.HandleRequest(MessageSerializer.Deserialize(frame.data), this); - if (response != null) { - this.Response(response, frame.identity); + Dbg.Timestamp((int)DbgNetworkEvents.ClientStartHandleRequest, frame.identity.ToString()); + Router?.HandleRequest(MessageSerializer.Deserialize(frame.data), this, response => + { + if (response != null) + { + this.Response(response, frame.identity); + } + Dbg.Timestamp((int)DbgNetworkEvents.ClientCompleteHandleRequest, frame.identity.ToString()); + }); } break; case FrameType.Response: - _requests.Success(frame.identity, frame.data); + { + Dbg.Timestamp((int)DbgNetworkEvents.ClientGotResponse, frame.identity.ToString()); + _requests.Success(frame.identity, frame.data); + } break; } } @@ -247,6 +266,8 @@ namespace ZeroLevel.Network } if (_clientSocket != null) { + Dbg.Timestamp((int)DbgNetworkEvents.ClientLostConnection, $"{(_clientSocket.RemoteEndPoint as IPEndPoint).Address}:{(_clientSocket.RemoteEndPoint as IPEndPoint).Port}"); + try { _stream?.Close(); @@ -338,7 +359,7 @@ namespace ZeroLevel.Network private void ReceiveAsyncCallback(IAsyncResult ar) { try - { + { var count = _stream.EndRead(ar); if (count > 0) { @@ -351,11 +372,7 @@ namespace ZeroLevel.Network Thread.Sleep(1); } EnsureConnection(); - if (Status == SocketClientStatus.Working - || Status == SocketClientStatus.Initialized) - { - _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); - } + _stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null); } catch (ObjectDisposedException) { @@ -387,7 +404,7 @@ namespace ZeroLevel.Network { Log.SystemError(ex, "[SocketClient.SendFramesJob] send_queue.Take"); _send_queue.Dispose(); - _send_queue = new BlockingCollection(); + _send_queue = new BlockingCollection(); continue; } while (_stream?.CanWrite == false || Status != SocketClientStatus.Working) @@ -422,10 +439,17 @@ namespace ZeroLevel.Network { if (frame.isRequest) { + Dbg.Timestamp((int)DbgNetworkEvents.ClientStartSendRequest, frame.identity.ToString()); + _requests.StartSend(frame.identity); } _stream.Write(frame.data, 0, frame.data.Length); _last_rw_time = DateTime.UtcNow.Ticks; + + if (frame.isRequest) + { + Dbg.Timestamp((int)DbgNetworkEvents.ClientCompleteSendRequest, frame.identity.ToString()); + } } catch (Exception ex) { diff --git a/ZeroLevel/Services/Network/SocketServer.cs b/ZeroLevel/Services/Network/SocketServer.cs index b3685de..358d483 100644 --- a/ZeroLevel/Services/Network/SocketServer.cs +++ b/ZeroLevel/Services/Network/SocketServer.cs @@ -5,11 +5,12 @@ using System.Net; using System.Net.Sockets; using System.Threading; using ZeroLevel.Network.SDL; +using ZeroLevel.Services; namespace ZeroLevel.Network { internal sealed class SocketServer - : BaseSocket, IRouter + : BaseSocket, IRouter { private Socket _serverSocket; private ReaderWriterLockSlim _connection_set_lock = new ReaderWriterLockSlim(); @@ -83,6 +84,8 @@ namespace ZeroLevel.Network _connections[connection.Endpoint] = new ExClient(connection); connection.UseKeepAlive(TimeSpan.FromMilliseconds(BaseSocket.MINIMUM_HEARTBEAT_UPDATE_PERIOD_MS)); ConnectEventRise(_connections[connection.Endpoint]); + + Dbg.Timestamp((int)DbgNetworkEvents.ServerClientConnected, $"{connection.Endpoint.Address}:{connection.Endpoint.Port}"); } catch (Exception ex) { @@ -104,6 +107,8 @@ namespace ZeroLevel.Network _connection_set_lock.EnterWriteLock(); _connections[client.Endpoint].Dispose(); _connections.Remove(client.Endpoint); + + Dbg.Timestamp((int)DbgNetworkEvents.ServerClientDisconnect, $"{client.Endpoint.Address}:{client.Endpoint.Port}"); } finally { @@ -130,7 +135,7 @@ namespace ZeroLevel.Network #region IRouter public void HandleMessage(Frame frame, ISocketClient client) => _router.HandleMessage(frame, client); - public byte[] HandleRequest(Frame frame, ISocketClient client) => _router.HandleRequest(frame, client); + public void HandleRequest(Frame frame, ISocketClient client, Action handler) => _router.HandleRequest(frame, client, handler); public IServer RegisterInbox(string inbox, MessageHandler handler) => _router.RegisterInbox(inbox, handler); public IServer RegisterInbox(MessageHandler handler) => _router.RegisterInbox(handler); diff --git a/ZeroLevel/Services/Network/Utils/Router.cs b/ZeroLevel/Services/Network/Utils/Router.cs index 37ba76b..d1563ff 100644 --- a/ZeroLevel/Services/Network/Utils/Router.cs +++ b/ZeroLevel/Services/Network/Utils/Router.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; using System.Reflection; +using System.Threading.Tasks; using ZeroLevel.Network.SDL; using ZeroLevel.Services.Invokation; using ZeroLevel.Services.Serialization; @@ -98,6 +99,7 @@ namespace ZeroLevel.Network }; } + /* public object Invoke(byte[] data, ISocketClient client) { if (_typeResp == null) @@ -123,6 +125,60 @@ namespace ZeroLevel.Network } return null; } + */ + + public void InvokeAsync(byte[] data, ISocketClient client) + { + if (_typeResp == null) + { + if (_noArguments) + { + Task.Run(() => this._invoker.Invoke(this._instance, new object[] { client })); + /* F**kin .net core not support asyn delegate invoking + this._invoker.BeginInvoke(this._instance, new object[] { client }, null, null); + */ + } + else + { + Task.Run(() => + { + var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data); + this._invoker.Invoke(this._instance, new object[] { client, incoming }); + }); + /* F**kin .net core not support asyn delegate invoking + this._invoker.BeginInvoke(this._instance, new object[] { client, incoming }, null, null); + */ + } + } + } + + public void InvokeAsync(byte[] data, ISocketClient client, Action callback) + { + if (_typeReq == null) + { + Task.Run(() => { callback(this._invoker.Invoke(this._instance, new object[] { client })); }); + /* F**kin .net core not support asyn delegate invoking + this._invoker.BeginInvoke(this._instance, new object[] { client }, ar => + { + callback(ar.AsyncState); + }, null); + */ + } + else + { + Task.Run(() => + { + var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data); + callback(this._invoker.Invoke(this._instance, new object[] { client, incoming })); + }); + /* F**kin .net core not support asyn delegate invoking + this._invoker.BeginInvoke(this._instance, new object[] { client, incoming }, ar => + { + callback(ar.AsyncState); + }, null); + */ + } + } public InboxServiceDescription GetDescription(string name) { @@ -194,7 +250,7 @@ namespace ZeroLevel.Network { try { - handler.Invoke(frame.Payload, client); + handler.InvokeAsync(frame.Payload, client); } catch (Exception ex) { @@ -209,13 +265,14 @@ namespace ZeroLevel.Network } } - public byte[] HandleRequest(Frame frame, ISocketClient client) + public void HandleRequest(Frame frame, ISocketClient client, Action handler) { try { if (_requestors.ContainsKey(frame.Inbox)) { - return MessageSerializer.SerializeCompatible(_requestors[frame.Inbox].Invoke(frame.Payload, client)); + _requestors[frame.Inbox].InvokeAsync(frame.Payload, client + , result => handler(MessageSerializer.SerializeCompatible(result))); } else { @@ -226,7 +283,6 @@ namespace ZeroLevel.Network { Log.SystemError(ex, $"[ExRouter] Fault handle incomind request"); } - return null; } #endregion Invokation @@ -366,7 +422,7 @@ namespace ZeroLevel.Network public event Action OnDisconnect = _ => { }; public event Action OnConnect = _ => { }; public void HandleMessage(Frame frame, ISocketClient client) { } - public byte[] HandleRequest(Frame frame, ISocketClient client) { return null; } + public void HandleRequest(Frame frame, ISocketClient client, Action handler) { } public IServer RegisterInbox(string inbox, MessageHandler handler) { return this; } public IServer RegisterInbox(string inbox, MessageHandler handler) { return this; } public IServer RegisterInbox(MessageHandler handler) { return this; } diff --git a/ZeroLevel/Services/Serialization/IBinaryReader.cs b/ZeroLevel/Services/Serialization/IBinaryReader.cs index cd3b538..d1f4226 100644 --- a/ZeroLevel/Services/Serialization/IBinaryReader.cs +++ b/ZeroLevel/Services/Serialization/IBinaryReader.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.IO; using System.Net; namespace ZeroLevel.Services.Serialization @@ -94,5 +95,7 @@ namespace ZeroLevel.Services.Serialization List ReadUShortCollection(); #endregion Extensions + + Stream Stream { get; } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs index e742b45..456a4a5 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamReader.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamReader.cs @@ -55,7 +55,7 @@ namespace ZeroLevel.Services.Serialization throw new OutOfMemoryException("Array index out of bounds"); return (byte)_stream.ReadByte(); } - + public char ReadChar() { if (CheckOutOfRange(_stream, 2)) @@ -245,7 +245,7 @@ namespace ZeroLevel.Services.Serialization public Dictionary ReadDictionary() { int count = ReadInt32(); - var collection = new Dictionary(count); + var collection = new Dictionary(count); if (count > 0) { TKey key; @@ -562,5 +562,7 @@ namespace ZeroLevel.Services.Serialization { _stream.Dispose(); } + + public Stream Stream => _stream; } } \ No newline at end of file diff --git a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs index 0cbad0d..95b211a 100644 --- a/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs +++ b/ZeroLevel/Services/Serialization/MemoryStreamWriter.cs @@ -23,13 +23,18 @@ namespace ZeroLevel.Services.Serialization } } - private readonly MemoryStream _stream; + private readonly Stream _stream; public MemoryStreamWriter() { _stream = new MemoryStream(); } + public MemoryStreamWriter(Stream stream) + { + _stream = stream; + } + /// /// Record a boolean value (1 byte) /// @@ -188,7 +193,53 @@ namespace ZeroLevel.Services.Serialization public byte[] Complete() { - return _stream.ToArray(); + return (_stream as MemoryStream)?.ToArray() ?? ReadToEnd(_stream); + } + + private static byte[] ReadToEnd(System.IO.Stream stream) + { + long originalPosition = 0; + if (stream.CanSeek) + { + originalPosition = stream.Position; + stream.Position = 0; + } + try + { + byte[] readBuffer = new byte[4096]; + int totalBytesRead = 0; + int bytesRead; + while ((bytesRead = stream.Read(readBuffer, totalBytesRead, readBuffer.Length - totalBytesRead)) > 0) + { + totalBytesRead += bytesRead; + if (totalBytesRead == readBuffer.Length) + { + int nextByte = stream.ReadByte(); + if (nextByte != -1) + { + byte[] temp = new byte[readBuffer.Length * 2]; + Buffer.BlockCopy(readBuffer, 0, temp, 0, readBuffer.Length); + Buffer.SetByte(temp, totalBytesRead, (byte)nextByte); + readBuffer = temp; + totalBytesRead++; + } + } + } + byte[] buffer = readBuffer; + if (readBuffer.Length != totalBytesRead) + { + buffer = new byte[totalBytesRead]; + Buffer.BlockCopy(readBuffer, 0, buffer, 0, totalBytesRead); + } + return buffer; + } + finally + { + if (stream.CanSeek) + { + stream.Position = originalPosition; + } + } } public void Dispose()