def createKafkaConsumerAsync()

in core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala [615:655]


  def createKafkaConsumerAsync()(implicit executionContext: ExecutionContext): Future[Consumer[K, V]] =
    // The factory is applied to the result of `enriched` on the supplied execution context,
    // so the returned future completes with the consumer that `consumerFactory` builds.
    for (enrichedSettings <- enriched) yield consumerFactory(enrichedSettings)

  /**
   * Java API.
   *
   * Asynchronously creates a [[org.apache.kafka.clients.consumer.Consumer Kafka Consumer]] instance
   * from these settings (without blocking for `enriched`).
   *
   * @param executor executor on which the consumer factory is run once `enriched` has completed
   * @return a stage that completes with the consumer produced by `consumerFactory`
   */
  def createKafkaConsumerCompletionStage(executor: Executor): CompletionStage[Consumer[K, V]] = {
    val factoryContext = ExecutionContext.fromExecutor(executor)
    val eventualConsumer = enriched.map(consumerFactory)(factoryContext)
    eventualConsumer.asJava
  }

  /**
   * Renders these settings on a single line as
   * `org.apache.pekko.kafka.ConsumerSettings(name=value,...)`.
   *
   * The Kafka client properties are not printed directly; they are passed through
   * `ConfigSettings.serializeAndMaskKafkaProperties` first. Durations are shown in their coarsest
   * unit, and `enrichAsync` is reported only as present or absent (the function itself is not printed).
   */
  override def toString: String = {
    // The deserializer classes are added under their standard config keys (null when no
    // deserializer is set) before the properties are handed to `ConsumerConfig`.
    val effectiveProperties = properties ++ Map(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> keyDeserializerOpt.map(_.getClass).orNull,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> valueDeserializerOpt.map(_.getClass).orNull)

    val maskedProperties =
      ConfigSettings.serializeAndMaskKafkaProperties(
        effectiveProperties,
        new org.apache.kafka.clients.consumer.ConsumerConfig(_))

    // One `name=value` entry per setting, in the order they appear in the rendered string.
    val renderedFields = Seq(
      s"properties=$maskedProperties",
      s"keyDeserializer=$keyDeserializerOpt",
      s"valueDeserializer=$valueDeserializerOpt",
      s"pollInterval=${pollInterval.toCoarsest}",
      s"pollTimeout=${pollTimeout.toCoarsest}",
      s"stopTimeout=${stopTimeout.toCoarsest}",
      s"closeTimeout=${closeTimeout.toCoarsest}",
      s"commitTimeout=${commitTimeout.toCoarsest}",
      s"commitRefreshInterval=${commitRefreshInterval.toCoarsest}",
      s"dispatcher=$dispatcher",
      s"commitTimeWarning=${commitTimeWarning.toCoarsest}",
      s"waitClosePartition=${waitClosePartition.toCoarsest}",
      s"metadataRequestTimeout=${metadataRequestTimeout.toCoarsest}",
      s"drainingCheckInterval=${drainingCheckInterval.toCoarsest}",
      s"connectionCheckerSettings=$connectionCheckerSettings",
      s"partitionHandlerWarning=${partitionHandlerWarning.toCoarsest}",
      s"resetProtectionSettings=$resetProtectionSettings",
      s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}")

    renderedFields.mkString("org.apache.pekko.kafka.ConsumerSettings(", ",", ")")
  }