in cs/src/core/Allocator/GenericAllocator.cs [334:514]
/// <summary>
/// Flushes one in-memory page (or, when <c>asyncResult.partial</c> is set, the address range
/// <c>[asyncResult.fromAddress, asyncResult.untilAddress)</c> of that page) to <paramref name="device"/>.
/// The raw records are first copied into a pooled, aligned buffer. Key/value objects are then serialized
/// into a memory stream, written in chunks to <paramref name="objlogDevice"/>, and the address/size slots
/// of the copied records are patched to point at the serialized objects before the page itself is written.
/// </summary>
/// <remarks>
/// Despite the name, this method blocks the calling thread: it waits on the optional read-back of the
/// first sector and on every intermediate object-log chunk write. Only the last object-log chunk and the
/// page write are left to complete via <paramref name="callback"/>.
/// </remarks>
/// <typeparam name="TContext">Context type carried by <paramref name="asyncResult"/>.</typeparam>
/// <param name="flushPage">Page to flush; <c>flushPage % BufferSize</c> selects the in-memory page in <c>values</c>.</param>
/// <param name="alignedDestinationAddress">Destination address of the page on <paramref name="device"/>;
/// shifted right by <c>LogSegmentSizeBits</c> it also yields the segment used for the object log.</param>
/// <param name="numBytesToWrite">Number of bytes of the page to write; recomputed from the partial range for partial flushes.</param>
/// <param name="callback">Completion callback passed to the final object-log write and to the page write.</param>
/// <param name="asyncResult">Flush state. This method reads <c>partial</c>, <c>page</c>, <c>fromAddress</c> and
/// <c>untilAddress</c>, and sets <c>freeBuffer1</c>, <c>freeBuffer2</c>, <c>done</c> and increments <c>count</c>.</param>
/// <param name="device">Device receiving the page. A <c>NullDevice</c> short-circuits all copying and serialization.</param>
/// <param name="objlogDevice">Device receiving the serialized objects.</param>
/// <param name="intendedDestinationPage">Not used by this method body.</param>
/// <param name="localSegmentOffsets">Optional per-segment object-log offsets; when null the allocator's
/// <c>segmentOffsets</c> are used.</param>
private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddress, uint numBytesToWrite,
        DeviceIOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult,
        IDevice device, IDevice objlogDevice, long intendedDestinationPage = -1, long[] localSegmentOffsets = null)
{
    // Short circuit if we are using a null device
    if (device is NullDevice)
    {
        device.WriteAsync(IntPtr.Zero, 0, 0, numBytesToWrite, callback, asyncResult);
        return;
    }

    // Byte offsets within the page: [start, end) is the range to flush, aligned_start is
    // start rounded down to a sector boundary.
    int start = 0, aligned_start = 0, end = (int)numBytesToWrite;
    if (asyncResult.partial)
    {
        start = (int)((asyncResult.fromAddress - (asyncResult.page << LogPageSizeBits)));
        aligned_start = (start / sectorSize) * sectorSize;
        end = (int)((asyncResult.untilAddress - (asyncResult.page << LogPageSizeBits)));
    }

    // Check if user did not override with special segment offsets
    if (localSegmentOffsets == null)
    {
        localSegmentOffsets = segmentOffsets;
    }

    var src = values[flushPage % BufferSize];
    var buffer = bufferPool.Get((int)numBytesToWrite);

    if (aligned_start < start && (KeyHasObjects() || ValueHasObjects()))
    {
        // Do not read back the invalid header of page 0
        if ((flushPage > 0) || (start > GetFirstValidLogicalAddress(flushPage)))
        {
            // Get the overlapping HLOG from disk as we wrote it with
            // object pointers previously. This avoids object reserialization
            PageAsyncReadResult<Empty> result =
                new PageAsyncReadResult<Empty>
                {
                    handle = new CountdownEvent(1)
                };
            device.ReadAsync(alignedDestinationAddress + (ulong)aligned_start, (IntPtr)buffer.aligned_pointer + aligned_start,
                (uint)sectorSize, AsyncReadPageCallback, result);
            // Block until the sector is available in the buffer
            // (presumably AsyncReadPageCallback signals the handle - confirm against its definition).
            result.handle.Wait();
        }

        // Copy only from the unaligned start onward, so the bytes in [aligned_start, start) are left as read back above.
        fixed (RecordInfo* pin = &src[0].info)
        {
            Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)buffer.handle.AddrOfPinnedObject() + buffer.buffer.Length);
            Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + start), buffer.aligned_pointer + start,
                numBytesToWrite - start, numBytesToWrite - start);
        }
    }
    else
    {
        // No read-back needed: copy straight from memory, starting at the sector boundary.
        fixed (RecordInfo* pin = &src[0].info)
        {
            Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)buffer.handle.AddrOfPinnedObject() + buffer.buffer.Length);
            Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + aligned_start), buffer.aligned_pointer + aligned_start,
                numBytesToWrite - aligned_start, numBytesToWrite - aligned_start);
        }
    }

    // Raw pointers (stored as long) to the AddressInfo slots patched in the current chunk.
    List<long> addr = new List<long>();
    asyncResult.freeBuffer1 = buffer;

    MemoryStream ms = new();
    IObjectSerializer<Key> keySerializer = null;
    IObjectSerializer<Value> valueSerializer = null;

    if (KeyHasObjects())
    {
        keySerializer = SerializerSettings.keySerializer();
        keySerializer.BeginSerialize(ms);
    }
    if (ValueHasObjects())
    {
        valueSerializer = SerializerSettings.valueSerializer();
        valueSerializer.BeginSerialize(ms);
    }

    // Loop-invariant values: one past the last record index to flush, and the destination segment.
    var endRecord = end / recordSize;
    var lastRecord = endRecord - 1;
    ulong segment = alignedDestinationAddress >> LogSegmentSizeBits;

    for (int i = start / recordSize; i < endRecord; i++)
    {
        long endPosition = 0;
        if (!src[i].info.Invalid)
        {
            if (KeyHasObjects())
            {
                long pos = ms.Position;
                keySerializer.Serialize(ref src[i].key);
                // Record the stream-relative position and size in the buffer copy of the record;
                // the position is rebased to an object-log address when the chunk is written.
                var key_address = GetKeyAddressInfo((long)(buffer.aligned_pointer + i * recordSize));
                key_address->Address = pos;
                key_address->Size = (int)(ms.Position - pos);
                addr.Add((long)key_address);
                endPosition = pos + key_address->Size;
            }

            if (ValueHasObjects() && !src[i].info.Tombstone)
            {
                long pos = ms.Position;
                valueSerializer.Serialize(ref src[i].value);
                var value_address = GetValueAddressInfo((long)(buffer.aligned_pointer + i * recordSize));
                value_address->Address = pos;
                value_address->Size = (int)(ms.Position - pos);
                addr.Add((long)value_address);
                endPosition = pos + value_address->Size;
            }
        }

        // Emit a chunk once the stream has grown past ObjectBlockSize, and always on the last record.
        if (endPosition > ObjectBlockSize || i == lastRecord)
        {
            var memoryStreamLength = (int)ms.Position;
            var _objBuffer = bufferPool.Get(memoryStreamLength);

            asyncResult.done = new AutoResetEvent(false);

            // Round the chunk up to a whole number of sectors and atomically reserve that much
            // space in this segment's object log; _objAddr is the start of the reservation.
            var _alignedLength = (memoryStreamLength + (sectorSize - 1)) & ~(sectorSize - 1);
            var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)segment % SegmentBufferSize], _alignedLength) - _alignedLength;

            if (KeyHasObjects())
            {
                keySerializer.EndSerialize();
            }
            if (ValueHasObjects())
            {
                valueSerializer.EndSerialize();
            }
            ms.Close();

            // MemoryStream.GetBuffer remains usable after the stream is closed.
            fixed (void* src_ = ms.GetBuffer())
            {
                Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, memoryStreamLength, memoryStreamLength);
            }

            // Rebase the stream-relative positions recorded above onto the reserved object-log address.
            foreach (var address in addr)
            {
                ((AddressInfo*)address)->Address += _objAddr;
            }

            if (i < lastRecord)
            {
                // More records follow: start a fresh stream for the next chunk.
                ms = new MemoryStream();
                if (KeyHasObjects())
                {
                    keySerializer.BeginSerialize(ms);
                }
                if (ValueHasObjects())
                {
                    valueSerializer.BeginSerialize(ms);
                }

                // Reset address list for next chunk
                addr = new List<long>();

                objlogDevice.WriteAsync(
                    (IntPtr)_objBuffer.aligned_pointer,
                    (int)segment,
                    (ulong)_objAddr, (uint)_alignedLength, AsyncFlushPartialObjectLogCallback<TContext>, asyncResult);

                // Wait for write to complete before resuming next write
                // (presumably the partial callback sets asyncResult.done - confirm against its definition).
                asyncResult.done.WaitOne();
                _objBuffer.Return();
            }
            else
            {
                // need to write both page and object cache: account for one extra pending completion,
                // since callback is handed to this write as well as to the page write below.
                Interlocked.Increment(ref asyncResult.count);

                asyncResult.freeBuffer2 = _objBuffer;
                objlogDevice.WriteAsync(
                    (IntPtr)_objBuffer.aligned_pointer,
                    (int)segment,
                    (ulong)_objAddr, (uint)_alignedLength, callback, asyncResult);
            }
        }
    }

    if (asyncResult.partial)
    {
        // For a partial flush, write only the sectors spanning [aligned_start, untilAddress rounded up).
        var aligned_end = (int)((asyncResult.untilAddress - (asyncResult.page << LogPageSizeBits)));
        aligned_end = ((aligned_end + (sectorSize - 1)) & ~(sectorSize - 1));
        numBytesToWrite = (uint)(aligned_end - aligned_start);
    }

    var alignedNumBytesToWrite = (uint)((numBytesToWrite + (sectorSize - 1)) & ~(sectorSize - 1));

    // Finally write the hlog page
    device.WriteAsync((IntPtr)buffer.aligned_pointer + aligned_start, alignedDestinationAddress + (ulong)aligned_start,
        alignedNumBytesToWrite, callback, asyncResult);
}