in foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs [306:386]
internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload,
Func<byte[], byte[]>? decryptor = null)
{
int length = payload.Length;
var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[0..4]);
var currentOffset = BinaryPrimitives.ReadUInt64LittleEndian(payload[4..12]);
var messagesCount = BinaryPrimitives.ReadUInt32LittleEndian(payload[12..16]);
int position = 16;
if (position >= length)
{
return PolledMessages.Empty;
}
List<MessageResponse> messages = new();
while (position < length)
{
ulong offset = BinaryPrimitives.ReadUInt64LittleEndian(payload[position..(position + 8)]);
var state = MapMessageState(payload, position);
ulong timestamp = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 9)..(position + 17)]);
var id = new Guid(payload[(position + 17)..(position + 33)]);
var checksum = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 33)..(position + 37)]);
int headersLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 37)..(position + 41)]);
var headers = headersLength switch
{
0 => null,
> 0 => MapHeaders(payload[(position + 41)..(position + 41 + headersLength)]),
< 0 => throw new ArgumentOutOfRangeException()
};
position += headersLength;
uint messageLength = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 41)..(position + 45)]);
int payloadRangeStart = position + PROPERTIES_SIZE;
int payloadRangeEnd = position + PROPERTIES_SIZE + (int)messageLength;
if (payloadRangeStart > length || payloadRangeEnd > length)
{
break;
}
var payloadSlice = payload[payloadRangeStart..payloadRangeEnd];
var messagePayload = ArrayPool<byte>.Shared.Rent(payloadSlice.Length);
var payloadSliceLen = payloadSlice.Length;
try
{
payloadSlice.CopyTo(messagePayload.AsSpan()[..payloadSliceLen]);
int totalSize = PROPERTIES_SIZE + (int)messageLength;
position += totalSize;
messages.Add(new MessageResponse
{
Offset = offset,
Timestamp = timestamp,
Id = id,
Checksum = checksum,
State = state,
Headers = headers,
Payload = decryptor is not null
? decryptor(messagePayload[..payloadSliceLen])
: messagePayload[..payloadSliceLen]
});
}
finally
{
ArrayPool<byte>.Shared.Return(messagePayload);
}
if (position + PROPERTIES_SIZE >= length)
{
break;
}
}
return new PolledMessages
{
PartitionId = partitionId,
CurrentOffset = currentOffset,
Messages = messages.AsReadOnly()
};
}