PartitionStorage refactoring

pull/4/head
Ogoun 2 years ago
parent f43a269986
commit 8a897cf29c

@ -1,5 +1,7 @@
using System.Diagnostics;
using System.Text;
using ZeroLevel;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.PartitionStorage;
namespace PartitionFileStorageTest
@ -20,68 +22,7 @@ namespace PartitionFileStorageTest
return ulong.Parse(num.ToString());
}
private static void BuildStore(string root)
{
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{
Index = new IndexOptions { Enabled = true, FileIndexCount = 64 },
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 128).ToString()),
MergeFunction = list =>
{
ulong s = 0;
return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
};
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storePart1 = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
var storePart2 = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 09) });
var sw = new Stopwatch();
sw.Start();
var r = new Random(Environment.TickCount);
for (int i = 0; i < 1000000; i++)
{
var s = Generate(r);
var count = r.Next(300);
for (int j = 0; j < count; j++)
{
var t = Generate(r);
storePart1.Store(s, t);
}
}
for (int i = 0; i < 1000000; i++)
{
var s = Generate(r);
var count = r.Next(300);
for (int j = 0; j < count; j++)
{
var t = Generate(r);
storePart2.Store(s, t);
}
}
sw.Stop();
Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart1.CompleteAddingAndCompress();
storePart2.CompleteAddingAndCompress();
sw.Stop();
Console.WriteLine($"Rebuild journal to store: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart1.RebuildIndex();
storePart2.RebuildIndex();
sw.Stop();
Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
}
private static void SmallFullTest(string root)
private static void FastTest(string root)
{
var r = new Random(Environment.TickCount);
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
@ -119,7 +60,8 @@ namespace PartitionFileStorageTest
storePart.Store(c3, Generate(r));
storePart.Store(c3, Generate(r));
storePart.Store(c3, Generate(r));
storePart.CompleteAddingAndCompress();
storePart.CompleteAdding();
storePart.Compress();
var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
Console.WriteLine("Data:");
foreach (var e in readPart.Iterate())
@ -134,7 +76,7 @@ namespace PartitionFileStorageTest
}
}
private static void TestBuildRemoveStore(string root)
private static void FullStoreTest(string root)
{
var r = new Random(Environment.TickCount);
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
@ -153,23 +95,76 @@ namespace PartitionFileStorageTest
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
};
FSUtils.CleanAndTestFolder(root);
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storePart = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08) });
Log.Info("Fill start");
for (int i = 0; i < 30000000; i++)
{
var s = Generate(r);
var count = r.Next(200);
for (int j = 0; j < count; j++)
{
var t = Generate(r);
storePart.Store(s, t);
}
}
storePart.CompleteAdding();
Log.Info("Fill complete");
long cnt = 0;
foreach (var p in storePart.Iterate())
{
if (p.Key % 2 == 0) cnt++;
}
Log.Info(cnt.ToString());
Log.Info("Fill test complete");
storePart.Compress();
Log.Info("Compress complete");
var reader = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
cnt = 0;
foreach (var p in reader.Iterate())
{
if (p.Key % 2 == 0) cnt++;
}
Log.Info(cnt.ToString());
Log.Info("Compress test complete");
storePart.DropData();
Log.Info("Complete#1");
Console.ReadKey();
FSUtils.CleanAndTestFolder(root);
var sw = new Stopwatch();
sw.Start();
var testKeys1 = new List<ulong>();
var testKeys2 = new List<ulong>();
for (int i = 0; i < 1000000; i++)
var testData = new Dictionary<ulong, HashSet<ulong>>();
var total = 0L;
for (int i = 0; i < 2000000; i++)
{
var s = Generate(r);
if (testData.ContainsKey(s) == false) testData[s] = new HashSet<ulong>();
var count = r.Next(300);
total++;
for (int j = 0; j < count; j++)
{
total++;
var t = Generate(r);
storePart.Store(s, t);
testData[s].Add(t);
}
if (s % 177 == 0)
{
@ -182,184 +177,134 @@ namespace PartitionFileStorageTest
}
sw.Stop();
Console.WriteLine($"Fill journal: {sw.ElapsedMilliseconds}ms");
Log.Info($"Fill journal: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart.CompleteAddingAndCompress();
storePart.CompleteAdding();
storePart.Compress();
sw.Stop();
Console.WriteLine($"Compress: {sw.ElapsedMilliseconds}ms");
Log.Info($"Compress: {sw.ElapsedMilliseconds}ms");
sw.Restart();
storePart.RebuildIndex();
sw.Stop();
Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
Log.Info($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
Console.WriteLine("Start merge test");
Log.Info("Start merge test");
sw.Restart();
var merger = store.CreateMergeAccessor(new Metadata { Date = new DateTime(2022, 11, 08) }, data => Compressor.DecodeBytesContent(data));
for (int i = 0; i < 1000000; i++)
for (int i = 0; i < 2300000; i++)
{
total++;
var s = Generate(r);
if (testData.ContainsKey(s) == false) testData[s] = new HashSet<ulong>();
var count = r.Next(300);
for (int j = 0; j < count; j++)
{
total++;
var t = Generate(r);
merger.Store(s, t);
testData[s].Add(t);
}
}
Console.WriteLine($"Merge journal filled: {sw.ElapsedMilliseconds}ms");
merger.CompleteAddingAndCompress();
Log.Info($"Merge journal filled: {sw.ElapsedMilliseconds}ms. Total data count: {total}");
merger.Compress(); // auto reindex
sw.Stop();
Console.WriteLine($"Compress after merge: {sw.ElapsedMilliseconds}ms");
sw.Restart();
merger.RebuildIndex();
sw.Stop();
Console.WriteLine($"Rebuild indexes after merge: {sw.ElapsedMilliseconds}ms");
Log.Info($"Compress after merge: {sw.ElapsedMilliseconds}ms");
Console.WriteLine("Test #1 reading");
Log.Info("Test #1 reading");
var readPart = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
ulong totalData = 0;
ulong totalKeys = 0;
foreach (var key in testKeys1)
{
Console.WriteLine($"\tKey: {key}");
var result = readPart.Find(key);
Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes");
totalData += (ulong)(result.Value?.Length ?? 0);
totalKeys++;
}
Console.WriteLine("Press to continue");
Console.ReadKey();
Console.WriteLine("Test #1 remove by keys");
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0;
totalKeys = 0;
Log.Info("Test #1 remove by keys");
for (int i = 0; i < testKeys1.Count; i++)
{
readPart.RemoveKey(testKeys1[i]);
readPart.RemoveKey(testKeys1[i], false);
}
Console.WriteLine("Test #1 reading after remove");
sw.Restart();
readPart.RebuildIndex();
sw.Stop();
Log.Info($"Rebuild indexes after remove: {sw.ElapsedMilliseconds}ms");
Log.Info("Test #1 reading after remove");
foreach (var key in testKeys1)
{
Console.WriteLine($"\tKey: {key}");
var result = readPart.Find(key);
Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes");
totalData += (ulong)(result.Value?.Length ?? 0);
totalKeys++;
}
Console.WriteLine("Press to continue");
Console.ReadKey();
Console.WriteLine();
Console.WriteLine("---------------------------------------");
Console.WriteLine();
Console.WriteLine("Test #2 reading");
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0;
totalKeys = 0;
Log.Info("Test #2 reading");
foreach (var key in testKeys2)
{
Console.WriteLine($"\tKey: {key}");
var result = readPart.Find(key);
Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes");
totalData += (ulong)(result.Value?.Length ?? 0);
totalKeys++;
}
Console.WriteLine("Press to continue");
Console.ReadKey();
Console.WriteLine("Test #2 remove keys batch");
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0;
totalKeys = 0;
Log.Info("Test #2 remove keys batch");
readPart.RemoveKeys(testKeys2);
Console.WriteLine("Test #2 reading after remove");
Log.Info("Test #2 reading after remove");
foreach (var key in testKeys2)
{
Console.WriteLine($"\tKey: {key}");
var result = readPart.Find(key);
Console.WriteLine($"\t\tFound: {result.Found}. {result.Value?.Length ?? 0} bytes");
totalData += (ulong)(result.Value?.Length ?? 0);
totalKeys++;
}
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0;
totalKeys = 0;
Console.WriteLine("Press to continue for iteration");
Console.ReadKey();
Log.Info("Iterator test");
foreach (var e in readPart.Iterate())
{
Console.WriteLine($"{e.Key}: {e.Value.Length}");
}
totalData += (ulong)(e.Value?.Length ?? 0);
totalKeys++;
}
Log.Info($"\t\tFound: {totalKeys} keys. {totalData} bytes");
totalData = 0;
totalKeys = 0;
Log.Info("Test stored data");
private static void TestReading(string root)
{
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
foreach (var test in testData)
{
Index = new IndexOptions { Enabled = true, FileIndexCount = 256 },
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 512).ToString()),
MergeFunction = list =>
if (test.Value.Count > 0 && testKeys1.Contains(test.Key) == false && testKeys2.Contains(test.Key) == false)
{
ulong s = 0;
return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
var result = Compressor.DecodeBytesContent(readPart.Find(test.Key).Value).ToHashSet();
if (test.Value.Count != result.Count)
{
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
};
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var request = new StoreSearchRequest<ulong, Metadata>
{
PartitionSearchRequests = new List<PartitionSearchRequest<ulong, Metadata>>
{
new PartitionSearchRequest<ulong, Metadata>
{
Info = new Metadata { Date = new DateTime(2022, 11, 08) },
Keys = new ulong[] { }
},
new PartitionSearchRequest<ulong, Metadata>
{
Info = new Metadata { Date = new DateTime(2022, 11, 09) },
Keys = new ulong[] { }
}
Log.Info($"Key '{test.Key}' not found!");
continue;
}
};
var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
Console.WriteLine($"Incoming data files: {storeIncoming.CountDataFiles()}");
var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 09) });
Console.WriteLine($"Outcoming data files: {storeOutcoming.CountDataFiles()}");
var sw = new Stopwatch();
sw.Start();
var result = store.Search(request).Result;
foreach (var r in result.Results)
{
foreach (var mr in r.Value)
foreach (var t in test.Value)
{
Console.WriteLine($"\tKey: {mr.Key}. Sucess: {mr.Found}");
if (mr.Found && mr.Value.Length > 0)
if (result.Contains(t) == false)
{
var ctns = Compressor.DecodeBytesContent(mr.Value);
Console.WriteLine($"\t\t{string.Join(';', ctns)}");
Log.Info($"Value '{t}' from test data missed in base");
}
}
}
sw.Stop();
Console.WriteLine($"Search time: {sw.ElapsedMilliseconds}ms");
}
private static void TestIterations(string root)
{
var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{
Index = new IndexOptions { Enabled = true, FileIndexCount = 256 },
RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 512).ToString()),
MergeFunction = list =>
{
ulong s = 0;
return Compressor.GetEncodedBytes(list.OrderBy(c => c), ref s);
},
Partitions = new List<StoreCatalogPartition<Metadata>>
{
new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd"))
},
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
};
var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08) });
foreach (var r in storeIncoming.Iterate())
{
Console.WriteLine($"{r.Key}: {r.Value.Length}");
}
Log.Info("Completed");
}
static void Main(string[] args)
{
Log.AddConsoleLogger(ZeroLevel.Logging.LogLevel.FullDebug);
var root = @"H:\temp";
//SmallFullTest(root);
TestBuildRemoveStore(root);
//BuildStore(root);
//TestReading(root);
//FastTest(root);
FullStoreTest(root);
//TestIterations(root);
//TestRangeCompressionAndInversion();
Console.ReadKey();

@ -15,6 +15,7 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly Func<TKey, TKey, int> _keyComparer;
private readonly string _indexFolder;
private readonly bool _indexExists = false;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly TMeta _meta;
public StorePartitionSparseIndex(string partitionFolder, TMeta meta,
StoreFilePartition<TKey, TMeta> filePartition,
@ -25,6 +26,7 @@ namespace ZeroLevel.Services.PartitionStorage
_meta = meta;
_keyComparer = keyComparer;
_filePartition = filePartition;
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
}
public KeyIndex<TKey> GetOffset(TKey key)
@ -110,7 +112,7 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var k = _keyDeserializer.Invoke(reader);
var o = reader.ReadLong();
list.Add(new KeyIndex<TKey> { Key = k, Offset = o });
}

@ -1,26 +1,38 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using ZeroLevel.Services.PartitionStorage.Interfaces;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Partition store interface
/// </summary>
/// <typeparam name="TKey">Record key</typeparam>
/// <typeparam name="TInput">The value that is written in the stream</typeparam>
/// <typeparam name="TValue">Value after compression of TInput values by duplicate keys (TInput list or similar)</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam>
/// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TInput">Value type</typeparam>
/// <typeparam name="TValue">The type of compressed array of values for the key</typeparam>
/// <typeparam name="TMeta">Metadata for creating or searching for a partition</typeparam>
public interface IStore<TKey, TInput, TValue, TMeta>
{
/// <summary>
/// Returns an object to create a partition
/// </summary>
IStorePartitionBuilder<TKey, TInput, TValue> CreateBuilder(TMeta info);
IStorePartitionBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor);
/// <summary>
/// Returns an object to overwrite data in an existing partition
/// </summary>
IStorePartitionMergeBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor);
/// <summary>
/// Creates an object to access the data in the partition
/// </summary>
IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info);
/// <summary>
/// Performs a search for data in the repository
/// </summary>
Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest);
/// <summary>
/// Deleting a partition
/// </summary>
void RemovePartition(TMeta info);
}
}

@ -1,12 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ZeroLevel.Services.PartitionStorage.Interfaces
{
public interface IStoreCache<TKey, TInput, TValue, TMeta>
{
}
}

@ -26,9 +26,9 @@ namespace ZeroLevel.Services.PartitionStorage
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate();
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key);
void RemoveKey(TKey key);
void RemoveKeys(IEnumerable<TKey> keys);
void RemoveAllExceptKey(TKey key);
void RemoveAllExceptKeys(IEnumerable<TKey> keys);
void RemoveKey(TKey key, bool autoReindex = false);
void RemoveKeys(IEnumerable<TKey> keys, bool autoReindex = true);
void RemoveAllExceptKey(TKey key, bool autoReindex = false);
void RemoveAllExceptKeys(IEnumerable<TKey> keys, bool autoReindex = true);
}
}

@ -2,12 +2,6 @@
namespace ZeroLevel.Services.PartitionStorage
{
public class InsertValue<TKey, TInput>
{
public TKey Key;
public TInput Value;
}
/// <summary>
/// Provides write operations in catalog partition
/// </summary>
@ -17,15 +11,22 @@ namespace ZeroLevel.Services.PartitionStorage
public interface IStorePartitionBuilder<TKey, TInput, TValue>
: IStorePartitionBase<TKey, TInput, TValue>
{
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TInput>> Iterate();
/// <summary>
/// Save one record
/// Writing a key-value pair
/// </summary>
void Store(TKey key, TInput value);
/// <summary>
/// Complete the recording and perform the conversion of the records from
/// (TKey; TInput) to (TKey; TValue)
/// Called after all key-value pairs are written to the partition
/// </summary>
void CompleteAdding();
/// <summary>
/// Perform the conversion of the records from (TKey; TInput) to (TKey; TValue). Called after CompleteAdding
/// </summary>
void Compress();
/// <summary>
/// Rebuilds index files. Only for compressed data.
/// </summary>
void CompleteAddingAndCompress();
void RebuildIndex();
}
}

@ -2,7 +2,13 @@
{
internal interface IStorePartitionIndex<TKey>
{
/// <summary>
/// Search for the offset of the closest to the specified key.
/// </summary>
KeyIndex<TKey> GetOffset(TKey key);
/// <summary>
/// Search for offsets of the keys closest to the specified ones.
/// </summary>
KeyIndex<TKey>[] GetOffset(TKey[] keys, bool inOneGroup);
}
}

@ -0,0 +1,21 @@
namespace ZeroLevel.Services.PartitionStorage.Interfaces
{
/// <summary>
/// Provides write operations in catalog partition
/// </summary>
/// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TInput">Type of one input value</typeparam>
/// <typeparam name="TValue">Type of records aggregate</typeparam>
public interface IStorePartitionMergeBuilder<TKey, TInput, TValue>
: IStorePartitionBase<TKey, TInput, TValue>
{
/// <summary>
/// Writing a key-value pair
/// </summary>
void Store(TKey key, TInput value);
/// <summary>
/// Perform the conversion of the records from (TKey; TInput) to (TKey; TValue). Called after CompleteAdding
/// </summary>
void Compress();
}
}

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ZeroLevel.Services.PartitionStorage.Interfaces;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
@ -13,7 +14,7 @@ namespace ZeroLevel.Services.PartitionStorage
///
/// </summary>
public class StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>
: IStorePartitionBuilder<TKey, TInput, TValue>
: IStorePartitionMergeBuilder<TKey, TInput, TValue>
{
private readonly Func<TValue, IEnumerable<TInput>> _decompress;
/// <summary>
@ -22,6 +23,8 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly IStorePartitionAccessor<TKey, TInput, TValue> _accessor;
private readonly string _temporaryFolder;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
/// <summary>
/// Write catalog
@ -37,33 +40,21 @@ namespace ZeroLevel.Services.PartitionStorage
var tempOptions = options.Clone();
tempOptions.RootFolder = _temporaryFolder;
_temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info);
}
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>>
IterateReadKeyInputs(string filePath)
{
if (File.Exists(filePath))
{
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
using (var reader = new MemoryStreamReader(stream))
{
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var input = _decompress(v);
yield return
new StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>
{
Key = k,
Value = input,
Found = true
};
}
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
}
}
}
public void CompleteAddingAndCompress()
#region API methods
/// <summary>
/// Deletes only new entries. Existing entries remain unchanged.
/// </summary>
public void DropData() => _temporaryAccessor.DropData();
public string GetCatalogPath() => _accessor.GetCatalogPath();
public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value);
public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(),
_temporaryAccessor.CountDataFiles());
public void Compress()
{
var newFiles = Directory.GetFiles(_temporaryAccessor.GetCatalogPath());
@ -90,7 +81,7 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
(_temporaryAccessor as StorePartitionBuilder<TKey, TInput, TValue, TMeta>).CloseStreams();
_temporaryAccessor.CompleteAdding();
// compress new file
foreach (var file in newFiles)
@ -102,24 +93,51 @@ namespace ZeroLevel.Services.PartitionStorage
// replace old file by new
foreach (var file in newFiles)
{
// 1. Remove index file
(_accessor as StorePartitionAccessor<TKey, TInput, TValue, TMeta>)
.DropFileIndex(file);
// 2. Replace source
var name = Path.GetFileName(file);
File.Move(file, Path.Combine(folder, name), true);
// 3. Rebuil index
(_accessor as StorePartitionAccessor<TKey, TInput, TValue, TMeta>)
.RebuildFileIndex(file);
}
}
// remove temporary files
_temporaryAccessor.DropData();
Directory.Delete(_temporaryFolder, true);
}
#endregion
/// <summary>
/// Deletes only new entries. Existing entries remain unchanged.
/// </summary>
public void DropData() => _temporaryAccessor.DropData();
public string GetCatalogPath() => _accessor.GetCatalogPath();
public void RebuildIndex() => _accessor.RebuildIndex();
public void Store(TKey key, TInput value) => _temporaryAccessor.Store(key, value);
public int CountDataFiles() => Math.Max(_accessor.CountDataFiles(),
_temporaryAccessor.CountDataFiles());
#region Private methods
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>>
IterateReadKeyInputs(string filePath)
{
if (File.Exists(filePath))
{
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
using (var reader = new MemoryStreamReader(stream))
{
while (reader.EOS == false)
{
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var input = _decompress(v);
yield return
new StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>
{
Key = k,
Value = input,
Found = true
};
}
}
}
}
#endregion
public void Dispose()
{

@ -15,6 +15,9 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly string _indexCatalog;
private readonly TMeta _info;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
public string Catalog { get { return _catalog; } }
public StorePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options, TMeta info)
{
@ -26,12 +29,14 @@ namespace ZeroLevel.Services.PartitionStorage
{
_indexCatalog = Path.Combine(_catalog, "__indexes__");
}
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
}
#region API methods
public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 0;
public string GetCatalogPath() => _catalog;
public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
{
var fileName = _options.GetFileName(key, _info);
@ -48,12 +53,12 @@ namespace ZeroLevel.Services.PartitionStorage
{
if (startOffset > 0)
{
reader.Stream.Seek(startOffset, SeekOrigin.Begin);
reader.Seek(startOffset, SeekOrigin.Begin);
}
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var c = _options.KeyComparer(key, k);
if (c == 0) return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
@ -100,8 +105,8 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Found = true };
}
}
@ -117,8 +122,8 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Found = true };
}
}
@ -139,8 +144,53 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
}
public void RemoveAllExceptKey(TKey key, bool autoReindex = false)
{
RemoveAllExceptKeys(new[] { key }, autoReindex);
}
public void RemoveAllExceptKeys(IEnumerable<TKey> keys, bool autoReindex = true)
{
var results = keys.Distinct()
.GroupBy(
k => _options.GetFileName(k, _info),
k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() });
foreach (var group in results)
{
RemoveKeyGroup(group.FileName, group.Keys, false, autoReindex);
}
}
public void RemoveKey(TKey key, bool autoReindex = false)
{
RemoveKeys(new[] { key }, autoReindex);
}
public void RemoveKeys(IEnumerable<TKey> keys, bool autoReindex = true)
{
var results = keys.Distinct()
.GroupBy(
k => _options.GetFileName(k, _info),
k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() });
foreach (var group in results)
{
RemoveKeyGroup(group.FileName, group.Keys, true, autoReindex);
}
}
#endregion
private void RebuildFileIndex(string file)
#region Internal methods
internal void DropFileIndex(string file)
{
if (_options.Index.Enabled)
{
var index_file = Path.Combine(_indexCatalog, Path.GetFileName(file));
if (File.Exists(index_file))
{
File.Delete(index_file);
}
}
}
internal void RebuildFileIndex(string file)
{
if (_options.Index.Enabled)
{
if (false == Directory.Exists(_indexCatalog))
{
@ -151,10 +201,10 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
var pos = reader.Stream.Position;
var k = reader.ReadCompatible<TKey>();
var pos = reader.Position;
var k = _keyDeserializer.Invoke(reader);
dict[k] = pos;
reader.ReadCompatible<TValue>();
_valueDeserializer.Invoke(reader);
}
}
if (dict.Count > _options.Index.FileIndexCount * 8)
@ -174,6 +224,8 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
}
}
#endregion
#region Private methods
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName,
@ -191,11 +243,11 @@ namespace ZeroLevel.Services.PartitionStorage
{
var searchKey = keys[i];
var off = offsets[i];
reader.Stream.Seek(off.Offset, SeekOrigin.Begin);
reader.Seek(off.Offset, SeekOrigin.Begin);
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
@ -223,8 +275,8 @@ namespace ZeroLevel.Services.PartitionStorage
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var k = _keyDeserializer.Invoke(reader);
var v = _valueDeserializer.Invoke(reader);
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{
@ -253,59 +305,14 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
private MemoryStreamReader GetReadStream(string fileName)
{
var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
return new MemoryStreamReader(stream);
}
#endregion
public void Dispose()
{
}
public void RemoveAllExceptKey(TKey key)
{
RemoveAllExceptKeys(new[] { key });
}
public void RemoveAllExceptKeys(IEnumerable<TKey> keys)
{
var results = keys.Distinct()
.GroupBy(
k => _options.GetFileName(k, _info),
k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() });
foreach (var group in results)
{
RemoveKeyGroup(group.FileName, group.Keys, false);
}
}
public void RemoveKey(TKey key)
{
RemoveKeys(new[] { key });
}
public void RemoveKeys(IEnumerable<TKey> keys)
{
var results = keys.Distinct()
.GroupBy(
k => _options.GetFileName(k, _info),
k => k, (key, g) => new { FileName = key, Keys = g.OrderBy(k => k).ToArray() });
foreach (var group in results)
{
RemoveKeyGroup(group.FileName, group.Keys, true);
}
}
private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove)
private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex)
{
var filePath = Path.Combine(_catalog, fileName);
if (File.Exists(filePath))
{
// 1. Find ranges
var ranges = new List<FilePositionRange>();
if (_options.Index.Enabled)
if (_options.Index.Enabled && autoReindex)
{
var index = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
var offsets = index.GetOffset(keys, true);
@ -315,14 +322,13 @@ namespace ZeroLevel.Services.PartitionStorage
{
var searchKey = keys[i];
var off = offsets[i];
reader.Stream.Seek(off.Offset, SeekOrigin.Begin);
reader.Seek(off.Offset, SeekOrigin.Begin);
while (reader.EOS == false)
{
var startPosition = reader.Stream.Position;
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var endPosition = reader.Stream.Position;
var startPosition = reader.Position;
var k = _keyDeserializer.Invoke(reader);
_valueDeserializer.Invoke(reader);
var endPosition = reader.Position;
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
@ -344,11 +350,10 @@ namespace ZeroLevel.Services.PartitionStorage
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{
var startPosition = reader.Stream.Position;
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var endPosition = reader.Stream.Position;
var startPosition = reader.Position;
var k = _keyDeserializer.Invoke(reader);
_valueDeserializer.Invoke(reader);
var endPosition = reader.Position;
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{
@ -402,13 +407,22 @@ namespace ZeroLevel.Services.PartitionStorage
File.Move(tempFile, filePath, true);
// Rebuild index if needs
if (_options.Index.Enabled)
if (_options.Index.Enabled && autoReindex)
{
RebuildFileIndex(filePath);
}
}
}
private MemoryStreamReader GetReadStream(string fileName)
{
var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
return new MemoryStreamReader(stream);
}
#endregion
#region Static
private static void RangeCompression(List<FilePositionRange> ranges)
{
for (var i = 0; i < ranges.Count - 1; i++)
@ -456,5 +470,10 @@ namespace ZeroLevel.Services.PartitionStorage
source.Read(buffer, 0, buffer.Length);
target.Write(buffer, 0, buffer.Length);
}
#endregion
public void Dispose()
{
}
}
}

@ -3,6 +3,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Serialization;
@ -18,7 +19,12 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly string _catalog;
private readonly TMeta _info;
private readonly Action<MemoryStreamWriter, TKey> _keySerializer;
private readonly Action<MemoryStreamWriter, TInput> _inputSerializer;
private readonly Func<MemoryStreamReader, TKey> _keyDeserializer;
private readonly Func<MemoryStreamReader, TInput> _inputDeserializer;
private readonly Func<MemoryStreamReader, TValue> _valueDeserializer;
public string Catalog { get { return _catalog; } }
public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options, TMeta info)
{
@ -30,28 +36,73 @@ namespace ZeroLevel.Services.PartitionStorage
{
Directory.CreateDirectory(_catalog);
}
_keySerializer = MessageSerializer.GetSerializer<TKey>();
_inputSerializer = MessageSerializer.GetSerializer<TInput>();
_keyDeserializer = MessageSerializer.GetDeserializer<TKey>();
_inputDeserializer = MessageSerializer.GetDeserializer<TInput>();
_valueDeserializer = MessageSerializer.GetDeserializer<TValue>();
}
#region API methods
public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 0;
public string GetCatalogPath() => _catalog;
public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
public void Store(TKey key, TInput value)
{
var fileName = _options.GetFileName(key, _info);
var stream = GetWriteStream(fileName);
stream.SerializeCompatible(key);
stream.SerializeCompatible(value);
_keySerializer.Invoke(stream, key);
Thread.MemoryBarrier();
_inputSerializer.Invoke(stream, value);
}
public void CompleteAdding()
{
// Close all write streams
foreach (var s in _writeStreams)
{
try
{
s.Value.Stream.Flush();
s.Value.Dispose();
}
catch { }
}
_writeStreams.Clear();
}
public void CompleteAddingAndCompress()
public void Compress()
{
CloseStreams();
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 0)
{
Parallel.ForEach(files, file => CompressFile(file));
}
}
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TInput>> Iterate()
{
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 0)
{
foreach (var file in files)
{
using (var reader = GetReadStream(Path.GetFileName(file)))
{
while (reader.EOS == false)
{
var key = _keyDeserializer.Invoke(reader);
if (reader.EOS)
{
yield return new StorePartitionKeyValueSearchResult<TKey, TInput> { Key = key, Value = default, Found = true };
break;
}
var val = _inputDeserializer.Invoke(reader);
yield return new StorePartitionKeyValueSearchResult<TKey, TInput> { Key = key, Value = val, Found = true };
}
}
}
}
}
public void RebuildIndex()
{
if (_options.Index.Enabled)
@ -70,9 +121,10 @@ namespace ZeroLevel.Services.PartitionStorage
while (reader.EOS == false)
{
var pos = reader.Stream.Position;
var k = reader.ReadCompatible<TKey>();
dict[k] = pos;
reader.ReadCompatible<TValue>();
var key = _keyDeserializer.Invoke(reader);
dict[key] = pos;
if (reader.EOS) break;
_valueDeserializer.Invoke(reader);
}
}
if (dict.Count > _options.Index.FileIndexCount * 8)
@ -95,21 +147,9 @@ namespace ZeroLevel.Services.PartitionStorage
}
}
}
#endregion
#region Private methods
internal void CloseStreams()
{
// Close all write streams
foreach (var s in _writeStreams)
{
try
{
s.Value.Dispose();
}
catch { }
}
}
internal void CompressFile(string file)
{
var dict = new Dictionary<TKey, HashSet<TInput>>();
@ -117,13 +157,17 @@ namespace ZeroLevel.Services.PartitionStorage
{
while (reader.EOS == false)
{
TKey k = reader.ReadCompatible<TKey>();
TInput v = reader.ReadCompatible<TInput>();
if (false == dict.ContainsKey(k))
var key = _keyDeserializer.Invoke(reader);
if (false == dict.ContainsKey(key))
{
dict[k] = new HashSet<TInput>();
dict[key] = new HashSet<TInput>();
}
dict[k].Add(v);
if (reader.EOS)
{
break;
}
var input = _inputDeserializer.Invoke(reader);
dict[key].Add(input);
}
}
var tempPath = Path.GetTempPath();
@ -157,6 +201,7 @@ namespace ZeroLevel.Services.PartitionStorage
return new MemoryStreamReader(stream);
}
#endregion
public void Dispose()
{
}

@ -5,6 +5,7 @@ using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.PartitionStorage.Interfaces;
namespace ZeroLevel.Services.PartitionStorage
{
@ -39,7 +40,7 @@ namespace ZeroLevel.Services.PartitionStorage
return new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(_options, info);
}
public IStorePartitionBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor)
public IStorePartitionMergeBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor)
{
return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor);
}

@ -48,6 +48,13 @@ namespace ZeroLevel.Services.Serialization
_stream = reader._stream;
}
public void Seek(long offset, SeekOrigin origin)
{
_stream.Seek(offset, origin);
}
public long Position => _stream.Position;
/// <summary>
/// Flag reading
/// </summary>

@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
namespace ZeroLevel.Services.Serialization
{
@ -29,6 +30,30 @@ namespace ZeroLevel.Services.Serialization
}
}
public static Action<MemoryStreamWriter, T> GetSerializer<T>()
{
var t = typeof(T);
if (t.IsAssignableTo(typeof(IBinarySerializable)))
{
return (w, o) => ((IBinarySerializable)o).Serialize(w);
}
return (w, o) => PrimitiveTypeSerializer.Serialize<T>(w, o);
}
public static Func<IBinaryReader, T> GetDeserializer<T>()
{
if (typeof(IBinarySerializable).IsAssignableFrom(typeof(T)))
{
return (r) =>
{
var o = (IBinarySerializable)FormatterServices.GetUninitializedObject(typeof(T));
o.Deserialize(r);
return (T)o;
};
}
return (r) => PrimitiveTypeSerializer.Deserialize<T>(r);
}
public static byte[] SerializeCompatible(object obj)
{
if (null == obj)

Loading…
Cancel
Save

Powered by TurnKey Linux.