in core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala [58:122]
def apply[K, V](
    system: pekko.actor.ClassicActorSystemProvider,
    keyDeserializer: Option[Deserializer[K]],
    valueDeserializer: Option[Deserializer[V]]): ConsumerSettings[K, V] = {
  // Unwrap the provider first, then hand everything over to the `apply` overload
  // that accepts whatever `classicSystem` returns; both deserializers pass through untouched.
  val classicSystem = system.classicSystem
  apply(classicSystem, keyDeserializer, valueDeserializer)
}
/**
 * Build [[ConsumerSettings]] from a configuration section that has the same layout as
 * the default configuration `pekko.kafka.consumer`.
 *
 * Each deserializer may either be supplied directly, or left as `None` as long as the
 * properties parsed from the `kafka-clients` sub-section contain the matching
 * deserializer class entry.
 *
 * @param config configuration section to read the consumer settings from
 * @param keyDeserializer explicit key deserializer, or `None` to rely on the configured class
 * @param valueDeserializer explicit value deserializer, or `None` to rely on the configured class
 * @throws IllegalArgumentException if a deserializer argument is `null`, or is `None` while
 *                                  the `kafka-clients` properties do not declare its class
 */
def apply[K, V](
    config: Config,
    keyDeserializer: Option[Deserializer[K]],
    valueDeserializer: Option[Deserializer[V]]): ConsumerSettings[K, V] = {
  // Shorthand for the many duration-valued entries read below.
  def durationAt(path: String) = config.getDuration(path).asScala

  val clientProperties =
    ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))

  // An explicitly passed deserializer is enough on its own; otherwise the client
  // properties must name a deserializer class. The null guard runs first so a null
  // Option is rejected by `require` instead of failing on the member access.
  val keyDeserializerAvailable =
    keyDeserializer != null &&
    (keyDeserializer.nonEmpty || clientProperties.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
  require(keyDeserializerAvailable, "Key deserializer should be defined or declared in configuration")

  val valueDeserializerAvailable =
    valueDeserializer != null &&
    (valueDeserializer.nonEmpty || clientProperties.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
  require(valueDeserializerAvailable, "Value deserializer should be defined or declared in configuration")

  // Settings are read eagerly and in a fixed order, so a missing or malformed entry
  // fails here, before any ConsumerSettings instance is constructed.
  val pollEvery = durationAt("poll-interval")
  val pollWait = durationAt("poll-timeout")
  val stopWait = durationAt("stop-timeout")
  val closeWait = durationAt("close-timeout")
  val commitWait = durationAt("commit-timeout")
  val commitWarnAfter = durationAt("commit-time-warning")
  // Read through the dedicated helper rather than `getDuration`, as this entry
  // is allowed to be a non-finite duration.
  val commitRefreshEvery = ConfigSettings.getPotentiallyInfiniteDuration(config, "commit-refresh-interval")
  val dispatcherName = config.getString("use-dispatcher")
  val partitionCloseWait = durationAt("wait-close-partition")
  val positionWait = durationAt("position-timeout")
  val offsetForTimesWait = durationAt("offset-for-times-timeout")
  val metadataRequestWait = durationAt("metadata-request-timeout")
  val drainingCheckEvery = durationAt("eos-draining-check-interval")
  val connectionChecker =
    ConnectionCheckerSettings(config.getConfig(ConnectionCheckerSettings.configPath))
  val partitionHandlerWarnAfter = durationAt("partition-handler-warning")
  val offsetResetProtection =
    OffsetResetProtectionSettings(config.getConfig(OffsetResetProtectionSettings.configPath))

  // Arguments are positional: note that the dispatcher precedes the commit warning
  // threshold here, which differs from the order the values were read in above.
  new ConsumerSettings[K, V](
    clientProperties,
    keyDeserializer,
    valueDeserializer,
    pollEvery,
    pollWait,
    stopWait,
    closeWait,
    commitWait,
    commitRefreshEvery,
    dispatcherName,
    commitWarnAfter,
    partitionCloseWait,
    positionWait,
    offsetForTimesWait,
    metadataRequestWait,
    drainingCheckEvery,
    enrichAsync = None,
    ConsumerSettings.createKafkaConsumer,
    connectionChecker,
    partitionHandlerWarnAfter,
    offsetResetProtection)
}