internal async Task SendMessages()

in foreign/csharp/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs [37:99]


    /// <summary>
    /// Dispatcher loop: on every timer tick, drains the pending send requests from the channel
    /// and forwards them through the message invoker, merged into batches when
    /// <c>CanBatchMessages</c> allows it and one by one otherwise.
    /// </summary>
    /// <remarks>
    /// A failure while sending a single request or batch is logged and does not stop the loop;
    /// the remaining requests of the tick are still attempted. The loop ends when the timer stops
    /// ticking; cancellation of the token passed to the timer surfaces to the caller from the
    /// awaited tick.
    /// </remarks>
    /// <returns>A task that completes when the timer reports that no further ticks will occur.</returns>
    internal async Task SendMessages()
    {
        var messagesSendRequests = new MessageSendRequest[_maxRequests];
        while (await _timer.WaitForNextTickAsync(_cts.Token))
        {
            // Check the capacity BEFORE reading so a request is never taken off the channel
            // without a slot to store it in; anything left over is picked up on the next tick.
            int idx = 0;
            while (idx < messagesSendRequests.Length && _channel.Reader.TryRead(out var msg))
            {
                messagesSendRequests[idx++] = msg;
            }

            if (idx == 0)
            {
                continue;
            }

            if (!CanBatchMessages(messagesSendRequests.AsSpan()[..idx]))
            {
                for (int i = 0; i < idx; i++)
                {
                    try
                    {
                        await _messageInvoker.SendMessagesAsync(messagesSendRequests[i], token: _cts.Token);
                    }
                    catch (Exception ex)
                    {
                        LogSendFailure(ex, messagesSendRequests[i]);
                    }
                }

                continue;
            }

            var messagesBatches = BatchMessages(messagesSendRequests.AsSpan()[..idx]);
            try
            {
                foreach (var messages in messagesBatches)
                {
                    // The batch array is consumed up to its first empty slot.
                    if (messages is null)
                    {
                        break;
                    }

                    try
                    {
                        await _messageInvoker.SendMessagesAsync(messages, _cts.Token);
                    }
                    catch (Exception ex)
                    {
                        LogSendFailure(ex, messages);
                    }
                }
            }
            finally
            {
                // Clear on return so the pool neither keeps already-sent requests alive nor hands
                // stale entries back to a later renter that relies on the null terminator above.
                // NOTE(review): BatchMessages is presumed to rent this array from the same shared pool - confirm.
                ArrayPool<MessageSendRequest?>.Shared.Return(messagesBatches, clearArray: true);
            }
        }
    }

    /// <summary>
    /// Logs a failed send together with the exception that caused it and the request's
    /// stream, topic and (when decodable) partition identifiers.
    /// </summary>
    /// <param name="exception">The exception thrown while sending.</param>
    /// <param name="request">The request or batch that could not be sent.</param>
    private void LogSendFailure(Exception exception, MessageSendRequest request)
    {
        _logger.LogError(exception,
            "Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}",
            request.StreamId, request.TopicId, ReadPartitionId(request.Partitioning.Value));
    }

    /// <summary>
    /// Decodes the first four bytes of a partitioning value as a little-endian 32-bit integer.
    /// </summary>
    /// <param name="partitioningValue">Raw partitioning bytes of a request.</param>
    /// <returns>
    /// The decoded integer, or <c>null</c> when fewer than four bytes are available. Returning
    /// <c>null</c> instead of throwing keeps the error-logging path itself from failing.
    /// </returns>
    private static int? ReadPartitionId(ReadOnlySpan<byte> partitioningValue)
    {
        if (partitioningValue.Length < sizeof(int))
        {
            return null;
        }

        return BinaryPrimitives.ReadInt32LittleEndian(partitioningValue);
    }