in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs [379:418]
/// <summary>
/// Shuts the listener down: cancels the listener token, closes the function executor,
/// waits for the subscriber to signal completion, then unsubscribes and disposes the
/// consumer and the remaining disposable fields.
/// </summary>
/// <remarks>
/// Only the first call does any work; <c>isClosed</c> is flipped atomically and later calls
/// return immediately. Failures are logged rather than propagated, and every cleanup step
/// is attempted even when an earlier one fails, because the once-only guard means a skipped
/// step would never be retried.
/// </remarks>
/// <returns>A task that completes when all shutdown steps have been attempted.</returns>
private async Task SafeCloseConsumerAsync()
{
    // Atomic test-and-set: whoever flips the flag first owns the shutdown.
    if (Interlocked.Exchange(ref isClosed, 1) == 1)
    {
        return;
    }

    try
    {
        // Stop subscriber thread
        this.listenerCancellationTokenSource.Cancel();

        // Stop function executor
        if (this.functionExecutor != null)
        {
            await this.functionExecutor.CloseAsync();
        }

        // Wait for subscriber thread to end
        if (this.subscriberFinished != null)
        {
            var subscriberStopped = await this.subscriberFinished.WaitAsync(TimeToWaitForRunningProcessToEnd);
            if (!subscriberStopped)
            {
                // Cleanup still proceeds (as before), but the timeout is no longer silent.
                this.logger.LogWarning("Timed out waiting for Kafka subscriber to finish while closing listener");
            }
        }
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Failed to close Kafka listener");
    }

    // From here on each step is isolated so that one failure cannot leak the
    // resources released by the steps after it.
    if (this.consumer.IsValueCreated)
    {
        var localConsumer = this.consumer.Value;
        this.RunCloseStep(() => localConsumer.Unsubscribe());
        this.RunCloseStep(() => localConsumer.Dispose());
    }

    this.RunCloseStep(() => this.functionExecutor?.Dispose());
    this.RunCloseStep(() => this.subscriberFinished?.Dispose());
    this.RunCloseStep(() => this.listenerCancellationTokenSource.Dispose());
}

/// <summary>
/// Runs a single shutdown step, logging (not rethrowing) any exception it raises.
/// </summary>
/// <param name="step">The cleanup action to execute.</param>
private void RunCloseStep(Action step)
{
    try
    {
        step();
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Failed to close Kafka listener");
    }
}