in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs [296:370]
/// <summary>
/// Subscriber loop. Repeatedly consumes messages from the Kafka consumer, adds them to the
/// function executor and flushes the executor either when it holds at least
/// <c>MaxBatchSize</c> items or when the current batch window
/// (<c>SubscriberIntervalInSeconds</c>) ends, until cancellation is requested.
/// Releases <c>subscriberFinished</c> when the loop exits.
/// </summary>
/// <param name="parameter">A boxed <see cref="CancellationToken"/> used to stop the loop.</param>
private void ProcessSubscription(object parameter)
{
    this.subscriberFinished = new SemaphoreSlim(0, 1);
    var cancellationToken = (CancellationToken)parameter;
    var maxBatchSize = this.options.MaxBatchSize;
    var maxBatchReleaseTime = TimeSpan.FromSeconds(this.options.SubscriberIntervalInSeconds);

    try
    {
        // Resolved inside the try block so that a failure here is logged and the
        // finally block still releases subscriberFinished.
        var localConsumer = this.consumer.Value;

        while (!cancellationToken.IsCancellationRequested)
        {
            var batchStart = DateTime.UtcNow;
            var availableTime = maxBatchReleaseTime - (DateTime.UtcNow - batchStart);
            var alreadyFlushedInCurrentExecution = false;

            // Also observe cancellation here so a busy topic does not delay shutdown
            // until the end of the batch window.
            while (availableTime > TimeSpan.Zero && !cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = localConsumer.Consume(availableTime);

                    // If no message was consumed during the available time, returns null
                    if (consumeResult == null)
                    {
                        // TODO: maybe slow down if there isn't much incoming data
                        break;
                    }

                    if (consumeResult.IsPartitionEOF)
                    {
                        this.logger.LogInformation("Reached end of {topic} / {partition} / {offset}", consumeResult.Topic, consumeResult.Partition, consumeResult.Offset);
                    }
                    else
                    {
                        var kafkaEventData = this.requiresKey ?
                            (IKafkaEventData)new KafkaEventData<TKey, TValue>(consumeResult) :
                            KafkaEventData<TValue>.CreateFrom(consumeResult);

                        // add message to executor
                        // if executor pending items is full, flush it
                        var currentSize = this.functionExecutor.Add(kafkaEventData);
                        if (currentSize >= maxBatchSize)
                        {
                            this.functionExecutor.Flush(listenerCancellationTokenSource.Token);
                            alreadyFlushedInCurrentExecution = true;
                        }
                    }
                }
                catch (ConsumeException ex)
                {
                    this.logger.LogError(ex, "Consume error");
                }

                // Recomputed on every pass, including after a ConsumeException: otherwise a
                // failing Consume call would be retried with a stale time budget, letting the
                // batch window overrun (or the loop spin forever on an immediate, persistent error).
                availableTime = maxBatchReleaseTime - (DateTime.UtcNow - batchStart);
            }

            // End of the batch window: flush unless a size-triggered flush already happened.
            if (!alreadyFlushedInCurrentExecution)
            {
                this.functionExecutor.Flush(listenerCancellationTokenSource.Token);
            }
        }
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error in Kafka subscriber");
    }
    finally
    {
        this.logger.LogInformation("Exiting {processName} for {topic}", nameof(ProcessSubscription), this.listenerConfiguration.Topic);
        this.subscriberFinished.Release();
    }
}