private ConsumerConfig GetConsumerConfiguration()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs [178:272]


        /// <summary>
        /// Builds the <see cref="ConsumerConfig"/> used by this listener's Kafka consumer.
        /// </summary>
        /// <remarks>
        /// Settings are copied from the host options and the listener configuration. When the listener
        /// configuration carries an Event Hubs connection string, the broker list, SASL/SSL settings,
        /// consumer group and broker version fallback are overridden for the Event Hubs endpoint.
        /// Otherwise the broker list and consumer group are used as configured and the CA certificate
        /// path is resolved through <c>AzureFunctionsFileHelper.TryGetValidFilePath</c>.
        /// </remarks>
        /// <returns>The populated consumer configuration.</returns>
        private ConsumerConfig GetConsumerConfiguration()
        {
            var hostOptions = this.options;
            var listenerSettings = this.listenerConfiguration;

            var consumerConfig = new ConsumerConfig
            {
                // Commits happen automatically in the background...
                EnableAutoCommit = true,

                // ...but offsets are not stored automatically: they must be stored only after the
                // trigger function has been called.
                EnableAutoOffsetStore = false,

                // How often the offsets stored in memory are committed
                AutoCommitIntervalMs = hostOptions.AutoCommitIntervalMs,

                // Debug contexts forwarded to librdkafka
                Debug = hostOptions.LibkafkaDebug,

                // Offset reset policy comes from the host options
                AutoOffsetReset = hostOptions.AutoOffsetReset,

                // Authentication and transport security
                SaslMechanism = listenerSettings.SaslMechanism,
                SaslUsername = listenerSettings.SaslUsername,
                SaslPassword = listenerSettings.SaslPassword,
                SecurityProtocol = listenerSettings.SecurityProtocol,
                SslCaLocation = listenerSettings.SslCaLocation,
                SslCertificateLocation = listenerSettings.SslCertificateLocation,
                SslKeyLocation = listenerSettings.SslKeyLocation,
                SslKeyPassword = listenerSettings.SslKeyPassword,
                SslCaPem = listenerSettings.SslCaPEM,
                SslCertificatePem = listenerSettings.SslCertificatePEM,
                SslKeyPem = listenerSettings.SslKeyPEM,

                // OAUTHBEARER settings
                SaslOauthbearerMethod = listenerSettings.SaslOAuthBearerMethod,
                SaslOauthbearerClientId = listenerSettings.SaslOAuthBearerClientId,
                SaslOauthbearerClientSecret = listenerSettings.SaslOAuthBearerClientSecret,
                SaslOauthbearerScope = listenerSettings.SaslOAuthBearerScope,
                SaslOauthbearerTokenEndpointUrl = listenerSettings.SaslOAuthBearerTokenEndpointUrl,
                SaslOauthbearerExtensions = listenerSettings.SaslOAuthBearerExtensions,

                // Tuning values taken from the host configuration
                StatisticsIntervalMs = hostOptions.StatisticsIntervalMs,
                ReconnectBackoffMs = hostOptions.ReconnectBackoffMs,
                ReconnectBackoffMaxMs = hostOptions.ReconnectBackoffMaxMs,
                SessionTimeoutMs = hostOptions.SessionTimeoutMs,
                MaxPollIntervalMs = hostOptions.MaxPollIntervalMs,
                QueuedMinMessages = hostOptions.QueuedMinMessages,
                QueuedMaxMessagesKbytes = hostOptions.QueuedMaxMessagesKbytes,
                MaxPartitionFetchBytes = hostOptions.MaxPartitionFetchBytes,
                FetchMaxBytes = hostOptions.FetchMaxBytes,
                MetadataMaxAgeMs = hostOptions.MetadataMaxAgeMs,
                SocketKeepaliveEnable = hostOptions.SocketKeepaliveEnable
            };

            var eventHubConnectionString = listenerSettings.EventHubConnectionString;
            if (!string.IsNullOrEmpty(eventHubConnectionString))
            {
                // Event Hubs: append the Event Hubs DNS suffix and port when the configured broker
                // list does not already contain the suffix.
                var bootstrapServers = listenerSettings.BrokerList;
                if (!bootstrapServers.Contains(EventHubsBrokerListDns))
                {
                    bootstrapServers = $"{bootstrapServers}{EventHubsBrokerListDns}:{EventHubsBrokerListPort}";
                }

                // Fall back to "$Default" when no consumer group was configured.
                var groupId = listenerSettings.ConsumerGroup;
                if (string.IsNullOrEmpty(groupId))
                {
                    groupId = "$Default";
                }

                // SASL/PLAIN over SSL, with the connection string supplied as the password.
                consumerConfig.BootstrapServers = bootstrapServers;
                consumerConfig.SecurityProtocol = SecurityProtocol.SaslSsl;
                consumerConfig.SaslMechanism = SaslMechanism.Plain;
                consumerConfig.SaslUsername = EventHubsSaslUsername;
                consumerConfig.SaslPassword = eventHubConnectionString;
                consumerConfig.SslCaLocation = this.EnsureValidEventHubsCertificateLocation(listenerSettings.SslCaLocation);
                consumerConfig.GroupId = groupId;
                consumerConfig.BrokerVersionFallback = EventHubsBrokerVersionFallback;

                return consumerConfig;
            }

            // Native Kafka: broker list and consumer group are used exactly as configured.
            consumerConfig.BootstrapServers = listenerSettings.BrokerList;
            consumerConfig.GroupId = listenerSettings.ConsumerGroup;

            var configuredCaLocation = consumerConfig.SslCaLocation;
            if (string.IsNullOrWhiteSpace(configuredCaLocation))
            {
                // No CA certificate path configured, so there is nothing to resolve.
                return consumerConfig;
            }

            if (AzureFunctionsFileHelper.TryGetValidFilePath(configuredCaLocation, out var caFilePath))
            {
                this.logger.LogDebug("Found SslCaLocation in {filePath}", caFilePath);
                consumerConfig.SslCaLocation = caFilePath;
            }
            else
            {
                // Keep the configured value and only warn.
                this.logger.LogWarning("Could not find valid file path for SslCaLocation {filePath}", configuredCaLocation);
            }

            return consumerConfig;
        }