protected override async Task ReaderAsync()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/SingleItemFunctionExecutor.cs [32:66]


        protected override async Task ReaderAsync(ChannelReader<IKafkaEventData[]> reader, CancellationToken cancellationToken, ILogger logger)
        {
            var partitionTasks = new List<Task>();

            while (!cancellationToken.IsCancellationRequested && await reader.WaitToReadAsync(cancellationToken))
            {
                while (!cancellationToken.IsCancellationRequested && reader.TryRead(out var itemsToExecute))
                {
                    try
                    {
                        partitionTasks.Clear();

                        // Create one task per partition, this way slow partition executions will not delay others
                        // Order in a partition must be followed.
                        var itemsByPartition = itemsToExecute.GroupBy(x => x.Partition);

                        foreach (var partitionAndEvents in itemsByPartition)
                        {
                            var partition = partitionAndEvents.Key;
                            var kafkaEvents = partitionAndEvents;

                            partitionTasks.Add(ProcessPartitionItemsAsync(partition, kafkaEvents, cancellationToken));
                        }

                        await Task.WhenAll(partitionTasks);
                    }
                    catch (Exception ex)
                    {
                        logger.LogError(ex, $"Error in executor reader");
                    }
                }
            }

            logger.LogInformation("Exiting reader {processName}", nameof(SingleItemFunctionExecutor<TKey, TValue>));
        }