PartitionStorage refactoring

pull/4/head
Ogoun 2 years ago
parent 2bf1605bee
commit b070b56894

@ -102,10 +102,11 @@ namespace PartitionFileStorageTest
private static void BuildStore(string source, string root) private static void BuildStore(string source, string root)
{ {
var options = new IStoreOptions<ulong, ulong, byte[], Metadata> var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{ {
Index = new IndexOptions { Enabled = true, FileIndexCount = 256 },
RootFolder = root, RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 1000).ToString()), FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 512).ToString()),
MergeFunction = list => MergeFunction = list =>
{ {
ulong s = 0; ulong s = 0;
@ -118,12 +119,11 @@ namespace PartitionFileStorageTest
}, },
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1, KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
}; };
options.Index.Enabled = true;
var store = new Store<ulong, ulong, byte[], Metadata>(options); var store = new Store<ulong, ulong, byte[], Metadata>(options);
var storeIncoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true }); var storeIncoming = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = true });
var storeOutcoming = store.CreateAccessor(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false }); var storeOutcoming = store.CreateBuilder(new Metadata { Date = new DateTime(2022, 11, 08), Incoming = false });
var parser = new CallRecordParser(); var parser = new CallRecordParser();
var sw = new Stopwatch(); var sw = new Stopwatch();
sw.Start(); sw.Start();
@ -165,17 +165,13 @@ namespace PartitionFileStorageTest
Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms"); Console.WriteLine($"Rebuild indexes: {sw.ElapsedMilliseconds}ms");
} }
private static void TestReading(string source, string root) private static void TestReading(string root)
{ {
var options = new IStoreOptions<ulong, ulong, byte[], Metadata> var options = new StoreOptions<ulong, ulong, byte[], Metadata>
{ {
Index = new IndexOptions Index = new IndexOptions { Enabled = true, FileIndexCount = 256 },
{
Enabled = false,
FileIndexCount = 64
},
RootFolder = root, RootFolder = root,
FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 1000).ToString()), FilePartition = new StoreFilePartition<ulong, Metadata>("Last three digits", (ctn, date) => (ctn % 512).ToString()),
MergeFunction = list => MergeFunction = list =>
{ {
ulong s = 0; ulong s = 0;
@ -183,10 +179,10 @@ namespace PartitionFileStorageTest
}, },
Partitions = new List<StoreCatalogPartition<Metadata>> Partitions = new List<StoreCatalogPartition<Metadata>>
{ {
new StoreCatalogPartition<Metadata>("timestamp", m => m.Date.ToString("yyyyMMdd")), new StoreCatalogPartition<Metadata>("Date", m => m.Date.ToString("yyyyMMdd")),
new StoreCatalogPartition<Metadata>("timestamp", m => m.Incoming ? "incoming" : "outcoming") new StoreCatalogPartition<Metadata>("Date", m => m.Incoming ? "incoming" : "outcoming")
}, },
KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1 KeyComparer = (left, right) => left == right ? 0 : (left < right) ? -1 : 1,
}; };
var store = new Store<ulong, ulong, byte[], Metadata>(options); var store = new Store<ulong, ulong, byte[], Metadata>(options);
var request = new StoreSearchRequest<ulong, Metadata> var request = new StoreSearchRequest<ulong, Metadata>
@ -245,61 +241,12 @@ namespace PartitionFileStorageTest
return arr; return arr;
} }
private static KeyIndex<long> BinarySearchInIndex(KeyIndex<long>[] index,
long key,
Func<long, long, int> keyComparer,
ref int position)
{
if (index == null || index.Length == 0)
{
return new KeyIndex<long> { Key = key, Offset = 0 };
}
int left = position;
int right = index.Length - 1;
int mid = 0;
long test;
while (left <= right)
{
mid = (int)Math.Floor((right + left) / 2d);
test = index[mid].Key;
var c = keyComparer(test, key);
if (c < 0)
{
left = mid + 1;
}
else if (c > 0)
{
right = mid - 1;
}
else
{
position = mid;
return index[mid];
}
}
position = mid;
return index[mid];
}
static void Main(string[] args) static void Main(string[] args)
{ {
var root = @"H:\temp"; var root = @"H:\temp";
var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt"; var source = @"H:\319a9c31-d823-4dd1-89b0-7fb1bb9c4859.txt";
//BuildStore(source, root); //BuildStore(source, root);
TestReading(source, root); TestReading(root);
/*
Func<long, long, int> keyComparer =
(left, right) =>
(left == right) ? 0 : (left < right) ? -1 : 1;
var indexes = Generate(77);
int position = 0;
for (long i = 65; i < 700; i++)
{
var ind = BinarySearchInIndex(indexes, i, keyComparer, ref position);
Console.WriteLine($"{i}: {ind.Offset}. [{ind.Key}]");
}
*/
Console.ReadKey(); Console.ReadKey();
} }
} }

@ -0,0 +1,8 @@
namespace ZeroLevel.Services.PartitionStorage
{
internal struct KeyIndex<TKey>
{
public TKey Key { get; set; }
public long Offset { get; set; }
}
}

@ -5,13 +5,7 @@ using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
internal struct KeyIndex<TKey> internal class StorePartitionSparseIndex<TKey, TMeta>
{
public TKey Key { get; set; }
public long Offset { get; set; }
}
internal class StorePartitionIndex<TKey, TMeta>
: IStorePartitionIndex<TKey> : IStorePartitionIndex<TKey>
{ {
private readonly Dictionary<string, KeyIndex<TKey>[]> _indexCachee private readonly Dictionary<string, KeyIndex<TKey>[]> _indexCachee
@ -22,7 +16,7 @@ namespace ZeroLevel.Services.PartitionStorage
private readonly string _indexFolder; private readonly string _indexFolder;
private readonly bool _indexExists = false; private readonly bool _indexExists = false;
private readonly TMeta _meta; private readonly TMeta _meta;
public StorePartitionIndex(string partitionFolder, TMeta meta, public StorePartitionSparseIndex(string partitionFolder, TMeta meta,
StoreFilePartition<TKey, TMeta> filePartition, StoreFilePartition<TKey, TMeta> filePartition,
Func<TKey, TKey, int> keyComparer) Func<TKey, TKey, int> keyComparer)
{ {

@ -13,9 +13,11 @@ namespace ZeroLevel.Services.PartitionStorage
/// <typeparam name="TMeta">Meta information for partition search</typeparam> /// <typeparam name="TMeta">Meta information for partition search</typeparam>
public interface IStore<TKey, TInput, TValue, TMeta> public interface IStore<TKey, TInput, TValue, TMeta>
{ {
IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info); IStorePartitionBuilder<TKey, TInput, TValue> CreateBuilder(TMeta info);
IStorePartitionBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor);
IStorePartitionAccessor<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor); IStorePartitionAccessor<TKey, TInput, TValue> CreateAccessor(TMeta info);
Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest); Task<StoreSearchResult<TKey, TValue, TMeta>> Search(StoreSearchRequest<TKey, TMeta> searchRequest);
} }

@ -1,28 +1,16 @@
using System; using System.Collections.Generic;
using System.Collections.Generic;
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
/// <summary> /// <summary>
/// Provides read-write operations in catalog partition /// Provides read/reindex operations in catalog partition
/// </summary> /// </summary>
/// <typeparam name="TKey">Key type</typeparam> /// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TInput">Type of one input value</typeparam> /// <typeparam name="TInput">Type of one input value</typeparam>
/// <typeparam name="TValue">Type of records aggregate</typeparam> /// <typeparam name="TValue">Type of records aggregate</typeparam>
public interface IStorePartitionAccessor<TKey, TInput, TValue> public interface IStorePartitionAccessor<TKey, TInput, TValue>
: IDisposable : IStorePartitionBase<TKey, TInput, TValue>
{ {
string GetCatalogPath();
/// <summary>
/// Has any files
/// </summary>
int CountDataFiles();
/// <summary>
/// Remove all files
/// </summary>
void DropData();
#region API !only after data compression!
/// <summary> /// <summary>
/// Rebuild indexes /// Rebuild indexes
/// </summary> /// </summary>
@ -37,18 +25,5 @@ namespace ZeroLevel.Services.PartitionStorage
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys); IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys);
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate(); IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate();
IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key); IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key);
#endregion
#region API !only before data compression!
/// <summary>
/// Save one record
/// </summary>
void Store(TKey key, TInput value);
/// <summary>
/// Complete the recording and perform the conversion of the records from
/// (TKey; TInput) to (TKey; TValue)
/// </summary>
void CompleteAddingAndCompress();
#endregion
} }
} }

@ -0,0 +1,24 @@
using System;
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Provides common operations in catalog partition
/// </summary>
/// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TInput">Type of one input value</typeparam>
/// <typeparam name="TValue">Type of records aggregate</typeparam>
public interface IStorePartitionBase<TKey, TInput, TValue>
: IDisposable
{
string GetCatalogPath();
/// <summary>
/// Has any files
/// </summary>
int CountDataFiles();
/// <summary>
/// Remove all files
/// </summary>
void DropData();
}
}

@ -0,0 +1,23 @@
namespace ZeroLevel.Services.PartitionStorage
{
/// <summary>
/// Provides write operations in catalog partition
/// </summary>
/// <typeparam name="TKey">Key type</typeparam>
/// <typeparam name="TInput">Type of one input value</typeparam>
/// <typeparam name="TValue">Type of records aggregate</typeparam>
public interface IStorePartitionBuilder<TKey, TInput, TValue>
: IStorePartitionBase<TKey, TInput, TValue>
{
/// <summary>
/// Save one record
/// </summary>
void Store(TKey key, TInput value);
/// <summary>
/// Complete the recording and perform the conversion of the records from
/// (TKey; TInput) to (TKey; TValue)
/// </summary>
void CompleteAddingAndCompress();
void RebuildIndex();
}
}

@ -1,6 +1,4 @@
using System.Collections.Generic; namespace ZeroLevel.Services.PartitionStorage
namespace ZeroLevel.Services.PartitionStorage
{ {
internal interface IStorePartitionIndex<TKey> internal interface IStorePartitionIndex<TKey>
{ {

@ -0,0 +1,17 @@
namespace ZeroLevel.Services.PartitionStorage
{
public class CacheOptions
{
public bool UsePersistentCache { get; set; }
public string PersistentCacheFolder { get; set; } = "cachee";
public int PersistentCacheRemoveTimeoutInSeconds { get; set; } = 3600;
public bool UseMemoryCache { get; set; }
public int MemoryCacheLimitInMb { get; set; } = 1024;
public int MemoryCacheRemoveTimeoutInSeconds { get; set; } = 900;
}
}

@ -0,0 +1,8 @@
namespace ZeroLevel.Services.PartitionStorage
{
public class IndexOptions
{
public bool Enabled { get; set; }
public int FileIndexCount { get; set; } = 64;
}
}

@ -6,12 +6,6 @@ using ZeroLevel.Services.FileSystem;
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
public class IndexOptions
{
public bool Enabled { get; set; }
public int FileIndexCount { get; set; } = 64;
}
/// <summary> /// <summary>
/// Options /// Options
/// </summary> /// </summary>
@ -19,7 +13,7 @@ namespace ZeroLevel.Services.PartitionStorage
/// <typeparam name="TInput">The value that is written in the stream</typeparam> /// <typeparam name="TInput">The value that is written in the stream</typeparam>
/// <typeparam name="TValue">Value after compression of TInput values by duplicate keys (TInput list or similar)</typeparam> /// <typeparam name="TValue">Value after compression of TInput values by duplicate keys (TInput list or similar)</typeparam>
/// <typeparam name="TMeta">Meta information for partition search</typeparam> /// <typeparam name="TMeta">Meta information for partition search</typeparam>
public class IStoreOptions<TKey, TInput, TValue, TMeta> public class StoreOptions<TKey, TInput, TValue, TMeta>
{ {
/// <summary> /// <summary>
/// Method for key comparison /// Method for key comparison
@ -47,7 +41,17 @@ namespace ZeroLevel.Services.PartitionStorage
/// </summary> /// </summary>
public StoreFilePartition<TKey, TMeta> FilePartition { get; set; } public StoreFilePartition<TKey, TMeta> FilePartition { get; set; }
public IndexOptions Index { get; set; } = new IndexOptions { Enabled = false, FileIndexCount = 64 }; public IndexOptions Index { get; set; } = new IndexOptions
{
Enabled = false,
FileIndexCount = 64
};
public CacheOptions Cache { get; set; } = new CacheOptions
{
UseMemoryCache = false,
UsePersistentCache = false
};
internal string GetFileName(TKey key, TMeta info) internal string GetFileName(TKey key, TMeta info)
{ {
@ -69,9 +73,9 @@ namespace ZeroLevel.Services.PartitionStorage
return path; return path;
} }
public IStoreOptions<TKey, TInput, TValue, TMeta> Clone() public StoreOptions<TKey, TInput, TValue, TMeta> Clone()
{ {
var options = new IStoreOptions<TKey, TInput, TValue, TMeta> var options = new StoreOptions<TKey, TInput, TValue, TMeta>
{ {
Index = new IndexOptions Index = new IndexOptions
{ {
@ -85,7 +89,16 @@ namespace ZeroLevel.Services.PartitionStorage
Partitions = this.Partitions Partitions = this.Partitions
.Select(p => new StoreCatalogPartition<TMeta>(p.Name, p.PathExtractor)) .Select(p => new StoreCatalogPartition<TMeta>(p.Name, p.PathExtractor))
.ToList(), .ToList(),
RootFolder = this.RootFolder RootFolder = this.RootFolder,
Cache = new CacheOptions
{
MemoryCacheLimitInMb = this.Cache.MemoryCacheLimitInMb,
MemoryCacheRemoveTimeoutInSeconds = this.Cache.MemoryCacheRemoveTimeoutInSeconds,
PersistentCacheFolder = this.Cache.PersistentCacheFolder,
PersistentCacheRemoveTimeoutInSeconds = this.Cache.PersistentCacheRemoveTimeoutInSeconds,
UseMemoryCache = this.Cache.UseMemoryCache,
UsePersistentCache = this.Cache.UsePersistentCache
}
}; };
return options; return options;
} }

@ -13,7 +13,7 @@ namespace ZeroLevel.Services.PartitionStorage
/// ///
/// </summary> /// </summary>
public class StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta> public class StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>
: IStorePartitionAccessor<TKey, TInput, TValue> : IStorePartitionBuilder<TKey, TInput, TValue>
{ {
private readonly Func<TValue, IEnumerable<TInput>> _decompress; private readonly Func<TValue, IEnumerable<TInput>> _decompress;
/// <summary> /// <summary>
@ -23,8 +23,8 @@ namespace ZeroLevel.Services.PartitionStorage
/// <summary> /// <summary>
/// Write catalog /// Write catalog
/// </summary> /// </summary>
private readonly IStorePartitionAccessor<TKey, TInput, TValue> _temporaryAccessor; private readonly IStorePartitionBuilder<TKey, TInput, TValue> _temporaryAccessor;
public StoreMergePartitionAccessor(IStoreOptions<TKey, TInput, TValue, TMeta> options, public StoreMergePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options,
TMeta info, Func<TValue, IEnumerable<TInput>> decompress) TMeta info, Func<TValue, IEnumerable<TInput>> decompress)
{ {
if (decompress == null) throw new ArgumentNullException(nameof(decompress)); if (decompress == null) throw new ArgumentNullException(nameof(decompress));
@ -33,7 +33,7 @@ namespace ZeroLevel.Services.PartitionStorage
var tempCatalog = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString()); var tempCatalog = Path.Combine(_accessor.GetCatalogPath(), Guid.NewGuid().ToString());
var tempOptions = options.Clone(); var tempOptions = options.Clone();
tempOptions.RootFolder = tempCatalog; tempOptions.RootFolder = tempCatalog;
_temporaryAccessor = new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(tempOptions, info); _temporaryAccessor = new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(tempOptions, info);
} }
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>> private IEnumerable<StorePartitionKeyValueSearchResult<TKey, IEnumerable<TInput>>>
@ -86,7 +86,7 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
// compress new file // compress new file
(_temporaryAccessor as StorePartitionAccessor<TKey, TInput, TValue, TMeta>) (_temporaryAccessor as StorePartitionBuilder<TKey, TInput, TValue, TMeta>)
.CompressFile(file); .CompressFile(file);
// replace old file by new // replace old file by new
@ -98,16 +98,6 @@ namespace ZeroLevel.Services.PartitionStorage
Directory.Delete(_temporaryAccessor.GetCatalogPath(), true); Directory.Delete(_temporaryAccessor.GetCatalogPath(), true);
} }
public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
=> _accessor.Find(key);
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys)
=> _accessor.Find(keys);
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate()
=> _accessor.Iterate();
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> IterateKeyBacket(TKey key)
=> _accessor.IterateKeyBacket(key);
/// <summary> /// <summary>
/// Deletes only new entries. Existing entries remain unchanged. /// Deletes only new entries. Existing entries remain unchanged.
/// </summary> /// </summary>

@ -1,9 +1,7 @@
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem; using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Serialization; using ZeroLevel.Services.Serialization;
@ -12,30 +10,23 @@ namespace ZeroLevel.Services.PartitionStorage
public class StorePartitionAccessor<TKey, TInput, TValue, TMeta> public class StorePartitionAccessor<TKey, TInput, TValue, TMeta>
: IStorePartitionAccessor<TKey, TInput, TValue> : IStorePartitionAccessor<TKey, TInput, TValue>
{ {
private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
= new ConcurrentDictionary<string, MemoryStreamWriter>();
private readonly IStoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly string _catalog; private readonly string _catalog;
private readonly TMeta _info; private readonly TMeta _info;
public string Catalog { get { return _catalog; } } public string Catalog { get { return _catalog; } }
public StorePartitionAccessor(IStoreOptions<TKey, TInput, TValue, TMeta> options, TMeta info) public StorePartitionAccessor(StoreOptions<TKey, TInput, TValue, TMeta> options, TMeta info)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
_info = info; _info = info;
_options = options; _options = options;
_catalog = _options.GetCatalogPath(info); _catalog = _options.GetCatalogPath(info);
if (Directory.Exists(_catalog) == false)
{
Directory.CreateDirectory(_catalog);
}
} }
public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 0; public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 0;
public string GetCatalogPath() => _catalog; public string GetCatalogPath() => _catalog;
public void DropData() => FSUtils.CleanAndTestFolder(_catalog); public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
#region API !only after data compression!
public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key) public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
{ {
var fileName = _options.GetFileName(key, _info); var fileName = _options.GetFileName(key, _info);
@ -44,7 +35,7 @@ namespace ZeroLevel.Services.PartitionStorage
long startOffset = 0; long startOffset = 0;
if (_options.Index.Enabled) if (_options.Index.Enabled)
{ {
var index = new StorePartitionIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer); var index = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
var offset = index.GetOffset(key); var offset = index.GetOffset(key);
startOffset = offset.Offset; startOffset = offset.Offset;
} }
@ -171,35 +162,6 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
} }
#endregion
#region API !only before data compression!
public void Store(TKey key, TInput value)
{
var fileName = _options.GetFileName(key, _info);
var stream = GetWriteStream(fileName);
stream.SerializeCompatible(key);
stream.SerializeCompatible(value);
}
public void CompleteAddingAndCompress()
{
// Close all write streams
foreach (var s in _writeStreams)
{
try
{
s.Value.Dispose();
}
catch { }
}
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 1)
{
Parallel.ForEach(files, file => CompressFile(file));
}
}
#endregion
#region Private methods #region Private methods
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName, private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName,
@ -209,7 +171,7 @@ namespace ZeroLevel.Services.PartitionStorage
{ {
if (_options.Index.Enabled) if (_options.Index.Enabled)
{ {
var index = new StorePartitionIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer); var index = new StorePartitionSparseIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
var offsets = index.GetOffset(keys, true); var offsets = index.GetOffset(keys, true);
using (var reader = GetReadStream(fileName)) using (var reader = GetReadStream(fileName))
{ {
@ -278,46 +240,7 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
} }
internal void CompressFile(string file)
{
var dict = new Dictionary<TKey, HashSet<TInput>>();
using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
{
while (reader.EOS == false)
{
TKey k = reader.ReadCompatible<TKey>();
TInput v = reader.ReadCompatible<TInput>();
if (false == dict.ContainsKey(k))
{
dict[k] = new HashSet<TInput>();
}
dict[k].Add(v);
}
}
var tempPath = Path.GetTempPath();
var tempFile = Path.Combine(tempPath, Path.GetTempFileName());
using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
{
// sort for search acceleration
foreach (var pair in dict.OrderBy(p => p.Key))
{
var v = _options.MergeFunction(pair.Value);
writer.SerializeCompatible(pair.Key);
writer.SerializeCompatible(v);
}
}
File.Delete(file);
File.Move(tempFile, file, true);
}
private MemoryStreamWriter GetWriteStream(string fileName)
{
return _writeStreams.GetOrAdd(fileName, k =>
{
var filePath = Path.Combine(_catalog, k);
var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
return new MemoryStreamWriter(stream);
});
}
private MemoryStreamReader GetReadStream(string fileName) private MemoryStreamReader GetReadStream(string fileName)
{ {
var filePath = Path.Combine(_catalog, fileName); var filePath = Path.Combine(_catalog, fileName);
@ -328,7 +251,5 @@ namespace ZeroLevel.Services.PartitionStorage
public void Dispose() public void Dispose()
{ {
} }
} }
} }

@ -0,0 +1,160 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZeroLevel.Services.FileSystem;
using ZeroLevel.Services.Serialization;
namespace ZeroLevel.Services.PartitionStorage
{
public class StorePartitionBuilder<TKey, TInput, TValue, TMeta>
: IStorePartitionBuilder<TKey, TInput, TValue>
{
private readonly ConcurrentDictionary<string, MemoryStreamWriter> _writeStreams
= new ConcurrentDictionary<string, MemoryStreamWriter>();
private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
private readonly string _catalog;
private readonly TMeta _info;
public string Catalog { get { return _catalog; } }
public StorePartitionBuilder(StoreOptions<TKey, TInput, TValue, TMeta> options, TMeta info)
{
if (options == null) throw new ArgumentNullException(nameof(options));
_info = info;
_options = options;
_catalog = _options.GetCatalogPath(info);
if (Directory.Exists(_catalog) == false)
{
Directory.CreateDirectory(_catalog);
}
}
public int CountDataFiles() => Directory.GetFiles(_catalog)?.Length ?? 0;
public string GetCatalogPath() => _catalog;
public void DropData() => FSUtils.CleanAndTestFolder(_catalog);
public void Store(TKey key, TInput value)
{
var fileName = _options.GetFileName(key, _info);
var stream = GetWriteStream(fileName);
stream.SerializeCompatible(key);
stream.SerializeCompatible(value);
}
public void CompleteAddingAndCompress()
{
// Close all write streams
foreach (var s in _writeStreams)
{
try
{
s.Value.Dispose();
}
catch { }
}
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 1)
{
Parallel.ForEach(files, file => CompressFile(file));
}
}
public void RebuildIndex()
{
if (_options.Index.Enabled)
{
var indexFolder = Path.Combine(_catalog, "__indexes__");
FSUtils.CleanAndTestFolder(indexFolder);
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 1)
{
var dict = new Dictionary<TKey, long>();
foreach (var file in files)
{
dict.Clear();
using (var reader = GetReadStream(Path.GetFileName(file)))
{
while (reader.EOS == false)
{
var pos = reader.Stream.Position;
var k = reader.ReadCompatible<TKey>();
dict[k] = pos;
reader.ReadCompatible<TValue>();
}
}
if (dict.Count > _options.Index.FileIndexCount * 8)
{
var step = (int)Math.Round(((float)dict.Count / (float)_options.Index.FileIndexCount), MidpointRounding.ToZero);
var index_file = Path.Combine(indexFolder, Path.GetFileName(file));
var d_arr = dict.OrderBy(p => p.Key).ToArray();
using (var writer = new MemoryStreamWriter(
new FileStream(index_file, FileMode.Create, FileAccess.Write, FileShare.None)))
{
for (int i = 0; i < _options.Index.FileIndexCount; i++)
{
var pair = d_arr[i * step];
writer.WriteCompatible(pair.Key);
writer.WriteLong(pair.Value);
}
}
}
}
}
}
}
#region Private methods
internal void CompressFile(string file)
{
var dict = new Dictionary<TKey, HashSet<TInput>>();
using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
{
while (reader.EOS == false)
{
TKey k = reader.ReadCompatible<TKey>();
TInput v = reader.ReadCompatible<TInput>();
if (false == dict.ContainsKey(k))
{
dict[k] = new HashSet<TInput>();
}
dict[k].Add(v);
}
}
var tempPath = Path.GetTempPath();
var tempFile = Path.Combine(tempPath, Path.GetTempFileName());
using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
{
// sort for search acceleration
foreach (var pair in dict.OrderBy(p => p.Key))
{
var v = _options.MergeFunction(pair.Value);
writer.SerializeCompatible(pair.Key);
writer.SerializeCompatible(v);
}
}
File.Delete(file);
File.Move(tempFile, file, true);
}
private MemoryStreamWriter GetWriteStream(string fileName)
{
return _writeStreams.GetOrAdd(fileName, k =>
{
var filePath = Path.Combine(_catalog, k);
var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
return new MemoryStreamWriter(stream);
});
}
private MemoryStreamReader GetReadStream(string fileName)
{
var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
return new MemoryStreamReader(stream);
}
#endregion
public void Dispose()
{
}
}
}

@ -0,0 +1,10 @@
using System.Collections.Generic;
namespace ZeroLevel.Services.PartitionStorage
{
public class PartitionSearchRequest<TKey, TMeta>
{
public TMeta Info { get; set; }
public IEnumerable<TKey> Keys { get; set; }
}
}

@ -2,11 +2,6 @@
namespace ZeroLevel.Services.PartitionStorage namespace ZeroLevel.Services.PartitionStorage
{ {
public class PartitionSearchRequest<TKey, TMeta>
{
public TMeta Info { get; set; }
public IEnumerable<TKey> Keys { get; set; }
}
public class StoreSearchRequest<TKey, TMeta> public class StoreSearchRequest<TKey, TMeta>
{ {
public IEnumerable<PartitionSearchRequest<TKey, TMeta>> PartitionSearchRequests { get; set; } public IEnumerable<PartitionSearchRequest<TKey, TMeta>> PartitionSearchRequests { get; set; }

@ -10,8 +10,8 @@ namespace ZeroLevel.Services.PartitionStorage
public class Store<TKey, TInput, TValue, TMeta> : public class Store<TKey, TInput, TValue, TMeta> :
IStore<TKey, TInput, TValue, TMeta> IStore<TKey, TInput, TValue, TMeta>
{ {
private readonly IStoreOptions<TKey, TInput, TValue, TMeta> _options; private readonly StoreOptions<TKey, TInput, TValue, TMeta> _options;
public Store(IStoreOptions<TKey, TInput, TValue, TMeta> options) public Store(StoreOptions<TKey, TInput, TValue, TMeta> options)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
_options = options; _options = options;
@ -26,8 +26,12 @@ namespace ZeroLevel.Services.PartitionStorage
return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info); return new StorePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info);
} }
public IStorePartitionAccessor<TKey, TInput, TValue> CreateMergeAccessor(TMeta info public IStorePartitionBuilder<TKey, TInput, TValue> CreateBuilder(TMeta info)
, Func<TValue, IEnumerable<TInput>> decompressor) {
return new StorePartitionBuilder<TKey, TInput, TValue, TMeta>(_options, info);
}
public IStorePartitionBuilder<TKey, TInput, TValue> CreateMergeAccessor(TMeta info, Func<TValue, IEnumerable<TInput>> decompressor)
{ {
return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor); return new StoreMergePartitionAccessor<TKey, TInput, TValue, TMeta>(_options, info, decompressor);
} }

Loading…
Cancel
Save

Powered by TurnKey Linux.