in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaMetricsProvider.cs [66:94]
/// <summary>
/// Asks a short-lived admin client for the metadata of the configured topic and
/// builds one <see cref="TopicPartition"/> per partition listed in that metadata.
/// </summary>
/// <returns>
/// The topic's partitions. An empty list is returned when the metadata contains no
/// topics, when the first topic entry lists no partitions, or when any exception is
/// thrown along the way (the exception is logged, not rethrown).
/// </returns>
protected virtual List<TopicPartition> LoadTopicPartitions()
{
    try
    {
        // The admin client only lives for the duration of this lookup.
        using (var client = new AdminClientBuilder(adminClientConfig).Build())
        {
            // Metadata request is bounded to five seconds.
            var clusterMetadata = client.GetMetadata(this.topicName, TimeSpan.FromSeconds(5));
            var topics = clusterMetadata.Topics;

            if (topics != null && topics.Count > 0)
            {
                // Only the first topic entry of the response is inspected.
                var topicEntry = topics[0];
                var partitionEntries = topicEntry.Partitions;

                if (partitionEntries != null && partitionEntries.Count > 0)
                {
                    var topicPartitions = new List<TopicPartition>(partitionEntries.Count);
                    foreach (var partitionEntry in partitionEntries)
                    {
                        topicPartitions.Add(new TopicPartition(topicEntry.Topic, new Partition(partitionEntry.PartitionId)));
                    }

                    return topicPartitions;
                }

                logger.LogError($"Could not load partition information about topic '{this.topicName}'");
            }
            else
            {
                logger.LogError($"Could not load metadata information about topic '{this.topicName}'");
            }

            return new List<TopicPartition>();
        }
    }
    catch (Exception ex)
    {
        // Best effort: report the failure and fall back to "no partitions".
        logger.LogError(ex, $"Failed to load partition information from topic '{this.topicName}'");
        return new List<TopicPartition>();
    }
}