Update StorePartitionAccessor.cs

pull/4/head
Ogoun 1 year ago
parent b6d1d30d10
commit c9cbad6d33

@ -101,23 +101,26 @@ namespace ZeroLevel.Services.PartitionStorage
} }
public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate() public IEnumerable<StorePartitionKeyValueSearchResult<TKey, TValue>> Iterate()
{ {
TKey k; if (Directory.Exists(_catalog))
TValue v;
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 0)
{ {
foreach (var file in files) TKey k;
TValue v;
var files = Directory.GetFiles(_catalog);
if (files != null && files.Length > 0)
{ {
var accessor = PhisicalFileAccessorCachee.GetDataAccessor(file, 0); foreach (var file in files)
if (accessor != null)
{ {
using (var reader = new MemoryStreamReader(accessor)) var accessor = PhisicalFileAccessorCachee.GetDataAccessor(file, 0);
if (accessor != null)
{ {
while (reader.EOS == false) using (var reader = new MemoryStreamReader(accessor))
{ {
if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break; while (reader.EOS == false)
if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break; {
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Status = SearchResult.Success }; if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break;
if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break;
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> { Key = k, Value = v, Status = SearchResult.Success };
}
} }
} }
} }
@ -203,118 +206,160 @@ namespace ZeroLevel.Services.PartitionStorage
TKey k; TKey k;
TValue v; TValue v;
var filePath = Path.Combine(_catalog, fileName); var filePath = Path.Combine(_catalog, fileName);
if (_options.Index.Enabled) if (File.Exists(filePath))
{ {
var offsets = Indexes.GetOffset(keys, true); if (_options.Index.Enabled)
for (int i = 0; i < keys.Length; i++)
{ {
var searchKey = keys[i]; var offsets = Indexes.GetOffset(keys, true);
var offset = offsets[i]; for (int i = 0; i < keys.Length; i++)
IViewAccessor memoryAccessor;
if (offset.Length > 0)
{ {
memoryAccessor = GetViewAccessor(filePath, offset.Offset, offset.Length); var searchKey = keys[i];
} var offset = offsets[i];
else IViewAccessor memoryAccessor;
{ if (offset.Length > 0)
memoryAccessor = GetViewAccessor(filePath, offset.Offset); {
memoryAccessor = GetViewAccessor(filePath, offset.Offset, offset.Length);
}
else
{
memoryAccessor = GetViewAccessor(filePath, offset.Offset);
}
if (memoryAccessor != null)
{
using (var reader = new MemoryStreamReader(memoryAccessor))
{
while (reader.EOS == false)
{
if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break;
if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break;
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{
yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
{
Key = searchKey,
Value = v,
Status = SearchResult.Success
};
break;
}
else if (c == -1)
{
break;
}
}
}
}
} }
}
else
{
var memoryAccessor = GetViewAccessor(filePath, 0);
if (memoryAccessor != null) if (memoryAccessor != null)
{ {
using (var reader = new MemoryStreamReader(memoryAccessor)) using (var reader = new MemoryStreamReader(memoryAccessor))
{ {
while (reader.EOS == false) int index = 0;
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{ {
if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break; if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break;
if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break; if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break;
var c = _options.KeyComparer(searchKey, k); var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0) if (c == 0)
{ {
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> yield return new StorePartitionKeyValueSearchResult<TKey, TValue>
{ {
Key = searchKey, Key = keys_arr[index],
Value = v, Value = v,
Status = SearchResult.Success Status = SearchResult.Success
}; };
break; index++;
} }
else if (c == -1) else if (c == -1)
{ {
break; do
{
index++;
if (index < keys_arr.Length)
{
c = _options.KeyComparer(keys_arr[index], k);
}
} while (index < keys_arr.Length && c == -1);
} }
} }
} }
} }
} }
} }
else }
private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex)
{
TKey k;
var filePath = Path.Combine(_catalog, fileName);
if (File.Exists(filePath))
{ {
var memoryAccessor = GetViewAccessor(filePath, 0); // 1. Find ranges
if (memoryAccessor != null) var ranges = new List<FilePositionRange>();
if (_options.Index.Enabled && autoReindex)
{ {
using (var reader = new MemoryStreamReader(memoryAccessor)) var offsets = Indexes.GetOffset(keys, true);
for (int i = 0; i < keys.Length; i++)
{ {
int index = 0; var searchKey = keys[i];
var keys_arr = keys.OrderBy(k => k).ToArray(); var offset = offsets[i];
while (reader.EOS == false && index < keys_arr.Length) IViewAccessor memoryAccessor;
if (offset.Length > 0)
{ {
if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) break; memoryAccessor = GetViewAccessor(filePath, offset.Offset, offset.Length);
if (Serializer.ValueDeserializer.Invoke(reader, out v) == false) break; }
var c = _options.KeyComparer(keys_arr[index], k); else
if (c == 0) {
{ memoryAccessor = GetViewAccessor(filePath, offset.Offset);
yield return new StorePartitionKeyValueSearchResult<TKey, TValue> }
{ if (memoryAccessor != null)
Key = keys_arr[index], {
Value = v, using (var reader = new MemoryStreamReader(memoryAccessor))
Status = SearchResult.Success
};
index++;
}
else if (c == -1)
{ {
do while (reader.EOS == false)
{ {
index++; var startPosition = reader.Position;
if (index < keys_arr.Length) if (Serializer.KeyDeserializer.Invoke(reader, out k) == false)
{
Log.Error($"[StorePartitionAccessor.RemoveKeyGroup] Fault remove keys from file '{fileName}'. Incorrect file structure. Fault read key.");
return;
}
if (Serializer.ValueDeserializer.Invoke(reader, out var _) == false)
{
Log.Error($"[StorePartitionAccessor.RemoveKeyGroup] Fault remove keys from file '{fileName}'. Incorrect file structure. Fault read value.");
return;
}
var endPosition = reader.Position;
var c = _options.KeyComparer(searchKey, k);
if (c == 0)
{ {
c = _options.KeyComparer(keys_arr[index], k); ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition });
} }
} while (index < keys_arr.Length && c == -1); else if (c == -1)
{
break;
}
}
} }
} }
} }
} }
} else
}
private void RemoveKeyGroup(string fileName, TKey[] keys, bool inverseRemove, bool autoReindex)
{
TKey k;
var filePath = Path.Combine(_catalog, fileName);
// 1. Find ranges
var ranges = new List<FilePositionRange>();
if (_options.Index.Enabled && autoReindex)
{
var offsets = Indexes.GetOffset(keys, true);
for (int i = 0; i < keys.Length; i++)
{ {
var searchKey = keys[i]; var memoryAccessor = GetViewAccessor(filePath, 0);
var offset = offsets[i];
IViewAccessor memoryAccessor;
if (offset.Length > 0)
{
memoryAccessor = GetViewAccessor(filePath, offset.Offset, offset.Length);
}
else
{
memoryAccessor = GetViewAccessor(filePath, offset.Offset);
}
if (memoryAccessor != null) if (memoryAccessor != null)
{ {
using (var reader = new MemoryStreamReader(memoryAccessor)) using (var reader = new MemoryStreamReader(memoryAccessor))
{ {
while (reader.EOS == false) int index = 0;
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{ {
var startPosition = reader.Position; var startPosition = reader.Position;
if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) if (Serializer.KeyDeserializer.Invoke(reader, out k) == false)
@ -328,102 +373,66 @@ namespace ZeroLevel.Services.PartitionStorage
return; return;
} }
var endPosition = reader.Position; var endPosition = reader.Position;
var c = _options.KeyComparer(searchKey, k); var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0) if (c == 0)
{ {
ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition }); ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition });
index++;
} }
else if (c == -1) else if (c == -1)
{ {
break; do
{
index++;
if (index < keys_arr.Length)
{
c = _options.KeyComparer(keys_arr[index], k);
}
} while (index < keys_arr.Length && c == -1);
} }
} }
} }
} }
} }
}
else // 2. Temporary file from ranges
{ var tempFile = FSUtils.GetAppLocalTemporaryFile();
var memoryAccessor = GetViewAccessor(filePath, 0);
if (memoryAccessor != null) using (var readStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024))
{ {
using (var reader = new MemoryStreamReader(memoryAccessor)) RangeCompression(ranges);
using (var writeStream = new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))
{ {
int index = 0; if (inverseRemove)
var keys_arr = keys.OrderBy(k => k).ToArray();
while (reader.EOS == false && index < keys_arr.Length)
{ {
var startPosition = reader.Position; var inverted = RangeInversion(ranges, readStream.Length);
if (Serializer.KeyDeserializer.Invoke(reader, out k) == false) foreach (var range in inverted)
{ {
Log.Error($"[StorePartitionAccessor.RemoveKeyGroup] Fault remove keys from file '{fileName}'. Incorrect file structure. Fault read key."); CopyRange(range, readStream, writeStream);
return;
} }
if (Serializer.ValueDeserializer.Invoke(reader, out var _) == false) }
{ else
Log.Error($"[StorePartitionAccessor.RemoveKeyGroup] Fault remove keys from file '{fileName}'. Incorrect file structure. Fault read value."); {
return; foreach (var range in ranges)
}
var endPosition = reader.Position;
var c = _options.KeyComparer(keys_arr[index], k);
if (c == 0)
{
ranges.Add(new FilePositionRange { Start = startPosition, End = endPosition });
index++;
}
else if (c == -1)
{ {
do CopyRange(range, readStream, writeStream);
{
index++;
if (index < keys_arr.Length)
{
c = _options.KeyComparer(keys_arr[index], k);
}
} while (index < keys_arr.Length && c == -1);
} }
} }
writeStream.Flush();
} }
} }
}
// 2. Temporary file from ranges // 3. Replace from temporary to original
var tempFile = FSUtils.GetAppLocalTemporaryFile(); PhisicalFileAccessorCachee.DropDataReader(filePath);
File.Delete(filePath);
File.Move(tempFile, filePath, true);
using (var readStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096 * 1024)) // Rebuild index if needs
{ if (_options.Index.Enabled && autoReindex)
RangeCompression(ranges);
using (var writeStream = new FileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.None, 4096 * 1024))
{ {
if (inverseRemove) RebuildFileIndex(filePath);
{
var inverted = RangeInversion(ranges, readStream.Length);
foreach (var range in inverted)
{
CopyRange(range, readStream, writeStream);
}
}
else
{
foreach (var range in ranges)
{
CopyRange(range, readStream, writeStream);
}
}
writeStream.Flush();
} }
} }
// 3. Replace from temporary to original
PhisicalFileAccessorCachee.DropDataReader(filePath);
File.Delete(filePath);
File.Move(tempFile, filePath, true);
// Rebuild index if needs
if (_options.Index.Enabled && autoReindex)
{
RebuildFileIndex(filePath);
}
} }
#endregion #endregion

Loading…
Cancel
Save

Powered by TurnKey Linux.