in cs/src/core/Device/ManagedLocalStorageDevice.cs [104:246]
/// <summary>
/// Starts an asynchronous read from the stream of the given segment into unmanaged memory
/// and reports the outcome through <paramref name="callback"/>.
/// </summary>
/// <param name="segmentId">Segment whose stream pool supplies the handle used for the read.</param>
/// <param name="sourceAddress">Byte offset in the segment stream at which the read starts.</param>
/// <param name="destinationAddress">Unmanaged destination; up to <paramref name="readLength"/> bytes are written there.</param>
/// <param name="readLength">Number of bytes requested.</param>
/// <param name="callback">
/// Invoked with (error code, bytes read, context). The error code is 0 when the read completes
/// without an exception, the low 16 bits of the HResult for an <see cref="IOException"/>,
/// and <see cref="uint.MaxValue"/> for any other failure.
/// </param>
/// <param name="context">Opaque state handed back to <paramref name="callback"/>.</param>
public override void ReadAsync(int segmentId, ulong sourceAddress,
                               IntPtr destinationAddress,
                               uint readLength,
                               DeviceIOCompletionCallback callback,
                               object context)
{
    Stream logReadHandle = null;
    AsyncPool<Stream> streampool = null;
    uint errorCode = 0;
    Task<int> readTask = default;
    bool gotHandle;
    int numBytes = 0;
#if NETSTANDARD2_1 || NET
    UnmanagedMemoryManager<byte> umm = default;
#else
    SectorAlignedMemory memory = default;
#endif

    // Positions the acquired stream at sourceAddress and starts the read, leaving the pending task in readTask.
    void StartRead()
    {
        logReadHandle.Seek((long)sourceAddress, SeekOrigin.Begin);
#if NETSTANDARD2_1 || NET
        unsafe
        {
            // Read straight into the caller's unmanaged buffer.
            umm = new UnmanagedMemoryManager<byte>((byte*)destinationAddress, (int)readLength);
        }
        // FileStream.ReadAsync is not thread-safe hence need a lock here
        lock (this)
        {
            readTask = logReadHandle.ReadAsync(umm.Memory).AsTask();
        }
#else
        // Read into a pooled managed buffer; it is copied to the destination once the read completes.
        memory = pool.Get((int)readLength);
        // FileStream.ReadAsync is not thread-safe hence need a lock here
        lock (this)
        {
            readTask = logReadHandle.ReadAsync(memory.buffer, 0, (int)readLength);
        }
#endif
    }

    // Undoes the bookkeeping of a read that could not be started and reports the failure to the caller.
    void FailBeforeRead()
    {
        Interlocked.Decrement(ref numPending);
        // Perform pool returns and disposals
#if !(NETSTANDARD2_1 || NET)
        memory?.Return();
#endif
        if (logReadHandle != null) streampool?.Return(logReadHandle);
        // Issue user callback
        callback(uint.MaxValue, 0, context);
    }

    try
    {
        Interlocked.Increment(ref numPending);
        streampool = GetOrAddHandle(segmentId).Item1;
        // Fast path: if a handle is free right now, start the read on the calling thread.
        gotHandle = streampool.TryGet(out logReadHandle);
        if (gotHandle)
        {
            StartRead();
        }
    }
    catch
    {
        FailBeforeRead();
        return;
    }

    _ = Task.Run(async () =>
    {
        if (!gotHandle)
        {
            // Slow path: wait for a handle to be returned to the pool, then start the read.
            try
            {
                logReadHandle = await streampool.GetAsync().ConfigureAwait(false);
                StartRead();
            }
            catch
            {
                FailBeforeRead();
                return;
            }
        }

        try
        {
            numBytes = await readTask.ConfigureAwait(false);
#if !(NETSTANDARD2_1 || NET)
            unsafe
            {
                // Copy only the bytes actually read from the pooled buffer to the destination.
                fixed (void* source = memory.buffer)
                    Buffer.MemoryCopy(source, (void*)destinationAddress, numBytes, numBytes);
            }
#endif
        }
        catch (Exception ex)
        {
            // await rethrows the task's original exception (not an AggregateException),
            // so the IOException is normally ex itself; still accept a wrapped one.
            IOException ioex = ex as IOException ?? ex.InnerException as IOException;
            errorCode = ioex != null ? (uint)(ioex.HResult & 0x0000FFFF) : uint.MaxValue;
            // Never report a failed read with the success code.
            if (errorCode == 0) errorCode = uint.MaxValue;
            numBytes = 0;
        }
        finally
        {
            Interlocked.Decrement(ref numPending);
            // Perform pool returns and disposals
#if !(NETSTANDARD2_1 || NET)
            memory?.Return();
#endif
            // Sequentialize all reads from same handle
            streampool?.Return(logReadHandle);
            // Issue user callback
            callback(errorCode, (uint)numBytes, context);
        }
    });
}