File transfer

pull/1/head
unknown 6 years ago
parent 1857171d85
commit afa699aad5

@ -7,5 +7,6 @@ namespace ZeroLevel.Network
IPEndPoint Endpoint { get; } IPEndPoint Endpoint { get; }
void SendBackward(Frame frame); void SendBackward(Frame frame);
void SendBackward<T>(string inbox, T message);
} }
} }

@ -0,0 +1,78 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using ZeroLevel.Network;
using ZeroLevel.Services.Network.FileTransfer.Model;
using ZeroLevel.Services.Pools;
namespace ZeroLevel.Services.Network.FileTransfer
{
public abstract class BaseFileTransfer
{
private readonly FileReceiver _receiver;
internal FileReceiver Receiver => _receiver;
private ObjectPool<FileTransferTask> _taskPool = new ObjectPool<FileTransferTask>(() => new FileTransferTask(), 100);
private BlockingCollection<FileTransferTask> _tasks = new BlockingCollection<FileTransferTask>();
private readonly Thread _uploadFileThread;
/*private int _maxParallelFileTransfer;
private int _currentFileTransfers;*/
internal BaseFileTransfer(string baseFolder/*, int maxParallelFileTransfer = 6*/)
{
_receiver = new FileReceiver(baseFolder);
_uploadFileThread = new Thread(UploadFileProcessing);
_uploadFileThread.IsBackground = true;
_uploadFileThread.Start();
/*_maxParallelFileTransfer = maxParallelFileTransfer;
_currentFileTransfers = 0;*/
}
protected void PushTransferTask(string filePath, Action<string> completeHandler = null, Action<string, string> errorHandler = null, IZBackward client = null)
{
if (string.IsNullOrWhiteSpace(filePath))
{
throw new ArgumentNullException(nameof(filePath));
}
if (false == File.Exists(filePath))
{
throw new FileNotFoundException(filePath);
}
var task = _taskPool.Allocate();
task.CompletedHandler = completeHandler;
task.ErrorHandler = errorHandler;
task.FilePath = filePath;
task.Client = client;
_tasks.Add(task);
}
private void UploadFileProcessing()
{
while (true)
{
var task = _tasks.Take();
try
{
ExecuteSendFile(GetReaderFor(task.FilePath), task);
task.CompletedHandler?.Invoke(task.FilePath);
}
catch (Exception ex)
{
task.ErrorHandler?.Invoke(task.FilePath, ex.ToString());
}
finally
{
_taskPool.Free(task);
}
}
}
internal abstract void ExecuteSendFile(FileReader reader, FileTransferTask task);
private FileReader GetReaderFor(string filePath)
{
return new FileReader(filePath);
}
}
}

@ -0,0 +1,6 @@
using ZeroLevel.Network;
namespace ZeroLevel.Services.Network.FileTransfer
{
public delegate string ClientFolderNameMapper(IExClient client);
}

@ -0,0 +1,52 @@
using System;
using System.IO;
using ZeroLevel.Network;
using ZeroLevel.Services.Network.FileTransfer.Model;
namespace ZeroLevel.Services.Network.FileTransfer
{
public sealed class FileClient
: BaseFileTransfer, IFileClient
{
private readonly IExClient _client;
private readonly string _baseFolder;
private readonly ClientFolderNameMapper _nameMapper;
private readonly bool _disposeClient;
internal FileClient(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
: base(baseFolder)
{
_client = client ?? throw new Exception(nameof(client));
_baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder));
_nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_disposeClient = disposeClient;
}
public void Dispose()
{
if (_disposeClient)
{
_client?.Dispose();
}
}
public void Send(string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
{
PushTransferTask(fileName, completeHandler, errorHandler);
}
internal override void ExecuteSendFile(FileReader reader, FileTransferTask task)
{
Log.Info($"Start upload file {reader.Path}");
var startinfo = reader.GetStartInfo();
startinfo.FilePath = Path.GetFileName(startinfo.FilePath);
_client.Send("__upload_file_start", startinfo);
foreach (var chunk in reader.Read())
{
_client.Send("__upload_file_frame", chunk);
}
_client.Send("__upload_file_complete", reader.GetCompleteInfo());
Log.Info($"Stop upload file {reader.Path}");
}
}
}

@ -0,0 +1,24 @@
using ZeroLevel.Network;
using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Services.Network.FileTransfer
{
public static class FileClientFactory
{
public static IFileClient Create(string serverEndpoint, string baseFolder, ClientFolderNameMapper nameMapper = null)
{
return CreateFileServerClient(ExchangeTransportFactory.GetClient(serverEndpoint), baseFolder,
nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);
}
public static IFileClient Create(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper = null)
{
return CreateFileServerClient(client, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false);
}
private static IFileClient CreateFileServerClient(IExClient client, string baseFolder, ClientFolderNameMapper nameMapper, bool disposeClient)
{
return new FileClient(client, baseFolder, nameMapper, disposeClient);
}
}
}

@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Services.Network.FileTransfer.Model;
namespace ZeroLevel.Services.Network.FileTransfer
{
internal sealed class FileReader
{
private readonly FileStartFrame _startInfo;
public string Path { get; }
private const int CHUNK_SIZE = 512 * 1024;
public FileReader(string path)
{
Path = path;
_startInfo = FileStartFrame.GetTransferFileInfo(path);
}
public FileStartFrame GetStartInfo()
{
return _startInfo;
}
public IEnumerable<FileFrame> Read()
{
long offset = 0;
using (FileStream stream = new FileStream(Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
int bytesRead;
var buffer = new byte[CHUNK_SIZE];
while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
{
var fragment = new FileFrame
{
UploadTaskId = _startInfo.FileUploadTaskId,
Offset = offset * CHUNK_SIZE,
Payload = new byte[bytesRead]
};
Array.Copy(buffer, 0, fragment.Payload, 0, bytesRead);
offset = offset + 1;
yield return fragment;
}
}
GC.Collect();
}
public FileEndFrame GetCompleteInfo()
{
return new FileEndFrame { FileUploadTaskId = _startInfo.FileUploadTaskId };
}
}
}

@ -0,0 +1,132 @@
using System;
using System.Collections.Generic;
using System.IO;
using ZeroLevel.Services.Network.FileTransfer.Model;
namespace ZeroLevel.Services.Network.FileTransfer
{
public class FileReceiver
{
private class FileWriter
: IDisposable
{
private readonly FileStream _stream;
internal DateTime _writeTime { get; private set; } = DateTime.UtcNow;
public FileWriter(string path)
{
_stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
}
public void Write(long offset, byte[] data)
{
_stream.Position = offset;
_stream.Write(data, 0, data.Length);
_writeTime = DateTime.Now;
}
public bool IsTimeoutBy(TimeSpan period)
{
return (DateTime.Now - _writeTime) > period;
}
public void Dispose()
{
_stream.Flush();
_stream.Close();
_stream.Dispose();
}
}
private string _basePath;
private string _disk_prefix;
private readonly Dictionary<int, FileWriter> _incoming = new Dictionary<int, FileWriter>();
private readonly object _locker = new object();
private long _cleanErrorsTaskId;
public FileReceiver(string path, string disk_prefix = "DRIVE_")
{
_disk_prefix = disk_prefix;
_basePath = path;
_cleanErrorsTaskId = Sheduller.RemindEvery(TimeSpan.FromMinutes(1), CleanBadFiles);
}
private void CleanBadFiles()
{
lock (_locker)
{
foreach (var pair in _incoming)
{
if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3)))
{
Remove(pair.Key);
}
}
}
}
public void Incoming(FileStartFrame info, string clientFolderName)
{
if (false == _incoming.ContainsKey(info.FileUploadTaskId))
{
lock (_locker)
{
if (false == _incoming.ContainsKey(info.FileUploadTaskId))
{
string path = BuildFilePath(clientFolderName, info.FilePath);
_incoming.Add(info.FileUploadTaskId, new FileWriter(path));
}
}
}
}
public void Incoming(FileFrame chunk)
{
FileWriter stream;
if (_incoming.TryGetValue(chunk.UploadTaskId, out stream))
{
stream.Write(chunk.Offset, chunk.Payload);
}
}
public void Incoming(FileEndFrame info)
{
lock (_locker)
{
Remove(info.FileUploadTaskId);
}
}
private void Remove(int uploadTaskId)
{
FileWriter stream;
if (_incoming.TryGetValue(uploadTaskId, out stream))
{
_incoming.Remove(uploadTaskId);
stream?.Dispose();
}
}
private string BuildFilePath(string client_folder_name, string clientPath)
{
if (string.IsNullOrEmpty(client_folder_name))
{
if (false == Directory.Exists(_basePath))
{
Directory.CreateDirectory(_basePath);
}
return Path.Combine(_basePath, Path.GetFileName(clientPath));
}
else
{
string folder = Path.Combine(Path.Combine(_basePath, client_folder_name), Path.GetDirectoryName(clientPath).Replace(":", "_DRIVE"));
if (false == System.IO.Directory.Exists(folder))
{
System.IO.Directory.CreateDirectory(folder);
}
return Path.Combine(folder, Path.GetFileName(clientPath));
}
}
}
}

@ -0,0 +1,52 @@
using System;
using System.IO;
using ZeroLevel.Network;
using ZeroLevel.Services.Network.FileTransfer.Model;
namespace ZeroLevel.Services.Network.FileTransfer
{
public sealed class FileServer
: BaseFileTransfer, IFileServer
{
private readonly IExService _service;
private readonly string _baseFolder;
private readonly ServerFolderNameMapperDelegate _nameMapper;
private readonly bool _disposeService;
internal FileServer(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService)
: base(baseFolder)
{
_service = service ?? throw new Exception(nameof(service));
_baseFolder = baseFolder ?? throw new Exception(nameof(baseFolder));
_nameMapper = nameMapper ?? throw new Exception(nameof(nameMapper));
_disposeService = disposeService;
}
public void Dispose()
{
if (_disposeService)
{
_service?.Dispose();
}
}
public void Send(IZBackward client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null)
{
PushTransferTask(fileName, completeHandler, errorHandler, client);
}
internal override void ExecuteSendFile(FileReader reader, FileTransferTask task)
{
Log.Info($"Start upload file {reader.Path}");
var startinfo = reader.GetStartInfo();
startinfo.FilePath = Path.GetFileName(startinfo.FilePath);
task.Client.SendBackward("__upload_file_start", startinfo);
foreach (var chunk in reader.Read())
{
task.Client.SendBackward("__upload_file_frame", chunk);
}
task.Client.SendBackward("__upload_file_complete", reader.GetCompleteInfo());
Log.Info($"Stop upload file {reader.Path}");
}
}
}

@ -0,0 +1,23 @@
using ZeroLevel.Network;
using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Services.Network.FileTransfer
{
public static class FileServerFactory
{
public static IFileServer Create(int port, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null)
{
return CreateFileServer(ExchangeTransportFactory.GetServer(port), baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), true);
}
public static IFileServer Create(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper = null)
{
return CreateFileServer(service, baseFolder, nameMapper ?? (c => FSUtils.FileNameCorrection($"{c.Endpoint.Address}_{c.Endpoint.Port}")), false);
}
private static IFileServer CreateFileServer(IExService service, string baseFolder, ServerFolderNameMapperDelegate nameMapper, bool disposeService)
{
return new FileServer(service, baseFolder, nameMapper, disposeService);
}
}
}

@ -0,0 +1,10 @@
using System;
namespace ZeroLevel.Services.Network.FileTransfer
{
public interface IFileClient
: IDisposable
{
void Send(string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null);
}
}

@ -0,0 +1,11 @@
using System;
using ZeroLevel.Network;
namespace ZeroLevel.Services.Network.FileTransfer
{
public interface IFileServer
: IDisposable
{
void Send(IZBackward client, string fileName, Action<string> completeHandler = null, Action<string, string> errorHandler = null);
}
}

@ -0,0 +1,20 @@
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.Network.FileTransfer.Model
{
public sealed class FileEndFrame
: IBinarySerializable
{
public int FileUploadTaskId;
public void Serialize(IBinaryWriter writer)
{
writer.WriteInt32(this.FileUploadTaskId);
}
public void Deserialize(IBinaryReader reader)
{
this.FileUploadTaskId = reader.ReadInt32();
}
}
}

@ -0,0 +1,26 @@
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.Network.FileTransfer.Model
{
public sealed class FileFrame :
IBinarySerializable
{
public int UploadTaskId { get; set; }
public long Offset { get; set; }
public byte[] Payload { get; set; }
public void Serialize(IBinaryWriter writer)
{
writer.WriteInt32(this.UploadTaskId);
writer.WriteLong(this.Offset);
writer.WriteBytes(this.Payload);
}
public void Deserialize(IBinaryReader reader)
{
this.UploadTaskId = reader.ReadInt32();
this.Offset = reader.ReadLong();
this.Payload = reader.ReadBytes();
}
}
}

@ -0,0 +1,40 @@
using System.Threading;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.Network.FileTransfer.Model
{
public sealed class FileStartFrame
: IBinarySerializable
{
private static int _uploadTaskIdCounter = 0;
public int FileUploadTaskId;
public string FilePath;
public long Size;
public void Serialize(IBinaryWriter writer)
{
writer.WriteInt32(this.FileUploadTaskId);
writer.WriteString(this.FilePath);
writer.WriteLong(this.Size);
}
public void Deserialize(IBinaryReader reader)
{
this.FileUploadTaskId = reader.ReadInt32();
this.FilePath = reader.ReadString();
this.Size = reader.ReadLong();
}
public static FileStartFrame GetTransferFileInfo(string path)
{
var fi = new System.IO.FileInfo(path);
return new FileStartFrame
{
FilePath = fi.FullName,
FileUploadTaskId = Interlocked.Increment(ref _uploadTaskIdCounter),
Size = fi.Length
};
}
}
}

@ -0,0 +1,13 @@
using System;
using ZeroLevel.Network;
namespace ZeroLevel.Services.Network.FileTransfer.Model
{
internal class FileTransferTask
{
public string FilePath;
public Action<string> CompletedHandler;
public Action<string, string> ErrorHandler;
public IZBackward Client;
}
}

@ -0,0 +1,6 @@
using ZeroLevel.Network;
namespace ZeroLevel.Services.Network.FileTransfer
{
public delegate string ServerFolderNameMapperDelegate(IZBackward connection);
}

@ -105,5 +105,10 @@ namespace ZeroLevel.Network
{ {
_fe.Send(frame); _fe.Send(frame);
} }
public void SendBackward<T>(string inbox, T obj)
{
Send(inbox, obj);
}
} }
} }

@ -205,5 +205,26 @@ namespace ZeroLevel.Network
if (other == null) return false; if (other == null) return false;
return this.Endpoint.Compare(other.Endpoint) == 0; return this.Endpoint.Compare(other.Endpoint) == 0;
} }
public void SendBackward<T>(string inbox, T message)
{
var frame = FrameBuilder.BuildFrame<T>(message, inbox);
if (Status == ZTransportStatus.Working && false == _send_queue.IsCompleted && false == _send_queue.IsAddingCompleted)
{
var data = MessageSerializer.Serialize(frame);
try
{
_send_queue.Add(NetworkStreamFastObfuscator.PrepareData(data));
}
catch (ObjectDisposedException)
{
// Ignore
}
finally
{
frame?.Release();
}
}
}
} }
} }

@ -5,15 +5,15 @@
<Description>Infrastructure layer library</Description> <Description>Infrastructure layer library</Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>2.0.4.0</AssemblyVersion> <AssemblyVersion>2.0.6.0</AssemblyVersion>
<PackageReleaseNotes>IDXReader, SoftMax function</PackageReleaseNotes> <PackageReleaseNotes>File client/server</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2019</Copyright> <Copyright>Copyright Ogoun 2019</Copyright>
<PackageLicenseUrl>https://opensource.org/licenses/MIT</PackageLicenseUrl> <PackageLicenseUrl>https://opensource.org/licenses/MIT</PackageLicenseUrl>
<PackageIconUrl>https://raw.githubusercontent.com/ogoun/Zero/master/zero.png</PackageIconUrl> <PackageIconUrl>https://raw.githubusercontent.com/ogoun/Zero/master/zero.png</PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>GitHub</RepositoryType> <RepositoryType>GitHub</RepositoryType>
<Version>2.0.5</Version> <Version>2.0.6</Version>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

Loading…
Cancel
Save

Powered by TurnKey Linux.