in cs/src/core/Device/ManagedLocalStorageDevice.cs [257:408]
public override void WriteAsync(IntPtr sourceAddress,
    int segmentId,
    ulong destinationAddress,
    uint numBytesToWrite,
    DeviceIOCompletionCallback callback,
    object context)
{
    // Writes numBytesToWrite bytes, read from the unmanaged address sourceAddress, at offset
    // destinationAddress of a pooled stream belonging to segmentId.
    //
    // The outcome is always reported through callback(errorCode, numBytes, context):
    //   - (0, numBytesToWrite, context)  when the write and the following flush succeed;
    //   - (non-zero, 0, context)         when acquiring the handle, issuing the write, the write
    //                                    itself, or the flush fails.
    // The method does not throw for I/O failures that happen after HandleCapacity returns.
    //
    // numPending is incremented for the lifetime of the operation and decremented exactly once
    // on every exit path.

    Stream logWriteHandle = null;
    AsyncPool<Stream> streampool = null;
    uint errorCode = 0;
    Task writeTask = default;
    bool gotHandle;
#if NETSTANDARD2_1 || NET
    UnmanagedMemoryManager<byte> umm = default;
#else
    SectorAlignedMemory memory = default;
#endif

    // Positions the acquired handle at destinationAddress and starts the write, storing the
    // in-flight operation in writeTask. Shared by the fast path and the path that has to wait
    // for a pooled handle.
    void StartWrite()
    {
        logWriteHandle.Seek((long)destinationAddress, SeekOrigin.Begin);
#if NETSTANDARD2_1 || NET
        unsafe
        {
            umm = new UnmanagedMemoryManager<byte>((byte*)sourceAddress, (int)numBytesToWrite);
        }
        // FileStream.WriteAsync is not thread-safe hence need a lock here
        lock (this)
        {
            writeTask = logWriteHandle.WriteAsync(umm.Memory).AsTask();
        }
#else
        // No Memory<byte>-based overload here: stage the data in a pooled managed buffer.
        memory = pool.Get((int)numBytesToWrite);
        unsafe
        {
            fixed (void* destination = memory.buffer)
            {
                Buffer.MemoryCopy((void*)sourceAddress, destination, numBytesToWrite, numBytesToWrite);
            }
        }
        // FileStream.WriteAsync is not thread-safe hence need a lock here
        lock (this)
        {
            writeTask = logWriteHandle.WriteAsync(memory.buffer, 0, (int)numBytesToWrite);
        }
#endif
    }

    // Clean-up for a failure that happens before a write task exists: undo the pending count,
    // give back whatever was acquired, and report a generic failure to the caller.
    void AbortWrite()
    {
        Interlocked.Decrement(ref numPending);
        // Perform pool returns and disposals
#if !(NETSTANDARD2_1 || NET)
        memory?.Return();
#endif
        if (logWriteHandle != null) streampool?.Return(logWriteHandle);
        // Issue user callback
        callback(uint.MaxValue, 0, context);
    }

    // Maps an exception to the error code handed to the callback: the low 16 bits of the
    // HResult for an IOException, uint.MaxValue for anything else.
    uint GetErrorCode(Exception ex)
    {
        // 'await' rethrows the original exception instead of an AggregateException, so the
        // IOException is normally 'ex' itself; InnerException is still checked for wrapped failures.
        IOException ioex = ex as IOException ?? ex.InnerException as IOException;
        if (ioex == null) return uint.MaxValue;
        uint code = (uint)(ioex.HResult & 0x0000FFFF);
        // A failure must never be reported with the success code 0.
        return code != 0 ? code : uint.MaxValue;
    }

    HandleCapacity(segmentId);

    try
    {
        Interlocked.Increment(ref numPending);
        streampool = GetOrAddHandle(segmentId).Item2;
        gotHandle = streampool.TryGet(out logWriteHandle);
        if (gotHandle)
        {
            // Fast path: a handle was available immediately, so issue the write on the caller's thread.
            StartWrite();
        }
    }
    catch
    {
        AbortWrite();
        return;
    }

    _ = Task.Run(async () =>
    {
        if (!gotHandle)
        {
            // No pooled handle was immediately available: wait for one, then issue the write.
            try
            {
                logWriteHandle = await streampool.GetAsync().ConfigureAwait(false);
                StartWrite();
            }
            catch
            {
                AbortWrite();
                return;
            }
        }

        try
        {
            await writeTask.ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            errorCode = GetErrorCode(ex);
            numBytesToWrite = 0;
        }
        finally
        {
            Interlocked.Decrement(ref numPending);
            // Perform pool returns and disposals
#if !(NETSTANDARD2_1 || NET)
            memory?.Return();
#endif
            try
            {
                // Sequentialize all writes to same handle
                await logWriteHandle.FlushAsync().ConfigureAwait(false);
            }
            catch (Exception flushEx)
            {
                // A failing flush must not escape: this task is discarded, so an exception here
                // would skip the pool return and the callback below and strand the caller.
                // Keep the write's error code if the write had already failed.
                if (errorCode == 0) errorCode = GetErrorCode(flushEx);
                numBytesToWrite = 0;
            }
            streampool?.Return(logWriteHandle);
            // Issue user callback
            callback(errorCode, numBytesToWrite, context);
        }
    });
}