in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/SingleItemFunctionExecutor.cs [32:66]
protected override async Task ReaderAsync(ChannelReader<IKafkaEventData[]> reader, CancellationToken cancellationToken, ILogger logger)
{
var partitionTasks = new List<Task>();
while (!cancellationToken.IsCancellationRequested && await reader.WaitToReadAsync(cancellationToken))
{
while (!cancellationToken.IsCancellationRequested && reader.TryRead(out var itemsToExecute))
{
try
{
partitionTasks.Clear();
// Create one task per partition, this way slow partition executions will not delay others
// Order in a partition must be followed.
var itemsByPartition = itemsToExecute.GroupBy(x => x.Partition);
foreach (var partitionAndEvents in itemsByPartition)
{
var partition = partitionAndEvents.Key;
var kafkaEvents = partitionAndEvents;
partitionTasks.Add(ProcessPartitionItemsAsync(partition, kafkaEvents, cancellationToken));
}
await Task.WhenAll(partitionTasks);
}
catch (Exception ex)
{
logger.LogError(ex, $"Error in executor reader");
}
}
}
logger.LogInformation("Exiting reader {processName}", nameof(SingleItemFunctionExecutor<TKey, TValue>));
}