protected void containerIsStarting()

in testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/PekkoConnectorsKafkaContainer.java [197:273]


  protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
    try {
      super.containerIsStarting(containerInfo, reused);

      port = getMappedPort(KAFKA_PORT);
      if (enableRemoteJmxService) {
        jmxPort = getMappedPort(KAFKA_JMX_PORT);
      }

      if (reused) {
        return;
      }

      String command = "#!/bin/bash\n";
      final String zookeeperConnect;
      if (externalZookeeperConnect != null) {
        zookeeperConnect = externalZookeeperConnect;
      } else {
        zookeeperConnect = "localhost:" + ZOOKEEPER_PORT;
        command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n";
        command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
        command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
        command += "zookeeper-server-start zookeeper.properties &\n";
      }

      List<String> internalIps =
          containerInfo.getNetworkSettings().getNetworks().values().stream()
              .map(ContainerNetwork::getIpAddress)
              .collect(Collectors.toList());

      command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n";
      command +=
          "export KAFKA_ADVERTISED_LISTENERS='"
              + Stream.concat(
                      Stream.of(getBootstrapServers()),
                      internalIps.stream().map(ip -> "BROKER://" + ip + ":9092"))
                  .collect(Collectors.joining(","))
              + "'\n";

      if (enableRemoteJmxService) {
        String jmxIp =
            internalIps.stream()
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("Could not find IP address for JMX"));
        command += "export KAFKA_JMX_PORT='" + KAFKA_JMX_PORT + "' \n";
        command += "export KAFKA_JMX_HOSTNAME='" + jmxIp + "' \n";
      }

      command += ". /etc/confluent/docker/bash-config \n";
      command += "/etc/confluent/docker/configure \n";
      command += "/etc/confluent/docker/launch \n";

      copyFileToContainer(
          Transferable.of(command.getBytes(StandardCharsets.UTF_8), 0777), STARTER_SCRIPT);

      // start and stop the Kafka broker process without stopping the container
      String startStopWrapper =
          "#!/bin/bash\n"
              + "STARTER_SCRIPT='"
              + STARTER_SCRIPT
              + "'\n"
              + "touch /tmp/start\n"
              + "while :; do\n"
              + "\tif [ -f $STARTER_SCRIPT ]; then\n"
              + "\t\tif [ -f /tmp/stop ]; then rm /tmp/stop; /usr/bin/kafka-server-stop;\n"
              + "\t\telif [ -f /tmp/start ]; then rm /tmp/start; bash -c \"$STARTER_SCRIPT &\";fi\n"
              + "\tfi\n"
              + "\tsleep 0.1\n"
              + "done\n";

      copyFileToContainer(
          Transferable.of(startStopWrapper.getBytes(StandardCharsets.UTF_8), 0777),
          START_STOP_SCRIPT);
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }