in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/FunctionExecutorBase.cs [33:76]
internal FunctionExecutorBase(
ITriggeredFunctionExecutor executor,
IConsumer<TKey, TValue> consumer,
int channelCapacity,
int channelFullRetryIntervalInMs,
ICommitStrategy<TKey, TValue> commitStrategy,
ILogger logger,
IDrainModeManager drainModeManager)
{
this.executor = executor ?? throw new System.ArgumentNullException(nameof(executor));
this.consumer = consumer ?? throw new System.ArgumentNullException(nameof(consumer));
this.channelFullRetryIntervalInMs = channelFullRetryIntervalInMs;
this.commitStrategy = commitStrategy;
this.logger = logger;
this.functionExecutionCancellationTokenSource = new CancellationTokenSource();
this.currentBatch = new List<IKafkaEventData>();
this.drainModeManager = drainModeManager;
this.channel = Channel.CreateBounded<IKafkaEventData[]>(new BoundedChannelOptions(channelCapacity)
{
SingleReader = true,
SingleWriter = true,
});
Task.Run(async () =>
{
try
{
await this.ReaderAsync(this.channel.Reader, this.functionExecutionCancellationTokenSource.Token, this.logger);
}
catch (Exception ex)
{
// Channel reader will throw OperationCanceledException if cancellation token is cancelled during a call
if (!(ex is OperationCanceledException))
{
this.logger.LogError(ex, $"Function executor error while processing channel");
}
}
finally
{
this.readerFinished.Release();
}
});
}