in testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala [74:105]
/**
 * Starts the Kafka cluster using `testcontainersSettings`.
 *
 * This is a convenience overload that simply forwards to
 * `startCluster(settings)`.
 *
 * @return whatever the settings-taking overload returns (the bootstrap servers string)
 */
def startCluster(): String = {
  startCluster(testcontainersSettings)
}
/**
 * Starts the Kafka container cluster described by `settings`, unless one has already
 * been started successfully, and returns its bootstrap servers string.
 *
 * `kafkaPortInternal == -1` is used as the "not yet initialized" marker. When it is set,
 * a new `KafkaContainerCluster` is created, the configuration callbacks from `settings`
 * are applied to the broker, ZooKeeper and (if present) schema registry containers, and
 * the cluster is started. The bootstrap servers and the port (the digits after the last
 * `:` of the bootstrap servers string) are stored only after everything succeeded.
 *
 * If starting the cluster or parsing the port fails, the cluster is stopped so that its
 * containers are not left behind, the "not initialized" marker is left untouched so that
 * a later call can retry, and the original exception is rethrown.
 *
 * @param settings images, sizing, timeouts and configuration callbacks for the cluster
 * @return the bootstrap servers string of the running cluster
 */
def startCluster(settings: KafkaTestkitTestcontainersSettings): String = {
  import scala.util.control.NonFatal
  import settings._
  // check if already initialized
  if (kafkaPortInternal == -1) {
    cluster = new KafkaContainerCluster(
      DockerImageName.parse(settings.zooKeeperImage).withTag(settings.zooKeeperImageTag),
      DockerImageName.parse(settings.kafkaImage).withTag(settings.kafkaImageTag),
      DockerImageName.parse(settings.schemaRegistryImage).withTag(settings.schemaRegistryImageTag),
      numBrokers,
      internalTopicsReplicationFactor,
      settings.useSchemaRegistry,
      settings.containerLogging,
      settings.clusterStartTimeout.asJava,
      settings.readinessCheckTimeout.asJava)

    // Apply the user supplied customizations before the containers are started.
    configureKafka(brokerContainers)
    configureKafkaConsumer.accept(brokerContainers.asJavaCollection)
    configureZooKeeper(zookeeperContainer)
    configureZooKeeperConsumer.accept(zookeeperContainer)
    schemaRegistryContainer.foreach(container => configureSchemaRegistry(container))

    log.info("Starting Kafka cluster with settings: {}", settings)
    try {
      cluster.start()
      val bootstrapServers = cluster.getBootstrapServers
      val port = bootstrapServers.substring(bootstrapServers.lastIndexOf(":") + 1).toInt
      // Publish the state only once both values are known, so that a failure above
      // never leaves the bootstrap servers set while the port still says "not started".
      kafkaBootstrapServersInternal = bootstrapServers
      kafkaPortInternal = port
    } catch {
      case NonFatal(startFailure) =>
        // The guard above would otherwise create a second cluster on retry and drop the
        // reference to this one. NOTE(review): assumes KafkaContainerCluster exposes
        // `stop()` (Testcontainers `Startable`) and that it tolerates a partial start.
        try cluster.stop()
        catch {
          case NonFatal(stopFailure) => startFailure.addSuppressed(stopFailure)
        }
        throw startFailure
    }
  }
  kafkaBootstrapServersInternal
}