internal void Flush()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/FunctionExecutorBase.cs [111:143]


        /// <summary>
        /// Hands the items accumulated in the current batch to the channel as a single array.
        /// </summary>
        /// <remarks>
        /// The batch is copied and cleared before the write is attempted. While the channel
        /// rejects the write, the call blocks and retries every
        /// <c>channelFullRetryIntervalInMs</c> milliseconds, logging once that it is waiting.
        /// If cancellation is requested before a write succeeds, the method returns without
        /// writing; the copied items have already been removed from the current batch.
        /// </remarks>
        /// <param name="cancellationToken">Token that stops the retry loop.</param>
        internal void Flush(CancellationToken cancellationToken)
        {
            if (this.currentBatch.Count == 0)
            {
                return;
            }

            // Snapshot the batch so the buffer can be reused while the write is pending.
            var items = this.currentBatch.ToArray();
            this.currentBatch.Clear();

            var loggedWaitingForFunction = false;

            // Cancellation is checked before every write attempt, same as the retry interval allows.
            while (!cancellationToken.IsCancellationRequested && !channel.Writer.TryWrite(items))
            {
                if (!loggedWaitingForFunction)
                {
                    // Log only on the first failed attempt to avoid flooding the log while waiting.
                    this.logger.LogInformation("Channel {topic} / {partition} / {offset} is full, waiting for the function execution to catch up",
                           items[0].Topic,
                           items[0].Partition,
                           items[0].Offset);

                    loggedWaitingForFunction = true;
                }

                // Wait on the token's handle rather than Thread.Sleep so that a cancellation
                // request wakes this thread immediately instead of after the full interval.
                cancellationToken.WaitHandle.WaitOne(this.channelFullRetryIntervalInMs);
            }
        }