def apply[K, V]()

in core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala [44:97]


  def apply[K, V](
      system: org.apache.pekko.actor.ActorSystem,
      keySerializer: Option[Serializer[K]],
      valueSerializer: Option[Serializer[V]]): ProducerSettings[K, V] =
    apply(system.settings.config.getConfig(configPath), keySerializer, valueSerializer)

  /**
   * Create settings from the default configuration
   * `pekko.kafka.producer`.
   * Key or value serializer can be passed explicitly or retrieved from configuration.
   *
   * For use with the `org.apache.pekko.actor.typed` API.
   */
  def apply[K, V](
      system: org.apache.pekko.actor.ClassicActorSystemProvider,
      keySerializer: Option[Serializer[K]],
      valueSerializer: Option[Serializer[V]]): ProducerSettings[K, V] =
    apply(system.classicSystem, keySerializer, valueSerializer)

  /**
   * Create settings from a configuration with the same layout as
   * the default configuration `pekko.kafka.producer`.
   * Key or value serializer can be passed explicitly or retrieved from configuration.
   */
  def apply[K, V](
      config: Config,
      keySerializer: Option[Serializer[K]],
      valueSerializer: Option[Serializer[V]]): ProducerSettings[K, V] = {
    val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
    require(
      keySerializer != null &&
      (keySerializer.isDefined || properties.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)),
      "Key serializer should be defined or declared in configuration")
    require(
      valueSerializer != null &&
      (valueSerializer.isDefined || properties.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)),
      "Value serializer should be defined or declared in configuration")
    val closeTimeout = config.getDuration("close-timeout").asScala
    val closeOnProducerStop = config.getBoolean("close-on-producer-stop")
    val parallelism = config.getInt("parallelism")
    val dispatcher = config.getString("use-dispatcher")
    val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
    new ProducerSettings[K, V](
      properties,
      keySerializer,
      valueSerializer,
      closeTimeout,
      closeOnProducerStop,
      parallelism,
      dispatcher,
      eosCommitInterval,
      enrichAsync = None,
      producerFactorySync = None)
  }