Fix file transfer

pull/1/head
a.bozhenov 5 years ago
parent aa1528c018
commit 9d4c5553e1

@ -39,6 +39,7 @@ namespace ZeroLevel.Network.FileTransfer
}; };
Array.Copy(buffer, 0, fragment.Payload, 0, bytesRead); Array.Copy(buffer, 0, fragment.Payload, 0, bytesRead);
var hash = Murmur3.ComputeHash(fragment.Payload); var hash = Murmur3.ComputeHash(fragment.Payload);
fragment.ChecksumL = BitConverter.ToUInt64(hash, 0); fragment.ChecksumL = BitConverter.ToUInt64(hash, 0);
fragment.ChecksumH = BitConverter.ToUInt64(hash, 8); fragment.ChecksumH = BitConverter.ToUInt64(hash, 8);

@ -3,63 +3,19 @@ using System.Collections.Generic;
using System.IO; using System.IO;
using ZeroLevel.Models; using ZeroLevel.Models;
using ZeroLevel.Services.HashFunctions; using ZeroLevel.Services.HashFunctions;
using ZeroLevel.Services.Network.FileTransfer.Writers;
namespace ZeroLevel.Network.FileTransfer namespace ZeroLevel.Network.FileTransfer
{ {
internal sealed class FileWriter internal sealed class FileWriter
{ {
private class _FileWriter
: IDisposable
{
private readonly FileStream _stream;
internal DateTime _writeTime { get; private set; } = DateTime.UtcNow;
private bool _gotCompleteMessage = false;
public bool GotCompleteMessage() => _gotCompleteMessage = true;
public bool ReadyToRemove()
{
if (_gotCompleteMessage)
{
return (DateTime.UtcNow - _writeTime).TotalSeconds > 15;
}
return false;
}
public _FileWriter(string path)
{
_stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
}
public void Write(long offset, byte[] data)
{
_stream.Position = offset;
_stream.Write(data, 0, data.Length);
_writeTime = DateTime.Now;
}
public bool IsTimeoutBy(TimeSpan period)
{
return (DateTime.Now - _writeTime) > period;
}
public void Dispose()
{
_stream.Flush();
_stream.Close();
_stream.Dispose();
}
}
private string _basePath; private string _basePath;
private string _disk_prefix; private readonly Dictionary<long, SafeDataWriter> _incoming = new Dictionary<long, SafeDataWriter>();
private readonly Dictionary<long, _FileWriter> _incoming = new Dictionary<long, _FileWriter>();
private readonly object _locker = new object(); private readonly object _locker = new object();
private long _cleanErrorsTaskId; private long _cleanErrorsTaskId;
public FileWriter(string path, string disk_prefix = "DRIVE_") public FileWriter(string path)
{ {
_disk_prefix = disk_prefix;
_basePath = path; _basePath = path;
_cleanErrorsTaskId = Sheduller.RemindEvery(TimeSpan.FromMinutes(1), CleanBadFiles); _cleanErrorsTaskId = Sheduller.RemindEvery(TimeSpan.FromMinutes(1), CleanBadFiles);
} }
@ -70,7 +26,7 @@ namespace ZeroLevel.Network.FileTransfer
{ {
foreach (var pair in _incoming) foreach (var pair in _incoming)
{ {
if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3)) || pair.Value.ReadyToRemove()) if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3)))
{ {
Remove(pair.Key); Remove(pair.Key);
} }
@ -89,7 +45,8 @@ namespace ZeroLevel.Network.FileTransfer
if (false == _incoming.ContainsKey(info.UploadFileTaskId)) if (false == _incoming.ContainsKey(info.UploadFileTaskId))
{ {
string path = BuildFilePath(clientFolderName, info.FilePath); string path = BuildFilePath(clientFolderName, info.FilePath);
_incoming.Add(info.UploadFileTaskId, new _FileWriter(path)); _incoming.Add(info.UploadFileTaskId, new SafeDataWriter(new DiskFileWriter(path, info.Size)
, () => Remove(info.UploadFileTaskId)));
} }
} }
} }
@ -106,20 +63,17 @@ namespace ZeroLevel.Network.FileTransfer
{ {
try try
{ {
_FileWriter stream; SafeDataWriter writer;
if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream)) if (_incoming.TryGetValue(chunk.UploadFileTaskId, out writer))
{ {
var hash = Murmur3.ComputeHash(chunk.Payload); var hash = Murmur3.ComputeHash(chunk.Payload);
var checksumL = BitConverter.ToUInt64(hash, 0); var checksumL = BitConverter.ToUInt64(hash, 0);
var checksumH = BitConverter.ToUInt64(hash, 8); var checksumH = BitConverter.ToUInt64(hash, 8);
if (chunk.ChecksumH != checksumH if (chunk.ChecksumH != checksumH
|| chunk.ChecksumL != checksumL) || chunk.ChecksumL != checksumL)
return InvokeResult.Fault("Checksum incorrect"); return InvokeResult.Fault("Checksum incorrect");
writer.Write(chunk);
stream.Write(chunk.Offset, chunk.Payload);
return InvokeResult.Succeeding(); return InvokeResult.Succeeding();
} }
return InvokeResult.Fault("File not expected."); return InvokeResult.Fault("File not expected.");
@ -137,14 +91,10 @@ namespace ZeroLevel.Network.FileTransfer
{ {
lock (_locker) lock (_locker)
{ {
_FileWriter stream; SafeDataWriter writer;
if (_incoming.TryGetValue(info.UploadFileTaskId, out stream) && stream != null) if (_incoming.TryGetValue(info.UploadFileTaskId, out writer) && writer != null)
{ {
using (stream) writer.CompleteReceiving();
{
stream.GotCompleteMessage();
}
_incoming.Remove(info.UploadFileTaskId);
} }
} }
} }
@ -158,7 +108,7 @@ namespace ZeroLevel.Network.FileTransfer
private void Remove(long uploadTaskId) private void Remove(long uploadTaskId)
{ {
_FileWriter stream; SafeDataWriter stream;
if (_incoming.TryGetValue(uploadTaskId, out stream)) if (_incoming.TryGetValue(uploadTaskId, out stream))
{ {
_incoming.Remove(uploadTaskId); _incoming.Remove(uploadTaskId);

@ -0,0 +1,49 @@
using System;
using System.IO;
namespace ZeroLevel.Services.Network.FileTransfer.Writers
{
internal sealed class DiskFileWriter
: IDataWriter
{
private readonly FileStream _stream;
private DateTime _writeTime = DateTime.UtcNow;
private readonly long _size;
private long _receive_size = 0;
public DiskFileWriter(string path, long size)
{
_size = size;
_stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
_stream.SetLength(_size);
}
public void CompleteReceiving()
{
if (_receive_size != _size)
{
Log.Error("Incomplete file data");
}
}
public void Write(long offset, byte[] data)
{
_receive_size += data.Length;
_stream.Position = offset;
_stream.Write(data, 0, data.Length);
_writeTime = DateTime.Now;
}
public bool IsTimeoutBy(TimeSpan period)
{
return (DateTime.Now - _writeTime) > period;
}
public void Dispose()
{
_stream.Flush();
_stream.Close();
_stream.Dispose();
}
}
}

@ -0,0 +1,12 @@
using System;
namespace ZeroLevel.Services.Network.FileTransfer
{
interface IDataWriter
: IDisposable
{
void Write(long offset, byte[] data);
bool IsTimeoutBy(TimeSpan period);
void CompleteReceiving();
}
}

@ -0,0 +1,65 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using ZeroLevel.Network.FileTransfer;
namespace ZeroLevel.Services.Network.FileTransfer.Writers
{
internal sealed class SafeDataWriter
: IDisposable
{
private readonly IDataWriter _writer;
private readonly Action _complete;
private BlockingCollection<FileFrame> _chunks = new
BlockingCollection<FileFrame>();
private volatile bool _disposed = false;
public SafeDataWriter(IDataWriter writer, Action complete)
{
_writer = writer;
_complete = complete;
Task.Run(() =>
{
try
{
FileFrame frame;
while (!_chunks.IsCompleted)
{
if (_chunks.TryTake(out frame, 200))
{
writer.Write(frame.Offset, frame.Payload);
}
}
_writer.CompleteReceiving();
_complete?.Invoke();
}
catch (Exception ex)
{
Log.Error(ex, "[SafeDataWriter.ctor receive loop] Fault receive data");
}
});
}
public void CompleteReceiving()
{
Sheduller.RemindAfter(TimeSpan.FromSeconds(1), _chunks.CompleteAdding);
}
public void Dispose()
{
_disposed = true;
_chunks.Dispose();
_writer.Dispose();
}
public bool IsTimeoutBy(TimeSpan period) => _writer.IsTimeoutBy(period);
public void Write(FileFrame frame)
{
if (!_disposed)
{
_chunks.Add(frame);
}
}
}
}
Loading…
Cancel
Save

Powered by TurnKey Linux.