PartitionStorage update

New data Merge method
RemoveKeys method
RemoveAllExceptKeys method
pull/4/head
Ogoun 2 years ago
parent 4ed708ee47
commit 94d3e1ca71

@ -4,109 +4,144 @@ using ZeroLevel.Services.PartitionStorage;
namespace PartitionFileStorageTest namespace PartitionFileStorageTest
{ {
public class CallRecordParser internal class Program
{
private static HashSet<char> _partsOfNumbers = new HashSet<char> { '*', '#', '+', '(', ')', '-' };
private StringBuilder sb = new StringBuilder();
private const string NO_VAL = null;
private string ReadNumber(string line)
{
sb.Clear();
var started = false;
foreach (var ch in line)
{
if (char.IsDigit(ch))
{ {
if (started) private class Metadata
{ {
sb.Append(ch); public DateTime Date { get; set; }
} }
else if (ch != '0')
private static ulong Generate(Random r)
{ {
sb.Append(ch); var num = new StringBuilder();
started = true; num.Append("79");
} num.Append(r.Next(99).ToString("D2"));
} num.Append(r.Next(999).ToString("D7"));
else if (char.IsWhiteSpace(ch) || _partsOfNumbers.Contains(ch)) continue; return ulong.Parse(num.ToString());
else return NO_VAL;
} }
if (sb.Length == 11 && sb[0] == '8') sb[0] = '7';
if (sb.Length == 3 || sb.Length == 4 || sb.Length > 10) private static void BuildStore(string root)
return sb.ToString();
return NO_VAL;
}
private HashSet<string> ReadNumbers(string line)
{
var result = new HashSet<string>();
if (string.IsNullOrWhiteSpace(line) == false)
{
char STX = (char)0x0002;
var values = line.Split(STX);
if (values.Length > 0)
{ {
foreach (var val in values) var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{ {
var number = ReadNumber(val); Index = new IndexOptions { Enabled = true, FileIndexCount = 64 },
if (number != null) RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
{ {
result.Add(number); ulong s = 0;
} return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
} },
} Partitions = new List<StoreCatalogPartition<Metadata>>
}
return result;
}
/// <summary>
/// Парсинг строки исходного файла
/// </summary>
/// <param name="line"></param>
/// <returns></returns>
public CallRecord Parse(string line)
{ {
var parts = line.Split('\t'); new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
if (parts.Length != 2) return null; KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
};
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storePart1 = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
var storePart2 = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 09) });
var msisdn = ReadNumber(parts[0].Trim()); var sw = new Stopwatch();
sw.Start();
if (string.IsNullOrWhiteSpace(msisdn) == false) var r = new Random(Environment.TickCount);
{ for (int i = 0; i < 1000000; i++)
var numbers = ReadNumbers(parts[1]);
if (numbers != null && numbers.Count > 0)
{ {
return new CallRecord var s = Generate(r);
var count = r.Next(300);
for (int j = 0; j < count; j++)
{ {
Msisdn = msisdn, var t = Generate(r);
Msisdns = numbers storePart1.Store(s, t);
};
} }
} }
return null; for (int i = 0; i < 1000000; i++)
{
var s = Generate(r);
var count = r.Next(300);
for (int j = 0; j < count; j++)
{
var t = Generate(r);
storePart2.Store(s, t);
} }
} }
public class CallRecord sw.Stop();
{ Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms");
public string Msisdn; sw.Restart();
public HashSet<string> Msisdns; storePart1.CompleteAddingAndCompress();
storePart2.CompleteAddingAndCompress();
sw.Stop();
Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart1.RebuildIndex();
storePart2.RebuildIndex();
sw.Stop();
Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
} }
internal class Program private static void SmallFullTest(string root)
{ {
private class Metadata var r = new Random(Environment.TickCount);
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{ {
public DateTime Date { get; set; } Index = new IndexOptions { Enabled = true, FileIndexCount = 64 },
public bool Incoming { get; set; } RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
{
ulong s = 0;
return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
};
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
Console.WriteLine("Small test start");
var c1 = (ulong)(86438 * 128);
var c2 = (ulong)(83438 * 128);
var c3 = (ulong)(831238 * 128);
storePart.Store(c1, Generate(r));
storePart.Store(c1, Generate(r));
storePart.Store(c1, Generate(r));
storePart.Store(c2, Generate(r));
storePart.Store(c2, Generate(r));
storePart.Store(c2, Generate(r));
storePart.Store(c2, Generate(r));
storePart.Store(c2, Generate(r));
storePart.Store(c3, Generate(r));
storePart.Store(c3, Generate(r));
storePart.Store(c3, Generate(r));
storePart.CompleteAddingAndCompress();
var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
Console.WriteLine("Data:");
foreach (var e in readPart.Iterate())
{
Console.WriteLine($"{e.Key}: {e.Value.Length}");
}
readPart.RemoveKey(c1);
Console.WriteLine("Data after remove:");
foreach (var e in readPart.Iterate())
{
Console.WriteLine($"{e.Key}: {e.Value.Length}");
}
} }
private static void BuildStore(string source, string root) private static void TestBuildRemoveStore(string root)
{ {
var r = new Random(Environment.TickCount);
var options = new StoreOptions<ulong, ulong, byte[], Metadata> var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{ {
Index = new IndexOptions { Enabled = true, FileIndexCount = 256 }, Index = new IndexOptions { Enabled = true, FileIndexCount = 64 },
RootFolder = root, RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 512).ToString()), FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list => MergeFunction = list =>
{ {
ulong s = 0; ulong s = 0;
@ -114,55 +149,127 @@ namespace PartitionFileStorageTest
}, },
Partitions = new List<StoreCatalogPartition<Metadata>> Partitions = new List<StoreCatalogPartition<Metadata>>
{ {
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd")), new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
new StoreCatalogPartition<Metadata>("Date", m => m.Incoming ? "incoming" : "outcoming")
}, },
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
}; };
var store = new Store<ulong, ulong, byte[], Metadata>(options); var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
var storeIncoming = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true });
var storeOutcoming = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false });
var parser = new CallRecordParser();
var sw = new Stopwatch(); var sw = new Stopwatch();
sw.Start(); sw.Start();
using (FileStream fs = File.Open(source, FileMode.Open, FileAccess.Read, FileShare.None))
{ var testKeys1 = new List<ulong>();
using (BufferedStream bs = new BufferedStream(fs, 1024 * 1024 * 64)) var testKeys2 = new List<ulong>();
{
using (StreamReader sr = new StreamReader(bs)) for (int i = 0; i < 1000000; i++)
{
string line;
while ((line = sr.ReadLine()) != null)
{
var record = parser.Parse(line);
if (record == null) continue;
if (!string.IsNullOrWhiteSpace(record?.Msisdn ?? string.Empty) && ulong.TryParse(record.Msisdn, out var n))
{ {
var ctns = record.Msisdns.ParseMsisdns().ToArray(); var s = Generate(r);
foreach (var ctn in ctns) var count = r.Next(300);
for (int j = 0; j < count; j++)
{ {
storeIncoming.Store(n, ctn); var t = Generate(r);
storeOutcoming.Store(ctn, n); storePart.Store(s, t);
}
}
} }
if (s % 11217 == 0)
{
testKeys1.Add(s);
} }
if (s % 11219 == 0)
{
testKeys2.Add(s);
} }
} }
sw.Stop(); sw.Stop();
Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms"); Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms");
sw.Restart(); sw.Restart();
storeIncoming.CompleteAddingAndCompress(); storePart.CompleteAddingAndCompress();
storeOutcoming.CompleteAddingAndCompress();
sw.Stop(); sw.Stop();
Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms"); Console.WriteLine($"Compress: {sw.ElapsedMilliseconds}ms");
sw.Restart(); sw.Restart();
storeIncoming.RebuildIndex(); storePart.RebuildIndex();
storeOutcoming.RebuildIndex();
sw.Stop(); sw.Stop();
Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
Console.WriteLine("Start merge test");
sw.Restart();
var merger = store.CreateMergeAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }, data => Compressor.DecodeBytesContent(data));
for (int i = 0; i < 1000000; i++)
{
var s = Generate(r);
var count = r.Next(300);
for (int j = 0; j < count; j++)
{
var t = Generate(r);
merger.Store(s, t);
}
}
Console.WriteLine($"Merge journal filled: {sw.ElapsedMilliseconds}ms");
merger.CompleteAddingAndCompress();
sw.Stop();
Console.WriteLine($"Compress after merge: {sw.ElapsedMilliseconds}ms");
sw.Restart();
merger.RebuildIndex();
sw.Stop();
Console.WriteLine($"Rebuild indexes after merge: {sw.ElapsedMilliseconds}ms");
Console.WriteLine("Test #1 reading");
var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
foreach (var key in testKeys1)
{
Console.WriteLine($"\tKey: {key}");
var result = readPart.Find(key);
Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes");
}
Console.WriteLine("Press to continue");
Console.ReadKey();
Console.WriteLine("Test #1 remove by keys");
for (int i = 0; i < testKeys1.Count; i++)
{
if (i % 100 == 0)
{
readPart.RemoveKey(testKeys1[i]);
}
}
Console.WriteLine("Test #1 reading after remove");
foreach (var key in testKeys1)
{
Console.WriteLine($"\tKey: {key}");
var result = readPart.Find(key);
Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes");
}
Console.WriteLine("Press to continue");
Console.ReadKey();
Console.WriteLine();
Console.WriteLine("---------------------------------------");
Console.WriteLine();
Console.WriteLine("Test #2 reading");
foreach (var key in testKeys2)
{
Console.WriteLine($"\tKey: {key}");
var result = readPart.Find(key);
Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes");
}
Console.WriteLine("Press to continue");
Console.ReadKey();
Console.WriteLine("Test #2 remove keys batch");
readPart.RemoveKeys(testKeys2);
Console.WriteLine("Test #2 reading after remove");
foreach (var key in testKeys2)
{
Console.WriteLine($"\tKey: {key}");
var result = readPart.Find(key);
Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes");
}
Console.WriteLine("Press to continue for iteration");
Console.ReadKey();
foreach (var e in readPart.Iterate())
{
Console.WriteLine($"{e.Key}: {e.Value.Length}");
}
} }
private static void TestReading(string root) private static void TestReading(string root)
@ -179,8 +286,7 @@ namespace PartitionFileStorageTest
}, },
Partitions = new List<StoreCatalogPartition<Metadata>> Partitions = new List<StoreCatalogPartition<Metadata>>
{ {
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd")), new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
new StoreCatalogPartition<Metadata>("Date", m => m.Incoming ? "incoming" : "outcoming")
}, },
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
}; };
@ -191,26 +297,25 @@ namespace PartitionFileStorageTest
{ {
new PartitionSearchRequest<ulong, Metadata> new PartitionSearchRequest<ulong, Metadata>
{ {
Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }, Info = new Metadata { Date = new DateTime(2022, 11, 08) },
Keys = new ulong[] { } Keys = new ulong[] { }
}, },
new PartitionSearchRequest<ulong, Metadata> new PartitionSearchRequest<ulong, Metadata>
{ {
Info = new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }, Info = new Metadata { Date = new DateTime(2022, 11, 09) },
Keys = new ulong[] { } Keys = new ulong[] { }
} }
} }
}; };
var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }); var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}"); Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}");
var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }); var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 09) });
Console.WriteLine($"Outcoming data files: {storeOutcoming.CountDataFiles()}"); Console.WriteLine($"Outcoming data files: {storeOutcoming.CountDataFiles()}");
var sw = new Stopwatch(); var sw = new Stopwatch();
sw.Start(); sw.Start();
var result = store.Search(request).Result; var result = store.Search(request).Result;
foreach (var r in result.Results) foreach (var r in result.Results)
{ {
Console.WriteLine($"Incoming: {r.Key.Incoming}");
foreach (var mr in r.Value) foreach (var mr in r.Value)
{ {
Console.WriteLine($"\tKey: {mr.Key}. Sucess: {mr.Found}"); Console.WriteLine($"\tKey: {mr.Key}. Sucess: {mr.Found}");
@ -225,29 +330,104 @@ namespace PartitionFileStorageTest
Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms"); Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms");
} }
private struct KeyIndex<TKey> private static void TestIterations(string root)
{ {
public TKey Key { get; set; } var options = new StoreOptions<ulong, ulong, byte[], Metadata>
public long Offset { get; set; } {
} Index = new IndexOptions { Enabled = true, FileIndexCount = 256 },
RootFolder = root,
static KeyIndex<long>[] Generate(int count) FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 512).ToString()),
MergeFunction = list =>
{ {
var arr = new KeyIndex<long>[count]; ulong s = 0;
for (int i = 0; i < count; i++) return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{ {
arr[i] = new KeyIndex<long> { Key = i * 3, Offset = i * 17 }; new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
};
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
foreach (var r in storeIncoming.Iterate())
{
Console.WriteLine($"{r.Key}: {r.Value.Length}");
} }
return arr;
} }
static void Main(string[] args) static void Main(string[] args)
{ {
var root = @"H:\temp"; var root = @"H:\temp";
var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt"; SmallFullTest(root);
//BuildStore(source, root); //TestBuildRemoveStore(root);
TestReading(root); //BuildStore(root);
//TestReading(root);
//TestIterations(root);
//TestRangeCompressionAndInversion();
Console.ReadKey(); Console.ReadKey();
} }
/// <summary>
/// Console demo for <c>RangeCompression</c>/<c>RangeInversion</c>:
/// merges adjacent ranges, prints them, then prints the gaps over [0, 500].
/// </summary>
private static void TestRangeCompressionAndInversion()
{
    // Fixture: a mix of adjacent (1-36-63-89) and disjoint ranges within [0, 500].
    var list = new List<FilePositionRange>
    {
        new FilePositionRange { Start = 1, End = 36 },
        new FilePositionRange { Start = 36, End = 63 },
        new FilePositionRange { Start = 63, End = 89 },
        new FilePositionRange { Start = 93, End = 118 },
        new FilePositionRange { Start = 126, End = 199 },
        new FilePositionRange { Start = 199, End = 216 },
        new FilePositionRange { Start = 277, End = 500 }
    };
    RangeCompression(list);
    foreach (var r in list)
    {
        Console.WriteLine($"{r.Start}: {r.End}");
    }
    Console.WriteLine("Invert ranges");
    var inverted = RangeInversion(list, 500);
    foreach (var r in inverted)
    {
        Console.WriteLine($"{r.Start}: {r.End}");
    }
}
/// <summary>
/// Merges strictly adjacent ranges in place: whenever one range ends exactly
/// where the next begins, the two are collapsed into a single range.
/// Overlapping (non-touching) ranges are intentionally left untouched.
/// </summary>
private static void RangeCompression(List<FilePositionRange> ranges)
{
    var i = 0;
    while (i < ranges.Count - 1)
    {
        var left = ranges[i];
        var right = ranges[i + 1];
        if (left.End == right.Start)
        {
            // Extend the left range over the right one and drop the right;
            // stay at the same index so chains of adjacent ranges keep merging.
            left.End = right.End;
            ranges.RemoveAt(i + 1);
        }
        else
        {
            i++;
        }
    }
}
/// <summary>
/// Returns the complement of <paramref name="ranges"/> over [0, <paramref name="length"/>):
/// the gaps before, between, and after the given ranges.
/// An empty/null input yields the single full range [0, length).
/// Assumes ranges are sorted and non-overlapping, as produced by RangeCompression.
/// </summary>
private static List<FilePositionRange> RangeInversion(List<FilePositionRange> ranges, long length)
{
    if ((ranges?.Count ?? 0) == 0) return new List<FilePositionRange> { new FilePositionRange { Start = 0, End = length } };
    var gaps = new List<FilePositionRange>();
    long cursor = 0;
    foreach (var range in ranges)
    {
        // A gap exists whenever the cursor has not reached the next range's start.
        if (cursor != range.Start)
        {
            gaps.Add(new FilePositionRange { Start = cursor, End = range.Start });
        }
        cursor = range.End;
    }
    // Trailing gap after the last range, mirroring the original's double guard
    // (no tail is emitted when the last range starts or ends at `length`).
    var lastStart = ranges[ranges.Count - 1].Start;
    if (lastStart != length && cursor != length)
    {
        gaps.Add(new FilePositionRange { Start = cursor, End = length });
    }
    return gaps;
}
} }
} }

@ -0,0 +1,52 @@
using System;
using System.Collections.Generic;
namespace ZeroLevel.Services.Collections
{
/// <summary>
/// Accumulates items and invokes <c>insertAction</c> on every full batch of
/// <c>batchSize</c> items; <see cref="Dispose"/> flushes any remainder.
/// Insert failures are logged and the batch is discarded (best-effort delivery).
/// Not thread-safe: callers must serialize access to <see cref="Add"/>.
/// </summary>
public sealed class BatchProcessor<T>
    : IDisposable
{
    // Items accumulated since the last flush.
    private readonly List<T> _batch;
    private readonly int _batchSize;
    private readonly Action<IReadOnlyList<T>> _insertAction;

    /// <param name="batchSize">Number of items that triggers an automatic flush; must be positive.</param>
    /// <param name="insertAction">Callback receiving each full (or final partial) batch.</param>
    /// <exception cref="ArgumentOutOfRangeException">If <paramref name="batchSize"/> is not positive.</exception>
    /// <exception cref="ArgumentNullException">If <paramref name="insertAction"/> is null.</exception>
    public BatchProcessor(int batchSize, Action<IReadOnlyList<T>> insertAction)
    {
        // Fail fast: a null action would otherwise surface as an NRE on the first
        // flush, and a non-positive size would make Add flush on every call or
        // crash in the List constructor with a less helpful message.
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        if (insertAction == null) throw new ArgumentNullException(nameof(insertAction));
        _batch = new List<T>(batchSize);
        _insertAction = insertAction;
        _batchSize = batchSize;
    }

    /// <summary>Adds one item; flushes automatically when the batch is full.</summary>
    public void Add(T val)
    {
        _batch.Add(val);
        if (_batch.Count >= _batchSize)
        {
            Flush("Add");
        }
    }

    /// <summary>Flushes any remaining items. Safe to call when the batch is empty.</summary>
    public void Dispose()
    {
        if (_batch.Count > 0)
        {
            Flush("Dispose");
        }
        _batch.Clear();
    }

    // Invokes the insert action and clears the batch even on failure
    // (deliberate: a poison batch must not wedge subsequent batches).
    // `context` preserves the original per-call-site log messages.
    private void Flush(string context)
    {
        try
        {
            _insertAction.Invoke(_batch);
        }
        catch (Exception ex)
        {
            Log.Error(ex, $"[BatchProcessor.{context}] Fault insert");
        }
        _batch.Clear();
    }
}
}

@ -0,0 +1,8 @@
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// A byte-offset span within a partition data file.
/// Consumers compute the span size as End - Start (see CopyRange),
/// so End is treated as exclusive.
/// </summary>
public class FilePositionRange
{
    // Start offset of the span, in bytes from the beginning of the file.
    public long Start;
    // End offset of the span (exclusive), in bytes.
    public long End;
}
}

@ -20,5 +20,7 @@ namespace ZeroLevel.Services.PartitionStorage
IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info); IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info);
Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest); Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest);
void RemovePartition(TMeta info);
} }
} }

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ZeroLevel.Services.PartitionStorage.Interfaces
{
/// <summary>
/// Placeholder for a partition-store caching abstraction.
/// NOTE(review): intentionally empty in this revision (the old CacheOptions
/// were removed in the same commit) — confirm whether members are planned
/// or the interface should be dropped.
/// </summary>
public interface IStoreCache<TKey, TInput, TValue, TMeta>
{
}
}

@ -25,5 +25,10 @@ namespace ZeroLevel.Services.PartitionStorage
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys); IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys);
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate(); IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate();
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key); IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key);
void RemoveKey(TKey key);
void RemoveKeys(IEnumerable<TKey> keys);
void RemoveAllExceptKey(TKey key);
void RemoveAllExceptKeys(IEnumerable<TKey> keys);
} }
} }

@ -1,5 +1,13 @@
namespace ZeroLevel.Services.PartitionStorage using System.Collections.Generic;
namespace ZeroLevel.Services.PartitionStorage
{ {
public class InsertValue<TKey, TInput>
{
public TKey Key;
public TInput Value;
}
/// <summary> /// <summary>
/// Provides write operations in catalog partition /// Provides write operations in catalog partition
/// </summary> /// </summary>

@ -1,17 +0,0 @@
namespace ZeroLevel.Services.PartitionStorage
{
public class CacheOptions
{
public bool UsePersistentCache { get; set; }
public string PersistentCacheFolder { get; set; } = "cachee";
public int PersistentCacheRemoveTimeoutInSeconds { get; set; } = 3600;
public bool UseMemoryCache { get; set; }
public int MemoryCacheLimitInMb { get; set; } = 1024;
public int MemoryCacheRemoveTimeoutInSeconds { get; set; } = 900;
}
}

@ -47,12 +47,6 @@ namespace ZeroLevel.Services.PartitionStorage
FileIndexCount = 64 FileIndexCount = 64
}; };
public CacheOptions Cache { get; set; } = new CacheOptions
{
UseMemoryCache = false,
UsePersistentCache = false
};
internal string GetFileName(TKey key, TMeta info) internal string GetFileName(TKey key, TMeta info)
{ {
return FilePartition.PathExtractor(key, info); return FilePartition.PathExtractor(key, info);
@ -89,16 +83,7 @@ namespace ZeroLevel.Services.PartitionStorage
Partitions = this.Partitions Partitions = this.Partitions
.Select(p => new StoreCatalogPartition<TMeta>(p.Name, p.PathExtractor)) .Select(p => new StoreCatalogPartition<TMeta>(p.Name, p.PathExtractor))
.ToList(), .ToList(),
RootFolder = this.RootFolder, RootFolder = this.RootFolder
Cache = new CacheOptions
{
MemoryCacheLimitInMb = this.Cache.MemoryCacheLimitInMb,
MemoryCacheRemoveTimeoutInSeconds = this.Cache.MemoryCacheRemoveTimeoutInSeconds,
PersistentCacheFolder = this.Cache.PersistentCacheFolder,
PersistentCacheRemoveTimeoutInSeconds = this.Cache.PersistentCacheRemoveTimeoutInSeconds,
UseMemoryCache = this.Cache.UseMemoryCache,
UsePersistentCache = this.Cache.UsePersistentCache
}
}; };
return options; return options;
} }

@ -20,6 +20,9 @@ namespace ZeroLevel.Services.PartitionStorage
/// Exists compressed catalog /// Exists compressed catalog
/// </summary> /// </summary>
private readonly IStorePartitionAccessor<TKey, TInput, TValue> _accessor; private readonly IStorePartitionAccessor<TKey, TInput, TValue> _accessor;
private readonly string _temporaryFolder;
/// <summary> /// <summary>
/// Write catalog /// Write catalog
/// </summary> /// </summary>
@ -30,9 +33,9 @@ namespace ZeroLevel.Services.PartitionStorage
if (decompress == null) throw new ArgumentNullException(nameof(decompress)); if (decompress == null) throw new ArgumentNullException(nameof(decompress));
_decompress = decompress; _decompress = decompress;
_accessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(options, info); _accessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(options, info);
var tempCatalog = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString()); _temporaryFolder = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString());
var tempOptions = options.Clone(); var tempOptions = options.Clone();
tempOptions.RootFolder = tempCatalog; tempOptions.RootFolder = _temporaryFolder;
_temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info); _temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info);
} }
@ -64,7 +67,7 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath()); var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath());
if (newFiles != null && newFiles.Length > 1) if (newFiles != null && newFiles.Length > 0)
{ {
var folder = _accessor.GetCatalogPath(); var folder = _accessor.GetCatalogPath();
var existsFiles = Directory.GetFiles(folder) var existsFiles = Directory.GetFiles(folder)
@ -85,17 +88,27 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
} }
}
(_temporaryAccessor as StorePartitionBuilder<TKey, TInput, TValue, TMeta>).CloseStreams();
// compress new file // compress new file
foreach (var file in newFiles)
{
(_temporaryAccessor as StorePartitionBuilder<TKey, TInput, TValue, TMeta>) (_temporaryAccessor as StorePartitionBuilder<TKey, TInput, TValue, TMeta>)
.CompressFile(file); .CompressFile(file);
}
// replace old file by new // replace old file by new
foreach (var file in newFiles)
{
var name = Path.GetFileName(file);
File.Move(file, Path.Combine(folder, name), true); File.Move(file, Path.Combine(folder, name), true);
} }
} }
// remove temporary files // remove temporary files
_temporaryAccessor.DropData(); _temporaryAccessor.DropData();
Directory.Delete(_temporaryAccessor.GetCatalogPath(), true); Directory.Delete(_temporaryFolder, true);
} }
/// <summary> /// <summary>

@ -87,7 +87,7 @@ namespace ZeroLevel.Services.PartitionStorage
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate() public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate()
{ {
var files = Directory.GetFiles(_catalog); var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 1) if (files != null && files.Length > 0)
{ {
foreach (var file in files) foreach (var file in files)
{ {
@ -126,7 +126,7 @@ namespace ZeroLevel.Services.PartitionStorage
var indexFolder = Path.Combine(_catalog, "__indexes__"); var indexFolder = Path.Combine(_catalog, "__indexes__");
FSUtils.CleanAndTestFolder(indexFolder); FSUtils.CleanAndTestFolder(indexFolder);
var files = Directory.GetFiles(_catalog); var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 1) if (files != null && files.Length > 0)
{ {
var dict = new Dictionary<TKey, long>(); var dict = new Dictionary<TKey, long>();
foreach (var file in files) foreach (var file in files)
@ -251,5 +251,192 @@ namespace ZeroLevel.Services.PartitionStorage
public void Dispose() public void Dispose()
{ {
} }
/// <summary>Keeps only the records for <paramref name="key"/>; all other records in its data file are removed.</summary>
public void RemoveAllExceptKey(TKey key) => RemoveAllExceptKeys(new[] { key });
/// <summary>
/// Keeps only the records for the given keys. Keys are bucketed by their
/// backing data file and each file is rewritten once, retaining only the
/// matched ranges (inverseRemove == false).
/// </summary>
public void RemoveAllExceptKeys(IEnumerable<TKey> keys)
{
    foreach (var group in keys.GroupBy(k => _options.GetFileName(k, _info)))
    {
        RemoveKeyGroup(group.Key, group.ToArray(), false);
    }
}
/// <summary>Removes all records stored for <paramref name="key"/>.</summary>
public void RemoveKey(TKey key) => RemoveKeys(new[] { key });
/// <summary>
/// Removes all records for the given keys. Keys are bucketed by their
/// backing data file and each file is rewritten once with the matched
/// ranges cut out (inverseRemove == true).
/// </summary>
public void RemoveKeys(IEnumerable<TKey> keys)
{
    foreach (var group in keys.GroupBy(k => _options.GetFileName(k, _info)))
    {
        RemoveKeyGroup(group.Key, group.ToArray(), true);
    }
}
/// <summary>
/// Rewrites one data file, either cutting out the byte ranges holding the given
/// keys (inverseRemove == true) or keeping only those ranges (inverseRemove == false).
/// Three phases: locate ranges, copy the surviving bytes to a temp file, swap files.
/// Assumes records in the file are sorted by key (both scan branches rely on it).
/// </summary>
private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove)
{
    var filePath = Path.Combine(_catalog, fileName);
    if (File.Exists(filePath))
    {
        // 1. Find ranges
        var ranges = new List<FilePositionRange>();
        if (_options.Index.Enabled)
        {
            // Indexed path: for each key, jump to the sparse-index offset and
            // scan forward until the key is found or passed.
            var index = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
            var offsets = index.GetOffset(keys, true);
            using (var reader = GetReadStream(fileName))
            {
                for (int i = 0; i < keys.Length; i++)
                {
                    var searchKey = keys[i];
                    var off = offsets[i];
                    reader.Stream.Seek(off.Offset, SeekOrigin.Begin);
                    while (reader.EOS == false)
                    {
                        // Record the byte span [startPosition, endPosition) of each key+value pair.
                        var startPosition = reader.Stream.Position;
                        var k = reader.ReadCompatible<TKey>();
                        // Value is read only to advance the stream to the record boundary.
                        var v = reader.ReadCompatible<TValue>();
                        var endPosition = reader.Stream.Position;
                        var c = _options.KeyComparer(searchKey, k);
                        if (c == 0)
                        {
                            ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition });
                        }
                        else if (c == -1)
                        {
                            // searchKey < current key: sorted file, so the key cannot appear later.
                            // NOTE(review): relies on the comparer returning exactly -1
                            // (true for the ternary comparers used in this repo's options).
                            break;
                        }
                    }
                }
            }
        }
        else
        {
            // Unindexed path: single sequential sweep with the keys sorted,
            // advancing the key cursor in lock-step with the file.
            using (var reader = GetReadStream(fileName))
            {
                int index = 0;
                var keys_arr = keys.OrderBy(k => k).ToArray();
                while (reader.EOS == false && index < keys_arr.Length)
                {
                    var startPosition = reader.Stream.Position;
                    var k = reader.ReadCompatible<TKey>();
                    // Value is read only to advance the stream to the record boundary.
                    var v = reader.ReadCompatible<TValue>();
                    var endPosition = reader.Stream.Position;
                    var c = _options.KeyComparer(keys_arr[index], k);
                    if (c == 0)
                    {
                        ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition });
                        index++;
                    }
                    else if (c == -1)
                    {
                        // Current sought key is already behind the file position:
                        // skip forward over all keys smaller than the current record.
                        do
                        {
                            index++;
                            if (index < keys_arr.Length)
                            {
                                c = _options.KeyComparer(keys_arr[index], k);
                            }
                        } while (index < keys_arr.Length && c == -1);
                    }
                }
            }
        }
        // 2. Temporary file from ranges
        // NOTE(review): Path.GetTempFileName() already returns a rooted path (and
        // creates the file), so Path.Combine effectively ignores tempPath here —
        // confirm whether the temp file was intended to live next to the catalog.
        var tempPath = Path.GetTempPath();
        var tempFile = Path.Combine(tempPath, Path.GetTempFileName());
        using (var readStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024))
        {
            // Merge adjacent ranges first so each surviving span is copied once.
            RangeCompression(ranges);
            using (var writeStream = new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))
            {
                if (inverseRemove)
                {
                    // Remove mode: copy everything EXCEPT the matched ranges.
                    var inverted = RangeInversion(ranges, readStream.Length);
                    foreach (var range in inverted)
                    {
                        CopyRange(range, readStream, writeStream);
                    }
                }
                else
                {
                    // Keep mode: copy ONLY the matched ranges.
                    foreach (var range in ranges)
                    {
                        CopyRange(range, readStream, writeStream);
                    }
                }
                writeStream.Flush();
            }
        }
        // 3. Replace from temporary to original
        File.Move(tempFile, filePath, true);
    }
}
/// <summary>
/// Merges strictly adjacent ranges in place (one range ends exactly where the
/// next starts). Non-touching or overlapping ranges are left as-is.
/// </summary>
private static void RangeCompression(List<FilePositionRange> ranges)
{
    var i = 0;
    while (i < ranges.Count - 1)
    {
        var left = ranges[i];
        var right = ranges[i + 1];
        if (left.End == right.Start)
        {
            // Absorb the right range into the left and re-check the same index
            // so runs of adjacent ranges collapse into one.
            left.End = right.End;
            ranges.RemoveAt(i + 1);
        }
        else
        {
            i++;
        }
    }
}
/// <summary>
/// Computes the complement of <paramref name="ranges"/> over [0, <paramref name="length"/>):
/// the byte spans NOT covered by any range. Used to keep everything except the
/// removed records. Null/empty input yields the whole file as one range.
/// Assumes sorted, non-overlapping input (the RangeCompression output).
/// </summary>
private static List<FilePositionRange> RangeInversion(List<FilePositionRange> ranges, long length)
{
    if ((ranges?.Count ?? 0) == 0) return new List<FilePositionRange> { new FilePositionRange { Start = 0, End = length } };
    var gaps = new List<FilePositionRange>();
    long cursor = 0;
    foreach (var range in ranges)
    {
        // Emit the gap between the previous range's end and this range's start.
        if (cursor != range.Start)
        {
            gaps.Add(new FilePositionRange { Start = cursor, End = range.Start });
        }
        cursor = range.End;
    }
    // Tail gap after the last range; guards mirror the original implementation
    // (no tail when the last range starts or ends exactly at `length`).
    var lastStart = ranges[ranges.Count - 1].Start;
    if (lastStart != length && cursor != length)
    {
        gaps.Add(new FilePositionRange { Start = cursor, End = length });
    }
    return gaps;
}
/// <summary>
/// Copies the bytes [range.Start, range.End) from <paramref name="source"/> to
/// <paramref name="target"/> using a bounded buffer.
/// Fixes the original, which allocated a buffer of the full range size and
/// ignored Stream.Read's return value — a short read would silently write
/// uninitialized bytes into the rewritten data file.
/// </summary>
/// <exception cref="EndOfStreamException">If the source ends before the range does.</exception>
private static void CopyRange(FilePositionRange range, Stream source, Stream target)
{
    source.Seek(range.Start, SeekOrigin.Begin);
    var remaining = range.End - range.Start;
    if (remaining <= 0) return;
    // Cap the buffer at 64 KiB; ranges can be large (whole-file spans from RangeInversion).
    var buffer = new byte[(int)Math.Min(remaining, 64 * 1024)];
    while (remaining > 0)
    {
        var read = source.Read(buffer, 0, (int)Math.Min(remaining, buffer.Length));
        if (read <= 0)
        {
            // Source shorter than the recorded range: fail loudly instead of
            // producing a truncated/corrupt rewrite.
            throw new EndOfStreamException("Unexpected end of stream while copying range");
        }
        target.Write(buffer, 0, read);
        remaining -= read;
    }
}
} }
} }

@ -14,6 +14,7 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams
= new ConcurrentDictionary<string, MemoryStreamWriter>(); = new ConcurrentDictionary<string, MemoryStreamWriter>();
private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options; private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly string _catalog; private readonly string _catalog;
private readonly TMeta _info; private readonly TMeta _info;
@ -44,17 +45,9 @@ namespace ZeroLevel.Services.PartitionStorage
} }
public void CompleteAddingAndCompress() public void CompleteAddingAndCompress()
{ {
// Close all write streams CloseStreams();
foreach (var s in _writeStreams)
{
try
{
s.Value.Dispose();
}
catch { }
}
var files = Directory.GetFiles(_catalog); var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 1) if (files != null && files.Length > 0)
{ {
Parallel.ForEach(files, file => CompressFile(file)); Parallel.ForEach(files, file => CompressFile(file));
} }
@ -66,7 +59,7 @@ namespace ZeroLevel.Services.PartitionStorage
var indexFolder = Path.Combine(_catalog, "__indexes__"); var indexFolder = Path.Combine(_catalog, "__indexes__");
FSUtils.CleanAndTestFolder(indexFolder); FSUtils.CleanAndTestFolder(indexFolder);
var files = Directory.GetFiles(_catalog); var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 1) if (files != null && files.Length > 0)
{ {
var dict = new Dictionary<TKey, long>(); var dict = new Dictionary<TKey, long>();
foreach (var file in files) foreach (var file in files)
@ -104,6 +97,19 @@ namespace ZeroLevel.Services.PartitionStorage
} }
#region Private methods #region Private methods
// Best-effort shutdown of all per-file write streams.
// Failures are deliberately swallowed: a stream may already be closed or
// faulted, and closing must not prevent the remaining streams from closing.
internal void CloseStreams()
{
    foreach (var writer in _writeStreams.Values)
    {
        try
        {
            writer.Dispose();
        }
        catch
        {
            // intentionally ignored (see above)
        }
    }
}
internal void CompressFile(string file) internal void CompressFile(string file)
{ {
var dict = new Dictionary<TKey, HashSet<TInput>>(); var dict = new Dictionary<TKey, HashSet<TInput>>();
@ -154,7 +160,5 @@ namespace ZeroLevel.Services.PartitionStorage
public void Dispose() public void Dispose()
{ {
} }
} }
} }

@ -4,6 +4,7 @@ using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
@ -21,6 +22,13 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
/// <summary>
/// Deletes an entire partition: drops its stored data through an accessor,
/// then removes the partition's catalog folder from disk.
/// </summary>
/// <param name="info">Partition metadata used to resolve the catalog path.</param>
public void RemovePartition(TMeta info)
{
    // NOTE(review): the accessor is not disposed here. StorePartitionAccessor's
    // Dispose is currently a no-op, but confirm if that ever changes.
    var partition = CreateAccessor(info);
    partition.DropData();
    FSUtils.RemoveFolder(partition.GetCatalogPath());
}
public IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info) public IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info)
{ {
return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info); return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info);

Loading…
Cancel
Save

Powered by TurnKey Linux.