using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
namespace FASTER.core
{
/// <summary>
/// Interface that encapsulates a sharding strategy that is used by <see cref="ShardedStorageDevice"/>. This
/// allows users to customize their sharding behaviors. Some default implementations are supplied for common
/// partitioning schemes.
/// </summary>
interface IPartitionScheme
{
/// <summary>
/// A list of <see cref="IDevice"/> that represents the shards. Indexes into this list will be
/// used as unique identifiers for the shards.
/// </summary>
IList<IDevice> Devices { get; }
/// <summary>
/// Maps a range in the unified logical address space into a contiguous physical chunk on a shard's address space.
/// Because the given range may be sharded across multiple devices, only the largest contiguous chunk starting from
/// the start address but smaller than the end address is returned in shard, shardStartAddress, and shardEndAddress.
/// </summary>
/// <param name="startAddress">start address of the range to map in the logical address space</param>
/// <param name="endAddress">end address of the range to map in the logical address space</param>
/// <param name="shard">the shard (potentially part of) the given range resides in, given as an index into <see cref="Devices"/></param>
/// <param name="shardStartAddress">start address translated into a physical start address on the returned shard</param>
/// <param name="shardEndAddress">
/// physical address of the end of the part of the range on the returned shard. This is not necessarily a translation of the end address
/// given, as the tail of the range may be on (a) different device(s).
/// </param>
/// <returns>
/// the logical address translated from the returned shardEndAddress. If this is not equal to the given end address, the caller is
/// expected to repeatedly call this method using the returned value as the new startAddress until the entire original range is
/// covered.
/// </returns>
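/// <example>
/// A minimal sketch of the intended calling pattern (the variable names below are illustrative, not part of the API):
/// <code>
/// long current = startAddress;
/// while (current &lt; endAddress)
/// {
///     current = scheme.MapRange(current, endAddress, out int shard, out long shardStart, out long shardEnd);
///     // issue an I/O of (shardEnd - shardStart) bytes against scheme.Devices[shard] starting at shardStart
/// }
/// </code>
/// </example>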
long MapRange(long startAddress, long endAddress, out int shard, out long shardStartAddress, out long shardEndAddress);
/// <summary>
/// Maps the sector size of a composed device into sector sizes for each shard
/// </summary>
/// <param name="sectorSize">sector size of the composed device</param>
/// <param name="shard">the shard</param>
/// <returns>sector size on the given shard</returns>
long MapSectorSize(long sectorSize, int shard);
}
/// <summary>
/// Uniformly shards data across given devices.
/// </summary>
class UniformPartitionScheme : IPartitionScheme
{
public IList<IDevice> Devices { get; }
private readonly long chunkSize;
/// <summary>
/// Constructs a UniformPartitionScheme to shard data uniformly across the given devices. Suppose we have 3 devices and the following logical write:
/// [chunk 1][chunk 2][chunk 3][chunk 4]...
/// chunk 1 is written on device 0, chunk 2 on device 1, chunk 3 on device 2, chunk 4 on device 0, etc.
/// </summary>
/// <param name="chunkSize">size of each chunk</param>
/// <param name="devices">the devices to compose from</param>
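/// <example>
/// A rough usage sketch (the device variables below are assumed to exist; any <see cref="IDevice"/> implementation works):
/// <code>
/// // chunk size must be a positive power of 2 and a multiple of each device's sector size
/// var scheme = new UniformPartitionScheme(1024 * 1024, device0, device1, device2);
/// </code>
/// </example>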
public UniformPartitionScheme(long chunkSize, IList<IDevice> devices)
{
Debug.Assert(devices.Count != 0, "There cannot be zero shards");
Debug.Assert(chunkSize > 0, "Chunk size must be positive");
Debug.Assert((chunkSize & (chunkSize - 1)) == 0, "Chunk size must be a power of 2");
this.Devices = devices;
this.chunkSize = chunkSize;
foreach (IDevice device in Devices)
{
Debug.Assert(chunkSize % device.SectorSize == 0, "A single device sector cannot be partitioned");
}
}
/// <summary>
/// vararg version of <see cref="UniformPartitionScheme(long, IList{IDevice})"/>
/// </summary>
/// <param name="chunkSize">size of each chunk</param>
/// <param name="devices">the devices to compose from</param>
public UniformPartitionScheme(long chunkSize, params IDevice[] devices) : this(chunkSize, (IList<IDevice>)devices)
{
}
/// <summary>
/// <see cref="IPartitionScheme.MapRange(long, long, out int, out long, out long)"/>
/// </summary>
/// <param name="startAddress"></param>
/// <param name="endAddress"></param>
/// <param name="shard"></param>
/// <param name="shardStartAddress"></param>
/// <param name="shardEndAddress"></param>
/// <returns></returns>
public long MapRange(long startAddress, long endAddress, out int shard, out long shardStartAddress, out long shardEndAddress)
{
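// Illustrative example: with 3 devices and chunkSize = 1024, startAddress = 5000 falls in chunkId = 4,
// which lives on shard 4 % 3 = 1. That chunk is the second chunk stored on shard 1 (local chunk index 4 / 3 = 1),
// so it begins at physical offset 1024 on that shard, plus the 5000 % 1024 = 904 offset within the chunk.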
long chunkId = startAddress / chunkSize;
shard = (int)(chunkId % Devices.Count);
shardStartAddress = chunkId / Devices.Count * chunkSize + startAddress % chunkSize;
long chunkEndAddress = (chunkId + 1) * chunkSize;
if (endAddress > chunkEndAddress)
{
shardEndAddress = shardStartAddress + chunkSize;
return chunkEndAddress;
}
else
{
shardEndAddress = endAddress - startAddress + shardStartAddress;
return endAddress;
}
}
/// <summary>
/// <see cref="IPartitionScheme.MapSectorSize(long, int)"/>
/// </summary>
/// <param name="sectorSize"></param>
/// <param name="shard"></param>
/// <returns></returns>
public long MapSectorSize(long sectorSize, int shard)
{
var numChunks = sectorSize / chunkSize;
// ceiling of (a div b) is (a + b - 1) / b where div is mathematical division and / is integer division
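// Illustrative example: with 3 devices and a composed sector size of 8 chunks, numChunks = 8 and
// ceil(8 / 3) = 3, so each shard reports a sector size of 3 * chunkSize.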
return (numChunks + Devices.Count - 1) / Devices.Count * chunkSize;
}
}
/// <summary>
/// A <see cref="ShardedStorageDevice"/> logically composes multiple <see cref="IDevice"/> instances into a single storage device
/// by sharding writes into different devices according to a supplied <see cref="IPartitionScheme"/>. The goal is to be
/// able to issue large reads and writes in parallel into multiple devices and improve throughput. Beware that this
/// code does not contain error detection or correction mechanisms to cope with the increased failure rate from using more devices.
/// </summary>
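/// <example>
/// A rough usage sketch, assuming two underlying devices have already been created (for instance via the
/// Devices.CreateLogDevice factory, or any other <see cref="IDevice"/> implementation):
/// <code>
/// var sharded = new ShardedStorageDevice(new UniformPartitionScheme(1024 * 1024, device0, device1));
/// // sharded can now be used wherever a single IDevice is expected
/// </code>
/// </example>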
class ShardedStorageDevice : StorageDeviceBase
{
private readonly IPartitionScheme partitions;
/// <summary>
/// Constructs a new ShardedStorageDevice with the given partition scheme
/// </summary>
/// <param name="partitions">The partition scheme to use</param>
public ShardedStorageDevice(IPartitionScheme partitions) : base("", 512, -1)
{
this.partitions = partitions;
}
/// <summary>
/// Closes each underlying shard device
/// </summary>
public override void Close()
{
foreach (IDevice device in partitions.Devices)
{
device.Close();
}
}
/// <summary>
/// Initializes the composed device and each of its shards
/// </summary>
/// <param name="segmentSize">size of each segment on the composed device</param>
/// <param name="epoch">the light epoch instance used for synchronization</param>
public override void Initialize(long segmentSize, LightEpoch epoch)
{
base.Initialize(segmentSize, epoch);
for (int i = 0; i < partitions.Devices.Count; i++)
{
partitions.Devices[i].Initialize(partitions.MapSectorSize(segmentSize, i), epoch);
}
}
/// <summary>
/// Removes the given segment from every underlying shard, invoking the callback once all shards have completed
/// </summary>
/// <param name="segment">index of the segment to remove</param>
/// <param name="callback">callback to invoke when all shards have removed the segment</param>
/// <param name="result">async result to pass through to the callback</param>
public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAsyncResult result)
{
var countdown = new CountdownEvent(partitions.Devices.Count);
foreach (IDevice shard in partitions.Devices)
{
shard.RemoveSegmentAsync(segment, ar =>
{
if (countdown.Signal())
{
callback(ar);
countdown.Dispose();
}
}, result);
}
}
/// <summary>
/// Writes the given buffer to the composed device, splitting the write across shards according to the partition scheme
/// </summary>
/// <param name="sourceAddress">address of the buffer to write from</param>
/// <param name="segmentId">segment to write to</param>
/// <param name="destinationAddress">logical address on the composed device to write to</param>
/// <param name="numBytesToWrite">number of bytes to write</param>
/// <param name="callback">callback to invoke when all sharded writes have completed</param>
/// <param name="asyncResult">async result to pass through to the callback</param>
public unsafe override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
{
// Starts off at one, in order to prevent earlier issued writes from invoking the callback before all parallel writes have been issued.
var countdown = new CountdownEvent(1);
long currentWriteStart = (long)destinationAddress;
long writeEnd = currentWriteStart + (long)numBytesToWrite;
uint aggregateErrorCode = 0;
while (currentWriteStart < writeEnd)
{
long newStart = partitions.MapRange(currentWriteStart, writeEnd, out int shard, out long shardStartAddress, out long shardEndAddress);
ulong writeOffset = (ulong)currentWriteStart - destinationAddress;
// Indicate that there is one more task to wait for
countdown.AddCount();
// Because more than one device can return with an error, it is important that we remember the most recent error code we saw. (It is okay to only
// report one error out of many. It will be as if we failed on that error and cancelled all other writes, even though we issue writes in parallel and
// wait until all of them are complete in the implementation)
// Can there be races on async result as we issue writes or reads in parallel?
partitions.Devices[shard].WriteAsync(IntPtr.Add(sourceAddress, (int)writeOffset),
segmentId,
(ulong)shardStartAddress,
(uint)(shardEndAddress - shardStartAddress),
(e, n, o) =>
{
// TODO: Check if it is incorrect to ignore o
if (e != 0) aggregateErrorCode = e;
if (countdown.Signal())
{
callback(aggregateErrorCode, n, o);
countdown.Dispose();
}
else
{
Overlapped.Free(o);
}
},
asyncResult);
currentWriteStart = newStart;
}
// TODO: Check if overlapped wrapper is handled correctly
if (countdown.Signal())
{
Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
callback(aggregateErrorCode, numBytesToWrite, ovNative);
countdown.Dispose();
}
}
/// <summary>
/// Reads from the composed device into the given buffer, splitting the read across shards according to the partition scheme
/// </summary>
/// <param name="segmentId">segment to read from</param>
/// <param name="sourceAddress">logical address on the composed device to read from</param>
/// <param name="destinationAddress">address of the buffer to read into</param>
/// <param name="readLength">number of bytes to read</param>
/// <param name="callback">callback to invoke when all sharded reads have completed</param>
/// <param name="asyncResult">async result to pass through to the callback</param>
public unsafe override void ReadAsync(int segmentId, ulong sourceAddress, IntPtr destinationAddress, uint readLength, IOCompletionCallback callback, IAsyncResult asyncResult)
{
// Starts off at one, in order to prevent earlier issued reads from invoking the callback before all parallel reads have been issued.
var countdown = new CountdownEvent(1);
long currentReadStart = (long)sourceAddress;
long readEnd = currentReadStart + readLength;
uint aggregateErrorCode = 0;
while (currentReadStart < readEnd)
{
long newStart = partitions.MapRange(currentReadStart, readEnd, out int shard, out long shardStartAddress, out long shardEndAddress);
ulong readOffset = (ulong)currentReadStart - sourceAddress;
// Because more than one device can return with an error, it is important that we remember the most recent error code we saw. (It is okay to only
// report one error out of many. It will be as if we failed on that error and cancelled all other reads, even though we issue reads in parallel and
// wait until all of them are complete in the implementation)
countdown.AddCount();
partitions.Devices[shard].ReadAsync(segmentId,
(ulong)shardStartAddress,
IntPtr.Add(destinationAddress, (int)readOffset),
(uint)(shardEndAddress - shardStartAddress),
(e, n, o) =>
{
// TODO: this is incorrect if the returned "bytes" read is allowed to be less than requested, as in POSIX.
if (e != 0) aggregateErrorCode = e;
if (countdown.Signal())
{
callback(aggregateErrorCode, n, o);
countdown.Dispose();
}
else
{
Overlapped.Free(o);
}
},
asyncResult);
currentReadStart = newStart;
}
// TODO: Check handling of overlapped wrapper
if (countdown.Signal())
{
Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
callback(aggregateErrorCode, readLength, ovNative);
countdown.Dispose();
}
}
}
}