Test refactoring

pull/1/head
Ogoun 5 years ago
parent 3dd4d22c22
commit 7d7090b4c9

@ -1,60 +1,89 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using ZeroLevel.Services.Pools;
namespace ZeroLevel.Network namespace ZeroLevel.Network
{ {
internal sealed class RequestBuffer internal sealed class RequestBuffer
{ {
private ConcurrentDictionary<long, RequestInfo> _requests = new ConcurrentDictionary<long, RequestInfo>(); private ConcurrentDictionary<long, Action<byte[]>> _callbacks = new ConcurrentDictionary<long, Action<byte[]>>();
private static ObjectPool<RequestInfo> _ri_pool = new ObjectPool<RequestInfo>(() => new RequestInfo()); private ConcurrentDictionary<long, Action<string>> _fallbacks = new ConcurrentDictionary<long, Action<string>>();
private ConcurrentDictionary<long, long> _timeouts = new ConcurrentDictionary<long, long>();
public void RegisterForFrame(int identity, Action<byte[]> callback, Action<string> fail = null) public void RegisterForFrame(int identity, Action<byte[]> callback, Action<string> fallback = null)
{ {
var ri = _ri_pool.Allocate(); if (callback != null)
ri.Reset(callback, fail); {
_requests[identity] = ri; _callbacks.TryAdd(identity, callback);
}
if (fallback != null)
{
_fallbacks.TryAdd(identity, fallback);
}
} }
public void Fail(long frameId, string message) public void Fail(long identity, string message)
{ {
RequestInfo ri; Action<string> rec;
if (_requests.TryRemove(frameId, out ri)) if (_fallbacks.TryRemove(identity, out rec))
{ {
ri.Fail(message); try
_ri_pool.Free(ri); {
rec(message);
}
catch (Exception ex)
{
Log.Error(ex, $"Fail invoke fallback for request '{identity}' with message '{message ?? string.Empty}'");
}
rec = null;
} }
} }
public void Success(long frameId, byte[] data) public void Success(long identity, byte[] data)
{ {
RequestInfo ri; Action<byte[]> rec;
if (_requests.TryRemove(frameId, out ri)) if (_callbacks.TryRemove(identity, out rec))
{ {
ri.Success(data); try
_ri_pool.Free(ri); {
rec(data);
}
catch (Exception ex)
{
Log.Error(ex, $"Fail invoke callback for request '{identity}'. Response size '{data?.Length ?? 0}'");
}
rec = null;
} }
} }
public void StartSend(long frameId) public void StartSend(long identity)
{ {
RequestInfo ri; if (_callbacks.ContainsKey(identity)
if (_requests.TryGetValue(frameId, out ri)) || _fallbacks.ContainsKey(identity))
{ {
ri.StartSend(); _timeouts.TryAdd(identity, DateTime.UtcNow.Ticks);
} }
} }
public void Timeout(List<long> frameIds) public void Timeout(List<long> identities)
{ {
RequestInfo ri; long t;
for (int i = 0; i < frameIds.Count; i++) Action<string> rec;
foreach (var id in identities)
{ {
if (_requests.TryRemove(frameIds[i], out ri)) if (_fallbacks.TryRemove(id, out rec))
{ {
_ri_pool.Free(ri); try
{
rec("Timeout");
}
catch (Exception ex)
{
Log.Error(ex, $"Fail invoke fallback for request '{id}' by timeout");
}
rec = null;
} }
_timeouts.TryRemove(id, out t);
} }
} }
@ -62,10 +91,9 @@ namespace ZeroLevel.Network
{ {
var now_ticks = DateTime.UtcNow.Ticks; var now_ticks = DateTime.UtcNow.Ticks;
var to_remove = new List<long>(); var to_remove = new List<long>();
foreach (var pair in _requests) foreach (var pair in _timeouts)
{ {
if (pair.Value.Sended == false) continue; var diff = now_ticks - pair.Value;
var diff = now_ticks - pair.Value.Timestamp;
if (diff > BaseSocket.MAX_REQUEST_TIME_TICKS) if (diff > BaseSocket.MAX_REQUEST_TIME_TICKS)
{ {
to_remove.Add(pair.Key); to_remove.Add(pair.Key);

Loading…
Cancel
Save

Powered by TurnKey Linux.