def startCluster()

in testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala [74:105]


    /**
     * Starts the Kafka cluster with this instance's `testcontainersSettings`.
     *
     * @return whatever `startCluster(settings)` returns for those settings
     */
    def startCluster(): String = {
      startCluster(testcontainersSettings)
    }

    /**
     * Creates, configures and starts the Kafka container cluster, unless that already happened.
     *
     * A `kafkaPortInternal` of `-1` is treated as "not started yet". In that case a new
     * `KafkaContainerCluster` is built from `settings`, the configuration hooks are invoked on
     * the containers, the cluster is started, and the bootstrap servers string and the port
     * parsed from it are stored. Otherwise nothing is started and the stored value is returned.
     *
     * @param settings image names/tags, broker count, timeouts and hooks used to build the cluster
     * @return the stored bootstrap servers string
     */
    def startCluster(settings: KafkaTestkitTestcontainersSettings): String = {
      import settings._
      val notStartedYet = kafkaPortInternal == -1
      if (notStartedYet) {
        // Resolve the three image names up front, in the same order the constructor takes them.
        val zkImageName =
          DockerImageName.parse(settings.zooKeeperImage).withTag(settings.zooKeeperImageTag)
        val brokerImageName =
          DockerImageName.parse(settings.kafkaImage).withTag(settings.kafkaImageTag)
        val registryImageName =
          DockerImageName.parse(settings.schemaRegistryImage).withTag(settings.schemaRegistryImageTag)

        cluster = new KafkaContainerCluster(
          zkImageName,
          brokerImageName,
          registryImageName,
          numBrokers,
          internalTopicsReplicationFactor,
          settings.useSchemaRegistry,
          settings.containerLogging,
          settings.clusterStartTimeout.asJava,
          settings.readinessCheckTimeout.asJava)

        // All configuration hooks run before the cluster is started.
        configureKafka(brokerContainers)
        configureKafkaConsumer.accept(brokerContainers.asJavaCollection)
        configureZooKeeper(zookeeperContainer)
        configureZooKeeperConsumer.accept(zookeeperContainer)
        schemaRegistryContainer.foreach(container => configureSchemaRegistry(container))

        log.info("Starting Kafka cluster with settings: {}", settings)
        cluster.start()

        val servers = cluster.getBootstrapServers
        kafkaBootstrapServersInternal = servers
        // The port is whatever follows the last ':' in the bootstrap servers string.
        val portStart = servers.lastIndexOf(":") + 1
        kafkaPortInternal = servers.substring(portStart).toInt
      }
      kafkaBootstrapServersInternal
    }