public KafkaContainerCluster()

in testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java [86:151]


  public KafkaContainerCluster(
      DockerImageName zooKeeperImage,
      DockerImageName kafkaImage,
      DockerImageName schemaRegistryImage,
      int brokersNum,
      int internalTopicsRf,
      boolean useSchemaRegistry,
      boolean containerLogging,
      Duration clusterStartTimeout,
      Duration readinessCheckTimeout) {
    if (brokersNum < 0) {
      throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
    }
    if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
      throw new IllegalArgumentException(
          "internalTopicsRf '"
              + internalTopicsRf
              + "' must be less than brokersNum and greater than 0");
    }

    this.kafkaImageTag = new Version(kafkaImage.getVersionPart());
    this.brokersNum = brokersNum;
    this.useSchemaRegistry = useSchemaRegistry;
    this.containerLogging = containerLogging;
    this.clusterStartTimeout = clusterStartTimeout;
    this.readinessCheckTimeout = readinessCheckTimeout;
    this.network = Network.newNetwork();
    this.schemaRegistryImage = schemaRegistryImage;

    this.zookeeper =
        new GenericContainer(zooKeeperImage)
            .withNetwork(network)
            .withNetworkAliases("zookeeper")
            .withEnv(
                "ZOOKEEPER_CLIENT_PORT",
                String.valueOf(PekkoConnectorsKafkaContainer.ZOOKEEPER_PORT));

    this.brokers =
        IntStream.range(0, this.brokersNum)
            .mapToObj(
                brokerNum ->
                    new PekkoConnectorsKafkaContainer(kafkaImage)
                        .withNetwork(this.network)
                        .withBrokerNum(brokerNum)
                        .withRemoteJmxService()
                        .dependsOn(this.zookeeper)
                        .withExternalZookeeper(
                            "zookeeper:" + PekkoConnectorsKafkaContainer.ZOOKEEPER_PORT)
                        .withEnv("KAFKA_BROKER_ID", brokerNum + "")
                        .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
                        .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
                        .withEnv(
                            "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + ""))
            .collect(Collectors.toList());

    if (useSchemaRegistry) {
      this.schemaRegistry =
          Optional.of(
              new SchemaRegistryContainer(this.schemaRegistryImage)
                  .withNetworkAliases("schema-registry")
                  .withCluster(this));
    } else {
      this.schemaRegistry = Optional.empty();
    }
  }