in foreign/csharp/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs [37:99]
/// <summary>
/// Dispatch loop: on every timer tick, drains the pending send requests from the channel
/// and forwards them through the message invoker, either one by one or as batches.
/// </summary>
/// <remarks>
/// The loop ends when <c>_timer.WaitForNextTickAsync</c> returns <c>false</c>; an exception
/// thrown by that call (for example on cancellation of <c>_cts.Token</c>) propagates to the caller.
/// Failures of individual sends are logged and do not stop the loop.
/// </remarks>
/// <returns>A task that completes when the dispatch loop ends.</returns>
internal async Task SendMessages()
{
    var messagesSendRequests = new MessageSendRequest[_maxRequests];
    while (await _timer.WaitForNextTickAsync(_cts.Token))
    {
        int idx = 0;
        // Check capacity BEFORE dequeuing: anything that does not fit into the buffer
        // stays in the channel and is picked up on the next tick instead of overflowing the array.
        while (idx < messagesSendRequests.Length && _channel.Reader.TryRead(out var msg))
        {
            messagesSendRequests[idx++] = msg;
        }
        if (idx == 0)
        {
            continue;
        }

        var canBatchMessages = CanBatchMessages(messagesSendRequests.AsSpan()[..idx]);
        if (!canBatchMessages)
        {
            // Requests cannot be merged: send each one on its own and keep going on failure.
            for (int i = 0; i < idx; i++)
            {
                try
                {
                    await _messageInvoker.SendMessagesAsync(messagesSendRequests[i], token: _cts.Token);
                }
                catch (Exception ex)
                {
                    LogSendFailure(ex, messagesSendRequests[i]);
                }
            }
            continue;
        }

        var messagesBatches = BatchMessages(messagesSendRequests.AsSpan()[..idx]);
        try
        {
            foreach (var batch in messagesBatches)
            {
                // A null slot marks the end of the populated part of the batch array.
                if (batch is null)
                {
                    break;
                }

                try
                {
                    await _messageInvoker.SendMessagesAsync(batch, _cts.Token);
                }
                catch (Exception ex)
                {
                    LogSendFailure(ex, batch);
                }
            }
        }
        finally
        {
            // Clear on return so the pool does not keep already-sent requests alive.
            // NOTE(review): the null-sentinel check above presumably relies on unused slots
            // being null; confirm that BatchMessages does not depend on stale pool contents.
            ArrayPool<MessageSendRequest?>.Shared.Return(messagesBatches, clearArray: true);
        }
    }
}

/// <summary>
/// Logs a failed send together with the exception that caused it.
/// </summary>
/// <param name="exception">The exception thrown by the send attempt.</param>
/// <param name="request">The request that could not be sent.</param>
private void LogSendFailure(Exception exception, MessageSendRequest request)
{
    // The partitioning value is only decoded as a 32-bit partition id when it is long enough;
    // a shorter value would make ReadInt32LittleEndian throw from inside the error handler.
    ReadOnlySpan<byte> partitioningValue = request.Partitioning.Value;
    int? partitionId = partitioningValue.Length >= sizeof(int)
        ? BinaryPrimitives.ReadInt32LittleEndian(partitioningValue)
        : (int?)null;

    _logger.LogError(exception, "Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}",
        request.StreamId, request.TopicId, partitionId);
}