internal static PolledMessages MapMessages()

in foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs [387:466]


    internal static PolledMessages<TMessage> MapMessages<TMessage>(ReadOnlySpan<byte> payload,
        Func<byte[], TMessage> serializer, Func<byte[], byte[]>? decryptor = null)
    {
        int length = payload.Length;
        var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[0..4]);
        var currentOffset = BinaryPrimitives.ReadUInt64LittleEndian(payload[4..12]);
        var messagesCount = BinaryPrimitives.ReadUInt32LittleEndian(payload[12..16]);
        int position = 16;
        if (position >= length)
        {
            return PolledMessages<TMessage>.Empty;
        }

        List<MessageResponse<TMessage>> messages = new();
        while (position < length)
        {
            ulong offset = BinaryPrimitives.ReadUInt64LittleEndian(payload[position..(position + 8)]);
            var state = MapMessageState(payload, position);
            ulong timestamp = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 9)..(position + 17)]);
            var id = new Guid(payload[(position + 17)..(position + 33)]);
            var checksum = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 33)..(position + 37)]);
            int headersLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 37)..(position + 41)]);

            var headers = headersLength switch
            {
                0 => null,
                > 0 => MapHeaders(payload[(position + 41)..(position + 41 + headersLength)]),
                < 0 => throw new ArgumentOutOfRangeException()
            };
            position += headersLength;
            uint messageLength = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 41)..(position + 45)]);

            int payloadRangeStart = position + PROPERTIES_SIZE;
            int payloadRangeEnd = position + PROPERTIES_SIZE + (int)messageLength;
            if (payloadRangeStart > length || payloadRangeEnd > length)
            {
                break;
            }

            var payloadSlice = payload[payloadRangeStart..payloadRangeEnd];
            var messagePayload = ArrayPool<byte>.Shared.Rent(payloadSlice.Length);
            var payloadSliceLen = payloadSlice.Length;
            try
            {
                payloadSlice.CopyTo(messagePayload.AsSpan()[..payloadSliceLen]);

                int totalSize = PROPERTIES_SIZE + (int)messageLength;
                position += totalSize;

                messages.Add(new MessageResponse<TMessage>
                {
                    Offset = offset,
                    Timestamp = timestamp,
                    Checksum = checksum,
                    Id = id,
                    Headers = headers,
                    State = state,
                    Message = decryptor is not null
                        ? serializer(decryptor(messagePayload[..payloadSliceLen]))
                        : serializer(messagePayload[..payloadSliceLen])
                });
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(messagePayload);
            }

            if (position + PROPERTIES_SIZE >= length)
            {
                break;
            }
        }

        return new PolledMessages<TMessage>
        {
            PartitionId = partitionId,
            CurrentOffset = currentOffset,
            Messages = messages.AsReadOnly()
        };
    }