Fix PeriodicFileSystemWatcher

pull/1/head
unknown 4 years ago
parent 4d0d77bd62
commit b2e78763e7

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Net; using System.Net;
using System.Threading;
using ZeroLevel; using ZeroLevel;
using ZeroLevel.Network; using ZeroLevel.Network;
using ZeroLevel.Services.HashFunctions; using ZeroLevel.Services.HashFunctions;
@ -65,7 +66,7 @@ namespace Client
var ex = Bootstrap.CreateExchange(); var ex = Bootstrap.CreateExchange();
var address = ReadIP(); var address = ReadIP();
var port = ReadPort(); var port = ReadPort();
var client = ex.GetConnection(new IPEndPoint(address, port)); ex.RoutesStorage.Set("server", new IPEndPoint(address, port));
uint index = 0; uint index = 0;
while (true) while (true)
@ -75,17 +76,17 @@ namespace Client
switch (Console.ReadKey().Key) switch (Console.ReadKey().Key)
{ {
case ConsoleKey.Escape: case ConsoleKey.Escape:
client?.Dispose(); ex?.Dispose();
return; return;
} }
} }
if (index % 2 == 0) if (index % 2 == 0)
{ {
SendDataEqParts(client, index, 1024 * 1024 + index * 3 + 1); SendDataEqParts(ex, index, 1024 + index * 3 + 1);
} }
else else
{ {
SendDataDiffParts(client, index, 1024 * 1024 + index * 3 + 1); SendDataDiffParts(ex.GetConnection("server"), index, 1024 + index * 3 + 1);
} }
index++; index++;
} }
@ -127,42 +128,26 @@ namespace Client
} }
} }
static void SendDataEqParts(IClient client, uint id, uint length) static void SendDataEqParts(IExchange exchange, uint id, uint length)
{ {
var payload = GetByteArray(length); var payload = GetByteArray(length);
var full_checksum = _hash.Hash(payload); var full_checksum = _hash.Hash(payload);
var info = new Info { Checksum = full_checksum, Id = id, Length = length }; var info = new Info { Checksum = full_checksum, Id = id, Length = length };
if (client.Request<Info, bool>("start", info, res => if (exchange.Request<Info, bool>("server", "start", info))
{
if (res)
{
Log.Info($"Success start sending packet '{id}'");
}
else
{
Log.Info($"Fault server start incoming packet '{id}'");
}
}))
{ {
Log.Info($"Success start sending packet '{id}'");
uint size = 4096; uint size = 4096;
uint offset = 0; uint offset = 0;
while (offset < payload.Length) while (offset < payload.Length)
{ {
var fragment = GetFragment(id, payload, offset, size); var fragment = GetFragment(id, payload, offset, size);
if (!client.Request<Fragment, bool>("part", fragment, res => if (!exchange.Request<Fragment, bool>("server", "part", fragment))
{ {
if (!res)
{
Log.Info($"Fault server incoming packet '{id}' fragment. Offset: '{offset}'. Size: '{size}' bytes."); Log.Info($"Fault server incoming packet '{id}' fragment. Offset: '{offset}'. Size: '{size}' bytes.");
}
}))
{
Log.Warning($"Can't start send packet '{id}' fragment. Offset: '{offset}'. Size: '{size}' bytes. No connection");
} }
offset += size; offset += size;
} }
client.Send<uint>("complete", id); exchange.Send<uint>("server", "complete", id);
} }
else else
{ {

@ -68,7 +68,7 @@ namespace Server
static void Main(string[] args) static void Main(string[] args)
{ {
Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.Warning); Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug | ZeroLevel.Logging.LogLevel.System);
var ex = Bootstrap.CreateExchange(); var ex = Bootstrap.CreateExchange();
var port = ReadPort(); var port = ReadPort();
var server = ex.UseHost(port); var server = ex.UseHost(port);

@ -1,6 +1,8 @@
using System; using System;
using System.Collections.Generic;
using System.Globalization; using System.Globalization;
using System.IO; using System.IO;
using System.Linq;
namespace ZeroLevel.Services.FileSystem namespace ZeroLevel.Services.FileSystem
{ {
@ -12,6 +14,7 @@ namespace ZeroLevel.Services.FileSystem
private readonly string _temporaryFolder; private readonly string _temporaryFolder;
private readonly TimeSpan _period; private readonly TimeSpan _period;
private readonly Action<FileMeta> _callback; private readonly Action<FileMeta> _callback;
private readonly HashSet<string> _extensions;
public event Action<int> OnStartMovingFilesToTemporary = delegate { }; public event Action<int> OnStartMovingFilesToTemporary = delegate { };
public event Action OnMovingFileToTemporary = delegate { }; public event Action OnMovingFileToTemporary = delegate { };
@ -21,6 +24,7 @@ namespace ZeroLevel.Services.FileSystem
private readonly bool _useSubdirectories = false; private readonly bool _useSubdirectories = false;
public PeriodicFileSystemWatcher(TimeSpan period, string watch_folder, string temp_folder, Action<FileMeta> callback public PeriodicFileSystemWatcher(TimeSpan period, string watch_folder, string temp_folder, Action<FileMeta> callback
, IEnumerable<string> extensions = null
, bool removeTempFileAfterCallback = false , bool removeTempFileAfterCallback = false
, bool useSubdirectories = false) , bool useSubdirectories = false)
{ {
@ -32,13 +36,14 @@ namespace ZeroLevel.Services.FileSystem
{ {
throw new ArgumentNullException(nameof(callback)); throw new ArgumentNullException(nameof(callback));
} }
_extensions = new HashSet<string>(extensions?.Select(e => e.ToLowerInvariant()) ?? Enumerable.Empty<string>());
_useSubdirectories = useSubdirectories; _useSubdirectories = useSubdirectories;
_autoRemoveTempFileAfterCallback = removeTempFileAfterCallback; _autoRemoveTempFileAfterCallback = removeTempFileAfterCallback;
_callback = callback; _callback = callback;
_sourceFolder = watch_folder; _sourceFolder = watch_folder;
_temporaryFolder = temp_folder; _temporaryFolder = temp_folder;
_period = period; _period = period;
if (_temporaryFolder.IndexOf(':') < 0) if (Path.IsPathRooted(_temporaryFolder) == false)
{ {
_temporaryFolder = Path.Combine(Configuration.BaseDirectory, _temporaryFolder); _temporaryFolder = Path.Combine(Configuration.BaseDirectory, _temporaryFolder);
} }
@ -172,9 +177,19 @@ namespace ZeroLevel.Services.FileSystem
/// </summary> /// </summary>
private string[] GetFilesFromSource() private string[] GetFilesFromSource()
{ {
string[] files = Directory.GetFiles(_sourceFolder, "*.*", _useSubdirectories ? SearchOption.AllDirectories : SearchOption.TopDirectoryOnly); if (_extensions.Count > 0)
Array.Sort<string>(files, FileNameSortCompare); {
return files; string[] files = Directory.GetFiles(_sourceFolder, "*.*", _useSubdirectories ? SearchOption.AllDirectories : SearchOption.TopDirectoryOnly)
.Where(f => _extensions.Contains(Path.GetExtension(f).ToLowerInvariant())).ToArray();
Array.Sort<string>(files, FileNameSortCompare);
return files;
}
else
{
string[] files = Directory.GetFiles(_sourceFolder, "*.*", _useSubdirectories ? SearchOption.AllDirectories : SearchOption.TopDirectoryOnly);
Array.Sort<string>(files, FileNameSortCompare);
return files;
}
} }
/// <summary> /// <summary>

@ -75,6 +75,8 @@ namespace ZeroLevel.Network
try try
{ {
var client_socket = _serverSocket.EndAccept(ar); var client_socket = _serverSocket.EndAccept(ar);
var ep = client_socket.RemoteEndPoint as IPEndPoint;
Log.SystemInfo($"[ZSocketServer.BeginAcceptCallback] Incoming connection {ep.Address}:{ep.Port}");
_connection_set_lock.EnterWriteLock(); _connection_set_lock.EnterWriteLock();
var connection = new SocketClient(client_socket, _router); var connection = new SocketClient(client_socket, _router);
connection.OnDisconnect += Connection_OnDisconnect; connection.OnDisconnect += Connection_OnDisconnect;
@ -88,7 +90,7 @@ namespace ZeroLevel.Network
} }
finally finally
{ {
_connection_set_lock.ExitWriteLock(); _connection_set_lock.ExitWriteLock();
} }
try try
{ {

@ -6,16 +6,16 @@
</Description> </Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>3.3.4.4</AssemblyVersion> <AssemblyVersion>3.3.4.5</AssemblyVersion>
<PackageReleaseNotes>Additional options for PeriodicFileSystemWatcher</PackageReleaseNotes> <PackageReleaseNotes>Fix PeriodicFileSystemWatcher</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2020</Copyright> <Copyright>Copyright Ogoun 2020</Copyright>
<PackageLicenseUrl></PackageLicenseUrl> <PackageLicenseUrl></PackageLicenseUrl>
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>GitHub</RepositoryType> <RepositoryType>GitHub</RepositoryType>
<Version>3.3.4.4</Version> <Version>3.3.4.5</Version>
<FileVersion>3.3.4.4</FileVersion> <FileVersion>3.3.4.5</FileVersion>
<Platforms>AnyCPU;x64</Platforms> <Platforms>AnyCPU;x64</Platforms>
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
</PropertyGroup> </PropertyGroup>

Loading…
Cancel
Save

Powered by TurnKey Linux.