in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/SingleItemFunctionExecutor.cs [68:115]
/// <summary>
/// Executes the triggered function once per event for a single partition,
/// tracing each execution as its own activity and committing the offset
/// after every successful iteration.
/// </summary>
private async Task ProcessPartitionItemsAsync(int partition, IEnumerable<IKafkaEventData> events, CancellationToken cancellationToken)
{
    TopicPartition currentTopicPartition = null;

    foreach (var currentEvent in events)
    {
        var input = KafkaTriggerInput.New(currentEvent);
        var triggeredData = new TriggeredFunctionData
        {
            TriggerValue = input,
        };

        // Each event gets its own activity so downstream tooling can correlate
        // per-event execution with its outcome.
        var activityProvider = new SingleEventActivityProvider(currentEvent, consumerGroup);
        activityProvider.StartActivity();
        try
        {
            var result = await this.ExecuteFunctionAsync(triggeredData, cancellationToken);
            activityProvider.SetActivityStatus(result.Succeeded, result.Exception);
        }
        catch (Exception ex)
        {
            // Record the failure on the activity, then let the exception propagate unchanged.
            activityProvider.SetActivityStatus(false, ex);
            throw;
        }
        finally
        {
            activityProvider.StopCurrentActivity();
        }

        if (currentTopicPartition is null)
        {
            // Built lazily from the first event; all events in this call share the
            // partition, and presumably the topic as well — confirm with the caller.
            currentTopicPartition = new TopicPartition(currentEvent.Topic, partition);
        }

        // Committing after each function execution plays nicer with the function scaler.
        // When processing a large batch of events where each execution takes time,
        // committing once per batch would delay the offset update by
        // Events_In_Batch_For_Partition * Event_Processing_Time; committing per event
        // minimizes that delay.
        if (!cancellationToken.IsCancellationRequested)
        {
            // The committed offset is inclusive when resuming, so advance by one.
            this.Commit(new[] { new TopicPartitionOffset(currentTopicPartition, currentEvent.Offset + 1) });
        }
    }
}