private async Task ProcessPartitionItemsAsync()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/SingleItemFunctionExecutor.cs [68:115]


        /// <summary>
        /// Executes the function once for each event of a single partition, in enumeration order,
        /// and commits the offset that follows each event right after that event has been handled.
        /// </summary>
        /// <param name="partition">Partition number used to build the <see cref="TopicPartition"/> that offsets are committed against.</param>
        /// <param name="events">Events to process; the topic name is read from the first event.</param>
        /// <param name="cancellationToken">Forwarded to the function execution. Once cancellation has been requested, offsets are no longer committed.</param>
        /// <returns>A task that completes when every event has been processed.</returns>
        private async Task ProcessPartitionItemsAsync(int partition, IEnumerable<IKafkaEventData> events, CancellationToken cancellationToken)
        {
            // Created lazily from the first event and then reused for every later commit.
            TopicPartition commitTarget = null;

            foreach (var eventData in events)
            {
                var functionInput = new TriggeredFunctionData
                {
                    TriggerValue = KafkaTriggerInput.New(eventData),
                };

                // One activity per event: started before the execution and always stopped in the finally block.
                var activity = new SingleEventActivityProvider(eventData, consumerGroup);
                activity.StartActivity();
                try
                {
                    // Run the function and record its outcome on the activity.
                    FunctionResult outcome = await this.ExecuteFunctionAsync(functionInput, cancellationToken);
                    activity.SetActivityStatus(outcome.Succeeded, outcome.Exception);
                }
                catch (Exception failure)
                {
                    // Mark the activity as failed, then let the exception propagate to the caller.
                    activity.SetActivityStatus(false, failure);
                    throw;
                }
                finally
                {
                    activity.StopCurrentActivity();
                }

                if (commitTarget == null)
                {
                    commitTarget = new TopicPartition(eventData.Topic, partition);
                }

                // No commit is issued once cancellation has been requested; the loop itself keeps going.
                if (cancellationToken.IsCancellationRequested)
                {
                    continue;
                }

                // Committing after each function execution plays nicer with the function scaler.
                // With a large batch of slow events, a single commit at the end would delay the offset update by
                // Events_In_Batch_For_Partition * Event_Processing_Time; committing per event minimizes that delay.
                // The commit happens regardless of whether the function result reported success.
                var resumeOffset = eventData.Offset + 1;  // offset is inclusive when resuming
                this.Commit(new[] { new TopicPartitionOffset(commitTarget, resumeOffset) });
            }
        }