async Task ProduceMessages()

in foreign/csharp/Iggy_Sample_Producer/Program.cs [151:243]


// Continuously generates sample messages and publishes them through the given client.
//
// Each loop iteration builds a batch of generated messages, wraps them in envelopes,
// sends them with a fixed set of user headers (serialized and AES-encrypted via the
// delegates below), prints what was sent, then waits before the next batch.
//
// Parameters:
//   bus    - client used to send the messages.
//   stream - stream metadata, used only for the startup log line; must not be null.
//   topic  - topic metadata, used only for the startup log line; must not be null.
//
// Throws ArgumentNullException when any argument is null. Any exception raised while
// sending is written to the console and rethrown, which ends the loop.
async Task ProduceMessages(IIggyClient bus, StreamResponse? stream, TopicResponse? topic)
{
    ArgumentNullException.ThrowIfNull(bus);
    ArgumentNullException.ThrowIfNull(stream);
    ArgumentNullException.ThrowIfNull(topic);

    const int messageBatchCount = 1;
    const int intervalInMs = 1000;
    // Every batch is sent to this fixed partition.
    const int partitionId = 3;

    Console.WriteLine(
        $"Messages will be sent to stream {stream.Id}, topic {topic.Id}, partition {partitionId} with interval {intervalInMs} ms");

    // Wire layout: [int32 LE: UTF-8 byte length of MessageType][MessageType bytes][Payload bytes].
    // Sizes are computed in UTF-8 bytes (not UTF-16 chars) so non-ASCII text fits the buffer,
    // and the buffer lives on the heap so large payloads cannot overflow the stack.
    Func<Envelope, byte[]> serializer = static envelope =>
    {
        const int prefixSize = sizeof(int);
        var messageTypeByteCount = Encoding.UTF8.GetByteCount(envelope.MessageType);
        var payloadByteCount = Encoding.UTF8.GetByteCount(envelope.Payload);

        var buffer = new byte[prefixSize + messageTypeByteCount + payloadByteCount];
        var span = buffer.AsSpan();
        BinaryPrimitives.WriteInt32LittleEndian(span[..prefixSize], messageTypeByteCount);
        Encoding.UTF8.GetBytes(envelope.MessageType, span.Slice(prefixSize, messageTypeByteCount));
        Encoding.UTF8.GetBytes(envelope.Payload, span[(prefixSize + messageTypeByteCount)..]);
        return buffer;
    };

    // AES-encrypts a serialized payload. The lambda is static, so it captures nothing;
    // the key and IV are decoded on every call.
    Func<byte[], byte[]> encryptor = static payload =>
    {
        // NOTE(review): key and IV are hard-coded sample values. Do not reuse them outside
        // of this demo; real code should load secrets from configuration or a secret store.
        const string aesKeyBase64 = "AXe8YwuIn1zxt3FPWTZFlAa14EHdPAdN9FaZ9RQWihc=";
        const string aesIvBase64 = "bsxnWolsAyO7kCfWuyrnqg==";
        var key = Convert.FromBase64String(aesKeyBase64);
        var iv = Convert.FromBase64String(aesIvBase64);

        using var aes = Aes.Create();
        // The transform is disposable and must be released along with the Aes instance.
        using var transform = aes.CreateEncryptor(key, iv);
        using var memoryStream = new MemoryStream();
        // Disposing the CryptoStream flushes the final padded block into memoryStream.
        using (var cryptoStream = new CryptoStream(memoryStream, transform, CryptoStreamMode.Write))
        {
            cryptoStream.Write(payload, 0, payload.Length);
        }
        // MemoryStream.ToArray remains valid after the stream has been closed.
        return memoryStream.ToArray();
    };

    var byteArray = new byte[] { 6, 9, 4, 2, 0 };

    // Sample user headers demonstrating each supported header value kind.
    var headers = new Dictionary<HeaderKey, HeaderValue>
    {
        { new HeaderKey { Value = "key_1" }, HeaderValue.FromString("test-value-1") },
        { new HeaderKey { Value = "key_2" }, HeaderValue.FromInt32(69) },
        { new HeaderKey { Value = "key_3" }, HeaderValue.FromFloat(420.69f) },
        { new HeaderKey { Value = "key_4" }, HeaderValue.FromBool(true) },
        { new HeaderKey { Value = "key_5" }, HeaderValue.FromBytes(byteArray) },
        { new HeaderKey { Value = "key_6" }, HeaderValue.FromInt128(new Int128(6969696969, 420420420)) },
        { new HeaderKey { Value = "key7" }, HeaderValue.FromGuid(Guid.NewGuid()) },
    };

    while (true)
    {
        // debugMessages keeps the generated originals for console output;
        // messages holds the envelopes that are actually sent.
        var debugMessages = new List<ISerializableMessage>(messageBatchCount);
        var messages = new Envelope[messageBatchCount];

        for (int i = 0; i < messageBatchCount; i++)
        {
            var message = MessageGenerator.GenerateMessage();
            debugMessages.Add(message);
            messages[i] = message.ToEnvelope();
        }

        try
        {
            // Serialization and encryption are performed by the client through the supplied delegates.
            await bus.SendMessagesAsync(new MessageSendRequest<Envelope>
            {
                StreamId = streamId,
                TopicId = topicId,
                Partitioning = Partitioning.PartitionId(partitionId),
                Messages = messages
            },
                serializer,
                encryptor, headers);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.Message);
            throw;
        }

        Console.WriteLine(
            $"Sent messages: {string.Join(Environment.NewLine, debugMessages.ConvertAll(m => m.ToString()))}");
        await Task.Delay(intervalInMs);
    }
}