in foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs [307:344]
/// <summary>
/// Serializes (and optionally encrypts) every message in <paramref name="request"/>,
/// wraps the payloads in <see cref="Message"/> instances and forwards the resulting
/// batch either to the message invoker, when one is configured, or to the channel writer.
/// </summary>
/// <typeparam name="TMessage">Type of the user-supplied messages.</typeparam>
/// <param name="request">Stream, topic, partitioning and the messages to send. Nothing is sent when it contains no messages.</param>
/// <param name="serializer">Converts a single message into its payload bytes.</param>
/// <param name="encryptor">Optional transformation applied to the serialized payload before sending.</param>
/// <param name="headers">Optional headers; the same dictionary instance is attached to every message of the batch.</param>
/// <param name="token">Token used to cancel building and sending the batch.</param>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="token"/> is cancelled while the batch is being built.
/// </exception>
public async Task SendMessagesAsync<TMessage>(MessageSendRequest<TMessage> request,
    Func<TMessage, byte[]> serializer,
    Func<byte[], byte[]>? encryptor = null, Dictionary<HeaderKey, HeaderValue>? headers = null,
    CancellationToken token = default)
{
    var messages = request.Messages;
    if (messages.Count == 0)
    {
        return;
    }

    //TODO - explore making fields of Message class mutable, so there is no need to create em from scratch
    var messagesBuffer = new Message[messages.Count];
    for (var i = 0; i < messages.Count; i++)
    {
        // Stop promptly on cancellation. The previous condition
        // (`i < Count || token.IsCancellationRequested`) kept looping once the token
        // was cancelled and ran past the end of the buffer.
        token.ThrowIfCancellationRequested();

        var payload = serializer(messages[i]);
        if (encryptor is not null)
        {
            payload = encryptor(payload);
        }

        messagesBuffer[i] = new Message
        {
            Payload = payload,
            Headers = headers,
            Id = Guid.NewGuid()
        };
    }

    var sendRequest = new MessageSendRequest
    {
        StreamId = request.StreamId,
        TopicId = request.TopicId,
        Partitioning = request.Partitioning,
        Messages = messagesBuffer
    };

    // A configured invoker takes precedence; otherwise the batch is handed to the channel writer.
    if (_messageInvoker is not null)
    {
        await _messageInvoker.SendMessagesAsync(sendRequest, token);
        return;
    }

    await _channel!.Writer.WriteAsync(sendRequest, token);
}