Duplex tcp channel

pull/1/head
unknown 6 years ago
parent 1f8ebb36d5
commit 2feed1901d

@ -15,7 +15,9 @@ namespace TestApp
.Run();
var router = se.Service.UseHost(8800);
router.RegisterInbox<string, string>("upper", (c, s) => s.ToUpperInvariant());
router.RegisterInbox<string, string>("upper", (c, s) => s.ToUpperInvariant());
se.WaitWhileStatus(ZeroServiceStatus.Running)
.Stop();

@ -17,7 +17,7 @@ namespace ZeroLevel.Network
void ForceConnect();
void UseKeepAlive(TimeSpan period);
void Send(Frame data);
void Request(Frame data, Action<Frame> callback, Action<string> fail = null);
void Request(Frame data, Action<byte[]> callback, Action<string> fail = null);
void Response(byte[] data, int identity);
}
}

@ -66,7 +66,7 @@ namespace ZeroLevel.Network
{
try
{
_client.Request(Frame.FromPool(inbox), f => callback(f.Payload));
_client.Request(Frame.FromPool(inbox), f => callback(f));
}
catch (Exception ex)
{
@ -80,7 +80,7 @@ namespace ZeroLevel.Network
{
try
{
_client.Request(Frame.FromPool(inbox, data), f => callback(f.Payload));
_client.Request(Frame.FromPool(inbox, data), f => callback(f));
}
catch (Exception ex)
{
@ -94,7 +94,7 @@ namespace ZeroLevel.Network
{
try
{
_client.Request(Frame.FromPool(inbox), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f.Payload)));
_client.Request(Frame.FromPool(inbox), f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
}
catch (Exception ex)
{
@ -109,7 +109,7 @@ namespace ZeroLevel.Network
try
{
_client.Request(Frame.FromPool(inbox, MessageSerializer.SerializeCompatible<Trequest>(request)),
f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f.Payload)));
f => callback(MessageSerializer.DeserializeCompatible<Tresponse>(f)));
}
catch (Exception ex)
{

@ -4,14 +4,14 @@ namespace ZeroLevel.Network
{
internal sealed class RequestInfo
{
private Action<Frame> _handler;
private Action<byte[]> _handler;
private Action<string> _failHandler;
private long _timestamp;
public long Timestamp { get { return _timestamp; } }
private bool _sended;
public bool Sended { get { return _sended; } }
public void Reset(Action<Frame> handler, Action<string> failHandler)
public void Reset(Action<byte[]> handler, Action<string> failHandler)
{
_sended = false;
_handler = handler;
@ -24,10 +24,9 @@ namespace ZeroLevel.Network
_timestamp = DateTime.UtcNow.Ticks;
}
public void Success(Frame frame)
public void Success(byte[] data)
{
_handler(frame);
frame?.Release();
_handler(data);
}
public void Fail(string reasonPhrase)

@ -40,12 +40,13 @@ namespace ZeroLevel.Network
public SocketClient(IPEndPoint ep, IRouter router)
{
_router = router;
_router = router;
Endpoint = ep;
_parser.OnIncoming += _parser_OnIncoming;
_sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true;
_sendThread.Start();
EnsureConnection();
}
public SocketClient(Socket socket, IRouter router)
@ -53,11 +54,14 @@ namespace ZeroLevel.Network
_router = router;
_socket_freezed = true;
_clientSocket = socket;
_stream = new NetworkStream(_clientSocket, true);
Endpoint = (IPEndPoint)_clientSocket.RemoteEndPoint;
_parser.OnIncoming += _parser_OnIncoming;
_sendThread = new Thread(SendFramesJob);
_sendThread.IsBackground = true;
_sendThread.Start();
_sendThread.Start();
_stream.BeginRead(_buffer, 0, DEFAULT_RECEIVE_BUFFER_SIZE, ReceiveAsyncCallback, null);
Working();
}
#region API
@ -66,7 +70,7 @@ namespace ZeroLevel.Network
public event Action<ISocketClient, byte[], int> OnIncomingData = (_, __, ___) => { };
public IPEndPoint Endpoint { get; }
public void Request(Frame frame, Action<Frame> callback, Action<string> fail = null)
public void Request(Frame frame, Action<byte[]> callback, Action<string> fail = null)
{
if (frame == null) throw new ArgumentNullException(nameof(frame));
if (frame != null && false == _send_queue.IsAddingCompleted)
@ -169,7 +173,7 @@ namespace ZeroLevel.Network
}
break;
case FrameType.Response:
_requests.Success(identity, MessageSerializer.Deserialize<Frame>(data));
_requests.Success(identity, data);
break;
}
OnIncomingData(this, data, identity);

@ -48,6 +48,7 @@ namespace ZeroLevel.Network
}
private byte[] _size_buf = new byte[4];
private byte[] _id_buf = new byte[4];
private int offset;
public int WriteSize(byte[] buf, int start, int length)
@ -74,11 +75,11 @@ namespace ZeroLevel.Network
{
for (; offset < 4 && start < length; offset++, start++)
{
_size_buf[offset] = buf[start];
_id_buf[offset] = buf[start];
}
if (offset == 4)
{
Identity = BitConverter.ToInt32(_size_buf, 0);
Identity = BitConverter.ToInt32(_id_buf, 0);
IdentityFilled = true;
offset = 0;
}

@ -22,7 +22,7 @@ namespace ZeroLevel.Network
var packet = new byte[data.Length + 6];
packet[0] = MAGIC;
Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4);
packet[5] = (byte)(packet[0] ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]);
packet[5] = (byte)(MAGIC ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]);
HashData(data, packet[5]);
Array.Copy(data, 0, packet, 6, data.Length);
return packet;
@ -32,17 +32,18 @@ namespace ZeroLevel.Network
{
var packet = new byte[data.Length + 6 + 4];
packet[0] = (MAGIC | MAGIC_REQUEST);
Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4);
packet[5] = (byte)(packet[0] ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]);
Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4);
requestId = Interlocked.Increment(ref _current_request_id);
var id = BitConverter.GetBytes(requestId);
packet[6] = id[0];
packet[7] = id[1];
packet[8] = id[2];
packet[9] = id[3];
packet[5] = id[0];
packet[6] = id[1];
packet[7] = id[2];
packet[8] = id[3];
HashData(data, packet[5]);
packet[9] = (byte)(MAGIC ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]);
HashData(data, packet[9]);
Array.Copy(data, 0, packet, 10, data.Length);
return packet;
}
@ -51,16 +52,18 @@ namespace ZeroLevel.Network
{
var packet = new byte[data.Length + 6 + 4];
packet[0] = (MAGIC | MAGIC_RESPONSE);
Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4);
packet[5] = (byte)(packet[0] ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]);
Array.Copy(BitConverter.GetBytes(data.Length), 0, packet, 1, 4);
var id = BitConverter.GetBytes(requestId);
packet[6] = id[0];
packet[7] = id[1];
packet[8] = id[2];
packet[9] = id[3];
packet[5] = id[0];
packet[6] = id[1];
packet[7] = id[2];
packet[8] = id[3];
HashData(data, packet[5]);
packet[9] = (byte)(MAGIC ^ packet[1] ^ packet[2] ^ packet[3] ^ packet[4]);
HashData(data, packet[9]);
Array.Copy(data, 0, packet, 10, data.Length);
return packet;
}

@ -10,7 +10,7 @@ namespace ZeroLevel.Network
private Dictionary<long, RequestInfo> _requests = new Dictionary<long, RequestInfo>();
private static ObjectPool<RequestInfo> _ri_pool = new ObjectPool<RequestInfo>(() => new RequestInfo());
public void RegisterForFrame(int identity, Action<Frame> callback, Action<string> fail = null)
public void RegisterForFrame(int identity, Action<byte[]> callback, Action<string> fail = null)
{
var ri = _ri_pool.Allocate();
lock (_reqeust_lock)
@ -38,7 +38,7 @@ namespace ZeroLevel.Network
}
}
public void Success(long frameId, Frame frame)
public void Success(long frameId, byte[] data)
{
RequestInfo ri = null;
lock (_reqeust_lock)
@ -51,7 +51,7 @@ namespace ZeroLevel.Network
}
if (ri != null)
{
ri.Success(frame);
ri.Success(data);
_ri_pool.Free(ri);
}
}

@ -105,7 +105,7 @@ namespace ZeroLevel.Network
}
else
{
this._invoker.Invoke(this._instance, new object[] { incoming, client });
this._invoker.Invoke(this._instance, new object[] { client, incoming });
}
}
else if (_typeReq == null)
@ -115,7 +115,7 @@ namespace ZeroLevel.Network
else
{
var incoming = (_typeReq == typeof(byte[])) ? data : MessageSerializer.DeserializeCompatible(_typeReq, data);
return this._invoker.Invoke(this._instance, new object[] { incoming, client });
return this._invoker.Invoke(this._instance, new object[] { client, incoming });
}
return null;
}

Loading…
Cancel
Save

Powered by TurnKey Linux.