diff --git a/ZeroLevel/Services/Network/FileTransfer/FileReader.cs b/ZeroLevel/Services/Network/FileTransfer/FileReader.cs index 4c716dc..f484896 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileReader.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileReader.cs @@ -39,6 +39,7 @@ namespace ZeroLevel.Network.FileTransfer }; Array.Copy(buffer, 0, fragment.Payload, 0, bytesRead); var hash = Murmur3.ComputeHash(fragment.Payload); + fragment.ChecksumL = BitConverter.ToUInt64(hash, 0); fragment.ChecksumH = BitConverter.ToUInt64(hash, 8); diff --git a/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs b/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs index 91e0f9b..8ad27cf 100644 --- a/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs +++ b/ZeroLevel/Services/Network/FileTransfer/FileWriter.cs @@ -3,63 +3,19 @@ using System.Collections.Generic; using System.IO; using ZeroLevel.Models; using ZeroLevel.Services.HashFunctions; +using ZeroLevel.Services.Network.FileTransfer.Writers; namespace ZeroLevel.Network.FileTransfer { internal sealed class FileWriter { - private class _FileWriter - : IDisposable - { - private readonly FileStream _stream; - internal DateTime _writeTime { get; private set; } = DateTime.UtcNow; - private bool _gotCompleteMessage = false; - - public bool GotCompleteMessage() => _gotCompleteMessage = true; - - public bool ReadyToRemove() - { - if (_gotCompleteMessage) - { - return (DateTime.UtcNow - _writeTime).TotalSeconds > 15; - } - return false; - } - - public _FileWriter(string path) - { - _stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); - } - - public void Write(long offset, byte[] data) - { - _stream.Position = offset; - _stream.Write(data, 0, data.Length); - _writeTime = DateTime.Now; - } - - public bool IsTimeoutBy(TimeSpan period) - { - return (DateTime.Now - _writeTime) > period; - } - - public void Dispose() - { - _stream.Flush(); - _stream.Close(); - _stream.Dispose(); - } - } private string _basePath; - private string _disk_prefix; - - private readonly Dictionary _incoming = new Dictionary(); + private readonly Dictionary _incoming = new Dictionary(); private readonly object _locker = new object(); private long _cleanErrorsTaskId; - public FileWriter(string path, string disk_prefix = "DRIVE_") + public FileWriter(string path) { - _disk_prefix = disk_prefix; _basePath = path; _cleanErrorsTaskId = Sheduller.RemindEvery(TimeSpan.FromMinutes(1), CleanBadFiles); } @@ -70,7 +26,7 @@ namespace ZeroLevel.Network.FileTransfer { foreach (var pair in _incoming) { - if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3)) || pair.Value.ReadyToRemove()) + if (pair.Value.IsTimeoutBy(TimeSpan.FromMinutes(3))) { Remove(pair.Key); } @@ -89,7 +45,8 @@ namespace ZeroLevel.Network.FileTransfer if (false == _incoming.ContainsKey(info.UploadFileTaskId)) { string path = BuildFilePath(clientFolderName, info.FilePath); - _incoming.Add(info.UploadFileTaskId, new _FileWriter(path)); + _incoming.Add(info.UploadFileTaskId, new SafeDataWriter(new DiskFileWriter(path, info.Size) + , () => Remove(info.UploadFileTaskId))); } } } @@ -106,20 +63,17 @@ namespace ZeroLevel.Network.FileTransfer { try { - _FileWriter stream; - if (_incoming.TryGetValue(chunk.UploadFileTaskId, out stream)) + SafeDataWriter writer; + if (_incoming.TryGetValue(chunk.UploadFileTaskId, out writer)) { - var hash = Murmur3.ComputeHash(chunk.Payload); var checksumL = BitConverter.ToUInt64(hash, 0); var checksumH = BitConverter.ToUInt64(hash, 8); - if (chunk.ChecksumH != checksumH || chunk.ChecksumL != checksumL) return InvokeResult.Fault("Checksum incorrect"); - - stream.Write(chunk.Offset, chunk.Payload); + writer.Write(chunk); return InvokeResult.Succeeding(); } return InvokeResult.Fault("File not expected."); @@ -137,14 +91,10 @@ namespace ZeroLevel.Network.FileTransfer { lock (_locker) { - _FileWriter stream; - if (_incoming.TryGetValue(info.UploadFileTaskId, out stream) && stream != null) + SafeDataWriter writer; + if (_incoming.TryGetValue(info.UploadFileTaskId, out writer) && writer != null) { - using (stream) - { - stream.GotCompleteMessage(); - } - _incoming.Remove(info.UploadFileTaskId); + writer.CompleteReceiving(); } } } @@ -158,7 +108,7 @@ namespace ZeroLevel.Network.FileTransfer private void Remove(long uploadTaskId) { - _FileWriter stream; + SafeDataWriter stream; if (_incoming.TryGetValue(uploadTaskId, out stream)) { _incoming.Remove(uploadTaskId); diff --git a/ZeroLevel/Services/Network/FileTransfer/Writers/DiskFileWriter.cs b/ZeroLevel/Services/Network/FileTransfer/Writers/DiskFileWriter.cs new file mode 100644 index 0000000..f10944d --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/Writers/DiskFileWriter.cs @@ -0,0 +1,49 @@ +using System; +using System.IO; + +namespace ZeroLevel.Services.Network.FileTransfer.Writers +{ + internal sealed class DiskFileWriter + : IDataWriter + { + private readonly FileStream _stream; + private DateTime _writeTime = DateTime.UtcNow; + private readonly long _size; + private long _receive_size = 0; + + public DiskFileWriter(string path, long size) + { + _size = size; + _stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); + _stream.SetLength(_size); + } + + public void CompleteReceiving() + { + if (_receive_size != _size) + { + Log.Error("Incomplete file data"); + } + } + + public void Write(long offset, byte[] data) + { + _receive_size += data.Length; + _stream.Position = offset; + _stream.Write(data, 0, data.Length); + _writeTime = DateTime.Now; + } + + public bool IsTimeoutBy(TimeSpan period) + { + return (DateTime.Now - _writeTime) > period; + } + + public void Dispose() + { + _stream.Flush(); + _stream.Close(); + _stream.Dispose(); + } + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/Writers/IDataWriter.cs b/ZeroLevel/Services/Network/FileTransfer/Writers/IDataWriter.cs new file mode 100644 index 0000000..e7be10c --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/Writers/IDataWriter.cs @@ -0,0 +1,12 @@ +using System; + +namespace ZeroLevel.Services.Network.FileTransfer +{ + interface IDataWriter + : IDisposable + { + void Write(long offset, byte[] data); + bool IsTimeoutBy(TimeSpan period); + void CompleteReceiving(); + } +} diff --git a/ZeroLevel/Services/Network/FileTransfer/Writers/SafeDataWriter.cs b/ZeroLevel/Services/Network/FileTransfer/Writers/SafeDataWriter.cs new file mode 100644 index 0000000..2d3379f --- /dev/null +++ b/ZeroLevel/Services/Network/FileTransfer/Writers/SafeDataWriter.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using ZeroLevel.Network.FileTransfer; + +namespace ZeroLevel.Services.Network.FileTransfer.Writers +{ + internal sealed class SafeDataWriter + : IDisposable + { + private readonly IDataWriter _writer; + private readonly Action _complete; + private BlockingCollection _chunks = new + BlockingCollection(); + private volatile bool _disposed = false; + + public SafeDataWriter(IDataWriter writer, Action complete) + { + _writer = writer; + _complete = complete; + Task.Run(() => + { + try + { + FileFrame frame; + while (!_chunks.IsCompleted) + { + if (_chunks.TryTake(out frame, 200)) + { + writer.Write(frame.Offset, frame.Payload); + } + } + _writer.CompleteReceiving(); + _complete?.Invoke(); + } + catch (Exception ex) + { + Log.Error(ex, "[SafeDataWriter.ctor receive loop] Fault receive data"); + } + }); + } + + public void CompleteReceiving() + { + Sheduller.RemindAfter(TimeSpan.FromSeconds(1), _chunks.CompleteAdding); + } + + public void Dispose() + { + _disposed = true; + _chunks.Dispose(); + _writer.Dispose(); + } + + public bool IsTimeoutBy(TimeSpan period) => _writer.IsTimeoutBy(period); + + public void Write(FileFrame frame) + { + if (!_disposed) + { + _chunks.Add(frame); + } + } + } +}