in core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala [236:415]
def producerFactory: ProducerSettings[K, V] => Producer[K, V] = _ => createKafkaProducer()
/**
* An id string to pass to the server when making requests. The purpose of this is to be able to track the source
* of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
*/
def withClientId(clientId: String): ProducerSettings[K, V] =
withProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId)
/**
* A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
*/
def withBootstrapServers(bootstrapServers: String): ProducerSettings[K, V] =
withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
/**
* Scala API:
* The raw properties of the kafka-clients driver, see constants in
* [[org.apache.kafka.clients.producer.ProducerConfig]].
*/
def withProperties(properties: Map[String, String]): ProducerSettings[K, V] =
copy(properties = this.properties ++ properties)
/**
* Scala API:
* The raw properties of the kafka-clients driver, see constants in
* [[org.apache.kafka.clients.producer.ProducerConfig]].
*/
def withProperties(properties: (String, String)*): ProducerSettings[K, V] =
copy(properties = this.properties ++ properties.toMap)
/**
* Java API:
* The raw properties of the kafka-clients driver, see constants in
* [[org.apache.kafka.clients.producer.ProducerConfig]].
*/
def withProperties(properties: java.util.Map[String, String]): ProducerSettings[K, V] =
copy(properties = this.properties ++ properties.asScala)
/**
* The raw properties of the kafka-clients driver, see constants in
* [[org.apache.kafka.clients.producer.ProducerConfig]].
*/
def withProperty(key: String, value: String): ProducerSettings[K, V] =
copy(properties = properties.updated(key, value))
/**
* Java API: Get a raw property. `null` if it is not defined.
*/
def getProperty(key: String): String = properties.getOrElse(key, null)
/**
* Duration to wait for `KafkaProducer.close` to finish.
*/
def withCloseTimeout(closeTimeout: FiniteDuration): ProducerSettings[K, V] =
copy(closeTimeout = closeTimeout)
/**
* Java API:
* Duration to wait for `KafkaProducer.close` to finish.
*/
def withCloseTimeout(closeTimeout: java.time.Duration): ProducerSettings[K, V] =
copy(closeTimeout = closeTimeout.asScala)
/**
* Call `KafkaProducer.close` on the [[org.apache.kafka.clients.producer.KafkaProducer]] when the producer stage
* receives a shutdown signal.
*/
def withCloseProducerOnStop(closeProducerOnStop: Boolean): ProducerSettings[K, V] =
copy(closeProducerOnStop = closeProducerOnStop)
/**
* Tuning parameter of how many sends that can run in parallel.
*/
def withParallelism(parallelism: Int): ProducerSettings[K, V] =
copy(parallelism = parallelism)
/**
* Fully qualified config path which holds the dispatcher configuration
* to be used by the producer stages. Some blocking may occur.
* When this value is empty, the dispatcher configured for the stream
* will be used.
*/
def withDispatcher(dispatcher: String): ProducerSettings[K, V] =
copy(dispatcher = dispatcher)
/**
* The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`.
*/
def withEosCommitInterval(eosCommitInterval: FiniteDuration): ProducerSettings[K, V] =
copy(eosCommitInterval = eosCommitInterval)
/**
* Java API:
* The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`.
*/
def withEosCommitInterval(eosCommitInterval: java.time.Duration): ProducerSettings[K, V] =
copy(eosCommitInterval = eosCommitInterval.asScala)
/**
* Scala API.
* A hook to allow for resolving some settings asynchronously.
* @since Alpakka Kafka 2.0.0
*/
def withEnrichAsync(value: ProducerSettings[K, V] => Future[ProducerSettings[K, V]]): ProducerSettings[K, V] =
copy(enrichAsync = Some(value))
/**
* Java API.
* A hook to allow for resolving some settings asynchronously.
* @since Alpakka Kafka 2.0.0
*/
def withEnrichCompletionStage(
value: java.util.function.Function[ProducerSettings[K, V], CompletionStage[ProducerSettings[K, V]]])
: ProducerSettings[K, V] =
copy(enrichAsync = Some((s: ProducerSettings[K, V]) => value.apply(s).asScala))
/**
* Replaces the default Kafka producer creation logic with an external producer. This will also set
* `closeProducerOnStop = false` by default.
*/
def withProducer(
producer: Producer[K, V]): ProducerSettings[K, V] =
copy(producerFactorySync = Some(_ => producer), closeProducerOnStop = false)
/**
* Replaces the default Kafka producer creation logic.
*/
def withProducerFactory(
factory: ProducerSettings[K, V] => Producer[K, V]): ProducerSettings[K, V] =
copy(producerFactorySync = Some(factory))
/**
* Get the Kafka producer settings as map.
*/
def getProperties: java.util.Map[String, AnyRef] = properties.asInstanceOf[Map[String, AnyRef]].asJava
private def copy(
properties: Map[String, String] = properties,
keySerializer: Option[Serializer[K]] = keySerializerOpt,
valueSerializer: Option[Serializer[V]] = valueSerializerOpt,
closeTimeout: FiniteDuration = closeTimeout,
closeProducerOnStop: Boolean = closeProducerOnStop,
parallelism: Int = parallelism,
dispatcher: String = dispatcher,
eosCommitInterval: FiniteDuration = eosCommitInterval,
enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]] = enrichAsync,
producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync)
: ProducerSettings[K, V] =
new ProducerSettings[K, V](properties,
keySerializer,
valueSerializer,
closeTimeout,
closeProducerOnStop,
parallelism,
dispatcher,
eosCommitInterval,
enrichAsync,
producerFactorySync)
override def toString: String = {
val propertiesWithMandatoryKeys = properties ++ Map(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> keySerializerOpt.map(_.getClass).orNull,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> valueSerializerOpt.map(_.getClass).orNull)
val kafkaClients =
ConfigSettings.serializeAndMaskKafkaProperties(propertiesWithMandatoryKeys,
new org.apache.kafka.clients.producer.ProducerConfig(_))
"org.apache.pekko.kafka.ProducerSettings(" +
s"properties=$kafkaClients," +
s"keySerializer=$keySerializerOpt," +
s"valueSerializer=$valueSerializerOpt," +
s"closeTimeout=${closeTimeout.toCoarsest}," +
s"closeProducerOnStop=$closeProducerOnStop," +
s"parallelism=$parallelism," +
s"dispatcher=$dispatcher," +
s"eosCommitInterval=${eosCommitInterval.toCoarsest}," +
s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}," +
s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})"
}