in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/FunctionExecutorBase.cs [111:143]
/// <summary>
/// Hands the currently accumulated batch over to the channel and empties the batch.
/// </summary>
/// <remarks>
/// Returns immediately when the batch is empty. Otherwise the batch is copied to an array,
/// the batch is cleared, and the copy is offered to the channel writer repeatedly until the
/// write is accepted or <paramref name="cancellationToken"/> is cancelled. If cancellation is
/// requested before a write succeeds, the copied items are not written to the channel.
/// </remarks>
/// <param name="cancellationToken">
/// Token that stops the retry loop, including any back-off wait that is in progress.
/// </param>
internal void Flush(CancellationToken cancellationToken)
{
    if (this.currentBatch.Count == 0)
    {
        return;
    }

    // Snapshot the batch before clearing it so the batch can be refilled while the snapshot
    // is still waiting to be accepted by the channel.
    var items = this.currentBatch.ToArray();
    this.currentBatch.Clear();

    var loggedWaitingForFunction = false;

    while (!cancellationToken.IsCancellationRequested)
    {
        if (channel.Writer.TryWrite(items))
        {
            break;
        }

        // Log only on the first rejected write of this flush to avoid flooding the log
        // while the loop keeps retrying.
        if (!loggedWaitingForFunction)
        {
            this.logger.LogInformation("Channel {topic} / {partition} / {offset} is full, waiting for the function execution to catch up",
                items[0].Topic,
                items[0].Partition,
                items[0].Offset);
            loggedWaitingForFunction = true;
        }

        // Back off before retrying. Waiting on the token's handle (rather than Thread.Sleep)
        // keeps the same retry interval but returns as soon as cancellation is requested,
        // so shutdown is not held up for the remainder of the interval. The loop condition
        // then observes the cancellation and exits.
        cancellationToken.WaitHandle.WaitOne(this.channelFullRetryIntervalInMs);
    }
}