in core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala [104:231]
def apply[K, V](
system: org.apache.pekko.actor.ActorSystem,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]): ProducerSettings[K, V] =
apply(system, Option(keySerializer), Option(valueSerializer))
/**
* Create settings from the default configuration
* `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*
* For use with the `org.apache.pekko.actor.typed` API.
*/
def apply[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]): ProducerSettings[K, V] =
apply(system, Option(keySerializer), Option(valueSerializer))
/**
* Create settings from a configuration with the same layout as
* the default configuration `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
config: Config,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]): ProducerSettings[K, V] =
apply(config, Option(keySerializer), Option(valueSerializer))
/**
* Java API: Create settings from the default configuration
* `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
system: org.apache.pekko.actor.ActorSystem,
keySerializer: Optional[Serializer[K]],
valueSerializer: Optional[Serializer[V]]): ProducerSettings[K, V] =
apply(system, keySerializer.toScala, valueSerializer.toScala)
/**
* Java API: Create settings from the default configuration
* `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*
* For use with the `org.apache.pekko.actor.typed` API.
*/
def create[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
keySerializer: Optional[Serializer[K]],
valueSerializer: Optional[Serializer[V]]): ProducerSettings[K, V] =
apply(system, keySerializer.toScala, valueSerializer.toScala)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
config: Config,
keySerializer: Optional[Serializer[K]],
valueSerializer: Optional[Serializer[V]]): ProducerSettings[K, V] =
apply(config, keySerializer.toScala, valueSerializer.toScala)
/**
* Java API: Create settings from the default configuration
* `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
system: org.apache.pekko.actor.ActorSystem,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]): ProducerSettings[K, V] =
apply(system, keySerializer, valueSerializer)
/**
* Java API: Create settings from the default configuration
* `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*
* For use with the `org.apache.pekko.actor.typed` API.
*/
def create[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]): ProducerSettings[K, V] =
apply(system, keySerializer, valueSerializer)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
config: Config,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]): ProducerSettings[K, V] =
apply(config, keySerializer, valueSerializer)
/**
* Create a [[org.apache.kafka.clients.producer.KafkaProducer KafkaProducer]] instance from the settings.
*/
def createKafkaProducer[K, V](settings: ProducerSettings[K, V]): KafkaProducer[K, V] =
new KafkaProducer[K, V](settings.getProperties,
settings.keySerializerOpt.orNull,
settings.valueSerializerOpt.orNull)
}
/**
* Settings for producers. See `pekko.kafka.producer` section in
* reference.conf. Note that the [[pekko.kafka.ProducerSettings companion]] object provides
* `apply` and `create` functions for convenient construction of the settings, together with
* the `with` methods.
*
* The constructor is Internal API.
*/
class ProducerSettings[K, V] @InternalApi private[kafka] (
val properties: Map[String, String],
val keySerializerOpt: Option[Serializer[K]],
val valueSerializerOpt: Option[Serializer[V]],
val closeTimeout: FiniteDuration,
val closeProducerOnStop: Boolean,
val parallelism: Int,
val dispatcher: String,
val eosCommitInterval: FiniteDuration,
val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]],
val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]]) {