protected override async Task ReaderAsync()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/MultipleItemFunctionExecutor.cs [32:118]


        protected override async Task ReaderAsync(ChannelReader<IKafkaEventData[]> reader, CancellationToken cancellationToken, ILogger logger)
        {
            while (!cancellationToken.IsCancellationRequested && await reader.WaitToReadAsync(cancellationToken))
            {
                while (!cancellationToken.IsCancellationRequested && reader.TryRead(out var itemsToExecute))
                {
                    try
                    {
                        // Try to publish them
                        var triggerInput = KafkaTriggerInput.New(itemsToExecute);
                        var triggerData = new TriggeredFunctionData
                        {
                            TriggerValue = triggerInput,
                        };

                        // Create Batch Event Activity Provider and Start the activity 
                        var batchEventActivityProvider = new BatchEventActivityProvider(itemsToExecute, consumerGroup);
                        batchEventActivityProvider.StartActivity();

                        FunctionResult functionResult = null;
                        try
                        {
                            // Execute the function
                            functionResult = await this.ExecuteFunctionAsync(triggerData, cancellationToken);
                            // Set the status of activity.
                            batchEventActivityProvider.SetActivityStatus(functionResult.Succeeded, functionResult.Exception);
                        }
                        catch (Exception ex)
                        {
                            batchEventActivityProvider.SetActivityStatus(false, ex);
                            throw;
                        }
                        finally
                        {
                            // Stop the Activity
                            batchEventActivityProvider.StopCurrentActivity();
                        }

                        var offsetsToCommit = new Dictionary<int, TopicPartitionOffset>();
                        for (var i=itemsToExecute.Length - 1; i >= 0; i--)
                        {
                            var currentItem = itemsToExecute[i];
                            if (!offsetsToCommit.ContainsKey(currentItem.Partition))
                            {
                                offsetsToCommit.Add(
                                    currentItem.Partition, 
                                    new TopicPartitionOffset(
                                        currentItem.Topic,
                                        currentItem.Partition,
                                        currentItem.Offset + 1)); // offset is inclusive when resuming
                            }
                        }

                        if (!cancellationToken.IsCancellationRequested)
                        {
                            this.Commit(offsetsToCommit.Values);

                            if (functionResult.Succeeded)
                            {
                                if (logger.IsEnabled(LogLevel.Debug))
                                {
                                    logger.LogDebug("Function executed with {batchSize} items in {topic} / {partitions} / {offsets}",
                                        itemsToExecute.Length,
                                        itemsToExecute[0].Topic,
                                        string.Join(",", offsetsToCommit.Keys),
                                        string.Join(",", offsetsToCommit.Values.Select(x => x.Offset)));
                                }
                            }
                            else
                            {
                                logger.LogError(functionResult.Exception, "Failed to executed function with {batchSize} items in {topic} / {partitions} / {offsets}",
                                    itemsToExecute.Length,
                                    itemsToExecute[0].Topic,
                                    string.Join(",", offsetsToCommit.Keys),
                                    string.Join(",", offsetsToCommit.Values.Select(x => x.Offset)));
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        logger.LogError(ex, $"Error in executor reader");
                    }
                }
            }

            logger.LogInformation("Exiting reader {processName}", nameof(MultipleItemFunctionExecutor<TKey, TValue>));
        }