in testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/PekkoConnectorsKafkaContainer.java [197:273]
protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
  // Records the mapped Kafka (and, when enabled, JMX) ports. For a container that is not reused it
  // then uploads two bash scripts: the starter script, which optionally starts ZooKeeper and then
  // configures and launches the broker, and a wrapper loop that starts/stops the broker on demand.
  // Any exception raised along the way is rethrown wrapped in a RuntimeException.
  try {
    super.containerIsStarting(containerInfo, reused);

    port = getMappedPort(KAFKA_PORT);
    if (enableRemoteJmxService) {
      jmxPort = getMappedPort(KAFKA_JMX_PORT);
    }
    if (reused) {
      // Script upload is skipped for a reused container.
      return;
    }

    final boolean useEmbeddedZookeeper = externalZookeeperConnect == null;
    final String zookeeperAddress =
        useEmbeddedZookeeper ? "localhost:" + ZOOKEEPER_PORT : externalZookeeperConnect;

    final StringBuilder starter = new StringBuilder("#!/bin/bash\n");
    if (useEmbeddedZookeeper) {
      // No external ZooKeeper configured: the script writes a minimal zookeeper.properties and
      // starts ZooKeeper in the background before the broker.
      starter
          .append("echo 'clientPort=")
          .append(ZOOKEEPER_PORT)
          .append("' > zookeeper.properties\n")
          .append("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n")
          .append("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n")
          .append("zookeeper-server-start zookeeper.properties &\n");
    }

    // One IP address per network the container is attached to.
    final List<String> networkIps =
        containerInfo.getNetworkSettings().getNetworks().values().stream()
            .map(ContainerNetwork::getIpAddress)
            .collect(Collectors.toList());

    starter.append("export KAFKA_ZOOKEEPER_CONNECT='").append(zookeeperAddress).append("'\n");

    // Advertise the bootstrap servers first, followed by a BROKER listener for every network IP.
    final Stream<String> bootstrapListener = Stream.of(getBootstrapServers());
    final Stream<String> brokerListeners =
        networkIps.stream().map(ip -> "BROKER://" + ip + ":9092");
    final String advertisedListeners =
        Stream.concat(bootstrapListener, brokerListeners).collect(Collectors.joining(","));
    starter
        .append("export KAFKA_ADVERTISED_LISTENERS='")
        .append(advertisedListeners)
        .append("'\n");

    if (enableRemoteJmxService) {
      // JMX is bound to the first network IP; without any IP the start-up fails.
      final String jmxHost =
          networkIps.stream()
              .findFirst()
              .orElseThrow(() -> new IllegalStateException("Could not find IP address for JMX"));
      starter
          .append("export KAFKA_JMX_PORT='")
          .append(KAFKA_JMX_PORT)
          .append("' \n")
          .append("export KAFKA_JMX_HOSTNAME='")
          .append(jmxHost)
          .append("' \n");
    }

    starter
        .append(". /etc/confluent/docker/bash-config \n")
        .append("/etc/confluent/docker/configure \n")
        .append("/etc/confluent/docker/launch \n");

    final byte[] starterBytes = starter.toString().getBytes(StandardCharsets.UTF_8);
    copyFileToContainer(Transferable.of(starterBytes, 0777), STARTER_SCRIPT);

    // Wrapper loop that starts and stops the Kafka broker process without stopping the container:
    // it polls every 0.1s, runs kafka-server-stop when /tmp/stop exists and launches the starter
    // script in the background when /tmp/start exists (the latter is created up front).
    final String wrapper =
        String.join(
            "",
            "#!/bin/bash\n",
            "STARTER_SCRIPT='",
            STARTER_SCRIPT,
            "'\n",
            "touch /tmp/start\n",
            "while :; do\n",
            "\tif [ -f $STARTER_SCRIPT ]; then\n",
            "\t\tif [ -f /tmp/stop ]; then rm /tmp/stop; /usr/bin/kafka-server-stop;\n",
            "\t\telif [ -f /tmp/start ]; then rm /tmp/start; bash -c \"$STARTER_SCRIPT &\";fi\n",
            "\tfi\n",
            "\tsleep 0.1\n",
            "done\n");
    final byte[] wrapperBytes = wrapper.getBytes(StandardCharsets.UTF_8);
    copyFileToContainer(Transferable.of(wrapperBytes, 0777), START_STOP_SCRIPT);
  } catch (Exception failure) {
    throw new RuntimeException(failure);
  }
}