in core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala [615:655]
def createKafkaConsumerAsync()(implicit executionContext: ExecutionContext): Future[Consumer[K, V]] =
  // Non-blocking: once `enriched` completes, its result is handed to
  // `consumerFactory`; the transformation runs on the implicit `executionContext`.
  for (resolved <- enriched) yield consumerFactory(resolved)
/**
* Java API.
*
* Create a [[org.apache.kafka.clients.consumer.Consumer Kafka Consumer]] instance from these settings
* (without blocking for `enriched`).
*/
def createKafkaConsumerCompletionStage(executor: Executor): CompletionStage[Consumer[K, V]] = {
  // Wrap the caller-supplied Java executor so the Scala future transformation can run on it.
  val executionContext = ExecutionContext.fromExecutor(executor)
  // Apply `consumerFactory` to the result of `enriched` without blocking the calling thread.
  val consumerFuture = enriched.map(consumerFactory)(executionContext)
  // Expose the result through the Java-friendly CompletionStage API.
  consumerFuture.asJava
}
override def toString: String = {
  // Always include both deserializer keys in the rendered properties; the value is
  // the deserializer's class, or null when the corresponding option is empty.
  val withDeserializerKeys = properties ++ Map(
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> keyDeserializerOpt.map(_.getClass).orNull,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> valueDeserializerOpt.map(_.getClass).orNull)

  // Delegate rendering of the Kafka client properties to the shared helper
  // (its name indicates that it also masks values).
  val renderedProperties = ConfigSettings.serializeAndMaskKafkaProperties(
    withDeserializerKeys,
    new org.apache.kafka.clients.consumer.ConsumerConfig(_))

  // One `name=value` fragment per setting, in the order they appear in the output.
  // Durations are rendered in their coarsest unit.
  val fields = Seq(
    s"properties=$renderedProperties",
    s"keyDeserializer=$keyDeserializerOpt",
    s"valueDeserializer=$valueDeserializerOpt",
    s"pollInterval=${pollInterval.toCoarsest}",
    s"pollTimeout=${pollTimeout.toCoarsest}",
    s"stopTimeout=${stopTimeout.toCoarsest}",
    s"closeTimeout=${closeTimeout.toCoarsest}",
    s"commitTimeout=${commitTimeout.toCoarsest}",
    s"commitRefreshInterval=${commitRefreshInterval.toCoarsest}",
    s"dispatcher=$dispatcher",
    s"commitTimeWarning=${commitTimeWarning.toCoarsest}",
    s"waitClosePartition=${waitClosePartition.toCoarsest}",
    s"metadataRequestTimeout=${metadataRequestTimeout.toCoarsest}",
    s"drainingCheckInterval=${drainingCheckInterval.toCoarsest}",
    s"connectionCheckerSettings=$connectionCheckerSettings",
    s"partitionHandlerWarning=${partitionHandlerWarning.toCoarsest}",
    s"resetProtectionSettings=$resetProtectionSettings",
    // The enrichment function itself is not printed, only a marker that one is set.
    s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}")

  fields.mkString("org.apache.pekko.kafka.ConsumerSettings(", ",", ")")
}