diff --git a/PartitionFileStorageTest/Program.cs b/PartitionFileStorageTest/Program.cs index 6ccf115..8f5fe00 100644 --- a/PartitionFileStorageTest/Program.cs +++ b/PartitionFileStorageTest/Program.cs @@ -4,109 +4,144 @@ using ZeroLevel.Services.PartitionStorage; namespace PartitionFileStorageTest { - public class CallRecordParser + internal class Program { - private static HashSet _partsOfNumbers = new HashSet { '*', '#', '+', '(', ')', '-' }; - private StringBuilder sb = new StringBuilder(); - private const string NO_VAL = null; + private class Metadata + { + public DateTime Date { get; set; } + } - private string ReadNumber(string line) + private static ulong Generate(Random r) { - sb.Clear(); - var started = false; - foreach (var ch in line) + var num = new StringBuilder(); + num.Append("79"); + num.Append(r.Next(99).ToString("D2")); + num.Append(r.Next(999).ToString("D7")); + return ulong.Parse(num.ToString()); + } + + private static void BuildStore(string root) + { + var options = new StoreOptions { - if (char.IsDigit(ch)) + Index = new IndexOptions { Enabled = true, FileIndexCount = 64 }, + RootFolder = root, + FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), + MergeFunction = list => { - if (started) - { - sb.Append(ch); - } - else if (ch != '0') - { - sb.Append(ch); - started = true; - } + ulong s = 0; + return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); + }, + Partitions = new List> + { + new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) + }, + KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, + }; + var store = new Store(options); + var storePart1 = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); + var storePart2 = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 09) }); + + var sw = new Stopwatch(); + sw.Start(); + + var r = new Random(Environment.TickCount); + for (int i = 0; i < 1000000; i++) + { + var s = Generate(r); + var count = r.Next(300); + for (int j = 0; j < count; j++) + { + var t = Generate(r); + storePart1.Store(s, t); } - else if (char.IsWhiteSpace(ch) || _partsOfNumbers.Contains(ch)) continue; - else return NO_VAL; } - if (sb.Length == 11 && sb[0] == '8') sb[0] = '7'; - if (sb.Length == 3 || sb.Length == 4 || sb.Length > 10) - return sb.ToString(); - return NO_VAL; - } - private HashSet ReadNumbers(string line) - { - var result = new HashSet(); - if (string.IsNullOrWhiteSpace(line) == false) + for (int i = 0; i < 1000000; i++) { - char STX = (char)0x0002; - var values = line.Split(STX); - if (values.Length > 0) + var s = Generate(r); + var count = r.Next(300); + for (int j = 0; j < count; j++) { - foreach (var val in values) - { - var number = ReadNumber(val); - if (number != null) - { - result.Add(number); - } - } + var t = Generate(r); + storePart2.Store(s, t); } } - return result; - } - /// - /// Парсинг строки исходного файла - /// - /// - /// - public CallRecord Parse(string line) - { - var parts = line.Split('\t'); - if (parts.Length != 2) return null; - - var msisdn = ReadNumber(parts[0].Trim()); + sw.Stop(); + Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms"); + sw.Restart(); + storePart1.CompleteAddingAndCompress(); + storePart2.CompleteAddingAndCompress(); + sw.Stop(); + Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms"); + sw.Restart(); + storePart1.RebuildIndex(); + storePart2.RebuildIndex(); + sw.Stop(); + Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); + } - if (string.IsNullOrWhiteSpace(msisdn) == false) + private static void SmallFullTest(string root) + { + var r = new Random(Environment.TickCount); + var options = new StoreOptions { - var numbers = ReadNumbers(parts[1]); - if (numbers != null && numbers.Count > 0) + Index = new IndexOptions { Enabled = true, FileIndexCount = 64 }, + RootFolder = root, + FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), + MergeFunction = list => { - return new CallRecord - { - Msisdn = msisdn, - Msisdns = numbers - }; - } - } - return null; - } - } + ulong s = 0; + return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); + }, + Partitions = new List> + { + new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) + }, + KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, + }; + var store = new Store(options); + var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); - public class CallRecord - { - public string Msisdn; - public HashSet Msisdns; - } + Console.WriteLine("Small test start"); + var c1 = (ulong)(86438 * 128); + var c2 = (ulong)(83438 * 128); + var c3 = (ulong)(831238 * 128); - internal class Program - { - private class Metadata - { - public DateTime Date { get; set; } - public bool Incoming { get; set; } + storePart.Store(c1, Generate(r)); + storePart.Store(c1, Generate(r)); + storePart.Store(c1, Generate(r)); + storePart.Store(c2, Generate(r)); + storePart.Store(c2, Generate(r)); + storePart.Store(c2, Generate(r)); + storePart.Store(c2, Generate(r)); + storePart.Store(c2, Generate(r)); + storePart.Store(c3, Generate(r)); + storePart.Store(c3, Generate(r)); + storePart.Store(c3, Generate(r)); + storePart.CompleteAddingAndCompress(); + var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); + Console.WriteLine("Data:"); + foreach (var e in readPart.Iterate()) + { + Console.WriteLine($"{e.Key}: {e.Value.Length}"); + } + readPart.RemoveKey(c1); + Console.WriteLine("Data after remove:"); + foreach (var e in readPart.Iterate()) + { + Console.WriteLine($"{e.Key}: {e.Value.Length}"); + } } - private static void BuildStore(string source, string root) + private static void TestBuildRemoveStore(string root) { + var r = new Random(Environment.TickCount); var options = new StoreOptions { - Index = new IndexOptions { Enabled = true, FileIndexCount = 256 }, + Index = new IndexOptions { Enabled = true, FileIndexCount = 64 }, RootFolder = root, - FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 512).ToString()), + FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 128).ToString()), MergeFunction = list => { ulong s = 0; @@ -114,55 +149,127 @@ namespace PartitionFileStorageTest }, Partitions = new List> { - new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")), - new StoreCatalogPartition("Date", m => m.Incoming ? "incoming" : "outcoming") + new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) }, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, }; var store = new Store(options); + var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) }); - - var storeIncoming = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }); - var storeOutcoming = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }); - var parser = new CallRecordParser(); var sw = new Stopwatch(); sw.Start(); - using (FileStream fs = File.Open(source, FileMode.Open, FileAccess.Read, FileShare.None)) + + var testKeys1 = new List(); + var testKeys2 = new List(); + + for (int i = 0; i < 1000000; i++) { - using (BufferedStream bs = new BufferedStream(fs, 1024 * 1024 * 64)) + var s = Generate(r); + var count = r.Next(300); + for (int j = 0; j < count; j++) { - using (StreamReader sr = new StreamReader(bs)) - { - string line; - while ((line = sr.ReadLine()) != null) - { - var record = parser.Parse(line); - if (record == null) continue; - if (!string.IsNullOrWhiteSpace(record?.Msisdn ?? string.Empty) && ulong.TryParse(record.Msisdn, out var n)) - { - var ctns = record.Msisdns.ParseMsisdns().ToArray(); - foreach (var ctn in ctns) - { - storeIncoming.Store(n, ctn); - storeOutcoming.Store(ctn, n); - } - } - } - } + var t = Generate(r); + storePart.Store(s, t); + } + if (s % 11217 == 0) + { + testKeys1.Add(s); + } + if (s % 11219 == 0) + { + testKeys2.Add(s); } } + sw.Stop(); Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms"); sw.Restart(); - storeIncoming.CompleteAddingAndCompress(); - storeOutcoming.CompleteAddingAndCompress(); + storePart.CompleteAddingAndCompress(); sw.Stop(); - Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms"); + Console.WriteLine($"Compress: {sw.ElapsedMilliseconds}ms"); sw.Restart(); - storeIncoming.RebuildIndex(); - storeOutcoming.RebuildIndex(); + storePart.RebuildIndex(); sw.Stop(); Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); + + Console.WriteLine("Start merge test"); + sw.Restart(); + var merger = store.CreateMergeAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }, data => Compressor.DecodeBytesContent(data)); + for (int i = 0; i < 1000000; i++) + { + var s = Generate(r); + var count = r.Next(300); + for (int j = 0; j < count; j++) + { + var t = Generate(r); + merger.Store(s, t); + } + } + Console.WriteLine($"Merge journal filled: {sw.ElapsedMilliseconds}ms"); + merger.CompleteAddingAndCompress(); + sw.Stop(); + Console.WriteLine($"Compress after merge: {sw.ElapsedMilliseconds}ms"); + sw.Restart(); + merger.RebuildIndex(); + sw.Stop(); + Console.WriteLine($"Rebuild indexes after merge: {sw.ElapsedMilliseconds}ms"); + + + Console.WriteLine("Test #1 reading"); + var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); + foreach (var key in testKeys1) + { + Console.WriteLine($"\tKey: {key}"); + var result = readPart.Find(key); + Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes"); + } + Console.WriteLine("Press to continue"); + Console.ReadKey(); + Console.WriteLine("Test #1 remove by keys"); + for (int i = 0; i < testKeys1.Count; i++) + { + if (i % 100 == 0) + { + readPart.RemoveKey(testKeys1[i]); + } + } + Console.WriteLine("Test #1 reading after remove"); + foreach (var key in testKeys1) + { + Console.WriteLine($"\tKey: {key}"); + var result = readPart.Find(key); + Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes"); + } + Console.WriteLine("Press to continue"); + Console.ReadKey(); + Console.WriteLine(); + Console.WriteLine("---------------------------------------"); + Console.WriteLine(); + Console.WriteLine("Test #2 reading"); + foreach (var key in testKeys2) + { + Console.WriteLine($"\tKey: {key}"); + var result = readPart.Find(key); + Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes"); + } + Console.WriteLine("Press to continue"); + Console.ReadKey(); + Console.WriteLine("Test #2 remove keys batch"); + readPart.RemoveKeys(testKeys2); + Console.WriteLine("Test #2 reading after remove"); + foreach (var key in testKeys2) + { + Console.WriteLine($"\tKey: {key}"); + var result = readPart.Find(key); + Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes"); + } + + Console.WriteLine("Press to continue for iteration"); + Console.ReadKey(); + foreach (var e in readPart.Iterate()) + { + Console.WriteLine($"{e.Key}: {e.Value.Length}"); + } } private static void TestReading(string root) @@ -179,8 +286,7 @@ namespace PartitionFileStorageTest }, Partitions = new List> { - new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")), - new StoreCatalogPartition("Date", m => m.Incoming ? "incoming" : "outcoming") + new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) }, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, }; @@ -191,26 +297,25 @@ namespace PartitionFileStorageTest { new PartitionSearchRequest { - Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }, + Info = new Metadata { Date = new DateTime(2022, 11, 08) }, Keys = new ulong[] { } }, new PartitionSearchRequest { - Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }, + Info = new Metadata { Date = new DateTime(2022, 11, 09) }, Keys = new ulong[] { } } } }; - var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }); + var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}"); - var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }); + var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 09) }); Console.WriteLine($"Outcoming data files: {storeOutcoming.CountDataFiles()}"); var sw = new Stopwatch(); sw.Start(); var result = store.Search(request).Result; foreach (var r in result.Results) { - Console.WriteLine($"Incoming: {r.Key.Incoming}"); foreach (var mr in r.Value) { Console.WriteLine($"\tKey: {mr.Key}. Sucess: {mr.Found}"); @@ -225,29 +330,104 @@ namespace PartitionFileStorageTest Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms"); } - private struct KeyIndex + private static void TestIterations(string root) { - public TKey Key { get; set; } - public long Offset { get; set; } - } - - static KeyIndex[] Generate(int count) - { - var arr = new KeyIndex[count]; - for (int i = 0; i < count; i++) + var options = new StoreOptions { - arr[i] = new KeyIndex { Key = i * 3, Offset = i * 17 }; + Index = new IndexOptions { Enabled = true, FileIndexCount = 256 }, + RootFolder = root, + FilePartition = new StoreFilePartition("Last three digits", (ctn, date) => (ctn % 512).ToString()), + MergeFunction = list => + { + ulong s = 0; + return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s); + }, + Partitions = new List> + { + new StoreCatalogPartition("Date", m => m.Date.ToString("yyyyMMdd")) + }, + KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, + }; + var store = new Store(options); + var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }); + foreach (var r in storeIncoming.Iterate()) + { + Console.WriteLine($"{r.Key}: {r.Value.Length}"); } - return arr; } static void Main(string[] args) { var root = @"H:\temp"; - var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt"; - //BuildStore(source, root); - TestReading(root); + SmallFullTest(root); + //TestBuildRemoveStore(root); + //BuildStore(root); + //TestReading(root); + //TestIterations(root); + //TestRangeCompressionAndInversion(); Console.ReadKey(); } + + private static void TestRangeCompressionAndInversion() + { + var list = new List(); + list.Add(new FilePositionRange { Start = 1, End = 36 }); + list.Add(new FilePositionRange { Start = 36, End = 63 }); + list.Add(new FilePositionRange { Start = 63, End = 89 }); + list.Add(new FilePositionRange { Start = 93, End = 118 }); + list.Add(new FilePositionRange { Start = 126, End = 199 }); + list.Add(new FilePositionRange { Start = 199, End = 216 }); + list.Add(new FilePositionRange { Start = 277, End = 500 }); + RangeCompression(list); + foreach (var r in list) + { + Console.WriteLine($"{r.Start}: {r.End}"); + } + Console.WriteLine("Invert ranges"); + var inverted = RangeInversion(list, 500); + foreach (var r in inverted) + { + Console.WriteLine($"{r.Start}: {r.End}"); + } + } + + private static void RangeCompression(List ranges) + { + for (var i = 0; i < ranges.Count - 1; i++) + { + var current = ranges[i]; + var next = ranges[i + 1]; + if (current.End == next.Start) + { + current.End = next.End; + ranges.RemoveAt(i + 1); + i--; + } + } + } + + private static List RangeInversion(List ranges, long length) + { + if ((ranges?.Count ?? 0) == 0) return new List { new FilePositionRange { Start = 0, End = length } }; + var inverted = new List(); + var current = new FilePositionRange { Start = 0, End = ranges[0].Start }; + for (var i = 0; i < ranges.Count; i++) + { + current.End = ranges[i].Start; + if (current.Start != current.End) + { + inverted.Add(new FilePositionRange { Start = current.Start, End = current.End }); + } + current.Start = ranges[i].End; + } + if (current.End != length) + { + if (current.Start != length) + { + inverted.Add(new FilePositionRange { Start = current.Start, End = length }); + } + } + return inverted; + } } } \ No newline at end of file diff --git a/ZeroLevel/Services/Collections/BatchProcessor.cs b/ZeroLevel/Services/Collections/BatchProcessor.cs new file mode 100644 index 0000000..246ef97 --- /dev/null +++ b/ZeroLevel/Services/Collections/BatchProcessor.cs @@ -0,0 +1,52 @@ +using System; +using System.Collections.Generic; + +namespace ZeroLevel.Services.Collections +{ + public sealed class BatchProcessor + : IDisposable + { + private readonly List _batch; + private readonly int _batchSize; + private readonly Action> _insertAction; + public BatchProcessor(int batchSize, Action> insertAction) + { + _batch = new List(batchSize); + _insertAction = insertAction; + _batchSize = batchSize; + } + + public void Add(T val) + { + _batch.Add(val); + if (_batch.Count >= _batchSize) + { + try + { + _insertAction.Invoke(_batch); + } + catch (Exception ex) + { + Log.Error(ex, $"[BatchProcessor.Add] Fault insert"); + } + _batch.Clear(); + } + } + + public void Dispose() + { + if (_batch.Count > 0) + { + try + { + _insertAction.Invoke(_batch); + } + catch (Exception ex) + { + Log.Error(ex, $"[BatchProcessor.Dispose] Fault insert"); + } + } + _batch.Clear(); + } + } +} diff --git a/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs b/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs new file mode 100644 index 0000000..77e6493 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/FilePositionRange.cs @@ -0,0 +1,8 @@ +namespace ZeroLevel.Services.PartitionStorage +{ + public class FilePositionRange + { + public long Start; + public long End; + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs index 884edc8..cdcb6d9 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStore.cs @@ -20,5 +20,7 @@ namespace ZeroLevel.Services.PartitionStorage IStorePartitionAccessor CreateAccessor(TMeta info); Task> Search(StoreSearchRequest searchRequest); + + void RemovePartition(TMeta info); } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreCache.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreCache.cs new file mode 100644 index 0000000..4160164 --- /dev/null +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStoreCache.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ZeroLevel.Services.PartitionStorage.Interfaces +{ + public interface IStoreCache + { + } +} diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs index 3a19b76..f32b4ec 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionAccessor.cs @@ -25,5 +25,10 @@ namespace ZeroLevel.Services.PartitionStorage IEnumerable> Find(IEnumerable keys); IEnumerable> Iterate(); IEnumerable> IterateKeyBacket(TKey key); + + void RemoveKey(TKey key); + void RemoveKeys(IEnumerable keys); + void RemoveAllExceptKey(TKey key); + void RemoveAllExceptKeys(IEnumerable keys); } } diff --git a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs index 26d6379..1836227 100644 --- a/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Interfaces/IStorePartitionBuilder.cs @@ -1,5 +1,13 @@ -namespace ZeroLevel.Services.PartitionStorage +using System.Collections.Generic; + +namespace ZeroLevel.Services.PartitionStorage { + public class InsertValue + { + public TKey Key; + public TInput Value; + } + /// /// Provides write operations in catalog partition /// diff --git a/ZeroLevel/Services/PartitionStorage/Options/CacheOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/CacheOptions.cs deleted file mode 100644 index 6e22ec8..0000000 --- a/ZeroLevel/Services/PartitionStorage/Options/CacheOptions.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace ZeroLevel.Services.PartitionStorage -{ - public class CacheOptions - { - public bool UsePersistentCache { get; set; } - - public string PersistentCacheFolder { get; set; } = "cachee"; - - public int PersistentCacheRemoveTimeoutInSeconds { get; set; } = 3600; - - public bool UseMemoryCache { get; set; } - - public int MemoryCacheLimitInMb { get; set; } = 1024; - - public int MemoryCacheRemoveTimeoutInSeconds { get; set; } = 900; - } -} diff --git a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs index f97104a..9311230 100644 --- a/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs +++ b/ZeroLevel/Services/PartitionStorage/Options/StoreOptions.cs @@ -47,12 +47,6 @@ namespace ZeroLevel.Services.PartitionStorage FileIndexCount = 64 }; - public CacheOptions Cache { get; set; } = new CacheOptions - { - UseMemoryCache = false, - UsePersistentCache = false - }; - internal string GetFileName(TKey key, TMeta info) { return FilePartition.PathExtractor(key, info); @@ -89,16 +83,7 @@ namespace ZeroLevel.Services.PartitionStorage Partitions = this.Partitions .Select(p => new StoreCatalogPartition(p.Name, p.PathExtractor)) .ToList(), - RootFolder = this.RootFolder, - Cache = new CacheOptions - { - MemoryCacheLimitInMb = this.Cache.MemoryCacheLimitInMb, - MemoryCacheRemoveTimeoutInSeconds = this.Cache.MemoryCacheRemoveTimeoutInSeconds, - PersistentCacheFolder = this.Cache.PersistentCacheFolder, - PersistentCacheRemoveTimeoutInSeconds = this.Cache.PersistentCacheRemoveTimeoutInSeconds, - UseMemoryCache = this.Cache.UseMemoryCache, - UsePersistentCache = this.Cache.UsePersistentCache - } + RootFolder = this.RootFolder }; return options; } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs index fba4650..f262e78 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StoreMergePartitionAccessor.cs @@ -20,6 +20,9 @@ namespace ZeroLevel.Services.PartitionStorage /// Exists compressed catalog /// private readonly IStorePartitionAccessor _accessor; + + private readonly string _temporaryFolder; + /// /// Write catalog /// @@ -30,9 +33,9 @@ namespace ZeroLevel.Services.PartitionStorage if (decompress == null) throw new ArgumentNullException(nameof(decompress)); _decompress = decompress; _accessor = new StorePartitionAccessor(options, info); - var tempCatalog = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString()); + _temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString()); var tempOptions = options.Clone(); - tempOptions.RootFolder = tempCatalog; + tempOptions.RootFolder = _temporaryFolder; _temporaryAccessor = new StorePartitionBuilder(tempOptions, info); } @@ -64,7 +67,7 @@ namespace ZeroLevel.Services.PartitionStorage { var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath()); - if (newFiles != null && newFiles.Length > 1) + if (newFiles != null && newFiles.Length > 0) { var folder = _accessor.GetCatalogPath(); var existsFiles = Directory.GetFiles(folder) @@ -85,17 +88,27 @@ namespace ZeroLevel.Services.PartitionStorage } } } - // compress new file + } + + (_temporaryAccessor as StorePartitionBuilder).CloseStreams(); + + // compress new file + foreach (var file in newFiles) + { (_temporaryAccessor as StorePartitionBuilder) .CompressFile(file); - - // replace old file by new + } + + // replace old file by new + foreach (var file in newFiles) + { + var name = Path.GetFileName(file); File.Move(file, Path.Combine(folder, name), true); } } // remove temporary files _temporaryAccessor.DropData(); - Directory.Delete(_temporaryAccessor.GetCatalogPath(), true); + Directory.Delete(_temporaryFolder, true); } /// diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs index e6da0b8..e4f3c7d 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionAccessor.cs @@ -87,7 +87,7 @@ namespace ZeroLevel.Services.PartitionStorage public IEnumerable> Iterate() { var files = Directory.GetFiles(_catalog); - if (files != null && files.Length > 1) + if (files != null && files.Length > 0) { foreach (var file in files) { @@ -126,7 +126,7 @@ namespace ZeroLevel.Services.PartitionStorage var indexFolder = Path.Combine(_catalog, "__indexes__"); FSUtils.CleanAndTestFolder(indexFolder); var files = Directory.GetFiles(_catalog); - if (files != null && files.Length > 1) + if (files != null && files.Length > 0) { var dict = new Dictionary(); foreach (var file in files) @@ -240,7 +240,7 @@ namespace ZeroLevel.Services.PartitionStorage } } } - + private MemoryStreamReader GetReadStream(string fileName) { var filePath = Path.Combine(_catalog, fileName); @@ -251,5 +251,192 @@ namespace ZeroLevel.Services.PartitionStorage public void Dispose() { } + + public void RemoveAllExceptKey(TKey key) + { + RemoveAllExceptKeys(new[] { key }); + } + + public void RemoveAllExceptKeys(IEnumerable keys) + { + var results = keys + .GroupBy( + k => _options.GetFileName(k, _info), + k => k, (key, g) => new { FileName = key, Keys = g.ToArray() }); + foreach (var group in results) + { + RemoveKeyGroup(group.FileName, group.Keys, false); + } + } + + public void RemoveKey(TKey key) + { + RemoveKeys(new[] { key }); + } + + public void RemoveKeys(IEnumerable keys) + { + var results = keys + .GroupBy( + k => _options.GetFileName(k, _info), + k => k, (key, g) => new { FileName = key, Keys = g.ToArray() }); + foreach (var group in results) + { + RemoveKeyGroup(group.FileName, group.Keys, true); + } + } + + private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove) + { + var filePath = Path.Combine(_catalog, fileName); + if (File.Exists(filePath)) + { + // 1. Find ranges + var ranges = new List(); + if (_options.Index.Enabled) + { + var index = new StorePartitionSparseIndex(_catalog, _info, _options.FilePartition, _options.KeyComparer); + var offsets = index.GetOffset(keys, true); + using (var reader = GetReadStream(fileName)) + { + for (int i = 0; i < keys.Length; i++) + { + var searchKey = keys[i]; + var off = offsets[i]; + reader.Stream.Seek(off.Offset, SeekOrigin.Begin); + while (reader.EOS == false) + { + var startPosition = reader.Stream.Position; + var k = reader.ReadCompatible(); + var v = reader.ReadCompatible(); + var endPosition = reader.Stream.Position; + + var c = _options.KeyComparer(searchKey, k); + if (c == 0) + { + ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); + } + else if (c == -1) + { + break; + } + } + } + } + } + else + { + using (var reader = GetReadStream(fileName)) + { + int index = 0; + var keys_arr = keys.OrderBy(k => k).ToArray(); + while (reader.EOS == false && index < keys_arr.Length) + { + var startPosition = reader.Stream.Position; + var k = reader.ReadCompatible(); + var v = reader.ReadCompatible(); + var endPosition = reader.Stream.Position; + + var c = _options.KeyComparer(keys_arr[index], k); + if (c == 0) + { + ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); + index++; + } + else if (c == -1) + { + do + { + index++; + if (index < keys_arr.Length) + { + c = _options.KeyComparer(keys_arr[index], k); + } + } while (index < keys_arr.Length && c == -1); + } + } + } + } + + // 2. Temporary file from ranges + var tempPath = Path.GetTempPath(); + var tempFile = Path.Combine(tempPath, Path.GetTempFileName()); + + using (var readStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024)) + { + RangeCompression(ranges); + using (var writeStream = new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)) + { + if (inverseRemove) + { + var inverted = RangeInversion(ranges, readStream.Length); + foreach (var range in inverted) + { + CopyRange(range, readStream, writeStream); + } + } + else + { + foreach (var range in ranges) + { + CopyRange(range, readStream, writeStream); + } + } + writeStream.Flush(); + } + } + + // 3. Replace from temporary to original + File.Move(tempFile, filePath, true); + } + } + + private static void RangeCompression(List ranges) + { + for (var i = 0; i < ranges.Count - 1; i++) + { + var current = ranges[i]; + var next = ranges[i + 1]; + if (current.End == next.Start) + { + current.End = next.End; + ranges.RemoveAt(i + 1); + i--; + } + } + } + + private static List RangeInversion(List ranges, long length) + { + if ((ranges?.Count ?? 0) == 0) return new List { new FilePositionRange { Start = 0, End = length } }; + var inverted = new List(); + var current = new FilePositionRange { Start = 0, End = ranges[0].Start }; + for (var i = 0; i < ranges.Count; i++) + { + current.End = ranges[i].Start; + if (current.Start != current.End) + { + inverted.Add(new FilePositionRange { Start = current.Start, End = current.End }); + } + current.Start = ranges[i].End; + } + if (current.End != length) + { + if (current.Start != length) + { + inverted.Add(new FilePositionRange { Start = current.Start, End = length }); + } + } + return inverted; + } + + private static void CopyRange(FilePositionRange range, Stream source, Stream target) + { + source.Seek(range.Start, SeekOrigin.Begin); + var size = range.End - range.Start; + byte[] buffer = new byte[size]; + source.Read(buffer, 0, buffer.Length); + target.Write(buffer, 0, buffer.Length); + } } } diff --git a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs index 9323aec..eca6049 100644 --- a/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs +++ b/ZeroLevel/Services/PartitionStorage/Partition/StorePartitionBuilder.cs @@ -14,6 +14,7 @@ namespace ZeroLevel.Services.PartitionStorage { private readonly ConcurrentDictionary _writeStreams = new ConcurrentDictionary(); + private readonly StoreOptions _options; private readonly string _catalog; private readonly TMeta _info; @@ -44,17 +45,9 @@ namespace ZeroLevel.Services.PartitionStorage } public void CompleteAddingAndCompress() { - // Close all write streams - foreach (var s in _writeStreams) - { - try - { - s.Value.Dispose(); - } - catch { } - } + CloseStreams(); var files = Directory.GetFiles(_catalog); - if (files != null && files.Length > 1) + if (files != null && files.Length > 0) { Parallel.ForEach(files, file => CompressFile(file)); } @@ -66,7 +59,7 @@ namespace ZeroLevel.Services.PartitionStorage var indexFolder = Path.Combine(_catalog, "__indexes__"); FSUtils.CleanAndTestFolder(indexFolder); var files = Directory.GetFiles(_catalog); - if (files != null && files.Length > 1) + if (files != null && files.Length > 0) { var dict = new Dictionary(); foreach (var file in files) @@ -104,6 +97,19 @@ namespace ZeroLevel.Services.PartitionStorage } #region Private methods + + internal void CloseStreams() + { + // Close all write streams + foreach (var s in _writeStreams) + { + try + { + s.Value.Dispose(); + } + catch { } + } + } internal void CompressFile(string file) { var dict = new Dictionary>(); @@ -154,7 +160,5 @@ namespace ZeroLevel.Services.PartitionStorage public void Dispose() { } - - } } diff --git a/ZeroLevel/Services/PartitionStorage/Store.cs b/ZeroLevel/Services/PartitionStorage/Store.cs index 9bc6494..c23da43 100644 --- a/ZeroLevel/Services/PartitionStorage/Store.cs +++ b/ZeroLevel/Services/PartitionStorage/Store.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; +using ZeroLevel.Services.FileSystem; namespace ZeroLevel.Services.PartitionStorage { @@ -21,6 +22,13 @@ namespace ZeroLevel.Services.PartitionStorage } } + public void RemovePartition(TMeta info) + { + var partition = CreateAccessor(info); + partition.DropData(); + FSUtils.RemoveFolder(partition.GetCatalogPath()); + } + public IStorePartitionAccessor CreateAccessor(TMeta info) { return new StorePartitionAccessor(_options, info);