in testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java [86:151]
public KafkaContainerCluster(
DockerImageName zooKeeperImage,
DockerImageName kafkaImage,
DockerImageName schemaRegistryImage,
int brokersNum,
int internalTopicsRf,
boolean useSchemaRegistry,
boolean containerLogging,
Duration clusterStartTimeout,
Duration readinessCheckTimeout) {
if (brokersNum < 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
throw new IllegalArgumentException(
"internalTopicsRf '"
+ internalTopicsRf
+ "' must be less than brokersNum and greater than 0");
}
this.kafkaImageTag = new Version(kafkaImage.getVersionPart());
this.brokersNum = brokersNum;
this.useSchemaRegistry = useSchemaRegistry;
this.containerLogging = containerLogging;
this.clusterStartTimeout = clusterStartTimeout;
this.readinessCheckTimeout = readinessCheckTimeout;
this.network = Network.newNetwork();
this.schemaRegistryImage = schemaRegistryImage;
this.zookeeper =
new GenericContainer(zooKeeperImage)
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv(
"ZOOKEEPER_CLIENT_PORT",
String.valueOf(PekkoConnectorsKafkaContainer.ZOOKEEPER_PORT));
this.brokers =
IntStream.range(0, this.brokersNum)
.mapToObj(
brokerNum ->
new PekkoConnectorsKafkaContainer(kafkaImage)
.withNetwork(this.network)
.withBrokerNum(brokerNum)
.withRemoteJmxService()
.dependsOn(this.zookeeper)
.withExternalZookeeper(
"zookeeper:" + PekkoConnectorsKafkaContainer.ZOOKEEPER_PORT)
.withEnv("KAFKA_BROKER_ID", brokerNum + "")
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
.withEnv(
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + ""))
.collect(Collectors.toList());
if (useSchemaRegistry) {
this.schemaRegistry =
Optional.of(
new SchemaRegistryContainer(this.schemaRegistryImage)
.withNetworkAliases("schema-registry")
.withCluster(this));
} else {
this.schemaRegistry = Optional.empty();
}
}