Prepare to discovery

Added aliases
pull/1/head
unknown 5 years ago
parent 2feed1901d
commit e5415d8dbc

@ -1,6 +1,7 @@
using System;
using System.Net;
using ZeroLevel;
using ZeroLevel.Network;
using ZeroLevel.Services.Applications;
namespace TestApp
@ -16,10 +17,23 @@ namespace TestApp
protected override void StartAction()
{
Log.Info("Started");
Sheduller.RemindEvery(TimeSpan.FromSeconds(5), () => {
UseHost(8800)
.RegisterInbox<string, string>("upper", (c, s) => s.ToUpperInvariant())
.RegisterInbox<IPEndPoint, string>("ip2str", (c, ip) => $"{ip.Address}:{ip.Port}");
Sheduller.RemindEvery(TimeSpan.FromSeconds(5), () =>
{
var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800));
client.Request<string, string>("upper", "hello", s => Log.Info(s));
});
Sheduller.RemindEvery(TimeSpan.FromSeconds(6), () =>
{
var client = ConnectToService(new IPEndPoint(IPAddress.Loopback, 8800));
client.Request<IPEndPoint, string>("ip2str", new IPEndPoint(NetUtils.GetNonLoopbackAddress(), NetUtils.GetFreeTcpPort()), s => Log.Info(s));
});
}
protected override void StopAction()

@ -8,19 +8,14 @@ namespace TestApp
{
private static void Main(string[] args)
{
var se = Bootstrap.Startup<MyService>(args,
Bootstrap.Startup<MyService>(args,
() => Configuration.ReadSetFromIniFile("config.ini"))
.ReadServiceInfo()
//.ReadServiceInfo()
//.UseDiscovery()
.Run();
var router = se.Service.UseHost(8800);
router.RegisterInbox<string, string>("upper", (c, s) => s.ToUpperInvariant());
se.WaitWhileStatus(ZeroServiceStatus.Running)
.Run()
.WaitWhileStatus(ZeroServiceStatus.Running)
.Stop();
Bootstrap.Shutdown();
}
}
}

@ -118,24 +118,37 @@ namespace ZeroLevel.Services.Applications
private IDiscoveryClient _discoveryClient;
public void UseDiscovery()
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
var discovery = Configuration.Default.First("discovery");
_discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(discovery), _null_router, false));
}
}
public void UseDiscovery(string endpoint)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
_discoveryClient = new DiscoveryClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), _null_router, false));
}
}
public void UseDiscovery(IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
_discoveryClient = new DiscoveryClient(GetClient(endpoint, _null_router, false));
}
}
public IRouter UseHost()
{
if (_state == ZeroServiceStatus.Running)
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
return GetServer(new IPEndPoint(IPAddress.Any, NetUtils.GetFreeTcpPort()), new Router()).Router;
}
@ -144,7 +157,8 @@ namespace ZeroLevel.Services.Applications
public IRouter UseHost(int port)
{
if (_state == ZeroServiceStatus.Running)
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
return GetServer(new IPEndPoint(IPAddress.Any, port), new Router()).Router;
}
@ -153,7 +167,8 @@ namespace ZeroLevel.Services.Applications
public IRouter UseHost(IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running)
if (_state == ZeroServiceStatus.Running ||
_state == ZeroServiceStatus.Initialized)
{
return GetServer(endpoint, new Router()).Router;
}
@ -169,6 +184,15 @@ namespace ZeroLevel.Services.Applications
return null;
}
public ExClient ConnectToService(string alias, string endpoint)
{
if (_state == ZeroServiceStatus.Running)
{
return new ExClient(GetClient(NetUtils.CreateIPEndPoint(endpoint), new Router(), true));
}
return null;
}
public ExClient ConnectToService(IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running)
@ -177,6 +201,15 @@ namespace ZeroLevel.Services.Applications
}
return null;
}
public ExClient ConnectToService(string alias, IPEndPoint endpoint)
{
if (_state == ZeroServiceStatus.Running)
{
return new ExClient(GetClient(endpoint, new Router(), true));
}
return null;
}
#endregion
#region Service control

@ -35,13 +35,6 @@ namespace ZeroLevel
public BootstrapFluent UseDiscovery(string url) { _service?.UseDiscovery(url); return this; }
public BootstrapFluent UseDiscovery(IPEndPoint endpoint) { _service?.UseDiscovery(endpoint); return this; }
/* public BootstrapFluent UseHost() { _service?.UseHost(); return this; }
public BootstrapFluent UseHost(int port) { _service?.UseHost(port); return this; }
public BootstrapFluent UseHost(IPEndPoint endpoint) { _service?.UseHost(endpoint); return this; }
public BootstrapFluent ConnectToService(string url) { _service.ConnectToService(url); return this; }
public BootstrapFluent ConnectToService(IPEndPoint endpoint) { _service.ConnectToService(endpoint); return this; }
*/
public BootstrapFluent ReadServiceInfo() { _service?.ReadServiceInfo(); return this; }
public BootstrapFluent ReadServiceInfo(IConfigurationSet config) { _service?.ReadServiceInfo(config); return this; }
@ -184,7 +177,7 @@ namespace ZeroLevel
return service;
}
private static void Shutdown()
public static void Shutdown()
{
try { Sheduller.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose default sheduller error"); }
try { Log.Dispose(); } catch (Exception ex) { Log.Error(ex, "[Bootstrap] Dispose log error"); }

@ -0,0 +1,152 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
namespace ZeroLevel.Network
{
public sealed class AliasSet
{
public sealed class _RoundRobinCollection<T> :
IDisposable
{
private readonly List<T> _collection =
new List<T>();
private int _index = -1;
private readonly ReaderWriterLockSlim _lock =
new ReaderWriterLockSlim();
public int Count { get { return _collection.Count; } }
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
_collection.Add(item);
if (_index == -1) _index = 0;
}
finally
{
_lock.ExitWriteLock();
}
}
public void Remove(T item)
{
_lock.EnterWriteLock();
try
{
_collection.Remove(item);
if (_index >= _collection.Count)
{
if (_collection.Count == 0) _index = -1;
else _index = 0;
}
}
finally
{
_lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return _collection.Contains(item);
}
finally
{
_lock.ExitReadLock();
}
}
public bool MoveNext()
{
_lock.EnterReadLock();
try
{
if (_collection.Count > 0)
{
_index = Interlocked.Increment(ref _index) % _collection.Count;
return true;
}
}
finally
{
_lock.ExitReadLock();
}
return false;
}
public T Current
{
get
{
return _index == -1 ? default(T) : _collection[_index];
}
}
public void Clear()
{
_collection.Clear();
_index = -1;
}
public void Dispose()
{
_collection.Clear();
_lock.Dispose();
}
}
private readonly ConcurrentDictionary<string, _RoundRobinCollection<string>> _aliases = new ConcurrentDictionary<string, _RoundRobinCollection<string>>();
public void Set(string alias, string address)
{
if (_aliases.ContainsKey(alias) == false)
{
if (_aliases.TryAdd(alias, new _RoundRobinCollection<string>()))
{
_aliases[alias].Add(address);
}
}
else
{
_aliases[alias].Clear();
_aliases[alias].Add(address);
}
}
public void Set(string alias, IEnumerable<string> addresses)
{
if (_aliases.ContainsKey(alias) == false)
{
if (_aliases.TryAdd(alias, new _RoundRobinCollection<string>()))
{
foreach (var address in addresses)
_aliases[alias].Add(address);
}
}
else
{
_aliases[alias].Clear();
foreach (var address in addresses)
_aliases[alias].Add(address);
}
}
public string GetAddress(string alias)
{
if (_aliases.ContainsKey(alias) && _aliases[alias].MoveNext())
{
return _aliases[alias].Current;
}
return null;
}
}
}

@ -3,21 +3,21 @@
public interface IServer
{
#region Messages
void RegisterInbox(string inbox, MessageHandler handler);
void RegisterInbox<T>(string inbox, MessageHandler<T> handler);
IServer RegisterInbox(string inbox, MessageHandler handler);
IServer RegisterInbox<T>(string inbox, MessageHandler<T> handler);
// Default inboxe
void RegisterInbox(MessageHandler handler);
void RegisterInbox<T>(MessageHandler<T> handler);
IServer RegisterInbox(MessageHandler handler);
IServer RegisterInbox<T>(MessageHandler<T> handler);
#endregion
#region Requests
void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler);
void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler);
IServer RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler);
IServer RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler);
// Default inboxe
void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler);
void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler);
IServer RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler);
IServer RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler);
#endregion
}
}

@ -179,45 +179,49 @@ namespace ZeroLevel.Network
#endregion Invokation
#region Message handlers registration
public void RegisterInbox(string inbox, MessageHandler handler)
public IServer RegisterInbox(string inbox, MessageHandler handler)
{
if (false == _handlers.ContainsKey(inbox))
{
_handlers.Add(inbox, new List<MRInvoker>());
}
_handlers[inbox].Add(MRInvoker.Create(handler));
return this;
}
public void RegisterInbox<T>(string inbox, MessageHandler<T> handler)
public IServer RegisterInbox<T>(string inbox, MessageHandler<T> handler)
{
if (false == _handlers.ContainsKey(inbox))
{
_handlers.Add(inbox, new List<MRInvoker>());
}
_handlers[inbox].Add(MRInvoker.Create<T>(handler));
return this;
}
public void RegisterInbox(MessageHandler handler)
public IServer RegisterInbox(MessageHandler handler)
{
if (false == _handlers.ContainsKey(BaseSocket.DEFAULT_MESSAGE_INBOX))
{
_handlers.Add(BaseSocket.DEFAULT_MESSAGE_INBOX, new List<MRInvoker>());
}
_handlers[BaseSocket.DEFAULT_MESSAGE_INBOX].Add(MRInvoker.Create(handler));
return this;
}
public void RegisterInbox<T>(MessageHandler<T> handler)
public IServer RegisterInbox<T>(MessageHandler<T> handler)
{
if (false == _handlers.ContainsKey(BaseSocket.DEFAULT_MESSAGE_INBOX))
{
_handlers.Add(BaseSocket.DEFAULT_MESSAGE_INBOX, new List<MRInvoker>());
}
_handlers[BaseSocket.DEFAULT_MESSAGE_INBOX].Add(MRInvoker.Create<T>(handler));
return this;
}
#endregion
#region Request handlers registration
public void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler)
public IServer RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler)
{
if (false == _requestors.ContainsKey(inbox))
{
@ -227,9 +231,10 @@ namespace ZeroLevel.Network
{
throw new Exception($"[SocketExchangeServer] Inbox {inbox} already exists");
}
return this;
}
public void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler)
public IServer RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler)
{
if (false == _requestors.ContainsKey(inbox))
{
@ -239,9 +244,10 @@ namespace ZeroLevel.Network
{
throw new Exception($"[SocketExchangeServer] Inbox {inbox} already exists");
}
return this;
}
public void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler)
public IServer RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler)
{
if (false == _requestors.ContainsKey(BaseSocket.DEFAULT_REQUEST_INBOX))
{
@ -251,9 +257,10 @@ namespace ZeroLevel.Network
{
throw new Exception($"[SocketExchangeServer] Inbox {BaseSocket.DEFAULT_REQUEST_INBOX} already exists");
}
return this;
}
public void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler)
public IServer RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler)
{
if (false == _requestors.ContainsKey(BaseSocket.DEFAULT_REQUEST_INBOX))
{
@ -263,6 +270,7 @@ namespace ZeroLevel.Network
{
throw new Exception($"[SocketExchangeServer] Inbox {BaseSocket.DEFAULT_REQUEST_INBOX} already exists");
}
return this;
}
#endregion
}
@ -272,13 +280,13 @@ namespace ZeroLevel.Network
{
public void HandleMessage(Frame frame, ISocketClient client) { }
public byte[] HandleRequest(Frame frame, ISocketClient client) { return null; }
public void RegisterInbox(string inbox, MessageHandler handler) { }
public void RegisterInbox<T>(string inbox, MessageHandler<T> handler) { }
public void RegisterInbox(MessageHandler handler) { }
public void RegisterInbox<T>(MessageHandler<T> handler) { }
public void RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler) { }
public void RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler) { }
public void RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler) { }
public void RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler) { }
public IServer RegisterInbox(string inbox, MessageHandler handler) { return this; }
public IServer RegisterInbox<T>(string inbox, MessageHandler<T> handler) { return this; }
public IServer RegisterInbox(MessageHandler handler) { return this; }
public IServer RegisterInbox<T>(MessageHandler<T> handler) { return this; }
public IServer RegisterInbox<Tresponse>(string inbox, RequestHandler<Tresponse> handler) { return this; }
public IServer RegisterInbox<Trequest, Tresponse>(string inbox, RequestHandler<Trequest, Tresponse> handler) { return this; }
public IServer RegisterInbox<Tresponse>(RequestHandler<Tresponse> handler) { return this; }
public IServer RegisterInbox<Trequest, Tresponse>(RequestHandler<Trequest, Tresponse> handler) { return this; }
}
}

Loading…
Cancel
Save

Powered by TurnKey Linux.