PartitionStorage indexes

pull/4/head
Ogoun 2 years ago
parent 58fc490d3a
commit 65133c3cec

@ -12,6 +12,7 @@ namespace ZeroLevel.Services.PartitionStorage
public interface IStorePartitionAccessor<TKey, TInput, TValue> public interface IStorePartitionAccessor<TKey, TInput, TValue>
: IDisposable : IDisposable
{ {
string GetCatalogPath();
/// <summary> /// <summary>
/// Save one record /// Save one record
/// </summary> /// </summary>

@ -32,13 +32,20 @@ namespace ZeroLevel.Services.PartitionStorage
var results = new ConcurrentDictionary<TMeta, IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>>>(); var results = new ConcurrentDictionary<TMeta, IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>>>();
if (searchRequest.PartitionSearchRequests?.Any() ?? false) if (searchRequest.PartitionSearchRequests?.Any() ?? false)
{ {
var partitionsSearchInfo = searchRequest.PartitionSearchRequests.ToDictionary(r => r.Info, r => r.Keys); var partitionsSearchInfo = searchRequest
var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism }; .PartitionSearchRequests
.ToDictionary(r => r.Info, r => r.Keys);
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism
};
await Parallel.ForEachAsync(partitionsSearchInfo, options, async (pair, _) => await Parallel.ForEachAsync(partitionsSearchInfo, options, async (pair, _) =>
{ {
using (var accessor = CreateAccessor(pair.Key)) using (var accessor = CreateAccessor(pair.Key))
{ {
results[pair.Key] = accessor.Find(pair.Value).ToArray(); results[pair.Key] = accessor
.Find(pair.Value)
.ToArray();
} }
}); });
} }

@ -30,14 +30,29 @@ namespace ZeroLevel.Services.PartitionStorage
Directory.CreateDirectory(_catalog); Directory.CreateDirectory(_catalog);
} }
} }
#region API
public string GetCatalogPath()
{
return _catalog;
}
public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key) public StorePartitionKeyValueSearchResult<TKey, TValue> Find(TKey key)
{ {
var fileName = _options.GetFileName(key, _info); var fileName = _options.GetFileName(key, _info);
if (File.Exists(Path.Combine(_catalog, fileName))) if (File.Exists(Path.Combine(_catalog, fileName)))
{ {
long startOffset = 0;
if (_options.Index.Enabled)
{
var index = new StorePartitionIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
var offset = index.GetOffset(key);
startOffset = offset.Offset;
}
using (var reader = GetReadStream(fileName)) using (var reader = GetReadStream(fileName))
{ {
if (startOffset > 0)
{
reader.Stream.Seek(startOffset, SeekOrigin.Begin);
}
while (reader.EOS == false) while (reader.EOS == false)
{ {
var k = reader.ReadCompatible<TKey>(); var k = reader.ReadCompatible<TKey>();
@ -63,85 +78,6 @@ namespace ZeroLevel.Services.PartitionStorage
Value = default Value = default
}; };
} }
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName,
TKey[] keys)
{
if (File.Exists(Path.Combine(_catalog, fileName)))
{
if (_options.Index.Enabled)
{
var index = new StorePartitionIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
var offsets = index.GetOffset(keys, true);
using (var reader = GetReadStream(fileName))
{
for (int i = 0; i < keys.Length; i++)
{
var searchKey = keys[i];
var off = offsets[i];
reader.Stream.Seek(off.Offset, SeekOrigin.Begin);
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = searchKey,
Value = v,
Found = true
};
break;
}
else if (c == -1)
{
break;
}
}
}
}
}
else
{
using (var reader = GetReadStream(fileName))
{
int index = 0;
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{
yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = keys_arr[index],
Value = v,
Found = true
};
index++;
}
else if (c == -1)
{
do
{
index++;
if (index < keys_arr.Length)
{
c = _options.KeyComparer(keys_arr[index], k);
}
} while (index < keys_arr.Length && c == -1);
}
}
}
}
}
}
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys) public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(IEnumerable<TKey> keys)
{ {
var results = keys var results = keys
@ -156,7 +92,6 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
} }
public void CompleteStoreAndRebuild() public void CompleteStoreAndRebuild()
{ {
// Close all write streams // Close all write streams
@ -174,39 +109,6 @@ namespace ZeroLevel.Services.PartitionStorage
Parallel.ForEach(files, file => CompressFile(file)); Parallel.ForEach(files, file => CompressFile(file));
} }
} }
private void CompressFile(string file)
{
var dict = new Dictionary<TKey, HashSet<TInput>>();
using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
{
while (reader.EOS == false)
{
TKey k = reader.ReadCompatible<TKey>();
TInput v = reader.ReadCompatible<TInput>();
if (false == dict.ContainsKey(k))
{
dict[k] = new HashSet<TInput>();
}
dict[k].Add(v);
}
}
var tempPath = Path.GetTempPath();
var tempFile = Path.Combine(tempPath, Path.GetTempFileName());
using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
{
// sort for search acceleration
foreach (var pair in dict.OrderBy(p => p.Key))
{
var v = _options.MergeFunction(pair.Value);
writer.SerializeCompatible(pair.Key);
writer.SerializeCompatible(v);
}
}
File.Delete(file);
File.Move(tempFile, file, true);
}
public void Store(TKey key, TInput value) public void Store(TKey key, TInput value)
{ {
var fileName = _options.GetFileName(key, _info); var fileName = _options.GetFileName(key, _info);
@ -214,21 +116,6 @@ namespace ZeroLevel.Services.PartitionStorage
stream.SerializeCompatible(key); stream.SerializeCompatible(key);
stream.SerializeCompatible(value); stream.SerializeCompatible(value);
} }
private MemoryStreamWriter GetWriteStream(string fileName)
{
return _writeStreams.GetOrAdd(fileName, k =>
{
var filePath = Path.Combine(_catalog, k);
var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
return new MemoryStreamWriter(stream);
});
}
private MemoryStreamReader GetReadStream(string fileName)
{
var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
return new MemoryStreamReader(stream);
}
public int CountDataFiles() public int CountDataFiles()
{ {
var files = Directory.GetFiles(_catalog); var files = Directory.GetFiles(_catalog);
@ -316,9 +203,136 @@ namespace ZeroLevel.Services.PartitionStorage
} }
} }
} }
#endregion
#region Private methods
private IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Find(string fileName,
TKey[] keys)
{
if (File.Exists(Path.Combine(_catalog, fileName)))
{
if (_options.Index.Enabled)
{
var index = new StorePartitionIndex<TKey, TMeta>(_catalog, _info, _options.FilePartition, _options.KeyComparer);
var offsets = index.GetOffset(keys, true);
using (var reader = GetReadStream(fileName))
{
for (int i = 0; i < keys.Length; i++)
{
var searchKey = keys[i];
var off = offsets[i];
reader.Stream.Seek(off.Offset, SeekOrigin.Begin);
while (reader.EOS == false)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = searchKey,
Value = v,
Found = true
};
break;
}
else if (c == -1)
{
break;
}
}
}
}
}
else
{
using (var reader = GetReadStream(fileName))
{
int index = 0;
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{
var k = reader.ReadCompatible<TKey>();
var v = reader.ReadCompatible<TValue>();
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{
yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = keys_arr[index],
Value = v,
Found = true
};
index++;
}
else if (c == -1)
{
do
{
index++;
if (index < keys_arr.Length)
{
c = _options.KeyComparer(keys_arr[index], k);
}
} while (index < keys_arr.Length && c == -1);
}
}
}
}
}
}
private void CompressFile(string file)
{
var dict = new Dictionary<TKey, HashSet<TInput>>();
using (var reader = new MemoryStreamReader(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.None, 4096 * 1024)))
{
while (reader.EOS == false)
{
TKey k = reader.ReadCompatible<TKey>();
TInput v = reader.ReadCompatible<TInput>();
if (false == dict.ContainsKey(k))
{
dict[k] = new HashSet<TInput>();
}
dict[k].Add(v);
}
}
var tempPath = Path.GetTempPath();
var tempFile = Path.Combine(tempPath, Path.GetTempFileName());
using (var writer = new MemoryStreamWriter(new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024)))
{
// sort for search acceleration
foreach (var pair in dict.OrderBy(p => p.Key))
{
var v = _options.MergeFunction(pair.Value);
writer.SerializeCompatible(pair.Key);
writer.SerializeCompatible(v);
}
}
File.Delete(file);
File.Move(tempFile, file, true);
}
private MemoryStreamWriter GetWriteStream(string fileName)
{
return _writeStreams.GetOrAdd(fileName, k =>
{
var filePath = Path.Combine(_catalog, k);
var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write, FileShare.None, 4096 * 1024);
return new MemoryStreamWriter(stream);
});
}
private MemoryStreamReader GetReadStream(string fileName)
{
var filePath = Path.Combine(_catalog, fileName);
var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024);
return new MemoryStreamReader(stream);
}
#endregion
public void Dispose() public void Dispose()
{ {
} }
} }
} }

@ -6,16 +6,16 @@
</Description> </Description>
<Authors>ogoun</Authors> <Authors>ogoun</Authors>
<Company>ogoun</Company> <Company>ogoun</Company>
<AssemblyVersion>3.3.7.5</AssemblyVersion> <AssemblyVersion>3.3.7.6</AssemblyVersion>
<PackageReleaseNotes>PartitionStorage fix</PackageReleaseNotes> <PackageReleaseNotes>PartitionStorage update indexes</PackageReleaseNotes>
<PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl> <PackageProjectUrl>https://github.com/ogoun/Zero/wiki</PackageProjectUrl>
<Copyright>Copyright Ogoun 2022</Copyright> <Copyright>Copyright Ogoun 2022</Copyright>
<PackageLicenseUrl></PackageLicenseUrl> <PackageLicenseUrl></PackageLicenseUrl>
<PackageIconUrl></PackageIconUrl> <PackageIconUrl></PackageIconUrl>
<RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl> <RepositoryUrl>https://github.com/ogoun/Zero</RepositoryUrl>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<Version>3.3.7.5</Version> <Version>3.3.7.6</Version>
<FileVersion>3.3.7.5</FileVersion> <FileVersion>3.3.7.6</FileVersion>
<Platforms>AnyCPU;x64;x86</Platforms> <Platforms>AnyCPU;x64;x86</Platforms>
<PackageIcon>zero.png</PackageIcon> <PackageIcon>zero.png</PackageIcon>
<DebugType>full</DebugType> <DebugType>full</DebugType>

Loading…
Cancel
Save

Powered by TurnKey Linux.