in foreign/csharp/Iggy_Sample_Producer/Program.cs [151:243]
async Task ProduceMessages(IIggyClient bus, StreamResponse? stream, TopicResponse? topic)
{
var messageBatchCount = 1;
int intervalInMs = 1000;
Console.WriteLine(
$"Messages will be sent to stream {stream!.Id}, topic {topic!.Id}, partition {topic.PartitionsCount} with interval {intervalInMs} ms");
Func<Envelope, byte[]> serializer = static envelope =>
{
Span<byte> buffer = stackalloc byte[envelope.MessageType.Length + 4 + envelope.Payload.Length];
BinaryPrimitives.WriteInt32LittleEndian(
buffer[..4], envelope.MessageType.Length);
Encoding.UTF8.GetBytes(envelope.MessageType).CopyTo(buffer[4..(envelope.MessageType.Length + 4)]);
Encoding.UTF8.GetBytes(envelope.Payload).CopyTo(buffer[(envelope.MessageType.Length + 4)..]);
return buffer.ToArray();
};
//can this be optimized ? this lambda doesn't seem to get cached
Func<byte[], byte[]> encryptor = static payload =>
{
string aes_key = "AXe8YwuIn1zxt3FPWTZFlAa14EHdPAdN9FaZ9RQWihc=";
string aes_iv = "bsxnWolsAyO7kCfWuyrnqg==";
var key = Convert.FromBase64String(aes_key);
var iv = Convert.FromBase64String(aes_iv);
using Aes aes = Aes.Create();
ICryptoTransform encryptor = aes.CreateEncryptor(key, iv);
using MemoryStream memoryStream = new MemoryStream();
using CryptoStream cryptoStream = new CryptoStream(memoryStream, encryptor, CryptoStreamMode.Write);
using (BinaryWriter streamWriter = new BinaryWriter(cryptoStream))
{
streamWriter.Write(payload);
}
return memoryStream.ToArray();
};
var byteArray = new byte[] { 6, 9, 4, 2, 0 };
var headers = new Dictionary<HeaderKey, HeaderValue>();
headers.Add(new HeaderKey { Value = "key_1".ToLower() }, HeaderValue.FromString("test-value-1"));
headers.Add(new HeaderKey { Value = "key_2".ToLower() }, HeaderValue.FromInt32(69));
headers.Add(new HeaderKey { Value = "key_3".ToLower() }, HeaderValue.FromFloat(420.69f));
headers.Add(new HeaderKey { Value = "key_4".ToLower() }, HeaderValue.FromBool(true));
headers.Add(new HeaderKey { Value = "key_5".ToLower() }, HeaderValue.FromBytes(byteArray));
headers.Add(new HeaderKey { Value = "key_6".ToLower() }, HeaderValue.FromInt128(new Int128(6969696969, 420420420)));
headers.Add(new HeaderKey { Value = "key7".ToLower() }, HeaderValue.FromGuid(Guid.NewGuid()));
while (true)
{
var debugMessages = new List<ISerializableMessage>();
var messages = new Envelope[messageBatchCount];
for (int i = 0; i < messageBatchCount; i++)
{
var message = MessageGenerator.GenerateMessage();
var envelope = message.ToEnvelope();
debugMessages.Add(message);
messages[i] = envelope;
}
var messagesSerialized = new List<Message>();
foreach (var message in messages)
{
messagesSerialized.Add(new Message
{
Id = Guid.NewGuid(),
Headers = headers,
Payload = encryptor(serializer(message))
});
}
try
{
await bus.SendMessagesAsync(new MessageSendRequest<Envelope>
{
StreamId = streamId,
TopicId = topicId,
Partitioning = Partitioning.PartitionId(3),
Messages = messages
},
serializer,
encryptor, headers);
}
catch (Exception e)
{
Console.WriteLine(e.Message);
throw;
}
Console.WriteLine(
$"Sent messages: {string.Join(Environment.NewLine, debugMessages.ConvertAll(m => m.ToString()))}");
await Task.Delay(intervalInMs);
}
}