using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.PartitionStorage.Interfaces; using ZeroLevel.Services.PartitionStorage.Partition; using ZeroLevel.Services.Serialization; namespace ZeroLevel.Services.PartitionStorage { internal sealed class StorePartitionBuilder : BasePartition, IStorePartitionBuilder { private readonly Func _storeMethod; private long _totalRecords = 0; public long TotalRecords { get { return _totalRecords; } } public StorePartitionBuilder(StoreOptions options, TMeta info, IStoreSerializer serializer, PhisicalFileAccessorCachee fileAccessorCachee) : base(options, info, serializer, fileAccessorCachee) { if (options == null) throw new ArgumentNullException(nameof(options)); if (options.ThreadSafeWriting) { _storeMethod = StoreDirectSafe; } else { _storeMethod = StoreDirect; } } #region IStorePartitionBuilder public void Store(TKey key, TInput value) { if (_storeMethod.Invoke(key, value)) { Interlocked.Increment(ref _totalRecords); } } public void CompleteAdding() { CloseWriteStreams(); } public void Compress() { var files = Directory.GetFiles(_catalog); if (files != null && files.Length > 0) { Parallel.ForEach(files, file => CompressFile(file)); } } public IEnumerable> Iterate() { TKey key; TInput val; var files = Directory.GetFiles(_catalog); if (files != null && files.Length > 0) { foreach (var file in files) { var accessor = GetViewAccessor(file, 0); if (accessor != null) { using (var reader = new MemoryStreamReader(accessor)) { while (reader.EOS == false) { if (Serializer.KeyDeserializer.Invoke(reader, out key) == false) break; if (Serializer.InputDeserializer.Invoke(reader, out val) == false) break; yield return new StorePartitionKeyValueSearchResult { Key = key, Value = val, Status = SearchResult.Success }; } } } } } } public void RebuildIndex() => RebuildIndexes(); #endregion #region Private methods private bool StoreDirect(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); if (TryGetWriteStream(groupKey, out var stream)) { Serializer.KeySerializer.Invoke(stream, key); Thread.MemoryBarrier(); Serializer.InputSerializer.Invoke(stream, value); return true; } else { Log.SystemError($"Fault create write stream for key '{groupKey}'"); } return false; } private bool StoreDirectSafe(TKey key, TInput value) { var groupKey = _options.GetFileName(key, _info); bool lockTaken = false; if (TryGetWriteStream(groupKey, out var stream)) { Monitor.Enter(stream, ref lockTaken); try { Serializer.KeySerializer.Invoke(stream, key); Thread.MemoryBarrier(); Serializer.InputSerializer.Invoke(stream, value); return true; } finally { if (lockTaken) { Monitor.Exit(stream); } } } else { Log.SystemError($"Fault create write stream for key '{groupKey}'"); } return false; } internal void CompressFile(string file) { TKey key; TInput input; PhisicalFileAccessorCachee.LockFile(file); try { var dict = new Dictionary>(); var accessor = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 1024 * 1024 * 32); if (accessor != null) { using (var reader = new MemoryStreamReader(accessor)) { while (reader.EOS == false) { if (Serializer.KeyDeserializer.Invoke(reader, out key) == false) { throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read key."); } if (key != null) { if (false == dict.ContainsKey(key)) { dict[key] = new HashSet(); } if (reader.EOS) { break; } if (Serializer.InputDeserializer.Invoke(reader, out input) == false) { throw new Exception($"[StorePartitionBuilder.CompressFile] Fault compress data in file '{file}'. Incorrect file structure. Fault read input value."); } dict[key].Add(input); } else { Log.SystemWarning($"[StorePartitionBuilder.CompressFile] Null-value key in file '{file}'"); } } } } var tempFile = FSUtils.GetAppLocalTemporaryFile(); using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))) { // sort for search acceleration foreach (var pair in dict.OrderBy(p => p.Key)) { var v = _options.MergeFunction(pair.Value); writer.SerializeCompatible(pair.Key); Thread.MemoryBarrier(); writer.SerializeCompatible(v); } } File.Delete(file); File.Move(tempFile, file, true); } finally { PhisicalFileAccessorCachee.UnlockFile(file); } } public override void Release() { } #endregion } }