in core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala [44:97]
def apply[K, V](
system: org.apache.pekko.actor.ActorSystem,
keySerializer: Option[Serializer[K]],
valueSerializer: Option[Serializer[V]]): ProducerSettings[K, V] =
apply(system.settings.config.getConfig(configPath), keySerializer, valueSerializer)
/**
* Create settings from the default configuration
* `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*
* For use with the `org.apache.pekko.actor.typed` API.
*/
def apply[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
keySerializer: Option[Serializer[K]],
valueSerializer: Option[Serializer[V]]): ProducerSettings[K, V] =
apply(system.classicSystem, keySerializer, valueSerializer)
/**
* Create settings from a configuration with the same layout as
* the default configuration `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def apply[K, V](
config: Config,
keySerializer: Option[Serializer[K]],
valueSerializer: Option[Serializer[V]]): ProducerSettings[K, V] = {
val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
require(
keySerializer != null &&
(keySerializer.isDefined || properties.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)),
"Key serializer should be defined or declared in configuration")
require(
valueSerializer != null &&
(valueSerializer.isDefined || properties.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)),
"Value serializer should be defined or declared in configuration")
val closeTimeout = config.getDuration("close-timeout").asScala
val closeOnProducerStop = config.getBoolean("close-on-producer-stop")
val parallelism = config.getInt("parallelism")
val dispatcher = config.getString("use-dispatcher")
val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
new ProducerSettings[K, V](
properties,
keySerializer,
valueSerializer,
closeTimeout,
closeOnProducerStop,
parallelism,
dispatcher,
eosCommitInterval,
enrichAsync = None,
producerFactorySync = None)
}