def apply[K, V]()

in core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala [58:122]


  def apply[K, V](
      system: pekko.actor.ClassicActorSystemProvider,
      keyDeserializer: Option[Deserializer[K]],
      valueDeserializer: Option[Deserializer[V]]): ConsumerSettings[K, V] =
    apply(system.classicSystem, keyDeserializer, valueDeserializer)

  /**
   * Create settings from a configuration with the same layout as
   * the default configuration `pekko.kafka.consumer`.
   * Key or value deserializer can be passed explicitly or retrieved from configuration.
   */
  def apply[K, V](
      config: Config,
      keyDeserializer: Option[Deserializer[K]],
      valueDeserializer: Option[Deserializer[V]]): ConsumerSettings[K, V] = {
    val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
    require(
      keyDeserializer != null &&
      (keyDeserializer.isDefined || properties.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)),
      "Key deserializer should be defined or declared in configuration")
    require(
      valueDeserializer != null &&
      (valueDeserializer.isDefined || properties.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)),
      "Value deserializer should be defined or declared in configuration")
    val pollInterval = config.getDuration("poll-interval").asScala
    val pollTimeout = config.getDuration("poll-timeout").asScala
    val stopTimeout = config.getDuration("stop-timeout").asScala
    val closeTimeout = config.getDuration("close-timeout").asScala
    val commitTimeout = config.getDuration("commit-timeout").asScala
    val commitTimeWarning = config.getDuration("commit-time-warning").asScala
    val commitRefreshInterval = ConfigSettings.getPotentiallyInfiniteDuration(config, "commit-refresh-interval")
    val dispatcher = config.getString("use-dispatcher")
    val waitClosePartition = config.getDuration("wait-close-partition").asScala
    val positionTimeout = config.getDuration("position-timeout").asScala
    val offsetForTimesTimeout = config.getDuration("offset-for-times-timeout").asScala
    val metadataRequestTimeout = config.getDuration("metadata-request-timeout").asScala
    val drainingCheckInterval = config.getDuration("eos-draining-check-interval").asScala
    val connectionCheckerSettings = ConnectionCheckerSettings(config.getConfig(ConnectionCheckerSettings.configPath))
    val partitionHandlerWarning = config.getDuration("partition-handler-warning").asScala
    val resetProtectionThreshold = OffsetResetProtectionSettings(
      config.getConfig(OffsetResetProtectionSettings.configPath))

    new ConsumerSettings[K, V](
      properties,
      keyDeserializer,
      valueDeserializer,
      pollInterval,
      pollTimeout,
      stopTimeout,
      closeTimeout,
      commitTimeout,
      commitRefreshInterval,
      dispatcher,
      commitTimeWarning,
      waitClosePartition,
      positionTimeout,
      offsetForTimesTimeout,
      metadataRequestTimeout,
      drainingCheckInterval,
      enrichAsync = None,
      ConsumerSettings.createKafkaConsumer,
      connectionCheckerSettings,
      partitionHandlerWarning,
      resetProtectionThreshold)
  }