in foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs [446:473]
/// <summary>
/// Fetches messages once per <paramref name="interval"/> tick and forwards every fetched message to
/// <paramref name="writer"/>. The loop runs until <paramref name="token"/> is cancelled or an
/// unexpected exception escapes; in every case the writer is completed before this method exits.
/// </summary>
/// <typeparam name="TMessage">Type produced by <paramref name="deserializer"/>.</typeparam>
/// <param name="request">Fetch request passed unchanged to <c>FetchMessagesAsync</c> on every tick.</param>
/// <param name="deserializer">Passed through to <c>FetchMessagesAsync</c>.</param>
/// <param name="interval">Period between two consecutive fetches.</param>
/// <param name="writer">Channel writer that receives each fetched message; completed when polling stops.</param>
/// <param name="decryptor">Optional delegate passed through to <c>FetchMessagesAsync</c>.</param>
/// <param name="token">Cancels the timer wait, the fetch and the channel write.</param>
/// <exception cref="OperationCanceledException">
/// Propagated when <paramref name="token"/> is cancelled; the writer is completed without an error first.
/// </exception>
private async Task StartPollingMessagesAsync<TMessage>(MessageFetchRequest request,
    Func<byte[], TMessage> deserializer, TimeSpan interval, ChannelWriter<MessageResponse<TMessage>> writer,
    Func<byte[], byte[]>? decryptor = null,
    CancellationToken token = default)
{
    // PeriodicTimer is IDisposable; dispose it when polling ends.
    using var timer = new PeriodicTimer(interval);
    try
    {
        // WaitForNextTickAsync signals cancellation by throwing, not by returning false,
        // so no extra token check belongs in the loop condition.
        while (await timer.WaitForNextTickAsync(token))
        {
            try
            {
                var fetchResponse = await FetchMessagesAsync(request, deserializer, decryptor, token);
                foreach (var messageResponse in fetchResponse.Messages)
                {
                    await writer.WriteAsync(messageResponse, token);
                }
            }
            catch (InvalidResponseException e)
            {
                // An invalid response is logged and polling resumes on the next tick.
                _logger.LogError(e, "Error encountered while polling messages - Stream ID: {streamId}, Topic ID: {topicId}, Partition ID: {partitionId}, error message {message}",
                    request.StreamId, request.TopicId, request.PartitionId, e.Message);
            }
        }
    }
    catch (Exception e) when (e is not OperationCanceledException)
    {
        // Surface unexpected failures to channel readers instead of leaving them waiting forever.
        writer.TryComplete(e);
        throw;
    }
    finally
    {
        // Always release readers. This is a no-op if the writer was already completed above.
        writer.TryComplete();
    }
}