internal FunctionExecutorBase()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/FunctionExecutorBase.cs [33:76]


        internal FunctionExecutorBase(
            ITriggeredFunctionExecutor executor,
            IConsumer<TKey, TValue> consumer,
            int channelCapacity,
            int channelFullRetryIntervalInMs,
            ICommitStrategy<TKey, TValue> commitStrategy,
            ILogger logger,
            IDrainModeManager drainModeManager)
        {
            this.executor = executor ?? throw new System.ArgumentNullException(nameof(executor));
            this.consumer = consumer ?? throw new System.ArgumentNullException(nameof(consumer));
            this.channelFullRetryIntervalInMs = channelFullRetryIntervalInMs;
            this.commitStrategy = commitStrategy;
            this.logger = logger;
            this.functionExecutionCancellationTokenSource = new CancellationTokenSource();
            this.currentBatch = new List<IKafkaEventData>();
            this.drainModeManager = drainModeManager;

            this.channel = Channel.CreateBounded<IKafkaEventData[]>(new BoundedChannelOptions(channelCapacity)
            {
                SingleReader = true,
                SingleWriter = true,
            });

            Task.Run(async () =>
            {
                try
                {
                    await this.ReaderAsync(this.channel.Reader, this.functionExecutionCancellationTokenSource.Token, this.logger);
                }
                catch (Exception ex)
                {
                    // Channel reader will throw OperationCanceledException if cancellation token is cancelled during a call
                    if (!(ex is OperationCanceledException))
                    {
                        this.logger.LogError(ex, $"Function executor error while processing channel");
                    }
                }
                finally
                {
                    this.readerFinished.Release();
                }
            });
        }