private async Task SafeCloseConsumerAsync()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs [379:418]


        /// <summary>
        /// Shuts the listener down: signals cancellation, closes the function executor,
        /// waits for the subscriber to finish, then unsubscribes/disposes the consumer and
        /// disposes the remaining resources.
        /// </summary>
        /// <remarks>
        /// Only the first call does any work; later calls return immediately because
        /// <c>isClosed</c> has already been flipped to 1. Since the shutdown can therefore
        /// never be retried, every step is run in isolation: an exception in one step is
        /// logged and the remaining steps still execute, so a failure early in the sequence
        /// cannot leave the consumer or the other disposables un-released.
        /// </remarks>
        /// <returns>A task that completes when all shutdown steps have been attempted.</returns>
        private async Task SafeCloseConsumerAsync()
        {
            if (Interlocked.Exchange(ref isClosed, 1) == 1)
            {
                return;
            }

            // Stop subscriber thread
            this.RunCloseStep(() => this.listenerCancellationTokenSource.Cancel());

            // Stop function executor
            try
            {
                if (this.functionExecutor != null)
                {
                    await this.functionExecutor.CloseAsync();
                }
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "Failed to close Kafka listener");
            }

            // Wait for subscriber thread to end
            // NOTE(review): the result of WaitAsync is ignored, so if the wait is bounded by
            // TimeToWaitForRunningProcessToEnd and times out, the consumer below may be
            // disposed while the subscriber is still using it - confirm whether that is acceptable.
            try
            {
                if (this.subscriberFinished != null)
                {
                    await this.subscriberFinished.WaitAsync(TimeToWaitForRunningProcessToEnd);
                }
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "Failed to close Kafka listener");
            }

            this.RunCloseStep(() =>
            {
                // Only touch the consumer if it was actually created; reading Value
                // otherwise would create it just to tear it down again.
                if (!this.consumer.IsValueCreated)
                {
                    return;
                }

                var localConsumer = this.consumer.Value;
                try
                {
                    localConsumer.Unsubscribe();
                }
                finally
                {
                    // Dispose even when Unsubscribe fails, otherwise the consumer is leaked.
                    localConsumer.Dispose();
                }
            });

            this.RunCloseStep(() => this.functionExecutor?.Dispose());
            this.RunCloseStep(() => this.subscriberFinished?.Dispose());
            this.RunCloseStep(() => this.listenerCancellationTokenSource.Dispose());
        }

        /// <summary>
        /// Runs one synchronous shutdown step and logs, instead of propagating, any
        /// exception it throws so that the steps following it are still attempted.
        /// </summary>
        /// <param name="step">The shutdown action to execute.</param>
        private void RunCloseStep(Action step)
        {
            try
            {
                step();
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "Failed to close Kafka listener");
            }
        }